module concurrency.thread;

import concurrency.executor;
import concurrency.scheduler;
import concurrency.sender;
import concepts;
import core.sync.semaphore : Semaphore;
import mir.algebraic;
import concurrency.scheduler : Timer;
import core.time : Duration;
import concurrency.data.queue.waitable;
import concurrency.data.queue.mpsc;

// We export the concurrency_getLocalThreadExecutor function so that dynamic
// libraries can load it to access the host's localThreadExecutor TLS instance.
// Otherwise they would access their own local instance.
// Should not be called directly by usercode, call `getLocalThreadExecutor` instead.
export extern(C) LocalThreadExecutor concurrency_getLocalThreadExecutor() @safe {
  // Lazily created on first use and reused on later calls.
  static LocalThreadExecutor localThreadExecutor;
  if (localThreadExecutor is null) {
    localThreadExecutor = new LocalThreadExecutor();
  }

  return localThreadExecutor;
}

/// Returns the LocalThreadExecutor for the calling thread.
///
/// The instance is obtained once through `dynamicLoad` (so that the exported
/// `concurrency_getLocalThreadExecutor` symbol is used) and then cached in a
/// static variable for subsequent calls.
LocalThreadExecutor getLocalThreadExecutor() @trusted {
  import concurrency.utils : dynamicLoad;
  static LocalThreadExecutor localThreadExecutor;
  if (localThreadExecutor is null) {
    localThreadExecutor = dynamicLoad!concurrency_getLocalThreadExecutor()();
  }
  return localThreadExecutor;
}

/// Queue command: schedule `timer` to fire after `dur`.
private struct AddTimer {
  Timer timer;
  Duration dur;
}

/// Queue command: cancel a previously added `timer`.
private struct RemoveTimer {
  Timer timer;
}

/// Queue command that does nothing; used to neutralise an already queued item.
private struct Noop {}

private alias WorkItem = Variant!(typeof(null), VoidDelegate, VoidFunction, AddTimer, RemoveTimer, Noop); // null signifies end

/// Node type stored in the worker's queue.
private struct WorkNode {
  WorkItem payload;
  shared WorkNode* next;
}

private alias WorkQueue = WaitableQueue!(MPSCQueue!WorkNode);

/// Executor bound to the thread that constructed it.
///
/// Work submitted from the owning thread runs immediately; work submitted from
/// any other thread is pushed onto `queue`, which `LocalThreadWorker.start`
/// drains.
class LocalThreadExecutor : Executor {
  import core.atomic : atomicOp, atomicStore, atomicLoad, cas;
  import core.thread : ThreadID;
  import std.process : thisThreadID;
  import concurrency.scheduler : Timer;
  import concurrency.timingwheels;

  static struct Node {
    VoidDelegate dg;
    Node* next;
  }
  private {
    ThreadID threadId;         // id of the thread that created this executor
    WorkQueue queue;           // pending work items and timer commands
    TimingWheels!Timer wheels; // timers managed by LocalThreadWorker
    shared ulong nextTimerId;  // counter used to hand out timer ids
  }

  this() @safe {
    threadId = thisThreadID;
    queue = new WorkQueue;
  }

  /// Runs `dg` inline when called on the owning thread, otherwise enqueues it.
  void execute(VoidDelegate dg) @trusted {
    if (isInContext)
      dg();
    else
      queue.push(new WorkNode(WorkItem(dg)));
  }

  /// Runs `fn` inline when called on the owning thread, otherwise enqueues it.
  void execute(VoidFunction fn) @trusted {
    if (isInContext)
      fn();
    else
      queue.push(new WorkNode(WorkItem(fn)));
  }

  /// True when the calling thread is the thread that created this executor.
  bool isInContext() @trusted {
    return thisThreadID == threadId;
  }
}

/// Drives a LocalThreadExecutor: pops queued work, maintains its timing wheels
/// and fires expired timers.
package struct LocalThreadWorker {
  import core.time : Duration, msecs, hnsecs;
  import concurrency.scheduler : Timer, TimerTrigger, TimerDelegate;

  private {
    LocalThreadExecutor executor;
  }

  this(LocalThreadExecutor e) @safe {
    executor = e;
  }

  /// Cancels `cmd.timer` in the timing wheels and invokes the timer's delegate
  /// with `TimerTrigger.cancel`. The delegate is invoked regardless of whether
  /// the wheels still held the timer.
  ///
  /// Returns: the result of `wheels.cancel`.
  private bool removeTimer(RemoveTimer cmd) {
    auto removed = executor.wheels.cancel(cmd.timer);
    cmd.timer.dg(TimerTrigger.cancel);
    return removed;
  }

  /// Runs the worker loop until a `null` work item (see `stop`) is processed.
  ///
  /// Must be called on the executor's owning thread.
  void start() @trusted {
    assert(isInContext); // start can only be called on the thread
    import std.datetime.systime : Clock;
    import std.array : Appender;
    Appender!(Timer[]) expiredTimers;
    auto ticks = 1.msecs; // represents the granularity
    executor.wheels.reset();
    executor.wheels.init();
    bool running = true;
    while (running) {
      import std.meta : AliasSeq;
      // One handler per WorkItem alternative.
      alias handlers = AliasSeq!((typeof(null)){running = false;},
                                 (RemoveTimer cmd) => removeTimer(cmd),
                                 (AddTimer cmd) {
                                   // Compensate for the difference between the wall clock
                                   // and the wheels' notion of "now" before converting the
                                   // requested duration into ticks.
                                   auto real_now = Clock.currStdTime;
                                   auto tw_now = executor.wheels.currStdTime(ticks);
                                   auto delay = (real_now - tw_now).hnsecs;
                                   auto at = (cmd.dur + delay)/ticks;
                                   executor.wheels.schedule(cmd.timer, at);
                                 },
                                 (VoidFunction fn) => fn(),
                                 (VoidDelegate dg) => dg(),
                                 (Noop){}
                                 );
      auto nextTrigger = executor.wheels.timeUntilNextEvent(ticks, Clock.currStdTime);
      if (nextTrigger.isNull()) {
        // No timer pending: pop without a timeout.
        auto work = executor.queue.pop();
        if (work is null)
          continue;
        work.payload.match!(handlers);
      } else {
        if (nextTrigger.get > 0.msecs) {
          // Wait for work, but no longer than until the next timer is due.
          auto work = executor.queue.pop(nextTrigger.get);
          if (work !is null) {
            work.payload.match!(handlers);
            continue;
          }
        }
        // Either a timer is already due or the timed pop returned nothing:
        // advance the wheels and fire whatever expired.
        int advance = executor.wheels.ticksToCatchUp(ticks, Clock.currStdTime);
        if (advance > 0) {
          import std.range : retro;
          executor.wheels.advance(advance, expiredTimers);
          // NOTE timingwheels keeps the timers in reverse order, so we iterate in reverse
          foreach(t; expiredTimers.data.retro) {
            t.dg(TimerTrigger.trigger);
          }
          expiredTimers.shrinkTo(0);
        }
      }
    }
    version (unittest) {
      // In unittest builds, fail loudly if anything was left behind.
      if (!executor.queue.empty) {
        auto work = executor.queue.pop();
        import std.stdio;
        writeln("Got unwanted message ", work);
        assert(0);
      }
      assert(executor.wheels.totalTimers == 0, "Still timers left");
    }
  }

  /// Enqueues `dg` (always via the queue, even on the owning thread).
  void schedule(VoidDelegate dg) {
    executor.queue.push(new WorkNode(WorkItem(dg)));
  }

  /// Creates a timer with a fresh id and enqueues an AddTimer command for it.
  ///
  /// Returns: the Timer handle, usable with `cancelTimer`.
  Timer addTimer(TimerDelegate dg, Duration dur) @trusted {
    import core.atomic : atomicOp;
    ulong id = executor.nextTimerId.atomicOp!("+=")(1);
    Timer timer = Timer(dg, id);
    executor.queue.push(new WorkNode(WorkItem(AddTimer(timer, dur))));
    return timer;
  }

  /// Cancels `timer`.
  ///
  /// On the owning thread the timer is removed directly; if the wheels did not
  /// hold it, a still-queued AddTimer for the same id is rewritten to a Noop.
  /// From any other thread a RemoveTimer command is enqueued instead.
  void cancelTimer(Timer timer) @trusted {
    import std.algorithm : find;

    auto cmd = RemoveTimer(timer);
    if (isInContext) {
      if (removeTimer(cmd))
        return;
      // if the timer is still in the queue, rewrite the queue node to a Noop
      auto nodes = executor.queue[].find!((node) {
          if (!node.payload._is!AddTimer)
            return false;
          return node.payload.get!AddTimer.timer.id == timer.id;
        });

      if (!nodes.empty) {
        nodes.front.payload = WorkItem(Noop());
      }
    } else
      executor.queue.push(new WorkNode(WorkItem(RemoveTimer(timer))));
  }

  /// Enqueues the `null` work item, which makes `start` leave its loop.
  void stop() nothrow @trusted {
    try {
      executor.queue.push(new WorkNode(WorkItem(null)));
    } catch (Exception e) {
      assert(false, e.msg);
    }
  }

  /// True when called on the executor's owning thread.
  bool isInContext() @trusted {
    return executor.isInContext;
  }
}

/// Returns a lazily created Semaphore that is reused on later calls.
private Semaphore localSemaphore() {
  static Semaphore semaphore;
  if (semaphore is null)
    semaphore = new Semaphore();
  return semaphore;
}

/// Shared implementation of both `executeInNewThread` overloads.
///
/// Starts a new thread that calls `work` and, on Posix, detaches itself once
/// `work` has returned.
private void executeInNewThreadImpl(Work)(Work work) @system nothrow {
  import concurrency.utils : closure;
  import core.thread : Thread;
  version (Posix) import core.sys.posix.pthread : pthread_detach, pthread_self;

  auto t = new Thread(cast(void delegate())closure((Work fn) @trusted {
        fn();
        version (Posix)
          pthread_detach(pthread_self); //NOTE: see git.symmetry.dev/SIL/plugins/alpha/web/-/issues/3
      }, work)).start();
  try {
    /*
      The isDaemon is really only protecting against a race condition in druntime,
      which is only introduced because I am detaching the thread, which I need to do
      if I don't want to leak memory.

      If the isDaemon fails because the Thread is gone (unlikely) then it can't
      trigger the unwanted behavior the isDaemon is preventing in the first place.

      So it is fine if the exception is ignored.
    */
    t.isDaemon = true; // is needed because of pthread_detach (otherwise there is a race between druntime joining and the thread exiting)
  } catch (Exception e) {}
}

/// Runs `fn` on a newly started, detached thread.
package void executeInNewThread(VoidFunction fn) @system nothrow {
  executeInNewThreadImpl(fn);
}

/// Runs `fn` on a newly started, detached thread.
package void executeInNewThread(VoidDelegate fn) @system nothrow {
  executeInNewThreadImpl(fn);
}

/// Executor that starts a new thread for every submitted piece of work.
class ThreadExecutor : Executor {
  void execute(VoidFunction fn) @trusted {
    executeInNewThread(fn);
  }
  void execute(VoidDelegate fn) @trusted {
    executeInNewThread(fn);
  }
  /// Always false: work never runs on the submitting thread.
  bool isInContext() @safe { return false; }
}

/// Runs `work(args)` on `executor` and blocks until it has finished.
///
/// When already in the executor's context the work is called directly.
/// Otherwise it is handed to `executor.execute` and the caller waits on a
/// semaphore that the work notifies when done.
///
/// Returns: whatever `work` returns (nothing when it returns void).
auto executeAndWait(Executor, Work, Args...)(Executor executor, Work work, Args args) {
  import core.sync.semaphore;
  import std.traits;

  if (executor.isInContext)
    return work(args);

  Semaphore semaphore = localSemaphore();

  alias RT = ReturnType!Work;
  // Lives on this stack frame; safe only because we block below until run()
  // has notified the semaphore.
  struct Context {
    Work work;
    Args args;
    Semaphore semaphore;
    // NOTE(review): if `work` throws, notify() is not reached and the caller
    // would presumably stay blocked in wait() - confirm whether callers can
    // pass throwing work.
    static if (is(RT == void)) {
      void run() {
        work(args);
        semaphore.notify();
      }
    } else {
      RT result;
      void run() {
        result = work(args);
        semaphore.notify();
      }
    }
  }
  Context c = Context(work, args, semaphore);
  executor.execute(cast(VoidDelegate)&c.run);
  semaphore.wait();
  static if (!is(RT == void)) {
    return c.result;
  }
}

shared static this() {
  import concurrency.utils : resetScheduler;

  resetScheduler();
}

/// Sender that completes its receiver on a newly started thread.
struct ThreadSender {
  static assert (models!(typeof(this), isSender));
  alias Value = void;
  static struct Op(Receiver) {
    private Receiver receiver;
    // Not copyable: start() hands out a delegate bound to this instance.
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    this(Receiver receiver) {
      this.receiver = receiver;
    }
    void start() @trusted nothrow scope {
      executeInNewThread(cast(VoidDelegate)&run);
    }
    /// Body executed on the new thread: publishes the receiver's stop source
    /// as `parentStopSource`, completes the receiver and clears it again.
    void run() @trusted {
      import concurrency.receiver : setValueOrError;
      import concurrency.error : clone;
      import concurrency : parentStopSource;

      parentStopSource = receiver.getStopToken().source;

      try {
        receiver.setValue();
      } catch (Exception e) {
        receiver.setError(e);
      } catch (Throwable t) {
        receiver.setError(t.clone());
      }

      parentStopSource = null;
    }
  }
  auto connect(Receiver)(return Receiver receiver) @safe return scope {
    // ensure NRVO
    auto op = Op!(Receiver)(receiver);
    return op;
  }
}

/// Owns a std.parallelism TaskPool and finishes it on destruction.
struct StdTaskPool {
  import std.parallelism : Task, TaskPool;
  TaskPool pool;
  @disable this(ref return scope typeof(this) rhs);
  @disable this(this);
  this(TaskPool pool) @trusted scope shared {
    this.pool = cast(shared)pool;
  }
  ~this() nothrow @trusted scope {
    try {
      pool.finish(true);
    } catch (Exception e) {
      // can't really happen
      assert(0);
    }
  }
  auto getScheduler() return @safe {
    return StdTaskPoolProtoScheduler(pool);
  }
  auto getScheduler() return @trusted shared {
    return StdTaskPoolProtoScheduler(cast()pool);
  }
}

/// Creates a shared StdTaskPool backed by a new TaskPool with `nWorkers` workers.
shared(StdTaskPool) stdTaskPool(size_t nWorkers = 0) @safe {
  import std.parallelism : TaskPool;
  return shared StdTaskPool(new TaskPool(nWorkers));
}

/// Scheduler over a TaskPool that supports `schedule` only; combine it with a
/// base scheduler via `withBaseScheduler` to also get `scheduleAfter`.
struct StdTaskPoolProtoScheduler {
  import std.parallelism : TaskPool;
  TaskPool pool;
  auto schedule() {
    return TaskPoolSender(pool);
  }
  auto withBaseScheduler(Scheduler)(Scheduler scheduler) {
    return StdTaskPoolScheduler!(Scheduler)(pool, scheduler);
  }
}

/// TaskPool scheduler that delegates the timing of `scheduleAfter` to a base
/// `scheduler`.
private struct StdTaskPoolScheduler(Scheduler) {
  import std.parallelism : TaskPool;
  import core.time : Duration;
  TaskPool pool;
  Scheduler scheduler;
  auto schedule() {
    return TaskPoolSender(pool);
  }
  auto scheduleAfter(Duration run) {
    import concurrency.operations : via;
    return schedule().via(scheduler.scheduleAfter(run));
  }
}

/// Sender that completes its receiver from a task put on a TaskPool.
private struct TaskPoolSender {
  import std.parallelism : Task, TaskPool, scopedTask, task;
  import std.traits : ReturnType;
  import concurrency.error : clone;
  alias Value = void;
  TaskPool pool;
  static struct Op(Receiver) {
    /// Task body: publishes the receiver's stop source as `parentStopSource`,
    /// completes the receiver and clears it again.
    static void setValue(Receiver receiver) @trusted nothrow {
      import concurrency : parentStopSource;
      parentStopSource = receiver.getStopToken().source;
      try {
        receiver.setValue();
      } catch (Exception e) {
        receiver.setError(e);
      } catch (Throwable t) {
        receiver.setError(t.clone);
      }
      parentStopSource = null;
    }
    TaskPool pool;
    alias TaskType = typeof(task!setValue(Receiver.init));
    TaskType myTask;
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    this(Receiver receiver, TaskPool pool) @safe return scope {
      myTask = task!(setValue)(receiver);
      this.pool = pool;
    }
    void start() @trusted nothrow scope {
      try {
        pool.put(myTask);
      } catch (Exception e) {
        // Could not hand the task to the pool: report to the receiver stored
        // as the task's first argument.
        myTask.args[0].setError(e);
      }
    }
  }
  auto connect(Receiver)(return Receiver receiver) @safe return scope {
    // ensure NRVO
    auto op = Op!(Receiver)(receiver, pool);
    return op;
  }
}