module concurrency.thread;

import concurrency.executor;
import concurrency.scheduler;
import concurrency.sender;
import concepts;
import core.sync.semaphore : Semaphore;
import mir.algebraic;
import concurrency.scheduler : Timer;
import core.time : Duration;
import concurrency.data.queue.waitable;
import concurrency.data.queue.mpsc;

// one LocalThreadExecutor per thread, lazily constructed
LocalThreadExecutor getLocalThreadExecutor() @safe {
    static LocalThreadExecutor localThreadExecutor;
    if (localThreadExecutor is null)
        localThreadExecutor = new LocalThreadExecutor();
    return localThreadExecutor;
}

struct AddTimer {
    Timer timer;
    Duration dur;
}

struct RemoveTimer {
    Timer timer;
}

alias WorkItem = Variant!(typeof(null), VoidDelegate, VoidFunction, AddTimer, RemoveTimer); // null signifies end

struct WorkNode {
    WorkItem payload;
    WorkNode* next;
}

alias WorkQueue = WaitableQueue!(MPSCQueue!WorkNode);

class LocalThreadExecutor : Executor {
    import core.atomic : atomicOp, atomicStore, atomicLoad, cas;
    import core.thread : ThreadID;
    import std.process : thisThreadID;
    static struct Node {
        VoidDelegate dg;
        Node* next;
    }
    private {
        ThreadID threadId;
        WorkQueue queue;
    }

    this() @safe {
        threadId = thisThreadID;
        queue = new WorkQueue;
    }

    // runs inline when called from the owning thread, otherwise enqueues the work
    void execute(VoidDelegate dg) @trusted {
        if (isInContext)
            dg();
        else
            queue.push(new WorkNode(WorkItem(dg)));
    }

    void execute(VoidFunction fn) @trusted {
        if (isInContext)
            fn();
        else
            queue.push(new WorkNode(WorkItem(fn)));
    }

    bool isInContext() @trusted {
        return thisThreadID == threadId;
    }
}
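// Hedged usage sketch (not part of the original module): LocalThreadExecutor
// runs work inline when execute() is called from the thread that created it,
// and enqueues it otherwise. The cast mirrors the one in executeAndWait below;
// the Ctx helper is purely illustrative.
unittest {
    static struct Ctx {
        bool ran;
        void run() { ran = true; }
    }
    auto executor = new LocalThreadExecutor();
    Ctx ctx;
    executor.execute(cast(VoidDelegate)&ctx.run);
    assert(ctx.ran); // executed inline because we are in the executor's context
}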
package struct LocalThreadWorker {
    import core.time : Duration, msecs, hnsecs;
    import concurrency.scheduler : Timer, TimerTrigger, TimerDelegate;

    private {
        shared int counter;
        static shared ulong nextTimerId;
        LocalThreadExecutor executor;
    }

    this(LocalThreadExecutor e) @safe {
        executor = e;
    }

    void start() @trusted {
        assert(isInContext); // start can only be called on the thread
        import concurrency.timingwheels;
        import std.datetime.systime : Clock;
        import std.array : Appender;
        Appender!(Timer[]) expiredTimers;
        TimingWheels!Timer wheels;
        auto ticks = 1.msecs; // represents the granularity
        wheels.init();
        bool running = true;
        // process work items and expired timers until stop() pushes the terminating null WorkItem
        while (running) {
            import std.meta : AliasSeq;
            alias handlers = AliasSeq!(
                (typeof(null)) { running = false; },
                (RemoveTimer cmd) {
                    wheels.cancel(cmd.timer);
                    cmd.timer.dg(TimerTrigger.cancel);
                },
                (AddTimer cmd) {
                    auto real_now = Clock.currStdTime;
                    auto tw_now = wheels.currStdTime(ticks);
                    auto delay = (real_now - tw_now).hnsecs;
                    auto at = (cmd.dur + delay) / ticks;
                    wheels.schedule(cmd.timer, at);
                },
                (VoidFunction fn) => fn(),
                (VoidDelegate dg) => dg()
            );
            auto nextTrigger = wheels.timeUntilNextEvent(ticks, Clock.currStdTime);
            bool handleIt = false;
            if (nextTrigger.isNull()) {
                auto work = executor.queue.pop();
                if (work is null)
                    continue;
                work.payload.match!(handlers);
            } else {
                if (nextTrigger.get > 0.msecs) {
                    auto work = executor.queue.pop(nextTrigger.get);
                    if (work !is null) {
                        work.payload.match!(handlers);
                        continue;
                    }
                }
                int advance = wheels.ticksToCatchUp(ticks, Clock.currStdTime);
                if (advance > 0) {
                    import std.range : retro;
                    wheels.advance(advance, expiredTimers);
                    // NOTE: timingwheels keeps the timers in reverse order, so we iterate in reverse
                    foreach (t; expiredTimers.data.retro) {
                        t.dg(TimerTrigger.trigger);
                    }
                    expiredTimers.shrinkTo(0);
                }
            }
        }
        version (unittest) {
            if (!executor.queue.empty) {
                auto work = executor.queue.pop();
                import std.stdio;
                writeln("Got unwanted message ", work);
                assert(0);
            }
        }
    }

    void schedule(VoidDelegate dg) {
        executor.queue.push(new WorkNode(WorkItem(dg)));
    }

    Timer addTimer(TimerDelegate dg, Duration dur) @trusted {
        import core.atomic : atomicOp;
        ulong id = nextTimerId.atomicOp!("+=")(1);
        Timer timer = Timer(dg, id);
        executor.queue.push(new WorkNode(WorkItem(AddTimer(timer, dur))));
        return timer;
    }

    void cancelTimer(Timer timer) @trusted {
        executor.queue.push(new WorkNode(WorkItem(RemoveTimer(timer))));
    }

    void stop() nothrow @trusted {
        try {
            executor.queue.push(new WorkNode(WorkItem(null)));
        } catch (Exception e) {
            assert(false, e.msg);
        }
    }

    bool isInContext() @trusted {
        return executor.isInContext;
    }
}
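// Hedged lifecycle sketch (not part of the original module): a LocalThreadWorker
// drains its executor's queue on the current thread until stop() pushes the
// terminating null WorkItem. The Ctx helper is purely illustrative.
unittest {
    auto executor = new LocalThreadExecutor();
    auto worker = LocalThreadWorker(executor);
    static struct Ctx {
        LocalThreadWorker* worker;
        bool ran;
        void run() {
            ran = true;
            worker.stop(); // ends the start() loop below
        }
    }
    auto ctx = Ctx(&worker);
    worker.schedule(cast(VoidDelegate)&ctx.run);
    worker.start(); // runs the scheduled delegate, then returns after stop()
    assert(ctx.ran);
}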
private Semaphore localSemaphore() {
    static Semaphore semaphore;
    if (semaphore is null)
        semaphore = new Semaphore();
    return semaphore;
}

package void executeInNewThread(VoidFunction fn) @system nothrow {
    import concurrency.utils : closure;
    import core.thread : Thread, thread_detachThis, thread_detachInstance;
    version (Posix) import core.sys.posix.pthread : pthread_detach, pthread_self;

    auto t = new Thread(cast(void delegate()) closure((VoidFunction fn) @trusted {
        fn();
        version (Posix)
            pthread_detach(pthread_self); // NOTE: see git.symmetry.dev/SIL/plugins/alpha/web/-/issues/3
    }, fn)).start();
    try {
        /*
          isDaemon really only protects against a race condition in druntime,
          which is only introduced because I am detaching the thread, which I need
          to do if I don't want to leak memory.

          If setting isDaemon fails because the Thread is gone (unlikely), then it
          can't trigger the unwanted behavior isDaemon is preventing in the first place.

          So it is fine if the exception is ignored.
        */
        t.isDaemon = true; // needed because of pthread_detach (otherwise there is a race between druntime joining and the thread exiting)
    } catch (Exception e) {}
}

package void executeInNewThread(VoidDelegate fn) @system nothrow {
    import concurrency.utils : closure;
    import core.thread : Thread, thread_detachThis, thread_detachInstance;
    version (Posix) import core.sys.posix.pthread : pthread_detach, pthread_self;

    auto t = new Thread(cast(void delegate()) closure((VoidDelegate fn) @trusted {
        fn();
        version (Posix)
            pthread_detach(pthread_self); // NOTE: see git.symmetry.dev/SIL/plugins/alpha/web/-/issues/3
    }, fn)).start();
    try {
        /*
          isDaemon really only protects against a race condition in druntime,
          which is only introduced because I am detaching the thread, which I need
          to do if I don't want to leak memory.

          If setting isDaemon fails because the Thread is gone (unlikely), then it
          can't trigger the unwanted behavior isDaemon is preventing in the first place.

          So it is fine if the exception is ignored.
        */
        t.isDaemon = true; // needed because of pthread_detach (otherwise there is a race between druntime joining and the thread exiting)
    } catch (Exception e) {}
}

class ThreadExecutor : Executor {
    void execute(VoidFunction fn) @trusted {
        executeInNewThread(fn);
    }
    void execute(VoidDelegate fn) @trusted {
        executeInNewThread(fn);
    }
    bool isInContext() @safe { return false; }
}

auto executeAndWait(Executor, Work, Args...)(Executor executor, Work work, Args args) {
    import core.sync.semaphore;
    import std.traits;

    if (executor.isInContext)
        return work(args);

    // run the work on the executor and block on this thread's semaphore until it completes
    Semaphore semaphore = localSemaphore();

    alias RT = ReturnType!Work;
    struct Context {
        Work work;
        Args args;
        Semaphore semaphore;
        static if (is(RT == void)) {
            void run() {
                work(args);
                semaphore.notify();
            }
        } else {
            RT result;
            void run() {
                result = work(args);
                semaphore.notify();
            }
        }
    }
    Context c = Context(work, args, semaphore);
    executor.execute(cast(VoidDelegate)&c.run);
    semaphore.wait();
    static if (!is(RT == void)) {
        return c.result;
    }
}

shared static this() {
    import concurrency.utils : resetScheduler;

    resetScheduler();
}
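// Hedged usage sketch (not part of the original module): executeAndWait runs a
// callable on the given Executor and blocks the calling thread on its
// thread-local semaphore until the result is available.
unittest {
    static int add(int a, int b) { return a + b; }
    auto executor = new ThreadExecutor();
    auto sum = executeAndWait(executor, &add, 2, 3); // runs add in a new thread and waits
    assert(sum == 5);
}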
struct ThreadSender {
    static assert(models!(typeof(this), isSender));
    alias Value = void;
    static struct Op(Receiver) {
        private Receiver receiver;
        this(Receiver receiver) {
            this.receiver = receiver;
        }
        void start() @trusted nothrow scope {
            executeInNewThread(cast(VoidDelegate)&run);
        }
        void run() @trusted {
            import concurrency.receiver : setValueOrError;
            import concurrency.error : clone;
            import concurrency : parentStopSource;

            parentStopSource = receiver.getStopToken().source;

            try {
                receiver.setValue();
            } catch (Exception e) {
                receiver.setError(e);
            } catch (Throwable t) {
                receiver.setError(t.clone());
            }

            parentStopSource = null;
        }
    }
    auto connect(Receiver)(return Receiver receiver) @safe scope return {
        // ensure NRVO
        auto op = Op!(Receiver)(receiver);
        return op;
    }
}

struct StdTaskPool {
    import std.parallelism : Task, TaskPool;
    TaskPool pool;
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    this(TaskPool pool) @trusted scope shared {
        this.pool = cast(shared) pool;
    }
    ~this() nothrow @trusted scope {
        try {
            pool.finish(true);
        } catch (Exception e) {
            // can't really happen
            assert(0);
        }
    }
    auto getScheduler() scope @safe {
        return StdTaskPoolProtoScheduler(pool);
    }
    auto getScheduler() scope @trusted shared {
        return StdTaskPoolProtoScheduler(cast() pool);
    }
}

shared(StdTaskPool) stdTaskPool(size_t nWorkers = 0) @safe {
    import std.parallelism : TaskPool;
    return shared StdTaskPool(new TaskPool(nWorkers));
}

struct StdTaskPoolProtoScheduler {
    import std.parallelism : TaskPool;
    TaskPool pool;
    auto schedule() {
        return TaskPoolSender(pool);
    }
    auto withBaseScheduler(Scheduler)(Scheduler scheduler) {
        return StdTaskPoolScheduler!(Scheduler)(pool, scheduler);
    }
}

private struct StdTaskPoolScheduler(Scheduler) {
    import std.parallelism : TaskPool;
    import core.time : Duration;
    TaskPool pool;
    Scheduler scheduler;
    auto schedule() {
        return TaskPoolSender(pool);
    }
    auto scheduleAfter(Duration run) {
        import concurrency.operations : via;
        return schedule().via(scheduler.scheduleAfter(run));
    }
}

private struct TaskPoolSender {
    import std.parallelism : Task, TaskPool, scopedTask, task;
    import std.traits : ReturnType;
    import concurrency.error : clone;
    alias Value = void;
    TaskPool pool;
    static struct Op(Receiver) {
        static void setValue(Receiver receiver) @trusted nothrow {
            import concurrency : parentStopSource;
            parentStopSource = receiver.getStopToken().source;
            try {
                receiver.setValue();
            } catch (Exception e) {
                receiver.setError(e);
            } catch (Throwable t) {
                receiver.setError(t.clone);
            }
            parentStopSource = null;
        }
        TaskPool pool;
        alias TaskType = typeof(task!setValue(Receiver.init));
        TaskType myTask;
        @disable this(ref return scope typeof(this) rhs);
        @disable this(this);
        this(Receiver receiver, TaskPool pool) @safe return scope {
            myTask = task!(setValue)(receiver);
            this.pool = pool;
        }
        void start() @trusted nothrow scope {
            try {
                pool.put(myTask);
            } catch (Exception e) {
                myTask.args[0].setError(e);
            }
        }
    }
    auto connect(Receiver)(return Receiver receiver) @safe scope return {
        // ensure NRVO
        auto op = Op!(Receiver)(receiver, pool);
        return op;
    }
}
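// Hedged lifecycle sketch (assumption, not part of the original module): a shared
// StdTaskPool owns a std.parallelism TaskPool and finishes it in its destructor;
// its proto scheduler hands out TaskPoolSenders that the library's sender/receiver
// algorithms are expected to connect and start. Connecting a receiver is omitted here.
unittest {
    auto pool = stdTaskPool(2);           // wraps a TaskPool with 2 worker threads
    auto scheduler = pool.getScheduler(); // StdTaskPoolProtoScheduler
    auto sender = scheduler.schedule();   // TaskPoolSender, to be composed with sender algorithms
} // the destructor calls TaskPool.finish, blocking until queued tasks have completed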