1 module concurrency.nursery;
2 
3 import concurrency.stoptoken : StopSource, StopToken, StopCallback, onStop;
4 import concurrency.thread : LocalThreadExecutor;
5 import concurrency.receiver : getStopToken;
6 import concurrency.scheduler : SchedulerObjectBase;
7 import std.typecons : Nullable;
8 
9 /// A Nursery is a place for senders to be run in, while being a Sender itself.
10 /// Stopping the Nursery cancels all senders.
11 /// When any Sender completes with an Error all Senders are canceled as well.
12 /// Cancellation is signaled with a StopToken.
13 /// Senders themselves bear the responsibility to respond to stop requests.
14 /// When cancellation happens all Senders are waited on for completion.
15 /// Senders can be added to the Nursery at any time.
16 /// Senders are only started when the Nursery itself is being awaited on.
class Nursery : StopSource {
  import concurrency.sender : isSender, OperationalStateBase;
  import core.sync.mutex : Mutex;
  import concepts;
  static assert (models!(typeof(this), isSender));

  /// The Nursery itself completes without producing a value.
  alias Value = void;
  private {
    /// Operations handed to `run` that have not reported completion yet.
    Node[] operations;
    /// Pairs a connected operation with the id `done` uses to find it again.
    struct Node {
      OperationalStateBase state;
      void start() @safe nothrow {
        state.start();
      }
      size_t id;
    }
    /// Taken around updates of `operations`, `receiver` and `stopCallback`.
    Mutex mutex;
    shared size_t busy = 1; // we start at 1 to denote the Nursery is open for tasks
    /// Source of operation ids; bumped once per accepted `run`, so ids start at 1.
    shared size_t counter = 0;
    Throwable throwable; // first throwable from sender, if any
    /// Receiver of whoever is currently awaiting the Nursery, null otherwise.
    ReceiverObject receiver;
    /// Callback registered on the awaiting receiver's stop token (see `connect`).
    StopCallback stopCallback;
    /// Strips `shared` so the fields can be reached from `shared` methods.
    Nursery assumeThreadSafe() @trusted shared nothrow {
      return cast(Nursery)this;
    }
  }

  /// Calls `resetScheduler` and allocates the mutex.
  this() @safe shared {
    import concurrency.utils : resetScheduler;
    resetScheduler();
    with(assumeThreadSafe) mutex = new Mutex();
  }

  /// Forwards to `StopSource.stop`. When that returns true the count the
  /// Nursery holds on itself (the initial `busy = 1`) is released through
  /// `done(-1)`; `-1` converts to `size_t.max`, while operation ids are counted
  /// up from 1.
  /// Returns: whatever `StopSource.stop` returned.
  override bool stop() nothrow @trusted {
    auto result = super.stop();

    if (result)
      (cast(shared)this).done(-1);

    return result;
  }

  /// `shared` overload; casts away `shared` and calls the overload above.
  override bool stop() nothrow @trusted shared {
    return (cast(Nursery)this).stop();
  }

  /// Returns: a StopToken built from this Nursery.
  StopToken getStopToken() nothrow @trusted shared {
    return StopToken(cast(Nursery)this);
  }

  /// Returns: the scheduler reported by the current `receiver`.
  /// NOTE(review): `receiver` is dereferenced without a null check - confirm
  /// this is only reached while the Nursery is being awaited.
  private auto getScheduler() nothrow @trusted shared {
    return (cast()receiver).getScheduler();
  }

  /// Records `e` unless a throwable is already stored, marks operation `id` as
  /// finished and then stops the Nursery.
  private void setError(Throwable e, size_t id) nothrow @safe shared {
    import core.atomic : cas;
    with(assumeThreadSafe) cas(&throwable, cast(Throwable)null, e); // store throwable if not already
    done(id);
    stop();
  }

  /// Bookkeeping for one finished unit of work.
  ///
  /// Removes the operation with the given `id` (if present), decrements
  /// `busy`, and - when `busy` reached 0 or no operations are left - completes
  /// the awaiting receiver: with the stored throwable if there is one, with
  /// `setDone` if stop was requested, and with `setValue` otherwise.
  private void done(size_t id) nothrow @trusted shared {
    import std.algorithm : countUntil, remove;
    import core.atomic : atomicOp;

    with (assumeThreadSafe) {
      mutex.lock_nothrow();
      auto idx = operations.countUntil!(o => o.id == id);
      if (idx != -1)
        operations = operations.remove(idx);
      bool isDone = atomicOp!"-="(busy,1) == 0 || operations.length == 0;
      auto localReceiver = receiver;
      auto localThrowable = throwable;
      StopCallback localCallback = null;
      if (isDone) {
        // Detach everything under the lock so the Nursery can be awaited
        // again; the detached objects are acted upon after unlocking.
        throwable = null;
        receiver = null;
        localCallback = stopCallback;
        stopCallback = null;
      }
      mutex.unlock_nothrow();

      // Dispose outside the mutex: the callback registered in `connect` calls
      // `stop()`, which re-enters `done` and takes this same mutex. Should
      // `dispose` have to wait for a callback that is running concurrently,
      // holding the lock here could deadlock.
      if (localCallback !is null)
        localCallback.dispose();

      if (isDone && localReceiver !is null) {
        if (localThrowable !is null) {
          localReceiver.setError(localThrowable);
        } else if (isStopRequested()) {
          localReceiver.setDone();
        } else {
          try {
            localReceiver.setValue();
          } catch (Exception e) {
            localReceiver.setError(e);
          }
        }
      }
    }
  }

  /// Runs the wrapped sender if the Nullable holds one; does nothing otherwise.
  void run(Sender)(Nullable!Sender sender) shared if (isSender!Sender) {
    if (!sender.isNull)
      run(sender.get());
  }

  /// Adds `sender` to the Nursery.
  ///
  /// The sender is connected to a `NurseryReceiver` carrying a fresh id and
  /// stored in `operations`. It is started right away only when a receiver is
  /// currently set; otherwise it is started by `setReceiver`.
  /// A null class/interface sender is ignored.
  /// Throws: Exception when `busy` is 0, i.e. the Nursery has been stopped.
  void run(Sender)(Sender sender) shared @trusted {
    import concepts;
    static assert (models!(Sender, isSender));
    import core.atomic : atomicOp, atomicLoad;
    import concurrency.sender : connectHeap;

    static if (is(Sender == class) || is(Sender == interface))
      if (sender is null)
        return;

    if (busy.atomicLoad() == 0)
      throw new Exception("This nursery is already stopped, it cannot accept more work.");

    size_t id = atomicOp!"+="(counter, 1);
    auto op = sender.connectHeap(NurseryReceiver!(Sender.Value)(this, id));

    mutex.lock_nothrow();

    operations ~= cast(shared) Node(op, id);
    atomicOp!"+="(busy, 1);
    bool hasStarted = this.receiver !is null;
    mutex.unlock_nothrow();

    if (hasStarted)
      op.start();
  }

  /// Non-shared convenience overload; forwards to the `shared` `connect`.
  auto connect(Receiver)(return Receiver receiver) @safe return scope {
    return asShared.connect(receiver);
  }

  /// Reinterprets this instance as `shared`.
  private shared(Nursery) asShared() @trusted return scope {
    return cast(shared)this;
  }

  /// Connects the Nursery to `receiver`.
  ///
  /// The receiver is wrapped in a class implementing `ReceiverObject`, and a
  /// callback that stops the Nursery is registered on the receiver's stop
  /// token. Both are handed to the returned `NurseryOp`.
  auto connect(Receiver)(return Receiver receiver) shared @trusted return scope {
    final class ReceiverImpl : ReceiverObject {
      Receiver receiver;
      SchedulerObjectBase scheduler;
      this(return Receiver receiver) @trusted { this.receiver = receiver; }
      void setValue() @safe { receiver.setValue(); }
      void setDone() nothrow @safe { receiver.setDone(); }
      void setError(Throwable e) nothrow @safe { receiver.setError(e); }
      SchedulerObjectBase getScheduler() nothrow @safe {
        import concurrency.scheduler : toSchedulerObject;
        // Converted on first use and cached afterwards.
        if (scheduler is null)
          scheduler = receiver.getScheduler().toSchedulerObject();
        return scheduler;
      }
    }
    auto stopToken = receiver.getStopToken();
    auto cb = (()@trusted => stopToken.onStop(() shared nothrow @trusted => cast(void)this.stop()))();
    return NurseryOp(this, cb, new ReceiverImpl(receiver));
  }

  /// Installs the awaiting receiver and its stop callback, then starts every
  /// operation that was queued so far. The operations are started from a copy
  /// taken under the lock, after the lock has been released.
  private void setReceiver(ReceiverObject r, StopCallback cb) nothrow @safe shared {
    with(assumeThreadSafe) {
      mutex.lock_nothrow();
      assert(this.receiver is null, "Cannot await a nursery twice.");
      receiver = r;
      stopCallback = cb;
      auto ops = operations.dup();
      mutex.unlock_nothrow();

      // start all work
      foreach(op; ops)
        op.start();
    }
  }
}
191 
/// Type-erased receiver the Nursery completes once all of its work is done.
private interface ReceiverObject {
@safe:
  /// Successful completion; the only member that is allowed to throw.
  void setValue();
  nothrow {
    /// Completion after cancellation.
    void setDone();
    /// Completion with a failure.
    void setError(Throwable e);
    /// Scheduler supplied by the underlying receiver.
    SchedulerObjectBase getScheduler();
  }
}
198 
/// Receiver connected to every sender run inside a Nursery. It remembers the
/// owning Nursery and the id of its operation, and reports every kind of
/// completion back to that Nursery.
private struct NurseryReceiver(Value) {
  shared Nursery nursery;
  size_t id;
  this(shared Nursery nursery, size_t id) {
    this.nursery = nursery;
    this.id = id;
  }

  // A produced value is not kept: every setValue overload ends up in the same
  // `nursery.done(id)` call that `setDone` makes.
  static if (is(Value == void)) {
    void setValue() shared @safe {
      (cast() this).setDone();
    }
    void setValue() @safe {
      setDone();
    }
  } else {
    void setValue(Value val) shared @trusted {
      (cast() this).setDone();
    }
    void setValue(Value val) @safe {
      setDone();
    }
  }

  /// Tells the Nursery that this operation has finished.
  void setDone() nothrow @safe {
    nursery.done(id);
  }

  /// Hands the failure to the Nursery together with this operation's id.
  void setError(Throwable e) nothrow @safe {
    nursery.setError(e, id);
  }

  /// Stop token of the owning Nursery.
  auto getStopToken() @safe {
    return nursery.getStopToken();
  }

  /// Scheduler as reported by the owning Nursery.
  auto getScheduler() @safe {
    return nursery.getScheduler();
  }
}
239 
/// Operational state produced by `Nursery.connect`. It carries the Nursery,
/// the awaiting receiver and the stop callback registered for that receiver.
/// Copying is disabled.
private struct NurseryOp {
  shared Nursery nursery;
  StopCallback cb;
  ReceiverObject receiver;
  @disable this(ref return scope typeof(this) rhs);
  @disable this(this);
  this(return shared Nursery n, StopCallback cb, ReceiverObject r) @safe return scope {
    nursery = n;
    this.cb = cb;
    receiver = r;
  }
  /// Starts awaiting the Nursery.
  ///
  /// If the Nursery's `busy` count is already 0 the receiver is completed with
  /// `setDone` immediately. Otherwise the receiver and stop callback are
  /// handed to the Nursery, which then starts its queued operations.
  void start() nothrow scope @trusted {
    import core.atomic : atomicLoad;
    if (nursery.busy.atomicLoad == 0) {
      // The Nursery never takes ownership of the callback on this path, so
      // release the registration here before completing the receiver, as
      // `Nursery.done` does when it completes one.
      if (cb !is null) {
        cb.dispose();
        cb = null;
      }
      receiver.setDone();
    } else
      nursery.setReceiver(receiver, cb);
  }
}