1 module concurrency.thread;
2 
3 import concurrency.executor;
4 import concurrency.scheduler;
5 import concurrency.sender;
6 import concepts;
7 import core.sync.semaphore : Semaphore;
8 import mir.algebraic;
9 import concurrency.scheduler : Timer;
10 import core.time : Duration;
11 import concurrency.data.queue.waitable;
12 import concurrency.data.queue.mpsc;
13 
// We export this accessor with C linkage so dynamically loaded libraries can
// resolve the HOST binary's thread-local LocalThreadExecutor instead of
// silently constructing a second instance from their own copy of the TLS.
// Not intended to be called from user code; call `silThreadExecutor` instead.
export extern(C) LocalThreadExecutor concurrency_getLocalThreadExecutor() @safe {
  // function-level statics are thread-local in D; lazily create one per thread
  static LocalThreadExecutor instance;
  if (instance is null)
    instance = new LocalThreadExecutor();
  return instance;
}
26 
/// Returns the calling thread's LocalThreadExecutor, resolved through the
/// exported C symbol so that host and dynamic libraries share one instance.
LocalThreadExecutor getLocalThreadExecutor() @trusted {
  import concurrency.utils : dynamicLoad;
  static LocalThreadExecutor cached; // per-thread cache of the resolved instance
  if (cached is null)
    cached = dynamicLoad!concurrency_getLocalThreadExecutor()();
  return cached;
}
35 
/// Command message: schedule `timer` to fire after `dur` on the worker thread.
struct AddTimer {
  Timer timer;
  Duration dur;
}

/// Command message: cancel a previously scheduled timer.
struct RemoveTimer {
  Timer timer;
}

/// Do-nothing message; used to neutralize an already-queued AddTimer in place
/// (see LocalThreadWorker.cancelTimer).
struct Noop {}

alias WorkItem = Variant!(typeof(null), VoidDelegate, VoidFunction, AddTimer, RemoveTimer, Noop); // null signifies end

/// Queue node carrying one WorkItem; `next` links nodes inside the MPSC queue.
struct WorkNode {
  WorkItem payload;
  shared WorkNode* next;
}

/// Multi-producer single-consumer queue the worker thread blocks on.
alias WorkQueue = WaitableQueue!(MPSCQueue!WorkNode);
55 
/// Executor bound to the thread that created it. Work submitted from that
/// same thread runs inline; work from other threads is pushed onto the MPSC
/// queue to be drained by a LocalThreadWorker.
class LocalThreadExecutor : Executor {
  import core.atomic : atomicOp, atomicStore, atomicLoad, cas;
  import core.thread : ThreadID;
  import std.process : thisThreadID;
  import concurrency.scheduler : Timer;
  import concurrency.timingwheels;

  static struct Node {
    VoidDelegate dg;
    Node* next;
  }
  private {
    ThreadID threadId;           // id of the owning thread, fixed at construction
    WorkQueue queue;             // cross-thread submissions land here
    TimingWheels!Timer wheels;   // timer storage, driven by LocalThreadWorker
    shared ulong nextTimerId;    // atomically bumped to mint unique timer ids
  }

  this() @safe {
    threadId = thisThreadID;
    queue = new WorkQueue;
  }

  void execute(VoidDelegate dg) @trusted {
    // cross-thread: enqueue; same thread: run immediately
    if (!isInContext) {
      queue.push(new WorkNode(WorkItem(dg)));
      return;
    }
    dg();
  }

  void execute(VoidFunction fn) @trusted {
    if (!isInContext) {
      queue.push(new WorkNode(WorkItem(fn)));
      return;
    }
    fn();
  }

  /// True when called on the thread this executor was constructed on.
  bool isInContext() @trusted {
    return thisThreadID == threadId;
  }
}
97 
/// Drives a LocalThreadExecutor: drains its work queue and fires its timers.
/// `start` must run on the thread the executor was created on; the other
/// methods may be called from any thread.
package struct LocalThreadWorker {
  import core.time : Duration, msecs, hnsecs;
  import concurrency.scheduler : Timer, TimerTrigger, TimerDelegate;

  private {
    LocalThreadExecutor executor;
  }

  this(LocalThreadExecutor e) @safe {
    executor = e;
  }

  /// Cancels `cmd.timer` in the timing wheels and fires its delegate with
  /// TimerTrigger.cancel. Returns true when the wheels actually held the
  /// timer. NOTE the cancel callback runs even when nothing was removed.
  private bool removeTimer(RemoveTimer cmd) {
    auto removed = executor.wheels.cancel(cmd.timer);
    cmd.timer.dg(TimerTrigger.cancel);
    return removed;
  }

  /// Runs the worker loop until the null sentinel (pushed by `stop`) arrives.
  /// Each iteration either blocks on the queue (no timers pending) or waits
  /// at most until the next timer deadline, then advances the timing wheels
  /// and fires any expired timers.
  void start() @trusted {
    assert(isInContext); // start can only be called on the thread
    import std.datetime.systime : Clock;
    import std.array : Appender;
    Appender!(Timer[]) expiredTimers;
    auto ticks = 1.msecs; // represents the granularity
    executor.wheels.reset();
    executor.wheels.init();
    bool running = true;
    while (running) {
      import std.meta : AliasSeq;
      // one handler per WorkItem alternative; typeof(null) ends the loop
      alias handlers = AliasSeq!((typeof(null)){running = false;},
                                 (RemoveTimer cmd) => removeTimer(cmd),
                                 (AddTimer cmd) {
                                   // compensate for drift between the wall
                                   // clock and wheel time before scheduling
                                   auto real_now = Clock.currStdTime;
                                   auto tw_now = executor.wheels.currStdTime(ticks);
                                   auto delay = (real_now - tw_now).hnsecs;
                                   auto at = (cmd.dur + delay)/ticks;
                                   executor.wheels.schedule(cmd.timer, at);
                                 },
                                 (VoidFunction fn) => fn(),
                                 (VoidDelegate dg) => dg(),
                                 (Noop){}
                                 );
      auto nextTrigger = executor.wheels.timeUntilNextEvent(ticks, Clock.currStdTime);
      bool handleIt = false; // NOTE(review): unused — candidate for removal
      if (nextTrigger.isNull()) {
        // no timers pending: block indefinitely waiting for work
        auto work = executor.queue.pop();
        if (work is null)
          continue;
        work.payload.match!(handlers);
      } else {
        // timers pending: wait for work at most until the next deadline
        if (nextTrigger.get > 0.msecs) {
          auto work = executor.queue.pop(nextTrigger.get);
          if (work !is null) {

            work.payload.match!(handlers);
            continue;
          }
        }
        // timed out (or deadline already passed): catch the wheels up
        int advance = executor.wheels.ticksToCatchUp(ticks, Clock.currStdTime);
        if (advance > 0) {
          import std.range : retro;
          executor.wheels.advance(advance, expiredTimers);
          // NOTE timingwheels keeps the timers in reverse order, so we iterate in reverse
          foreach(t; expiredTimers.data.retro) {
            t.dg(TimerTrigger.trigger);
          }
          expiredTimers.shrinkTo(0);
        }
      }
    }
    version (unittest) {
      // sanity check: the loop must exit with everything drained
      if (!executor.queue.empty) {
        auto work = executor.queue.pop();
        import std.stdio;
        writeln("Got unwanted message ", work);
        assert(0);
      }
      assert(executor.wheels.totalTimers == 0, "Still timers left");
    }
  }

  /// Enqueues `dg` to be run on the worker thread.
  void schedule(VoidDelegate dg) {
    executor.queue.push(new WorkNode(WorkItem(dg)));
  }

  /// Creates a timer that fires `dg` after `dur` and enqueues its
  /// registration. Callable from any thread; ids are minted atomically.
  Timer addTimer(TimerDelegate dg, Duration dur) @trusted {
    import core.atomic : atomicOp;
    ulong id = executor.nextTimerId.atomicOp!("+=")(1);
    Timer timer = Timer(dg, id);
    executor.queue.push(new WorkNode(WorkItem(AddTimer(timer, dur))));
    return timer;
  }

  /// Cancels `timer`. On the worker thread the cancel happens synchronously;
  /// when the matching AddTimer is still sitting unprocessed in the queue its
  /// node is rewritten to a Noop instead. From any other thread a RemoveTimer
  /// command is enqueued and handled by the loop.
  void cancelTimer(Timer timer) @trusted {
    import std.algorithm : find;

    auto cmd = RemoveTimer(timer);
    if (isInContext) {
      if (removeTimer(cmd))
        return;
      // if the timer is still in the queue, rewrite the queue node to a Noop
      auto nodes = executor.queue[].find!((node) {
          if (!node.payload._is!AddTimer)
            return false;
          return node.payload.get!AddTimer.timer.id == timer.id;
        });

      if (!nodes.empty) {
        nodes.front.payload = WorkItem(Noop());
      }
    } else
      executor.queue.push(new WorkNode(WorkItem(RemoveTimer(timer))));
  }

  /// Requests loop shutdown by enqueueing the null sentinel.
  void stop() nothrow @trusted {
    try {
      executor.queue.push(new WorkNode(WorkItem(null)));
    } catch (Exception e) {
      assert(false, e.msg);
    }
  }

  /// True when called on the executor's owning thread.
  bool isInContext() @trusted {
    return executor.isInContext;
  }
}
224 
/// Returns the calling thread's lazily-created semaphore, used by
/// executeAndWait to block until cross-thread work completes.
private Semaphore localSemaphore() {
  // function-level statics are thread-local in D: one semaphore per thread
  static Semaphore sem;
  if (sem is null)
    sem = new Semaphore();
  return sem;
}
231 
/// Spawns a detached thread that runs `fn` and then cleans itself up.
/// The thread is detached (and flagged daemon) so finished threads do not
/// accumulate in druntime's registry and leak memory.
package void executeInNewThread(VoidFunction fn) @system nothrow {
  import concurrency.utils : closure;
  import core.thread : Thread, thread_detachThis, thread_detachInstance;
  version (Posix) import core.sys.posix.pthread : pthread_detach, pthread_self;

  auto t = new Thread(cast(void delegate())closure((VoidFunction fn) @trusted {
        fn();
        version (Posix)
          pthread_detach(pthread_self); //NOTE: see git.symmetry.dev/SIL/plugins/alpha/web/-/issues/3
      }, fn)).start();
  try {
    /*
      the isDaemon is really only protecting against a race condition in druntime,
      which is only introduced because I am detaching the thread, which I need to do
      if I don't want to leak memory.

      If the isDaemon fails because the Thread is gone (unlikely) then it can't
      trigger unwanted behavior the isDaemon is preventing in the first place.

      So it is fine if the exception is ignored.
     */
    t.isDaemon = true; // is needed because of pthread_detach (otherwise there is a race between druntime joining and the thread exiting)
  } catch (Exception e) {}
}
256 
/// Delegate overload of executeInNewThread: spawns a detached thread that
/// runs `fn` and then cleans itself up (same detach/daemon dance as the
/// VoidFunction overload above).
package void executeInNewThread(VoidDelegate fn) @system nothrow {
  import concurrency.utils : closure;
  import core.thread : Thread, thread_detachThis, thread_detachInstance;
  version (Posix) import core.sys.posix.pthread : pthread_detach, pthread_self;

  auto t = new Thread(cast(void delegate())closure((VoidDelegate fn) @trusted {
        fn();
        version (Posix)
          pthread_detach(pthread_self); //NOTE: see git.symmetry.dev/SIL/plugins/alpha/web/-/issues/3
      }, fn)).start();
  try {
    /*
      the isDaemon is really only protecting against a race condition in druntime,
      which is only introduced because I am detaching the thread, which I need to do
      if I don't want to leak memory.

      If the isDaemon fails because the Thread is gone (unlikely) then it can't
      trigger unwanted behavior the isDaemon is preventing in the first place.

      So it is fine if the exception is ignored.
    */
    t.isDaemon = true; // is needed because of pthread_detach (otherwise there is a race between druntime joining and the thread exiting)
  } catch (Exception e) {}
}
281 
/// Executor that spawns a brand-new detached thread for every work item.
class ThreadExecutor : Executor {
  void execute(VoidFunction fn) @trusted {
    executeInNewThread(fn);
  }
  void execute(VoidDelegate fn) @trusted {
    executeInNewThread(fn);
  }
  // Work never runs on the submitting thread, so we are never "in context".
  bool isInContext() @safe { return false; }
}
291 
/// Runs `work(args)` on `executor` and blocks the calling thread until it
/// completes, returning its result. Runs inline when already on the
/// executor's thread. Anything `work` throws on the executor thread is
/// captured and rethrown here — previously a throwing `work` skipped the
/// semaphore notify, deadlocking the caller in `wait()` and losing the error.
auto executeAndWait(Executor, Work, Args...)(Executor executor, Work work, Args args) {
  import core.sync.semaphore;
  import std.traits;

  if (executor.isInContext)
    return work(args);

  // per-thread semaphore used to signal completion back to this caller
  Semaphore semaphore = localSemaphore();

  alias RT = ReturnType!Work;
  struct Context {
    Work work;
    Args args;
    Semaphore semaphore;
    Throwable throwable; // set when `work` throws on the executor thread
    static if (!is(RT == void))
      RT result;
    void run() {
      // capture anything `work` throws; the semaphore must ALWAYS be
      // notified, otherwise the waiting caller blocks forever
      try {
        static if (is(RT == void))
          work(args);
        else
          result = work(args);
      } catch (Throwable t) {
        throwable = t;
      }
      semaphore.notify();
    }
  }
  Context c = Context(work, args, semaphore);
  executor.execute(cast(VoidDelegate)&c.run);
  semaphore.wait();
  if (c.throwable !is null)
    throw c.throwable; // propagate the worker-side failure to the caller
  static if (!is(RT == void)) {
    return c.result;
  }
}
326 
// Module constructor: resets the global scheduler when this module is loaded
// (presumably so a freshly loaded binary/dynamic library starts from a clean
// scheduler state — see concurrency.utils.resetScheduler to confirm).
shared static this() {
  import concurrency.utils : resetScheduler;

  resetScheduler();
}
332 
/// Sender that completes its receiver (setValue) on a dedicated, newly
/// spawned thread.
struct ThreadSender {
  static assert (models!(typeof(this), isSender));
  alias Value = void;
  static struct Op(Receiver) {
    private Receiver receiver;
    this(Receiver receiver) {
      this.receiver = receiver;
    }
    void start() @trusted nothrow scope {
      executeInNewThread(cast(VoidDelegate)&run);
    }
    // Runs on the spawned thread: completes the receiver and routes failures
    // to setError.
    void run() @trusted {
      import concurrency.receiver : setValueOrError;
      import concurrency.error : clone;
      import concurrency : parentStopSource;

      // expose the receiver's stop source to nested work on this thread
      parentStopSource = receiver.getStopToken().source;

      try {
        receiver.setValue();
      } catch (Exception e) {
        receiver.setError(e);
      } catch (Throwable t) {
        // non-Exception Throwables are cloned before crossing threads
        receiver.setError(t.clone());
      }

      parentStopSource = null;
    }
  }
  auto connect(Receiver)(return Receiver receiver) @safe scope return {
    // ensure NRVO
    auto op = Op!(Receiver)(receiver);
    return op;
  }
}
368 
/// Owning, non-copyable wrapper around a std.parallelism TaskPool; finishes
/// the pool (waiting for queued tasks) when destroyed.
struct StdTaskPool {
  import std.parallelism : Task, TaskPool;
  TaskPool pool;
  @disable this(ref return scope typeof(this) rhs); // sole owner of the pool
  @disable this(this);
  this(TaskPool pool) @trusted scope shared {
    this.pool = cast(shared)pool;
  }
  ~this() nothrow @trusted scope {
    try {
      pool.finish(true); // blocking: waits for all queued tasks to complete
    } catch (Exception e) {
      // can't really happen
      assert(0);
    }
  }
  auto getScheduler() return @safe {
    return StdTaskPoolProtoScheduler(pool);
  }
  auto getScheduler() return @trusted shared {
    return StdTaskPoolProtoScheduler(cast()pool);
  }
}
392 
/// Creates a shared StdTaskPool owning a fresh TaskPool with `nWorkers`
/// worker threads (the count is passed straight through to TaskPool;
/// NOTE(review): 0 means zero workers, not "default" — confirm intent).
shared(StdTaskPool) stdTaskPool(size_t nWorkers = 0) @safe {
  import std.parallelism : TaskPool;
  auto workers = new TaskPool(nWorkers);
  return shared StdTaskPool(workers);
}
397 
/// Scheduler front-end over a std.parallelism TaskPool. Provides plain
/// scheduling; timed scheduling requires pairing with a base scheduler
/// via `withBaseScheduler`.
struct StdTaskPoolProtoScheduler {
  import std.parallelism : TaskPool;
  TaskPool pool;
  /// Sender that completes on one of the pool's worker threads.
  auto schedule() {
    return TaskPoolSender(pool);
  }
  /// Pairs this pool with `scheduler`, which supplies timer support.
  auto withBaseScheduler(Scheduler)(Scheduler scheduler) {
    return StdTaskPoolScheduler!(Scheduler)(pool, scheduler);
  }
}
408 
/// Full scheduler combining a TaskPool (where work runs) with a base
/// scheduler (when timed work starts).
private struct StdTaskPoolScheduler(Scheduler) {
  import std.parallelism : TaskPool;
  import core.time : Duration;
  TaskPool pool;
  Scheduler scheduler;
  /// Sender completing on one of the pool's workers.
  auto schedule() {
    return TaskPoolSender(pool);
  }
  /// Waits `run` on the base scheduler, then hops onto the pool.
  auto scheduleAfter(Duration run) {
    import concurrency.operations : via;
    auto hop = schedule();
    return hop.via(scheduler.scheduleAfter(run));
  }
}
422 
/// Sender that completes its receiver on a std.parallelism TaskPool worker.
private struct TaskPoolSender {
  import std.parallelism : Task, TaskPool, scopedTask, task;
  import std.traits : ReturnType;
  import concurrency.error : clone;
  alias Value = void;
  TaskPool pool;
  static struct Op(Receiver) {
    // Entry point executed by the pool: completes the receiver, routing
    // failures to setError.
    static void setValue(Receiver receiver) @trusted nothrow {
      import concurrency : parentStopSource;
      // expose the receiver's stop source to nested work on this thread
      parentStopSource = receiver.getStopToken().source;
      try {
        receiver.setValue();
      } catch (Exception e) {
        receiver.setError(e);
      } catch (Throwable t) {
        // non-Exception Throwables are cloned before crossing threads
        receiver.setError(t.clone);
      }
      parentStopSource = null;
    }
    TaskPool pool;
    alias TaskType = typeof(task!setValue(Receiver.init));
    TaskType myTask;
    @disable this(ref return scope typeof(this) rhs); // op-state must not move
    @disable this(this);
    this(Receiver receiver, TaskPool pool) @safe return scope {
      myTask = task!(setValue)(receiver);
      this.pool = pool;
    }
    void start() @trusted nothrow scope {
      try {
        pool.put(myTask);
      } catch (Exception e) {
        // if the pool can't accept the task, fail the receiver instead
        myTask.args[0].setError(e);
      }
    }
  }
  auto connect(Receiver)(return Receiver receiver) @safe scope return {
    // ensure NRVO
    auto op = Op!(Receiver)(receiver, pool);
    return op;
  }
}