1 module concurrency.scheduler;
2 
3 import concurrency.sender : SenderObjectBase;
4 import core.time : Duration;
5 import concepts;
6 import mir.algebraic : Nullable, nullable;
7 
8 void checkScheduler(T)() {
9   import concurrency.sender : checkSender;
10   import core.time : msecs;
11   T t = T.init;
12   alias Sender = typeof(t.schedule());
13   checkSender!Sender();
14   alias AfterSender = typeof(t.scheduleAfter(10.msecs));
15   checkSender!AfterSender();
16 }
17 enum isScheduler(T) = is(typeof(checkScheduler!T));
18 
19 /// polymorphic Scheduler
20 interface SchedulerObjectBase {
21   SenderObjectBase!void schedule() @safe;
22   SenderObjectBase!void scheduleAfter(Duration d) @safe;
23 }
24 
25 class SchedulerObject(S) : SchedulerObjectBase {
26   import concurrency.sender : toSenderObject;
27   S scheduler;
28   this(S scheduler) {
29     this.scheduler = scheduler;
30   }
31   SenderObjectBase!void schedule() @safe {
32     return scheduler.schedule().toSenderObject();
33   }
34   SenderObjectBase!void scheduleAfter(Duration d) @safe {
35     return scheduler.scheduleAfter(d).toSenderObject();
36   }
37 }
38 
39 SchedulerObjectBase toSchedulerObject(S)(S scheduler) {
40   return new SchedulerObject!(S)(scheduler);
41 }
42 
43 enum TimerTrigger {
44   trigger,
45   cancel
46 }
47 
48 alias TimerDelegate = void delegate(TimerTrigger) shared @safe;
49 
50 struct Timer {
51   TimerDelegate dg;
52   ulong id_;
53   ulong id() { return id_; }
54 }
55 
56 auto localThreadScheduler() {
57   import concurrency.thread : LocalThreadWorker;
58   import std.concurrency : thisTid;
59   return SchedulerAdapter!LocalThreadWorker(LocalThreadWorker(thisTid()));
60 }
61 
62 struct SchedulerAdapter(Worker) {
63   import concurrency.receiver : setValueOrError;
64   import concurrency.executor : VoidDelegate;
65   import core.time : Duration;
66   Worker worker;
67   auto schedule() {
68     static struct ScheduleOp(Receiver) {
69       Worker worker;
70       Receiver receiver;
71       void start() @trusted nothrow {
72         try {
73           worker.schedule(cast(VoidDelegate)()=>receiver.setValueOrError());
74         } catch (Exception e) {
75           receiver.setError(e);
76         }
77       }
78     }
79     static struct ScheduleSender {
80       alias Value = void;
81       Worker worker;
82       auto connect(Receiver)(return Receiver receiver) @safe scope return {
83         // ensure NRVO
84         auto op = ScheduleOp!(Receiver)(worker, receiver);
85         return op;
86       }
87     }
88     return ScheduleSender(worker);
89   }
90   auto scheduleAfter(Duration dur) {
91     return ScheduleAfterSender!(Worker)(worker, dur);
92   }
93 }
94 
95 struct ScheduleAfterOp(Worker, Receiver) {
96   import std.traits : ReturnType;
97   import concurrency.bitfield : SharedBitField;
98   import concurrency.stoptoken : StopCallback, onStop;
99   import concurrency.receiver : setValueOrError;
100 
101   enum Flags {
102     locked = 0x1,
103     terminated = 0x2
104   }
105   alias Timer = ReturnType!(Worker.addTimer);
106   Worker worker;
107   Duration dur;
108   Receiver receiver;
109   Timer timer;
110   StopCallback stopCb;
111   shared SharedBitField!Flags flags;
112   void start() @trusted nothrow {
113     with(flags.lock()) {
114       if (receiver.getStopToken().isStopRequested) {
115         receiver.setDone();
116         return;
117       }
118       stopCb = receiver.getStopToken().onStop(cast(void delegate() nothrow @safe shared)&stop);
119       try {
120         timer = worker.addTimer((TimerTrigger cause) shared nothrow {
121             stopCb.dispose();
122             final switch (cause) {
123             case TimerTrigger.cancel:
124               receiver.setDone();
125               break;
126             case TimerTrigger.trigger:
127               with(flags.update(Flags.terminated)) {
128                 if ((oldState & Flags.terminated) == 0)
129                   receiver.setValueOrError();
130               }
131               break;
132             }
133           }, dur);
134       } catch (Exception e) {
135         receiver.setError(e);
136       }
137     }
138   }
139   private void stop() @trusted nothrow {
140     with(flags.update(Flags.terminated)) {
141       if ((oldState & Flags.terminated) == 0) {
142         try { worker.cancelTimer(timer); } catch (Exception e) {} // TODO: what to do here?
143       }
144     }
145   }
146 }
147 
148 struct ScheduleAfterSender(Worker) {
149   alias Value = void;
150   Worker worker;
151   Duration dur;
152   auto connect(Receiver)(return Receiver receiver) @safe return scope {
153     // ensure NRVO
154     auto op = ScheduleAfterOp!(Worker, Receiver)(worker, dur, receiver);
155     return op;
156   }
157 }
158 
159 struct ManualTimeScheduler {
160   shared ManualTimeWorker worker;
161   auto schedule() {
162     import core.time : msecs;
163     return scheduleAfter(0.msecs);
164   }
165   auto scheduleAfter(Duration dur) {
166     return ScheduleAfterSender!(shared ManualTimeWorker)(worker, dur);
167   }
168 }
169 
170 class ManualTimeWorker {
171   import concurrency.timingwheels : TimingWheels;
172   import concurrency.executor : VoidDelegate;
173   import core.sync.mutex : Mutex;
174   import core.time : msecs, hnsecs;
175   private {
176     TimingWheels!Timer wheels;
177     Mutex mutex;
178     size_t time = 1;
179     shared ulong nextTimerId;
180   }
181   auto lock() @trusted shared {
182     import concurrency.utils : SharedGuard;
183     return SharedGuard!(ManualTimeWorker).acquire(this, cast()mutex);
184   }
185   this() @trusted shared {
186     mutex = cast(shared)new Mutex();
187     (cast()wheels).init(time);
188   }
189   ManualTimeScheduler getScheduler() @safe shared {
190     return ManualTimeScheduler(this);
191   }
192   Timer addTimer(TimerDelegate dg, Duration dur) @trusted shared {
193     import core.atomic : atomicOp;
194     with(lock()) {
195       auto real_now = time;
196       auto tw_now = wheels.currStdTime(1.msecs);
197       auto delay = (real_now - tw_now).hnsecs;
198       auto at = (dur + delay)/1.msecs;
199       auto timer = Timer(dg, nextTimerId.atomicOp!("+=")(1));
200       wheels.schedule(timer, at);
201       return timer;
202     }
203   }
204   void cancelTimer(Timer timer) @trusted shared {
205     with(lock()) {
206       wheels.cancel(timer);
207     }
208     timer.dg(TimerTrigger.cancel);
209   }
210   Nullable!Duration timeUntilNextEvent() @trusted shared {
211     with(lock()) {
212       return wheels.timeUntilNextEvent(1.msecs, time);
213     }
214   }
215   void advance(Duration dur) @trusted shared {
216     import core.time : msecs;
217     with(lock()) {
218       time += dur.total!"hnsecs";
219       int incr = wheels.ticksToCatchUp(1.msecs, time);
220       if (incr > 0) {
221         auto wr = wheels.advance(incr);
222         foreach(t; wr.timers) {
223           t.dg(TimerTrigger.trigger);
224         }
225       }
226     }
227   }
228 }
229 
230 T withBaseScheduler(T, P)(auto ref T t, auto ref P p) if (isScheduler!T && isScheduler!P) {
231   return t;
232 }