module concurrency.thread;

import concurrency.executor;
import concurrency.scheduler;
import concurrency.sender;
import concepts;
import core.sync.semaphore : Semaphore;
import mir.algebraic;
import concurrency.scheduler : Timer;
import core.time : Duration;
import concurrency.data.queue.waitable;
import concurrency.data.queue.mpsc;

// We export concurrency_getLocalThreadExecutor so that dynamic libraries
// can load it to access the host's localThreadExecutor TLS instance.
// Otherwise they would access their own local instance.
// It should not be called directly by user code; call `getLocalThreadExecutor` instead.
export extern(C) LocalThreadExecutor concurrency_getLocalThreadExecutor() @safe {
  static LocalThreadExecutor localThreadExecutor;
  if (localThreadExecutor is null) {
    localThreadExecutor = new LocalThreadExecutor();
  }

  return localThreadExecutor;
}

LocalThreadExecutor getLocalThreadExecutor() @trusted {
  import concurrency.utils : dynamicLoad;
  static LocalThreadExecutor localThreadExecutor;
  if (localThreadExecutor is null) {
    localThreadExecutor = dynamicLoad!concurrency_getLocalThreadExecutor()();
  }
  return localThreadExecutor;
}
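
/*
  Usage sketch (illustrative comment, not compiled): both the host program and
  a dynamically loaded library should go through `getLocalThreadExecutor`,
  which resolves the exported C symbol at runtime so that both sides end up
  sharing the host's thread-local instance.

      auto executor = getLocalThreadExecutor();
      assert(executor is getLocalThreadExecutor()); // cached per thread
      assert(executor.isInContext);                 // created on, and owned by, this thread
*/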

private struct AddTimer {
  Timer timer;
  Duration dur;
}

private struct RemoveTimer {
  Timer timer;
}

private struct Noop {}

private alias WorkItem = Variant!(typeof(null), VoidDelegate, VoidFunction, AddTimer, RemoveTimer, Noop); // null signifies end

private struct WorkNode {
  WorkItem payload;
  shared WorkNode* next;
}

private alias WorkQueue = WaitableQueue!(MPSCQueue!WorkNode);

class LocalThreadExecutor : Executor {
  import core.atomic : atomicOp, atomicStore, atomicLoad, cas;
  import core.thread : ThreadID;
  import std.process : thisThreadID;
  import concurrency.scheduler : Timer;
  import concurrency.timingwheels;

  static struct Node {
    VoidDelegate dg;
    Node* next;
  }
  private {
    ThreadID threadId;
    WorkQueue queue;
    TimingWheels!Timer wheels;
    shared ulong nextTimerId;
  }

  this() @safe {
    threadId = thisThreadID;
    queue = new WorkQueue;
  }

  void execute(VoidDelegate dg) @trusted {
    if (isInContext)
      dg();
    else
      queue.push(new WorkNode(WorkItem(dg)));
  }

  void execute(VoidFunction fn) @trusted {
    if (isInContext)
      fn();
    else
      queue.push(new WorkNode(WorkItem(fn)));
  }

  bool isInContext() @trusted {
    return thisThreadID == threadId;
  }
}
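
/*
  Usage sketch (illustrative comment, not compiled): `execute` runs the work
  immediately when called from the owning thread and otherwise pushes it onto
  the MPSC work queue, where it is picked up by a `LocalThreadWorker` running
  `start()` on that thread. `someWork` is a placeholder and the cast is
  illustrative; the exact attributes of `VoidDelegate` come from
  `concurrency.executor`.

      auto executor = getLocalThreadExecutor();
      executor.execute(cast(VoidDelegate)() { someWork(); }); // runs inline here, since isInContext is true
*/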

package struct LocalThreadWorker {
  import core.time : Duration, msecs, hnsecs;
  import concurrency.scheduler : Timer, TimerTrigger, TimerDelegate;

  private {
    LocalThreadExecutor executor;
  }

  this(LocalThreadExecutor e) @safe {
    executor = e;
  }

  private bool removeTimer(RemoveTimer cmd) {
    auto removed = executor.wheels.cancel(cmd.timer);
    cmd.timer.dg(TimerTrigger.cancel);
    return removed;
  }

  void start() @trusted {
    assert(isInContext); // start can only be called on the thread
    import std.datetime.systime : Clock;
    import std.array : Appender;
    Appender!(Timer[]) expiredTimers;
    auto ticks = 1.msecs; // represents the granularity
    executor.wheels.reset();
    executor.wheels.init();
    bool running = true;
    while (running) {
      import std.meta : AliasSeq;
      // one handler per WorkItem variant; the null sentinel pushed by stop() ends the loop
      alias handlers = AliasSeq!((typeof(null)){running = false;},
                                 (RemoveTimer cmd) => removeTimer(cmd),
                                 (AddTimer cmd) {
                                   auto real_now = Clock.currStdTime;
                                   auto tw_now = executor.wheels.currStdTime(ticks);
                                   auto delay = (real_now - tw_now).hnsecs;
                                   auto at = (cmd.dur + delay)/ticks;
                                   executor.wheels.schedule(cmd.timer, at);
                                 },
                                 (VoidFunction fn) => fn(),
                                 (VoidDelegate dg) => dg(),
                                 (Noop){}
                                 );
      auto nextTrigger = executor.wheels.timeUntilNextEvent(ticks, Clock.currStdTime);
      if (nextTrigger.isNull()) {
        // no timer pending, so block until the next work item arrives
        auto work = executor.queue.pop();
        if (work is null)
          continue;
        work.payload.match!(handlers);
      } else {
        // a timer is pending; wait for work at most until it is due
        if (nextTrigger.get > 0.msecs) {
          auto work = executor.queue.pop(nextTrigger.get);
          if (work !is null) {
            work.payload.match!(handlers);
            continue;
          }
        }
        // catch the timing wheels up to now and fire any expired timers
        int advance = executor.wheels.ticksToCatchUp(ticks, Clock.currStdTime);
        if (advance > 0) {
          import std.range : retro;
          executor.wheels.advance(advance, expiredTimers);
          // NOTE timingwheels keeps the timers in reverse order, so we iterate in reverse
          foreach(t; expiredTimers.data.retro) {
            t.dg(TimerTrigger.trigger);
          }
          expiredTimers.shrinkTo(0);
        }
      }
    }
    version (unittest) {
      if (!executor.queue.empty) {
        auto work = executor.queue.pop();
        import std.stdio;
        writeln("Got unwanted message ", work);
        assert(0);
      }
      assert(executor.wheels.totalTimers == 0, "Still timers left");
    }
  }

  void schedule(VoidDelegate dg) {
    executor.queue.push(new WorkNode(WorkItem(dg)));
  }

  Timer addTimer(TimerDelegate dg, Duration dur) @trusted {
    import core.atomic : atomicOp;
    ulong id = executor.nextTimerId.atomicOp!("+=")(1);
    Timer timer = Timer(dg, id);
    executor.queue.push(new WorkNode(WorkItem(AddTimer(timer, dur))));
    return timer;
  }

  void cancelTimer(Timer timer) @trusted {
    import std.algorithm : find;

    auto cmd = RemoveTimer(timer);
    if (isInContext) {
      if (removeTimer(cmd))
        return;
      // if the timer is still in the queue, rewrite the queue node to a Noop
      auto nodes = executor.queue[].find!((node) {
          if (!node.payload._is!AddTimer)
            return false;
          return node.payload.get!AddTimer.timer.id == timer.id;
        });

      if (!nodes.empty) {
        nodes.front.payload = WorkItem(Noop());
      }
    } else
      executor.queue.push(new WorkNode(WorkItem(RemoveTimer(timer))));
  }

  void stop() nothrow @trusted {
    try {
      executor.queue.push(new WorkNode(WorkItem(null)));
    } catch (Exception e) {
      assert(false, e.msg);
    }
  }

  bool isInContext() @trusted {
    return executor.isInContext;
  }
}
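
/*
  Usage sketch (illustrative comment, not compiled): a `LocalThreadWorker`
  drives the event loop of a `LocalThreadExecutor` on its owning thread; other
  threads push work and timers through the same executor's queue and finally
  call `stop()` to make `start()` return. `work` and `onTimer` are placeholders.

      auto worker = LocalThreadWorker(getLocalThreadExecutor());
      // from another thread:
      //   worker.schedule(work);
      //   auto timer = worker.addTimer(onTimer, 10.msecs);
      //   worker.cancelTimer(timer);
      //   worker.stop();
      worker.start(); // blocks until the stop sentinel is processed
*/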

private Semaphore localSemaphore() {
  static Semaphore semaphore;
  if (semaphore is null)
    semaphore = new Semaphore();
  return semaphore;
}

package void executeInNewThread(VoidFunction fn) @system nothrow {
  import concurrency.utils : closure;
  import core.thread : Thread, thread_detachThis, thread_detachInstance;
  version (Posix) import core.sys.posix.pthread : pthread_detach, pthread_self;

  auto t = new Thread(cast(void delegate())closure((VoidFunction fn) @trusted {
        fn();
        version (Posix)
          pthread_detach(pthread_self); //NOTE: see git.symmetry.dev/SIL/plugins/alpha/web/-/issues/3
      }, fn)).start();
  try {
    /*
      Setting isDaemon is really only protecting against a race condition in
      druntime, which is only introduced because we detach the thread, which
      we need to do if we don't want to leak memory.

      If setting isDaemon fails because the Thread is gone (unlikely), then it
      can't trigger the unwanted behavior isDaemon is preventing in the first
      place.

      So it is fine if the exception is ignored.
    */
    t.isDaemon = true; // needed because of pthread_detach (otherwise there is a race between druntime joining and the thread exiting)
  } catch (Exception e) {}
}

package void executeInNewThread(VoidDelegate fn) @system nothrow {
  import concurrency.utils : closure;
  import core.thread : Thread, thread_detachThis, thread_detachInstance;
  version (Posix) import core.sys.posix.pthread : pthread_detach, pthread_self;

  auto t = new Thread(cast(void delegate())closure((VoidDelegate fn) @trusted {
        fn();
        version (Posix)
          pthread_detach(pthread_self); //NOTE: see git.symmetry.dev/SIL/plugins/alpha/web/-/issues/3
      }, fn)).start();
  try {
    /*
      Setting isDaemon is really only protecting against a race condition in
      druntime, which is only introduced because we detach the thread, which
      we need to do if we don't want to leak memory.

      If setting isDaemon fails because the Thread is gone (unlikely), then it
      can't trigger the unwanted behavior isDaemon is preventing in the first
      place.

      So it is fine if the exception is ignored.
    */
    t.isDaemon = true; // needed because of pthread_detach (otherwise there is a race between druntime joining and the thread exiting)
  } catch (Exception e) {}
}

class ThreadExecutor : Executor {
  void execute(VoidFunction fn) @trusted {
    executeInNewThread(fn);
  }
  void execute(VoidDelegate fn) @trusted {
    executeInNewThread(fn);
  }
  bool isInContext() @safe { return false; }
}

auto executeAndWait(Executor, Work, Args...)(Executor executor, Work work, Args args) {
  import core.sync.semaphore;
  import std.traits;

  if (executor.isInContext)
    return work(args);

  Semaphore semaphore = localSemaphore();

  alias RT = ReturnType!Work;
  struct Context {
    Work work;
    Args args;
    Semaphore semaphore;
    static if (is(RT == void)) {
      void run() {
        work(args);
        semaphore.notify();
      }
    } else {
      RT result;
      void run() {
        result = work(args);
        semaphore.notify();
      }
    }
  }
  Context c = Context(work, args, semaphore);
  executor.execute(cast(VoidDelegate)&c.run);
  semaphore.wait();
  static if (!is(RT == void)) {
    return c.result;
  }
}
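
/*
  Usage sketch (illustrative comment, not compiled): `executeAndWait` runs
  `work` inline when the caller is already on the executor's thread; otherwise
  it enqueues the call and blocks on the caller's thread-local semaphore until
  the result is available.

      auto executor = getLocalThreadExecutor();
      int answer = executor.executeAndWait((int x) => x * 2, 21);
      assert(answer == 42);
*/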

shared static this() {
  import concurrency.utils : resetScheduler;

  resetScheduler();
}

struct ThreadSender {
  static assert (models!(typeof(this), isSender));
  alias Value = void;
  static struct Op(Receiver) {
    private Receiver receiver;
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    this(Receiver receiver) {
      this.receiver = receiver;
    }
    void start() @trusted nothrow scope {
      executeInNewThread(cast(VoidDelegate)&run);
    }
    void run() @trusted {
      import concurrency.receiver : setValueOrError;
      import concurrency.error : clone;
      import concurrency : parentStopSource;

      parentStopSource = receiver.getStopToken().source;

      try {
        receiver.setValue();
      } catch (Exception e) {
        receiver.setError(e);
      } catch (Throwable t) {
        receiver.setError(t.clone());
      }

      parentStopSource = null;
    }
  }
  auto connect(Receiver)(return Receiver receiver) @safe return scope {
    // ensure NRVO
    auto op = Op!(Receiver)(receiver);
    return op;
  }
}
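
/*
  Usage sketch (illustrative comment, not compiled): `ThreadSender` completes
  its receiver on a freshly spawned thread. Assuming `then` from
  `concurrency.operations` and `syncWait` from `concurrency.syncwait` (neither
  is imported by this module), a pipeline could look like:

      auto result = ThreadSender()
        .then(() shared => 42)
        .syncWait();
*/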

struct StdTaskPool {
  import std.parallelism : Task, TaskPool;
  TaskPool pool;
  @disable this(ref return scope typeof(this) rhs);
  @disable this(this);
  this(TaskPool pool) @trusted scope shared {
    this.pool = cast(shared)pool;
  }
  ~this() nothrow @trusted scope {
    try {
      pool.finish(true);
    } catch (Exception e) {
      // can't really happen
      assert(0);
    }
  }
  auto getScheduler() return @safe {
    return StdTaskPoolProtoScheduler(pool);
  }
  auto getScheduler() return @trusted shared {
    return StdTaskPoolProtoScheduler(cast()pool);
  }
}

shared(StdTaskPool) stdTaskPool(size_t nWorkers = 0) @safe {
  import std.parallelism : TaskPool;
  return shared StdTaskPool(new TaskPool(nWorkers));
}
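
/*
  Usage sketch (illustrative comment, not compiled): `stdTaskPool` wraps a
  `std.parallelism.TaskPool`, and the wrapper's destructor blocks until the
  pool is finished. Its scheduler can be handed to sender/receiver algorithms;
  `on` is assumed to come from `concurrency.operations`.

      auto pool = stdTaskPool(4);           // 4 worker threads
      auto scheduler = pool.getScheduler(); // shared overload
      // e.g. someSender.on(scheduler).syncWait();
*/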

struct StdTaskPoolProtoScheduler {
  import std.parallelism : TaskPool;
  TaskPool pool;
  auto schedule() {
    return TaskPoolSender(pool);
  }
  auto withBaseScheduler(Scheduler)(Scheduler scheduler) {
    return StdTaskPoolScheduler!(Scheduler)(pool, scheduler);
  }
}

private struct StdTaskPoolScheduler(Scheduler) {
  import std.parallelism : TaskPool;
  import core.time : Duration;
  TaskPool pool;
  Scheduler scheduler;
  auto schedule() {
    return TaskPoolSender(pool);
  }
  auto scheduleAfter(Duration run) {
    import concurrency.operations : via;
    return schedule().via(scheduler.scheduleAfter(run));
  }
}

private struct TaskPoolSender {
  import std.parallelism : Task, TaskPool, scopedTask, task;
  import std.traits : ReturnType;
  import concurrency.error : clone;
  alias Value = void;
  TaskPool pool;
  static struct Op(Receiver) {
    static void setValue(Receiver receiver) @trusted nothrow {
      import concurrency : parentStopSource;
      parentStopSource = receiver.getStopToken().source;
      try {
        receiver.setValue();
      } catch (Exception e) {
        receiver.setError(e);
      } catch (Throwable t) {
        receiver.setError(t.clone);
      }
      parentStopSource = null;
    }
    TaskPool pool;
    alias TaskType = typeof(task!setValue(Receiver.init));
    TaskType myTask;
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    this(Receiver receiver, TaskPool pool) @safe return scope {
      myTask = task!(setValue)(receiver);
      this.pool = pool;
    }
    void start() @trusted nothrow scope {
      try {
        pool.put(myTask);
      } catch (Exception e) {
        myTask.args[0].setError(e);
      }
    }
  }
  auto connect(Receiver)(return Receiver receiver) @safe return scope {
    // ensure NRVO
    auto op = Op!(Receiver)(receiver, pool);
    return op;
  }
}