1 module concurrency.stream;
2 
3 import concurrency.stoptoken;
4 import concurrency.receiver;
5 import concurrency.sender : isSender, OpType;
6 import concepts;
7 import std.traits : hasFunctionAttributes;
8 
9 /// A Stream is anything that has a `.collect` function that accepts a callable and returns a Sender.
10 /// Once the Sender is connected and started the Stream will call the callable zero or more times before one of the three terminal functions of the Receiver is called.
11 
/// The delegate type a Stream invokes for every element it produces.
/// For a `void` element type the delegate takes no arguments, otherwise it
/// takes exactly one `ElementType`. The delegate is always `@safe shared`.
template CollectDelegate(ElementType) {
  static if (!is(ElementType == void))
    alias CollectDelegate = void delegate(ElementType) @safe shared;
  else
    alias CollectDelegate = void delegate() @safe shared;
}
19 
/// Compile-time check that T is a Stream: T must declare an `ElementType` and a
/// `collect` member (optionally templated on the delegate type) whose return
/// type models a Sender.
void checkStream(T)() {
  import std.traits : ReturnType;
  alias Callback = CollectDelegate!(T.ElementType);
  // use the templated `collect!Callback` when it exists, otherwise the plain member
  static if (!is(typeof(T.collect!Callback)))
    alias Result = ReturnType!(T.collect);
  else
    alias Result = ReturnType!(T.collect!(Callback));
  static assert (models!(Result, isSender));
}
/// `true` when `checkStream!T` compiles, i.e. when T is a Stream
enum isStream(T) = is(typeof(checkStream!T));
31 
/// Type-erased Stream interface for elements of type T.
/// `collect` returns a polymorphic Sender that produces no value.
interface StreamObjectBase(T) {
  import concurrency.sender : SenderObjectBase;
  alias ElementType = T;
  static assert (models!(typeof(this), isStream));
  alias DG = CollectDelegate!T;

  /// Accepts the per-element callable and returns the Sender driving the stream
  SenderObjectBase!void collect(DG dg) @safe;
}
41 
/// Adapter that exposes any concrete Stream through the polymorphic
/// StreamObjectBase interface by delegating `collect` to the wrapped stream.
class StreamObjectImpl(Stream) : StreamObjectBase!(Stream.ElementType) if (models!(Stream, isStream)) {
  import concurrency.receiver : ReceiverObjectBase;
  static assert (models!(typeof(this), isStream));
  alias DG = CollectDelegate!(Stream.ElementType);

  private Stream stream;

  /// Stores a copy of the stream to wrap
  this(Stream stream) {
    this.stream = stream;
  }

  /// Collects from the wrapped stream and type-erases the resulting Sender
  SenderObjectBase!void collect(DG dg) @safe {
    import concurrency.sender : toSenderObject;
    return toSenderObject(stream.collect(dg));
  }
}
57 
/// Type-erases `stream` by wrapping it in a newly allocated StreamObjectImpl
StreamObjectBase!(Stream.ElementType) toStreamObject(Stream)(Stream stream) if (models!(Stream, isStream)) {
  alias Impl = StreamObjectImpl!Stream;
  return new Impl(stream);
}
62 
63 /*
64   catch?
65   combineLatest
66   count
67   debounce
68   distinctUntilChanged
69   drop
70   dropWhile
71   filter
72   first
73   firstOrNull
74   flatMapConcat
75   flatMapLatest
76   flatMapMerge
77   fold
78   map
79   mapLatest
80   merge
81   onEach
82   onEmpty
83   onStart
84   onSubscription
85   reduce (fold with no seed)
86   retry
87   retryWhen
88   runningReduce
89   sample
90   scan (like runningReduce but with initial value)
91   take
92   takeWhile
93   toList
94   transform
95   transformLatest
96   zip
97 */
98 
/// Helper to construct a Stream, useful if the Stream you are modeling has a blocking loop.
///
/// `E` is the element type of the resulting stream. `t` must have a member
/// `loop(emit, stopToken)`; when the stream's sender is started, `t.loop` is
/// called with the collect delegate and the receiver's stop token.
///
/// Completion of the returned sender:
///  - `setError` if `loop` throws an `Exception`,
///  - `setDone` if the loop returned and the stop token reports a stop request,
///  - `setValueOrError` otherwise.
template loopStream(E) {
  alias DG = CollectDelegate!(E);
  auto loopStream(T)(T t) {
    static struct LoopStream {
      static assert(models!(typeof(this), isStream));
      alias ElementType = E;
      /// Operation state: owns the loop object, the delegate and the receiver
      static struct LoopOp(Receiver) {
        T t;
        DG dg;
        Receiver receiver;
        // operation states must stay at a stable address: no copies
        @disable this(ref return scope typeof(this) rhs);
        @disable this(this);
        this(T t, DG dg, Receiver receiver) {
          this.t = t;
          this.dg = dg;
          this.receiver = receiver;
        }
        /// Runs the blocking loop and then signals exactly one terminal function
        void start() @trusted nothrow scope {
          try {
            t.loop(dg, receiver.getStopToken);
          } catch (Exception e) {
            receiver.setError(e);
            // setError is terminal; without this return the receiver would
            // also get setDone/setValue below
            return;
          }
          if (receiver.getStopToken().isStopRequested)
            receiver.setDone();
          else
            receiver.setValueOrError();
        }
      }
      /// Sender returned from `collect`; produces no value
      static struct LoopSender {
        alias Value = void;
        T t;
        DG dg;
        auto connect(Receiver)(return Receiver receiver) @safe scope return {
          // ensure NRVO
          auto op = LoopOp!(Receiver)(t, dg, receiver);
          return op;
        }
      }
      T t;
      /// Binds the delegate that receives each element
      auto collect(DG dg) @safe {
        return LoopSender(t, dg);
      }
    }
    return LoopStream(t);
  }
}
147 
/// Stream that keeps emitting the same value `t` until a stop is requested
auto infiniteStream(T)(T t) {
  alias DG = CollectDelegate!(T);
  struct Loop {
    T val;
    // checks the stop token before every emission
    void loop(StopToken)(DG emit, StopToken stopToken) {
      for (;;) {
        if (stopToken.isStopRequested)
          return;
        emit(val);
      }
    }
  }
  return loopStream!T(Loop(t));
}
160 
/// Stream that emits every value in the half-open range `start .. end`,
/// stopping early when a stop is requested
auto iotaStream(T)(T start, T end) {
  alias DG = CollectDelegate!(T);
  struct Loop {
    T b,e;
    // the stop token is checked after each emission
    void loop(StopToken)(DG emit, StopToken stopToken) {
      foreach (current; b .. e) {
        emit(current);
        if (stopToken.isStopRequested)
          return;
      }
    }
  }
  return loopStream!T(Loop(start, end));
}
176 
/// Stream that emits the items of `arr` in order, stopping early when a stop is requested
auto arrayStream(T)(T[] arr) {
  alias DG = CollectDelegate!(T);
  struct Loop {
    T[] arr;
    // the stop token is checked after each emission
    void loop(StopToken)(DG emit, StopToken stopToken) @safe {
      foreach (element; arr) {
        emit(element);
        if (stopToken.isStopRequested)
          return;
      }
    }
  }
  return loopStream!T(Loop(arr));
}
192 
193 import core.time : Duration;
194 
/// Stream without element value that calls the collect delegate each time
/// `duration` has elapsed on the receiver's scheduler, until a stop is
/// requested or the delegate throws.
auto intervalStream(Duration duration) {
  alias DG = CollectDelegate!(void);
  /// Receiver for one scheduled tick; points back at the operation state
  static struct ItemReceiver(Op) {
    Op* op;
    /// One tick elapsed: emit, then schedule the next tick unless stopped
    void setValue() @safe {
      if (op.receiver.getStopToken.isStopRequested) {
        op.receiver.setDone();
        return;
      }
      try {
        op.dg();
        // the delegate may have taken a while; check for a stop again before re-arming
        if (!op.receiver.getStopToken.isStopRequested)
          op.start();
        else
          op.receiver.setDone();
      } catch (Exception e) {
        op.receiver.setError(e);
      }
    }
    /// Forwarded to the downstream receiver
    void setDone() @safe nothrow {
      op.receiver.setDone();
    }
    /// Forwarded to the downstream receiver
    void setError(Exception e) @safe nothrow {
      op.receiver.setError(e);
    }
    /// The downstream receiver's stop token
    auto getStopToken() @safe {
      return op.receiver.getStopToken();
    }
    /// The downstream receiver's scheduler
    auto getScheduler() @safe {
      return op.receiver.getScheduler();
    }
  }
  /// Operation state; holds the operation of the currently scheduled tick
  static struct Op(Receiver) {
    import std.traits : ReturnType;
    Duration duration;
    DG dg;
    Receiver receiver;
    alias SchedulerAfterSender = ReturnType!(SchedulerType!(Receiver).scheduleAfter);
    alias TickOp = OpType!(SchedulerAfterSender, ItemReceiver!(typeof(this)));
    TickOp op;
    // ItemReceiver stores `&this`, so the state must never be copied
    @disable this(this);
    @disable this(ref return scope typeof(this) rhs);
    this(Duration duration, DG dg, Receiver receiver) {
      this.duration = duration;
      this.dg = dg;
      this.receiver = receiver;
    }
    /// Schedules the next tick; failures while doing so are reported via setError
    void start() @trusted nothrow {
      try {
        op = receiver.getScheduler().scheduleAfter(duration).connect(ItemReceiver!(typeof(this))(&this));
        op.start();
      } catch (Exception e) {
        receiver.setError(e);
      }
    }
  }
  /// Sender returned from `collect`; produces no value
  static struct Sender {
    alias Value = void;
    Duration duration;
    DG dg;
    auto connect(Receiver)(return Receiver receiver) @safe scope return {
      // ensure NRVO
      auto operation = Op!(Receiver)(duration, dg, receiver);
      return operation;
    }
  }
  static struct IntervalStream {
    alias ElementType = void;
    Duration duration;
    auto collect(DG dg) @safe {
      return Sender(duration, dg);
    }
  }
  return IntervalStream(duration);
}
271 
/// Collects the types associated with a Stream:
///  - `ElementType`: what the stream emits,
///  - `DG`: the delegate type accepted by `collect`,
///  - `Sender`: the type returned by `collect`,
///  - `Value`: the value type of that Sender.
template StreamProperties(Stream) {
  import std.traits : ReturnType;
  alias ElementType = Stream.ElementType;
  alias DG = CollectDelegate!(Stream.ElementType);
  alias Sender = ReturnType!(Stream.collect);
  alias Value = ReturnType!(Stream.collect).Value;
}
279 
/// takes the first n values from a stream or until cancelled
///
/// After the n-th value has been forwarded an internal StopSource is
/// triggered to cancel the source stream. Values the source still emits after
/// that point are dropped.
///
/// Throws: an Exception (through `enforce`) when `n` is 0.
auto take(Stream)(Stream stream, size_t n) if (models!(Stream, isStream)) {
  alias Properties = StreamProperties!Stream;
  /// Receiver between the source stream's sender and the downstream receiver
  static struct TakeReceiver(Receiver) {
    Receiver receiver;
    StopSource stopSource;
    static if (is(Properties.Sender.Value == void))
      void setValue() @safe { receiver.setValue(); }
    else
      void setValue(Properties.Sender.Value e) @safe { receiver.setValue(e); }
    void setDone() nothrow @safe {
      import concurrency.receiver : setValueOrError;
      static if (is(Properties.Sender.Value == void)) {
        // a stop on our own stopSource is reported downstream as a regular completion
        if (stopSource.isStopRequested)
          receiver.setValueOrError();
        else
          receiver.setDone();
      } else
        receiver.setDone();
    }
    void setError(Exception e) nothrow @safe {
      receiver.setError(e);
    }
    mixin ForwardExtensionPoints!receiver;
  }
  /// Operation state: counts down `n` and stops the source when it reaches 0
  static struct TakeOp(Receiver) {
    import concurrency.operations : withStopSource;
    import std.traits : ReturnType;
    alias SS = ReturnType!(withStopSource!(Properties.Sender));
    alias Op = OpType!(SS, TakeReceiver!Receiver);
    size_t n;
    Properties.DG dg;
    StopSource stopSource;
    Op op;
    // `&item` captures the address of this state, so it must not be copied
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    private this(return Stream stream, size_t n, Properties.DG dg, return Receiver receiver) @trusted scope {
      stopSource = new StopSource();
      this.dg = dg;
      this.n = n;
      op = stream.collect(cast(Properties.DG)&item).withStopSource(stopSource).connect(TakeReceiver!Receiver(receiver, stopSource));
    }
    static if (is(Properties.ElementType == void)) {
      private void item() {
        // quota already used: drop late items instead of underflowing `n`
        if (n == 0)
          return;
        dg();
        /// TODO: this implies the stream will only call emit from a single execution context, we might need to enforce that
        n--;
        if (n == 0)
          stopSource.stop();
      }
    } else {
      private void item(Properties.ElementType t) {
        // quota already used: drop late items instead of underflowing `n`
        if (n == 0)
          return;
        dg(t);
        n--;
        if (n == 0)
          stopSource.stop();
      }
    }
    void start() nothrow @trusted scope {
      op.start();
    }
  }
  import std.exception : enforce;
  enforce(n > 0, "cannot take 0");
  return fromStreamOp!(Properties.ElementType, Properties.Value, TakeOp)(stream, n);
}
346 
/// Maps every element of `stream` through `fun` and emits the result.
///
/// The element type of the resulting stream is `ReturnType!Fun`. For a source
/// stream with `void` elements `fun` is called without arguments. When `fun`
/// returns `void` the resulting stream has `void` elements: `fun` is still
/// called for every source element, after which the delegate is called
/// without arguments.
auto transform(Stream, Fun)(Stream stream, Fun fun) if (models!(Stream, isStream)) {
  import std.traits : ReturnType;
  alias Properties = StreamProperties!Stream;
  alias Result = ReturnType!Fun;
  alias DG = CollectDelegate!(Result);
  static struct TransformStreamOp(Receiver) {
    alias Op = OpType!(Properties.Sender, Receiver);
    Fun fun;
    DG dg;
    Op op;
    // `&item` captures the address of this state, so it must not be copied
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    this(Stream stream, Fun fun, DG dg, Receiver receiver) @trusted {
      this.fun = fun;
      this.dg = dg;
      op = stream.collect(cast(Properties.DG)&item).connect(receiver);
    }
    static if (is(Properties.ElementType == void))
      void item() {
        static if (is(Result == void)) {
          // a void expression cannot be passed as an argument
          fun();
          dg();
        } else
          dg(fun());
      }
    else
      void item(Properties.ElementType t) {
        static if (is(Result == void)) {
          // a void expression cannot be passed as an argument
          fun(t);
          dg();
        } else
          dg(fun(t));
      }
    void start() nothrow @safe {
      op.start();
    }
  }
  return fromStreamOp!(Result, Properties.Value, TransformStreamOp)(stream, fun);
}
377 
/// Builds a Stream out of an operation-state template.
///
/// `Op!Receiver` is constructed as `Op!Receiver(args..., dg, receiver)` when
/// the sender returned from `collect` is connected. `StreamElementType` is the
/// element type of the resulting stream and `SenderValue` the value type of
/// its sender.
auto fromStreamOp(StreamElementType, SenderValue, alias Op, Args...)(Args args) {
  alias DG = CollectDelegate!(StreamElementType);
  /// Sender that carries the captured arguments plus the collect delegate
  static struct FromStreamSender {
    alias Value = SenderValue;
    Args args;
    DG dg;
    auto connect(Receiver)(return Receiver receiver) @safe scope return {
      // ensure NRVO
      auto operation = Op!(Receiver)(args, dg, receiver);
      return operation;
    }
  }
  /// The stream itself; only stores the captured arguments
  static struct FromStream {
    static assert(models!(typeof(this), isStream));
    alias ElementType = StreamElementType;
    Args args;
    auto collect(DG dg) @safe {
      return FromStreamSender(args, dg);
    }
  }
  return FromStream(args);
}
400 
/// Runs an accumulator over the source stream. Starting from `seed`, every
/// source element replaces the accumulator with `scanFn(accumulator, element)`
/// (or `scanFn(accumulator)` for a stream with void elements) and the new
/// accumulator is emitted.
auto scan(Stream, ScanFn, Seed)(Stream stream, scope ScanFn scanFn, Seed seed) if (models!(Stream, isStream)) {
  import std.traits : ReturnType;
  alias Properties = StreamProperties!Stream;
  alias DG = CollectDelegate!(Seed);
  static struct ScanStreamOp(Receiver) {
    alias Op = OpType!(Properties.Sender, Receiver);
    ScanFn scanFn;
    Seed accumulator;
    DG dg;
    Op op;
    // `&item` captures the address of this state, so it must not be copied
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    this(Stream stream, ScanFn scanFn, Seed seed, DG dg, Receiver receiver) @trusted {
      this.scanFn = scanFn;
      this.accumulator = seed;
      this.dg = dg;
      op = stream.collect(cast(Properties.DG)&item).connect(receiver);
    }
    static if (!is(Properties.ElementType == void))
      void item(Properties.ElementType t) {
        accumulator = scanFn(accumulator, t);
        dg(accumulator);
      }
    else
      void item() {
        accumulator = scanFn(accumulator);
        dg(accumulator);
      }
    void start() nothrow @safe {
      op.start();
    }
  }
  return fromStreamOp!(Seed, Properties.Value, ScanStreamOp)(stream, scanFn, seed);
}
436 
/// Forwards the latest value from the base stream every time the trigger stream produces a value.
/// If the base stream hasn't produced a (new) value since the last trigger, the trigger is ignored.
///
/// Both streams are collected through `raceAll`. The base stream must have a non-void element type.
auto sample(StreamBase, StreamTrigger)(StreamBase base, StreamTrigger trigger) if (models!(StreamBase, isStream) && models!(StreamTrigger, isStream)) {
  import concurrency.operations.raceall;
  import concurrency.bitfield : SharedBitField;
  enum Flags : size_t {
    locked = 0x1,
    valid = 0x2
  }
  alias PropertiesBase = StreamProperties!StreamBase;
  alias PropertiesTrigger = StreamProperties!StreamTrigger;
  static assert(!is(PropertiesBase.ElementType == void), "No point in sampling a stream that procudes no values. Might as well use trigger directly");
  alias DG = PropertiesBase.DG;
  static struct SampleStreamOp(Receiver) {
    import std.traits : ReturnType;
    alias RaceAllSender = ReturnType!(raceAll!(PropertiesBase.Sender, PropertiesTrigger.Sender));
    alias Op = OpType!(RaceAllSender, Receiver);
    DG dg;
    Op op;
    /// most recent element from the base stream; accessed only while holding the `state` lock
    PropertiesBase.ElementType element;
    /// lock bit plus a `valid` bit that is set when `element` was stored and not yet forwarded
    shared SharedBitField!Flags state;
    // `&item` / `&this.trigger` capture the address of this state, so it must not be copied
    @disable this(ref return scope inout typeof(this) rhs);
    @disable this(this);
    this(StreamBase base, StreamTrigger trigger, DG dg, return Receiver receiver) @trusted scope {
      this.dg = dg;
      op = raceAll(base.collect(cast(PropertiesBase.DG)&item),
                   trigger.collect(cast(PropertiesTrigger.DG)&this.trigger)).connect(receiver);
    }
    /// Called for each base element: store it under the lock and mark it valid
    void item(PropertiesBase.ElementType t) {
      with(state.lock(Flags.valid)) {
        element = t;
      }
    }
    /// Called for each trigger: forward the stored element if one is pending
    void trigger() {
      with(state.lock()) {
        if (was(Flags.valid)) {
          auto localElement = element;
          // NOTE(review): presumably releases the lock and clears `valid` before
          // calling the delegate - confirm against SharedBitField
          release(Flags.valid);
          dg(localElement);
        }
      }
    }
    void start() {
      op.start();
    }
  }
  return fromStreamOp!(PropertiesBase.ElementType, PropertiesBase.Value, SampleStreamOp)(base, trigger);
}
487 
/// Stream variant of the sender `via` operation: the sender obtained from
/// `stream.collect` is combined with `sender` through
/// `concurrency.operations.via` before it is connected.
auto via(Stream, Sender)(Stream stream, Sender sender) if (models!(Sender, isSender) && models!(Stream, isStream)) {
  alias Properties = StreamProperties!Stream;
  alias DG = Properties.DG;
  static struct ViaStreamOp(Receiver) {
    import std.traits : ReturnType;
    import concurrency.operations.via : senderVia = via;
    alias ViaSender = ReturnType!(senderVia!(Properties.Sender, Sender));
    alias Op = OpType!(ViaSender, Receiver);
    Op op;
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    this(Stream stream, Sender sender, DG dg, Receiver receiver) {
      op = senderVia(stream.collect(dg), sender).connect(receiver);
    }
    void start() nothrow @safe {
      op.start();
    }
  }
  return fromStreamOp!(Properties.ElementType, Properties.Value, ViaStreamOp)(stream, sender);
}
507 
/// Stream with void elements that never emits and completes with `setDone` as soon as it is started
auto doneStream() {
  alias DG = CollectDelegate!void;
  static struct DoneStreamOp(Receiver) {
    Receiver receiver;
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    // the delegate is accepted to satisfy the fromStreamOp contract but never called
    this(DG dg, Receiver receiver) {
      this.receiver = receiver;
    }
    void start() nothrow @safe {
      receiver.setDone();
    }
  }
  return fromStreamOp!(void, void, DoneStreamOp)();
}
523 
/// Stream with void elements that never emits and completes with `setError(e)` as soon as it is started
auto errorStream(Exception e) {
  alias DG = CollectDelegate!void;
  static struct ErrorStreamOp(Receiver) {
    Exception e;
    Receiver receiver;
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    // the delegate is accepted to satisfy the fromStreamOp contract but never called
    this(Exception e, DG dg, Receiver receiver) {
      this.receiver = receiver;
      this.e = e;
    }
    void start() nothrow @safe {
      receiver.setError(e);
    }
  }
  return fromStreamOp!(void, void, ErrorStreamOp)(e);
}
541 
/// Creates a SharedStream, which broadcasts values to zero or more receivers.
/// Receivers can be added and removed at any time. The stream itself never
/// completes, so receivers have to terminate their connection themselves.
auto sharedStream(T)() {
  import concurrency.slist;
  alias DG = CollectDelegate!(T);
  alias Stream = SharedStream!(T);
  // the subscriber list starts out empty and is handed to the stream
  return Stream(new shared SList!(Stream.SubscriberDG));
}
548 
/// Broadcasting stream: `emit` calls every currently subscribed delegate.
/// A subscription starts when a sender obtained from `collect` is started and
/// ends when its receiver's stop token fires or its delegate throws.
shared struct SharedStream(T) {
  alias ElementType = T;
  alias SubscriberDG = void delegate(T) nothrow @safe shared;
  import concurrency.slist;
  private {
    alias DG = CollectDelegate!T;
    /// Operation state of a single subscription
    static struct Op(Receiver) {
      shared SharedStream!T source;
      DG dg;
      Receiver receiver;
      StopCallback cb;
      // delegates to onStop/onItem capture the address of this state
      @disable this(ref return scope typeof(this) rhs);
      @disable this(this);
      /// Registers the stop callback, then subscribes unless a stop was already requested
      void start() nothrow @trusted {
        auto token = receiver.getStopToken();
        cb = token.onStop(&(cast(shared)this).onStop);
        if (token.isStopRequested) {
          cb.dispose();
          receiver.setDone();
          return;
        }
        source.add(&(cast(shared)this).onItem);
      }
      /// Stop requested: unsubscribe and complete with setDone
      void onStop() nothrow @safe shared {
        with(unshared) {
          source.remove(&this.onItem);
          receiver.setDone();
        }
      }
      /// Forwards an element; a throwing delegate ends the subscription with setError
      void onItem(T element) nothrow @safe shared {
        with(unshared) {
          try {
            dg(element);
          } catch (Exception e) {
            source.remove(&this.onItem);
            cb.dispose();
            receiver.setError(e);
          }
        }
      }
      // strips `shared` so the members can be accessed directly
      private auto ref unshared() nothrow @trusted shared {
        return cast()this;
      }
    }
    /// Sender returned from `collect`; produces no value
    static struct SharedStreamSender {
      alias Value = void;
      shared SharedStream!T source;
      DG dg;
      auto connect(Receiver)(return Receiver receiver) @safe scope return {
        // ensure NRVO
        auto operation = Op!(Receiver)(source, dg, receiver);
        return operation;
      }
    }
    shared SList!SubscriberDG dgs;
  }
  /// Wraps the given subscriber list
  this(shared SList!SubscriberDG dgs) {
    this.dgs = dgs;
  }
  /// Calls every subscribed delegate with `t`
  void emit(T t) nothrow @trusted {
    foreach (subscriber; dgs[])
      subscriber(t);
  }
  private void remove(SubscriberDG dg) nothrow @trusted {
    dgs.remove(dg);
  }
  private void add(SubscriberDG dg) nothrow @trusted {
    dgs.pushBack(dg);
  }
  /// Returns a sender that subscribes `dg` once started
  auto collect(DG dg) @safe {
    return SharedStreamSender(this, dg);
  }
}
622 
/// The type returned by `Receiver.getScheduler`
template SchedulerType(Receiver) {
  import std.traits : ReturnType;
  alias GetScheduler = Receiver.getScheduler;
  alias SchedulerType = ReturnType!GetScheduler;
}
627 
/// Bits of the shared state word used by `throttling`. Everything from
/// `counter` upwards is read as a count (the state is divided by `counter`).
private enum ThrottleFlags : size_t {
  locked               = 1 << 0,
  value_produced       = 1 << 1,
  doneOrError_produced = 1 << 2,
  timerArmed           = 1 << 3,
  timerRearming        = 1 << 4,
  counter              = 1 << 5
}
636 
/// Selects which item of a throttle window gets emitted
enum ThrottleEmitLogic: uint {
  /// emit the first item in the window
  first = 0,
  /// emit the last item in the window
  last = 1
}
/// Selects what happens to a running timer when a new item arrives
enum ThrottleTimerLogic: uint {
  /// don't reset the timer on new items
  noop = 0,
  /// reset the timer on new items
  rearm = 1
}
645 
/// throttleFirst forwards one item and then enters a cooldown period of
/// duration `d` during which it ignores items
auto throttleFirst(Stream)(Stream s, Duration d) {
  enum emit = ThrottleEmitLogic.first;
  enum timer = ThrottleTimerLogic.noop;
  return throttling!(Stream, emit, timer)(s, d);
}
650 
/// throttleLast starts a cooldown period of duration `d` when it receives an
/// item, after which it forwards the latest value from the cooldown period
auto throttleLast(Stream)(Stream s, Duration d) {
  enum emit = ThrottleEmitLogic.last;
  enum timer = ThrottleTimerLogic.noop;
  return throttling!(Stream, emit, timer)(s, d);
}
655 
/// debounce skips all items which are succeeded by another within the
/// duration `d`. Effectively it only emits items after a duration of silence
auto debounce(Stream)(Stream s, Duration d) {
  enum emit = ThrottleEmitLogic.last;
  enum timer = ThrottleTimerLogic.rearm;
  return throttling!(Stream, emit, timer)(s, d);
}
660 
/// Shared implementation behind `throttleFirst`, `throttleLast` and `debounce`.
///
/// Params:
///   emitLogic = whether the first or the last item of a window is forwarded
///   timerLogic = whether an item arriving while the timer runs re-arms it
///   stream = the source stream
///   dur = length of the window; timers are obtained from the receiver's
///         scheduler through `scheduleAfter(dur)`
///
/// Two operations run side by side: the source stream's sender and the timer.
/// They coordinate through the `flags` bitfield; `process` decides from the
/// resulting state whether the downstream receiver can be completed.
auto throttling(Stream, ThrottleEmitLogic emitLogic, ThrottleTimerLogic timerLogic)(Stream stream, Duration dur) if (models!(Stream, isStream)) {
  import std.traits : ReturnType;
  import concurrency.bitfield : SharedBitField;
  import core.atomic : MemoryOrder;
  alias Properties = StreamProperties!Stream;
  alias DG = Properties.DG;
  /// Receiver connected to the source stream's sender
  static struct SenderReceiver(Op) {
    Op* state;
    static if (is(Properties.Value == void))
      void setValue() {
        // NOTE(review): the second argument presumably adds one `counter` unit - confirm against SharedBitField
        with (state.flags.update(ThrottleFlags.value_produced, ThrottleFlags.counter)) {
          state.process(newState);
        }
      }
    else
      void setValue(Properties.Value value) {
        with (state.flags.lock(ThrottleFlags.value_produced, ThrottleFlags.counter)) {
          // store the value while holding the lock, release, then evaluate the new state
          state.value = value;
          release();
          state.process(newState);
        }
      }
    void setDone() {
      with (state.flags.update(ThrottleFlags.doneOrError_produced, ThrottleFlags.counter)) {
        state.process(newState);
      }
    }
    void setError(Exception e) nothrow @safe {
      state.setError(e);
    }
    auto getStopToken() {
      return StopToken(state.stopSource);
    }
    auto getScheduler() {
      return state.receiver.getScheduler();
    }
  }
  /// Receiver connected to the `scheduleAfter` timer sender
  static struct TimerReceiver(Op) {
    Op* state;
    void setValue() @safe {
      with (state.flags.lock()) {
        // a timer firing while it is being re-armed is ignored
        if (was(ThrottleFlags.timerRearming))
          return;

        // copy the pending item while the lock is held
        static if (!is(Properties.ElementType == void) && emitLogic == ThrottleEmitLogic.last)
          auto item = state.item;
        // NOTE(review): presumably releases the lock and clears `timerArmed` - confirm against SharedBitField
        release(ThrottleFlags.timerArmed);
        static if (emitLogic == ThrottleEmitLogic.last) {
          static if (!is(Properties.ElementType == void))
            state.push(item);
          else
            state.push();
        }
      }
    }
    void setDone() @safe nothrow {
      // TODO: would be nice if we can merge in next update...
      if ((state.flags.load!(MemoryOrder.acq) & ThrottleFlags.timerRearming) > 0)
        return;
      with (state.flags.update(ThrottleFlags.doneOrError_produced, ThrottleFlags.counter)) {
        state.process(newState);
      }
    }
    void setError(Exception e) nothrow @safe {
      // TODO: would be nice if we can merge in next lock...
      if ((state.flags.load!(MemoryOrder.acq) & ThrottleFlags.timerRearming) > 0)
        return;
      state.setError(e);
    }
    auto getStopToken() {
      return StopToken(state.timerStopSource);
    }
    auto getScheduler() {
      return state.receiver.getScheduler();
    }
  }
  template ThrottleStreamOp(Stream) {
    /// Operation state holding both the source operation and the timer operation
    static struct ThrottleStreamOp(Receiver) {
      Duration dur;
      DG dg;
      Receiver receiver;
      // latest item of the current window; only needed when emitting the last item
      static if (emitLogic == ThrottleEmitLogic.last)
        static if (!is(Properties.ElementType == void))
          Properties.ElementType item;
      // completion value of the source sender, forwarded on completion
      static if (!is(Properties.Value == void))
        Properties.Value value;
      alias SchedulerAfterSender = ReturnType!(SchedulerType!(Receiver).scheduleAfter);
      StopSource stopSource;
      StopSource timerStopSource;
      StopCallback cb;
      Exception exception;
      alias Op = OpType!(Properties.Sender, SenderReceiver!(typeof(this)));
      alias TimerOp = OpType!(SchedulerAfterSender, TimerReceiver!(typeof(this)));
      Op op;
      TimerOp timerOp;
      shared SharedBitField!ThrottleFlags flags;
      // both receivers store `&this`, so the state must not be copied
      @disable this(ref return scope inout typeof(this) rhs);
      @disable this(this);
      this(return Stream stream, Duration dur, DG dg, Receiver receiver) @trusted scope {
        this.dur = dur;
        this.dg = dg;
        this.receiver = receiver;
        stopSource = new StopSource();
        timerStopSource = new StopSource();
        op = stream.collect(cast(Properties.DG)&onItem).connect(SenderReceiver!(typeof(this))(&this));
      }
      static if (is(Properties.ElementType == void)) {
        /// Called by the source for every (void) item
        private void onItem() {
          with (flags.update(ThrottleFlags.timerArmed)) {
            if ((oldState & ThrottleFlags.timerArmed) == 0) {
              // no timer was running: this item opens a new window
              static if (emitLogic == ThrottleEmitLogic.first) {
                // void elements: `push` takes no argument
                if (!push())
                  return;
              }
              armTimer();
            } else {
              static if (timerLogic == ThrottleTimerLogic.rearm) {
                // release();
                rearmTimer();
              }
            }
          }
        }
        /// Calls the delegate; returns false (after recording the exception) when it throws
        private bool push() {
          try {
            dg();
            return true;
          } catch (Exception e) {
            with (flags.lock(ThrottleFlags.doneOrError_produced)) {
              // only the first error is kept
              if ((oldState & ThrottleFlags.doneOrError_produced) == 0) {
                exception = e;
              }
              release();
              process(newState);
            }
            return false;
          }
        }
      } else {
        /// Called by the source for every item
        private void onItem(Properties.ElementType t) {
          with (flags.lock(ThrottleFlags.timerArmed)) {
            // remember the latest item while the lock is held
            static if (emitLogic == ThrottleEmitLogic.last)
              item = t;
            release();
            if ((oldState & ThrottleFlags.timerArmed) == 0) {
              // no timer was running: this item opens a new window
              static if (emitLogic == ThrottleEmitLogic.first) {
                if (!push(t))
                  return;
              }
              armTimer();
            } else {
              static if (timerLogic == ThrottleTimerLogic.rearm) {
                rearmTimer();
              }
            }
          }
        }
        /// Calls the delegate; returns false (after recording the exception) when it throws
        private bool push(Properties.ElementType t) {
          try {
            dg(t);
            return true;
          } catch (Exception e) {
            with (flags.lock(ThrottleFlags.doneOrError_produced)) {
              // only the first error is kept
              if ((oldState & ThrottleFlags.doneOrError_produced) == 0) {
                exception = e;
              }
              release();
              process(newState);
            }
            return false;
          }
        }
      }
      /// Records an error coming from the source or the timer and evaluates the state
      private void setError(Exception e) {
        with (flags.lock(ThrottleFlags.doneOrError_produced, ThrottleFlags.counter)) {
          // only the first error is kept
          if ((oldState & ThrottleFlags.doneOrError_produced) == 0) {
            exception = e;
          }
          release();
          process(newState);
        }
      }
      /// Starts a new timer of length `dur` on the receiver's scheduler
      void armTimer() {
        timerOp = receiver.getScheduler().scheduleAfter(dur).connect(TimerReceiver!(typeof(this))(&this));
        timerOp.start();
      }
      /// Cancels the running timer and starts a new one
      void rearmTimer() @trusted {
        // while this flag is set the TimerReceiver ignores completions of the old timer
        flags.update(ThrottleFlags.timerRearming);
        timerStopSource.stop();

        auto localFlags = flags.load!(MemoryOrder.acq);
        // if old timer happens to trigger anyway (or the source is done) we can stop
        if ((localFlags & ThrottleFlags.timerArmed) == 0 || (localFlags / ThrottleFlags.counter) > 0)
          return;

        timerStopSource.reset();

        // NOTE(review): the third argument presumably clears the given bit - confirm against SharedBitField
        flags.update(0,0,ThrottleFlags.timerRearming);
        timerOp = receiver.getScheduler().scheduleAfter(dur).connect(TimerReceiver!(typeof(this))(&this));
        timerOp.start();
      }
      /// Evaluates a state snapshot and completes the downstream receiver once
      /// both sides are finished; otherwise requests the other side to stop.
      void process(size_t newState) {
        // the bits from `counter` upwards hold the number of recorded completions
        auto count = newState / ThrottleFlags.counter;
        bool isDone = count == 2 || (count == 1 && (newState & ThrottleFlags.timerArmed) == 0);

        if (!isDone) {
          stopSource.stop();
          timerStopSource.stop();
          return;
        }

        cb.dispose();

        if (receiver.getStopToken().isStopRequested)
          receiver.setDone();
        else if ((newState & ThrottleFlags.value_produced) > 0) {
          static if (emitLogic == ThrottleEmitLogic.last) {
            // the source completed while a window was still open: flush the pending item
            if ((newState & ThrottleFlags.timerArmed) > 0) {
              try {
                static if (!is(Properties.ElementType == void))
                  dg(item);
                else
                  dg();
              } catch (Exception e) {
                receiver.setError(e);
                return;
              }
            }
          }
          import concurrency.receiver : setValueOrError;
          static if (is(Properties.Value == void))
            receiver.setValueOrError();
          else
            receiver.setValueOrError(value);
        } else if ((newState & ThrottleFlags.doneOrError_produced) > 0) {
          if (exception)
            receiver.setError(exception);
          else
            receiver.setDone();
        }
      }
      /// Stop callback for the downstream stop token: stops the source and the timer
      private void stop() @trusted nothrow {
        stopSource.stop();
        timerStopSource.stop();
      }
      void start() @trusted nothrow scope {
        cb = receiver.getStopToken().onStop(cast(void delegate() nothrow @safe shared)&this.stop); // butt ugly cast, but it won't take the second overload
        op.start();
      }
    }
  }
  return fromStreamOp!(Properties.ElementType, Properties.Value, ThrottleStreamOp!(Stream))(stream, dur);
}
914 
/// slides a window over a stream, emitting all items in the window as an array. The array is reused so you must duplicate if you want to access it beyond the stream.
///
/// Params:
///   stream = source stream; its element type must not be void
///   window = number of items per emitted array, must be greater than 0
///   step = how many items the window advances between emissions, must not
///          exceed `window`; with `step == window` consecutive windows share
///          no items
///
/// Throws: an Exception (through `enforce`) when `window` is 0 or `step > window`.
auto slide(Stream)(Stream stream, size_t window, size_t step = 1) if (models!(Stream, isStream)) {
  import std.traits : ReturnType;
  alias Properties = StreamProperties!Stream;
  static assert(!is(Properties.ElementType == void), "Need ElementType to be able to slide, void wont do.");
  import std.exception : enforce;
  enforce(window > 0, "window must be greater than 0.");
  enforce(step <= window, "step can't be bigger than window.");
  alias DG = CollectDelegate!(Properties.ElementType[]);
  static struct SlideStreamOp(Receiver) {
    alias Op = OpType!(Properties.Sender, Receiver);
    size_t window, step;
    Properties.ElementType[] arr;
    DG dg;
    Op op;
    // `&item` captures the address of this state, so it must not be copied
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    this(Stream stream, size_t window, size_t step, DG dg, Receiver receiver) @trusted {
      this.window = window;
      this.step = step;
      this.arr.reserve(window);
      this.dg = dg;
      op = stream.collect(cast(Properties.DG)&item).connect(receiver);
    }
    /// Adds `t` to the window and emits the window once it is full
    void item(Properties.ElementType t) {
      import std.algorithm : moveAll;
      if (arr.length == window) {
        // only reached when step <= 1: the last slot is stale after the shift below
        arr[window-1] = t;
      } else {
        arr ~= t;
        if (arr.length < window)
          return;
      }
      dg(arr);
      if (step == window) {
        // consecutive windows share no items: start collecting from scratch,
        // otherwise every following item would overwrite the last slot and emit again
        arr.length = 0;
      } else {
        // shift the items that remain in the next window to the front
        moveAll(arr[step .. $], arr[0..$-step]);
        if (step > 1)
          arr.length -= step;
      }
    }
    void start() nothrow @safe {
      op.start();
    }
  }
  return fromStreamOp!(Properties.ElementType[], Properties.Value, SlideStreamOp)(stream, window, step);
}
961 
/// toList gathers every value the stream emits into an array and returns a
/// Sender that completes with that array. Only streams whose sender produces
/// void are supported.
auto toList(Stream)(Stream stream) if (models!(Stream, isStream)) {
  alias Properties = StreamProperties!Stream;
  static assert(is(Properties.Value == void), "sender must produce void for toList to work");
  /// Receiver connected to the stream's sender; points back at the operation state
  static struct ToListReceiver(Op) {
    Op* op;
    /// The stream completed: hand the collected array downstream
    void setValue() @safe {
      op.receiver.setValue(op.arr);
    }
    void setDone() @safe nothrow {
      op.receiver.setDone();
    }
    void setError(Exception e) nothrow @safe {
      op.receiver.setError(e);
    }
    auto getStopToken() nothrow @safe {
      return op.receiver.getStopToken();
    }
    auto getScheduler() nothrow @safe {
      return op.receiver.getScheduler();
    }
  }
  /// Operation state; owns the array under construction
  static struct ToListOp(Receiver) {
    alias Op = OpType!(Properties.Sender, ToListReceiver!(typeof(this)));
    Op op;
    Receiver receiver;
    Properties.ElementType[] arr;
    // `&item` and the receiver capture the address of this state, so it must not be copied
    @disable this(this);
    @disable this(ref return scope typeof(this) rhs);
    this(Stream stream, return Receiver receiver) @trusted scope return {
      this.receiver = receiver;
      op = stream.collect(cast(Properties.DG)&item).connect(ToListReceiver!(typeof(this))(&this));
    }
    /// Appends one stream element
    void item(Properties.ElementType t) {
      arr ~= t;
    }
    void start() nothrow @safe {
      op.start();
    }
  }
  /// Sender whose value is the array of collected elements
  static struct ToListSender {
    alias Value = Properties.ElementType[];
    Stream stream;
    auto connect(Receiver)(return Receiver receiver) @safe scope return {
      // ensure NRVO
      auto operation = ToListOp!(Receiver)(stream, receiver);
      return operation;
    }
  }
  return ToListSender(stream);
}