module concurrency.nursery;

import concurrency.stoptoken : StopSource, StopToken, StopCallback, onStop;
import concurrency.thread : LocalThreadExecutor;
import concurrency.receiver : getStopToken;
import concurrency.scheduler : SchedulerObjectBase;
import std.typecons : Nullable;

/// A Nursery is a place for senders to be run in, while being a Sender itself.
/// Stopping the Nursery cancels all senders.
/// When any Sender completes with an Error all Senders are canceled as well.
/// Cancellation is signaled with a StopToken.
/// Senders themselves bear the responsibility to respond to stop requests.
/// When cancellation happens all Senders are waited on for completion.
/// Senders can be added to the Nursery at any time.
/// Senders are only started when the Nursery itself is being awaited on.
class Nursery : StopSource {
  import concurrency.sender : isSender, OperationalStateBase;
  import core.sync.mutex : Mutex;

  alias Value = void;

  private {
    /// One connected operation together with the id handed out by `run`.
    struct Node {
      OperationalStateBase state;
      size_t id;

      /// Starts the wrapped operational state.
      void start() @safe nothrow {
        state.start();
      }
    }

    Node[] operations;          // operations added via `run` that have not reported completion yet
    Mutex mutex;                // held while touching `operations`, `receiver`, `exception`, `stopCallback`
    shared size_t busy = 0;     // incremented per added operation, decremented in `done`
    shared size_t counter = 0;  // source of operation ids
    Exception exception;        // first exception from sender, if any
    ReceiverObject receiver;    // non-null once the nursery has been started; reset on completion
    StopCallback stopCallback;  // callback installed by `connect`; disposed on completion

    /// Strips `shared` from `this` so the fields can be used directly.
    Nursery assumeThreadSafe() @trusted shared nothrow {
      return cast(Nursery) this;
    }
  }

  /// Resets the scheduler via `resetScheduler` and creates the mutex.
  this() @safe shared {
    import concurrency.utils : resetScheduler;

    resetScheduler();
    with (assumeThreadSafe) {
      mutex = new Mutex();
    }
  }

  /// Returns a StopToken backed by this nursery.
  StopToken getStopToken() nothrow @trusted shared {
    auto unsharedSelf = cast(Nursery) this;
    return StopToken(unsharedSelf);
  }

  /// Returns the scheduler of the receiver the nursery is currently connected to.
  private auto getScheduler() nothrow @trusted shared {
    auto current = cast() receiver;
    return current.getScheduler();
  }

  /// Called when operation `id` completed with an error.
  /// Stores `e` unless an exception was already stored, marks the operation
  /// as done and then calls `stop()`.
  private void setError(Exception e, size_t id) nothrow @safe shared {
    import core.atomic : cas;

    with (assumeThreadSafe) {
      // only the first exception is kept
      cas(&exception, cast(Exception) null, e);
    }
    done(id);
    stop();
  }

  /// Called when operation `id` has finished, in whatever way.
  /// Drops the operation from the list and decrements `busy`. The call that
  /// brings `busy` to zero clears the per-await state and completes the
  /// awaiting receiver: with the stored exception if any, otherwise with
  /// `setDone` when a stop was requested, otherwise with `setValue`.
  private void done(size_t id) nothrow @trusted shared {
    import std.algorithm : countUntil, remove;
    import core.atomic : atomicOp;

    with (assumeThreadSafe) {
      mutex.lock_nothrow();
      auto slot = operations.countUntil!(node => node.id == id);
      if (slot >= 0)
        operations = operations.remove(slot);
      const bool lastOneOut = atomicOp!"-="(busy, 1) == 0;
      // copy what is needed for completion before the fields get cleared
      auto awaiter = receiver;
      auto firstFailure = exception;
      if (lastOneOut) {
        exception = null;
        receiver = null;
        stopCallback.dispose();
        stopCallback = null;
      }
      mutex.unlock_nothrow();

      // the receiver is signalled after the mutex has been released
      if (!lastOneOut || awaiter is null)
        return;

      if (firstFailure !is null) {
        awaiter.setError(firstFailure);
        return;
      }

      if (isStopRequested()) {
        awaiter.setDone();
        return;
      }

      try {
        awaiter.setValue();
      } catch (Exception e) {
        awaiter.setError(e);
      }
    }
  }

  /// Adds the wrapped sender to the nursery; a null `Nullable` is ignored.
  void run(Sender)(Nullable!Sender sender) shared if (isSender!Sender) {
    if (sender.isNull)
      return;
    run(sender.get());
  }

  /// Adds `sender` to the nursery.
  /// A null class or interface reference is ignored. The sender is connected
  /// to a NurseryReceiver and registered under a fresh id. It is started
  /// right away when the nursery currently has a receiver, otherwise it is
  /// left for `setReceiver` to start.
  void run(Sender)(Sender sender) shared @trusted {
    import concepts;
    static assert (models!(Sender, isSender));
    import std.typecons : Nullable;
    import core.atomic : atomicOp;
    import concurrency.sender : connectHeap;

    static if (is(Sender == class) || is(Sender == interface)) {
      if (sender is null)
        return;
    }

    size_t id = atomicOp!"+="(counter, 1);
    auto operation = sender.connectHeap(NurseryReceiver!(Sender.Value)(this, id));

    mutex.lock_nothrow();
    operations ~= cast(shared) Node(operation, id);
    atomicOp!"+="(busy, 1);
    const bool alreadyAwaited = this.receiver !is null;
    mutex.unlock_nothrow();

    // started after the mutex has been released
    if (alreadyAwaited)
      operation.start();
  }

  /// Non-shared entry point; forwards to the shared `connect` overload.
  auto connect(Receiver)(return Receiver receiver) @trusted scope {
    auto sharedSelf = cast(shared) this;
    return sharedSelf.connect(receiver);
  }

  /// Connects the nursery to `receiver`.
  /// Registers a callback on the receiver's stop token that calls `stop()`
  /// on this nursery, and returns an operation whose `start` hands the
  /// wrapped receiver and that callback to `setReceiver`.
  auto connect(Receiver)(Receiver receiver) shared scope @safe {
    /// Wraps the concrete receiver behind the ReceiverObject interface.
    final class ReceiverImpl : ReceiverObject {
      Receiver receiver;

      this(Receiver receiver) {
        this.receiver = receiver;
      }

      void setValue() @safe {
        receiver.setValue();
      }

      void setDone() nothrow @safe {
        receiver.setDone();
      }

      void setError(Exception e) nothrow @safe {
        receiver.setError(e);
      }

      SchedulerObjectBase getScheduler() nothrow @safe {
        import concurrency.scheduler : toSchedulerObject;

        return receiver.getScheduler().toSchedulerObject();
      }
    }

    /// Operational state of the nursery itself; not copyable.
    static struct Op {
      shared Nursery nursery;
      StopCallback cb;
      ReceiverObject receiver;

      @disable this(ref return scope typeof(this) rhs);
      @disable this(this);

      this(shared Nursery n, StopCallback cb, ReceiverObject r) {
        nursery = n;
        this.cb = cb;
        receiver = r;
      }

      void start() nothrow scope @trusted {
        nursery.setReceiver(receiver, cb);
      }
    }

    auto upstreamToken = receiver.getStopToken();
    // a stop request on the receiver's token is forwarded to the nursery
    auto forwardStop = (() @trusted => upstreamToken.onStop(() shared nothrow @trusted => cast(void)this.stop()))();
    return Op(this, forwardStop, new ReceiverImpl(receiver));
  }

  /// Installs the awaiting receiver and its stop callback, then starts every
  /// operation that was added so far. Asserts that no receiver is installed yet.
  private void setReceiver(ReceiverObject r, StopCallback cb) nothrow @safe shared {
    with (assumeThreadSafe) {
      mutex.lock_nothrow();
      assert(this.receiver is null, "Cannot await a nursery twice.");
      receiver = r;
      stopCallback = cb;
      // work on a copy so the operations are started after the mutex is released
      auto snapshot = operations.dup();
      mutex.unlock_nothrow();

      foreach (node; snapshot)
        node.start();
    }
  }
}

/// Type-erased view of the receiver the nursery completes.
private interface ReceiverObject {
  void setValue() @safe;
  void setDone() nothrow @safe;
  void setError(Exception e) nothrow @safe;
  SchedulerObjectBase getScheduler() nothrow @safe;
}

/// Receiver connected to every sender that is run in a nursery.
/// Value and done completions are reported to the nursery through `done`,
/// errors through `setError`; a produced value is not stored.
private struct NurseryReceiver(Value) {
  shared Nursery nursery;
  size_t id;

  this(shared Nursery nursery, size_t id) {
    this.nursery = nursery;
    this.id = id;
  }

  static if (is(Value == void)) {
    void setValue() shared @safe {
      (cast() this).setDone();
    }

    void setValue() @safe {
      setDone();
    }
  } else {
    void setValue(Value val) shared @trusted {
      (cast() this).setDone();
    }

    void setValue(Value val) @safe {
      setDone();
    }
  }

  /// Reports this operation as finished to the nursery.
  void setDone() nothrow @safe {
    nursery.done(id);
  }

  /// Reports this operation as failed with `e` to the nursery.
  void setError(Exception e) nothrow @safe {
    nursery.setError(e, id);
  }

  /// Returns the nursery's stop token.
  auto getStopToken() @safe {
    return nursery.getStopToken();
  }

  /// Returns the scheduler obtained through the nursery.
  auto getScheduler() @safe {
    return nursery.getScheduler();
  }
}