module concurrency.stoptoken;

// originally this code is from https://github.com/josuttis/jthread by Nicolai Josuttis
// it is licensed under the Creative Commons Attribution 4.0 International License http://creativecommons.org/licenses/by/4.0

/// Owns the stop state: `stop` requests a stop and runs the callbacks that
/// were registered through `onStop`.
class StopSource {
    private stop_state state;

    /// Requests a stop and executes the registered callbacks on the calling thread.
    /// Returns: `true` if this call made the request, `false` if a stop had
    /// already been requested.
    bool stop() nothrow @safe {
        return state.request_stop();
    }

    /// ditto
    bool stop() nothrow @trusted shared {
        return (cast(StopSource)this).state.request_stop();
    }

    /// Returns: `true` once a stop has been requested on this source.
    bool isStopRequested() nothrow @safe @nogc {
        return state.is_stop_requested();
    }

    /// ditto
    bool isStopRequested() nothrow @trusted @nogc shared {
        return (cast(StopSource)this).isStopRequested();
    }

    /// resets the internal state, only do this if you are sure nothing else is looking at this...
    void reset(this t)() @system @nogc {
        this.state = stop_state();
    }
}

/// Handle used to observe a `StopSource`. A token built from a `null` source
/// reports `isStopPossible == false` and never reports a stop request.
struct StopToken {
    package(concurrency) StopSource source;

    /// Wraps `source`; `isStopPossible` is set to whether `source` is non-null.
    this(StopSource source) nothrow @safe @nogc {
        this.source = source;
        isStopPossible = source !is null;
    }

    /// ditto
    this(shared StopSource source) nothrow @trusted @nogc {
        this.source = cast()source;
        isStopPossible = source !is null;
    }

    /// Returns: `true` if the token has a source and that source has been stopped.
    bool isStopRequested() nothrow @safe @nogc {
        // isStopPossible guards the dereference of a null source.
        return isStopPossible && source.isStopRequested();
    }

    /// Whether this token was constructed with a non-null source.
    const bool isStopPossible;
}

/// Token whose `isStopRequested` and `isStopPossible` are compile-time `false`.
struct NeverStopToken {
    enum isStopRequested = false;
    enum isStopPossible = false;
}

/// Registers `callback` on `stopSource`.
///
/// If a stop was already requested the callback is executed immediately, on
/// the calling thread, and the returned `StopCallback` is not attached to the
/// source (so `dispose` on it is a no-op).
/// Returns: the newly allocated `StopCallback`; call `dispose` on it to deregister.
StopCallback onStop(StopSource stopSource, void delegate() nothrow @safe shared callback) nothrow @safe {
    auto cb = new StopCallback(callback);
    // Only remember the source when the callback really ended up in its list.
    if (stopSource.state.try_add_callback(cb, true))
        cb.source = stopSource;
    return cb;
}

/// ditto
StopCallback onStop(StopSource stopSource, void function() nothrow @safe callback) nothrow @trusted {
    import std.functional : toDelegate;
    return stopSource.onStop(cast(void delegate() nothrow @safe shared)callback.toDelegate);
}

/// Registers `callback` through a stop token.
///
/// When `stopToken.isStopPossible` is false nothing is registered and a
/// detached `StopCallback` is returned.
StopCallback onStop(StopToken)(StopToken stopToken, void delegate() nothrow @safe shared callback) nothrow @safe {
    if (stopToken.isStopPossible) {
        return stopToken.source.onStop(callback);
    }
    return new StopCallback(callback);
}

/// ditto
StopCallback onStop(StopToken)(StopToken stopToken, void function() nothrow @safe callback) nothrow @trusted {
    import std.functional : toDelegate;
    return stopToken.onStop(cast(void delegate() nothrow @safe shared)callback.toDelegate);
}

/// Registration handle returned by `onStop`.
class StopCallback {
    /// Deregisters the callback from its source. Does nothing when the
    /// callback is not attached to a source or was already disposed.
    void dispose() nothrow @trusted @nogc {
        import core.atomic : cas;

        // Snapshot `source` BEFORE the null check. Checking the field and
        // re-reading it afterwards would let a concurrent dispose() null it in
        // between; the CAS below would then "succeed" with null == null and
        // the remove_callback call would dereference a null source.
        auto local = source;
        if (local is null)
            return;
        static if (__traits(compiles, cas(&source, local, null))) {
            if (!cas(&source, local, null)) {
                // Somebody else won the race and detached the callback.
                assert(source is null);
                return;
            }
        } else {
            if (!cas(cast(shared)&source, cast(shared)local, null)) {
                assert(source is null);
                return;
            }
        }
        local.state.remove_callback(this);
    }

    /// ditto
    void dispose() shared nothrow @trusted @nogc {
        (cast()this).dispose();
    }

private:
    this(void delegate() nothrow shared @safe callback) nothrow @safe @nogc {
        this.callback = callback;
    }

    void delegate() nothrow shared @safe callback;
    // Source this callback is registered with; null when detached.
    StopSource source;

    // Intrusive list links: `prev_` points at whichever pointer currently
    // refers to this node (the list head or the previous node's `next_`).
    // It is null when the node is not in the list.
    StopCallback next_ = null;
    StopCallback* prev_ = null;
    // Points at a flag on request_stop's stack while the callback executes.
    bool* isRemoved_ = null;
    shared bool callbackFinishedExecuting = false;

    void execute() nothrow @safe {
        callback();
    }
}

deprecated("Use regular StopToken") alias StopTokenObject = StopToken;

/// Returns the given `StopToken` unchanged.
auto stopTokenObject(StopToken stopToken) {
    return stopToken;
}

/// Converts a `NeverStopToken` into a `StopToken` with a null source.
auto stopTokenObject(NeverStopToken stopToken) {
    StopSource s = null;
    return StopToken(s);
}

/// Yields the current thread; used inside the spin loops of `stop_state`.
private void spin_yield() nothrow @trusted @nogc {
    // TODO: could use the pause asm instruction
    // it is available in LDC as intrinsic... but not in DMD
    import core.thread : Thread;

    Thread.yield();
}

/// State shared by a stop source, its tokens and its callbacks: one atomic
/// word holding the stop-requested bit, a spin-lock bit and two reference
/// counts, plus the list of registered callbacks (guarded by the lock bit).
private struct stop_state {
    import core.thread : Thread;
    import core.atomic : atomicStore, atomicLoad, MemoryOrder, atomicOp;

    // Use druntime's casWeak when it is available, otherwise fall back to cas.
    static if (__traits(compiles, () { import core.atomic : casWeak; }) && __traits(compiles, () {
            import core.internal.atomic : atomicCompareExchangeWeakNoResult;
        }))
        import core.atomic : casWeak;
    else
        auto casWeak(MemoryOrder M1, MemoryOrder M2, T, V1, V2)(T* here, V1 ifThis, V2 writeThis) pure nothrow @nogc @safe {
            import core.atomic : cas;

            static if (__traits(compiles, cas!(M1, M2)(here, ifThis, writeThis)))
                return cas!(M1, M2)(here, ifThis, writeThis);
            else
                return cas(here, ifThis, writeThis);
        }

public:
    /// Adds one token reference to the state word.
    void add_token_reference() nothrow @safe @nogc {
        // TODO: want to use atomicFetchAdd but (proper) support is only recent
        // state_.atomicFetchAdd!(MemoryOrder.raw)(token_ref_increment);
        state_.atomicOp!"+="(token_ref_increment);
    }

    /// Drops one token reference from the state word.
    void remove_token_reference() nothrow @safe @nogc {
        // TODO: want to use atomicFetchSub but (proper) support is only recent
        // state_.atomicFetchSub!(MemoryOrder.acq_rel)(token_ref_increment);
        state_.atomicOp!"-="(token_ref_increment);
    }

    /// Adds one source reference to the state word.
    void add_source_reference() nothrow @safe @nogc {
        // TODO: want to use atomicFetchAdd but (proper) support is only recent
        // state_.atomicFetchAdd!(MemoryOrder.raw)(source_ref_increment);
        state_.atomicOp!"+="(source_ref_increment);
    }

    /// Drops one source reference from the state word.
    void remove_source_reference() nothrow @safe @nogc {
        // TODO: want to use atomicFetchSub but (proper) support is only recent
        // state_.atomicFetchSub!(MemoryOrder.acq_rel)(source_ref_increment);
        state_.atomicOp!"-="(source_ref_increment);
    }

    /// Sets the stop-requested bit and runs every registered callback on the
    /// calling thread.
    /// Returns: `false` if a stop had already been requested, `true` otherwise.
    bool request_stop() nothrow @safe {

        if (!try_lock_and_signal_until_signalled()) {
            // Stop has already been requested.
            return false;
        }

        // Set the 'stop_requested' signal and acquire the lock.

        signallingThread_ = Thread.getThis();

        while (head_ !is null) {
            // Dequeue the head of the queue
            auto cb = head_;
            head_ = cb.next_;
            const bool anyMore = head_ !is null;
            if (anyMore) {
                (() @trusted => head_.prev_ = &head_)(); // compiler 2.091.1 complains "address of variable this assigned to this with longer lifetime". But this is this, how can it have a longer lifetime...
            }
            // Mark this item as removed from the list.
            cb.prev_ = null;

            // Don't hold lock while executing callback
            // so we don't block other threads from deregistering callbacks.
            unlock();

            // TRICKY: Need to store a flag on the stack here that the callback
            // can use to signal that the destructor was executed inline
            // during the call. If the destructor was executed inline then
            // it's not safe to dereference cb after execute() returns.
            // If the destructor runs on some other thread then the other
            // thread will block waiting for this thread to signal that the
            // callback has finished executing.
            bool isRemoved = false;
            (() @trusted => cb.isRemoved_ = &isRemoved)(); // the pointer to the stack here is removed 3 lines down.

            cb.execute();

            if (!isRemoved) {
                cb.isRemoved_ = null;
                cb.callbackFinishedExecuting.atomicStore!(MemoryOrder.rel)(true);
            }

            if (!anyMore) {
                // This was the last item in the queue when we dequeued it.
                // No more items should be added to the queue after we have
                // marked the state as interrupted, only removed from the queue.
                // Avoid acquiring/releasing the lock in this case.
                return true;
            }

            lock();
        }

        unlock();

        return true;
    }

    /// Returns: whether the stop-requested bit is set.
    bool is_stop_requested() nothrow @safe @nogc {
        return is_stop_requested(state_.atomicLoad!(MemoryOrder.acq));
    }

    /// Returns: whether a stop was requested or at least one source reference is held.
    bool is_stop_requestable() nothrow @safe @nogc {
        return is_stop_requestable(state_.atomicLoad!(MemoryOrder.acq));
    }

    /// Tries to push `cb` onto the callback list.
    ///
    /// If a stop was already requested, `cb` is executed right away and
    /// `false` is returned. If no stop can be requested any more, `false` is
    /// returned without executing it.
    /// Params:
    ///   cb = callback node to register
    ///   incrementRefCountIfSuccessful = also take a token reference when the
    ///     callback was added
    /// Returns: `true` when the callback was added to the list.
    bool try_add_callback(StopCallback cb, bool incrementRefCountIfSuccessful) nothrow @safe {
        ulong oldState;
        do {
            // Enter the inner loop at the load, skipping the first yield.
            goto load_state;
            do {
                spin_yield();
            load_state:
                oldState = state_.atomicLoad!(MemoryOrder.acq);
                if (is_stop_requested(oldState)) {
                    cb.execute();
                    return false;
                }
                else if (!is_stop_requestable(oldState)) {
                    return false;
                }
            }
            while (is_locked(oldState));
        }
        while (!casWeak!(MemoryOrder.acq, MemoryOrder.acq)(&state_, oldState, oldState | locked_flag));

        // Push callback onto callback list.
        cb.next_ = head_;
        if (cb.next_ !is null) {
            cb.next_.prev_ = &cb.next_;
        }
        () @trusted { cb.prev_ = &head_; } ();
        head_ = cb;

        if (incrementRefCountIfSuccessful) {
            unlock_and_increment_token_ref_count();
        }
        else {
            unlock();
        }

        // Successfully added the callback.
        return true;
    }

    /// Removes `cb` from the list, or, when it was already dequeued by
    /// `request_stop`, synchronises with its execution. Drops one token
    /// reference in either case.
    void remove_callback(StopCallback cb) nothrow @safe @nogc {
        lock();

        if (cb.prev_ !is null) {
            // Still registered, not yet executed
            // Just remove from the list.
            *cb.prev_ = cb.next_;
            if (cb.next_ !is null) {
                cb.next_.prev_ = cb.prev_;
            }

            unlock_and_decrement_token_ref_count();

            return;
        }

        unlock();

        // Callback has either already executed or is executing
        // concurrently on another thread.

        if (signallingThread_ is Thread.getThis()) {
            // Callback executed on this thread or is still currently executing
            // and is deregistering itself from within the callback.
            if (cb.isRemoved_ !is null) {
                // Currently inside the callback, let the request_stop() method
                // know the object is about to be destructed and that it should
                // not try to access the object when the callback returns.
                *cb.isRemoved_ = true;
            }
        }
        else {
            // Callback is currently executing on another thread,
            // block until it finishes executing.
            while (!cb.callbackFinishedExecuting.atomicLoad!(MemoryOrder.acq)) {
                spin_yield();
            }
        }

        remove_token_reference();
    }

private:
    static bool is_locked(ulong state) nothrow @safe @nogc {
        return (state & locked_flag) != 0;
    }

    static bool is_stop_requested(ulong state) nothrow @safe @nogc {
        return (state & stop_requested_flag) != 0;
    }

    static bool is_stop_requestable(ulong state) nothrow @safe @nogc {
        // Stoppable if a stop has already been requested or if there are
        // still source references in existence.
        return is_stop_requested(state) || (state >= source_ref_increment);
    }

    // Spins until the lock bit is free, then sets the stop-requested and lock
    // bits in one CAS. Returns false as soon as it sees that a stop was
    // already requested.
    bool try_lock_and_signal_until_signalled() nothrow @safe @nogc {
        ulong oldState;
        do {
            oldState = state_.atomicLoad!(MemoryOrder.acq);
            if (is_stop_requested(oldState))
                return false;
            while (is_locked(oldState)) {
                spin_yield();
                oldState = state_.atomicLoad!(MemoryOrder.acq);
                if (is_stop_requested(oldState))
                    return false;
            }
        }
        while (!casWeak!(MemoryOrder.seq, MemoryOrder.acq)(&state_, oldState,
                oldState | stop_requested_flag | locked_flag));
        return true;
    }

    // Spins until the lock bit could be set by this thread.
    void lock() nothrow @safe @nogc {
        ulong oldState;
        do {
            oldState = state_.atomicLoad!(MemoryOrder.raw);
            while (is_locked(oldState)) {
                spin_yield();
                oldState = state_.atomicLoad!(MemoryOrder.raw);
            }
        }
        while (!casWeak!(MemoryOrder.acq, MemoryOrder.raw)((&state_), oldState,
                oldState | locked_flag));
    }

    // Clears the lock bit; must only be called while holding the lock.
    void unlock() nothrow @safe @nogc {
        // TODO: want to use atomicFetchSub but (proper) support is only recent
        // state_.atomicFetchSub!(MemoryOrder.rel)(locked_flag);
        state_.atomicOp!"-="(locked_flag);
    }

    // Clears the lock bit and adds one token reference in a single atomic op:
    // subtracting (locked_flag - token_ref_increment) is a net +token_ref_increment - locked_flag.
    void unlock_and_increment_token_ref_count() nothrow @safe @nogc {
        // TODO: want to use atomicFetchSub but (proper) support is only recent
        // state_.atomicFetchSub!(MemoryOrder.rel)(locked_flag - token_ref_increment);
        state_.atomicOp!"-="(locked_flag - token_ref_increment);
    }

    // Clears the lock bit and drops one token reference in a single atomic op.
    void unlock_and_decrement_token_ref_count() nothrow @safe @nogc {
        // TODO: want to use atomicFetchSub but (proper) support is only recent
        // state_.atomicFetchSub!(MemoryOrder.acq_rel)(locked_flag + token_ref_increment);
        state_.atomicOp!"-="(locked_flag + token_ref_increment);
    }

    enum stop_requested_flag = 1L;
    enum locked_flag = 2L;
    enum token_ref_increment = 4L;
    enum source_ref_increment = 1L << 33u;

    // bit 0 - stop-requested
    // bit 1 - locked
    // bits 2-32 - token ref count (31 bits)
    // bits 33-63 - source ref count (31 bits)
    shared ulong state_ = source_ref_increment;
    // Head of the intrusive callback list; guarded by the lock bit.
    StopCallback head_ = null;
    // Thread that performed request_stop; read by remove_callback.
    Thread signallingThread_;
}