module concurrency.stoptoken;

// This code originally comes from https://github.com/josuttis/jthread by Nicolai Josuttis.
// It is licensed under the Creative Commons Attribution 4.0 International License: http://creativecommons.org/licenses/by/4.0

class StopSource {
  private stop_state state;
  bool stop() nothrow @safe {
    return state.request_stop();
  }

  bool stop() nothrow @trusted shared {
    return (cast(StopSource)this).state.request_stop();
  }

  bool isStopRequested() nothrow @safe @nogc {
    return state.is_stop_requested();
  }

  bool isStopRequested() nothrow @trusted @nogc shared {
    return (cast(StopSource)this).isStopRequested();
  }

  /// Resets the internal state. Only do this if you are sure nothing else is observing this source.
  void reset(this t)() @system @nogc {
    this.state = stop_state();
  }
}
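
// A minimal usage sketch: the first call to stop() requests a stop and
// returns true; later calls return false because a stop was already requested.
@safe unittest {
  auto source = new StopSource();
  assert(!source.isStopRequested());

  assert(source.stop());           // first request succeeds
  assert(source.isStopRequested());
  assert(!source.stop());          // already requested
}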

struct StopToken {
  package(concurrency) StopSource source;
  this(StopSource source) nothrow @safe @nogc {
    this.source = source;
    isStopPossible = source !is null;
  }

  this(shared StopSource source) nothrow @trusted @nogc {
    this.source = cast()source;
    isStopPossible = source !is null;
  }

  bool isStopRequested() nothrow @safe @nogc {
    return isStopPossible && source.isStopRequested();
  }

  const bool isStopPossible;
}
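
// A minimal usage sketch: a StopToken observes the StopSource it was built
// from; a token constructed from a null source can never be stopped.
@safe unittest {
  auto source = new StopSource();
  auto token = StopToken(source);
  assert(token.isStopPossible);
  assert(!token.isStopRequested());

  source.stop();
  assert(token.isStopRequested());

  StopSource nullSource = null;
  auto none = StopToken(nullSource);
  assert(!none.isStopPossible);
  assert(!none.isStopRequested());
}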

struct NeverStopToken {
  enum isStopRequested = false;
  enum isStopPossible = false;
}

StopCallback onStop(StopSource stopSource, void delegate() nothrow @safe shared callback) nothrow @safe {
  auto cb = new StopCallback(callback);
  if (stopSource.state.try_add_callback(cb, true))
    cb.source = stopSource;
  return cb;
}

StopCallback onStop(StopSource stopSource, void function() nothrow @safe callback) nothrow @trusted {
  import std.functional : toDelegate;
  return stopSource.onStop(cast(void delegate() nothrow @safe shared)callback.toDelegate);
}

StopCallback onStop(StopToken)(StopToken stopToken, void delegate() nothrow @safe shared callback) nothrow @safe {
  if (stopToken.isStopPossible) {
    return stopToken.source.onStop(callback);
  }
  return new StopCallback(callback);
}

StopCallback onStop(StopToken)(StopToken stopToken, void function() nothrow @safe callback) nothrow @trusted {
  import std.functional : toDelegate;
  return stopToken.onStop(cast(void delegate() nothrow @safe shared)callback.toDelegate);
}
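
// A minimal usage sketch of onStop: a registered callback runs synchronously
// on the thread that requests the stop.
unittest {
  import core.atomic : atomicLoad, atomicStore;

  static shared bool fired;
  static void onStopped() nothrow @safe { fired.atomicStore(true); }

  auto source = new StopSource();
  auto cb = source.onStop(&onStopped);

  source.stop();                 // runs the callback on this thread
  assert(fired.atomicLoad);

  cb.dispose();                  // cleaning up after execution is safe
}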

class StopCallback {
  void dispose() nothrow @trusted @nogc {
    import core.atomic : cas;

    if (source is null)
      return;
    auto local = source;
    // Atomically detach this callback from its source so only one caller
    // wins the race to deregister; older compilers only accept cas on
    // shared operands, hence the fallback branch.
    static if (__traits(compiles, cas(&source, local, null))) {
      if (!cas(&source, local, null)) {
        assert(source is null);
        return;
      }
    } else {
      if (!cas(cast(shared)&source, cast(shared)local, null)) {
        assert(source is null);
        return;
      }
    }
    local.state.remove_callback(this);
  }

private:
  this(void delegate() nothrow shared @safe callback) nothrow @safe @nogc {
    this.callback = callback;
  }

  void delegate() nothrow shared @safe callback;
  StopSource source;

  StopCallback next_ = null;
  StopCallback* prev_ = null;
  bool* isRemoved_ = null;
  shared bool callbackFinishedExecuting = false;

  void execute() nothrow @safe {
    callback();
  }
}
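
// A minimal sketch of StopCallback.dispose(): disposing before any stop
// request removes the callback so it never runs; dispose() is safe to call
// more than once.
unittest {
  import core.atomic : atomicLoad, atomicStore;

  static shared bool fired;
  static void onStopped() nothrow @safe { fired.atomicStore(true); }

  auto source = new StopSource();
  auto cb = source.onStop(&onStopped);

  cb.dispose();                  // deregister before requesting a stop
  cb.dispose();                  // a second dispose is a no-op

  source.stop();
  assert(!fired.atomicLoad);     // the callback was never executed
}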

deprecated("Use regular StopToken") alias StopTokenObject = StopToken;

auto stopTokenObject(StopToken stopToken) {
  return stopToken;
}

auto stopTokenObject(NeverStopToken stopToken) {
  StopSource s = null;
  return StopToken(s);
}
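
// A minimal sketch: a NeverStopToken converts, via stopTokenObject, into a
// StopToken with a null source, so stopping it is never possible.
unittest {
  auto token = stopTokenObject(NeverStopToken());
  assert(!token.isStopPossible);
  assert(!token.isStopRequested());
}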

private void spin_yield() nothrow @trusted @nogc {
  // TODO: could use the pause asm instruction;
  // it is available in LDC as an intrinsic, but not in DMD
  import core.thread : Thread;

  Thread.yield();
}

private struct stop_state {
  import core.thread : Thread;
  import core.atomic : atomicStore, atomicLoad, MemoryOrder, atomicOp;

  static if (__traits(compiles, () { import core.atomic : casWeak; }) && __traits(compiles, () {
      import core.internal.atomic : atomicCompareExchangeWeakNoResult;
    }))
    import core.atomic : casWeak;
  else
    auto casWeak(MemoryOrder M1, MemoryOrder M2, T, V1, V2)(T* here, V1 ifThis, V2 writeThis) pure nothrow @nogc @safe {
      import core.atomic : cas;

      static if (__traits(compiles, cas!(M1, M2)(here, ifThis, writeThis)))
        return cas!(M1, M2)(here, ifThis, writeThis);
      else
        return cas(here, ifThis, writeThis);
    }

public:
  void add_token_reference() nothrow @safe @nogc {
    // TODO: want to use atomicFetchAdd, but proper support is only in recent compilers
    // state_.atomicFetchAdd!(MemoryOrder.raw)(token_ref_increment);
    state_.atomicOp!"+="(token_ref_increment);
  }

  void remove_token_reference() nothrow @safe @nogc {
    // TODO: want to use atomicFetchSub, but proper support is only in recent compilers
    // state_.atomicFetchSub!(MemoryOrder.acq_rel)(token_ref_increment);
    state_.atomicOp!"-="(token_ref_increment);
  }

  void add_source_reference() nothrow @safe @nogc {
    // TODO: want to use atomicFetchAdd, but proper support is only in recent compilers
    // state_.atomicFetchAdd!(MemoryOrder.raw)(source_ref_increment);
    state_.atomicOp!"+="(source_ref_increment);
  }

  void remove_source_reference() nothrow @safe @nogc {
    // TODO: want to use atomicFetchSub, but proper support is only in recent compilers
    // state_.atomicFetchSub!(MemoryOrder.acq_rel)(source_ref_increment);
    state_.atomicOp!"-="(source_ref_increment);
  }

  bool request_stop() nothrow @safe {

    if (!try_lock_and_signal_until_signalled()) {
      // Stop has already been requested.
      return false;
    }

    // We have set the 'stop_requested' signal and acquired the lock.

    signallingThread_ = Thread.getThis();

    while (head_ !is null) {
      // Dequeue the head of the queue.
      auto cb = head_;
      head_ = cb.next_;
      const bool anyMore = head_ !is null;
      if (anyMore) {
        (() @trusted => head_.prev_ = &head_)(); // compiler 2.091.1 complains "address of variable this assigned to this with longer lifetime", but this is this; it cannot outlive itself.
      }
      // Mark this item as removed from the list.
      cb.prev_ = null;

      // Don't hold the lock while executing the callback
      // so we don't block other threads from deregistering callbacks.
      unlock();

      // TRICKY: Need to store a flag on the stack here that the callback
      // can use to signal that it deregistered itself (via remove_callback)
      // inline during the call. If it did, it is not safe to dereference cb
      // after execute() returns. If deregistration happens on some other
      // thread instead, that thread will block until this thread signals
      // that the callback has finished executing.
      bool isRemoved = false;
      (() @trusted => cb.isRemoved_ = &isRemoved)(); // the pointer to this stack variable is cleared below, before the stack frame is left.

      cb.execute();

      if (!isRemoved) {
        cb.isRemoved_ = null;
        cb.callbackFinishedExecuting.atomicStore!(MemoryOrder.rel)(true);
      }

      if (!anyMore) {
        // This was the last item in the queue when we dequeued it.
        // No more items should be added to the queue after we have
        // marked the state as stop-requested; items can only be removed.
        // Avoid acquiring/releasing the lock in this case.
        return true;
      }

      lock();
    }

    unlock();

    return true;
  }

  bool is_stop_requested() nothrow @safe @nogc {
    return is_stop_requested(state_.atomicLoad!(MemoryOrder.acq));
  }

  bool is_stop_requestable() nothrow @safe @nogc {
    return is_stop_requestable(state_.atomicLoad!(MemoryOrder.acq));
  }

  bool try_add_callback(StopCallback cb, bool incrementRefCountIfSuccessful) nothrow @safe {
    ulong oldState;
    do {
      goto load_state;
      do {
        spin_yield();
      load_state:
        oldState = state_.atomicLoad!(MemoryOrder.acq);
        if (is_stop_requested(oldState)) {
          cb.execute();
          return false;
        }
        else if (!is_stop_requestable(oldState)) {
          return false;
        }
      }
      while (is_locked(oldState));
    }
    while (!casWeak!(MemoryOrder.acq, MemoryOrder.acq)(&state_, oldState, oldState | locked_flag));

    // Push callback onto callback list.
    cb.next_ = head_;
    if (cb.next_ !is null) {
      cb.next_.prev_ = &cb.next_;
    }
    () @trusted { cb.prev_ = &head_; } ();
    head_ = cb;

    if (incrementRefCountIfSuccessful) {
      unlock_and_increment_token_ref_count();
    }
    else {
      unlock();
    }

    // Successfully added the callback.
    return true;
  }

  void remove_callback(StopCallback cb) nothrow @safe @nogc {
    lock();

    if (cb.prev_ !is null) {
      // Still registered and not yet executed:
      // just remove it from the list.
      *cb.prev_ = cb.next_;
      if (cb.next_ !is null) {
        cb.next_.prev_ = cb.prev_;
      }

      unlock_and_decrement_token_ref_count();

      return;
    }

    unlock();

    // The callback has either already executed or is executing
    // concurrently on another thread.

    if (signallingThread_ is Thread.getThis()) {
      // The callback executed on this thread or is still executing
      // and is deregistering itself from within the callback.
      if (cb.isRemoved_ !is null) {
        // Currently inside the callback: let the request_stop() method
        // know the object is about to be destroyed and that it should
        // not try to access the object when the callback returns.
        *cb.isRemoved_ = true;
      }
    }
    else {
      // The callback is currently executing on another thread;
      // block until it finishes executing.
      while (!cb.callbackFinishedExecuting.atomicLoad!(MemoryOrder.acq)) {
        spin_yield();
      }
    }

    remove_token_reference();
  }
326 
327 private:
328   static bool is_locked(ulong state) nothrow @safe @nogc {
329     return (state & locked_flag) != 0;
330   }
331 
332   static bool is_stop_requested(ulong state) nothrow @safe @nogc {
333     return (state & stop_requested_flag) != 0;
334   }
335 
336   static bool is_stop_requestable(ulong state) nothrow @safe @nogc {
337     // Interruptible if it has already been interrupted or if there are
338     // still interrupt_source instances in existence.
339     return is_stop_requested(state) || (state >= source_ref_increment);
340   }
341 
342   bool try_lock_and_signal_until_signalled() nothrow @safe @nogc {
343     ulong oldState;
344     do {
345       oldState = state_.atomicLoad!(MemoryOrder.acq);
346       if (is_stop_requested(oldState))
347         return false;
348       while (is_locked(oldState)) {
349         spin_yield();
350         oldState = state_.atomicLoad!(MemoryOrder.acq);
351         if (is_stop_requested(oldState))
352           return false;
353       }
354     }
355     while (!casWeak!(MemoryOrder.seq, MemoryOrder.acq)(&state_, oldState,
356         oldState | stop_requested_flag | locked_flag));
357     return true;
358   }

  void lock() nothrow @safe @nogc {
    ulong oldState;
    do {
      oldState = state_.atomicLoad!(MemoryOrder.raw);
      while (is_locked(oldState)) {
        spin_yield();
        oldState = state_.atomicLoad!(MemoryOrder.raw);
      }
    }
    while (!casWeak!(MemoryOrder.acq, MemoryOrder.raw)((&state_), oldState,
        oldState | locked_flag));
  }

  void unlock() nothrow @safe @nogc {
    // TODO: want to use atomicFetchSub, but proper support is only in recent compilers
    // state_.atomicFetchSub!(MemoryOrder.rel)(locked_flag);
    state_.atomicOp!"-="(locked_flag);
  }

  void unlock_and_increment_token_ref_count() nothrow @safe @nogc {
    // TODO: want to use atomicFetchSub, but proper support is only in recent compilers
    // state_.atomicFetchSub!(MemoryOrder.rel)(locked_flag - token_ref_increment);
    state_.atomicOp!"-="(locked_flag - token_ref_increment);
  }

  void unlock_and_decrement_token_ref_count() nothrow @safe @nogc {
    // TODO: want to use atomicFetchSub, but proper support is only in recent compilers
    // state_.atomicFetchSub!(MemoryOrder.acq_rel)(locked_flag + token_ref_increment);
    state_.atomicOp!"-="(locked_flag + token_ref_increment);
  }

  enum stop_requested_flag = 1L;
  enum locked_flag = 2L;
  enum token_ref_increment = 4L;
  enum source_ref_increment = 1L << 33u;
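
  // Sanity-check sketch: these compile-time assertions only restate the flag
  // values against the bit layout documented below; they do not change
  // runtime behavior.
  static assert(stop_requested_flag == 1UL << 0);
  static assert(locked_flag == 1UL << 1);
  static assert(token_ref_increment == 1UL << 2);
  static assert(source_ref_increment == 1UL << 33);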

  // bit  0     - stop-requested
  // bit  1     - locked
  // bits 2-32  - token ref count (31 bits)
  // bits 33-63 - source ref count (31 bits)
  shared ulong state_ = source_ref_increment;
  StopCallback head_ = null;
  Thread signallingThread_;
}