module concurrency.stream.slide;

import concurrency.stream.stream;
import concurrency.sender : OpType;
import concepts;

/// Slides a window over a stream, emitting all items currently in the window
/// as an array each time the window is full.
///
/// The emitted array is an internal buffer that is modified after the
/// downstream delegate returns, so duplicate it if you need to keep its
/// contents beyond the callback.
///
/// Params:
///   stream = the source stream; its element type must not be `void`
///   window = number of elements per emitted window; must be greater than 0
///   step   = number of elements the window advances between emissions; must
///            not exceed `window`. With `step == window` the emitted windows
///            do not overlap. Note that a `step` of 0 is not rejected here.
///
/// Throws: the exception raised by `std.exception.enforce` when `window` is 0
/// or `step` is greater than `window`.
auto slide(Stream)(Stream stream, size_t window, size_t step = 1) if (models!(Stream, isStream)) {
  alias Properties = StreamProperties!Stream;
  static assert(!is(Properties.ElementType == void), "Need ElementType to be able to slide, void wont do.");
  import std.exception : enforce;
  enforce(window > 0, "window must be greater than 0.");
  enforce(step <= window, "step can't be bigger than window.");
  return fromStreamOp!(Properties.ElementType[], Properties.Value, SlideStreamOp!Stream)(stream, window, step);
}

/// Builds the per-receiver operation used by `slide` for a given `Stream`.
template SlideStreamOp(Stream) {
  alias Properties = StreamProperties!Stream;
  alias DG = CollectDelegate!(Properties.ElementType[]);

  /// Buffers elements of the source stream into `arr` and calls `dg` with the
  /// buffer every time it holds `window` elements.
  struct SlideStreamOp(Receiver) {
    alias Op = OpType!(Properties.Sender, Receiver);
    size_t window, step;
    Properties.ElementType[] arr; // the window buffer handed to `dg`
    DG dg;                        // downstream delegate receiving each full window
    Op op;                        // operation obtained by connecting the collecting source

    // Not copyable: the constructor hands out a delegate bound to this instance.
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);

    this(Stream stream, size_t window, size_t step, DG dg, Receiver receiver) @trusted {
      this.window = window;
      this.step = step;
      this.arr.reserve(window);
      this.dg = dg;
      // Route every element of the source stream through `item`.
      op = stream.collect(cast(Properties.DG)&item).connect(receiver);
    }

    /// Receives one element from the source stream and emits the window once
    /// it is full, then advances the window by `step` elements.
    void item(Properties.ElementType t) {
      import std.algorithm : moveAll;

      if (arr.length == window) {
        // Buffer is kept full (the step == 1 case): the last slot is free
        // because the previous emission already shifted everything left.
        arr[window - 1] = t;
      } else {
        arr ~= t;
        if (arr.length < window)
          return; // still filling the window
      }

      dg(arr);

      // Advance the window. Elements that survive into the next window are
      // shifted to the front; this only applies when windows overlap.
      if (step < window)
        moveAll(arr[step .. $], arr[0 .. $ - step]);

      // Drop the `step` consumed slots so following items are appended again.
      // For step == 1 the buffer stays full and the last slot is overwritten
      // instead. For step == window (> 1) this empties the buffer, so the next
      // window starts fresh instead of re-emitting stale leading elements on
      // every subsequent item.
      if (step > 1)
        arr.length -= step;
    }

    /// Starts the underlying connected operation.
    void start() nothrow @safe {
      op.start();
    }
  }
}