1 module concurrency.stoptoken;
2 
// This code was originally ported from https://github.com/josuttis/jthread by Nicolai Josuttis.
// It is licensed under the Creative Commons Attribution 4.0 International License:
// http://creativecommons.org/licenses/by/4.0
5 
6 class StopSource {
7   private stop_state state;
  /// Requests a stop; returns true if this call transitioned the source to the stopped state.
  bool stop() nothrow @safe {
9     return state.request_stop();
10   }
11 
12   bool stop() nothrow @trusted shared {
13     return (cast(StopSource)this).state.request_stop();
14   }
15 
16   bool isStopRequested() nothrow @safe @nogc {
17     return state.is_stop_requested();
18   }
19 
20   bool isStopRequested() nothrow @trusted @nogc shared {
21     return (cast(StopSource)this).isStopRequested();
22   }
23 
  /// Resets the internal state. Only call this if you are certain nothing else is referencing this source.
25   void reset() @system @nogc {
26     this.state = stop_state();
27   }
28 }
29 
30 struct StopToken {
31   private StopSource source;
32   this(StopSource source) nothrow @safe @nogc {
33     this.source = source;
34     isStopPossible = source !is null;
35   }
36 
  this(shared StopSource source) nothrow @trusted @nogc {
    this.source = cast() source;
    isStopPossible = source !is null;
  }
40 
41   bool isStopRequested() nothrow @safe @nogc {
42     return isStopPossible && source.isStopRequested();
43   }
44 
45   const bool isStopPossible;
46 }
47 
48 struct NeverStopToken {
49   enum isStopRequested = false;
50   enum isStopPossible = false;
51 }
52 
/// Registers `callback` to run when stop is requested on `stopToken`'s source.
/// If stop has already been requested, the callback is executed immediately.
StopCallback onStop(StopToken)(StopToken stopToken, void delegate() nothrow @safe shared callback) nothrow @safe {
  auto cb = new StopCallback(callback);
56   if (stopToken.isStopPossible) {
57     if (stopToken.source.state.try_add_callback(cb, true))
58       cb.source = stopToken.source;
59   }
60   return cb;
61 }
62 
63 class StopCallback {
  /// Deregisters the callback; if it is currently executing on another
  /// thread, this blocks until it has finished.
  void dispose() nothrow @trusted @nogc {
65     import core.atomic : cas;
66 
67     if (source is null)
68       return;
69     auto local = source;
    // newer druntimes accept unshared arguments to cas; older ones require shared
    static if (__traits(compiles, cas(&source, local, null))) {
71       if (!cas(&source, local, null)) {
72         assert(source is null);
73         return;
74       }
75     } else {
76       if (!cas(cast(shared)&source, cast(shared)local, null)) {
77         assert(source is null);
78         return;
79       }
80     }
81     local.state.remove_callback(this);
82   }
83 
84 private:
85   this(void delegate() nothrow shared @safe callback) nothrow @safe @nogc {
86     this.callback = callback;
87   }
88 
89   void delegate() nothrow shared @safe callback;
90   StopSource source;
91 
92   StopCallback next_ = null;
93   StopCallback* prev_ = null;
94   bool* isRemoved_ = null;
95   shared bool callbackFinishedExecuting = false;
96 
97   void execute() nothrow @safe {
98     callback();
99   }
100 }
101 
102 deprecated("Use regular StopToken") alias StopTokenObject = StopToken;
103 
104 auto stopTokenObject(StopToken stopToken) {
105   return stopToken;
106 }
107 
108 auto stopTokenObject(NeverStopToken stopToken) {
109   StopSource s = null;
110   return StopToken(s);
111 }
112 
113 private void spin_yield() nothrow @trusted @nogc {
  // TODO: could use the `pause` asm instruction;
  // it is available as an intrinsic in LDC, but not in DMD
116   import core.thread : Thread;
117 
118   Thread.yield();
119 }
120 
121 private struct stop_state {
122   import core.thread : Thread;
123   import core.atomic : atomicStore, atomicLoad, MemoryOrder, atomicOp;
124 
  // use druntime's casWeak when available; otherwise fall back to a strong cas
  static if (__traits(compiles, () { import core.atomic : casWeak; }) && __traits(compiles, () {
126       import core.internal.atomic : atomicCompareExchangeWeakNoResult;
127     }))
128     import core.atomic : casWeak;
129   else
130     auto casWeak(MemoryOrder M1, MemoryOrder M2, T, V1, V2)(T* here, V1 ifThis, V2 writeThis) pure nothrow @nogc @safe {
131       import core.atomic : cas;
132 
133       static if (__traits(compiles, cas!(M1, M2)(here, ifThis, writeThis)))
134         return cas!(M1, M2)(here, ifThis, writeThis);
135       else
136         return cas(here, ifThis, writeThis);
137     }
138 
139 public:
140   void add_token_reference() nothrow @safe @nogc {
141     // TODO: want to use atomicFetchAdd but (proper) support is only recent
142     // state_.atomicFetchAdd!(MemoryOrder.raw)(token_ref_increment);
143     state_.atomicOp!"+="(token_ref_increment);
144   }
145 
146   void remove_token_reference() nothrow @safe @nogc {
147     // TODO: want to use atomicFetchSub but (proper) support is only recent
148     // state_.atomicFetchSub!(MemoryOrder.acq_rel)(token_ref_increment);
149     state_.atomicOp!"-="(token_ref_increment);
150   }
151 
152   void add_source_reference() nothrow @safe @nogc {
153     // TODO: want to use atomicFetchAdd but (proper) support is only recent
154     // state_.atomicFetchAdd!(MemoryOrder.raw)(source_ref_increment);
155     state_.atomicOp!"+="(source_ref_increment);
156   }
157 
158   void remove_source_reference() nothrow @safe @nogc {
159     // TODO: want to use atomicFetchSub but (proper) support is only recent
160     // state_.atomicFetchSub!(MemoryOrder.acq_rel)(source_ref_increment);
161     state_.atomicOp!"-="(source_ref_increment);
162   }
163 
164   bool request_stop() nothrow @safe {
165 
166     if (!try_lock_and_signal_until_signalled()) {
167       // Stop has already been requested.
168       return false;
169     }
170 
    // We have set the 'stop_requested' signal and acquired the lock.
172 
173     signallingThread_ = Thread.getThis();
174 
175     while (head_ !is null) {
176       // Dequeue the head of the queue
177       auto cb = head_;
178       head_ = cb.next_;
179       const bool anyMore = head_ !is null;
180       if (anyMore) {
        (() @trusted => head_.prev_ = &head_)(); // compiler 2.091.1 complains "address of variable this assigned to this with longer lifetime", but `this` cannot outlive itself, so this is safe
182       }
183       // Mark this item as removed from the list.
184       cb.prev_ = null;
185 
186       // Don't hold lock while executing callback
187       // so we don't block other threads from deregistering callbacks.
188       unlock();
189 
      // TRICKY: Need to store a flag on the stack here that the callback
      // can use to signal that it was deregistered (disposed) inline during
      // the call. If it was deregistered inline then it is not safe to
      // dereference cb after execute() returns. If the deregistration runs
      // on some other thread then that thread will block waiting for this
      // thread to signal that the callback has finished executing.
197       bool isRemoved = false;
      (() @trusted => cb.isRemoved_ = &isRemoved)(); // the pointer to this stack variable is cleared again right after execute() returns
199 
200       cb.execute();
201 
202       if (!isRemoved) {
203         cb.isRemoved_ = null;
204         cb.callbackFinishedExecuting.atomicStore!(MemoryOrder.rel)(true);
205       }
206 
207       if (!anyMore) {
208         // This was the last item in the queue when we dequeued it.
209         // No more items should be added to the queue after we have
210         // marked the state as interrupted, only removed from the queue.
        // Avoid acquiring/releasing the lock in this case.
212         return true;
213       }
214 
215       lock();
216     }
217 
218     unlock();
219 
220     return true;
221   }
222 
223   bool is_stop_requested() nothrow @safe @nogc {
224     return is_stop_requested(state_.atomicLoad!(MemoryOrder.acq));
225   }
226 
227   bool is_stop_requestable() nothrow @safe @nogc {
228     return is_stop_requestable(state_.atomicLoad!(MemoryOrder.acq));
229   }
230 
231   bool try_add_callback(StopCallback cb, bool incrementRefCountIfSuccessful) nothrow @safe {
232     ulong oldState;
    do {
      // spin until the state is unlocked, then try to take the lock with a CAS
234       goto load_state;
235       do {
236         spin_yield();
237       load_state:
238         oldState = state_.atomicLoad!(MemoryOrder.acq);
239         if (is_stop_requested(oldState)) {
240           cb.execute();
241           return false;
242         }
243         else if (!is_stop_requestable(oldState)) {
244           return false;
245         }
246       }
247       while (is_locked(oldState));
248     }
249     while (!casWeak!(MemoryOrder.acq, MemoryOrder.acq)(&state_, oldState, oldState | locked_flag));
250 
251     // Push callback onto callback list.
252     cb.next_ = head_;
253     if (cb.next_ !is null) {
254       cb.next_.prev_ = &cb.next_;
255     }
256     cb.prev_ = &head_;
257     head_ = cb;
258 
259     if (incrementRefCountIfSuccessful) {
260       unlock_and_increment_token_ref_count();
261     }
262     else {
263       unlock();
264     }
265 
266     // Successfully added the callback.
267     return true;
268   }
269 
270   void remove_callback(StopCallback cb) nothrow @safe @nogc {
271     lock();
272 
273     if (cb.prev_ !is null) {
274       // Still registered, not yet executed
275       // Just remove from the list.
276       *cb.prev_ = cb.next_;
277       if (cb.next_ !is null) {
278         cb.next_.prev_ = cb.prev_;
279       }
280 
281       unlock_and_decrement_token_ref_count();
282 
283       return;
284     }
285 
286     unlock();
287 
288     // Callback has either already executed or is executing
289     // concurrently on another thread.
290 
291     if (signallingThread_ is Thread.getThis()) {
292       // Callback executed on this thread or is still currently executing
293       // and is deregistering itself from within the callback.
294       if (cb.isRemoved_ !is null) {
295         // Currently inside the callback, let the request_stop() method
296         // know the object is about to be destructed and that it should
297         // not try to access the object when the callback returns.
298         *cb.isRemoved_ = true;
299       }
300     }
301     else {
302       // Callback is currently executing on another thread,
303       // block until it finishes executing.
304       while (!cb.callbackFinishedExecuting.atomicLoad!(MemoryOrder.acq)) {
305         spin_yield();
306       }
307     }
308 
309     remove_token_reference();
310   }
311 
312 private:
313   static bool is_locked(ulong state) nothrow @safe @nogc {
314     return (state & locked_flag) != 0;
315   }
316 
317   static bool is_stop_requested(ulong state) nothrow @safe @nogc {
318     return (state & stop_requested_flag) != 0;
319   }
320 
321   static bool is_stop_requestable(ulong state) nothrow @safe @nogc {
    // Stop is requestable if it has already been requested or if there
    // are still StopSource references in existence.
324     return is_stop_requested(state) || (state >= source_ref_increment);
325   }
326 
327   bool try_lock_and_signal_until_signalled() nothrow @safe @nogc {
328     ulong oldState;
329     do {
330       oldState = state_.atomicLoad!(MemoryOrder.acq);
331       if (is_stop_requested(oldState))
332         return false;
333       while (is_locked(oldState)) {
334         spin_yield();
335         oldState = state_.atomicLoad!(MemoryOrder.acq);
336         if (is_stop_requested(oldState))
337           return false;
338       }
339     }
340     while (!casWeak!(MemoryOrder.seq, MemoryOrder.acq)(&state_, oldState,
341         oldState | stop_requested_flag | locked_flag));
342     return true;
343   }
344 
345   void lock() nothrow @safe @nogc {
346     ulong oldState;
347     do {
348       oldState = state_.atomicLoad!(MemoryOrder.raw);
349       while (is_locked(oldState)) {
350         spin_yield();
351         oldState = state_.atomicLoad!(MemoryOrder.raw);
352       }
353     }
354     while (!casWeak!(MemoryOrder.acq, MemoryOrder.raw)((&state_), oldState,
355         oldState | locked_flag));
356   }
357 
358   void unlock() nothrow @safe @nogc {
359     // TODO: want to use atomicFetchSub but (proper) support is only recent
360     // state_.atomicFetchSub!(MemoryOrder.rel)(locked_flag);
361     state_.atomicOp!"-="(locked_flag);
362   }
363 
364   void unlock_and_increment_token_ref_count() nothrow @safe @nogc {
365     // TODO: want to use atomicFetchSub but (proper) support is only recent
366     // state_.atomicFetchSub!(MemoryOrder.rel)(locked_flag - token_ref_increment);
367     state_.atomicOp!"-="(locked_flag - token_ref_increment);
368   }
369 
370   void unlock_and_decrement_token_ref_count() nothrow @safe @nogc {
371     // TODO: want to use atomicFetchSub but (proper) support is only recent
372     // state_.atomicFetchSub!(MemoryOrder.acq_rel)(locked_flag + token_ref_increment);
373     state_.atomicOp!"-="(locked_flag + token_ref_increment);
374   }
375 
376   enum stop_requested_flag = 1L;
377   enum locked_flag = 2L;
378   enum token_ref_increment = 4L;
379   enum source_ref_increment = 1L << 33u;
380 
381   // bit 0 - stop-requested
382   // bit 1 - locked
383   // bits 2-32 - token ref count (31 bits)
384   // bits 33-63 - source ref count (31 bits)
385   shared ulong state_ = source_ref_increment;
386   StopCallback head_ = null;
387   Thread signallingThread_;
388 }