1 module concurrency.stream.throttling;
2 
3 import concurrency.stream.stream;
4 import concurrency.sender : OpType;
5 import concurrency.stoptoken;
6 import concepts;
7 import core.time : Duration;
8 import core.atomic : MemoryOrder;
9 
10 private enum ThrottleFlags : size_t {
11   locked = 0x1,
12   value_produced = 0x2,
13   doneOrError_produced = 0x4,
14   timerArmed = 0x8,
15   timerRearming = 0x10,
16   counter = 0x20
17 }
18 
19 enum ThrottleEmitLogic: uint {
20   first, // emit the first item in the window
21   last // emit the last item in the window
22 };
23 enum ThrottleTimerLogic: uint {
24   noop, // don't reset the timer on new items
25   rearm // reset the timer on new items
26 };
27 
28 /// throttleFirst forwards one item and then enters a cooldown period during which it ignores items
29 auto throttleFirst(Stream)(Stream s, Duration d) {
30   return throttling!(Stream, ThrottleEmitLogic.first, ThrottleTimerLogic.noop)(s, d);
31 }
32 
33 /// throttleLast starts a cooldown period when it receives an item, after which it forwards the lastest value from the cooldown period
34 auto throttleLast(Stream)(Stream s, Duration d) {
35   return throttling!(Stream, ThrottleEmitLogic.last, ThrottleTimerLogic.noop)(s, d);
36 }
37 
38 /// debounce skips all items which are succeeded by another within the duration. Effectively it only emits items after a duration of silence
39 auto debounce(Stream)(Stream s, Duration d) {
40   return throttling!(Stream, ThrottleEmitLogic.last, ThrottleTimerLogic.rearm)(s, d);
41 }
42 
43 auto throttling(Stream, ThrottleEmitLogic emitLogic, ThrottleTimerLogic timerLogic)(Stream stream, Duration dur) if (models!(Stream, isStream)) {
44   alias Properties = StreamProperties!Stream;
45   return fromStreamOp!(Properties.ElementType, Properties.Value, ThrottleStreamOp!(Stream, emitLogic, timerLogic))(stream, dur);
46 }
47 
48 template ThrottleStreamOp(Stream, ThrottleEmitLogic emitLogic, ThrottleTimerLogic timerLogic) {
49   import std.traits : ReturnType;
50   import concurrency.bitfield : SharedBitField;
51   alias Properties = StreamProperties!Stream;
52   alias DG = Properties.DG;
53   struct ThrottleStreamOp(Receiver) {
54     Duration dur;
55     DG dg;
56     Receiver receiver;
57     static if (emitLogic == ThrottleEmitLogic.last)
58       static if (!is(Properties.ElementType == void))
59         Properties.ElementType item;
60     static if (!is(Properties.Value == void))
61       Properties.Value value;
62     alias SchedulerAfterSender = ReturnType!(SchedulerType!(Receiver).scheduleAfter);
63     alias InnerReceiver = TimerReceiver!(typeof(this), Properties.ElementType, emitLogic, timerLogic);
64     StopSource stopSource;
65     StopSource timerStopSource;
66     StopCallback cb;
67     Throwable throwable;
68     alias Op = OpType!(Properties.Sender, SenderReceiver!(typeof(this), Properties.Value));
69     alias TimerOp = OpType!(SchedulerAfterSender, InnerReceiver);
70     Op op;
71     TimerOp timerOp;
72     shared SharedBitField!ThrottleFlags flags;
73     @disable this(ref return scope inout typeof(this) rhs);
74     @disable this(this);
75     this(return Stream stream, Duration dur, DG dg, Receiver receiver) @trusted scope {
76       this.dur = dur;
77       this.dg = dg;
78       this.receiver = receiver;
79       stopSource = new StopSource();
80       timerStopSource = new StopSource();
81       op = stream.collect(cast(Properties.DG)&onItem).connect(SenderReceiver!(typeof(this), Properties.Value)(&this));
82     }
83     static if (is(Properties.ElementType == void)) {
84       private void onItem() {
85         with (flags.update(ThrottleFlags.timerArmed)) {
86           if ((oldState & ThrottleFlags.timerArmed) == 0) {
87             static if (emitLogic == ThrottleEmitLogic.first) {
88               if (!push(t))
89                 return;
90             }
91             armTimer();
92           } else {
93             static if (timerLogic == ThrottleTimerLogic.rearm) {
94               // release();
95               rearmTimer();
96             }
97           }
98         }
99       }
100       private bool push() {
101         try {
102           dg();
103           return true;
104         } catch (Exception e) {
105           with (flags.lock(ThrottleFlags.doneOrError_produced)) {
106             if ((oldState & ThrottleFlags.doneOrError_produced) == 0) {
107               throwable = e;
108             }
109             release();
110             process(newState);
111           }
112           return false;
113         }
114       }
115     } else {
116       private void onItem(Properties.ElementType t) {
117         with (flags.lock(ThrottleFlags.timerArmed)) {
118           static if (emitLogic == ThrottleEmitLogic.last)
119             item = t;
120           release();
121           if ((oldState & ThrottleFlags.timerArmed) == 0) {
122             static if (emitLogic == ThrottleEmitLogic.first) {
123               if (!push(t))
124                 return;
125             }
126             armTimer();
127           } else {
128             static if (timerLogic == ThrottleTimerLogic.rearm) {
129               rearmTimer();
130             }
131           }
132         }
133       }
134       private bool push(Properties.ElementType t) {
135         try {
136           dg(t);
137           return true;
138         } catch (Exception e) {
139           with (flags.lock(ThrottleFlags.doneOrError_produced)) {
140             if ((oldState & ThrottleFlags.doneOrError_produced) == 0) {
141               throwable = e;
142             }
143             release();
144             process(newState);
145           }
146           return false;
147         }
148       }
149     }
150     private void setError(Throwable e) {
151       with (flags.lock(ThrottleFlags.doneOrError_produced, ThrottleFlags.counter)) {
152         if ((oldState & ThrottleFlags.doneOrError_produced) == 0) {
153           throwable = e;
154         }
155         release();
156         process(newState);
157       }
158     }
159     void armTimer() {
160       timerOp = receiver.getScheduler().scheduleAfter(dur).connect(InnerReceiver(&this));
161       timerOp.start();
162     }
163     void rearmTimer() @trusted {
164       flags.update(ThrottleFlags.timerRearming);
165       timerStopSource.stop();
166 
167       auto localFlags = flags.load!(MemoryOrder.acq);
168       // if old timer happens to trigger anyway (or the source is done) we can stop
169       if ((localFlags & ThrottleFlags.timerArmed) == 0 || (localFlags / ThrottleFlags.counter) > 0)
170         return;
171 
172       timerStopSource.reset();
173 
174       flags.update(0,0,ThrottleFlags.timerRearming);
175       timerOp = receiver.getScheduler().scheduleAfter(dur).connect(InnerReceiver(&this));
176       timerOp.start();
177     }
178     void process(size_t newState) {
179       auto count = newState / ThrottleFlags.counter;
180       bool isDone = count == 2 || (count == 1 && (newState & ThrottleFlags.timerArmed) == 0);
181 
182       if (!isDone) {
183         stopSource.stop();
184         timerStopSource.stop();
185         return;
186       }
187 
188       cb.dispose();
189 
190       if (receiver.getStopToken().isStopRequested)
191         receiver.setDone();
192       else if ((newState & ThrottleFlags.value_produced) > 0) {
193         static if (emitLogic == ThrottleEmitLogic.last) {
194           if ((newState & ThrottleFlags.timerArmed) > 0) {
195             try {
196               static if (!is(Properties.ElementType == void))
197                 dg(item);
198               else
199                 dg();
200             } catch (Exception e) {
201               receiver.setError(e);
202               return;
203             }
204           }
205         }
206         import concurrency.receiver : setValueOrError;
207         static if (is(Properties.Value == void))
208           receiver.setValueOrError();
209         else
210           receiver.setValueOrError(value);
211       } else if ((newState & ThrottleFlags.doneOrError_produced) > 0) {
212         if (throwable)
213           receiver.setError(throwable);
214         else
215           receiver.setDone();
216       }
217     }
218     private void stop() @trusted nothrow {
219       stopSource.stop();
220       timerStopSource.stop();
221     }
222     void start() @trusted nothrow scope {
223       cb = receiver.getStopToken().onStop(cast(void delegate() nothrow @safe shared)&this.stop); // butt ugly cast, but it won't take the second overload
224       op.start();
225     }
226   }
227 }
228 
229 struct TimerReceiver(Op, ElementType, ThrottleEmitLogic emitLogic, ThrottleTimerLogic timerLogic) {
230   Op* state;
231   void setValue() @safe {
232     with (state.flags.lock()) {
233       if (was(ThrottleFlags.timerRearming))
234         return;
235 
236       static if (!is(ElementType == void) && emitLogic == ThrottleEmitLogic.last)
237         auto item = state.item;
238       release(ThrottleFlags.timerArmed);
239       static if (emitLogic == ThrottleEmitLogic.last) {
240         static if (!is(ElementType == void))
241           state.push(item);
242         else
243           state.push();
244       }
245     }
246   }
247   void setDone() @safe nothrow {
248     // TODO: would be nice if we can merge in next update...
249     if ((state.flags.load!(MemoryOrder.acq) & ThrottleFlags.timerRearming) > 0)
250       return;
251     with (state.flags.update(ThrottleFlags.doneOrError_produced, ThrottleFlags.counter)) {
252       state.process(newState);
253     }
254   }
255   void setError(Throwable e) nothrow @safe {
256     // TODO: would be nice if we can merge in next lock...
257     if ((state.flags.load!(MemoryOrder.acq) & ThrottleFlags.timerRearming) > 0)
258       return;
259     state.setError(e);
260   }
261   auto getStopToken() {
262     return StopToken(state.timerStopSource);
263   }
264   auto getScheduler() {
265     return state.receiver.getScheduler();
266   }
267 }
268 
269 struct SenderReceiver(Op, Value) {
270   Op* state;
271   static if (is(Value == void))
272     void setValue() {
273       with (state.flags.update(ThrottleFlags.value_produced, ThrottleFlags.counter)) {
274         state.process(newState);
275       }
276     }
277   else
278     void setValue(Value value) {
279       with (state.flags.lock(ThrottleFlags.value_produced, ThrottleFlags.counter)) {
280         state.value = value;
281         release();
282         state.process(newState);
283       }
284     }
285   void setDone() {
286     with (state.flags.update(ThrottleFlags.doneOrError_produced, ThrottleFlags.counter)) {
287       state.process(newState);
288     }
289   }
290   void setError(Throwable e) nothrow @safe {
291     state.setError(e);
292   }
293   auto getStopToken() {
294     return StopToken(state.stopSource);
295   }
296   auto getScheduler() {
297     return state.receiver.getScheduler();
298   }
299 }