module concurrency.stream.throttling;

import concurrency.stream.stream;
import concurrency.sender : OpType;
import concurrency.stoptoken;
import concepts;
import core.time : Duration;
import core.atomic : MemoryOrder;

/// Bits of the shared state word kept in `ThrottleStreamOp.flags`.
private enum ThrottleFlags : size_t {
    /// Lock bit; taken through `flags.lock(...)` (semantics live in SharedBitField).
    locked = 0x1,
    /// Set when the source stream completed with a value.
    value_produced = 0x2,
    /// Set when the source or the timer completed with done or an error.
    doneOrError_produced = 0x4,
    /// Set by `onItem` when a timer is (about to be) running.
    timerArmed = 0x8,
    /// Set by `rearmTimer` while the running timer is being cancelled and replaced.
    timerRearming = 0x10,
    /// Unit that is added for every completion; `state / counter` yields the
    /// number of completions recorded so far (see `process`).
    counter = 0x20
}

/// Selects which item of a throttling window is forwarded.
enum ThrottleEmitLogic : uint {
    first, // emit the first item in the window
    last // emit the last item in the window
}

/// Selects what happens to a running timer when a new item arrives.
enum ThrottleTimerLogic : uint {
    noop, // don't reset the timer on new items
    rearm // reset the timer on new items
}

/// throttleFirst forwards one item and then enters a cooldown period during which it ignores items
auto throttleFirst(Stream)(Stream s, Duration d) {
    return throttling!(Stream, ThrottleEmitLogic.first, ThrottleTimerLogic.noop)(s, d);
}

/// throttleLast starts a cooldown period when it receives an item, after which it forwards the latest value from the cooldown period
auto throttleLast(Stream)(Stream s, Duration d) {
    return throttling!(Stream, ThrottleEmitLogic.last, ThrottleTimerLogic.noop)(s, d);
}

/// debounce skips all items which are succeeded by another within the duration. Effectively it only emits items after a duration of silence
auto debounce(Stream)(Stream s, Duration d) {
    return throttling!(Stream, ThrottleEmitLogic.last, ThrottleTimerLogic.rearm)(s, d);
}

/// Generic entry point behind throttleFirst, throttleLast and debounce.
///
/// Params:
///   stream = the source stream; must model `isStream`
///   dur    = length of the throttling window
/// Returns: a stream built with `fromStreamOp` around `ThrottleStreamOp`.
auto throttling(Stream, ThrottleEmitLogic emitLogic, ThrottleTimerLogic timerLogic)(Stream stream, Duration dur) if (models!(Stream, isStream)) {
    alias Properties = StreamProperties!Stream;
    return fromStreamOp!(Properties.ElementType, Properties.Value, ThrottleStreamOp!(Stream, emitLogic, timerLogic))(stream, dur);
}

/// Operation state that drives a source stream and a scheduler timer and
/// forwards items to `dg` according to `emitLogic` / `timerLogic`.
template ThrottleStreamOp(Stream, ThrottleEmitLogic emitLogic, ThrottleTimerLogic timerLogic) {
    import std.traits : ReturnType;
    import concurrency.bitfield : SharedBitField;
    alias Properties = StreamProperties!Stream;
    alias DG = Properties.DG;

    struct ThrottleStreamOp(Receiver) {
        Duration dur;
        DG dg;
        Receiver receiver;
        // Only the "last" emit logic needs to remember the most recent item.
        static if (emitLogic == ThrottleEmitLogic.last)
            static if (!is(Properties.ElementType == void))
                Properties.ElementType item;
        // Completion value of the source stream, stored until `process` delivers it.
        static if (!is(Properties.Value == void))
            Properties.Value value;
        alias SchedulerAfterSender = ReturnType!(SchedulerType!(Receiver).scheduleAfter);
        alias InnerReceiver = TimerReceiver!(typeof(this), Properties.ElementType, emitLogic, timerLogic);
        StopSource stopSource; // handed to the source stream via SenderReceiver.getStopToken
        StopSource timerStopSource; // handed to the timer via TimerReceiver.getStopToken
        StopCallback cb;
        Throwable throwable; // first error recorded, delivered by `process`
        alias Op = OpType!(Properties.Sender, SenderReceiver!(typeof(this), Properties.Value));
        alias TimerOp = OpType!(SchedulerAfterSender, InnerReceiver);
        Op op;
        TimerOp timerOp;
        shared SharedBitField!ThrottleFlags flags;

        // The inner operations hold a pointer to this struct, so it must not be copied.
        @disable this(ref return scope inout typeof(this) rhs);
        @disable this(this);

        /// Stores the arguments, creates both stop sources and connects the
        /// source stream so that every item is routed to `onItem`.
        this(return Stream stream, Duration dur, DG dg, Receiver receiver) @trusted scope {
            this.dur = dur;
            this.dg = dg;
            this.receiver = receiver;
            stopSource = new StopSource();
            timerStopSource = new StopSource();
            op = stream.collect(cast(Properties.DG)&onItem).connect(SenderReceiver!(typeof(this), Properties.Value)(&this));
        }

        static if (is(Properties.ElementType == void)) {
            /// Item callback for streams without an element payload.
            private void onItem() {
                with (flags.update(ThrottleFlags.timerArmed)) {
                    if ((oldState & ThrottleFlags.timerArmed) == 0) {
                        // No timer was running: this item opens a new window.
                        static if (emitLogic == ThrottleEmitLogic.first) {
                            // The void overload of push takes no argument; the
                            // previous `push(t)` referenced a non-existent `t`.
                            if (!push())
                                return;
                        }
                        armTimer();
                    } else {
                        static if (timerLogic == ThrottleTimerLogic.rearm) {
                            // release();
                            rearmTimer();
                        }
                    }
                }
            }

            /// Invokes `dg`; on an Exception records it (unless done/error was
            /// already recorded), runs `process` and returns false.
            private bool push() {
                try {
                    dg();
                    return true;
                } catch (Exception e) {
                    with (flags.lock(ThrottleFlags.doneOrError_produced)) {
                        if ((oldState & ThrottleFlags.doneOrError_produced) == 0) {
                            throwable = e;
                        }
                        release();
                        process(newState);
                    }
                    return false;
                }
            }
        } else {
            /// Item callback for streams that carry an element.
            private void onItem(Properties.ElementType t) {
                with (flags.lock(ThrottleFlags.timerArmed)) {
                    // Remember the newest item while holding the lock so the
                    // timer callback reads a consistent value.
                    static if (emitLogic == ThrottleEmitLogic.last) {
                        item = t;
                    }
                    release();
                    if ((oldState & ThrottleFlags.timerArmed) == 0) {
                        // No timer was running: this item opens a new window.
                        static if (emitLogic == ThrottleEmitLogic.first) {
                            if (!push(t))
                                return;
                        }
                        armTimer();
                    } else {
                        static if (timerLogic == ThrottleTimerLogic.rearm) {
                            rearmTimer();
                        }
                    }
                }
            }

            /// Invokes `dg(t)`; on an Exception records it (unless done/error
            /// was already recorded), runs `process` and returns false.
            private bool push(Properties.ElementType t) {
                try {
                    dg(t);
                    return true;
                } catch (Exception e) {
                    with (flags.lock(ThrottleFlags.doneOrError_produced)) {
                        if ((oldState & ThrottleFlags.doneOrError_produced) == 0) {
                            throwable = e;
                        }
                        release();
                        process(newState);
                    }
                    return false;
                }
            }
        }

        /// Records an error completion (counted via `ThrottleFlags.counter`)
        /// and keeps only the first throwable.
        private void setError(Throwable e) {
            with (flags.lock(ThrottleFlags.doneOrError_produced, ThrottleFlags.counter)) {
                if ((oldState & ThrottleFlags.doneOrError_produced) == 0) {
                    throwable = e;
                }
                release();
                process(newState);
            }
        }

        /// Connects and starts a fresh `scheduleAfter(dur)` timer.
        void armTimer() {
            timerOp = receiver.getScheduler().scheduleAfter(dur).connect(InnerReceiver(&this));
            timerOp.start();
        }

        /// Cancels the running timer and starts a new one.
        void rearmTimer() @trusted {
            // Mark the cancellation so the old timer's completion is ignored
            // by TimerReceiver.
            flags.update(ThrottleFlags.timerRearming);
            timerStopSource.stop();

            auto localFlags = flags.load!(MemoryOrder.acq);
            // if old timer happens to trigger anyway (or the source is done) we can stop
            // NOTE(review): on this early return timerRearming stays set and
            // timerStopSource stays stopped — confirm this is intended.
            if ((localFlags & ThrottleFlags.timerArmed) == 0 || (localFlags / ThrottleFlags.counter) > 0)
                return;

            timerStopSource.reset();

            // Third argument of update is used here to drop the timerRearming bit.
            flags.update(0, 0, ThrottleFlags.timerRearming);
            timerOp = receiver.getScheduler().scheduleAfter(dur).connect(InnerReceiver(&this));
            timerOp.start();
        }

        /// Evaluates the state after a completion and, once everything has
        /// finished, signals the downstream receiver.
        ///
        /// Params:
        ///   newState = the flag word as returned by the update that recorded
        ///              the completion
        void process(size_t newState) {
            auto count = newState / ThrottleFlags.counter;
            // Finished when both sides completed, or when one completed while
            // no timer was armed.
            bool isDone = count == 2 || (count == 1 && (newState & ThrottleFlags.timerArmed) == 0);

            if (!isDone) {
                // Ask whichever side is still running to stop; the final
                // completion will call process again.
                stopSource.stop();
                timerStopSource.stop();
                return;
            }

            cb.dispose();

            if (receiver.getStopToken().isStopRequested)
                receiver.setDone();
            else if ((newState & ThrottleFlags.value_produced) > 0) {
                static if (emitLogic == ThrottleEmitLogic.last) {
                    // A still-armed timer means the last item was never
                    // emitted; flush it before completing.
                    if ((newState & ThrottleFlags.timerArmed) > 0) {
                        try {
                            static if (!is(Properties.ElementType == void))
                                dg(item);
                            else
                                dg();
                        } catch (Exception e) {
                            receiver.setError(e);
                            return;
                        }
                    }
                }
                import concurrency.receiver : setValueOrError;
                static if (is(Properties.Value == void))
                    receiver.setValueOrError();
                else
                    receiver.setValueOrError(value);
            } else if ((newState & ThrottleFlags.doneOrError_produced) > 0) {
                if (throwable)
                    receiver.setError(throwable);
                else
                    receiver.setDone();
            }
        }

        /// Stop callback registered on the downstream stop token: stops both
        /// the source stream and the timer.
        private void stop() @trusted nothrow {
            stopSource.stop();
            timerStopSource.stop();
        }

        /// Registers the stop callback and starts the source stream operation.
        void start() @trusted nothrow scope {
            cb = receiver.getStopToken().onStop(cast(void delegate() nothrow @safe shared)&this.stop); // butt ugly cast, but it won't take the second overload
            op.start();
        }
    }
}

/// Receiver connected to the `scheduleAfter` timer of a ThrottleStreamOp.
struct TimerReceiver(Op, ElementType, ThrottleEmitLogic emitLogic, ThrottleTimerLogic timerLogic) {
    Op* state;

    /// Timer elapsed: ends the window and, for the "last" emit logic,
    /// forwards the stored item.
    void setValue() @safe {
        with (state.flags.lock()) {
            // This timer is being replaced by rearmTimer; ignore it.
            if (was(ThrottleFlags.timerRearming))
                return;

            // Copy the item while the lock is held.
            static if (!is(ElementType == void) && emitLogic == ThrottleEmitLogic.last)
                auto item = state.item;
            // presumably clears timerArmed together with the lock bit —
            // confirm against SharedBitField's Guard.release
            release(ThrottleFlags.timerArmed);
            static if (emitLogic == ThrottleEmitLogic.last) {
                static if (!is(ElementType == void))
                    state.push(item);
                else
                    state.push();
            }
        }
    }

    /// Timer was cancelled: counts as a completion unless a rearm is in progress.
    void setDone() @safe nothrow {
        // TODO: would be nice if we can merge in next update...
        if ((state.flags.load!(MemoryOrder.acq) & ThrottleFlags.timerRearming) > 0)
            return;
        with (state.flags.update(ThrottleFlags.doneOrError_produced, ThrottleFlags.counter)) {
            state.process(newState);
        }
    }

    /// Timer failed: forwards the error unless a rearm is in progress.
    void setError(Throwable e) nothrow @safe {
        // TODO: would be nice if we can merge in next lock...
        if ((state.flags.load!(MemoryOrder.acq) & ThrottleFlags.timerRearming) > 0)
            return;
        state.setError(e);
    }

    /// Stop token backed by the operation's `timerStopSource`.
    auto getStopToken() {
        return StopToken(state.timerStopSource);
    }

    /// Scheduler of the downstream receiver.
    auto getScheduler() {
        return state.receiver.getScheduler();
    }
}

/// Receiver connected to the source stream of a ThrottleStreamOp.
struct SenderReceiver(Op, Value) {
    Op* state;

    static if (is(Value == void))
        /// Source completed without a value.
        void setValue() {
            with (state.flags.update(ThrottleFlags.value_produced, ThrottleFlags.counter)) {
                state.process(newState);
            }
        }
    else
        /// Source completed with a value; it is stored under the lock before
        /// the completion is processed.
        void setValue(Value value) {
            with (state.flags.lock(ThrottleFlags.value_produced, ThrottleFlags.counter)) {
                state.value = value;
                release();
                state.process(newState);
            }
        }

    /// Source was cancelled.
    void setDone() {
        with (state.flags.update(ThrottleFlags.doneOrError_produced, ThrottleFlags.counter)) {
            state.process(newState);
        }
    }

    /// Source failed.
    void setError(Throwable e) nothrow @safe {
        state.setError(e);
    }

    /// Stop token backed by the operation's `stopSource`.
    auto getStopToken() {
        return StopToken(state.stopSource);
    }

    /// Scheduler of the downstream receiver.
    auto getScheduler() {
        return state.receiver.getScheduler();
    }
}