1 module concurrency.thread; 2 3 import concurrency.executor; 4 import concurrency.scheduler; 5 import concurrency.sender; 6 import concepts; 7 import core.sync.semaphore : Semaphore; 8 9 LocalThreadExecutor getLocalThreadExecutor() @safe { 10 static LocalThreadExecutor localThreadExecutor; 11 if (localThreadExecutor is null) 12 localThreadExecutor = new LocalThreadExecutor(); 13 return localThreadExecutor; 14 } 15 16 class LocalThreadExecutor : Executor { 17 import core.atomic : atomicOp, atomicStore, atomicLoad, cas; 18 import std.concurrency : Tid, thisTid, send, receive; 19 private { 20 Tid tid; 21 } 22 23 this() @safe { 24 tid = thisTid; 25 } 26 27 void execute(VoidDelegate dg) @trusted { 28 version (unittest) { 29 // NOTE: We must call the delegate on the current thread instead of going to the main one 30 // the reason is that there is no nursery running on the main one because we are running unit tests 31 // when SIL becomes multithreaded we can revisit this 32 dg(); 33 } else { 34 if (isInContext) 35 dg(); 36 else 37 (cast() tid).send(dg); 38 } 39 } 40 41 void execute(VoidFunction fn) @trusted { 42 version (unittest) { 43 fn(); 44 } else { 45 if (isInContext) 46 fn(); 47 else 48 (cast() tid).send(fn); 49 } 50 } 51 52 bool isInContext() @trusted { 53 return thisTid == cast()tid; 54 } 55 } 56 57 package struct LocalThreadWorker { 58 import core.time : Duration, msecs, hnsecs; 59 import std.concurrency : Tid, thisTid, send, receive, receiveTimeout; 60 import concurrency.scheduler : Timer, TimerTrigger, TimerDelegate; 61 62 static struct AddTimer { 63 Timer timer; 64 Duration dur; 65 } 66 static struct RemoveTimer { 67 Timer timer; 68 } 69 70 private { 71 Tid tid; 72 shared int counter; 73 shared ulong nextTimerId; 74 } 75 76 this(LocalThreadExecutor e) @safe { 77 this.tid = e.tid; 78 } 79 80 this(Tid tid) @safe { 81 this.tid = tid; 82 } 83 84 void start() @trusted { 85 assert(isInContext); // start can only be called on the thread 86 import concurrency.timingwheels; 87 import std.datetime.systime : Clock; 88 TimingWheels!Timer wheels; 89 auto ticks = 1.msecs; // represents the granularity 90 wheels.init(); 91 bool running = true; 92 auto addTimerHandler = (AddTimer cmd) scope { 93 auto real_now = Clock.currStdTime; 94 auto tw_now = wheels.currStdTime(ticks); 95 auto delay = (real_now - tw_now).hnsecs; 96 auto at = (cmd.dur + delay)/ticks; 97 wheels.schedule(cmd.timer, at); 98 }; 99 auto removeTimerHandler = (RemoveTimer cmd) scope { 100 wheels.cancel(cmd.timer); 101 cmd.timer.dg(TimerTrigger.cancel); 102 }; 103 while (running) { 104 VoidFunction vFunc = null; 105 VoidDelegate vDel = null; 106 import std.meta : AliasSeq; 107 alias handlers = AliasSeq!((VoidFunction fn) => vFunc = fn, 108 (VoidDelegate dg) => vDel = dg, 109 addTimerHandler, 110 removeTimerHandler, 111 (bool _){running = false;} 112 ); 113 auto nextTrigger = wheels.timeUntilNextEvent(ticks, Clock.currStdTime); 114 bool handleIt = false; 115 if (nextTrigger.isNull()) { 116 receive(handlers); 117 goto handleIt; 118 } else { 119 if (nextTrigger.get > 0.msecs) { 120 if (receiveTimeout(nextTrigger.get, handlers)) { 121 goto handleIt; 122 } 123 } 124 int advance = wheels.ticksToCatchUp(ticks, Clock.currStdTime); 125 if (advance > 0) { 126 auto wr = wheels.advance(advance); 127 foreach(t; wr.timers) { 128 t.dg(TimerTrigger.trigger); 129 } 130 } 131 continue; 132 } 133 handleIt: 134 if (vFunc !is null) { 135 vFunc(); 136 } 137 if (vDel !is null) { 138 vDel(); 139 } 140 } 141 version (unittest) { 142 import std.variant : Variant; 143 import core.time : seconds; 144 while(receiveTimeout(seconds(-1),(Variant v){ 145 import std.stdio; 146 writeln("Got unwanted message ", v); 147 assert(0); 148 })) {} 149 } 150 } 151 152 void schedule(VoidDelegate dg) { 153 tid.send(dg); 154 } 155 156 Timer addTimer(TimerDelegate dg, Duration dur) @trusted { 157 import core.atomic : atomicOp; 158 ulong id = nextTimerId.atomicOp!("+=")(1); 159 Timer timer = Timer(dg, id); 160 tid.send(AddTimer(timer, dur)); 161 return timer; 162 } 163 164 void cancelTimer(Timer timer) @trusted { 165 tid.send(RemoveTimer(timer)); 166 } 167 168 void stop() nothrow @trusted { 169 try { 170 (cast()tid).send(false); 171 } catch (Exception e) { 172 assert(false, e.msg); 173 } 174 } 175 176 bool isInContext() @trusted { 177 return thisTid == cast()tid; 178 } 179 } 180 181 private Semaphore localSemaphore() { 182 static Semaphore semaphore; 183 if (semaphore is null) 184 semaphore = new Semaphore(); 185 return semaphore; 186 } 187 188 package void executeInNewThread(VoidFunction fn) @system nothrow { 189 import concurrency.utils : closure; 190 import core.thread : Thread, thread_detachThis, thread_detachInstance; 191 version (Posix) import core.sys.posix.pthread : pthread_detach, pthread_self; 192 193 auto t = new Thread(cast(void delegate())closure((VoidFunction fn) @trusted { 194 fn(); 195 version (Posix) 196 pthread_detach(pthread_self); //NOTE: see git.symmetry.dev/SIL/plugins/alpha/web/-/issues/3 197 }, fn)).start(); 198 try { 199 /* 200 the isDaemon is really only a protecting against a race condition in druntime, 201 which is only introduced because I am detaching the thread, which I need to do 202 if I don't want to leak memory. 203 204 If the isDaemon fails because the Thread is gone (unlikely) than it can't 205 trigger unwanted behavior the isDaemon is preventing in the first place. 206 207 So it is fine if the exception is ignored. 208 */ 209 t.isDaemon = true; // is needed because of pthread_detach (otherwise there is a race between druntime joining and the thread exiting) 210 } catch (Exception e) {} 211 } 212 213 package void executeInNewThread(VoidDelegate fn) @system nothrow { 214 import concurrency.utils : closure; 215 import core.thread : Thread, thread_detachThis, thread_detachInstance; 216 version (Posix) import core.sys.posix.pthread : pthread_detach, pthread_self; 217 218 auto t = new Thread(cast(void delegate())closure((VoidDelegate fn) @trusted { 219 fn(); 220 version (Posix) 221 pthread_detach(pthread_self); //NOTE: see git.symmetry.dev/SIL/plugins/alpha/web/-/issues/3 222 }, fn)).start(); 223 try { 224 /* 225 the isDaemon is really only a protecting against a race condition in druntime, 226 which is only introduced because I am detaching the thread, which I need to do 227 if I don't want to leak memory. 228 229 If the isDaemon fails because the Thread is gone (unlikely) then it can't 230 trigger unwanted behavior the isDaemon is preventing in the first place. 231 232 So it is fine if the exception is ignored. 233 */ 234 t.isDaemon = true; // is needed because of pthread_detach (otherwise there is a race between druntime joining and the thread exiting) 235 } catch (Exception e) {} 236 } 237 238 class ThreadExecutor : Executor { 239 void execute(VoidFunction fn) @trusted { 240 executeInNewThread(fn); 241 } 242 void execute(VoidDelegate fn) @trusted { 243 executeInNewThread(fn); 244 } 245 bool isInContext() @safe { return false; } 246 } 247 248 auto executeAndWait(Executor, Work, Args...)(Executor executor, Work work, Args args) { 249 import core.sync.semaphore; 250 import std.concurrency; 251 import std.traits; 252 253 if (executor.isInContext) 254 return work(args); 255 256 Semaphore semaphore = localSemaphore(); 257 258 alias RT = ReturnType!Work; 259 struct Context { 260 Work work; 261 Args args; 262 Semaphore semaphore; 263 static if (is(RT == void)) { 264 void run() { 265 work(args); 266 semaphore.notify(); 267 } 268 } else { 269 RT result; 270 void run() { 271 result = work(args); 272 semaphore.notify(); 273 } 274 } 275 } 276 Context c = Context(work, args, semaphore); 277 executor.execute(cast(VoidDelegate)&c.run); 278 semaphore.wait(); 279 static if (!is(RT == void)) { 280 return c.result; 281 } 282 } 283 284 shared static this() { 285 import concurrency.utils : resetScheduler; 286 287 resetScheduler(); 288 } 289 290 struct ThreadSender { 291 static assert (models!(typeof(this), isSender)); 292 alias Value = void; 293 static struct Op(Receiver) { 294 private Receiver receiver; 295 this(Receiver receiver) { 296 this.receiver = receiver; 297 } 298 void start() @trusted nothrow scope { 299 executeInNewThread(cast(VoidDelegate)&run); 300 } 301 void run() @trusted { 302 import concurrency.receiver : setValueOrError; 303 import concurrency.error : clone; 304 import concurrency : parentStopSource; 305 306 parentStopSource = receiver.getStopToken().source; 307 308 try { 309 receiver.setValue(); 310 } catch (Exception e) { 311 receiver.setError(e); 312 } catch (Throwable t) { 313 receiver.setError(t.clone()); 314 } 315 316 parentStopSource = null; 317 } 318 } 319 auto connect(Receiver)(return Receiver receiver) @safe scope return { 320 // ensure NRVO 321 auto op = Op!(Receiver)(receiver); 322 return op; 323 } 324 } 325 326 struct StdTaskPool { 327 import std.parallelism : Task, TaskPool; 328 TaskPool pool; 329 @disable this(ref return scope typeof(this) rhs); 330 @disable this(this); 331 this(TaskPool pool) @trusted scope shared { 332 this.pool = cast(shared)pool; 333 } 334 ~this() nothrow @trusted scope { 335 try { 336 pool.finish(true); 337 } catch (Exception e) { 338 // can't really happen 339 assert(0); 340 } 341 } 342 auto getScheduler() scope @safe { 343 return StdTaskPoolProtoScheduler(pool); 344 } 345 auto getScheduler() scope @trusted shared { 346 return StdTaskPoolProtoScheduler(cast()pool); 347 } 348 } 349 350 shared(StdTaskPool) stdTaskPool(size_t nWorkers = 0) @safe { 351 import std.parallelism : TaskPool; 352 return shared StdTaskPool(new TaskPool(nWorkers)); 353 } 354 355 struct StdTaskPoolProtoScheduler { 356 import std.parallelism : TaskPool; 357 TaskPool pool; 358 auto schedule() { 359 return TaskPoolSender(pool); 360 } 361 auto withBaseScheduler(Scheduler)(Scheduler scheduler) { 362 return StdTaskPoolScheduler!(Scheduler)(pool, scheduler); 363 } 364 } 365 366 private struct StdTaskPoolScheduler(Scheduler) { 367 import std.parallelism : TaskPool; 368 import core.time : Duration; 369 TaskPool pool; 370 Scheduler scheduler; 371 auto schedule() { 372 return TaskPoolSender(pool); 373 } 374 auto scheduleAfter(Duration run) { 375 import concurrency.operations : via; 376 return schedule().via(scheduler.scheduleAfter(run)); 377 } 378 } 379 380 private struct TaskPoolSender { 381 import std.parallelism : Task, TaskPool, scopedTask, task; 382 import std.traits : ReturnType; 383 import concurrency.error : clone; 384 alias Value = void; 385 TaskPool pool; 386 static struct Op(Receiver) { 387 static void setValue(Receiver receiver) @trusted nothrow { 388 import concurrency : parentStopSource; 389 parentStopSource = receiver.getStopToken().source; 390 try { 391 receiver.setValue(); 392 } catch (Exception e) { 393 receiver.setError(e); 394 } catch (Throwable t) { 395 receiver.setError(t.clone); 396 } 397 parentStopSource = null; 398 } 399 TaskPool pool; 400 alias TaskType = typeof(task!setValue(Receiver.init)); 401 TaskType myTask; 402 @disable this(ref return scope typeof(this) rhs); 403 @disable this(this); 404 this(Receiver receiver, TaskPool pool) @safe return scope { 405 myTask = task!(setValue)(receiver); 406 this.pool = pool; 407 } 408 void start() @trusted nothrow scope { 409 try { 410 pool.put(myTask); 411 } catch (Exception e) { 412 myTask.args[0].setError(e); 413 } 414 } 415 } 416 auto connect(Receiver)(return Receiver receiver) @safe scope return { 417 // ensure NRVO 418 auto op = Op!(Receiver)(receiver, pool); 419 return op; 420 } 421 }