module concurrency.stoptoken;

// originally this code is from https://github.com/josuttis/jthread by Nicolai Josuttis
// it is licensed under the Creative Commons Attribution 4.0 International License http://creativecommons.org/licenses/by/4.0

/// Owns a `stop_state` and is the object through which a stop is requested.
class StopSource {
  private stop_state state;

  /// Requests a stop by forwarding to `stop_state.request_stop`.
  /// Returns: `true` if this call made the request, `false` if a stop had
  /// already been requested.
  bool stop() nothrow @safe {
    return state.request_stop();
  }

  /// `shared` overload of `stop`; casts away `shared` and forwards to the same state.
  bool stop() nothrow @trusted shared {
    return (cast(StopSource)this).state.request_stop();
  }

  /// Returns: `true` once a stop has been requested on this source.
  bool isStopRequested() nothrow @safe @nogc {
    return state.is_stop_requested();
  }

  /// `shared` overload of `isStopRequested`; casts away `shared` and forwards.
  bool isStopRequested() nothrow @trusted @nogc shared {
    return (cast(StopSource)this).isStopRequested();
  }

  /// resets the internal state, only do this if you are sure nothing else is looking at this...
  void reset() @system @nogc {
    this.state = stop_state();
  }
}

/// Lightweight handle used to observe the stop state of a `StopSource`.
struct StopToken {
  private StopSource source;

  /// Creates a token observing `source`.
  this(StopSource source) nothrow @safe @nogc {
    this.source = source;
  }

  /// Returns: `true` if the observed source has had a stop requested.
  /// A token without a source (e.g. `StopToken.init`) always returns `false`.
  bool isStopRequested() nothrow @safe @nogc {
    // Guard against a default-initialized token: there is nothing to observe,
    // so report "not stopped" instead of dereferencing a null class reference.
    return source !is null && source.isStopRequested();
  }

  enum isStopPossible = true;
}

/// Token type for which a stop is never requested and never possible.
struct NeverStopToken {
  enum isStopRequested = false;
  enum isStopPossible = false;
}

enum isStopToken(T) = true; // TODO:

/// Creates a `StopCallback` wrapping `callback` and, when the token type can be
/// stopped and exposes a `source` member, tries to register it with that source.
///
/// If the source already had a stop requested, `try_add_callback` runs the
/// callback immediately on the calling thread and the returned object stays
/// unregistered. A token whose `source` is `null` registers nothing.
///
/// Returns: the newly created callback object; call `dispose` on it to deregister.
StopCallback onStop(StopToken)(auto ref StopToken stopToken, void delegate() nothrow @safe shared callback) nothrow @safe if (isStopToken!StopToken) {
  import std.traits : hasMember;

  auto cb = new StopCallback(callback);
  static if (stopToken.isStopPossible && hasMember!(StopToken, "source")) {
    // A source-less token cannot be stopped; skip registration rather than
    // dereferencing null.
    if (stopToken.source !is null && stopToken.source.state.try_add_callback(cb, true))
      cb.source = stopToken.source;
  }
  return cb;
}

/// A registered stop callback; an intrusive list node owned by `stop_state`.
class StopCallback {
  /// Deregisters this callback from its source, if it is (still) registered.
  /// Calling it on an unregistered or already disposed callback does nothing.
  void dispose() nothrow @trusted @nogc {
    import core.atomic : cas;

    // Read the field exactly once. Testing `source` and then re-reading it
    // would let a concurrent dispose() null the field in between, after which
    // the cas below would succeed with a null `local` and `local.state` would
    // dereference null.
    auto local = source;
    if (local is null)
      return;
    static if (__traits(compiles, cas(&source, local, null))) {
      if (!cas(&source, local, null)) {
        // Someone else won the race and cleared the field.
        assert(source is null);
        return;
      }
    } else {
      if (!cas(cast(shared)&source, cast(shared)local, null)) {
        assert(source is null);
        return;
      }
    }
    local.state.remove_callback(this);
  }

private:
  this(void delegate() nothrow shared @safe callback) nothrow @safe @nogc {
    this.callback = callback;
  }

  void delegate() nothrow shared @safe callback;
  StopSource source;

  // Intrusive doubly-linked list: `prev_` points at whichever reference
  // (the list head or the previous node's `next_`) currently refers to this node.
  StopCallback next_ = null;
  StopCallback* prev_ = null;
  // Set by request_stop() for the duration of execute(); see request_stop().
  bool* isRemoved_ = null;
  shared bool callbackFinishedExecuting = false;

  void execute() nothrow @safe {
    callback();
  }
}

/// Type-erased stop token interface.
interface StopTokenObject {
  bool isStopRequested() nothrow @safe;
  bool isStopPossible() nothrow @safe;
  StopCallback onStop(void delegate() nothrow @safe shared callback) nothrow @safe;
}

/// Adapts any stop token type to the `StopTokenObject` interface by forwarding
/// each call to the wrapped token.
class StopTokenObjectImpl(StopToken) : StopTokenObject {
  private StopToken token;

  this(StopToken token) {
    this.token = token;
  }

  bool isStopRequested() nothrow @safe {
    return token.isStopRequested;
  }

  bool isStopPossible() nothrow @safe {
    return token.isStopPossible;
  }

  StopCallback onStop(void delegate() nothrow @safe shared callback) nothrow @safe {
    return token.onStop(callback);
  }
}

/// Wraps `stopToken` in a `StopTokenObjectImpl`.
auto stopTokenObject(StopToken)(StopToken stopToken) {
  return new StopTokenObjectImpl!(StopToken)(stopToken);
}

/// Yields the current thread; used as the back-off step in the spin loops below.
private void spin_yield() nothrow @trusted @nogc {
  // TODO: could use the pause asm instruction
  // it is available in LDC as intrinsic... but not in DMD
  import core.thread : Thread;

  Thread.yield();
}

/// Shared stop state: one atomic word holding the stop flag, a spin-lock bit
/// and two reference counts, plus the list of registered callbacks that is
/// protected by that lock bit.
private struct stop_state {
  import core.thread : Thread;
  import core.atomic : atomicStore, atomicLoad, MemoryOrder, atomicOp;

  // Use druntime's casWeak when available, otherwise fall back to a strong cas.
  static if (__traits(compiles, () { import core.atomic : casWeak; }) && __traits(compiles, () {
      import core.internal.atomic : atomicCompareExchangeWeakNoResult;
    }))
    import core.atomic : casWeak;
  else
    auto casWeak(MemoryOrder M1, MemoryOrder M2, T, V1, V2)(T* here, V1 ifThis, V2 writeThis) pure nothrow @nogc @safe {
      import core.atomic : cas;

      static if (__traits(compiles, cas!(M1, M2)(here, ifThis, writeThis)))
        return cas!(M1, M2)(here, ifThis, writeThis);
      else
        return cas(here, ifThis, writeThis);
    }

public:
  /// Atomically adds one token reference to `state_`.
  void add_token_reference() nothrow @safe @nogc {
    // TODO: want to use atomicFetchAdd but (proper) support is only recent
    // state_.atomicFetchAdd!(MemoryOrder.raw)(token_ref_increment);
    state_.atomicOp!"+="(token_ref_increment);
  }

  /// Atomically removes one token reference from `state_`.
  void remove_token_reference() nothrow @safe @nogc {
    // TODO: want to use atomicFetchSub but (proper) support is only recent
    // state_.atomicFetchSub!(MemoryOrder.acq_rel)(token_ref_increment);
    state_.atomicOp!"-="(token_ref_increment);
  }

  /// Atomically adds one source reference to `state_`.
  void add_source_reference() nothrow @safe @nogc {
    // TODO: want to use atomicFetchAdd but (proper) support is only recent
    // state_.atomicFetchAdd!(MemoryOrder.raw)(source_ref_increment);
    state_.atomicOp!"+="(source_ref_increment);
  }

  /// Atomically removes one source reference from `state_`.
  void remove_source_reference() nothrow @safe @nogc {
    // TODO: want to use atomicFetchSub but (proper) support is only recent
    // state_.atomicFetchSub!(MemoryOrder.acq_rel)(source_ref_increment);
    state_.atomicOp!"-="(source_ref_increment);
  }

  /// Sets the stop-requested flag and runs every registered callback on the
  /// calling thread, one at a time, without holding the lock during a callback.
  /// Returns: `true` if this call made the request, `false` if a stop had
  /// already been requested.
  bool request_stop() nothrow @safe {

    if (!try_lock_and_signal_until_signalled()) {
      // Stop has already been requested.
      return false;
    }

    // We have set the 'stop_requested' flag and acquired the lock.

    signallingThread_ = Thread.getThis();

    while (head_ !is null) {
      // Dequeue the head of the queue
      auto cb = head_;
      head_ = cb.next_;
      const bool anyMore = head_ !is null;
      if (anyMore) {
        (() @trusted => head_.prev_ = &head_)(); // compiler 2.091.1 complains "address of variable this assigned to this with longer lifetime". But this is this, how can it have a longer lifetime...
      }
      // Mark this item as removed from the list.
      cb.prev_ = null;

      // Don't hold lock while executing callback
      // so we don't block other threads from deregistering callbacks.
      unlock();

      // TRICKY: Need to store a flag on the stack here that the callback
      // can use to signal that the destructor was executed inline
      // during the call. If the destructor was executed inline then
      // it's not safe to dereference cb after execute() returns.
      // If the destructor runs on some other thread then the other
      // thread will block waiting for this thread to signal that the
      // callback has finished executing.
      bool isRemoved = false;
      (() @trusted => cb.isRemoved_ = &isRemoved)(); // the pointer to the stack here is removed 3 lines down.

      cb.execute();

      if (!isRemoved) {
        cb.isRemoved_ = null;
        cb.callbackFinishedExecuting.atomicStore!(MemoryOrder.rel)(true);
      }

      if (!anyMore) {
        // This was the last item in the queue when we dequeued it.
        // No more items should be added to the queue after we have
        // marked the state as interrupted, only removed from the queue.
        // Avoid acquiring/releasing the lock in this case.
        return true;
      }

      lock();
    }

    unlock();

    return true;
  }

  /// Returns: `true` if the stop-requested flag is set.
  bool is_stop_requested() nothrow @safe @nogc {
    return is_stop_requested(state_.atomicLoad!(MemoryOrder.acq));
  }

  /// Returns: `true` if a stop has been requested or source references remain.
  bool is_stop_requestable() nothrow @safe @nogc {
    return is_stop_requestable(state_.atomicLoad!(MemoryOrder.acq));
  }

  /// Tries to push `cb` onto the callback list.
  ///
  /// If a stop has already been requested, `cb` is executed immediately on the
  /// calling thread and `false` is returned. `false` is also returned, without
  /// executing, when a stop is no longer requestable. Otherwise the lock is
  /// taken, `cb` becomes the new list head, and the lock is released,
  /// incrementing the token ref count in the same atomic step when
  /// `incrementRefCountIfSuccessful` is set.
  ///
  /// Returns: `true` if `cb` was added to the list.
  bool try_add_callback(StopCallback cb, bool incrementRefCountIfSuccessful) nothrow @safe {
    ulong oldState;
    do {
      // Enter the inner loop at the load, skipping the yield on the first pass.
      goto load_state;
      do {
        spin_yield();
      load_state:
        oldState = state_.atomicLoad!(MemoryOrder.acq);
        if (is_stop_requested(oldState)) {
          cb.execute();
          return false;
        }
        else if (!is_stop_requestable(oldState)) {
          return false;
        }
      }
      while (is_locked(oldState));
    }
    while (!casWeak!(MemoryOrder.acq, MemoryOrder.acq)(&state_, oldState, oldState | locked_flag));

    // Push callback onto callback list.
    cb.next_ = head_;
    if (cb.next_ !is null) {
      cb.next_.prev_ = &cb.next_;
    }
    cb.prev_ = &head_;
    head_ = cb;

    if (incrementRefCountIfSuccessful) {
      unlock_and_increment_token_ref_count();
    }
    else {
      unlock();
    }

    // Successfully added the callback.
    return true;
  }

  /// Removes `cb` from the callback list, or, if it has already been dequeued
  /// by `request_stop`, synchronizes with its execution, and then drops one
  /// token reference.
  void remove_callback(StopCallback cb) nothrow @safe @nogc {
    lock();

    if (cb.prev_ !is null) {
      // Still registered, not yet executed
      // Just remove from the list.
      *cb.prev_ = cb.next_;
      if (cb.next_ !is null) {
        cb.next_.prev_ = cb.prev_;
      }

      unlock_and_decrement_token_ref_count();

      return;
    }

    unlock();

    // Callback has either already executed or is executing
    // concurrently on another thread.

    if (signallingThread_ is Thread.getThis()) {
      // Callback executed on this thread or is still currently executing
      // and is deregistering itself from within the callback.
      if (cb.isRemoved_ !is null) {
        // Currently inside the callback, let the request_stop() method
        // know the object is about to be destructed and that it should
        // not try to access the object when the callback returns.
        *cb.isRemoved_ = true;
      }
    }
    else {
      // Callback is currently executing on another thread,
      // block until it finishes executing.
      while (!cb.callbackFinishedExecuting.atomicLoad!(MemoryOrder.acq)) {
        spin_yield();
      }
    }

    remove_token_reference();
  }

private:
  static bool is_locked(ulong state) nothrow @safe @nogc {
    return (state & locked_flag) != 0;
  }

  static bool is_stop_requested(ulong state) nothrow @safe @nogc {
    return (state & stop_requested_flag) != 0;
  }

  static bool is_stop_requestable(ulong state) nothrow @safe @nogc {
    // Interruptible if it has already been interrupted or if there are
    // still interrupt_source instances in existence.
    return is_stop_requested(state) || (state >= source_ref_increment);
  }

  /// Spins until the lock bit is free, then sets both the stop-requested flag
  /// and the lock bit in one compare-and-swap.
  /// Returns: `false` as soon as the stop-requested flag is observed already
  /// set (the lock is not taken in that case), `true` otherwise.
  bool try_lock_and_signal_until_signalled() nothrow @safe @nogc {
    ulong oldState;
    do {
      oldState = state_.atomicLoad!(MemoryOrder.acq);
      if (is_stop_requested(oldState))
        return false;
      while (is_locked(oldState)) {
        spin_yield();
        oldState = state_.atomicLoad!(MemoryOrder.acq);
        if (is_stop_requested(oldState))
          return false;
      }
    }
    while (!casWeak!(MemoryOrder.seq, MemoryOrder.acq)(&state_, oldState,
        oldState | stop_requested_flag | locked_flag));
    return true;
  }

  /// Spins (yielding between attempts) until the lock bit has been acquired.
  void lock() nothrow @safe @nogc {
    ulong oldState;
    do {
      oldState = state_.atomicLoad!(MemoryOrder.raw);
      while (is_locked(oldState)) {
        spin_yield();
        oldState = state_.atomicLoad!(MemoryOrder.raw);
      }
    }
    while (!casWeak!(MemoryOrder.acq, MemoryOrder.raw)((&state_), oldState,
        oldState | locked_flag));
  }

  /// Clears the lock bit.
  void unlock() nothrow @safe @nogc {
    // TODO: want to use atomicFetchSub but (proper) support is only recent
    // state_.atomicFetchSub!(MemoryOrder.rel)(locked_flag);
    state_.atomicOp!"-="(locked_flag);
  }

  /// Clears the lock bit and adds one token reference in a single atomic update.
  void unlock_and_increment_token_ref_count() nothrow @safe @nogc {
    // TODO: want to use atomicFetchSub but (proper) support is only recent
    // state_.atomicFetchSub!(MemoryOrder.rel)(locked_flag - token_ref_increment);
    state_.atomicOp!"-="(locked_flag - token_ref_increment);
  }

  /// Clears the lock bit and removes one token reference in a single atomic update.
  void unlock_and_decrement_token_ref_count() nothrow @safe @nogc {
    // TODO: want to use atomicFetchSub but (proper) support is only recent
    // state_.atomicFetchSub!(MemoryOrder.acq_rel)(locked_flag + token_ref_increment);
    state_.atomicOp!"-="(locked_flag + token_ref_increment);
  }

  enum stop_requested_flag = 1L;
  enum locked_flag = 2L;
  enum token_ref_increment = 4L;
  enum source_ref_increment = 1L << 33u;

  // bit 0 - stop-requested
  // bit 1 - locked
  // bits 2-32 - token ref count (31 bits)
  // bits 33-63 - source ref count (31 bits)
  shared ulong state_ = source_ref_increment;
  StopCallback head_ = null;
  Thread signallingThread_;
}