1 module concurrency.stream.slide;
2 
3 import concurrency.stream.stream;
4 import concurrency.sender : OpType;
5 import concepts;
6 
/**
 * Slides a window over a stream, emitting all items in the window as an array.
 *
 * The emitted array is reused between emissions, so you must duplicate it
 * (e.g. with `.dup`) if you want to access it after the receiving delegate
 * returns.
 *
 * Params:
 *   stream = the source stream; its element type must not be `void`
 *   window = number of items in each emitted array, must be greater than 0
 *   step   = number of source items the window advances between emissions,
 *            must be greater than 0; when larger than `window` the items in
 *            between are dropped
 *
 * Returns: a stream whose element type is `Properties.ElementType[]`.
 *
 * Throws: `Exception` (via `enforce`) when `window` or `step` is 0.
 */
auto slide(Stream)(Stream stream, size_t window, size_t step = 1) if (models!(Stream, isStream)) {
  import std.exception : enforce;
  alias Properties = StreamProperties!Stream;
  static assert(!is(Properties.ElementType == void), "Need ElementType to be able to slide, void wont do.");
  enforce(window > 0, "window must be greater than 0.");
  enforce(step > 0, "step must be greater than 0.");
  return fromStreamOp!(Properties.ElementType[], Properties.Value, SlideStreamOp!Stream)(stream, window, step);
}
17 
/// Operation state behind `slide`: buffers items of the source stream and
/// hands full windows to the downstream delegate.
template SlideStreamOp(Stream) {
  alias Properties = StreamProperties!Stream;
  alias DG = CollectDelegate!(Properties.ElementType[]);
  struct SlideStreamOp(Receiver) {
    alias Op = OpType!(Properties.Sender, Receiver);
    /// `window`: items per emitted array; `step`: items to advance between
    /// emissions; `skip`: number of upcoming items still to be dropped
    /// (only non-zero when `step > window`).
    size_t window, step, skip;
    /// The window buffer. It is handed to `dg` directly and then mutated in
    /// place, which is why consumers must copy it if they want to keep it.
    Properties.ElementType[] arr;
    DG dg;
    Op op;
    // Copying is disabled: the constructor hands out a delegate bound to
    // `this` (`&item`), so a copy would leave that delegate pointing at the
    // original object.
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    /// Stores the configuration, pre-allocates the window buffer and connects
    /// the source stream (collecting into `item`) to `receiver`.
    this(Stream stream, size_t window, size_t step, DG dg, Receiver receiver) @trusted {
      this.window = window;
      this.step = step;
      this.arr.reserve(window);
      this.dg = dg;
      op = stream.collect(cast(Properties.DG)&item).connect(receiver);
    }
    /// Marks the memory behind `arr` as free for in-place appends again.
    /// Without this, appending to a slice whose length was reduced makes the
    /// runtime allocate a fresh array, defeating the buffer reuse. This is
    /// sound here because the elements beyond `arr.length` are stale by
    /// contract (consumers must copy the emitted window).
    private void reclaimBuffer() @trusted {
      arr.assumeSafeAppend();
    }
    /// Receives one item from the source stream and emits a window through
    /// `dg` whenever the buffer holds `window` items.
    void item(Properties.ElementType t) {
      // Still dropping items between two non-overlapping windows.
      if (skip > 0) {
        skip--;
        return;
      }
      import std.algorithm : moveAll;
      if (arr.length == window) {
        // Only reached when step == 1: the buffer stays at full length and
        // the last slot is the free one after the shift below.
        arr[window-1] = t;
      } else {
        arr ~= t;
        if (arr.length < window)
          return;
      }
      dg(arr);
      if (step < window) {
        // Overlapping windows: shift the items that survive to the front.
        moveAll(arr[step .. $], arr[0..$-step]);
        if (step > 1) {
          // Drop the vacated tail so the next `step` items are appended.
          arr.length -= step;
          reclaimBuffer();
        }
      } else {
        // Non-overlapping windows: start from scratch and drop the
        // `step - window` items in between.
        arr.length = 0;
        reclaimBuffer();
        skip = step - window;
      }
    }
    /// Starts the underlying connected operation.
    void start() nothrow @safe {
      op.start();
    }
  }
}