1 module concurrency.stoptoken;
2 
// This code is originally from https://github.com/josuttis/jthread by Nicolai Josuttis.
// It is licensed under the Creative Commons Attribution 4.0 International License,
// http://creativecommons.org/licenses/by/4.0
5 
6 class StopSource {
7   private stop_state state;
  /// Requests a stop; returns true if this call transitioned the source to stopped,
  /// false if a stop was already requested.
  bool stop() nothrow @safe {
9     return state.request_stop();
10   }
11 
12   bool stop() nothrow @trusted shared {
13     return (cast(StopSource)this).state.request_stop();
14   }
15 
  /// Returns true once a stop has been requested on this source.
  bool isStopRequested() nothrow @safe @nogc {
17     return state.is_stop_requested();
18   }
19 
20   bool isStopRequested() nothrow @trusted @nogc shared {
21     return (cast(StopSource)this).isStopRequested();
22   }
23 
  /// Resets the internal state. Only call this if you are certain nothing else is still using this source.
25   void reset(this t)() @system @nogc {
26     this.state = stop_state();
27   }
28 }
29 
/// A non-owning handle used to observe whether a stop has been requested on its StopSource.
struct StopToken {
31   package(concurrency) StopSource source;
32   this(StopSource source) nothrow @safe @nogc {
33     this.source = source;
34     isStopPossible = source !is null;
35   }
36 
37   this(shared StopSource source) nothrow @trusted @nogc {
38     this.source = cast()source;
39     isStopPossible = source !is null;
40   }
41 
42   bool isStopRequested() nothrow @safe @nogc {
43     return isStopPossible && source.isStopRequested();
44   }
45 
  /// true when this token is backed by a StopSource on which a stop can be requested
  const bool isStopPossible;
47 }
48 
/// A token for operations that can never be stopped; both queries are statically false.
struct NeverStopToken {
50   enum isStopRequested = false;
51   enum isStopPossible = false;
52 }
53 
/// Registers `callback` to run when a stop is requested on `stopSource`.
/// If a stop has already been requested, the callback runs immediately.
StopCallback onStop(StopSource stopSource, void delegate() nothrow @safe shared callback) nothrow @safe {
55   auto cb = new StopCallback(callback);
56   if (stopSource.state.try_add_callback(cb, true))
57     cb.source = stopSource;
58   return cb;
59 }
60 
61 StopCallback onStop(StopSource stopSource, void function() nothrow @safe callback) nothrow @trusted {
62   import std.functional : toDelegate;
63   return stopSource.onStop(cast(void delegate() nothrow @safe shared)callback.toDelegate);
64 }
65 
66 StopCallback onStop(StopToken)(StopToken stopToken, void delegate() nothrow @safe shared callback) nothrow @safe {
67   if (stopToken.isStopPossible) {
68     return stopToken.source.onStop(callback);
69   }
70   return new StopCallback(callback);
71 }
72 
73 StopCallback onStop(StopToken)(StopToken stopToken, void function() nothrow @safe callback) nothrow @trusted {
74   import std.functional : toDelegate;
75   return stopToken.onStop(cast(void delegate() nothrow @safe shared)callback.toDelegate);
76 }
77 
78 class StopCallback {
  /// Deregisters the callback; if it is currently executing on another thread,
  /// this blocks until it has finished.
  void dispose() nothrow @trusted @nogc {
80     import core.atomic : cas;
81 
82     if (source is null)
83       return;
84     auto local = source;
85     static if (__traits(compiles, cas(&source, local, null))) {
86       if (!cas(&source, local, null)) {
87         assert(source is null);
88         return;
89       }
90     } else {
91       if (!cas(cast(shared)&source, cast(shared)local, null)) {
92         assert(source is null);
93         return;
94       }
95     }
96     local.state.remove_callback(this);
97   }
98   void dispose() shared nothrow @trusted @nogc {
99     (cast()this).dispose();
100   }
101 
102 private:
103   this(void delegate() nothrow shared @safe callback) nothrow @safe @nogc {
104     this.callback = callback;
105   }
106 
107   void delegate() nothrow shared @safe callback;
108   StopSource source;
109 
110   StopCallback next_ = null;
111   StopCallback* prev_ = null;
112   bool* isRemoved_ = null;
113   shared bool callbackFinishedExecuting = false;
114 
115   void execute() nothrow @safe {
116     callback();
117   }
118 }
119 
120 deprecated("Use regular StopToken") alias StopTokenObject = StopToken;
121 
122 auto stopTokenObject(StopToken stopToken) {
123   return stopToken;
124 }
125 
126 auto stopTokenObject(NeverStopToken stopToken) {
127   StopSource s = null;
128   return StopToken(s);
129 }
130 
131 private void spin_yield() nothrow @trusted @nogc {
  // TODO: could use the `pause` instruction here;
  // it is available as an intrinsic in LDC, but not in DMD.
134   import core.thread : Thread;
135 
136   Thread.yield();
137 }
138 
139 private struct stop_state {
140   import core.thread : Thread;
141   import core.atomic : atomicStore, atomicLoad, MemoryOrder, atomicOp;
142 
143   static if (__traits(compiles, () { import core.atomic : casWeak; }) && __traits(compiles, () {
144       import core.internal.atomic : atomicCompareExchangeWeakNoResult;
145     }))
146     import core.atomic : casWeak;
147   else
148     auto casWeak(MemoryOrder M1, MemoryOrder M2, T, V1, V2)(T* here, V1 ifThis, V2 writeThis) pure nothrow @nogc @safe {
149       import core.atomic : cas;
150 
151       static if (__traits(compiles, cas!(M1, M2)(here, ifThis, writeThis)))
152         return cas!(M1, M2)(here, ifThis, writeThis);
153       else
154         return cas(here, ifThis, writeThis);
155     }
156 
157 public:
158   void add_token_reference() nothrow @safe @nogc {
159     // TODO: want to use atomicFetchAdd but (proper) support is only recent
160     // state_.atomicFetchAdd!(MemoryOrder.raw)(token_ref_increment);
161     state_.atomicOp!"+="(token_ref_increment);
162   }
163 
164   void remove_token_reference() nothrow @safe @nogc {
165     // TODO: want to use atomicFetchSub but (proper) support is only recent
166     // state_.atomicFetchSub!(MemoryOrder.acq_rel)(token_ref_increment);
167     state_.atomicOp!"-="(token_ref_increment);
168   }
169 
170   void add_source_reference() nothrow @safe @nogc {
171     // TODO: want to use atomicFetchAdd but (proper) support is only recent
172     // state_.atomicFetchAdd!(MemoryOrder.raw)(source_ref_increment);
173     state_.atomicOp!"+="(source_ref_increment);
174   }
175 
176   void remove_source_reference() nothrow @safe @nogc {
177     // TODO: want to use atomicFetchSub but (proper) support is only recent
178     // state_.atomicFetchSub!(MemoryOrder.acq_rel)(source_ref_increment);
179     state_.atomicOp!"-="(source_ref_increment);
180   }
181 
182   bool request_stop() nothrow @safe {
183 
184     if (!try_lock_and_signal_until_signalled()) {
185       // Stop has already been requested.
186       return false;
187     }
188 
    // We have now set the 'stop_requested' signal and acquired the lock.
190 
191     signallingThread_ = Thread.getThis();
192 
193     while (head_ !is null) {
194       // Dequeue the head of the queue
195       auto cb = head_;
196       head_ = cb.next_;
197       const bool anyMore = head_ !is null;
198       if (anyMore) {
        (() @trusted => head_.prev_ = &head_)(); // compiler 2.091.1 complains "address of variable this assigned to this with longer lifetime", but `this` cannot outlive itself, so the assignment is safe.
200       }
201       // Mark this item as removed from the list.
202       cb.prev_ = null;
203 
204       // Don't hold lock while executing callback
205       // so we don't block other threads from deregistering callbacks.
206       unlock();
207 
208       // TRICKY: Need to store a flag on the stack here that the callback
209       // can use to signal that the destructor was executed inline
210       // during the call. If the destructor was executed inline then
211       // it's not safe to dereference cb after execute() returns.
212       // If the destructor runs on some other thread then the other
213       // thread will block waiting for this thread to signal that the
214       // callback has finished executing.
215       bool isRemoved = false;
      (() @trusted => cb.isRemoved_ = &isRemoved)(); // the pointer to this stack variable is cleared again a few lines below, after cb.execute() returns.
217 
218       cb.execute();
219 
220       if (!isRemoved) {
221         cb.isRemoved_ = null;
222         cb.callbackFinishedExecuting.atomicStore!(MemoryOrder.rel)(true);
223       }
224 
225       if (!anyMore) {
226         // This was the last item in the queue when we dequeued it.
227         // No more items should be added to the queue after we have
        // marked the state as interrupted, only removed from the queue.
        // Avoid acquiring/releasing the lock in this case.
230         return true;
231       }
232 
233       lock();
234     }
235 
236     unlock();
237 
238     return true;
239   }
240 
241   bool is_stop_requested() nothrow @safe @nogc {
242     return is_stop_requested(state_.atomicLoad!(MemoryOrder.acq));
243   }
244 
245   bool is_stop_requestable() nothrow @safe @nogc {
246     return is_stop_requestable(state_.atomicLoad!(MemoryOrder.acq));
247   }
248 
249   bool try_add_callback(StopCallback cb, bool incrementRefCountIfSuccessful) nothrow @safe {
250     ulong oldState;
251     do {
252       goto load_state;
253       do {
254         spin_yield();
255       load_state:
256         oldState = state_.atomicLoad!(MemoryOrder.acq);
257         if (is_stop_requested(oldState)) {
258           cb.execute();
259           return false;
260         }
261         else if (!is_stop_requestable(oldState)) {
262           return false;
263         }
264       }
265       while (is_locked(oldState));
266     }
267     while (!casWeak!(MemoryOrder.acq, MemoryOrder.acq)(&state_, oldState, oldState | locked_flag));
268 
269     // Push callback onto callback list.
270     cb.next_ = head_;
271     if (cb.next_ !is null) {
272       cb.next_.prev_ = &cb.next_;
273     }
274     () @trusted { cb.prev_ = &head_; } ();
275     head_ = cb;
276 
277     if (incrementRefCountIfSuccessful) {
278       unlock_and_increment_token_ref_count();
279     }
280     else {
281       unlock();
282     }
283 
284     // Successfully added the callback.
285     return true;
286   }
287 
288   void remove_callback(StopCallback cb) nothrow @safe @nogc {
289     lock();
290 
291     if (cb.prev_ !is null) {
292       // Still registered, not yet executed
293       // Just remove from the list.
294       *cb.prev_ = cb.next_;
295       if (cb.next_ !is null) {
296         cb.next_.prev_ = cb.prev_;
297       }
298 
299       unlock_and_decrement_token_ref_count();
300 
301       return;
302     }
303 
304     unlock();
305 
306     // Callback has either already executed or is executing
307     // concurrently on another thread.
308 
309     if (signallingThread_ is Thread.getThis()) {
310       // Callback executed on this thread or is still currently executing
311       // and is deregistering itself from within the callback.
312       if (cb.isRemoved_ !is null) {
313         // Currently inside the callback, let the request_stop() method
314         // know the object is about to be destructed and that it should
315         // not try to access the object when the callback returns.
316         *cb.isRemoved_ = true;
317       }
318     }
319     else {
320       // Callback is currently executing on another thread,
321       // block until it finishes executing.
322       while (!cb.callbackFinishedExecuting.atomicLoad!(MemoryOrder.acq)) {
323         spin_yield();
324       }
325     }
326 
327     remove_token_reference();
328   }
329 
330 private:
331   static bool is_locked(ulong state) nothrow @safe @nogc {
332     return (state & locked_flag) != 0;
333   }
334 
335   static bool is_stop_requested(ulong state) nothrow @safe @nogc {
336     return (state & stop_requested_flag) != 0;
337   }
338 
339   static bool is_stop_requestable(ulong state) nothrow @safe @nogc {
    // A stop is requestable if it has already been requested or if there are
    // still source references (StopSource instances) in existence.
342     return is_stop_requested(state) || (state >= source_ref_increment);
343   }
344 
345   bool try_lock_and_signal_until_signalled() nothrow @safe @nogc {
346     ulong oldState;
347     do {
348       oldState = state_.atomicLoad!(MemoryOrder.acq);
349       if (is_stop_requested(oldState))
350         return false;
351       while (is_locked(oldState)) {
352         spin_yield();
353         oldState = state_.atomicLoad!(MemoryOrder.acq);
354         if (is_stop_requested(oldState))
355           return false;
356       }
357     }
358     while (!casWeak!(MemoryOrder.seq, MemoryOrder.acq)(&state_, oldState,
359         oldState | stop_requested_flag | locked_flag));
360     return true;
361   }
362 
363   void lock() nothrow @safe @nogc {
364     ulong oldState;
365     do {
366       oldState = state_.atomicLoad!(MemoryOrder.raw);
367       while (is_locked(oldState)) {
368         spin_yield();
369         oldState = state_.atomicLoad!(MemoryOrder.raw);
370       }
371     }
372     while (!casWeak!(MemoryOrder.acq, MemoryOrder.raw)((&state_), oldState,
373         oldState | locked_flag));
374   }
375 
376   void unlock() nothrow @safe @nogc {
377     // TODO: want to use atomicFetchSub but (proper) support is only recent
378     // state_.atomicFetchSub!(MemoryOrder.rel)(locked_flag);
379     state_.atomicOp!"-="(locked_flag);
380   }
381 
382   void unlock_and_increment_token_ref_count() nothrow @safe @nogc {
383     // TODO: want to use atomicFetchSub but (proper) support is only recent
384     // state_.atomicFetchSub!(MemoryOrder.rel)(locked_flag - token_ref_increment);
385     state_.atomicOp!"-="(locked_flag - token_ref_increment);
386   }
387 
388   void unlock_and_decrement_token_ref_count() nothrow @safe @nogc {
389     // TODO: want to use atomicFetchSub but (proper) support is only recent
390     // state_.atomicFetchSub!(MemoryOrder.acq_rel)(locked_flag + token_ref_increment);
391     state_.atomicOp!"-="(locked_flag + token_ref_increment);
392   }
393 
394   enum stop_requested_flag = 1L;
395   enum locked_flag = 2L;
396   enum token_ref_increment = 4L;
397   enum source_ref_increment = 1L << 33u;
398 
399   // bit 0 - stop-requested
400   // bit 1 - locked
401   // bits 2-32 - token ref count (31 bits)
402   // bits 33-63 - source ref count (31 bits)
403   shared ulong state_ = source_ref_increment;
404   StopCallback head_ = null;
405   Thread signallingThread_;
406 }