1 module concurrency.nursery;
2 
3 import concurrency.stoptoken : StopSource, StopToken, StopCallback, onStop;
4 import concurrency.thread : LocalThreadExecutor;
5 import concurrency.receiver : getStopToken;
6 import concurrency.scheduler : SchedulerObjectBase;
7 import std.typecons : Nullable;
8 
/// A Nursery is a place for senders to be run in, while being a Sender itself.
10 /// Stopping the Nursery cancels all senders.
11 /// When any Sender completes with an Error all Senders are canceled as well.
12 /// Cancellation is signaled with a StopToken.
/// Senders themselves bear the responsibility to respond to stop requests.
14 /// When cancellation happens all Senders are waited on for completion.
15 /// Senders can be added to the Nursery at any time.
16 /// Senders are only started when the Nursery itself is being awaited on.
class Nursery : StopSource {
  import concurrency.sender : isSender, OperationalStateBase;
  import core.sync.mutex : Mutex;
  import concepts;
  // Compile-time check that the Nursery itself satisfies the Sender concept.
  static assert (models!(typeof(this), isSender));

  // The Nursery completes without a value.
  alias Value = void;
  private {
    // Operations added through `run` that have not yet reported completion.
    // Read and modified while holding `mutex`.
    Node[] operations;
    // Pairs a connected operation with the id handed out by `run`, so that
    // `done` can find and remove it again.
    struct Node {
      OperationalStateBase state;
      void start() @safe nothrow {
        state.start();
      }
      size_t id;
    }
    // Guards `operations`, `receiver`, `stopCallback` and the hand-over of `throwable`.
    Mutex mutex;
    // Incremented once per operation added in `run`, decremented once per call
    // to `done`. `stop` calls `done` once, which takes away the initial 1.
    // `run` and `NurseryOp.start` treat a value of 0 as "stopped".
    shared size_t busy = 1; // we start at 1 to denote the Nursery is open for tasks
    // Source of operation ids; atomically incremented in `run`.
    shared size_t counter = 0;
    Throwable throwable; // first throwable from sender, if any
    // Receiver of whoever awaits the Nursery; set in `setReceiver`, cleared in `done`.
    ReceiverObject receiver;
    // Callback registered on the awaiting receiver's stop token (see `connect`);
    // disposed in `done` once the Nursery completes.
    StopCallback stopCallback;
    // Casts away `shared` so the private fields can be accessed directly.
    // Callers are responsible for synchronizing (mutex or atomics).
    Nursery assumeThreadSafe() @trusted shared nothrow {
      return cast(Nursery)this;
    }
  }

  /// Creates an open Nursery and allocates the mutex protecting its state.
  this() @safe shared {
    import concurrency.utils : resetScheduler;
    // NOTE(review): resetScheduler is defined elsewhere; its effect is not visible here.
    resetScheduler();
    with(assumeThreadSafe) mutex = new Mutex();
  }

  /// Requests a stop through the StopSource base class. When `super.stop()`
  /// returns true, `done` is additionally called once.
  /// Returns: the result of `super.stop()`.
  override bool stop() nothrow @trusted {
    auto result = super.stop();

    // -1 converts to size_t.max; it is not expected to match any operation id
    // (ids come from `counter`, which starts counting at 1), so this call only
    // decrements `busy` and re-evaluates whether the Nursery is finished.
    if (result)
      (cast(shared)this).done(-1);

    return result;
  }

  /// Shared overload; forwards to the non-shared `stop`.
  override bool stop() nothrow @trusted shared {
    return (cast(Nursery)this).stop();
  }

  /// Returns: a StopToken constructed from this Nursery.
  StopToken getStopToken() nothrow @trusted shared {
    return StopToken(cast(Nursery)this);
  }

  // Forwards to the scheduler of the awaiting receiver.
  // NOTE(review): `receiver` is only non-null between `setReceiver` and
  // completion in `done`; this dereferences it without a null check.
  private auto getScheduler() nothrow @trusted shared {
    return (cast()receiver).getScheduler();
  }

  // Called by a NurseryReceiver when its sender fails: remembers the first
  // throwable, marks operation `id` as finished and requests a stop.
  private void setError(Throwable e, size_t id) nothrow @safe shared {
    import core.atomic : cas;
    with(assumeThreadSafe) cas(&throwable, cast(Throwable)null, e); // store throwable if not already
    done(id);
    stop();
  }

  // Marks operation `id` as finished and, when nothing is left to wait for,
  // completes the awaiting receiver (if any) exactly as follows:
  //   - with setError when a throwable was stored,
  //   - with setDone when a stop was requested,
  //   - with setValue otherwise.
  private void done(size_t id) nothrow @trusted shared {
    import std.algorithm : countUntil, remove;
    import core.atomic : atomicOp;

    with (assumeThreadSafe) {
      mutex.lock_nothrow();
      // Drop the finished operation from the pending list, if it is in there.
      auto idx = operations.countUntil!(o => o.id == id);
      if (idx != -1)
        operations = operations.remove(idx);
      // Finished when the counter hits zero or no operations remain.
      bool isDone = atomicOp!"-="(busy,1) == 0 || operations.length == 0;
      // Snapshot under the lock so the receiver can be signalled after unlocking.
      auto localReceiver = receiver;
      auto localThrowable = throwable;
      if (isDone) {
        // Reset the per-await state.
        throwable = null;
        receiver = null;
        if (stopCallback)
          stopCallback.dispose();
        stopCallback = null;
      }
      mutex.unlock_nothrow();

      // Signal outside the lock; nothing to do if nobody is awaiting.
      if (isDone && localReceiver !is null) {
        if (localThrowable !is null) {
          localReceiver.setError(localThrowable);
        } else if (isStopRequested()) {
          localReceiver.setDone();
        } else {
          // setValue is the only receiver call that may throw; route an
          // Exception into setError to keep this function nothrow.
          try {
            localReceiver.setValue();
          } catch (Exception e) {
            localReceiver.setError(e);
          }
        }
      }
    }
  }

  /// Adds the wrapped sender to the Nursery; a null Nullable is ignored.
  void run(Sender)(Nullable!Sender sender) shared if (isSender!Sender) {
    if (!sender.isNull)
      run(sender.get());
  }

  /// Adds `sender` to the Nursery. The sender is started right away when the
  /// Nursery is already being awaited, otherwise it is started by `setReceiver`.
  /// A null class or interface reference is ignored.
  /// Throws: Exception when `busy` is 0, i.e. the Nursery has been stopped.
  void run(Sender)(Sender sender) shared @trusted {
    import concepts;
    static assert (models!(Sender, isSender));
    import std.typecons : Nullable;
    import core.atomic : atomicOp, atomicLoad;
    import concurrency.sender : connectHeap;

    // Only reference types can be null; the check is compiled in for those only.
    static if (is(Sender == class) || is(Sender == interface))
      if (sender is null)
        return;

    if (busy.atomicLoad() == 0)
      throw new Exception("This nursery is already stopped, it cannot accept more work.");

    // Hand out a fresh id and connect the sender to a receiver that reports
    // back to this Nursery under that id.
    size_t id = atomicOp!"+="(counter, 1);
    auto op = sender.connectHeap(NurseryReceiver!(Sender.Value)(this, id));

    mutex.lock_nothrow();

    operations ~= cast(shared) Node(op, id);
    atomicOp!"+="(busy, 1);
    // A receiver being set means the Nursery is currently being awaited.
    bool hasStarted = this.receiver !is null;
    mutex.unlock_nothrow();

    // Start outside the lock.
    if (hasStarted)
      op.start();
  }

  /// Non-shared overload; forwards to the shared `connect`.
  auto connect(Receiver)(return Receiver receiver) @trusted scope return {
    return (cast(shared)this).connect(receiver);
  }

  /// Connects the Nursery to `receiver`.
  /// Registers a callback on the receiver's stop token that stops the Nursery
  /// and wraps the receiver in a `ReceiverObject` implementation.
  /// Returns: a `NurseryOp`; calling its `start` begins awaiting the Nursery.
  auto connect(Receiver)(return Receiver receiver) shared scope return @trusted {
    // Adapter that lets the Nursery store any receiver type behind the
    // ReceiverObject interface.
    final class ReceiverImpl : ReceiverObject {
      Receiver receiver;
      SchedulerObjectBase scheduler;
      this(return Receiver receiver) @trusted { this.receiver = receiver; }
      void setValue() @safe { receiver.setValue(); }
      void setDone() nothrow @safe { receiver.setDone(); }
      void setError(Throwable e) nothrow @safe { receiver.setError(e); }
      // The scheduler object is created on first use and then reused.
      SchedulerObjectBase getScheduler() nothrow @safe {
        import concurrency.scheduler : toSchedulerObject;
        if (scheduler is null)
          scheduler = receiver.getScheduler().toSchedulerObject();
        return scheduler;
      }
    }
    auto stopToken = receiver.getStopToken();
    // Forward a stop request from the awaiting side to the Nursery.
    auto cb = (()@trusted => stopToken.onStop(() shared nothrow @trusted => cast(void)this.stop()))();
    return NurseryOp(this, cb, new ReceiverImpl(receiver));
  }

  // Called from `NurseryOp.start`: records the awaiting receiver and its stop
  // callback, then starts every operation that was added before the await.
  private void setReceiver(ReceiverObject r, StopCallback cb) nothrow @safe shared {
    with(assumeThreadSafe) {
      mutex.lock_nothrow();
      assert(this.receiver is null, "Cannot await a nursery twice.");
      receiver = r;
      stopCallback = cb;
      // Copy the list so the operations can be started without holding the lock.
      auto ops = operations.dup();
      mutex.unlock_nothrow();

      // start all work
      foreach(op; ops)
        op.start();
    }
  }
}
187 
/// Type-erased view of the receiver awaiting a Nursery. It exposes the three
/// completion signals plus access to the receiver's scheduler.
private interface ReceiverObject {
  @safe {
    /// Signals successful completion; the only member that may throw.
    void setValue();
    nothrow {
      /// Signals completion due to cancellation.
      void setDone();
      /// Signals completion with the error `e`.
      void setError(Throwable e);
      /// Returns the scheduler associated with the receiver.
      SchedulerObjectBase getScheduler();
    }
  }
}
194 
/// Receiver that a Nursery connects to every sender passed to `run`.
/// Each completion signal is reported back to the owning Nursery under the
/// operation's `id`; a value produced by the sender is discarded.
private struct NurseryReceiver(Value) {
  shared Nursery nursery;
  size_t id;

  this(shared Nursery nursery, size_t id) {
    this.id = id;
    this.nursery = nursery;
  }

  // A value and a cancellation are reported to the Nursery in the same way,
  // so every setValue overload funnels into setDone.
  static if (!is(Value == void)) {
    void setValue(Value val) shared @trusted {
      (cast() this).setDone();
    }
    void setValue(Value val) @safe {
      setDone();
    }
  } else {
    void setValue() shared @safe {
      (cast() this).setDone();
    }
    void setValue() @safe {
      setDone();
    }
  }

  /// Tells the Nursery that this operation has finished.
  void setDone() nothrow @safe {
    nursery.done(id);
  }

  /// Hands the error to the Nursery together with this operation's id.
  void setError(Throwable e) nothrow @safe {
    nursery.setError(e, id);
  }

  /// The Nursery's stop token.
  auto getStopToken() @safe {
    return nursery.getStopToken();
  }

  /// The scheduler as provided by the Nursery.
  auto getScheduler() @safe {
    return nursery.getScheduler();
  }
}
235 
/// Operational state produced by `Nursery.connect`. It carries the Nursery, the
/// awaiting receiver and the stop callback registered on that receiver's stop
/// token. The struct cannot be copied.
private struct NurseryOp {
  shared Nursery nursery;
  StopCallback cb;
  ReceiverObject receiver;
  @disable this(ref return scope typeof(this) rhs);
  @disable this(this);
  this(return shared Nursery n, StopCallback cb, ReceiverObject r) @safe scope return {
    nursery = n;
    this.cb = cb;
    receiver = r;
  }
  /// Begins awaiting the Nursery.
  /// When the Nursery's `busy` counter is already 0 the receiver is completed
  /// immediately with `setDone`; otherwise the receiver and the stop callback
  /// are handed to the Nursery, which takes over their lifetime.
  void start() nothrow scope @trusted {
    import core.atomic : atomicLoad;
    if (nursery.busy.atomicLoad == 0) {
      // The Nursery never takes ownership of the callback on this path, so it
      // has to be released here; otherwise it would stay registered on the
      // receiver's stop token after the receiver has already completed.
      if (cb !is null) {
        cb.dispose();
        cb = null;
      }
      receiver.setDone();
    } else
      nursery.setReceiver(receiver, cb);
  }
}