1 module concurrency.nursery;
2 
3 import concurrency.stoptoken : StopSource, StopToken, StopCallback, onStop;
4 import concurrency.thread : LocalThreadExecutor;
5 import concurrency.receiver : getStopToken;
6 import concurrency.scheduler : SchedulerObjectBase;
7 import std.typecons : Nullable;
8 
9 /// A Nursery is a place for senders to be ran in, while being a Sender itself.
10 /// Stopping the Nursery cancels all senders.
11 /// When any Sender completes with an Error all Senders are canceled as well.
12 /// Cancellation is signaled with a StopToken.
13 /// Senders themselves bare the responsibility to respond to stop requests.
14 /// When cancellation happens all Senders are waited on for completion.
15 /// Senders can be added to the Nursery at any time.
16 /// Senders are only started when the Nursery itself is being awaited on.
17 class Nursery : StopSource {
18   import concurrency.sender : isSender, OperationalStateBase;
19   import core.sync.mutex : Mutex;
20 
21   alias Value = void;
22   private {
23     Node[] operations;
24     struct Node {
25       OperationalStateBase state;
26       void start() @safe nothrow {
27         state.start();
28       }
29       size_t id;
30     }
31     Mutex mutex;
32     shared size_t busy = 0;
33     shared size_t counter = 0;
34     Exception exception; // first exception from sender, if any
35     ReceiverObject receiver;
36     StopCallback stopCallback;
37     Nursery assumeThreadSafe() @trusted shared nothrow {
38       return cast(Nursery)this;
39     }
40   }
41 
42   this() @safe shared {
43     import concurrency.utils : resetScheduler;
44     resetScheduler();
45     with(assumeThreadSafe) mutex = new Mutex();
46   }
47 
48   StopToken getStopToken() nothrow @trusted shared {
49     return StopToken(cast(Nursery)this);
50   }
51 
52   private auto getScheduler() nothrow @trusted shared {
53     return (cast()receiver).getScheduler();
54   }
55 
56   private void setError(Exception e, size_t id) nothrow @safe shared {
57     import core.atomic : cas;
58     with(assumeThreadSafe) cas(&exception, cast(Exception)null, e); // store exception if not already
59     done(id);
60     stop();
61   }
62 
63   private void done(size_t id) nothrow @trusted shared {
64     import std.algorithm : countUntil, remove;
65     import core.atomic : atomicOp;
66 
67     with (assumeThreadSafe) {
68       mutex.lock_nothrow();
69       auto idx = operations.countUntil!(o => o.id == id);
70       if (idx != -1)
71         operations = operations.remove(idx);
72       bool isDone = atomicOp!"-="(busy,1) == 0;
73       auto localReceiver = receiver;
74       auto localException = exception;
75       if (isDone) {
76         exception = null;
77         receiver = null;
78         stopCallback.dispose();
79         stopCallback = null;
80       }
81       mutex.unlock_nothrow();
82 
83       if (isDone && localReceiver !is null) {
84         if (localException !is null) {
85           localReceiver.setError(localException);
86         } else if (isStopRequested()) {
87           localReceiver.setDone();
88         } else {
89           try {
90             localReceiver.setValue();
91           } catch (Exception e) {
92             localReceiver.setError(e);
93           }
94         }
95       }
96     }
97   }
98 
99   void run(Sender)(Nullable!Sender sender) shared if (isSender!Sender) {
100     if (!sender.isNull)
101       run(sender.get());
102   }
103 
104   void run(Sender)(Sender sender) shared @trusted {
105     import concepts;
106     static assert (models!(Sender, isSender));
107     import std.typecons : Nullable;
108     import core.atomic : atomicOp;
109     import concurrency.sender : connectHeap;
110 
111     static if (is(Sender == class) || is(Sender == interface))
112       if (sender is null)
113         return;
114 
115     size_t id = atomicOp!"+="(counter, 1);
116     auto op = sender.connectHeap(NurseryReceiver!(Sender.Value)(this, id));
117 
118     mutex.lock_nothrow();
119     operations ~= cast(shared) Node(op, id);
120     atomicOp!"+="(busy, 1);
121     bool hasStarted = this.receiver !is null;
122     mutex.unlock_nothrow();
123 
124     if (hasStarted)
125       op.start();
126   }
127 
128   auto connect(Receiver)(return Receiver receiver) @trusted scope {
129     return (cast(shared)this).connect(receiver);
130   }
131 
132   auto connect(Receiver)(Receiver receiver) shared scope @safe {
133     final class ReceiverImpl : ReceiverObject {
134       Receiver receiver;
135       this(Receiver receiver) { this.receiver = receiver; }
136       void setValue() @safe { receiver.setValue(); }
137       void setDone() nothrow @safe { receiver.setDone(); }
138       void setError(Exception e) nothrow @safe { receiver.setError(e); }
139       SchedulerObjectBase getScheduler() nothrow @safe {
140         import concurrency.scheduler : toSchedulerObject;
141         return receiver.getScheduler().toSchedulerObject();
142       }
143     }
144     static struct Op {
145       shared Nursery nursery;
146       StopCallback cb;
147       ReceiverObject receiver;
148       @disable this(ref return scope typeof(this) rhs);
149       @disable this(this);
150       this(shared Nursery n, StopCallback cb, ReceiverObject r) {
151         nursery = n;
152         this.cb = cb;
153         receiver = r;
154       }
155       void start() nothrow scope @trusted {
156         nursery.setReceiver(receiver, cb);
157       }
158     }
159     auto stopToken = receiver.getStopToken();
160     auto cb = (()@trusted => stopToken.onStop(() shared nothrow @trusted => cast(void)this.stop()))();
161     return Op(this, cb, new ReceiverImpl(receiver));
162   }
163 
164   private void setReceiver(ReceiverObject r, StopCallback cb) nothrow @safe shared {
165     with(assumeThreadSafe) {
166       mutex.lock_nothrow();
167       assert(this.receiver is null, "Cannot await a nursery twice.");
168       receiver = r;
169       stopCallback = cb;
170       auto ops = operations.dup();
171       mutex.unlock_nothrow();
172 
173       // start all work
174       foreach(op; ops)
175         op.start();
176     }
177   }
178 }
179 
180 private interface ReceiverObject {
181   void setValue() @safe;
182   void setDone() nothrow @safe;
183   void setError(Exception e) nothrow @safe;
184   SchedulerObjectBase getScheduler() nothrow @safe;
185 }
186 
187 private struct NurseryReceiver(Value) {
188   shared Nursery nursery;
189   size_t id;
190   this(shared Nursery nursery, size_t id) {
191     this.nursery = nursery;
192     this.id = id;
193   }
194 
195   static if (is(Value == void)) {
196     void setValue() shared @safe {
197       (cast() this).setDone();
198     }
199     void setValue() @safe {
200       (cast() this).setDone();
201     }
202   } else {
203     void setValue(Value val) shared @trusted {
204       (cast() this).setDone();
205     }
206     void setValue(Value val) @safe {
207       nursery.done(id);
208     }
209   }
210 
211   void setDone() nothrow @safe {
212     nursery.done(id);
213   }
214 
215   void setError(Exception e) nothrow @safe {
216     nursery.setError(e, id);
217   }
218 
219   auto getStopToken() @safe {
220     return nursery.getStopToken();
221   }
222 
223   auto getScheduler() @safe {
224     return nursery.getScheduler();
225   }
226 }