module concurrency.stoptoken;

// This code originally comes from https://github.com/josuttis/jthread by Nicolai Josuttis.
// It is licensed under the Creative Commons Attribution 4.0 International License: http://creativecommons.org/licenses/by/4.0

class StopSource {
  private stop_state state;
  bool stop() nothrow @safe {
    return state.request_stop();
  }

  bool stop() nothrow @trusted shared {
    return (cast(StopSource)this).state.request_stop();
  }

  bool isStopRequested() nothrow @safe @nogc {
    return state.is_stop_requested();
  }

  bool isStopRequested() nothrow @trusted @nogc shared {
    return (cast(StopSource)this).isStopRequested();
  }

  /// Resets the internal state. Only call this if you are certain nothing else is still referencing this source.
  void reset() @system @nogc {
    this.state = stop_state();
  }
}
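
// A minimal usage sketch: the first call to stop() performs the request and
// returns true; later calls observe that a stop was already requested and
// return false.
unittest {
  auto source = new StopSource();
  assert(!source.isStopRequested());
  assert(source.stop());            // first request wins
  assert(source.isStopRequested());
  assert(!source.stop());           // already stopped
}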

struct StopToken {
  private StopSource source;
  this(StopSource source) nothrow @safe @nogc {
    this.source = source;
  }

  bool isStopRequested() nothrow @safe @nogc {
    return source.isStopRequested();
  }

  enum isStopPossible = true;
}
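
// Usage sketch: a StopToken only observes the StopSource it was constructed
// from; it cannot request a stop itself.
unittest {
  auto source = new StopSource();
  auto token = StopToken(source);
  assert(!token.isStopRequested());
  source.stop();
  assert(token.isStopRequested());
}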

struct NeverStopToken {
  enum isStopRequested = false;
  enum isStopPossible = false;
}
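
// NeverStopToken advertises at compile time that a stop can never be requested,
// which lets generic code prune its cancellation paths with `static if`.
unittest {
  static assert(!NeverStopToken.isStopPossible);
  static assert(!NeverStopToken.isStopRequested);
}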

enum isStopToken(T) = true; // TODO: constrain this to types that provide isStopRequested and isStopPossible

StopCallback onStop(StopToken)(auto ref StopToken stopToken, void delegate() nothrow @safe shared callback) nothrow @safe if (isStopToken!StopToken) {
  import std.traits : hasMember;
  auto cb = new StopCallback(callback);
  static if (stopToken.isStopPossible && hasMember!(StopToken, "source")) {
    if (stopToken.source.state.try_add_callback(cb, true))
      cb.source = stopToken.source;
  }
  return cb;
}
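
// Usage sketch (assumes the callback can be written as a `shared` delegate
// literal): a callback registered before the stop request runs synchronously
// when stop() is called.
unittest {
  import core.atomic : atomicLoad, atomicStore;

  auto source = new StopSource();
  shared bool fired = false;
  auto cb = StopToken(source).onStop(() shared nothrow @safe { fired.atomicStore(true); });
  source.stop();            // executes the registered callback on this thread
  assert(fired.atomicLoad);
  cb.dispose();             // disposing after execution is harmless
}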

class StopCallback {
  void dispose() nothrow @trusted @nogc {
    import core.atomic : cas;

    if (source is null)
      return;
    auto local = source;
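    // Depending on the druntime version, cas may or may not accept unshared
    // pointers; fall back to casting to shared when the direct form does not compile.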
    static if (__traits(compiles, cas(&source, local, null))) {
      if (!cas(&source, local, null)) {
        assert(source is null);
        return;
      }
    } else {
      if (!cas(cast(shared)&source, cast(shared)local, null)) {
        assert(source is null);
        return;
      }
    }
    local.state.remove_callback(this);
  }

private:
  this(void delegate() nothrow shared @safe callback) nothrow @safe @nogc {
    this.callback = callback;
  }

  void delegate() nothrow shared @safe callback;
  StopSource source;

  StopCallback next_ = null;
  StopCallback* prev_ = null;
  bool* isRemoved_ = null;
  shared bool callbackFinishedExecuting = false;

  void execute() nothrow @safe {
    callback();
  }
}
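
// Sketch of early deregistration (same `shared` delegate assumption as above):
// disposing the callback before any stop request removes it from the source,
// so it never fires.
unittest {
  import core.atomic : atomicLoad, atomicStore;

  auto source = new StopSource();
  shared bool fired = false;
  auto cb = StopToken(source).onStop(() shared nothrow @safe { fired.atomicStore(true); });
  cb.dispose();             // deregister before any stop is requested
  source.stop();
  assert(!fired.atomicLoad);
}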

interface StopTokenObject {
  bool isStopRequested() nothrow @safe;
  bool isStopPossible() nothrow @safe;
  StopCallback onStop(void delegate() nothrow @safe shared callback) nothrow @safe;
}

class StopTokenObjectImpl(StopToken) : StopTokenObject {
  private StopToken token;
  this(StopToken token) { this.token = token; }
  bool isStopRequested() nothrow @safe {
    return token.isStopRequested;
  }
  bool isStopPossible() nothrow @safe {
    return token.isStopPossible;
  }
  StopCallback onStop(void delegate() nothrow @safe shared callback) nothrow @safe {
    return token.onStop(callback);
  }
}

auto stopTokenObject(StopToken)(StopToken stopToken) {
  return new StopTokenObjectImpl!(StopToken)(stopToken);
}
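
// Usage sketch: stopTokenObject erases the concrete token type behind the
// StopTokenObject interface, trading the compile-time enums for virtual calls.
unittest {
  auto source = new StopSource();
  StopTokenObject obj = stopTokenObject(StopToken(source));
  assert(obj.isStopPossible());
  assert(!obj.isStopRequested());
  source.stop();
  assert(obj.isStopRequested());
}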

private void spin_yield() nothrow @trusted @nogc {
  // TODO: could use the pause asm instruction;
  // it is available as an intrinsic in LDC, but not in DMD.
  import core.thread : Thread;

  Thread.yield();
}

private struct stop_state {
  import core.thread : Thread;
  import core.atomic : atomicStore, atomicLoad, MemoryOrder, atomicOp;

  static if (__traits(compiles, () { import core.atomic : casWeak; }) && __traits(compiles, () {
      import core.internal.atomic : atomicCompareExchangeWeakNoResult;
    }))
    import core.atomic : casWeak;
  else
    auto casWeak(MemoryOrder M1, MemoryOrder M2, T, V1, V2)(T* here, V1 ifThis, V2 writeThis) pure nothrow @nogc @safe {
      import core.atomic : cas;

      static if (__traits(compiles, cas!(M1, M2)(here, ifThis, writeThis)))
        return cas!(M1, M2)(here, ifThis, writeThis);
      else
        return cas(here, ifThis, writeThis);
    }

public:
  void add_token_reference() nothrow @safe @nogc {
    // TODO: want to use atomicFetchAdd but (proper) support is only recent
    // state_.atomicFetchAdd!(MemoryOrder.raw)(token_ref_increment);
    state_.atomicOp!"+="(token_ref_increment);
  }

  void remove_token_reference() nothrow @safe @nogc {
    // TODO: want to use atomicFetchSub but (proper) support is only recent
    // state_.atomicFetchSub!(MemoryOrder.acq_rel)(token_ref_increment);
    state_.atomicOp!"-="(token_ref_increment);
  }

  void add_source_reference() nothrow @safe @nogc {
    // TODO: want to use atomicFetchAdd but (proper) support is only recent
    // state_.atomicFetchAdd!(MemoryOrder.raw)(source_ref_increment);
    state_.atomicOp!"+="(source_ref_increment);
  }

  void remove_source_reference() nothrow @safe @nogc {
    // TODO: want to use atomicFetchSub but (proper) support is only recent
    // state_.atomicFetchSub!(MemoryOrder.acq_rel)(source_ref_increment);
    state_.atomicOp!"-="(source_ref_increment);
  }

  bool request_stop() nothrow @safe {

    if (!try_lock_and_signal_until_signalled()) {
      // Stop has already been requested.
      return false;
    }

    // We have set the 'stop_requested' signal and acquired the lock.

    signallingThread_ = Thread.getThis();

    while (head_ !is null) {
      // Dequeue the head of the queue
      auto cb = head_;
      head_ = cb.next_;
      const bool anyMore = head_ !is null;
      if (anyMore) {
        (() @trusted => head_.prev_ = &head_)(); // compiler 2.091.1 complains "address of variable this assigned to this with longer lifetime". But this is this, how can it have a longer lifetime...
      }
      // Mark this item as removed from the list.
      cb.prev_ = null;

      // Don't hold lock while executing callback
      // so we don't block other threads from deregistering callbacks.
      unlock();

      // TRICKY: Need to store a flag on the stack here that the callback
      // can use to signal that the destructor was executed inline
      // during the call. If the destructor was executed inline then
      // it's not safe to dereference cb after execute() returns.
      // If the destructor runs on some other thread then the other
      // thread will block waiting for this thread to signal that the
      // callback has finished executing.
      bool isRemoved = false;
      (() @trusted => cb.isRemoved_ = &isRemoved)(); // this stack address is cleared again right after execute() returns, unless the callback removed itself.

      cb.execute();

      if (!isRemoved) {
        cb.isRemoved_ = null;
        cb.callbackFinishedExecuting.atomicStore!(MemoryOrder.rel)(true);
      }

      if (!anyMore) {
        // This was the last item in the queue when we dequeued it.
        // No more items should be added to the queue after we have
        // marked the state as stop-requested, only removed from the queue.
        // Avoid acquiring/releasing the lock in this case.
        return true;
      }

      lock();
    }

    unlock();

    return true;
  }

  bool is_stop_requested() nothrow @safe @nogc {
    return is_stop_requested(state_.atomicLoad!(MemoryOrder.acq));
  }

  bool is_stop_requestable() nothrow @safe @nogc {
    return is_stop_requestable(state_.atomicLoad!(MemoryOrder.acq));
  }

  bool try_add_callback(StopCallback cb, bool incrementRefCountIfSuccessful) nothrow @safe {
    ulong oldState;
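    // The inner loop reloads the state until it is unlocked (the goto enters it
    // without yielding first); the outer loop then attempts to take the lock
    // with a weak CAS and retries from the top if that fails.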
    do {
      goto load_state;
      do {
        spin_yield();
      load_state:
        oldState = state_.atomicLoad!(MemoryOrder.acq);
        if (is_stop_requested(oldState)) {
          cb.execute();
          return false;
        }
        else if (!is_stop_requestable(oldState)) {
          return false;
        }
      }
      while (is_locked(oldState));
    }
    while (!casWeak!(MemoryOrder.acq, MemoryOrder.acq)(&state_, oldState, oldState | locked_flag));

    // Push callback onto callback list.
    cb.next_ = head_;
    if (cb.next_ !is null) {
      cb.next_.prev_ = &cb.next_;
    }
    cb.prev_ = &head_;
    head_ = cb;

    if (incrementRefCountIfSuccessful) {
      unlock_and_increment_token_ref_count();
    }
    else {
      unlock();
    }

    // Successfully added the callback.
    return true;
  }

  void remove_callback(StopCallback cb) nothrow @safe @nogc {
    lock();

    if (cb.prev_ !is null) {
      // Still registered, not yet executed
      // Just remove from the list.
      *cb.prev_ = cb.next_;
      if (cb.next_ !is null) {
        cb.next_.prev_ = cb.prev_;
      }

      unlock_and_decrement_token_ref_count();

      return;
    }

    unlock();

    // Callback has either already executed or is executing
    // concurrently on another thread.

    if (signallingThread_ is Thread.getThis()) {
      // Callback executed on this thread or is still currently executing
      // and is deregistering itself from within the callback.
      if (cb.isRemoved_ !is null) {
        // Currently inside the callback, let the request_stop() method
        // know the object is about to be destructed and that it should
        // not try to access the object when the callback returns.
        *cb.isRemoved_ = true;
      }
    }
    else {
      // Callback is currently executing on another thread,
      // block until it finishes executing.
      while (!cb.callbackFinishedExecuting.atomicLoad!(MemoryOrder.acq)) {
        spin_yield();
      }
    }

    remove_token_reference();
  }

private:
  static bool is_locked(ulong state) nothrow @safe @nogc {
    return (state & locked_flag) != 0;
  }

  static bool is_stop_requested(ulong state) nothrow @safe @nogc {
    return (state & stop_requested_flag) != 0;
  }

  static bool is_stop_requestable(ulong state) nothrow @safe @nogc {
    // A stop is requestable if it has already been requested or if there
    // are still StopSource instances in existence.
    return is_stop_requested(state) || (state >= source_ref_increment);
  }

  bool try_lock_and_signal_until_signalled() nothrow @safe @nogc {
    ulong oldState;
    do {
      oldState = state_.atomicLoad!(MemoryOrder.acq);
      if (is_stop_requested(oldState))
        return false;
      while (is_locked(oldState)) {
        spin_yield();
        oldState = state_.atomicLoad!(MemoryOrder.acq);
        if (is_stop_requested(oldState))
          return false;
      }
    }
    while (!casWeak!(MemoryOrder.seq, MemoryOrder.acq)(&state_, oldState,
        oldState | stop_requested_flag | locked_flag));
    return true;
  }

  void lock() nothrow @safe @nogc {
    ulong oldState;
    do {
      oldState = state_.atomicLoad!(MemoryOrder.raw);
      while (is_locked(oldState)) {
        spin_yield();
        oldState = state_.atomicLoad!(MemoryOrder.raw);
      }
    }
    while (!casWeak!(MemoryOrder.acq, MemoryOrder.raw)((&state_), oldState,
        oldState | locked_flag));
  }

  void unlock() nothrow @safe @nogc {
    // TODO: want to use atomicFetchSub but (proper) support is only recent
    // state_.atomicFetchSub!(MemoryOrder.rel)(locked_flag);
    state_.atomicOp!"-="(locked_flag);
  }

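  // Clears the lock bit and adds one token reference in a single atomic
  // subtraction: -(locked_flag - token_ref_increment) == -locked_flag + token_ref_increment.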
  void unlock_and_increment_token_ref_count() nothrow @safe @nogc {
    // TODO: want to use atomicFetchSub but (proper) support is only recent
    // state_.atomicFetchSub!(MemoryOrder.rel)(locked_flag - token_ref_increment);
    state_.atomicOp!"-="(locked_flag - token_ref_increment);
  }

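  // Likewise, this clears the lock bit and drops one token reference in a
  // single atomic subtraction of (locked_flag + token_ref_increment).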
  void unlock_and_decrement_token_ref_count() nothrow @safe @nogc {
    // TODO: want to use atomicFetchSub but (proper) support is only recent
    // state_.atomicFetchSub!(MemoryOrder.acq_rel)(locked_flag + token_ref_increment);
    state_.atomicOp!"-="(locked_flag + token_ref_increment);
  }

  enum stop_requested_flag = 1L;
  enum locked_flag = 2L;
  enum token_ref_increment = 4L;
  enum source_ref_increment = 1L << 33u;

  // bit 0 - stop-requested
  // bit 1 - locked
  // bits 2-32 - token ref count (31 bits)
  // bits 33-63 - source ref count (31 bits)
  shared ulong state_ = source_ref_increment;
  StopCallback head_ = null;
  Thread signallingThread_;
}