1 module concurrency.thread; 2 3 import concurrency.executor; 4 import concurrency.scheduler; 5 import concurrency.sender; 6 import concepts; 7 import core.sync.semaphore : Semaphore; 8 9 LocalThreadExecutor getLocalThreadExecutor() @safe { 10 static LocalThreadExecutor localThreadExecutor; 11 if (localThreadExecutor is null) 12 localThreadExecutor = new LocalThreadExecutor(); 13 return localThreadExecutor; 14 } 15 16 class LocalThreadExecutor : Executor { 17 import core.atomic : atomicOp, atomicStore, atomicLoad, cas; 18 import std.concurrency : Tid, thisTid, send, receive; 19 private { 20 Tid tid; 21 } 22 23 this() @safe { 24 tid = thisTid; 25 } 26 27 void execute(VoidDelegate dg) @trusted { 28 version (unittest) { 29 // NOTE: We must call the delegate on the current thread instead of going to the main one 30 // the reason is that there is no nursery running on the main one because we are running unit tests 31 // when SIL becomes multithreaded we can revisit this 32 dg(); 33 } else { 34 if (isInContext) 35 dg(); 36 else 37 (cast() tid).send(dg); 38 } 39 } 40 41 void execute(VoidFunction fn) @trusted { 42 version (unittest) { 43 fn(); 44 } else { 45 if (isInContext) 46 fn(); 47 else 48 (cast() tid).send(fn); 49 } 50 } 51 52 bool isInContext() @trusted { 53 return thisTid == cast()tid; 54 } 55 } 56 57 package struct LocalThreadWorker { 58 import core.time : Duration, msecs, hnsecs; 59 import std.concurrency : Tid, thisTid, send, receive, receiveTimeout; 60 61 static struct AddTimer { 62 Timer timer; 63 Duration dur; 64 } 65 static struct Timer { 66 VoidDelegate dg; 67 ulong id_; 68 ulong id() { return id_; } 69 } 70 static struct RemoveTimer { 71 Timer timer; 72 shared Semaphore semaphore; 73 } 74 75 private { 76 Tid tid; 77 shared int counter; 78 shared ulong nextTimerId; 79 } 80 81 this(LocalThreadExecutor e) @safe { 82 this.tid = e.tid; 83 } 84 85 this(Tid tid) @safe { 86 this.tid = tid; 87 } 88 89 void start() @trusted { 90 assert(isInContext); // start can only be called on the thread 91 import concurrency.timingwheels; 92 import std.datetime.systime : Clock; 93 TimingWheels!Timer wheels; 94 auto ticks = 1.msecs; // represents the granularity 95 wheels.init(); 96 bool running = true; 97 auto addTimerHandler = (AddTimer cmd) scope { 98 auto real_now = Clock.currStdTime; 99 auto tw_now = wheels.currStdTime(ticks); 100 auto delay = (real_now - tw_now).hnsecs; 101 auto at = (cmd.dur + delay)/ticks; 102 wheels.schedule(cmd.timer, at); 103 }; 104 auto removeTimerHandler = (RemoveTimer cmd) scope { 105 wheels.cancel(cmd.timer); 106 if (cmd.semaphore !is null) 107 (cast()cmd.semaphore).notify; 108 }; 109 while (running) { 110 VoidFunction vFunc = null; 111 VoidDelegate vDel = null; 112 import std.meta : AliasSeq; 113 alias handlers = AliasSeq!((VoidFunction fn) => vFunc = fn, 114 (VoidDelegate dg) => vDel = dg, 115 addTimerHandler, 116 removeTimerHandler, 117 (bool _){running = false;} 118 ); 119 auto nextTrigger = wheels.timeUntilNextEvent(ticks, Clock.currStdTime); 120 bool handleIt = false; 121 if (nextTrigger.isNull()) { 122 receive(handlers); 123 goto handleIt; 124 } else { 125 if (nextTrigger.get > 0.msecs) { 126 if (receiveTimeout(nextTrigger.get, handlers)) { 127 goto handleIt; 128 } 129 } 130 int advance = wheels.ticksToCatchUp(ticks, Clock.currStdTime); 131 if (advance > 0) { 132 auto wr = wheels.advance(advance); 133 foreach(t; wr.timers) { 134 t.dg(); 135 } 136 } 137 continue; 138 } 139 handleIt: 140 if (vFunc !is null) { 141 vFunc(); 142 } 143 if (vDel !is null) { 144 vDel(); 145 } 146 } 147 version (unittest) { 148 import std.variant : Variant; 149 import core.time : seconds; 150 while(receiveTimeout(seconds(-1),(Variant v){ 151 import std.stdio; 152 writeln("Got unwanted message ", v); 153 assert(0); 154 })) {} 155 } 156 } 157 158 void schedule(VoidDelegate dg) { 159 tid.send(dg); 160 } 161 162 Timer addTimer(VoidDelegate dg, Duration dur) @trusted { 163 import core.atomic : atomicOp; 164 ulong id = nextTimerId.atomicOp!("+=")(1); 165 Timer timer = Timer(dg, id); 166 tid.send(AddTimer(timer, dur)); 167 return timer; 168 } 169 170 void cancelTimer(Timer timer) @trusted { 171 Semaphore semaphore; 172 if (!isInContext) { 173 semaphore = localSemaphore(); 174 } 175 tid.send(RemoveTimer(timer, cast(shared)semaphore)); 176 if (semaphore !is null) 177 semaphore.wait(); 178 } 179 180 void stop() nothrow @trusted { 181 try { 182 (cast()tid).send(false); 183 } catch (Exception e) { 184 assert(false, e.msg); 185 } 186 } 187 188 bool isInContext() @trusted { 189 return thisTid == cast()tid; 190 } 191 } 192 193 private Semaphore localSemaphore() { 194 static Semaphore semaphore; 195 if (semaphore is null) 196 semaphore = new Semaphore(); 197 return semaphore; 198 } 199 200 package void executeInNewThread(VoidFunction fn) @system nothrow { 201 import concurrency.utils : closure; 202 import core.thread : Thread, thread_detachThis, thread_detachInstance; 203 version (Posix) import core.sys.posix.pthread : pthread_detach, pthread_self; 204 205 auto t = new Thread(cast(void delegate())closure((VoidFunction fn) @trusted { 206 fn(); 207 version (Posix) 208 pthread_detach(pthread_self); //NOTE: see git.symmetry.dev/SIL/plugins/alpha/web/-/issues/3 209 }, fn)).start(); 210 try { 211 /* 212 the isDaemon is really only a protecting against a race condition in druntime, 213 which is only introduced because I am detaching the thread, which I need to do 214 if I don't want to leak memory. 215 216 If the isDaemon fails because the Thread is gone (unlikely) than it can't 217 trigger unwanted behavior the isDaemon is preventing in the first place. 218 219 So it is fine if the exception is ignored. 220 */ 221 t.isDaemon = true; // is needed because of pthread_detach (otherwise there is a race between druntime joining and the thread exiting) 222 } catch (Exception e) {} 223 } 224 225 package void executeInNewThread(VoidDelegate fn) @system nothrow { 226 import concurrency.utils : closure; 227 import core.thread : Thread, thread_detachThis, thread_detachInstance; 228 version (Posix) import core.sys.posix.pthread : pthread_detach, pthread_self; 229 230 auto t = new Thread(cast(void delegate())closure((VoidDelegate fn) @trusted { 231 fn(); 232 version (Posix) 233 pthread_detach(pthread_self); //NOTE: see git.symmetry.dev/SIL/plugins/alpha/web/-/issues/3 234 }, fn)).start(); 235 try { 236 /* 237 the isDaemon is really only a protecting against a race condition in druntime, 238 which is only introduced because I am detaching the thread, which I need to do 239 if I don't want to leak memory. 240 241 If the isDaemon fails because the Thread is gone (unlikely) then it can't 242 trigger unwanted behavior the isDaemon is preventing in the first place. 243 244 So it is fine if the exception is ignored. 245 */ 246 t.isDaemon = true; // is needed because of pthread_detach (otherwise there is a race between druntime joining and the thread exiting) 247 } catch (Exception e) {} 248 } 249 250 class ThreadExecutor : Executor { 251 void execute(VoidFunction fn) @trusted { 252 executeInNewThread(fn); 253 } 254 void execute(VoidDelegate fn) @trusted { 255 executeInNewThread(fn); 256 } 257 bool isInContext() @safe { return false; } 258 } 259 260 auto executeAndWait(Executor, Work, Args...)(Executor executor, Work work, Args args) { 261 import core.sync.semaphore; 262 import std.concurrency; 263 import std.traits; 264 265 if (executor.isInContext) 266 return work(args); 267 268 Semaphore semaphore = localSemaphore(); 269 270 alias RT = ReturnType!Work; 271 struct Context { 272 Work work; 273 Args args; 274 Semaphore semaphore; 275 static if (is(RT == void)) { 276 void run() { 277 work(args); 278 semaphore.notify(); 279 } 280 } else { 281 RT result; 282 void run() { 283 result = work(args); 284 semaphore.notify(); 285 } 286 } 287 } 288 Context c = Context(work, args, semaphore); 289 executor.execute(cast(VoidDelegate)&c.run); 290 semaphore.wait(); 291 static if (!is(RT == void)) { 292 return c.result; 293 } 294 } 295 296 shared static this() { 297 import concurrency.utils : resetScheduler; 298 299 resetScheduler(); 300 } 301 302 @models!(ThreadSender, isSender) 303 struct ThreadSender { 304 alias Value = void; 305 static struct Op(Receiver) { 306 private Receiver receiver; 307 this(Receiver receiver) { 308 this.receiver = receiver; 309 } 310 void start() @trusted nothrow scope { 311 executeInNewThread(cast(VoidDelegate)&run); 312 } 313 void run() @trusted { 314 import concurrency.receiver : setValueOrError; 315 try { 316 receiver.setValueOrError(); 317 } catch (Throwable t) { 318 import std.stdio; 319 stderr.writeln(t); 320 assert(0); 321 } 322 } 323 } 324 auto connect(Receiver)(return Receiver receiver) @safe scope return { 325 // ensure NRVO 326 auto op = Op!(Receiver)(receiver); 327 return op; 328 } 329 } 330 331 struct StdTaskPool { 332 import std.parallelism : Task, TaskPool; 333 TaskPool pool; 334 @disable this(ref return scope typeof(this) rhs); 335 @disable this(this); 336 this(TaskPool pool) @trusted scope shared { 337 this.pool = cast(shared)pool; 338 } 339 ~this() nothrow @trusted scope { 340 try { 341 pool.finish(true); 342 } catch (Exception e) { 343 // can't really happen 344 assert(0); 345 } 346 } 347 auto getScheduler() scope @safe { 348 return StdTaskPoolProtoScheduler(pool); 349 } 350 auto getScheduler() scope @trusted shared { 351 return StdTaskPoolProtoScheduler(cast()pool); 352 } 353 } 354 355 shared(StdTaskPool) stdTaskPool(size_t nWorkers = 0) @safe { 356 import std.parallelism : TaskPool; 357 return shared StdTaskPool(new TaskPool(nWorkers)); 358 } 359 360 struct StdTaskPoolProtoScheduler { 361 import std.parallelism : TaskPool; 362 TaskPool pool; 363 auto schedule() { 364 return TaskPoolSender(pool); 365 } 366 auto withBaseScheduler(Scheduler)(Scheduler scheduler) { 367 return StdTaskPoolScheduler!(Scheduler)(pool, scheduler); 368 } 369 } 370 371 private struct StdTaskPoolScheduler(Scheduler) { 372 import std.parallelism : TaskPool; 373 import core.time : Duration; 374 TaskPool pool; 375 Scheduler scheduler; 376 auto schedule() { 377 return TaskPoolSender(pool); 378 } 379 auto scheduleAfter(Duration run) { 380 import concurrency.operations : via; 381 return schedule().via(scheduler.scheduleAfter(run)); 382 } 383 } 384 385 private struct TaskPoolSender { 386 import std.parallelism : Task, TaskPool, scopedTask, task; 387 import std.traits : ReturnType; 388 alias Value = void; 389 TaskPool pool; 390 static struct Op(Receiver) { 391 static void setValue(Receiver receiver) @safe nothrow { 392 import concurrency.receiver : setValueOrError; 393 receiver.setValueOrError(); 394 } 395 TaskPool pool; 396 alias TaskType = typeof(task!setValue(Receiver.init)); 397 TaskType myTask; 398 @disable this(ref return scope typeof(this) rhs); 399 @disable this(this); 400 this(Receiver receiver, TaskPool pool) @safe return scope { 401 myTask = task!(setValue)(receiver); 402 this.pool = pool; 403 } 404 void start() @trusted nothrow scope { 405 try { 406 pool.put(myTask); 407 } catch (Exception e) { 408 myTask.args[0].setError(e); 409 } 410 } 411 } 412 auto connect(Receiver)(return Receiver receiver) @safe scope return { 413 // ensure NRVO 414 auto op = Op!(Receiver)(receiver, pool); 415 return op; 416 } 417 }