1 module concurrency.stream;
2 
3 import concurrency.stoptoken;
4 import concurrency.receiver;
5 import concurrency.sender : isSender, OpType;
6 import concepts;
7 import std.traits : hasFunctionAttributes;
8 
9 /// A Stream is anything that has a `.collect` function that accepts a callable and returns a Sender.
10 /// Once the Sender is connected and started the Stream will call the callable zero or more times before one of the three terminal functions of the Receiver is called.
11 
12 template CollectDelegate(ElementType) {
13   static if (is(ElementType == void)) {
14     alias CollectDelegate = void delegate() @safe shared;
15   } else {
16     alias CollectDelegate = void delegate(ElementType) @safe shared;
17   }
18 }
19 
20 /// checks that T is a Stream
21 void checkStream(T)() {
22   import std.traits : ReturnType;
23   alias DG = CollectDelegate!(T.ElementType);
24   static if (is(typeof(T.collect!DG)))
25     alias Sender = ReturnType!(T.collect!(DG));
26   else
27     alias Sender = ReturnType!(T.collect);
28   static assert (models!(Sender, isSender));
29 }
30 enum isStream(T) = is(typeof(checkStream!T));
31 
32 /// A polymorphic stream with elements of type T
33 interface StreamObjectBase(T) {
34   import concurrency.sender : SenderObjectBase;
35   alias ElementType = T;
36   static assert (models!(typeof(this), isStream));
37   alias DG = CollectDelegate!(ElementType);
38 
39   SenderObjectBase!void collect(DG dg) @safe;
40 }
41 
42 /// A class extending from StreamObjectBase that wraps any Stream
43 class StreamObjectImpl(Stream) : StreamObjectBase!(Stream.ElementType) if (models!(Stream, isStream)) {
44   import concurrency.receiver : ReceiverObjectBase;
45   static assert (models!(typeof(this), isStream));
46   private Stream stream;
47   this(Stream stream) {
48     this.stream = stream;
49   }
50   alias DG = CollectDelegate!(Stream.ElementType);
51 
52   SenderObjectBase!void collect(DG dg) @safe {
53     import concurrency.sender : toSenderObject;
54     return stream.collect(dg).toSenderObject();
55   }
56 }
57 
58 /// Converts any Stream to a polymorphic StreamObject
59 StreamObjectBase!(Stream.ElementType) toStreamObject(Stream)(Stream stream) if (models!(Stream, isStream)) {
60   return new StreamObjectImpl!(Stream)(stream);
61 }
62 
63 /*
64   catch?
65   combineLatest
66   count
67   debounce
68   distinctUntilChanged
69   drop
70   dropWhile
71   filter
72   first
73   firstOrNull
74   flatMapConcat
75   flatMapLatest
76   flatMapMerge
77   fold
78   map
79   mapLatest
80   merge
81   onEach
82   onEmpty
83   onStart
84   onSubscription
85   reduce (fold with no seed)
86   retry
87   retryWhen
88   runningReduce
89   sample
90   scan (like runningReduce but with initial value)
91   take
92   takeWhile
93   toList
94   transform
95   transformLatest
96   zip
97 */
98 
99 /// Helper to construct a Stream, useful if the Stream you are modeling has a blocking loop
100 template loopStream(E) {
101   alias DG = CollectDelegate!(E);
102   auto loopStream(T)(T t) {
103     static struct LoopStream {
104       static assert(models!(typeof(this), isStream));
105       alias ElementType = E;
106       static struct LoopOp(Receiver) {
107         T t;
108         DG dg;
109         Receiver receiver;
110         @disable this(ref return scope typeof(this) rhs);
111         @disable this(this);
112         this(T t, DG dg, Receiver receiver) {
113           this.t = t;
114           this.dg = dg;
115           this.receiver = receiver;
116         }
117         void start() @trusted nothrow scope {
118           try {
119             t.loop(dg, receiver.getStopToken);
120           } catch (Exception e) {
121             receiver.setError(e);
122           }
123           if (receiver.getStopToken().isStopRequested)
124             receiver.setDone();
125           else
126             receiver.setValueOrError();
127         }
128       }
129       static struct LoopSender {
130         alias Value = void;
131         T t;
132         DG dg;
133         auto connect(Receiver)(return Receiver receiver) @safe scope return {
134           // ensure NRVO
135           auto op = LoopOp!(Receiver)(t, dg, receiver);
136           return op;
137         }
138       }
139       T t;
140       auto collect(DG dg) @safe {
141         return LoopSender(t, dg);
142       }
143     }
144     return LoopStream(t);
145   }
146 }
147 
148 /// Stream that emit the same value until cancelled
149 auto infiniteStream(T)(T t) {
150   alias DG = CollectDelegate!(T);
151   struct Loop {
152     T val;
153     void loop(StopToken)(DG emit, StopToken stopToken) {
154       while(!stopToken.isStopRequested)
155         emit(val);
156     }
157   }
158   return Loop(t).loopStream!T;
159 }
160 
161 /// Stream that emits from start..end or until cancelled
162 auto iotaStream(T)(T start, T end) {
163   alias DG = CollectDelegate!(T);
164   struct Loop {
165     T b,e;
166     void loop(StopToken)(DG emit, StopToken stopToken) {
167       foreach(i; b..e) {
168         emit(i);
169         if (stopToken.isStopRequested)
170           break;
171       }
172     }
173   }
174   return Loop(start, end).loopStream!T;
175 }
176 
177 /// Stream that emits each value from the array or until cancelled
178 auto arrayStream(T)(T[] arr) {
179   alias DG = CollectDelegate!(T);
180   struct Loop {
181     T[] arr;
182     void loop(StopToken)(DG emit, StopToken stopToken) @safe {
183       foreach(item; arr) {
184         emit(item);
185         if (stopToken.isStopRequested)
186           break;
187       }
188     }
189   }
190   return Loop(arr).loopStream!T;
191 }
192 
193 import core.time : Duration;
194 
195 auto intervalStream(Duration duration) {
196   alias DG = CollectDelegate!(void);
197   static struct ItemReceiver(Op) {
198     Op* op;
199     void setValue() @safe {
200       if (op.receiver.getStopToken.isStopRequested) {
201         op.receiver.setDone();
202         return;
203       }
204       try {
205         op.dg();
206         if (op.receiver.getStopToken.isStopRequested) {
207           op.receiver.setDone();
208           return;
209         }
210         op.start();
211       } catch (Exception e) {
212         op.receiver.setError(e);
213       }
214     }
215     void setDone() @safe nothrow {
216       op.receiver.setDone();
217     }
218     void setError(Exception e) @safe nothrow {
219       op.receiver.setError(e);
220     }
221     auto getStopToken() @safe {
222       return op.receiver.getStopToken();
223     }
224     auto getScheduler() @safe {
225       return op.receiver.getScheduler();
226     }
227   }
228   static struct Op(Receiver) {
229     import std.traits : ReturnType;
230     Duration duration;
231     DG dg;
232     Receiver receiver;
233     alias SchedulerAfterSender = ReturnType!(SchedulerType!(Receiver).scheduleAfter);
234     alias Op = OpType!(SchedulerAfterSender, ItemReceiver!(typeof(this)));
235     Op op;
236     @disable this(this);
237     @disable this(ref return scope typeof(this) rhs);
238     this(Duration duration, DG dg, Receiver receiver) {
239       this.duration = duration;
240       this.dg = dg;
241       this.receiver = receiver;
242     }
243     void start() @trusted nothrow {
244       try {
245         op = receiver.getScheduler().scheduleAfter(duration).connect(ItemReceiver!(typeof(this))(&this));
246         op.start();
247       } catch (Exception e) {
248         receiver.setError(e);
249       }
250     }
251   }
252   static struct Sender {
253     alias Value = void;
254     Duration duration;
255     DG dg;
256     auto connect(Receiver)(return Receiver receiver) @safe scope return {
257       // ensure NRVO
258       auto op = Op!(Receiver)(duration, dg, receiver);
259       return op;
260     }
261   }
262   static struct IntervalStream {
263     alias ElementType = void;
264     Duration duration;
265     auto collect(DG dg) @safe {
266       return Sender(duration, dg);
267     }
268   }
269   return IntervalStream(duration);
270 }
271 
272 template StreamProperties(Stream) {
273   import std.traits : ReturnType;
274   alias ElementType = Stream.ElementType;
275   alias DG = CollectDelegate!(ElementType);
276   alias Sender = ReturnType!(Stream.collect);
277   alias Value = Sender.Value;
278 }
279 
280 /// takes the first n values from a stream or until cancelled
281 auto take(Stream)(Stream stream, size_t n) if (models!(Stream, isStream)) {
282   alias Properties = StreamProperties!Stream;
283   static struct TakeReceiver(Receiver) {
284     Receiver receiver;
285     StopSource stopSource;
286     static if (is(Properties.Sender.Value == void))
287       void setValue() @safe { receiver.setValue(); }
288     else
289       void setValue(Properties.Sender.Value e) @safe { receiver.setValue(e); }
290     void setDone() nothrow @safe {
291       import concurrency.receiver : setValueOrError;
292       static if (is(Properties.Sender.Value == void)) {
293         if (stopSource.isStopRequested)
294           receiver.setValueOrError();
295         else
296           receiver.setDone();
297       } else
298         receiver.setDone();
299     }
300     void setError(Exception e) nothrow @safe {
301       receiver.setError(e);
302     }
303     mixin ForwardExtensionPoints!receiver;
304   }
305   static struct TakeOp(Receiver) {
306     import concurrency.operations : withStopSource;
307     import std.traits : ReturnType;
308     alias SS = ReturnType!(withStopSource!(Properties.Sender));
309     alias Op = OpType!(SS, TakeReceiver!Receiver);
310     size_t n;
311     Properties.DG dg;
312     StopSource stopSource;
313     Op op;
314     @disable this(ref return scope typeof(this) rhs);
315     @disable this(this);
316     private this(return Stream stream, size_t n, Properties.DG dg, return Receiver receiver) @trusted scope {
317       stopSource = new StopSource();
318       this.dg = dg;
319       this.n = n;
320       op = stream.collect(cast(Properties.DG)&item).withStopSource(stopSource).connect(TakeReceiver!Receiver(receiver, stopSource));
321     }
322     static if (is(Properties.ElementType == void)) {
323       private void item() {
324         dg();
325         /// TODO: this implies the stream will only call emit from a single execution context, we might need to enforce that
326         n--;
327         if (n == 0)
328           stopSource.stop();
329       }
330     } else {
331       private void item(Properties.ElementType t) {
332         dg(t);
333         n--;
334         if (n == 0)
335           stopSource.stop();
336       }
337     }
338     void start() nothrow @trusted scope {
339       op.start();
340     }
341   }
342   import std.exception : enforce;
343   enforce(n > 0, "cannot take 0");
344   return fromStreamOp!(Properties.ElementType, Properties.Value, TakeOp)(stream, n);
345 }
346 
347 auto transform(Stream, Fun)(Stream stream, Fun fun) if (models!(Stream, isStream)) {
348   import std.traits : ReturnType;
349   alias Properties = StreamProperties!Stream;
350   alias InnerElementType = ReturnType!Fun;
351   alias DG = CollectDelegate!(InnerElementType);
352   static struct TransformStreamOp(Receiver) {
353     alias Op = OpType!(Properties.Sender, Receiver);
354     Fun fun;
355     DG dg;
356     Op op;
357     @disable this(ref return scope typeof(this) rhs);
358     @disable this(this);
359     this(Stream stream, Fun fun, DG dg, Receiver receiver) @trusted {
360       this.fun = fun;
361       this.dg = dg;
362       op = stream.collect(cast(Properties.DG)&item).connect(receiver);
363     }
364     static if (is(Properties.ElementType == void))
365       void item() {
366         static if (is(InnerElementType == void)) {
367           fun();
368           dg();
369         } else
370           dg(fun());
371       }
372     else
373       void item(Properties.ElementType t) {
374         static if (is(InnerElementType == void)) {
375           fun(t);
376           dg();
377         } else
378           dg(fun(t));
379       }
380     void start() nothrow @safe {
381       op.start();
382     }
383   }
384   return fromStreamOp!(ReturnType!Fun, Properties.Value, TransformStreamOp)(stream, fun);
385 }
386 
387 auto fromStreamOp(StreamElementType, SenderValue, alias Op, Args...)(Args args) {
388   alias DG = CollectDelegate!(StreamElementType);
389   static struct FromStreamSender {
390     alias Value = SenderValue;
391     Args args;
392     DG dg;
393     auto connect(Receiver)(return Receiver receiver) @safe scope return {
394       // ensure NRVO
395       auto op = Op!(Receiver)(args, dg, receiver);
396       return op;
397     }
398   }
399   static struct FromStream {
400     static assert(models!(typeof(this), isStream));
401     alias ElementType = StreamElementType;
402     Args args;
403     auto collect(DG dg) @safe {
404       return FromStreamSender(args, dg);
405     }
406   }
407   return FromStream(args);
408 }
409 
410 /// Applies an accumulator to each value from the source
411 auto scan(Stream, ScanFn, Seed)(Stream stream, scope ScanFn scanFn, Seed seed) if (models!(Stream, isStream)) {
412   import std.traits : ReturnType;
413   alias Properties = StreamProperties!Stream;
414   alias DG = CollectDelegate!(Seed);
415   static struct ScanStreamOp(Receiver) {
416     alias Op = OpType!(Properties.Sender, Receiver);
417     ScanFn scanFn;
418     Seed acc;
419     DG dg;
420     Op op;
421     @disable this(ref return scope typeof(this) rhs);
422     @disable this(this);
423     this(Stream stream, ScanFn scanFn, Seed seed, DG dg, Receiver receiver) @trusted {
424       this.scanFn = scanFn;
425       this.acc = seed;
426       this.dg = dg;
427       op = stream.collect(cast(Properties.DG)&item).connect(receiver);
428     }
429     static if (is(Properties.ElementType == void))
430       void item() {
431         acc = scanFn(acc);
432         dg(acc);
433       }
434     else
435       void item(Properties.ElementType t) {
436         acc = scanFn(acc, t);
437         dg(acc);
438       }
439     void start() nothrow @safe {
440       op.start();
441     }
442   }
443   return fromStreamOp!(Seed, Properties.Value, ScanStreamOp)(stream, scanFn, seed);
444 }
445 
446 /// Forwards the latest value from the base stream every time the trigger stream produces a value. If the base stream hasn't produces a (new) value the trigger is ignored
447 auto sample(StreamBase, StreamTrigger)(StreamBase base, StreamTrigger trigger) if (models!(StreamBase, isStream) && models!(StreamTrigger, isStream)) {
448   import concurrency.operations.raceall;
449   import concurrency.bitfield : SharedBitField;
450   enum Flags : size_t {
451     locked = 0x1,
452     valid = 0x2
453   }
454   alias PropertiesBase = StreamProperties!StreamBase;
455   alias PropertiesTrigger = StreamProperties!StreamTrigger;
456   static assert(!is(PropertiesBase.ElementType == void), "No point in sampling a stream that procudes no values. Might as well use trigger directly");
457   alias DG = PropertiesBase.DG;
458   static struct SampleStreamOp(Receiver) {
459     import std.traits : ReturnType;
460     alias RaceAllSender = ReturnType!(raceAll!(PropertiesBase.Sender, PropertiesTrigger.Sender));
461     alias Op = OpType!(RaceAllSender, Receiver);
462     DG dg;
463     Op op;
464     PropertiesBase.ElementType element;
465     shared SharedBitField!Flags state;
466     shared size_t sampleState;
467     @disable this(ref return scope inout typeof(this) rhs);
468     @disable this(this);
469     this(StreamBase base, StreamTrigger trigger, DG dg, return Receiver receiver) @trusted scope {
470       this.dg = dg;
471       op = raceAll(base.collect(cast(PropertiesBase.DG)&item),
472                    trigger.collect(cast(PropertiesTrigger.DG)&this.trigger)).connect(receiver);
473     }
474     void item(PropertiesBase.ElementType t) {
475       import core.atomic : atomicOp;
476       with(state.lock(Flags.valid)) {
477         element = t;
478       }
479     }
480     void trigger() {
481       import core.atomic : atomicOp;
482       with(state.lock()) {
483         if (was(Flags.valid)) {
484           auto localElement = element;
485           release(Flags.valid);
486           dg(localElement);
487         }
488       }
489     }
490     void start() {
491       op.start();
492     }
493   }
494   return fromStreamOp!(PropertiesBase.ElementType, PropertiesBase.Value, SampleStreamOp)(base, trigger);
495 }
496 
497 auto via(Stream, Sender)(Stream stream, Sender sender) if (models!(Sender, isSender) && models!(Stream, isStream)) {
498   alias Properties = StreamProperties!Stream;
499   alias DG = Properties.DG;
500   static struct ViaStreamOp(Receiver) {
501     import std.traits : ReturnType;
502     import concurrency.operations.via : senderVia = via;
503     alias Op = OpType!(ReturnType!(senderVia!(Properties.Sender, Sender)), Receiver);
504     Op op;
505     @disable this(ref return scope typeof(this) rhs);
506     @disable this(this);
507     this(Stream stream, Sender sender, DG dg, Receiver receiver) {
508       op = stream.collect(dg).senderVia(sender).connect(receiver);
509     }
510     void start() nothrow @safe {
511       op.start();
512     }
513   }
514   return fromStreamOp!(Properties.ElementType, Properties.Value, ViaStreamOp)(stream, sender);
515 }
516 
517 auto doneStream() {
518   alias DG = CollectDelegate!void;
519   static struct DoneStreamOp(Receiver) {
520     Receiver receiver;
521     @disable this(ref return scope typeof(this) rhs);
522     @disable this(this);
523     this(DG dg, Receiver receiver) {
524       this.receiver = receiver;
525     }
526     void start() nothrow @safe {
527       receiver.setDone();
528     }
529   }
530   return fromStreamOp!(void, void, DoneStreamOp)();
531 }
532 
533 auto errorStream(Exception e) {
534   alias DG = CollectDelegate!void;
535   static struct ErrorStreamOp(Receiver) {
536     Exception e;
537     Receiver receiver;
538     @disable this(ref return scope typeof(this) rhs);
539     @disable this(this);
540     this(Exception e, DG dg, Receiver receiver) {
541       this.e = e;
542       this.receiver = receiver;
543     }
544     void start() nothrow @safe {
545       receiver.setError(e);
546     }
547   }
548   return fromStreamOp!(void, void, ErrorStreamOp)(e);
549 }
550 
551 /// A SharedStream is used for broadcasting values to zero or more receivers. Receivers can be added and removed at any time. The stream itself never completes, so receivers should themselves terminate their connection.
552 auto sharedStream(T)() {
553   import concurrency.slist;
554   alias DG = CollectDelegate!(T);
555   return SharedStream!(T)(new shared SList!(SharedStream!(T).SubscriberDG));
556 }
557 
558 shared struct SharedStream(T) {
559   alias ElementType = T;
560   alias SubscriberDG = void delegate(T) nothrow @safe shared;
561   import concurrency.slist;
562   private {
563     alias DG = CollectDelegate!T;
564     static struct Op(Receiver) {
565       shared SharedStream!T source;
566       DG dg;
567       Receiver receiver;
568       StopCallback cb;
569       @disable this(ref return scope typeof(this) rhs);
570       @disable this(this);
571       void start() nothrow @trusted {
572         auto stopToken = receiver.getStopToken();
573         cb = stopToken.onStop(&(cast(shared)this).onStop);
574         if (stopToken.isStopRequested) {
575           cb.dispose();
576           receiver.setDone();
577         } else {
578           source.add(&(cast(shared)this).onItem);
579         }
580       }
581       void onStop() nothrow @safe shared {
582         with(unshared) {
583           source.remove(&this.onItem);
584           receiver.setDone();
585         }
586       }
587       void onItem(T element) nothrow @safe shared {
588         with(unshared) {
589           try {
590             dg(element);
591           } catch (Exception e) {
592             source.remove(&this.onItem);
593             cb.dispose();
594             receiver.setError(e);
595           }
596         }
597       }
598       private auto ref unshared() nothrow @trusted shared {
599         return cast()this;
600       }
601     }
602     static struct SharedStreamSender {
603       alias Value = void;
604       shared SharedStream!T source;
605       DG dg;
606       auto connect(Receiver)(return Receiver receiver) @safe scope return {
607         // ensure NRVO
608         auto op = Op!(Receiver)(source, dg, receiver);
609         return op;
610       }
611     }
612     shared SList!SubscriberDG dgs;
613   }
614   this(shared SList!SubscriberDG dgs) {
615     this.dgs = dgs;
616   }
617   void emit(T t) nothrow @trusted {
618     foreach(dg; dgs[])
619       dg(t);
620   }
621   private void remove(SubscriberDG dg) nothrow @trusted {
622     dgs.remove(dg);
623   }
624   private void add(SubscriberDG dg) nothrow @trusted {
625     dgs.pushBack(dg);
626   }
627   auto collect(DG dg) @safe {
628     return SharedStreamSender(this, dg);
629   }
630 }
631 
632 template SchedulerType(Receiver) {
633   import std.traits : ReturnType;
634   alias SchedulerType = ReturnType!(Receiver.getScheduler);
635 }
636 
637 private enum ThrottleFlags : size_t {
638   locked = 0x1,
639   value_produced = 0x2,
640   doneOrError_produced = 0x4,
641   timerArmed = 0x8,
642   timerRearming = 0x10,
643   counter = 0x20
644 }
645 
646 enum ThrottleEmitLogic: uint {
647   first, // emit the first item in the window
648   last // emit the last item in the window
649 };
650 enum ThrottleTimerLogic: uint {
651   noop, // don't reset the timer on new items
652   rearm // reset the timer on new items
653 };
654 
655 /// throttleFirst forwards one item and then enters a cooldown period during which it ignores items
656 auto throttleFirst(Stream)(Stream s, Duration d) {
657   return throttling!(Stream, ThrottleEmitLogic.first, ThrottleTimerLogic.noop)(s, d);
658 }
659 
660 /// throttleLast starts a cooldown period when it receives an item, after which it forwards the lastest value from the cooldown period
661 auto throttleLast(Stream)(Stream s, Duration d) {
662   return throttling!(Stream, ThrottleEmitLogic.last, ThrottleTimerLogic.noop)(s, d);
663 }
664 
665 /// debounce skips all items which are succeeded by another within the duration. Effectively it only emits items after a duration of silence
666 auto debounce(Stream)(Stream s, Duration d) {
667   return throttling!(Stream, ThrottleEmitLogic.last, ThrottleTimerLogic.rearm)(s, d);
668 }
669 
670 auto throttling(Stream, ThrottleEmitLogic emitLogic, ThrottleTimerLogic timerLogic)(Stream stream, Duration dur) if (models!(Stream, isStream)) {
671   import std.traits : ReturnType;
672   import concurrency.bitfield : SharedBitField;
673   import core.atomic : MemoryOrder;
674   alias Properties = StreamProperties!Stream;
675   alias DG = Properties.DG;
676   static struct SenderReceiver(Op) {
677     Op* state;
678     static if (is(Properties.Value == void))
679       void setValue() {
680         with (state.flags.update(ThrottleFlags.value_produced, ThrottleFlags.counter)) {
681           state.process(newState);
682         }
683       }
684     else
685       void setValue(Properties.Value value) {
686         with (state.flags.lock(ThrottleFlags.value_produced, ThrottleFlags.counter)) {
687           state.value = value;
688           release();
689           state.process(newState);
690         }
691       }
692     void setDone() {
693       with (state.flags.update(ThrottleFlags.doneOrError_produced, ThrottleFlags.counter)) {
694         state.process(newState);
695       }
696     }
697     void setError(Exception e) nothrow @safe {
698       state.setError(e);
699     }
700     auto getStopToken() {
701       return StopToken(state.stopSource);
702     }
703     auto getScheduler() {
704       return state.receiver.getScheduler();
705     }
706   }
707   static struct TimerReceiver(Op) {
708     Op* state;
709     void setValue() @safe {
710       with (state.flags.lock()) {
711         if (was(ThrottleFlags.timerRearming))
712           return;
713 
714         static if (!is(Properties.ElementType == void) && emitLogic == ThrottleEmitLogic.last)
715           auto item = state.item;
716         release(ThrottleFlags.timerArmed);
717         static if (emitLogic == ThrottleEmitLogic.last) {
718           static if (!is(Properties.ElementType == void))
719             state.push(item);
720           else
721             state.push();
722         }
723       }
724     }
725     void setDone() @safe nothrow {
726       // TODO: would be nice if we can merge in next update...
727       if ((state.flags.load!(MemoryOrder.acq) & ThrottleFlags.timerRearming) > 0)
728         return;
729       with (state.flags.update(ThrottleFlags.doneOrError_produced, ThrottleFlags.counter)) {
730         state.process(newState);
731       }
732     }
733     void setError(Exception e) nothrow @safe {
734       // TODO: would be nice if we can merge in next lock...
735       if ((state.flags.load!(MemoryOrder.acq) & ThrottleFlags.timerRearming) > 0)
736         return;
737       state.setError(e);
738     }
739     auto getStopToken() {
740       return StopToken(state.timerStopSource);
741     }
742     auto getScheduler() {
743       return state.receiver.getScheduler();
744     }
745   }
746   template ThrottleStreamOp(Stream) {
747     static struct ThrottleStreamOp(Receiver) {
748       Duration dur;
749       DG dg;
750       Receiver receiver;
751       static if (emitLogic == ThrottleEmitLogic.last)
752         static if (!is(Properties.ElementType == void))
753           Properties.ElementType item;
754       static if (!is(Properties.Value == void))
755         Properties.Value value;
756       alias SchedulerAfterSender = ReturnType!(SchedulerType!(Receiver).scheduleAfter);
757       StopSource stopSource;
758       StopSource timerStopSource;
759       StopCallback cb;
760       Exception exception;
761       alias Op = OpType!(Properties.Sender, SenderReceiver!(typeof(this)));
762       alias TimerOp = OpType!(SchedulerAfterSender, TimerReceiver!(typeof(this)));
763       Op op;
764       TimerOp timerOp;
765       shared SharedBitField!ThrottleFlags flags;
766       @disable this(ref return scope inout typeof(this) rhs);
767       @disable this(this);
768       this(return Stream stream, Duration dur, DG dg, Receiver receiver) @trusted scope {
769         this.dur = dur;
770         this.dg = dg;
771         this.receiver = receiver;
772         stopSource = new StopSource();
773         timerStopSource = new StopSource();
774         op = stream.collect(cast(Properties.DG)&onItem).connect(SenderReceiver!(typeof(this))(&this));
775       }
776       static if (is(Properties.ElementType == void)) {
777         private void onItem() {
778           with (flags.update(ThrottleFlags.timerArmed)) {
779             if ((oldState & ThrottleFlags.timerArmed) == 0) {
780               static if (emitLogic == ThrottleEmitLogic.first) {
781                 if (!push(t))
782                   return;
783               }
784               armTimer();
785             } else {
786               static if (timerLogic == ThrottleTimerLogic.rearm) {
787                 // release();
788                 rearmTimer();
789               }
790             }
791           }
792         }
793         private bool push() {
794           try {
795             dg();
796             return true;
797           } catch (Exception e) {
798             with (flags.lock(ThrottleFlags.doneOrError_produced)) {
799               if ((oldState & ThrottleFlags.doneOrError_produced) == 0) {
800                 exception = e;
801               }
802               release();
803               process(newState);
804             }
805             return false;
806           }
807         }
808       } else {
809         private void onItem(Properties.ElementType t) {
810           with (flags.lock(ThrottleFlags.timerArmed)) {
811             static if (emitLogic == ThrottleEmitLogic.last)
812               item = t;
813             release();
814             if ((oldState & ThrottleFlags.timerArmed) == 0) {
815               static if (emitLogic == ThrottleEmitLogic.first) {
816                 if (!push(t))
817                   return;
818               }
819               armTimer();
820             } else {
821               static if (timerLogic == ThrottleTimerLogic.rearm) {
822                 rearmTimer();
823               }
824             }
825           }
826         }
827         private bool push(Properties.ElementType t) {
828           try {
829             dg(t);
830             return true;
831           } catch (Exception e) {
832             with (flags.lock(ThrottleFlags.doneOrError_produced)) {
833               if ((oldState & ThrottleFlags.doneOrError_produced) == 0) {
834                 exception = e;
835               }
836               release();
837               process(newState);
838             }
839             return false;
840           }
841         }
842       }
843       private void setError(Exception e) {
844         with (flags.lock(ThrottleFlags.doneOrError_produced, ThrottleFlags.counter)) {
845           if ((oldState & ThrottleFlags.doneOrError_produced) == 0) {
846             exception = e;
847           }
848           release();
849           process(newState);
850         }
851       }
852       void armTimer() {
853         timerOp = receiver.getScheduler().scheduleAfter(dur).connect(TimerReceiver!(typeof(this))(&this));
854         timerOp.start();
855       }
856       void rearmTimer() @trusted {
857         flags.update(ThrottleFlags.timerRearming);
858         timerStopSource.stop();
859 
860         auto localFlags = flags.load!(MemoryOrder.acq);
861         // if old timer happens to trigger anyway (or the source is done) we can stop
862         if ((localFlags & ThrottleFlags.timerArmed) == 0 || (localFlags / ThrottleFlags.counter) > 0)
863           return;
864 
865         timerStopSource.reset();
866 
867         flags.update(0,0,ThrottleFlags.timerRearming);
868         timerOp = receiver.getScheduler().scheduleAfter(dur).connect(TimerReceiver!(typeof(this))(&this));
869         timerOp.start();
870       }
871       void process(size_t newState) {
872         auto count = newState / ThrottleFlags.counter;
873         bool isDone = count == 2 || (count == 1 && (newState & ThrottleFlags.timerArmed) == 0);
874 
875         if (!isDone) {
876           stopSource.stop();
877           timerStopSource.stop();
878           return;
879         }
880 
881         cb.dispose();
882 
883         if (receiver.getStopToken().isStopRequested)
884           receiver.setDone();
885         else if ((newState & ThrottleFlags.value_produced) > 0) {
886           static if (emitLogic == ThrottleEmitLogic.last) {
887             if ((newState & ThrottleFlags.timerArmed) > 0) {
888               try {
889                 static if (!is(Properties.ElementType == void))
890                   dg(item);
891                 else
892                   dg();
893               } catch (Exception e) {
894                 receiver.setError(e);
895                 return;
896               }
897             }
898           }
899           import concurrency.receiver : setValueOrError;
900           static if (is(Properties.Value == void))
901             receiver.setValueOrError();
902           else
903             receiver.setValueOrError(value);
904         } else if ((newState & ThrottleFlags.doneOrError_produced) > 0) {
905           if (exception)
906             receiver.setError(exception);
907           else
908             receiver.setDone();
909         }
910       }
911       private void stop() @trusted nothrow {
912         stopSource.stop();
913         timerStopSource.stop();
914       }
915       void start() @trusted nothrow scope {
916         cb = receiver.getStopToken().onStop(cast(void delegate() nothrow @safe shared)&this.stop); // butt ugly cast, but it won't take the second overload
917         op.start();
918       }
919     }
920   }
921   return fromStreamOp!(Properties.ElementType, Properties.Value, ThrottleStreamOp!(Stream))(stream, dur);
922 }
923 
924 /// slides a window over a stream, emitting all items in the window as an array. The array is reused so you must duplicate if you want to access it beyond the stream.
925 auto slide(Stream)(Stream stream, size_t window, size_t step = 1) if (models!(Stream, isStream)) {
926   import std.traits : ReturnType;
927   alias Properties = StreamProperties!Stream;
928   static assert(!is(Properties.ElementType == void), "Need ElementType to be able to slide, void wont do.");
929   import std.exception : enforce;
930   enforce(window > 0, "window must be greater than 0.");
931   enforce(step <= window, "step can't be bigger than window.");
932   alias DG = CollectDelegate!(Properties.ElementType[]);
933   static struct SlideStreamOp(Receiver) {
934     alias Op = OpType!(Properties.Sender, Receiver);
935     size_t window, step;
936     Properties.ElementType[] arr;
937     DG dg;
938     Op op;
939     @disable this(ref return scope typeof(this) rhs);
940     @disable this(this);
941     this(Stream stream, size_t window, size_t step, DG dg, Receiver receiver) @trusted {
942       this.window = window;
943       this.step = step;
944       this.arr.reserve(window);
945       this.dg = dg;
946       op = stream.collect(cast(Properties.DG)&item).connect(receiver);
947     }
948     void item(Properties.ElementType t) {
949       import std.algorithm : moveAll;
950       if (arr.length == window) {
951         arr[window-1] = t;
952       } else {
953         arr ~= t;
954         if (arr.length < window)
955           return;
956       }
957       dg(arr);
958       if (step != window) {
959         moveAll(arr[step .. $], arr[0..$-step]);
960         if (step > 1)
961           arr.length -= step;
962       }
963     }
964     void start() nothrow @safe {
965       op.start();
966     }
967   }
968   return fromStreamOp!(Properties.ElementType[], Properties.Value, SlideStreamOp)(stream, window, step);
969 }
970 
971 /// toList collects all the stream's values and emits the array as a Sender
972 auto toList(Stream)(Stream stream) if (models!(Stream, isStream)) {
973   alias Properties = StreamProperties!Stream;
974   static assert(is(Properties.Value == void), "sender must produce void for toList to work");
975   static struct ToListReceiver(State) {
976     State* state;
977     void setValue() @safe {
978       state.receiver.setValue(state.arr);
979     }
980     void setDone() @safe nothrow {
981       state.receiver.setDone();
982     }
983     void setError(Exception e) nothrow @safe {
984       state.receiver.setError(e);
985     }
986     auto getStopToken() nothrow @safe {
987       return state.receiver.getStopToken();
988     }
989     auto getScheduler() nothrow @safe {
990       return state.receiver.getScheduler();
991     }
992   }
993   static struct State(Receiver) {
994     Receiver receiver;
995     Properties.ElementType[] arr;
996   }
997   static struct ToListOp(Receiver) {
998     State!Receiver state;
999     alias Op = OpType!(Properties.Sender, ToListReceiver!(State!Receiver));
1000     Op op;
1001     @disable this(this);
1002     @disable this(ref return scope typeof(this) rhs);
1003     this(Stream stream, return Receiver receiver) @trusted scope return {
1004       state.receiver = receiver;
1005       op = stream.collect(cast(Properties.DG)&item).connect(ToListReceiver!(State!Receiver)(&state));
1006     }
1007     void item(Properties.ElementType t) {
1008       state.arr ~= t;
1009     }
1010     void start() nothrow @safe {
1011       op.start();
1012     }
1013   }
1014   static struct ToListSender {
1015     alias Value = Properties.ElementType[];
1016     Stream stream;
1017     auto connect(Receiver)(return Receiver receiver) @safe scope return {
1018       // ensure NRVO
1019       auto op = ToListOp!(Receiver)(stream, receiver);
1020       return op;
1021     }
1022   }
1023   return ToListSender(stream);
1024 }