1 module concurrency.stoptoken; 2 3 // originally this code is from https://github.com/josuttis/jthread by Nicolai Josuttis 4 // it is licensed under the Creative Commons Attribution 4.0 Internation License http://creativecommons.org/licenses/by/4.0 5 6 class StopSource { 7 private stop_state state; 8 bool stop() nothrow @safe { 9 return state.request_stop(); 10 } 11 12 bool stop() nothrow @trusted shared { 13 return (cast(StopSource)this).state.request_stop(); 14 } 15 16 bool isStopRequested() nothrow @safe @nogc { 17 return state.is_stop_requested(); 18 } 19 20 bool isStopRequested() nothrow @trusted @nogc shared { 21 return (cast(StopSource)this).isStopRequested(); 22 } 23 24 /// resets the internal state, only do this if you are sure nothing else is looking at this... 25 void reset(this t)() @system @nogc { 26 this.state = stop_state(); 27 } 28 } 29 30 struct StopToken { 31 package(concurrency) StopSource source; 32 this(StopSource source) nothrow @safe @nogc { 33 this.source = source; 34 isStopPossible = source !is null; 35 } 36 37 this(shared StopSource source) nothrow @trusted @nogc { 38 this.source = cast()source; 39 isStopPossible = source !is null; 40 } 41 42 bool isStopRequested() nothrow @safe @nogc { 43 return isStopPossible && source.isStopRequested(); 44 } 45 46 const bool isStopPossible; 47 } 48 49 struct NeverStopToken { 50 enum isStopRequested = false; 51 enum isStopPossible = false; 52 } 53 54 StopCallback onStop(StopSource stopSource, void delegate() nothrow @safe shared callback) nothrow @safe { 55 auto cb = new StopCallback(callback); 56 if (stopSource.state.try_add_callback(cb, true)) 57 cb.source = stopSource; 58 return cb; 59 } 60 61 StopCallback onStop(StopSource stopSource, void function() nothrow @safe callback) nothrow @trusted { 62 import std.functional : toDelegate; 63 return stopSource.onStop(cast(void delegate() nothrow @safe shared)callback.toDelegate); 64 } 65 66 StopCallback onStop(StopToken)(StopToken stopToken, void delegate() nothrow @safe shared callback) nothrow @safe { 67 if (stopToken.isStopPossible) { 68 return stopToken.source.onStop(callback); 69 } 70 return new StopCallback(callback); 71 } 72 73 StopCallback onStop(StopToken)(StopToken stopToken, void function() nothrow @safe callback) nothrow @trusted { 74 import std.functional : toDelegate; 75 return stopToken.onStop(cast(void delegate() nothrow @safe shared)callback.toDelegate); 76 } 77 78 class StopCallback { 79 void dispose() nothrow @trusted @nogc { 80 import core.atomic : cas; 81 82 if (source is null) 83 return; 84 auto local = source; 85 static if (__traits(compiles, cas(&source, local, null))) { 86 if (!cas(&source, local, null)) { 87 assert(source is null); 88 return; 89 } 90 } else { 91 if (!cas(cast(shared)&source, cast(shared)local, null)) { 92 assert(source is null); 93 return; 94 } 95 } 96 local.state.remove_callback(this); 97 } 98 99 private: 100 this(void delegate() nothrow shared @safe callback) nothrow @safe @nogc { 101 this.callback = callback; 102 } 103 104 void delegate() nothrow shared @safe callback; 105 StopSource source; 106 107 StopCallback next_ = null; 108 StopCallback* prev_ = null; 109 bool* isRemoved_ = null; 110 shared bool callbackFinishedExecuting = false; 111 112 void execute() nothrow @safe { 113 callback(); 114 } 115 } 116 117 deprecated("Use regular StopToken") alias StopTokenObject = StopToken; 118 119 auto stopTokenObject(StopToken stopToken) { 120 return stopToken; 121 } 122 123 auto stopTokenObject(NeverStopToken stopToken) { 124 StopSource s = null; 125 return StopToken(s); 126 } 127 128 private void spin_yield() nothrow @trusted @nogc { 129 // TODO: could use the pause asm instruction 130 // it is available in LDC as intrinsic... but not in DMD 131 import core.thread : Thread; 132 133 Thread.yield(); 134 } 135 136 private struct stop_state { 137 import core.thread : Thread; 138 import core.atomic : atomicStore, atomicLoad, MemoryOrder, atomicOp; 139 140 static if (__traits(compiles, () { import core.atomic : casWeak; }) && __traits(compiles, () { 141 import core.internal.atomic : atomicCompareExchangeWeakNoResult; 142 })) 143 import core.atomic : casWeak; 144 else 145 auto casWeak(MemoryOrder M1, MemoryOrder M2, T, V1, V2)(T* here, V1 ifThis, V2 writeThis) pure nothrow @nogc @safe { 146 import core.atomic : cas; 147 148 static if (__traits(compiles, cas!(M1, M2)(here, ifThis, writeThis))) 149 return cas!(M1, M2)(here, ifThis, writeThis); 150 else 151 return cas(here, ifThis, writeThis); 152 } 153 154 public: 155 void add_token_reference() nothrow @safe @nogc { 156 // TODO: want to use atomicFetchAdd but (proper) support is only recent 157 // state_.atomicFetchAdd!(MemoryOrder.raw)(token_ref_increment); 158 state_.atomicOp!"+="(token_ref_increment); 159 } 160 161 void remove_token_reference() nothrow @safe @nogc { 162 // TODO: want to use atomicFetchSub but (proper) support is only recent 163 // state_.atomicFetchSub!(MemoryOrder.acq_rel)(token_ref_increment); 164 state_.atomicOp!"-="(token_ref_increment); 165 } 166 167 void add_source_reference() nothrow @safe @nogc { 168 // TODO: want to use atomicFetchAdd but (proper) support is only recent 169 // state_.atomicFetchAdd!(MemoryOrder.raw)(source_ref_increment); 170 state_.atomicOp!"+="(source_ref_increment); 171 } 172 173 void remove_source_reference() nothrow @safe @nogc { 174 // TODO: want to use atomicFetchSub but (proper) support is only recent 175 // state_.atomicFetchSub!(MemoryOrder.acq_rel)(source_ref_increment); 176 state_.atomicOp!"-="(source_ref_increment); 177 } 178 179 bool request_stop() nothrow @safe { 180 181 if (!try_lock_and_signal_until_signalled()) { 182 // Stop has already been requested. 183 return false; 184 } 185 186 // Set the 'stop_requested' signal and acquired the lock. 187 188 signallingThread_ = Thread.getThis(); 189 190 while (head_ !is null) { 191 // Dequeue the head of the queue 192 auto cb = head_; 193 head_ = cb.next_; 194 const bool anyMore = head_ !is null; 195 if (anyMore) { 196 (() @trusted => head_.prev_ = &head_)(); // compiler 2.091.1 complains "address of variable this assigned to this with longer lifetime". But this is this, how can it have a longer lifetime... 197 } 198 // Mark this item as removed from the list. 199 cb.prev_ = null; 200 201 // Don't hold lock while executing callback 202 // so we don't block other threads from deregistering callbacks. 203 unlock(); 204 205 // TRICKY: Need to store a flag on the stack here that the callback 206 // can use to signal that the destructor was executed inline 207 // during the call. If the destructor was executed inline then 208 // it's not safe to dereference cb after execute() returns. 209 // If the destructor runs on some other thread then the other 210 // thread will block waiting for this thread to signal that the 211 // callback has finished executing. 212 bool isRemoved = false; 213 (() @trusted => cb.isRemoved_ = &isRemoved)(); // the pointer to the stack here is removed 3 lines down. 214 215 cb.execute(); 216 217 if (!isRemoved) { 218 cb.isRemoved_ = null; 219 cb.callbackFinishedExecuting.atomicStore!(MemoryOrder.rel)(true); 220 } 221 222 if (!anyMore) { 223 // This was the last item in the queue when we dequeued it. 224 // No more items should be added to the queue after we have 225 // marked the state as interrupted, only removed from the queue. 226 // Avoid acquring/releasing the lock in this case. 227 return true; 228 } 229 230 lock(); 231 } 232 233 unlock(); 234 235 return true; 236 } 237 238 bool is_stop_requested() nothrow @safe @nogc { 239 return is_stop_requested(state_.atomicLoad!(MemoryOrder.acq)); 240 } 241 242 bool is_stop_requestable() nothrow @safe @nogc { 243 return is_stop_requestable(state_.atomicLoad!(MemoryOrder.acq)); 244 } 245 246 bool try_add_callback(StopCallback cb, bool incrementRefCountIfSuccessful) nothrow @safe { 247 ulong oldState; 248 do { 249 goto load_state; 250 do { 251 spin_yield(); 252 load_state: 253 oldState = state_.atomicLoad!(MemoryOrder.acq); 254 if (is_stop_requested(oldState)) { 255 cb.execute(); 256 return false; 257 } 258 else if (!is_stop_requestable(oldState)) { 259 return false; 260 } 261 } 262 while (is_locked(oldState)); 263 } 264 while (!casWeak!(MemoryOrder.acq, MemoryOrder.acq)(&state_, oldState, oldState | locked_flag)); 265 266 // Push callback onto callback list. 267 cb.next_ = head_; 268 if (cb.next_ !is null) { 269 cb.next_.prev_ = &cb.next_; 270 } 271 () @trusted { cb.prev_ = &head_; } (); 272 head_ = cb; 273 274 if (incrementRefCountIfSuccessful) { 275 unlock_and_increment_token_ref_count(); 276 } 277 else { 278 unlock(); 279 } 280 281 // Successfully added the callback. 282 return true; 283 } 284 285 void remove_callback(StopCallback cb) nothrow @safe @nogc { 286 lock(); 287 288 if (cb.prev_ !is null) { 289 // Still registered, not yet executed 290 // Just remove from the list. 291 *cb.prev_ = cb.next_; 292 if (cb.next_ !is null) { 293 cb.next_.prev_ = cb.prev_; 294 } 295 296 unlock_and_decrement_token_ref_count(); 297 298 return; 299 } 300 301 unlock(); 302 303 // Callback has either already executed or is executing 304 // concurrently on another thread. 305 306 if (signallingThread_ is Thread.getThis()) { 307 // Callback executed on this thread or is still currently executing 308 // and is deregistering itself from within the callback. 309 if (cb.isRemoved_ !is null) { 310 // Currently inside the callback, let the request_stop() method 311 // know the object is about to be destructed and that it should 312 // not try to access the object when the callback returns. 313 *cb.isRemoved_ = true; 314 } 315 } 316 else { 317 // Callback is currently executing on another thread, 318 // block until it finishes executing. 319 while (!cb.callbackFinishedExecuting.atomicLoad!(MemoryOrder.acq)) { 320 spin_yield(); 321 } 322 } 323 324 remove_token_reference(); 325 } 326 327 private: 328 static bool is_locked(ulong state) nothrow @safe @nogc { 329 return (state & locked_flag) != 0; 330 } 331 332 static bool is_stop_requested(ulong state) nothrow @safe @nogc { 333 return (state & stop_requested_flag) != 0; 334 } 335 336 static bool is_stop_requestable(ulong state) nothrow @safe @nogc { 337 // Interruptible if it has already been interrupted or if there are 338 // still interrupt_source instances in existence. 339 return is_stop_requested(state) || (state >= source_ref_increment); 340 } 341 342 bool try_lock_and_signal_until_signalled() nothrow @safe @nogc { 343 ulong oldState; 344 do { 345 oldState = state_.atomicLoad!(MemoryOrder.acq); 346 if (is_stop_requested(oldState)) 347 return false; 348 while (is_locked(oldState)) { 349 spin_yield(); 350 oldState = state_.atomicLoad!(MemoryOrder.acq); 351 if (is_stop_requested(oldState)) 352 return false; 353 } 354 } 355 while (!casWeak!(MemoryOrder.seq, MemoryOrder.acq)(&state_, oldState, 356 oldState | stop_requested_flag | locked_flag)); 357 return true; 358 } 359 360 void lock() nothrow @safe @nogc { 361 ulong oldState; 362 do { 363 oldState = state_.atomicLoad!(MemoryOrder.raw); 364 while (is_locked(oldState)) { 365 spin_yield(); 366 oldState = state_.atomicLoad!(MemoryOrder.raw); 367 } 368 } 369 while (!casWeak!(MemoryOrder.acq, MemoryOrder.raw)((&state_), oldState, 370 oldState | locked_flag)); 371 } 372 373 void unlock() nothrow @safe @nogc { 374 // TODO: want to use atomicFetchSub but (proper) support is only recent 375 // state_.atomicFetchSub!(MemoryOrder.rel)(locked_flag); 376 state_.atomicOp!"-="(locked_flag); 377 } 378 379 void unlock_and_increment_token_ref_count() nothrow @safe @nogc { 380 // TODO: want to use atomicFetchSub but (proper) support is only recent 381 // state_.atomicFetchSub!(MemoryOrder.rel)(locked_flag - token_ref_increment); 382 state_.atomicOp!"-="(locked_flag - token_ref_increment); 383 } 384 385 void unlock_and_decrement_token_ref_count() nothrow @safe @nogc { 386 // TODO: want to use atomicFetchSub but (proper) support is only recent 387 // state_.atomicFetchSub!(MemoryOrder.acq_rel)(locked_flag + token_ref_increment); 388 state_.atomicOp!"-="(locked_flag + token_ref_increment); 389 } 390 391 enum stop_requested_flag = 1L; 392 enum locked_flag = 2L; 393 enum token_ref_increment = 4L; 394 enum source_ref_increment = 1L << 33u; 395 396 // bit 0 - stop-requested 397 // bit 1 - locked 398 // bits 2-32 - token ref count (31 bits) 399 // bits 33-63 - source ref count (31 bits) 400 shared ulong state_ = source_ref_increment; 401 StopCallback head_ = null; 402 Thread signallingThread_; 403 }