1 module concurrency.thread;
2 
3 import concurrency.executor;
4 import concurrency.scheduler;
5 import concurrency.sender;
6 import concepts;
7 import core.sync.semaphore : Semaphore;
8 import mir.algebraic;
9 import concurrency.scheduler : Timer;
10 import core.time : Duration;
11 import concurrency.data.queue.waitable;
12 import concurrency.data.queue.mpsc;
13 
14 LocalThreadExecutor getLocalThreadExecutor() @safe {
15   static LocalThreadExecutor localThreadExecutor;
16   if (localThreadExecutor is null)
17     localThreadExecutor = new LocalThreadExecutor();
18   return localThreadExecutor;
19 }
20 
21 struct AddTimer {
22   Timer timer;
23   Duration dur;
24 }
25 
26 struct RemoveTimer {
27   Timer timer;
28 }
29 
30 alias WorkItem = Variant!(typeof(null), VoidDelegate, VoidFunction, AddTimer, RemoveTimer); // null signifies end
31 
32 struct WorkNode {
33   WorkItem payload;
34   WorkNode* next;
35 }
36 
37 alias WorkQueue = WaitableQueue!(MPSCQueue!WorkNode);
38 
39 class LocalThreadExecutor : Executor {
40   import core.atomic : atomicOp, atomicStore, atomicLoad, cas;
41   import core.thread : ThreadID;
42   import std.process : thisThreadID;
43   static struct Node {
44     VoidDelegate dg;
45     Node* next;
46   }
47   private {
48     ThreadID threadId;
49     WorkQueue queue;
50   }
51 
52   this() @safe {
53     // todo: probably use thisThreadID
54     threadId = thisThreadID;
55     queue = new WorkQueue;
56   }
57 
58   void execute(VoidDelegate dg) @trusted {
59     if (isInContext)
60       dg();
61     else
62       queue.push(new WorkNode(WorkItem(dg)));
63   }
64 
65   void execute(VoidFunction fn) @trusted {
66     if (isInContext)
67       fn();
68     else
69       queue.push(new WorkNode(WorkItem(fn)));
70   }
71 
72   bool isInContext() @trusted {
73     return thisThreadID == threadId;
74   }
75 }
76 
77 package struct LocalThreadWorker {
78   import core.time : Duration, msecs, hnsecs;
79   import concurrency.scheduler : Timer, TimerTrigger, TimerDelegate;
80 
81   private {
82     shared int counter;
83     static shared ulong nextTimerId;
84     LocalThreadExecutor executor;
85   }
86 
87   this(LocalThreadExecutor e) @safe {
88     executor = e;
89   }
90 
91   void start() @trusted {
92     assert(isInContext); // start can only be called on the thread
93     import concurrency.timingwheels;
94     import std.datetime.systime : Clock;
95     import std.array : Appender;
96     Appender!(Timer[]) expiredTimers;
97     TimingWheels!Timer wheels;
98     auto ticks = 1.msecs; // represents the granularity
99     wheels.init();
100     bool running = true;
101     while (running) {
102       import std.meta : AliasSeq;
103       alias handlers = AliasSeq!((typeof(null)){running = false;},
104                                  (RemoveTimer cmd) {
105                                    wheels.cancel(cmd.timer);
106                                    cmd.timer.dg(TimerTrigger.cancel);
107                                  },
108                                  (AddTimer cmd) {
109                                    auto real_now = Clock.currStdTime;
110                                    auto tw_now = wheels.currStdTime(ticks);
111                                    auto delay = (real_now - tw_now).hnsecs;
112                                    auto at = (cmd.dur + delay)/ticks;
113                                    wheels.schedule(cmd.timer, at);
114                                  },
115                                  (VoidFunction fn) => fn(),
116                                  (VoidDelegate dg) => dg()
117                                  );
118       auto nextTrigger = wheels.timeUntilNextEvent(ticks, Clock.currStdTime);
119       bool handleIt = false;
120       if (nextTrigger.isNull()) {
121         auto work = executor.queue.pop();
122         if (work is null)
123           continue;
124         work.payload.match!(handlers);
125       } else {
126         if (nextTrigger.get > 0.msecs) {
127           auto work = executor.queue.pop(nextTrigger.get);
128           if (work !is null) {
129 
130             work.payload.match!(handlers);
131             continue;
132           }
133         }
134         int advance = wheels.ticksToCatchUp(ticks, Clock.currStdTime);
135         if (advance > 0) {
136           import std.range : retro;
137           wheels.advance(advance, expiredTimers);
138           // NOTE timingwheels keeps the timers in reverse order, so we iterate in reverse
139           foreach(t; expiredTimers.data.retro) {
140             t.dg(TimerTrigger.trigger);
141           }
142           expiredTimers.shrinkTo(0);
143         }
144       }
145     }
146     version (unittest) {
147       if (!executor.queue.empty) {
148         auto work = executor.queue.pop();
149         import std.stdio;
150         writeln("Got unwanted message ", work);
151         assert(0);
152       }
153     }
154   }
155 
156   void schedule(VoidDelegate dg) {
157     executor.queue.push(new WorkNode(WorkItem(dg)));
158   }
159 
160   Timer addTimer(TimerDelegate dg, Duration dur) @trusted {
161     import core.atomic : atomicOp;
162     ulong id = nextTimerId.atomicOp!("+=")(1);
163     Timer timer = Timer(dg, id);
164     executor.queue.push(new WorkNode(WorkItem(AddTimer(timer, dur))));
165     return timer;
166   }
167 
168   void cancelTimer(Timer timer) @trusted {
169     executor.queue.push(new WorkNode(WorkItem(RemoveTimer(timer))));
170   }
171 
172   void stop() nothrow @trusted {
173     try {
174       executor.queue.push(new WorkNode(WorkItem(null)));
175     } catch (Exception e) {
176       assert(false, e.msg);
177     }
178   }
179 
180   bool isInContext() @trusted {
181     return executor.isInContext;
182   }
183 }
184 
185 private Semaphore localSemaphore() {
186   static Semaphore semaphore;
187   if (semaphore is null)
188     semaphore = new Semaphore();
189   return semaphore;
190 }
191 
192 package void executeInNewThread(VoidFunction fn) @system nothrow {
193   import concurrency.utils : closure;
194   import core.thread : Thread, thread_detachThis, thread_detachInstance;
195   version (Posix) import core.sys.posix.pthread : pthread_detach, pthread_self;
196 
197   auto t = new Thread(cast(void delegate())closure((VoidFunction fn) @trusted {
198         fn();
199         version (Posix)
200           pthread_detach(pthread_self); //NOTE: see git.symmetry.dev/SIL/plugins/alpha/web/-/issues/3
201       }, fn)).start();
202   try {
203     /*
204       the isDaemon is really only a protecting against a race condition in druntime,
205       which is only introduced because I am detaching the thread, which I need to do
206       if I don't want to leak memory.
207 
208       If the isDaemon fails because the Thread is gone (unlikely) than it can't
209       trigger unwanted behavior the isDaemon is preventing in the first place.
210 
211       So it is fine if the exception is ignored.
212      */
213     t.isDaemon = true; // is needed because of pthread_detach (otherwise there is a race between druntime joining and the thread exiting)
214   } catch (Exception e) {}
215 }
216 
217 package void executeInNewThread(VoidDelegate fn) @system nothrow {
218   import concurrency.utils : closure;
219   import core.thread : Thread, thread_detachThis, thread_detachInstance;
220   version (Posix) import core.sys.posix.pthread : pthread_detach, pthread_self;
221 
222   auto t = new Thread(cast(void delegate())closure((VoidDelegate fn) @trusted {
223         fn();
224         version (Posix)
225           pthread_detach(pthread_self); //NOTE: see git.symmetry.dev/SIL/plugins/alpha/web/-/issues/3
226       }, fn)).start();
227   try {
228     /*
229       the isDaemon is really only a protecting against a race condition in druntime,
230       which is only introduced because I am detaching the thread, which I need to do
231       if I don't want to leak memory.
232 
233       If the isDaemon fails because the Thread is gone (unlikely) then it can't
234       trigger unwanted behavior the isDaemon is preventing in the first place.
235 
236       So it is fine if the exception is ignored.
237     */
238     t.isDaemon = true; // is needed because of pthread_detach (otherwise there is a race between druntime joining and the thread exiting)
239   } catch (Exception e) {}
240 }
241 
242 class ThreadExecutor : Executor {
243   void execute(VoidFunction fn) @trusted {
244     executeInNewThread(fn);
245   }
246   void execute(VoidDelegate fn) @trusted {
247     executeInNewThread(fn);
248   }
249   bool isInContext() @safe { return false; }
250 }
251 
252 auto executeAndWait(Executor, Work, Args...)(Executor executor, Work work, Args args) {
253   import core.sync.semaphore;
254   import std.traits;
255 
256   if (executor.isInContext)
257     return work(args);
258 
259   Semaphore semaphore = localSemaphore();
260 
261   alias RT = ReturnType!Work;
262   struct Context {
263     Work work;
264     Args args;
265     Semaphore semaphore;
266     static if (is(RT == void)) {
267       void run() {
268         work(args);
269         semaphore.notify();
270       }
271     } else {
272       RT result;
273       void run() {
274         result = work(args);
275         semaphore.notify();
276       }
277     }
278   }
279   Context c = Context(work, args, semaphore);
280   executor.execute(cast(VoidDelegate)&c.run);
281   semaphore.wait();
282   static if (!is(RT == void)) {
283     return c.result;
284   }
285 }
286 
287 shared static this() {
288   import concurrency.utils : resetScheduler;
289 
290   resetScheduler();
291 }
292 
293 struct ThreadSender {
294   static assert (models!(typeof(this), isSender));
295   alias Value = void;
296   static struct Op(Receiver) {
297     private Receiver receiver;
298     this(Receiver receiver) {
299       this.receiver = receiver;
300     }
301     void start() @trusted nothrow scope {
302       executeInNewThread(cast(VoidDelegate)&run);
303     }
304     void run() @trusted {
305       import concurrency.receiver : setValueOrError;
306       import concurrency.error : clone;
307       import concurrency : parentStopSource;
308 
309       parentStopSource = receiver.getStopToken().source;
310 
311       try {
312         receiver.setValue();
313       } catch (Exception e) {
314         receiver.setError(e);
315       } catch (Throwable t) {
316         receiver.setError(t.clone());
317       }
318 
319       parentStopSource = null;
320     }
321   }
322   auto connect(Receiver)(return Receiver receiver) @safe scope return {
323     // ensure NRVO
324     auto op = Op!(Receiver)(receiver);
325     return op;
326   }
327 }
328 
329 struct StdTaskPool {
330   import std.parallelism : Task, TaskPool;
331   TaskPool pool;
332   @disable this(ref return scope typeof(this) rhs);
333   @disable this(this);
334   this(TaskPool pool) @trusted scope shared {
335     this.pool = cast(shared)pool;
336   }
337   ~this() nothrow @trusted scope {
338     try {
339       pool.finish(true);
340     } catch (Exception e) {
341       // can't really happen
342       assert(0);
343     }
344   }
345   auto getScheduler() scope @safe {
346     return StdTaskPoolProtoScheduler(pool);
347   }
348   auto getScheduler() scope @trusted shared {
349     return StdTaskPoolProtoScheduler(cast()pool);
350   }
351 }
352 
353 shared(StdTaskPool) stdTaskPool(size_t nWorkers = 0) @safe {
354   import std.parallelism : TaskPool;
355   return shared StdTaskPool(new TaskPool(nWorkers));
356 }
357 
358 struct StdTaskPoolProtoScheduler {
359   import std.parallelism : TaskPool;
360   TaskPool pool;
361   auto schedule() {
362     return TaskPoolSender(pool);
363   }
364   auto withBaseScheduler(Scheduler)(Scheduler scheduler) {
365     return StdTaskPoolScheduler!(Scheduler)(pool, scheduler);
366   }
367 }
368 
369 private struct StdTaskPoolScheduler(Scheduler) {
370   import std.parallelism : TaskPool;
371   import core.time : Duration;
372   TaskPool pool;
373   Scheduler scheduler;
374   auto schedule() {
375     return TaskPoolSender(pool);
376   }
377   auto scheduleAfter(Duration run) {
378     import concurrency.operations : via;
379     return schedule().via(scheduler.scheduleAfter(run));
380   }
381 }
382 
383 private struct TaskPoolSender {
384   import std.parallelism : Task, TaskPool, scopedTask, task;
385   import std.traits : ReturnType;
386   import concurrency.error : clone;
387   alias Value = void;
388   TaskPool pool;
389   static struct Op(Receiver) {
390     static void setValue(Receiver receiver) @trusted nothrow {
391       import concurrency : parentStopSource;
392       parentStopSource = receiver.getStopToken().source;
393       try {
394         receiver.setValue();
395       } catch (Exception e) {
396         receiver.setError(e);
397       } catch (Throwable t) {
398         receiver.setError(t.clone);
399       }
400       parentStopSource = null;
401     }
402     TaskPool pool;
403     alias TaskType = typeof(task!setValue(Receiver.init));
404     TaskType myTask;
405     @disable this(ref return scope typeof(this) rhs);
406     @disable this(this);
407     this(Receiver receiver, TaskPool pool) @safe return scope {
408       myTask = task!(setValue)(receiver);
409       this.pool = pool;
410     }
411     void start() @trusted nothrow scope {
412       try {
413         pool.put(myTask);
414       } catch (Exception e) {
415         myTask.args[0].setError(e);
416       }
417     }
418   }
419   auto connect(Receiver)(return Receiver receiver) @safe scope return {
420     // ensure NRVO
421     auto op = Op!(Receiver)(receiver, pool);
422     return op;
423   }
424 }