1 module concurrency.thread;
2 
3 import concurrency.executor;
4 import concurrency.scheduler;
5 import concurrency.sender;
6 import concepts;
7 import core.sync.semaphore : Semaphore;
8 
/// Returns the LocalThreadExecutor cached for the calling thread, creating it
/// on first use. The cache is a function-local `static`, which D stores
/// per thread, so each thread lazily gets its own instance.
LocalThreadExecutor getLocalThreadExecutor() @safe {
  static LocalThreadExecutor instance;
  if (instance !is null)
    return instance;
  instance = new LocalThreadExecutor();
  return instance;
}
15 
/// Executor tied to the thread that constructed it.
///
/// Work submitted from the owning thread runs immediately; work submitted from
/// any other thread is posted to the owner's std.concurrency mailbox (someone
/// on that thread has to be receiving for it to actually run).
class LocalThreadExecutor : Executor {
  import core.atomic : atomicOp, atomicStore, atomicLoad, cas;
  import std.concurrency : Tid, thisTid, send, receive;

  // Tid of the thread that created this executor.
  private Tid tid;

  /// Captures the Tid of the constructing thread.
  this() @safe {
    tid = thisTid;
  }

  /// Runs `dg` inline when already on the owning thread, otherwise sends it
  /// to the owning thread's mailbox.
  void execute(VoidDelegate dg) @trusted {
    version (unittest) {
      // NOTE: In unit tests the delegate must run on the current thread rather
      // than being forwarded to the main one: no nursery is running on the main
      // thread while unit tests execute. Revisit once SIL becomes multithreaded.
      dg();
    } else {
      if (!isInContext) {
        (cast() tid).send(dg);
        return;
      }
      dg();
    }
  }

  /// Same as the delegate overload, for plain function pointers.
  void execute(VoidFunction fn) @trusted {
    version (unittest) {
      fn();
    } else {
      if (!isInContext) {
        (cast() tid).send(fn);
        return;
      }
      fn();
    }
  }

  /// True when the caller is running on the thread that owns this executor.
  bool isInContext() @trusted {
    return thisTid == cast() tid;
  }
}
56 
/// Message loop and timer service bound to a single thread's std.concurrency
/// mailbox.
///
/// `start` must run on the owning thread and blocks until `stop` is called.
/// The other methods post messages to the owning thread's mailbox.
package struct LocalThreadWorker {
  import core.time : Duration, msecs, hnsecs;
  import std.concurrency : Tid, thisTid, send, receive, receiveTimeout;

  /// Message: schedule `timer` to fire after `dur`.
  static struct AddTimer {
    Timer timer;
    Duration dur;
  }
  /// A callback plus the id it was registered under.
  static struct Timer {
    VoidDelegate dg;
    ulong id_;
    ulong id() { return id_; }
  }
  /// Message: cancel `timer`; when `semaphore` is non-null it is notified
  /// once the cancellation has been processed.
  static struct RemoveTimer {
    Timer timer;
    shared Semaphore semaphore;
  }

  private {
    Tid tid;
    shared int counter;
    shared ulong nextTimerId;
  }

  /// Binds the worker to the thread owning executor `e`.
  this(LocalThreadExecutor e) @safe {
    this.tid = e.tid;
  }

  /// Binds the worker to the thread identified by `tid`.
  this(Tid tid) @safe {
    this.tid = tid;
  }

  /// Runs the message loop until a `bool` message (see `stop`) is received.
  ///
  /// Each iteration either waits for a mailbox message (bounded by the time
  /// until the next timer, if any) or advances the timing wheels and fires
  /// the timers that came due.
  void start() @trusted {
    assert(isInContext); // start can only be called on the thread
    import concurrency.timingwheels;
    import std.datetime.systime : Clock;
    TimingWheels!Timer wheels;
    auto ticks = 1.msecs; // represents the granularity
    wheels.init();
    bool running = true;
    auto addTimerHandler = (AddTimer cmd) scope {
      // Compensate for the gap between wall-clock time and the wheels' notion
      // of "now" so the timer fires `dur` after the real current time.
      auto real_now = Clock.currStdTime;
      auto tw_now = wheels.currStdTime(ticks);
      auto delay = (real_now - tw_now).hnsecs;
      auto at = (cmd.dur + delay)/ticks;
      wheels.schedule(cmd.timer, at);
    };
    auto removeTimerHandler = (RemoveTimer cmd) scope {
      wheels.cancel(cmd.timer);
      // Wake the thread blocked in cancelTimer, if there is one.
      if (cmd.semaphore !is null)
        (cast()cmd.semaphore).notify;
    };
    while (running) {
      // Work items are only recorded by the handlers and invoked after the
      // receive call has returned.
      VoidFunction vFunc = null;
      VoidDelegate vDel = null;
      import std.meta : AliasSeq;
      alias handlers = AliasSeq!((VoidFunction fn) => vFunc = fn,
                                 (VoidDelegate dg) => vDel = dg,
                                 addTimerHandler,
                                 removeTimerHandler,
                                 (bool _){running = false;}
                                 );
      auto nextTrigger = wheels.timeUntilNextEvent(ticks, Clock.currStdTime);
      bool gotMessage = false;
      if (nextTrigger.isNull()) {
        // No timer pending: block until any message arrives.
        receive(handlers);
        gotMessage = true;
      } else if (nextTrigger.get > 0.msecs) {
        // Wait for a message, but no longer than the next timer deadline.
        gotMessage = receiveTimeout(nextTrigger.get, handlers);
      }
      if (!gotMessage) {
        // A timer is due (or the wait timed out): catch up the wheels and
        // fire everything that expired.
        int advance = wheels.ticksToCatchUp(ticks, Clock.currStdTime);
        if (advance > 0) {
          auto wr = wheels.advance(advance);
          foreach(t; wr.timers) {
            t.dg();
          }
        }
        continue;
      }
      if (vFunc !is null) {
        vFunc();
      }
      if (vDel !is null) {
        vDel();
      }
    }
    version (unittest) {
      // In unit tests, fail loudly if anything is still queued in the mailbox
      // after the loop has stopped.
      import std.variant : Variant;
      import core.time : seconds;
      while(receiveTimeout(seconds(-1),(Variant v){
            import std.stdio;
            writeln("Got unwanted message ", v);
            assert(0);
          })) {}
    }
  }

  /// Posts `dg` to the owning thread's mailbox.
  void schedule(VoidDelegate dg) {
    tid.send(dg);
  }

  /// Registers `dg` to run on the worker after `dur`.
  /// Returns: the Timer handle, usable with `cancelTimer`.
  Timer addTimer(VoidDelegate dg, Duration dur) @trusted {
    import core.atomic : atomicOp;
    ulong id = nextTimerId.atomicOp!("+=")(1);
    Timer timer = Timer(dg, id);
    tid.send(AddTimer(timer, dur));
    return timer;
  }

  /// Requests cancellation of `timer`.
  ///
  /// When called from another thread this blocks until the worker has
  /// processed the request. When called on the worker thread itself it only
  /// enqueues the request (waiting there would block the loop that has to
  /// process it).
  void cancelTimer(Timer timer) @trusted {
    Semaphore semaphore;
    if (!isInContext) {
      semaphore = localSemaphore();
    }
    tid.send(RemoveTimer(timer, cast(shared)semaphore));
    if (semaphore !is null)
      semaphore.wait();
  }

  /// Asks the message loop to terminate by sending it a `bool` message.
  void stop() nothrow @trusted {
    try {
      (cast()tid).send(false);
    } catch (Exception e) {
      assert(false, e.msg);
    }
  }

  /// True when the caller is running on the thread this worker is bound to.
  bool isInContext() @trusted {
    return thisTid == cast()tid;
  }
}
192 
/// Returns a Semaphore cached for the calling thread, allocating it on the
/// first call. Every call on the same thread yields the same instance.
private Semaphore localSemaphore() {
  static Semaphore perThread;
  if (perThread !is null)
    return perThread;
  perThread = new Semaphore();
  return perThread;
}
199 
/// Runs `fn` on a freshly created thread that is detached (on Posix) and
/// marked as a daemon thread, so nobody has to join it.
package void executeInNewThread(VoidFunction fn) @system nothrow {
  import concurrency.utils : closure;
  import core.thread : Thread;
  version (Posix) import core.sys.posix.pthread : pthread_detach, pthread_self;

  auto t = new Thread(cast(void delegate())closure((VoidFunction fn) @trusted {
        try {
          fn();
        } finally {
          // Detach even when fn throws; otherwise the thread's resources
          // would never be released because nobody joins it.
          version (Posix)
            pthread_detach(pthread_self); //NOTE: see git.symmetry.dev/SIL/plugins/alpha/web/-/issues/3
        }
      }, fn)).start();
  try {
    /*
      the isDaemon is really only protecting against a race condition in druntime,
      which is only introduced because I am detaching the thread, which I need to do
      if I don't want to leak memory.

      If the isDaemon fails because the Thread is gone (unlikely) then it can't
      trigger unwanted behavior the isDaemon is preventing in the first place.

      So it is fine if the exception is ignored.
     */
    t.isDaemon = true; // is needed because of pthread_detach (otherwise there is a race between druntime joining and the thread exiting)
  } catch (Exception e) {}
}
224 
/// Runs the delegate `fn` on a freshly created thread that is detached (on
/// Posix) and marked as a daemon thread, so nobody has to join it.
package void executeInNewThread(VoidDelegate fn) @system nothrow {
  import concurrency.utils : closure;
  import core.thread : Thread;
  version (Posix) import core.sys.posix.pthread : pthread_detach, pthread_self;

  auto t = new Thread(cast(void delegate())closure((VoidDelegate fn) @trusted {
        try {
          fn();
        } finally {
          // Detach even when fn throws; otherwise the thread's resources
          // would never be released because nobody joins it.
          version (Posix)
            pthread_detach(pthread_self); //NOTE: see git.symmetry.dev/SIL/plugins/alpha/web/-/issues/3
        }
      }, fn)).start();
  try {
    /*
      the isDaemon is really only protecting against a race condition in druntime,
      which is only introduced because I am detaching the thread, which I need to do
      if I don't want to leak memory.

      If the isDaemon fails because the Thread is gone (unlikely) then it can't
      trigger unwanted behavior the isDaemon is preventing in the first place.

      So it is fine if the exception is ignored.
    */
    t.isDaemon = true; // is needed because of pthread_detach (otherwise there is a race between druntime joining and the thread exiting)
  } catch (Exception e) {}
}
249 
/// Executor that runs every submitted piece of work on a brand-new thread
/// via `executeInNewThread`.
class ThreadExecutor : Executor {
  /// The caller is never considered to be "inside" this executor, since each
  /// job gets its own thread.
  bool isInContext() @safe {
    return false;
  }

  /// Starts a new thread that runs the delegate `fn`.
  void execute(VoidDelegate fn) @trusted {
    executeInNewThread(fn);
  }

  /// Starts a new thread that runs the function `fn`.
  void execute(VoidFunction fn) @trusted {
    executeInNewThread(fn);
  }
}
259 
/// Runs `work(args)` on `executor` and blocks the caller until it finished,
/// returning whatever `work` returned.
///
/// If the caller is already in the executor's context the work is invoked
/// directly. Otherwise it is handed to the executor and the caller waits on
/// its thread's cached semaphore.
///
/// NOTE(review): on the hand-off path nothing catches a Throwable thrown by
/// `work`, so `notify` would not be reached and the caller would keep
/// waiting - confirm callers only pass non-throwing work.
auto executeAndWait(Executor, Work, Args...)(Executor executor, Work work, Args args) {
  import core.sync.semaphore;
  import std.concurrency;
  import std.traits;

  if (executor.isInContext)
    return work(args);

  alias RT = ReturnType!Work;

  // Everything the remote invocation needs; it lives on the caller's stack,
  // which is safe because the caller does not return before run() notifies.
  struct Context {
    Work work;
    Args args;
    Semaphore semaphore;
    static if (!is(RT == void))
      RT result;

    void run() {
      static if (is(RT == void))
        work(args);
      else
        result = work(args);
      semaphore.notify();
    }
  }

  Semaphore done = localSemaphore();
  Context ctx = Context(work, args, done);
  executor.execute(cast(VoidDelegate)&ctx.run);
  done.wait();
  static if (!is(RT == void)) {
    return ctx.result;
  }
}
295 
// Module constructor (runs once per process, before main): calls
// concurrency.utils.resetScheduler.
// NOTE(review): presumably this installs the default scheduler - confirm
// against concurrency.utils.
shared static this() {
  import concurrency.utils : resetScheduler;
  resetScheduler();
}
301 
/// Sender that completes its receiver on a newly spawned thread.
@models!(ThreadSender, isSender)
struct ThreadSender {
  alias Value = void;

  /// Operation state: holds the receiver until `start` spawns the thread.
  static struct Op(Receiver) {
    private Receiver receiver;

    this(Receiver receiver) {
      this.receiver = receiver;
    }

    /// Spawns a new thread whose entry point is `run`.
    void start() @trusted nothrow scope {
      executeInNewThread(cast(VoidDelegate)&run);
    }

    /// Thread body: signals the receiver. Anything thrown while doing so is
    /// printed to stderr and then the program is aborted.
    void run() @trusted {
      import concurrency.receiver : setValueOrError;
      import std.stdio;
      try {
        receiver.setValueOrError();
      } catch (Throwable t) {
        stderr.writeln(t);
        assert(0);
      }
    }
  }

  /// Connects `receiver` to this sender and returns the operation state.
  auto connect(Receiver)(return Receiver receiver) @safe scope return {
    // a named local returned directly, to ensure NRVO
    auto operation = Op!(Receiver)(receiver);
    return operation;
  }
}
330 
/// Non-copyable owner of a std.parallelism TaskPool; the pool is finished
/// when the owner is destroyed.
struct StdTaskPool {
  import std.parallelism : Task, TaskPool;

  TaskPool pool;

  // Copying is disabled (both copy constructor and postblit), so there is
  // only ever one owner calling finish.
  @disable this(ref return scope typeof(this) rhs);
  @disable this(this);

  /// Takes ownership of `pool`.
  this(TaskPool pool) @trusted scope shared {
    this.pool = cast(shared)pool;
  }

  /// Finishes the pool, blocking until it is done (`finish(true)`).
  ~this() nothrow @trusted scope {
    try {
      pool.finish(true);
    } catch (Exception e) {
      // can't really happen
      assert(0);
    }
  }

  /// Returns a proto-scheduler backed by this pool (shared instance).
  auto getScheduler() scope @trusted shared {
    return StdTaskPoolProtoScheduler(cast()pool);
  }

  /// Returns a proto-scheduler backed by this pool.
  auto getScheduler() scope @safe {
    return StdTaskPoolProtoScheduler(pool);
  }
}
354 
/// Creates a shared StdTaskPool wrapping a new TaskPool constructed with
/// `nWorkers` (passed straight through to the TaskPool constructor).
shared(StdTaskPool) stdTaskPool(size_t nWorkers = 0) @safe {
  import std.parallelism : TaskPool;
  auto workers = new TaskPool(nWorkers);
  return shared StdTaskPool(workers);
}
359 
/// Scheduler over a TaskPool that can only schedule immediately; combine it
/// with a base scheduler via `withBaseScheduler` to get timed scheduling.
struct StdTaskPoolProtoScheduler {
  import std.parallelism : TaskPool;

  TaskPool pool;

  /// Returns a sender that completes on the task pool.
  auto schedule() {
    return TaskPoolSender(pool);
  }

  /// Pairs this pool with `scheduler`, yielding a StdTaskPoolScheduler.
  auto withBaseScheduler(Scheduler)(Scheduler scheduler) {
    alias Full = StdTaskPoolScheduler!(Scheduler);
    return Full(pool, scheduler);
  }
}
370 
/// Task-pool scheduler that delegates delayed scheduling to a base scheduler.
private struct StdTaskPoolScheduler(Scheduler) {
  import std.parallelism : TaskPool;
  import core.time : Duration;

  TaskPool pool;
  Scheduler scheduler;

  /// Returns a sender that completes on the task pool.
  auto schedule() {
    return TaskPoolSender(pool);
  }

  /// Returns a sender built by chaining `schedule()` onto the base
  /// scheduler's `scheduleAfter(run)` with `via`.
  auto scheduleAfter(Duration run) {
    import concurrency.operations : via;
    auto delayed = scheduler.scheduleAfter(run);
    return schedule().via(delayed);
  }
}
384 
/// Sender whose operation completes its receiver from a std.parallelism task
/// submitted to `pool`.
private struct TaskPoolSender {
  import std.parallelism : Task, TaskPool, scopedTask, task;
  import std.traits : ReturnType;

  alias Value = void;
  TaskPool pool;

  /// Operation state: owns the task object that is queued on the pool.
  static struct Op(Receiver) {
    /// Task body: signals the receiver.
    static void setValue(Receiver receiver) @safe nothrow {
      import concurrency.receiver : setValueOrError;
      receiver.setValueOrError();
    }

    TaskPool pool;
    alias TaskType = typeof(task!setValue(Receiver.init));
    TaskType myTask;

    // Not copyable: copy constructor and postblit are both disabled.
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);

    /// Builds the task around `receiver` and remembers the target pool.
    this(Receiver receiver, TaskPool pool) @safe return scope {
      this.pool = pool;
      myTask = task!(setValue)(receiver);
    }

    /// Queues the task; if queuing throws, the exception is forwarded to the
    /// receiver held by the task via `setError`.
    void start() @trusted nothrow scope {
      try {
        pool.put(myTask);
      } catch (Exception failure) {
        myTask.args[0].setError(failure);
      }
    }
  }

  /// Connects `receiver` to this sender and returns the operation state.
  auto connect(Receiver)(return Receiver receiver) @safe scope return {
    // a named local returned directly, to ensure NRVO
    auto operation = Op!(Receiver)(receiver, pool);
    return operation;
  }
}