1 module concurrency.thread; 2 3 import concurrency.executor; 4 import concurrency.scheduler; 5 import concurrency.sender; 6 import concepts; 7 import core.sync.semaphore : Semaphore; 8 import mir.algebraic; 9 import concurrency.scheduler : Timer; 10 import core.time : Duration; 11 import concurrency.data.queue.waitable; 12 import concurrency.data.queue.mpsc; 13 14 // we export the getLocalThreadExecutor function so that dynamic libraries 15 // can load it to access the host's localThreadExecutor TLS instance. 16 // Otherwise they would access their own local instance. 17 // should not be called directly by usercode, call `silThreadExecutor` instead. 18 export extern(C) LocalThreadExecutor concurrency_getLocalThreadExecutor() @safe { 19 static LocalThreadExecutor localThreadExecutor; 20 if (localThreadExecutor is null) { 21 localThreadExecutor = new LocalThreadExecutor(); 22 } 23 24 return localThreadExecutor; 25 } 26 27 LocalThreadExecutor getLocalThreadExecutor() @trusted { 28 import concurrency.utils : dynamicLoad; 29 static LocalThreadExecutor localThreadExecutor; 30 if (localThreadExecutor is null) { 31 localThreadExecutor = dynamicLoad!concurrency_getLocalThreadExecutor()(); 32 } 33 return localThreadExecutor; 34 } 35 36 struct AddTimer { 37 Timer timer; 38 Duration dur; 39 } 40 41 struct RemoveTimer { 42 Timer timer; 43 } 44 45 struct Noop {} 46 47 alias WorkItem = Variant!(typeof(null), VoidDelegate, VoidFunction, AddTimer, RemoveTimer, Noop); // null signifies end 48 49 struct WorkNode { 50 WorkItem payload; 51 shared WorkNode* next; 52 } 53 54 alias WorkQueue = WaitableQueue!(MPSCQueue!WorkNode); 55 56 class LocalThreadExecutor : Executor { 57 import core.atomic : atomicOp, atomicStore, atomicLoad, cas; 58 import core.thread : ThreadID; 59 import std.process : thisThreadID; 60 import concurrency.scheduler : Timer; 61 import concurrency.timingwheels; 62 63 static struct Node { 64 VoidDelegate dg; 65 Node* next; 66 } 67 private { 68 ThreadID threadId; 69 WorkQueue queue; 70 TimingWheels!Timer wheels; 71 shared ulong nextTimerId; 72 } 73 74 this() @safe { 75 threadId = thisThreadID; 76 queue = new WorkQueue; 77 } 78 79 void execute(VoidDelegate dg) @trusted { 80 if (isInContext) 81 dg(); 82 else 83 queue.push(new WorkNode(WorkItem(dg))); 84 } 85 86 void execute(VoidFunction fn) @trusted { 87 if (isInContext) 88 fn(); 89 else 90 queue.push(new WorkNode(WorkItem(fn))); 91 } 92 93 bool isInContext() @trusted { 94 return thisThreadID == threadId; 95 } 96 } 97 98 package struct LocalThreadWorker { 99 import core.time : Duration, msecs, hnsecs; 100 import concurrency.scheduler : Timer, TimerTrigger, TimerDelegate; 101 102 private { 103 LocalThreadExecutor executor; 104 } 105 106 this(LocalThreadExecutor e) @safe { 107 executor = e; 108 } 109 110 private bool removeTimer(RemoveTimer cmd) { 111 auto removed = executor.wheels.cancel(cmd.timer); 112 cmd.timer.dg(TimerTrigger.cancel); 113 return removed; 114 } 115 116 void start() @trusted { 117 assert(isInContext); // start can only be called on the thread 118 import std.datetime.systime : Clock; 119 import std.array : Appender; 120 Appender!(Timer[]) expiredTimers; 121 auto ticks = 1.msecs; // represents the granularity 122 executor.wheels.reset(); 123 executor.wheels.init(); 124 bool running = true; 125 while (running) { 126 import std.meta : AliasSeq; 127 alias handlers = AliasSeq!((typeof(null)){running = false;}, 128 (RemoveTimer cmd) => removeTimer(cmd), 129 (AddTimer cmd) { 130 auto real_now = Clock.currStdTime; 131 auto tw_now = executor.wheels.currStdTime(ticks); 132 auto delay = (real_now - tw_now).hnsecs; 133 auto at = (cmd.dur + delay)/ticks; 134 executor.wheels.schedule(cmd.timer, at); 135 }, 136 (VoidFunction fn) => fn(), 137 (VoidDelegate dg) => dg(), 138 (Noop){} 139 ); 140 auto nextTrigger = executor.wheels.timeUntilNextEvent(ticks, Clock.currStdTime); 141 bool handleIt = false; 142 if (nextTrigger.isNull()) { 143 auto work = executor.queue.pop(); 144 if (work is null) 145 continue; 146 work.payload.match!(handlers); 147 } else { 148 if (nextTrigger.get > 0.msecs) { 149 auto work = executor.queue.pop(nextTrigger.get); 150 if (work !is null) { 151 152 work.payload.match!(handlers); 153 continue; 154 } 155 } 156 int advance = executor.wheels.ticksToCatchUp(ticks, Clock.currStdTime); 157 if (advance > 0) { 158 import std.range : retro; 159 executor.wheels.advance(advance, expiredTimers); 160 // NOTE timingwheels keeps the timers in reverse order, so we iterate in reverse 161 foreach(t; expiredTimers.data.retro) { 162 t.dg(TimerTrigger.trigger); 163 } 164 expiredTimers.shrinkTo(0); 165 } 166 } 167 } 168 version (unittest) { 169 if (!executor.queue.empty) { 170 auto work = executor.queue.pop(); 171 import std.stdio; 172 writeln("Got unwanted message ", work); 173 assert(0); 174 } 175 assert(executor.wheels.totalTimers == 0, "Still timers left"); 176 } 177 } 178 179 void schedule(VoidDelegate dg) { 180 executor.queue.push(new WorkNode(WorkItem(dg))); 181 } 182 183 Timer addTimer(TimerDelegate dg, Duration dur) @trusted { 184 import core.atomic : atomicOp; 185 ulong id = executor.nextTimerId.atomicOp!("+=")(1); 186 Timer timer = Timer(dg, id); 187 executor.queue.push(new WorkNode(WorkItem(AddTimer(timer, dur)))); 188 return timer; 189 } 190 191 void cancelTimer(Timer timer) @trusted { 192 import std.algorithm : find; 193 194 auto cmd = RemoveTimer(timer); 195 if (isInContext) { 196 if (removeTimer(cmd)) 197 return; 198 // if the timer is still in the queue, rewrite the queue node to a Noop 199 auto nodes = executor.queue[].find!((node) { 200 if (!node.payload._is!AddTimer) 201 return false; 202 return node.payload.get!AddTimer.timer.id == timer.id; 203 }); 204 205 if (!nodes.empty) { 206 nodes.front.payload = WorkItem(Noop()); 207 } 208 } else 209 executor.queue.push(new WorkNode(WorkItem(RemoveTimer(timer)))); 210 } 211 212 void stop() nothrow @trusted { 213 try { 214 executor.queue.push(new WorkNode(WorkItem(null))); 215 } catch (Exception e) { 216 assert(false, e.msg); 217 } 218 } 219 220 bool isInContext() @trusted { 221 return executor.isInContext; 222 } 223 } 224 225 private Semaphore localSemaphore() { 226 static Semaphore semaphore; 227 if (semaphore is null) 228 semaphore = new Semaphore(); 229 return semaphore; 230 } 231 232 package void executeInNewThread(VoidFunction fn) @system nothrow { 233 import concurrency.utils : closure; 234 import core.thread : Thread, thread_detachThis, thread_detachInstance; 235 version (Posix) import core.sys.posix.pthread : pthread_detach, pthread_self; 236 237 auto t = new Thread(cast(void delegate())closure((VoidFunction fn) @trusted { 238 fn(); 239 version (Posix) 240 pthread_detach(pthread_self); //NOTE: see git.symmetry.dev/SIL/plugins/alpha/web/-/issues/3 241 }, fn)).start(); 242 try { 243 /* 244 the isDaemon is really only a protecting against a race condition in druntime, 245 which is only introduced because I am detaching the thread, which I need to do 246 if I don't want to leak memory. 247 248 If the isDaemon fails because the Thread is gone (unlikely) than it can't 249 trigger unwanted behavior the isDaemon is preventing in the first place. 250 251 So it is fine if the exception is ignored. 252 */ 253 t.isDaemon = true; // is needed because of pthread_detach (otherwise there is a race between druntime joining and the thread exiting) 254 } catch (Exception e) {} 255 } 256 257 package void executeInNewThread(VoidDelegate fn) @system nothrow { 258 import concurrency.utils : closure; 259 import core.thread : Thread, thread_detachThis, thread_detachInstance; 260 version (Posix) import core.sys.posix.pthread : pthread_detach, pthread_self; 261 262 auto t = new Thread(cast(void delegate())closure((VoidDelegate fn) @trusted { 263 fn(); 264 version (Posix) 265 pthread_detach(pthread_self); //NOTE: see git.symmetry.dev/SIL/plugins/alpha/web/-/issues/3 266 }, fn)).start(); 267 try { 268 /* 269 the isDaemon is really only a protecting against a race condition in druntime, 270 which is only introduced because I am detaching the thread, which I need to do 271 if I don't want to leak memory. 272 273 If the isDaemon fails because the Thread is gone (unlikely) then it can't 274 trigger unwanted behavior the isDaemon is preventing in the first place. 275 276 So it is fine if the exception is ignored. 277 */ 278 t.isDaemon = true; // is needed because of pthread_detach (otherwise there is a race between druntime joining and the thread exiting) 279 } catch (Exception e) {} 280 } 281 282 class ThreadExecutor : Executor { 283 void execute(VoidFunction fn) @trusted { 284 executeInNewThread(fn); 285 } 286 void execute(VoidDelegate fn) @trusted { 287 executeInNewThread(fn); 288 } 289 bool isInContext() @safe { return false; } 290 } 291 292 auto executeAndWait(Executor, Work, Args...)(Executor executor, Work work, Args args) { 293 import core.sync.semaphore; 294 import std.traits; 295 296 if (executor.isInContext) 297 return work(args); 298 299 Semaphore semaphore = localSemaphore(); 300 301 alias RT = ReturnType!Work; 302 struct Context { 303 Work work; 304 Args args; 305 Semaphore semaphore; 306 static if (is(RT == void)) { 307 void run() { 308 work(args); 309 semaphore.notify(); 310 } 311 } else { 312 RT result; 313 void run() { 314 result = work(args); 315 semaphore.notify(); 316 } 317 } 318 } 319 Context c = Context(work, args, semaphore); 320 executor.execute(cast(VoidDelegate)&c.run); 321 semaphore.wait(); 322 static if (!is(RT == void)) { 323 return c.result; 324 } 325 } 326 327 shared static this() { 328 import concurrency.utils : resetScheduler; 329 330 resetScheduler(); 331 } 332 333 struct ThreadSender { 334 static assert (models!(typeof(this), isSender)); 335 alias Value = void; 336 static struct Op(Receiver) { 337 private Receiver receiver; 338 this(Receiver receiver) { 339 this.receiver = receiver; 340 } 341 void start() @trusted nothrow scope { 342 executeInNewThread(cast(VoidDelegate)&run); 343 } 344 void run() @trusted { 345 import concurrency.receiver : setValueOrError; 346 import concurrency.error : clone; 347 import concurrency : parentStopSource; 348 349 parentStopSource = receiver.getStopToken().source; 350 351 try { 352 receiver.setValue(); 353 } catch (Exception e) { 354 receiver.setError(e); 355 } catch (Throwable t) { 356 receiver.setError(t.clone()); 357 } 358 359 parentStopSource = null; 360 } 361 } 362 auto connect(Receiver)(return Receiver receiver) @safe scope return { 363 // ensure NRVO 364 auto op = Op!(Receiver)(receiver); 365 return op; 366 } 367 } 368 369 struct StdTaskPool { 370 import std.parallelism : Task, TaskPool; 371 TaskPool pool; 372 @disable this(ref return scope typeof(this) rhs); 373 @disable this(this); 374 this(TaskPool pool) @trusted scope shared { 375 this.pool = cast(shared)pool; 376 } 377 ~this() nothrow @trusted scope { 378 try { 379 pool.finish(true); 380 } catch (Exception e) { 381 // can't really happen 382 assert(0); 383 } 384 } 385 auto getScheduler() return @safe { 386 return StdTaskPoolProtoScheduler(pool); 387 } 388 auto getScheduler() return @trusted shared { 389 return StdTaskPoolProtoScheduler(cast()pool); 390 } 391 } 392 393 shared(StdTaskPool) stdTaskPool(size_t nWorkers = 0) @safe { 394 import std.parallelism : TaskPool; 395 return shared StdTaskPool(new TaskPool(nWorkers)); 396 } 397 398 struct StdTaskPoolProtoScheduler { 399 import std.parallelism : TaskPool; 400 TaskPool pool; 401 auto schedule() { 402 return TaskPoolSender(pool); 403 } 404 auto withBaseScheduler(Scheduler)(Scheduler scheduler) { 405 return StdTaskPoolScheduler!(Scheduler)(pool, scheduler); 406 } 407 } 408 409 private struct StdTaskPoolScheduler(Scheduler) { 410 import std.parallelism : TaskPool; 411 import core.time : Duration; 412 TaskPool pool; 413 Scheduler scheduler; 414 auto schedule() { 415 return TaskPoolSender(pool); 416 } 417 auto scheduleAfter(Duration run) { 418 import concurrency.operations : via; 419 return schedule().via(scheduler.scheduleAfter(run)); 420 } 421 } 422 423 private struct TaskPoolSender { 424 import std.parallelism : Task, TaskPool, scopedTask, task; 425 import std.traits : ReturnType; 426 import concurrency.error : clone; 427 alias Value = void; 428 TaskPool pool; 429 static struct Op(Receiver) { 430 static void setValue(Receiver receiver) @trusted nothrow { 431 import concurrency : parentStopSource; 432 parentStopSource = receiver.getStopToken().source; 433 try { 434 receiver.setValue(); 435 } catch (Exception e) { 436 receiver.setError(e); 437 } catch (Throwable t) { 438 receiver.setError(t.clone); 439 } 440 parentStopSource = null; 441 } 442 TaskPool pool; 443 alias TaskType = typeof(task!setValue(Receiver.init)); 444 TaskType myTask; 445 @disable this(ref return scope typeof(this) rhs); 446 @disable this(this); 447 this(Receiver receiver, TaskPool pool) @safe return scope { 448 myTask = task!(setValue)(receiver); 449 this.pool = pool; 450 } 451 void start() @trusted nothrow scope { 452 try { 453 pool.put(myTask); 454 } catch (Exception e) { 455 myTask.args[0].setError(e); 456 } 457 } 458 } 459 auto connect(Receiver)(return Receiver receiver) @safe scope return { 460 // ensure NRVO 461 auto op = Op!(Receiver)(receiver, pool); 462 return op; 463 } 464 }