module concurrency.nursery;

import concurrency.stoptoken : StopSource, StopToken, StopCallback, onStop;
import concurrency.thread : LocalThreadExecutor;
import concurrency.receiver : getStopToken;
import concurrency.scheduler : SchedulerObjectBase;
import std.typecons : Nullable;

/// A Nursery is a place for senders to be run in, while being a Sender itself.
/// Stopping the Nursery cancels all senders.
/// When any Sender completes with an Error all Senders are canceled as well.
/// Cancellation is signaled with a StopToken.
/// Senders themselves bear the responsibility to respond to stop requests.
/// When cancellation happens all Senders are waited on for completion.
/// Senders can be added to the Nursery at any time.
/// Senders are only started when the Nursery itself is being awaited on.
class Nursery : StopSource {
  import concurrency.sender : isSender, OperationalStateBase;
  import core.sync.mutex : Mutex;
  import concepts;
  static assert (models!(typeof(this), isSender));

  alias Value = void;
  private {
    // Operations that have been added via `run` and have not yet completed.
    Node[] operations;
    // Pairs a connected operation with the id handed to its NurseryReceiver.
    struct Node {
      OperationalStateBase state;
      void start() @safe nothrow {
        state.start();
      }
      size_t id;
    }
    // Guards `operations`, `receiver`, `throwable` and `stopCallback`.
    Mutex mutex;
    shared size_t busy = 1; // we start at 1 to denote the Nursery is open for tasks
    shared size_t counter = 0; // source of operation ids
    Throwable throwable; // first throwable from sender, if any
    ReceiverObject receiver; // set while the Nursery is being awaited
    StopCallback stopCallback; // registration on the awaiting receiver's stop token
    // Strips `shared` so the fields can be accessed; callers synchronize
    // through `mutex` or atomics themselves.
    Nursery assumeThreadSafe() @trusted shared nothrow {
      return cast(Nursery)this;
    }
  }

  /// Creates an open Nursery: resets the scheduler and allocates the mutex.
  this() @safe shared {
    import concurrency.utils : resetScheduler;
    resetScheduler();
    with(assumeThreadSafe) mutex = new Mutex();
  }

  /// Requests a stop. When this call is the one that triggers the stop
  /// (super.stop() returns true) the initial `busy` count is released.
  /// Returns: the result of StopSource.stop().
  override bool stop() nothrow @trusted {
    auto result = super.stop();

    // -1 converts to size_t.max, which is not expected to match any operation
    // id, so this only releases the initial "open" count held in `busy`.
    if (result)
      (cast(shared)this).done(-1);

    return result;
  }

  /// Shared overload; forwards to the unshared `stop`.
  override bool stop() nothrow @trusted shared {
    return (cast(Nursery)this).stop();
  }

  /// Returns: a StopToken bound to this Nursery.
  StopToken getStopToken() nothrow @trusted shared {
    return StopToken(cast(Nursery)this);
  }

  // Scheduler of the receiver that is currently awaiting the Nursery.
  private auto getScheduler() nothrow @trusted shared {
    return (cast()receiver).getScheduler();
  }

  // Called by a NurseryReceiver when its sender fails: remembers the first
  // throwable, marks the operation as done and stops the remaining ones.
  private void setError(Throwable e, size_t id) nothrow @safe shared {
    import core.atomic : cas;
    with(assumeThreadSafe) cas(&throwable, cast(Throwable)null, e); // store throwable if not already
    done(id);
    stop();
  }

  // Removes operation `id` (if present), decrements `busy`, and when the
  // Nursery is finished notifies the awaiting receiver exactly once.
  private void done(size_t id) nothrow @trusted shared {
    import std.algorithm : countUntil, remove;
    import core.atomic : atomicOp;

    with (assumeThreadSafe) {
      mutex.lock_nothrow();
      auto idx = operations.countUntil!(o => o.id == id);
      if (idx != -1)
        operations = operations.remove(idx);
      bool isDone = atomicOp!"-="(busy,1) == 0 || operations.length == 0;
      // Snapshot under the lock so the receiver is called outside of it.
      auto localReceiver = receiver;
      auto localThrowable = throwable;
      if (isDone) {
        // Clear the state so a later `done` cannot notify the receiver again.
        throwable = null;
        receiver = null;
        if (stopCallback)
          stopCallback.dispose();
        stopCallback = null;
      }
      mutex.unlock_nothrow();

      if (isDone && localReceiver !is null) {
        if (localThrowable !is null) {
          localReceiver.setError(localThrowable);
        } else if (isStopRequested()) {
          localReceiver.setDone();
        } else {
          try {
            localReceiver.setValue();
          } catch (Exception e) {
            localReceiver.setError(e);
          }
        }
      }
    }
  }

  /// Adds `sender` to the Nursery unless the Nullable is empty.
  void run(Sender)(Nullable!Sender sender) shared if (isSender!Sender) {
    if (!sender.isNull)
      run(sender.get());
  }

  /// Adds `sender` to the Nursery. A null class/interface sender is ignored.
  /// The sender is started right away when the Nursery is already being
  /// awaited; otherwise it is started once the Nursery is awaited.
  /// Throws: Exception when the Nursery has already been stopped.
  void run(Sender)(Sender sender) shared @trusted {
    import concepts;
    static assert (models!(Sender, isSender));
    import core.atomic : atomicOp, atomicLoad;
    import concurrency.sender : connectHeap;

    static if (is(Sender == class) || is(Sender == interface))
      if (sender is null)
        return;

    // Cheap early-out before connecting the sender.
    if (busy.atomicLoad() == 0)
      throw new Exception("This nursery is already stopped, it cannot accept more work.");

    size_t id = atomicOp!"+="(counter, 1);
    auto op = sender.connectHeap(NurseryReceiver!(Sender.Value)(this, id));

    mutex.lock_nothrow();

    // `done` decrements `busy` while holding the mutex, so re-check here:
    // a stop may have completed the Nursery between the check above and
    // taking the lock, in which case the operation would be registered but
    // never started and `busy` would be revived to a non-zero value.
    if (busy.atomicLoad() == 0) {
      mutex.unlock_nothrow();
      throw new Exception("This nursery is already stopped, it cannot accept more work.");
    }

    operations ~= cast(shared) Node(op, id);
    atomicOp!"+="(busy, 1);
    bool hasStarted = this.receiver !is null;
    mutex.unlock_nothrow();

    if (hasStarted)
      op.start();
  }

  /// Connects `receiver` to the Nursery; forwards to the shared overload.
  auto connect(Receiver)(return Receiver receiver) @safe return scope {
    return asShared.connect(receiver);
  }

  private shared(Nursery) asShared() @trusted return scope {
    return cast(shared)this;
  }

  /// Connects `receiver` to the Nursery. The receiver is wrapped in a
  /// type-erased ReceiverObject and a callback is registered on its stop
  /// token that stops the Nursery.
  /// Returns: a NurseryOp that begins awaiting the Nursery when started.
  auto connect(Receiver)(return Receiver receiver) shared @trusted return scope {
    final class ReceiverImpl : ReceiverObject {
      Receiver receiver;
      SchedulerObjectBase scheduler;
      this(return Receiver receiver) @trusted { this.receiver = receiver; }
      void setValue() @safe { receiver.setValue(); }
      void setDone() nothrow @safe { receiver.setDone(); }
      void setError(Throwable e) nothrow @safe { receiver.setError(e); }
      SchedulerObjectBase getScheduler() nothrow @safe {
        import concurrency.scheduler : toSchedulerObject;
        // Created on first use and cached afterwards.
        if (scheduler is null)
          scheduler = receiver.getScheduler().toSchedulerObject();
        return scheduler;
      }
    }
    auto stopToken = receiver.getStopToken();
    auto cb = (()@trusted => stopToken.onStop(() shared nothrow @trusted => cast(void)this.stop()))();
    return NurseryOp(this, cb, new ReceiverImpl(receiver));
  }

  // Installs the awaiting receiver and its stop callback, then starts every
  // operation that was added before the Nursery was awaited.
  private void setReceiver(ReceiverObject r, StopCallback cb) nothrow @safe shared {
    with(assumeThreadSafe) {
      mutex.lock_nothrow();
      assert(this.receiver is null, "Cannot await a nursery twice.");
      receiver = r;
      stopCallback = cb;
      // Copy so the operations can be started without holding the mutex.
      auto ops = operations.dup();
      mutex.unlock_nothrow();

      // start all work
      foreach(op; ops)
        op.start();
    }
  }
}

/// Type-erased view of the receiver that awaits a Nursery.
private interface ReceiverObject {
  void setValue() @safe;
  void setDone() nothrow @safe;
  void setError(Throwable e) nothrow @safe;
  SchedulerObjectBase getScheduler() nothrow @safe;
}

/// Receiver connected to every sender that runs inside a Nursery. It reports
/// completion of operation `id` back to the Nursery; any produced value is
/// discarded.
private struct NurseryReceiver(Value) {
  shared Nursery nursery;
  size_t id;
  this(shared Nursery nursery, size_t id) {
    this.nursery = nursery;
    this.id = id;
  }

  static if (is(Value == void)) {
    void setValue() shared @safe {
      (cast() this).setDone();
    }
    void setValue() @safe {
      (cast() this).setDone();
    }
  } else {
    void setValue(Value val) shared @trusted {
      (cast() this).setDone();
    }
    void setValue(Value val) @safe {
      nursery.done(id);
    }
  }

  void setDone() nothrow @safe {
    nursery.done(id);
  }

  void setError(Throwable e) nothrow @safe {
    nursery.setError(e, id);
  }

  auto getStopToken() @safe {
    return nursery.getStopToken();
  }

  auto getScheduler() @safe {
    return nursery.getScheduler();
  }
}

/// Operational state returned by Nursery.connect. Non-copyable.
private struct NurseryOp {
  shared Nursery nursery;
  StopCallback cb;
  ReceiverObject receiver;
  @disable this(ref return scope typeof(this) rhs);
  @disable this(this);
  this(return shared Nursery n, StopCallback cb, ReceiverObject r) @safe return scope {
    nursery = n;
    this.cb = cb;
    receiver = r;
  }
  /// Starts awaiting the Nursery. When the Nursery is already stopped the
  /// receiver is completed with setDone immediately.
  void start() nothrow scope @trusted {
    import core.atomic : atomicLoad;
    if (nursery.busy.atomicLoad == 0) {
      // The Nursery never takes ownership of the callback on this path, so
      // dispose it here; otherwise it stays registered on the stop token.
      if (cb !is null)
        cb.dispose();
      receiver.setDone();
    } else
      nursery.setReceiver(receiver, cb);
  }
}