module concurrency.scheduler;

import concurrency.sender : SenderObjectBase;
import core.time : Duration;
import concepts;
import mir.algebraic : Nullable, nullable;

/// Compile-time probe for the scheduler shape.
///
/// Instantiating this template only succeeds when `T.init.schedule()` and
/// `T.init.scheduleAfter(Duration)` are valid expressions and both result
/// types are accepted by `checkSender`.
void checkScheduler(T)() {
  import concurrency.sender : checkSender;
  import core.time : msecs;
  T t = T.init;
  alias Sender = typeof(t.schedule());
  checkSender!Sender();
  alias AfterSender = typeof(t.scheduleAfter(10.msecs));
  checkSender!AfterSender();
}

/// `true` when `checkScheduler!T` compiles.
enum isScheduler(T) = is(typeof(checkScheduler!T));

/// polymorphic Scheduler
interface SchedulerObjectBase {
  /// Returns a type-erased sender produced by the scheduler's `schedule`.
  SenderObjectBase!void schedule() @safe;
  /// Returns a type-erased sender produced by the scheduler's `scheduleAfter(d)`.
  SenderObjectBase!void scheduleAfter(Duration d) @safe;
}

/// Wraps a concrete scheduler `S` behind `SchedulerObjectBase` by converting
/// the senders it returns with `toSenderObject`.
class SchedulerObject(S) : SchedulerObjectBase {
  import concurrency.sender : toSenderObject;
  S scheduler;
  this(S scheduler) {
    this.scheduler = scheduler;
  }
  SenderObjectBase!void schedule() @safe {
    return scheduler.schedule().toSenderObject();
  }
  SenderObjectBase!void scheduleAfter(Duration d) @safe {
    return scheduler.scheduleAfter(d).toSenderObject();
  }
}

/// Allocates a `SchedulerObject!S` around `scheduler` and returns it through
/// the base interface.
SchedulerObjectBase toSchedulerObject(S)(S scheduler) {
  return new SchedulerObject!(S)(scheduler);
}

/// Reason passed to a `TimerDelegate` when it is invoked.
enum TimerTrigger {
  trigger,
  cancel
}

/// Callback type stored in a `Timer`.
alias TimerDelegate = void delegate(TimerTrigger) shared @safe;

/// A timer callback paired with a numeric id.
struct Timer {
  TimerDelegate dg;
  ulong id_;
  /// Returns `id_`; does not modify the timer.
  ulong id() const @safe pure nothrow @nogc { return id_; }
}

/// Builds a `SchedulerAdapter` over a `LocalThreadWorker` constructed from
/// the calling thread's `thisTid`.
auto localThreadScheduler() {
  import concurrency.thread : LocalThreadWorker;
  import std.concurrency : thisTid;
  return SchedulerAdapter!LocalThreadWorker(LocalThreadWorker(thisTid()));
}

/// Adapts a `Worker` (anything exposing `schedule(VoidDelegate)` plus the
/// timer calls used by `ScheduleAfterOp`) to the scheduler shape.
struct SchedulerAdapter(Worker) {
  import concurrency.receiver : setValueOrError;
  import concurrency.executor : VoidDelegate;
  import core.time : Duration;
  Worker worker;

  /// Returns a sender whose operation hands a delegate to `worker.schedule`;
  /// that delegate completes the receiver via `setValueOrError`.
  auto schedule() {
    static struct ScheduleOp(Receiver) {
      Worker worker;
      Receiver receiver;
      void start() @trusted nothrow {
        try {
          worker.schedule(cast(VoidDelegate)()=>receiver.setValueOrError());
        } catch (Exception e) {
          // scheduling itself failed; report it through the receiver
          receiver.setError(e);
        }
      }
    }
    static struct ScheduleSender {
      alias Value = void;
      Worker worker;
      auto connect(Receiver)(return Receiver receiver) @safe scope return {
        // ensure NRVO
        auto op = ScheduleOp!(Receiver)(worker, receiver);
        return op;
      }
    }
    return ScheduleSender(worker);
  }

  /// Returns a `ScheduleAfterSender` bound to this adapter's worker and `dur`.
  auto scheduleAfter(Duration dur) {
    return ScheduleAfterSender!(Worker)(worker, dur);
  }
}

/// Operation state for a delayed schedule: registers a timer with the worker
/// and a stop callback with the receiver's stop token, then completes the
/// receiver from whichever fires.
struct ScheduleAfterOp(Worker, Receiver) {
  import std.traits : ReturnType;
  import concurrency.bitfield : SharedBitField;
  import concurrency.stoptoken : StopCallback, onStop;
  import concurrency.receiver : setValueOrError;

  enum Flags {
    locked = 0x1,
    terminated = 0x2
  }
  alias Timer = ReturnType!(Worker.addTimer);
  Worker worker;
  Duration dur;
  Receiver receiver;
  Timer timer;
  StopCallback stopCb;
  shared SharedBitField!Flags flags;

  /// Starts the operation.
  ///
  /// Completes with `setDone` when stop was already requested, otherwise
  /// registers the stop callback and the timer. An exception thrown by
  /// `worker.addTimer` is forwarded with `setError`.
  void start() @trusted nothrow {
    // Neither the stop callback nor the timer delegate is registered yet, so
    // the early-stop path does not need the lock. Completing here, rather
    // than inside the `with` scope, also keeps the lock guard from writing
    // to `flags` after the receiver has already been completed.
    if (receiver.getStopToken().isStopRequested) {
      receiver.setDone();
      return;
    }
    Exception failure;
    // The lock is held across registration of the stop callback and the
    // timer, so `stop` (which goes through `flags.update`) presumably cannot
    // proceed until `timer` is assigned - TODO confirm against SharedBitField.
    with(flags.lock()) {
      stopCb = receiver.getStopToken().onStop(cast(void delegate() nothrow @safe shared)&stop);
      try {
        timer = worker.addTimer((TimerTrigger cause) shared nothrow {
            stopCb.dispose();
            final switch (cause) {
            case TimerTrigger.cancel:
              receiver.setDone();
              break;
            case TimerTrigger.trigger:
              // only complete with a value if `stop` has not claimed the
              // terminated flag first
              with(flags.update(Flags.terminated)) {
                if ((oldState & Flags.terminated) == 0)
                  receiver.setValueOrError();
              }
              break;
            }
          }, dur);
      } catch (Exception e) {
        // reported after the lock guard is released, see below
        failure = e;
      }
    }
    if (failure !is null) {
      // NOTE(review): the stop callback stays registered on this path, so a
      // later stop request would reach `worker.cancelTimer` with a timer that
      // was never assigned - confirm how callers tear this operation down.
      receiver.setError(failure);
    }
  }

  /// Stop-token callback: claims the terminated flag and, if it was not set
  /// before, asks the worker to cancel the timer.
  private void stop() @trusted nothrow {
    with(flags.update(Flags.terminated)) {
      if ((oldState & Flags.terminated) == 0) {
        try { worker.cancelTimer(timer); } catch (Exception e) {} // TODO: what to do here?
      }
    }
  }
}

/// Sender that connects to a `ScheduleAfterOp` for the given worker and delay.
struct ScheduleAfterSender(Worker) {
  alias Value = void;
  Worker worker;
  Duration dur;
  auto connect(Receiver)(return Receiver receiver) @safe return scope {
    // ensure NRVO
    auto op = ScheduleAfterOp!(Worker, Receiver)(worker, dur, receiver);
    return op;
  }
}

/// Scheduler front-end for a `ManualTimeWorker`.
struct ManualTimeScheduler {
  shared ManualTimeWorker worker;

  /// Equivalent to `scheduleAfter(0.msecs)`.
  auto schedule() {
    import core.time : msecs;
    return scheduleAfter(0.msecs);
  }

  /// Returns a `ScheduleAfterSender` bound to the shared worker and `dur`.
  auto scheduleAfter(Duration dur) {
    return ScheduleAfterSender!(shared ManualTimeWorker)(worker, dur);
  }
}

/// Timer worker whose clock only moves when `advance` is called.
class ManualTimeWorker {
  import concurrency.timingwheels : TimingWheels;
  import concurrency.executor : VoidDelegate;
  import core.sync.mutex : Mutex;
  import core.time : msecs, hnsecs;
  private {
    TimingWheels!Timer wheels;
    Mutex mutex;
    // Current time in hnsecs (see `advance`). Declared `ulong` rather than
    // `size_t` so it does not wrap on 32-bit targets.
    ulong time = 1;
    shared ulong nextTimerId;
  }

  /// Acquires `mutex` and returns the guard produced by `SharedGuard.acquire`.
  auto lock() @trusted shared {
    import concurrency.utils : SharedGuard;
    return SharedGuard!(ManualTimeWorker).acquire(this, cast()mutex);
  }

  this() @trusted shared {
    mutex = cast(shared)new Mutex();
    (cast()wheels).init(time);
  }

  /// Returns a `ManualTimeScheduler` that refers to this worker.
  ManualTimeScheduler getScheduler() @safe shared {
    return ManualTimeScheduler(this);
  }

  /// Schedules `dg` on the timing wheels `dur` from the current manual time
  /// and returns the created `Timer`.
  Timer addTimer(TimerDelegate dg, Duration dur) @trusted shared {
    import core.atomic : atomicOp;
    with(lock()) {
      auto real_now = time;
      auto tw_now = wheels.currStdTime(1.msecs);
      // difference between the manual clock and the wheels' own clock
      auto delay = (real_now - tw_now).hnsecs;
      // the wheels are scheduled in 1 ms ticks
      auto at = (dur + delay)/1.msecs;
      auto timer = Timer(dg, nextTimerId.atomicOp!("+=")(1));
      wheels.schedule(timer, at);
      return timer;
    }
  }

  /// Removes `timer` from the wheels, then invokes its delegate with
  /// `TimerTrigger.cancel` after the lock has been released.
  void cancelTimer(Timer timer) @trusted shared {
    with(lock()) {
      wheels.cancel(timer);
    }
    timer.dg(TimerTrigger.cancel);
  }

  /// Forwards to `wheels.timeUntilNextEvent` with a 1 ms tick and the current
  /// manual time.
  Nullable!Duration timeUntilNextEvent() @trusted shared {
    with(lock()) {
      return wheels.timeUntilNextEvent(1.msecs, time);
    }
  }

  /// Moves the manual clock forward by `dur` and invokes, with
  /// `TimerTrigger.trigger`, every timer the wheels report for the ticks
  /// that elapsed. The delegates run while the worker lock is held.
  void advance(Duration dur) @trusted shared {
    with(lock()) {
      time += dur.total!"hnsecs";
      int incr = wheels.ticksToCatchUp(1.msecs, time);
      if (incr > 0) {
        auto wr = wheels.advance(incr);
        foreach(t; wr.timers) {
          t.dg(TimerTrigger.trigger);
        }
      }
    }
  }
}

/// Returns `t` unchanged; `p` is not used by this implementation.
T withBaseScheduler(T, P)(auto ref T t, auto ref P p) if (isScheduler!T && isScheduler!P) {
  return t;
}