module concurrency.stoptoken;

// originally this code is from https://github.com/josuttis/jthread by Nicolai Josuttis
// it is licensed under the Creative Commons Attribution 4.0 International License http://creativecommons.org/licenses/by/4.0

/// Requesting side of the stop mechanism: owns the `stop_state` that tokens and
/// callbacks observe.
class StopSource {
  private stop_state state;

  /// Requests a stop and runs the registered callbacks (see `stop_state.request_stop`).
  /// Returns: `true` if this call made the request, `false` if a stop had already been requested.
  bool stop() nothrow @safe {
    return state.request_stop();
  }

  /// `shared` overload of `stop`; strips `shared` and forwards to the same state.
  bool stop() nothrow @trusted shared {
    return (cast(StopSource)this).state.request_stop();
  }

  /// Returns: `true` once a stop has been requested on this source.
  bool isStopRequested() nothrow @safe @nogc {
    return state.is_stop_requested();
  }

  /// `shared` overload of `isStopRequested`; strips `shared` and forwards.
  bool isStopRequested() nothrow @trusted @nogc shared {
    return (cast(StopSource)this).isStopRequested();
  }

  /// resets the internal state, only do this if you are sure nothing else is looking at this...
  void reset() @system @nogc {
    this.state = stop_state();
  }
}

/// Observing side of the stop mechanism: a lightweight handle onto a `StopSource`.
struct StopToken {
  private StopSource source;

  /// Creates a token for `source`. A `null` source yields a token for which a
  /// stop is never possible.
  this(StopSource source) nothrow @safe @nogc {
    this.source = source;
    isStopPossible = source !is null;
  }

  /// Creates a token for a `shared` source. A `null` source yields a token for
  /// which a stop is never possible.
  this(shared StopSource source) nothrow @trusted @nogc {
    this.source = cast()source;
    // Must mirror the non-shared constructor: without this assignment the
    // field keeps its default `false` and the token never reports a stop.
    isStopPossible = source !is null;
  }

  /// Returns: `true` if this token has a source and a stop was requested on it.
  bool isStopRequested() nothrow @safe @nogc {
    return isStopPossible && source.isStopRequested();
  }

  /// `true` when the token was constructed with a non-null source.
  const bool isStopPossible;
}

/// Token type for which a stop is never requested and never possible; both
/// members are compile-time constants.
struct NeverStopToken {
  enum isStopRequested = false;
  enum isStopPossible = false;
}

/// Creates a `StopCallback` wrapping `callback` and, if the token can be
/// stopped, tries to register it with the token's source.
///
/// If a stop was already requested, `try_add_callback` invokes the callback
/// immediately and the returned object stays unregistered. On successful
/// registration the callback remembers the source so that `dispose` can
/// deregister it later.
///
/// Returns: the newly allocated `StopCallback` (never `null`).
StopCallback onStop(StopToken)(StopToken stopToken, void delegate() nothrow @safe shared callback) nothrow @safe {
  auto cb = new StopCallback(callback);
  if (stopToken.isStopPossible) {
    if (stopToken.source.state.try_add_callback(cb, true))
      cb.source = stopToken.source;
  }
  return cb;
}

/// Handle for a callback registered through `onStop`.
class StopCallback {
  /// Deregisters the callback from its source, if it is still attached.
  ///
  /// The `source` field is cleared with a compare-and-swap so that only one
  /// caller proceeds to `remove_callback`; every other call returns early.
  void dispose() nothrow @trusted @nogc {
    import core.atomic : cas;

    if (source is null)
      return;
    auto local = source;
    // Depending on the druntime version `cas` may or may not accept
    // non-shared arguments; pick whichever form compiles.
    static if (__traits(compiles, cas(&source, local, null))) {
      if (!cas(&source, local, null)) {
        assert(source is null);
        return;
      }
    } else {
      if (!cas(cast(shared)&source, cast(shared)local, null)) {
        assert(source is null);
        return;
      }
    }
    local.state.remove_callback(this);
  }

private:
  this(void delegate() nothrow shared @safe callback) nothrow @safe @nogc {
    this.callback = callback;
  }

  // The user supplied delegate.
  void delegate() nothrow shared @safe callback;
  // Source this callback is registered with; null when not registered.
  StopSource source;

  // Intrusive doubly linked list managed by stop_state: next_ is the following
  // callback, prev_ points at whichever reference currently points at this one.
  StopCallback next_ = null;
  StopCallback* prev_ = null;
  // Set by request_stop() while the callback runs; points at a flag on its stack.
  bool* isRemoved_ = null;
  // Set by request_stop() after the callback returned.
  shared bool callbackFinishedExecuting = false;

  void execute() nothrow @safe {
    callback();
  }
}

deprecated("Use regular StopToken") alias StopTokenObject = StopToken;

/// Returns the given token unchanged.
auto stopTokenObject(StopToken stopToken) {
  return stopToken;
}

/// Converts a `NeverStopToken` into a `StopToken` without a source.
auto stopTokenObject(NeverStopToken stopToken) {
  StopSource s = null;
  return StopToken(s);
}

/// Yields the current thread; used inside the spin loops below.
private void spin_yield() nothrow @trusted @nogc {
  // TODO: could use the pause asm instruction
  // it is available in LDC as intrinsic... but not in DMD
  import core.thread : Thread;

  Thread.yield();
}

/// State shared between a `StopSource`, its tokens and its callbacks: a packed
/// atomic word (flags plus reference counts, see the layout next to `state_`)
/// and an intrusive list of registered callbacks guarded by the `locked` bit.
private struct stop_state {
  import core.thread : Thread;
  import core.atomic : atomicStore, atomicLoad, MemoryOrder, atomicOp;

  // Use druntime's casWeak when it is available, otherwise fall back to cas.
  static if (__traits(compiles, () { import core.atomic : casWeak; }) && __traits(compiles, () {
      import core.internal.atomic : atomicCompareExchangeWeakNoResult;
    }))
    import core.atomic : casWeak;
  else
    auto casWeak(MemoryOrder M1, MemoryOrder M2, T, V1, V2)(T* here, V1 ifThis, V2 writeThis) pure nothrow @nogc @safe {
      import core.atomic : cas;

      static if (__traits(compiles, cas!(M1, M2)(here, ifThis, writeThis)))
        return cas!(M1, M2)(here, ifThis, writeThis);
      else
        return cas(here, ifThis, writeThis);
    }

public:
  /// Adds one token reference to the packed state.
  void add_token_reference() nothrow @safe @nogc {
    // TODO: want to use atomicFetchAdd but (proper) support is only recent
    // state_.atomicFetchAdd!(MemoryOrder.raw)(token_ref_increment);
    state_.atomicOp!"+="(token_ref_increment);
  }

  /// Removes one token reference from the packed state.
  void remove_token_reference() nothrow @safe @nogc {
    // TODO: want to use atomicFetchSub but (proper) support is only recent
    // state_.atomicFetchSub!(MemoryOrder.acq_rel)(token_ref_increment);
    state_.atomicOp!"-="(token_ref_increment);
  }

  /// Adds one source reference to the packed state.
  void add_source_reference() nothrow @safe @nogc {
    // TODO: want to use atomicFetchAdd but (proper) support is only recent
    // state_.atomicFetchAdd!(MemoryOrder.raw)(source_ref_increment);
    state_.atomicOp!"+="(source_ref_increment);
  }

  /// Removes one source reference from the packed state.
  void remove_source_reference() nothrow @safe @nogc {
    // TODO: want to use atomicFetchSub but (proper) support is only recent
    // state_.atomicFetchSub!(MemoryOrder.acq_rel)(source_ref_increment);
    state_.atomicOp!"-="(source_ref_increment);
  }

  /// Sets the stop-requested flag and executes every registered callback.
  ///
  /// Returns: `true` if this call set the flag, `false` if a stop had already
  /// been requested.
  bool request_stop() nothrow @safe {

    if (!try_lock_and_signal_until_signalled()) {
      // Stop has already been requested.
      return false;
    }

    // Set the 'stop_requested' signal and acquired the lock.

    signallingThread_ = Thread.getThis();

    while (head_ !is null) {
      // Dequeue the head of the queue
      auto cb = head_;
      head_ = cb.next_;
      const bool anyMore = head_ !is null;
      if (anyMore) {
        (() @trusted => head_.prev_ = &head_)(); // compiler 2.091.1 complains "address of variable this assigned to this with longer lifetime". But this is this, how can it have a longer lifetime...
      }
      // Mark this item as removed from the list.
      cb.prev_ = null;

      // Don't hold lock while executing callback
      // so we don't block other threads from deregistering callbacks.
      unlock();

      // TRICKY: Need to store a flag on the stack here that the callback
      // can use to signal that the destructor was executed inline
      // during the call. If the destructor was executed inline then
      // it's not safe to dereference cb after execute() returns.
      // If the destructor runs on some other thread then the other
      // thread will block waiting for this thread to signal that the
      // callback has finished executing.
      bool isRemoved = false;
      (() @trusted => cb.isRemoved_ = &isRemoved)(); // the pointer to the stack here is removed 3 lines down.

      cb.execute();

      if (!isRemoved) {
        cb.isRemoved_ = null;
        cb.callbackFinishedExecuting.atomicStore!(MemoryOrder.rel)(true);
      }

      if (!anyMore) {
        // This was the last item in the queue when we dequeued it.
        // No more items should be added to the queue after we have
        // marked the state as interrupted, only removed from the queue.
        // Avoid acquiring/releasing the lock in this case.
        return true;
      }

      lock();
    }

    unlock();

    return true;
  }

  /// Returns: `true` if the stop-requested flag is set.
  bool is_stop_requested() nothrow @safe @nogc {
    return is_stop_requested(state_.atomicLoad!(MemoryOrder.acq));
  }

  /// Returns: `true` if a stop was requested or at least one source reference exists.
  bool is_stop_requestable() nothrow @safe @nogc {
    return is_stop_requestable(state_.atomicLoad!(MemoryOrder.acq));
  }

  /// Tries to push `cb` onto the callback list.
  ///
  /// If a stop was already requested, `cb` is executed right away and `false`
  /// is returned. If a stop can never be requested, `false` is returned
  /// without executing `cb`.
  ///
  /// Params:
  ///   cb = the callback to register
  ///   incrementRefCountIfSuccessful = when `true`, a token reference is added
  ///     in the same atomic operation that releases the lock
  /// Returns: `true` if `cb` was added to the list.
  bool try_add_callback(StopCallback cb, bool incrementRefCountIfSuccessful) nothrow @safe {
    ulong oldState;
    do {
      // Enter the inner loop at the load so the first pass does not yield.
      goto load_state;
      do {
        spin_yield();
      load_state:
        oldState = state_.atomicLoad!(MemoryOrder.acq);
        if (is_stop_requested(oldState)) {
          cb.execute();
          return false;
        }
        else if (!is_stop_requestable(oldState)) {
          return false;
        }
      }
      while (is_locked(oldState));
    }
    while (!casWeak!(MemoryOrder.acq, MemoryOrder.acq)(&state_, oldState, oldState | locked_flag));

    // Push callback onto callback list.
    cb.next_ = head_;
    if (cb.next_ !is null) {
      cb.next_.prev_ = &cb.next_;
    }
    cb.prev_ = &head_;
    head_ = cb;

    if (incrementRefCountIfSuccessful) {
      unlock_and_increment_token_ref_count();
    }
    else {
      unlock();
    }

    // Successfully added the callback.
    return true;
  }

  /// Removes `cb` from the callback list, or, if it was already dequeued by
  /// `request_stop`, synchronizes with its execution. In both cases one token
  /// reference is released.
  void remove_callback(StopCallback cb) nothrow @safe @nogc {
    lock();

    if (cb.prev_ !is null) {
      // Still registered, not yet executed
      // Just remove from the list.
      *cb.prev_ = cb.next_;
      if (cb.next_ !is null) {
        cb.next_.prev_ = cb.prev_;
      }

      unlock_and_decrement_token_ref_count();

      return;
    }

    unlock();

    // Callback has either already executed or is executing
    // concurrently on another thread.

    if (signallingThread_ is Thread.getThis()) {
      // Callback executed on this thread or is still currently executing
      // and is deregistering itself from within the callback.
      if (cb.isRemoved_ !is null) {
        // Currently inside the callback, let the request_stop() method
        // know the object is about to be destructed and that it should
        // not try to access the object when the callback returns.
        *cb.isRemoved_ = true;
      }
    }
    else {
      // Callback is currently executing on another thread,
      // block until it finishes executing.
      while (!cb.callbackFinishedExecuting.atomicLoad!(MemoryOrder.acq)) {
        spin_yield();
      }
    }

    remove_token_reference();
  }

private:
  static bool is_locked(ulong state) nothrow @safe @nogc {
    return (state & locked_flag) != 0;
  }

  static bool is_stop_requested(ulong state) nothrow @safe @nogc {
    return (state & stop_requested_flag) != 0;
  }

  static bool is_stop_requestable(ulong state) nothrow @safe @nogc {
    // Interruptible if it has already been interrupted or if there are
    // still interrupt_source instances in existence.
    return is_stop_requested(state) || (state >= source_ref_increment);
  }

  // Spins until the lock bit is free, then sets the stop-requested and locked
  // bits in a single compare-and-swap. Returns false (without taking the lock)
  // as soon as it observes that a stop was already requested.
  bool try_lock_and_signal_until_signalled() nothrow @safe @nogc {
    ulong oldState;
    do {
      oldState = state_.atomicLoad!(MemoryOrder.acq);
      if (is_stop_requested(oldState))
        return false;
      while (is_locked(oldState)) {
        spin_yield();
        oldState = state_.atomicLoad!(MemoryOrder.acq);
        if (is_stop_requested(oldState))
          return false;
      }
    }
    while (!casWeak!(MemoryOrder.seq, MemoryOrder.acq)(&state_, oldState,
        oldState | stop_requested_flag | locked_flag));
    return true;
  }

  // Spins until the lock bit could be set by this thread.
  void lock() nothrow @safe @nogc {
    ulong oldState;
    do {
      oldState = state_.atomicLoad!(MemoryOrder.raw);
      while (is_locked(oldState)) {
        spin_yield();
        oldState = state_.atomicLoad!(MemoryOrder.raw);
      }
    }
    while (!casWeak!(MemoryOrder.acq, MemoryOrder.raw)((&state_), oldState,
        oldState | locked_flag));
  }

  // Clears the lock bit.
  void unlock() nothrow @safe @nogc {
    // TODO: want to use atomicFetchSub but (proper) support is only recent
    // state_.atomicFetchSub!(MemoryOrder.rel)(locked_flag);
    state_.atomicOp!"-="(locked_flag);
  }

  // Clears the lock bit and adds one token reference in one atomic operation.
  void unlock_and_increment_token_ref_count() nothrow @safe @nogc {
    // TODO: want to use atomicFetchSub but (proper) support is only recent
    // state_.atomicFetchSub!(MemoryOrder.rel)(locked_flag - token_ref_increment);
    state_.atomicOp!"-="(locked_flag - token_ref_increment);
  }

  // Clears the lock bit and removes one token reference in one atomic operation.
  void unlock_and_decrement_token_ref_count() nothrow @safe @nogc {
    // TODO: want to use atomicFetchSub but (proper) support is only recent
    // state_.atomicFetchSub!(MemoryOrder.acq_rel)(locked_flag + token_ref_increment);
    state_.atomicOp!"-="(locked_flag + token_ref_increment);
  }

  enum stop_requested_flag = 1L;
  enum locked_flag = 2L;
  enum token_ref_increment = 4L;
  enum source_ref_increment = 1L << 33u;

  // bit 0 - stop-requested
  // bit 1 - locked
  // bits 2-32 - token ref count (31 bits)
  // bits 33-63 - source ref count (31 bits)
  shared ulong state_ = source_ref_increment;
  // Head of the intrusive list of registered callbacks.
  StopCallback head_ = null;
  // Thread that called request_stop() and is executing the callbacks.
  Thread signallingThread_;
}