1 module concurrency.nursery;
2 
3 import concurrency.stoptoken : StopSource, StopToken, StopCallback, onStop;
4 import concurrency.thread : LocalThreadExecutor;
5 import concurrency.receiver : getStopToken;
6 import concurrency.scheduler : SchedulerObjectBase;
7 import std.typecons : Nullable;
8 
9 /// A Nursery is a place for senders to be ran in, while being a Sender itself.
10 /// Stopping the Nursery cancels all senders.
11 /// When any Sender completes with an Error all Senders are canceled as well.
12 /// Cancellation is signaled with a StopToken.
13 /// Senders themselves bare the responsibility to respond to stop requests.
14 /// When cancellation happens all Senders are waited on for completion.
15 /// Senders can be added to the Nursery at any time.
16 /// Senders are only started when the Nursery itself is being awaited on.
17 class Nursery : StopSource {
18   import concurrency.sender : isSender, OperationalStateBase;
19   import core.sync.mutex : Mutex;
20 
21   alias Value = void;
22   private {
23     Node[] operations;
24     struct Node {
25       OperationalStateBase state;
26       void start() @safe nothrow {
27         state.start();
28       }
29       size_t id;
30     }
31     Mutex mutex;
32     shared size_t busy = 1; // we start at 1 to denote the Nursery is open for tasks
33     shared size_t counter = 0;
34     Throwable throwable; // first throwable from sender, if any
35     ReceiverObject receiver;
36     StopCallback stopCallback;
37     Nursery assumeThreadSafe() @trusted shared nothrow {
38       return cast(Nursery)this;
39     }
40   }
41 
42   this() @safe shared {
43     import concurrency.utils : resetScheduler;
44     resetScheduler();
45     with(assumeThreadSafe) mutex = new Mutex();
46   }
47 
48   override bool stop() nothrow @trusted {
49     auto result = super.stop();
50 
51     if (result)
52       (cast(shared)this).done(-1);
53 
54     return result;
55   }
56 
57   override bool stop() nothrow @trusted shared {
58     return (cast(Nursery)this).stop();
59   }
60 
61   StopToken getStopToken() nothrow @trusted shared {
62     return StopToken(cast(Nursery)this);
63   }
64 
65   private auto getScheduler() nothrow @trusted shared {
66     return (cast()receiver).getScheduler();
67   }
68 
69   private void setError(Throwable e, size_t id) nothrow @safe shared {
70     import core.atomic : cas;
71     with(assumeThreadSafe) cas(&throwable, cast(Throwable)null, e); // store throwable if not already
72     done(id);
73     stop();
74   }
75 
76   private void done(size_t id) nothrow @trusted shared {
77     import std.algorithm : countUntil, remove;
78     import core.atomic : atomicOp;
79 
80     with (assumeThreadSafe) {
81       mutex.lock_nothrow();
82       auto idx = operations.countUntil!(o => o.id == id);
83       if (idx != -1)
84         operations = operations.remove(idx);
85       bool isDone = atomicOp!"-="(busy,1) == 0 || operations.length == 0;
86       auto localReceiver = receiver;
87       auto localThrowable = throwable;
88       if (isDone) {
89         throwable = null;
90         receiver = null;
91         if (stopCallback)
92           stopCallback.dispose();
93         stopCallback = null;
94       }
95       mutex.unlock_nothrow();
96 
97       if (isDone && localReceiver !is null) {
98         if (localThrowable !is null) {
99           localReceiver.setError(localThrowable);
100         } else if (isStopRequested()) {
101           localReceiver.setDone();
102         } else {
103           try {
104             localReceiver.setValue();
105           } catch (Exception e) {
106             localReceiver.setError(e);
107           }
108         }
109       }
110     }
111   }
112 
113   void run(Sender)(Nullable!Sender sender) shared if (isSender!Sender) {
114     if (!sender.isNull)
115       run(sender.get());
116   }
117 
118   void run(Sender)(Sender sender) shared @trusted {
119     import concepts;
120     static assert (models!(Sender, isSender));
121     import std.typecons : Nullable;
122     import core.atomic : atomicOp, atomicLoad;
123     import concurrency.sender : connectHeap;
124 
125     static if (is(Sender == class) || is(Sender == interface))
126       if (sender is null)
127         return;
128 
129     if (busy.atomicLoad() == 0)
130       throw new Exception("This nursery is already stopped, it cannot accept more work.");
131 
132     size_t id = atomicOp!"+="(counter, 1);
133     auto op = sender.connectHeap(NurseryReceiver!(Sender.Value)(this, id));
134 
135     mutex.lock_nothrow();
136 
137     operations ~= cast(shared) Node(op, id);
138     atomicOp!"+="(busy, 1);
139     bool hasStarted = this.receiver !is null;
140     mutex.unlock_nothrow();
141 
142     if (hasStarted)
143       op.start();
144   }
145 
146   auto connect(Receiver)(return Receiver receiver) @trusted scope {
147     return (cast(shared)this).connect(receiver);
148   }
149 
150   auto connect(Receiver)(Receiver receiver) shared scope @safe {
151     final class ReceiverImpl : ReceiverObject {
152       Receiver receiver;
153       this(Receiver receiver) { this.receiver = receiver; }
154       void setValue() @safe { receiver.setValue(); }
155       void setDone() nothrow @safe { receiver.setDone(); }
156       void setError(Throwable e) nothrow @safe { receiver.setError(e); }
157       SchedulerObjectBase getScheduler() nothrow @safe {
158         import concurrency.scheduler : toSchedulerObject;
159         return receiver.getScheduler().toSchedulerObject();
160       }
161     }
162     auto stopToken = receiver.getStopToken();
163     auto cb = (()@trusted => stopToken.onStop(() shared nothrow @trusted => cast(void)this.stop()))();
164     return NurseryOp(this, cb, new ReceiverImpl(receiver));
165   }
166 
167   private void setReceiver(ReceiverObject r, StopCallback cb) nothrow @safe shared {
168     with(assumeThreadSafe) {
169       mutex.lock_nothrow();
170       assert(this.receiver is null, "Cannot await a nursery twice.");
171       receiver = r;
172       stopCallback = cb;
173       auto ops = operations.dup();
174       mutex.unlock_nothrow();
175 
176       // start all work
177       foreach(op; ops)
178         op.start();
179     }
180   }
181 }
182 
183 private interface ReceiverObject {
184   void setValue() @safe;
185   void setDone() nothrow @safe;
186   void setError(Throwable e) nothrow @safe;
187   SchedulerObjectBase getScheduler() nothrow @safe;
188 }
189 
190 private struct NurseryReceiver(Value) {
191   shared Nursery nursery;
192   size_t id;
193   this(shared Nursery nursery, size_t id) {
194     this.nursery = nursery;
195     this.id = id;
196   }
197 
198   static if (is(Value == void)) {
199     void setValue() shared @safe {
200       (cast() this).setDone();
201     }
202     void setValue() @safe {
203       (cast() this).setDone();
204     }
205   } else {
206     void setValue(Value val) shared @trusted {
207       (cast() this).setDone();
208     }
209     void setValue(Value val) @safe {
210       nursery.done(id);
211     }
212   }
213 
214   void setDone() nothrow @safe {
215     nursery.done(id);
216   }
217 
218   void setError(Throwable e) nothrow @safe {
219     nursery.setError(e, id);
220   }
221 
222   auto getStopToken() @safe {
223     return nursery.getStopToken();
224   }
225 
226   auto getScheduler() @safe {
227     return nursery.getScheduler();
228   }
229 }
230 
231 private struct NurseryOp {
232   shared Nursery nursery;
233   StopCallback cb;
234   ReceiverObject receiver;
235   @disable this(ref return scope typeof(this) rhs);
236   @disable this(this);
237   this(return shared Nursery n, StopCallback cb, ReceiverObject r) @safe scope return {
238     nursery = n;
239     this.cb = cb;
240     receiver = r;
241   }
242   void start() nothrow scope @trusted {
243     import core.atomic : atomicLoad;
244     if (nursery.busy.atomicLoad == 0)
245       receiver.setDone();
246     else
247       nursery.setReceiver(receiver, cb);
248   }
249 }