1 module concurrency.thread;
2 
3 import concurrency.executor;
4 import concurrency.scheduler;
5 import concurrency.sender;
6 import concepts;
7 import core.sync.semaphore : Semaphore;
8 
/// Returns the LocalThreadExecutor belonging to the calling thread,
/// constructing it on first use. The instance lives in a function-local
/// `static` (thread-local storage in D), so every thread that calls this
/// gets its own executor and gets the same one on every subsequent call.
LocalThreadExecutor getLocalThreadExecutor() @safe {
  static LocalThreadExecutor instance;
  if (instance !is null)
    return instance;
  instance = new LocalThreadExecutor();
  return instance;
}
15 
/// Executor tied to the thread that constructed it.
///
/// Work submitted from the owning thread runs inline; work submitted from any
/// other thread is posted to the owning thread's std.concurrency mailbox
/// (something on that thread has to receive it, e.g. LocalThreadWorker.start).
class LocalThreadExecutor : Executor {
  import core.atomic : atomicOp, atomicStore, atomicLoad, cas;
  import std.concurrency : Tid, thisTid, send, receive;

  /// Tid of the thread that created this executor.
  private Tid tid;

  /// Captures the Tid of the constructing thread.
  this() @safe {
    tid = thisTid;
  }

  /// Runs `dg` inline when called on the owning thread, otherwise sends it to
  /// the owning thread. In unittest builds `dg` always runs inline.
  void execute(VoidDelegate dg) @trusted {
    version (unittest) {
      // NOTE: In unit tests the delegate has to run on the current thread
      // rather than being forwarded to the main one: no nursery is running on
      // the main thread while unit tests execute. This can be revisited once
      // SIL becomes multithreaded.
      dg();
    } else {
      if (!isInContext) {
        (cast() tid).send(dg);
        return;
      }
      dg();
    }
  }

  /// Function-pointer overload of `execute`; same dispatch rules as the
  /// delegate overload.
  void execute(VoidFunction fn) @trusted {
    version (unittest) {
      fn();
    } else {
      if (!isInContext) {
        (cast() tid).send(fn);
        return;
      }
      fn();
    }
  }

  /// True when the calling thread is the thread that created this executor.
  bool isInContext() @trusted {
    auto owner = cast() tid;
    return thisTid == owner;
  }
}
56 
/// Message loop plus timer service for one thread.
///
/// `start` must run on the thread identified by `tid`; it receives
/// std.concurrency messages (functions, delegates, timer commands and a stop
/// signal) and drives a timing wheel for the timers. All other methods only
/// post messages to that thread.
package struct LocalThreadWorker {
  import core.time : Duration, msecs, hnsecs;
  import std.concurrency : Tid, thisTid, send, receive, receiveTimeout;
  import concurrency.scheduler : Timer, TimerTrigger, TimerDelegate;

  /// Message: schedule `timer` to fire after `dur`.
  static struct AddTimer {
    Timer timer;
    Duration dur;
  }
  /// Message: cancel `timer`.
  static struct RemoveTimer {
    Timer timer;
  }

  private {
    Tid tid;                 // thread whose mailbox the loop reads from
    shared int counter;
    shared ulong nextTimerId; // source of timer ids, bumped atomically in addTimer
  }

  /// Binds the worker to the thread owning the given executor.
  this(LocalThreadExecutor e) @safe {
    this.tid = e.tid;
  }

  /// Binds the worker to the thread identified by `tid`.
  this(Tid tid) @safe {
    this.tid = tid;
  }

  /// Runs the message loop until a `bool` message (see `stop`) is received.
  /// Must be called on the worker's own thread.
  void start() @trusted {
    assert(isInContext); // start can only be called on the thread
    import concurrency.timingwheels;
    import std.datetime.systime : Clock;
    TimingWheels!Timer wheels;
    auto ticks = 1.msecs; // represents the granularity
    wheels.init();
    bool running = true;
    auto addTimerHandler = (AddTimer cmd) scope {
      // The requested duration is relative to the wall clock, while the wheel
      // is scheduled in ticks from its own current time; add the difference
      // between the two before converting to ticks.
      auto real_now = Clock.currStdTime;
      auto tw_now = wheels.currStdTime(ticks);
      auto delay = (real_now - tw_now).hnsecs;
      auto at = (cmd.dur + delay)/ticks;
      wheels.schedule(cmd.timer, at);
    };
    auto removeTimerHandler = (RemoveTimer cmd) scope {
      // Remove the timer from the wheel and tell its delegate it was cancelled.
      wheels.cancel(cmd.timer);
      cmd.timer.dg(TimerTrigger.cancel);
    };
    while (running) {
      // Received callables are stashed here and invoked after the receive
      // call has returned (at the handleIt label).
      VoidFunction vFunc = null;
      VoidDelegate vDel = null;
      import std.meta : AliasSeq;
      alias handlers = AliasSeq!((VoidFunction fn) => vFunc = fn,
                                 (VoidDelegate dg) => vDel = dg,
                                 addTimerHandler,
                                 removeTimerHandler,
                                 (bool _){running = false;}
                                 );
      auto nextTrigger = wheels.timeUntilNextEvent(ticks, Clock.currStdTime);
      if (nextTrigger.isNull()) {
        // No timer pending: block until a message arrives.
        receive(handlers);
        goto handleIt;
      } else {
        if (nextTrigger.get > 0.msecs) {
          // Wait for a message, but no longer than until the next timer is due.
          if (receiveTimeout(nextTrigger.get, handlers)) {
            goto handleIt;
          }
        }
        // Timed out (or a timer is already due): advance the wheel and fire
        // every timer that expired.
        int advance = wheels.ticksToCatchUp(ticks, Clock.currStdTime);
        if (advance > 0) {
          auto wr = wheels.advance(advance);
          foreach(t; wr.timers) {
            t.dg(TimerTrigger.trigger);
          }
        }
        continue;
      }
    handleIt:
      if (vFunc !is null) {
        vFunc();
      }
      if (vDel !is null) {
        vDel();
      }
    }
    version (unittest) {
      // After the loop ends, poll the mailbox without waiting (negative
      // timeout) and fail on any message that is still queued.
      import std.variant : Variant;
      import core.time : seconds;
      while(receiveTimeout(seconds(-1),(Variant v){
            import std.stdio;
            writeln("Got unwanted message ", v);
            assert(0);
          })) {}
    }
  }

  /// Posts `dg` to the worker thread; the loop in `start` invokes it.
  void schedule(VoidDelegate dg) {
    tid.send(dg);
  }

  /// Creates a timer with a fresh id and asks the worker thread to schedule
  /// it `dur` from now. Returns the timer so it can be passed to cancelTimer.
  Timer addTimer(TimerDelegate dg, Duration dur) @trusted {
    import core.atomic : atomicOp;
    ulong id = nextTimerId.atomicOp!("+=")(1);
    Timer timer = Timer(dg, id);
    tid.send(AddTimer(timer, dur));
    return timer;
  }

  /// Asks the worker thread to cancel `timer`; the timer's delegate is then
  /// called with TimerTrigger.cancel on the worker thread.
  void cancelTimer(Timer timer) @trusted {
    tid.send(RemoveTimer(timer));
  }

  /// Asks the loop in `start` to exit by sending it a `bool` message.
  void stop() nothrow @trusted {
    try {
      (cast()tid).send(false);
    } catch (Exception e) {
      assert(false, e.msg);
    }
  }

  /// True when the calling thread is the worker's thread.
  bool isInContext() @trusted {
    return thisTid == cast()tid;
  }
}
180 
/// Returns the calling thread's Semaphore, allocating it on first use.
/// The function-local `static` is thread-local, so the same object is
/// returned for every call made from the same thread.
private Semaphore localSemaphore() {
  static Semaphore perThread;
  if (perThread !is null)
    return perThread;
  perThread = new Semaphore();
  return perThread;
}
187 
/// Starts a new druntime Thread that runs `fn`.
///
/// On Posix the thread detaches itself once `fn` is done so that its pthread
/// resources are released without anyone joining it. The detach happens in a
/// `finally`, so it also runs when `fn` exits by throwing.
package void executeInNewThread(VoidFunction fn) @system nothrow {
  import concurrency.utils : closure;
  import core.thread : Thread;
  version (Posix) import core.sys.posix.pthread : pthread_detach, pthread_self;

  auto t = new Thread(cast(void delegate())closure((VoidFunction fn) @trusted {
        try {
          fn();
        } finally {
          version (Posix)
            pthread_detach(pthread_self); //NOTE: see git.symmetry.dev/SIL/plugins/alpha/web/-/issues/3
        }
      }, fn)).start();
  try {
    /*
      Setting isDaemon only protects against a race condition in druntime,
      which is introduced solely because the thread detaches itself, which in
      turn is needed to avoid leaking memory.

      If setting isDaemon fails because the Thread is already gone (unlikely)
      then the unwanted behavior it guards against cannot be triggered in the
      first place.

      So it is fine if the exception is ignored.
     */
    t.isDaemon = true; // is needed because of pthread_detach (otherwise there is a race between druntime joining and the thread exiting)
  } catch (Exception e) {}
}
212 
/// Starts a new druntime Thread that runs the delegate `fn`.
///
/// On Posix the thread detaches itself once `fn` is done so that its pthread
/// resources are released without anyone joining it. The detach happens in a
/// `finally`, so it also runs when `fn` exits by throwing.
package void executeInNewThread(VoidDelegate fn) @system nothrow {
  import concurrency.utils : closure;
  import core.thread : Thread;
  version (Posix) import core.sys.posix.pthread : pthread_detach, pthread_self;

  auto t = new Thread(cast(void delegate())closure((VoidDelegate fn) @trusted {
        try {
          fn();
        } finally {
          version (Posix)
            pthread_detach(pthread_self); //NOTE: see git.symmetry.dev/SIL/plugins/alpha/web/-/issues/3
        }
      }, fn)).start();
  try {
    /*
      Setting isDaemon only protects against a race condition in druntime,
      which is introduced solely because the thread detaches itself, which in
      turn is needed to avoid leaking memory.

      If setting isDaemon fails because the Thread is already gone (unlikely)
      then the unwanted behavior it guards against cannot be triggered in the
      first place.

      So it is fine if the exception is ignored.
    */
    t.isDaemon = true; // is needed because of pthread_detach (otherwise there is a race between druntime joining and the thread exiting)
  } catch (Exception e) {}
}
237 
/// Executor that runs every submitted piece of work on a freshly started
/// thread (via executeInNewThread).
class ThreadExecutor : Executor {
  /// Runs the function `fn` on a new thread.
  void execute(VoidFunction fn) @trusted {
    executeInNewThread(fn);
  }

  /// Runs the delegate `fn` on a new thread.
  void execute(VoidDelegate fn) @trusted {
    executeInNewThread(fn);
  }

  /// Always false: submitted work never runs on the calling thread.
  bool isInContext() @safe {
    return false;
  }
}
247 
/// Runs `work(args)` on `executor` and blocks the calling thread until it has
/// finished, returning its result (if any).
///
/// When the caller is already in the executor's context the work is invoked
/// directly. Otherwise it is handed to `executor.execute` and the caller waits
/// on its thread-local semaphore.
///
/// Anything `work` throws on the executor side is captured and rethrown in the
/// calling thread once it wakes up. This matches the in-context path (where
/// the throw reaches the caller directly) and guarantees the semaphore is
/// always notified, so the caller cannot block forever on a failed `work`.
auto executeAndWait(Executor, Work, Args...)(Executor executor, Work work, Args args) {
  import core.sync.semaphore : Semaphore;
  import std.traits : ReturnType;

  if (executor.isInContext)
    return work(args);

  alias RT = ReturnType!Work;

  // Lives on the caller's stack; the caller stays blocked in wait() until
  // run() has finished touching it.
  struct Context {
    Work work;
    Args args;
    Semaphore semaphore;
    static if (!is(RT == void))
      RT result;
    Throwable failure; // set when work threw; rethrown by the waiting thread

    void run() {
      try {
        static if (is(RT == void))
          work(args);
        else
          result = work(args);
      } catch (Throwable t) {
        failure = t;
      }
      // Must be the last access to this context: the waiter may return (and
      // the context go away) as soon as it is notified.
      semaphore.notify();
    }
  }

  Context c = Context(work, args, localSemaphore());
  executor.execute(cast(VoidDelegate)&c.run);
  c.semaphore.wait();

  if (c.failure !is null)
    throw c.failure;

  static if (!is(RT == void)) {
    return c.result;
  }
}
283 
// Module constructor: runs once per process at startup and calls
// concurrency.utils.resetScheduler.
shared static this() {
  import utils = concurrency.utils;

  utils.resetScheduler();
}
289 
/// Sender that completes its receiver (with no value) on a newly started
/// thread.
struct ThreadSender {
  static assert (models!(typeof(this), isSender));
  alias Value = void;

  /// Operation state produced by `connect`; holds the receiver until started.
  static struct Op(Receiver) {
    private Receiver receiver;

    this(Receiver receiver) {
      this.receiver = receiver;
    }

    /// Launches `run` on a new thread.
    void start() @trusted nothrow scope {
      executeInNewThread(cast(VoidDelegate)&run);
    }

    /// Body executed on the new thread: publishes the receiver's stop source
    /// as `parentStopSource`, signals the receiver, then clears it again.
    void run() @trusted {
      import concurrency : parentStopSource;
      import concurrency.error : clone;
      import concurrency.receiver : setValueOrError;

      parentStopSource = receiver.getStopToken().source;

      try {
        receiver.setValue();
      } catch (Exception e) {
        // Exceptions are forwarded to the receiver as-is ...
        receiver.setError(e);
      } catch (Throwable t) {
        // ... any other Throwable is forwarded as a clone.
        receiver.setError(t.clone());
      }

      parentStopSource = null;
    }
  }

  /// Connects `receiver` to this sender, returning the operation state.
  auto connect(Receiver)(return Receiver receiver) @safe scope return {
    // ensure NRVO
    auto op = Op!(Receiver)(receiver);
    return op;
  }
}
325 
/// Non-copyable owner of a std.parallelism TaskPool. The pool is finished
/// (blocking) when this struct is destroyed.
struct StdTaskPool {
  import std.parallelism : Task, TaskPool;

  TaskPool pool;

  // Copying is disabled (both copy constructor and postblit).
  @disable this(ref return scope typeof(this) rhs);
  @disable this(this);

  /// Takes ownership of `pool`; constructs a `shared` instance.
  this(TaskPool pool) @trusted scope shared {
    this.pool = cast(shared)pool;
  }

  /// Calls `pool.finish(true)`; an Exception from it is treated as a bug.
  ~this() nothrow @trusted scope {
    try {
      pool.finish(true);
    } catch (Exception e) {
      // can't really happen
      assert(0);
    }
  }

  /// Returns a proto-scheduler that schedules onto this pool.
  auto getScheduler() scope @safe {
    return StdTaskPoolProtoScheduler(pool);
  }

  /// Same as above for a `shared` instance; the `shared` qualifier is cast
  /// away from the pool before handing it on.
  auto getScheduler() scope @trusted shared {
    return StdTaskPoolProtoScheduler(cast()pool);
  }
}
349 
/// Creates a shared StdTaskPool wrapping a new std.parallelism TaskPool.
/// `nWorkers` is passed unchanged to the TaskPool constructor.
shared(StdTaskPool) stdTaskPool(size_t nWorkers = 0) @safe {
  import std.parallelism : TaskPool;
  auto workers = new TaskPool(nWorkers);
  return shared StdTaskPool(workers);
}
354 
/// Scheduler over a TaskPool that can only schedule immediately; combine it
/// with a base scheduler through `withBaseScheduler` to get `scheduleAfter`.
struct StdTaskPoolProtoScheduler {
  import std.parallelism : TaskPool;

  TaskPool pool;

  /// Returns a sender that completes on the task pool.
  auto schedule() {
    auto sender = TaskPoolSender(pool);
    return sender;
  }

  /// Pairs this pool with `scheduler`, producing a StdTaskPoolScheduler.
  auto withBaseScheduler(Scheduler)(Scheduler scheduler) {
    alias Combined = StdTaskPoolScheduler!(Scheduler);
    return Combined(pool, scheduler);
  }
}
365 
/// Scheduler that runs work on a TaskPool and relies on a base `scheduler`
/// for delayed scheduling.
private struct StdTaskPoolScheduler(Scheduler) {
  import std.parallelism : TaskPool;
  import core.time : Duration;

  TaskPool pool;
  Scheduler scheduler;

  /// Returns a sender that completes on the task pool.
  auto schedule() {
    return TaskPoolSender(pool);
  }

  /// Returns the task-pool sender combined, through `via`, with the base
  /// scheduler's `scheduleAfter(run)` sender.
  auto scheduleAfter(Duration run) {
    import concurrency.operations : via;
    return via(schedule(), scheduler.scheduleAfter(run));
  }
}
379 
/// Sender that completes its receiver (with no value) from a task submitted
/// to a std.parallelism TaskPool.
private struct TaskPoolSender {
  import std.parallelism : Task, TaskPool, scopedTask, task;
  import std.traits : ReturnType;
  import concurrency.error : clone;

  alias Value = void;
  TaskPool pool;

  /// Operation state: owns the Task (which carries the receiver as its
  /// argument) and the pool it is submitted to. Not copyable.
  static struct Op(Receiver) {
    /// Task body: publishes the receiver's stop source as `parentStopSource`,
    /// signals the receiver, then clears it again.
    static void setValue(Receiver receiver) @trusted nothrow {
      import concurrency : parentStopSource;

      parentStopSource = receiver.getStopToken().source;
      try {
        receiver.setValue();
      } catch (Exception e) {
        // Exceptions are forwarded to the receiver as-is ...
        receiver.setError(e);
      } catch (Throwable t) {
        // ... any other Throwable is forwarded as a clone.
        receiver.setError(t.clone);
      }
      parentStopSource = null;
    }

    TaskPool pool;
    alias TaskType = typeof(task!setValue(Receiver.init));
    TaskType myTask;

    // Copying is disabled (both copy constructor and postblit).
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);

    /// Builds the task around `receiver` and remembers the target pool.
    this(Receiver receiver, TaskPool pool) @safe return scope {
      myTask = task!(setValue)(receiver);
      this.pool = pool;
    }

    /// Submits the task to the pool; if submission throws, the receiver
    /// stored in the task is completed with that error instead.
    void start() @trusted nothrow scope {
      try {
        pool.put(myTask);
      } catch (Exception e) {
        myTask.args[0].setError(e);
      }
    }
  }

  /// Connects `receiver` to this sender, returning the operation state.
  auto connect(Receiver)(return Receiver receiver) @safe scope return {
    // ensure NRVO
    auto op = Op!(Receiver)(receiver, pool);
    return op;
  }
}