1 module concurrency.stream.slide;
2 
3 import concurrency.stream.stream;
4 import concurrency.sender : OpType;
5 import concepts;
6 
/**
 * Slides a window over a stream, emitting all items in the window as an array.
 *
 * A window is only emitted once it holds `window` elements; after each emit the
 * window advances by `step` elements.
 *
 * The emitted array is reused between emits, so you must duplicate it (e.g. with
 * `.dup`) if you want to access it beyond the callback that received it.
 *
 * Params:
 *   stream = the source stream; its element type must not be `void`
 *   window = number of elements in each emitted array, must be greater than 0
 *   step = number of elements the window advances after each emit,
 *          must be in the range `1 .. window` (inclusive)
 *
 * Returns: a stream whose elements are arrays of the source's element type.
 *
 * Throws: `Exception` when `window` or `step` is out of range.
 */
auto slide(Stream)(Stream stream, size_t window, size_t step = 1) if (models!(Stream, isStream)) {
  import std.exception : enforce;
  alias Properties = StreamProperties!Stream;
  static assert(!is(Properties.ElementType == void), "Need ElementType to be able to slide, void wont do.");
  enforce(window > 0, "window must be greater than 0.");
  // A step of 0 would never advance the window: every emit would repeat the
  // first `window - 1` elements with only the last slot replaced.
  enforce(step > 0, "step must be greater than 0.");
  enforce(step <= window, "step can't be bigger than window.");
  return fromStreamOp!(Properties.ElementType[], Properties.Value, SlideStreamOp!Stream)(stream, window, step);
}
17 
/// Operation state behind a sliding-window stream: it collects the elements of
/// `Stream` into a buffer and hands each full window to a downstream delegate.
template SlideStreamOp(Stream) {
  alias Properties = StreamProperties!Stream;
  /// Delegate that receives each emitted window.
  alias DG = CollectDelegate!(Properties.ElementType[]);
  struct SlideStreamOp(Receiver) {
    alias Op = OpType!(Properties.Sender, Receiver);
    /// Window size and the number of elements the window advances per emit.
    size_t window, step;
    /// Buffer holding the current window; it is passed to `dg` and then reused.
    Properties.ElementType[] arr;
    DG dg;
    /// Operation obtained by connecting the collecting sender to the receiver.
    Op op;
    // Copying is disabled: `op` is built around a delegate to this instance's `item`.
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    /// Stores the window parameters, reserves buffer space for one window and
    /// connects `stream.collect(&item)` to `receiver`.
    this(Stream stream, size_t window, size_t step, DG dg, Receiver receiver) @trusted {
      this.window = window;
      this.step = step;
      this.arr.reserve(window);
      this.dg = dg;
      // NOTE(review): the cast presumably adjusts the delegate's attributes to
      // match what `collect` expects - confirm against Properties.DG.
      op = stream.collect(cast(Properties.DG)&item).connect(receiver);
    }
    /// Receives one element from the source stream. Once the buffer holds
    /// `window` elements it is passed to `dg`, after which the first `step`
    /// elements are dropped to make room for the next window.
    void item(Properties.ElementType t) {
      import std.algorithm : moveAll;
      if (arr.length == window) {
        // The previous emit kept the length (step <= 1), so the last slot is
        // stale and is simply overwritten.
        arr[window - 1] = t;
      } else {
        arr ~= t;
        if (arr.length < window)
          return;
      }
      dg(arr);
      // Shift the elements that remain in the window to the front. When
      // step == window both slices are empty and this is a no-op.
      moveAll(arr[step .. $], arr[0 .. $ - step]);
      if (step > 1) {
        // Drop the consumed tail; for step == window this empties the buffer so
        // the next window starts from scratch.
        arr.length -= step;
        // Without this, the next append after shrinking would reallocate
        // instead of reusing the reserved buffer. Stomping the old tail is fine
        // because the buffer is owned by this operation and documented as reused.
        () @trusted { arr.assumeSafeAppend(); }();
      }
    }
    /// Starts the underlying collect operation.
    void start() nothrow @safe {
      op.start();
    }
  }
}