1 module concurrency.nursery;
2 
3 import concurrency.stoptoken : StopSource, StopToken, StopCallback, onStop;
4 import concurrency.thread : LocalThreadExecutor;
5 import concurrency.receiver : getStopToken;
6 import concurrency.scheduler : SchedulerObjectBase;
7 import std.typecons : Nullable;
8 
9 /// A Nursery is a place for senders to be ran in, while being a Sender itself.
10 /// Stopping the Nursery cancels all senders.
11 /// When any Sender completes with an Error all Senders are canceled as well.
12 /// Cancellation is signaled with a StopToken.
13 /// Senders themselves bare the responsibility to respond to stop requests.
14 /// When cancellation happens all Senders are waited on for completion.
15 /// Senders can be added to the Nursery at any time.
16 /// Senders are only started when the Nursery itself is being awaited on.
17 class Nursery : StopSource {
18   import concurrency.sender : isSender, OperationalStateBase;
19   import core.sync.mutex : Mutex;
20 
21   alias Value = void;
22   private {
23     Node[] operations;
24     struct Node {
25       OperationalStateBase state;
26       void start() @safe nothrow {
27         state.start();
28       }
29       size_t id;
30     }
31     Mutex mutex;
32     shared size_t busy = 1; // we start at 1 to denote the Nursery is open for tasks
33     shared size_t counter = 0;
34     Throwable throwable; // first throwable from sender, if any
35     ReceiverObject receiver;
36     StopCallback stopCallback;
37     Nursery assumeThreadSafe() @trusted shared nothrow {
38       return cast(Nursery)this;
39     }
40   }
41 
42   this() @safe shared {
43     import concurrency.utils : resetScheduler;
44     resetScheduler();
45     with(assumeThreadSafe) mutex = new Mutex();
46   }
47 
48   override bool stop() nothrow @trusted {
49     auto result = super.stop();
50 
51     if (result)
52       (cast(shared)this).done(-1);
53 
54     return result;
55   }
56 
57   override bool stop() nothrow @trusted shared {
58     return (cast(Nursery)this).stop();
59   }
60 
61   StopToken getStopToken() nothrow @trusted shared {
62     return StopToken(cast(Nursery)this);
63   }
64 
65   private auto getScheduler() nothrow @trusted shared {
66     return (cast()receiver).getScheduler();
67   }
68 
69   private void setError(Throwable e, size_t id) nothrow @safe shared {
70     import core.atomic : cas;
71     with(assumeThreadSafe) cas(&throwable, cast(Throwable)null, e); // store throwable if not already
72     done(id);
73     stop();
74   }
75 
76   private void done(size_t id) nothrow @trusted shared {
77     import std.algorithm : countUntil, remove;
78     import core.atomic : atomicOp;
79 
80     with (assumeThreadSafe) {
81       mutex.lock_nothrow();
82       auto idx = operations.countUntil!(o => o.id == id);
83       if (idx != -1)
84         operations = operations.remove(idx);
85       bool isDone = atomicOp!"-="(busy,1) == 0 || operations.length == 0;
86       auto localReceiver = receiver;
87       auto localThrowable = throwable;
88       if (isDone) {
89         throwable = null;
90         receiver = null;
91         if (stopCallback)
92           stopCallback.dispose();
93         stopCallback = null;
94       }
95       mutex.unlock_nothrow();
96 
97       if (isDone && localReceiver !is null) {
98         if (localThrowable !is null) {
99           localReceiver.setError(localThrowable);
100         } else if (isStopRequested()) {
101           localReceiver.setDone();
102         } else {
103           try {
104             localReceiver.setValue();
105           } catch (Exception e) {
106             localReceiver.setError(e);
107           }
108         }
109       }
110     }
111   }
112 
113   void run(Sender)(Nullable!Sender sender) shared if (isSender!Sender) {
114     if (!sender.isNull)
115       run(sender.get());
116   }
117 
118   void run(Sender)(Sender sender) shared @trusted {
119     import concepts;
120     static assert (models!(Sender, isSender));
121     import std.typecons : Nullable;
122     import core.atomic : atomicOp, atomicLoad;
123     import concurrency.sender : connectHeap;
124 
125     static if (is(Sender == class) || is(Sender == interface))
126       if (sender is null)
127         return;
128 
129     if (busy.atomicLoad() == 0)
130       throw new Exception("This nursery is already stopped, it cannot accept more work.");
131 
132     size_t id = atomicOp!"+="(counter, 1);
133     auto op = sender.connectHeap(NurseryReceiver!(Sender.Value)(this, id));
134 
135     mutex.lock_nothrow();
136 
137     operations ~= cast(shared) Node(op, id);
138     atomicOp!"+="(busy, 1);
139     bool hasStarted = this.receiver !is null;
140     mutex.unlock_nothrow();
141 
142     if (hasStarted)
143       op.start();
144   }
145 
146   auto connect(Receiver)(return Receiver receiver) @trusted scope {
147     return (cast(shared)this).connect(receiver);
148   }
149 
150   auto connect(Receiver)(Receiver receiver) shared scope @safe {
151     final class ReceiverImpl : ReceiverObject {
152       Receiver receiver;
153       SchedulerObjectBase scheduler;
154       this(Receiver receiver) { this.receiver = receiver; }
155       void setValue() @safe { receiver.setValue(); }
156       void setDone() nothrow @safe { receiver.setDone(); }
157       void setError(Throwable e) nothrow @safe { receiver.setError(e); }
158       SchedulerObjectBase getScheduler() nothrow @safe {
159         import concurrency.scheduler : toSchedulerObject;
160         if (scheduler is null)
161           scheduler = receiver.getScheduler().toSchedulerObject();
162         return scheduler;
163       }
164     }
165     auto stopToken = receiver.getStopToken();
166     auto cb = (()@trusted => stopToken.onStop(() shared nothrow @trusted => cast(void)this.stop()))();
167     return NurseryOp(this, cb, new ReceiverImpl(receiver));
168   }
169 
170   private void setReceiver(ReceiverObject r, StopCallback cb) nothrow @safe shared {
171     with(assumeThreadSafe) {
172       mutex.lock_nothrow();
173       assert(this.receiver is null, "Cannot await a nursery twice.");
174       receiver = r;
175       stopCallback = cb;
176       auto ops = operations.dup();
177       mutex.unlock_nothrow();
178 
179       // start all work
180       foreach(op; ops)
181         op.start();
182     }
183   }
184 }
185 
186 private interface ReceiverObject {
187   void setValue() @safe;
188   void setDone() nothrow @safe;
189   void setError(Throwable e) nothrow @safe;
190   SchedulerObjectBase getScheduler() nothrow @safe;
191 }
192 
193 private struct NurseryReceiver(Value) {
194   shared Nursery nursery;
195   size_t id;
196   this(shared Nursery nursery, size_t id) {
197     this.nursery = nursery;
198     this.id = id;
199   }
200 
201   static if (is(Value == void)) {
202     void setValue() shared @safe {
203       (cast() this).setDone();
204     }
205     void setValue() @safe {
206       (cast() this).setDone();
207     }
208   } else {
209     void setValue(Value val) shared @trusted {
210       (cast() this).setDone();
211     }
212     void setValue(Value val) @safe {
213       nursery.done(id);
214     }
215   }
216 
217   void setDone() nothrow @safe {
218     nursery.done(id);
219   }
220 
221   void setError(Throwable e) nothrow @safe {
222     nursery.setError(e, id);
223   }
224 
225   auto getStopToken() @safe {
226     return nursery.getStopToken();
227   }
228 
229   auto getScheduler() @safe {
230     return nursery.getScheduler();
231   }
232 }
233 
234 private struct NurseryOp {
235   shared Nursery nursery;
236   StopCallback cb;
237   ReceiverObject receiver;
238   @disable this(ref return scope typeof(this) rhs);
239   @disable this(this);
240   this(return shared Nursery n, StopCallback cb, ReceiverObject r) @safe scope return {
241     nursery = n;
242     this.cb = cb;
243     receiver = r;
244   }
245   void start() nothrow scope @trusted {
246     import core.atomic : atomicLoad;
247     if (nursery.busy.atomicLoad == 0)
248       receiver.setDone();
249     else
250       nursery.setReceiver(receiver, cb);
251   }
252 }