module concurrency.nursery;

import concurrency.stoptoken : StopSource, StopToken, StopCallback, onStop;
import concurrency.thread : LocalThreadExecutor;
import concurrency.receiver : getStopToken;
import concurrency.scheduler : SchedulerObjectBase;
import std.typecons : Nullable;

/// A Nursery is a place for senders to be run in, while being a Sender itself.
/// Stopping the Nursery cancels all senders.
/// When any Sender completes with an Error all Senders are canceled as well.
/// Cancellation is signaled with a StopToken.
/// Senders themselves bear the responsibility to respond to stop requests.
/// When cancellation happens all Senders are waited on for completion.
/// Senders can be added to the Nursery at any time.
/// Senders are only started when the Nursery itself is being awaited on.
class Nursery : StopSource {
  import concurrency.sender : isSender, OperationalStateBase;
  import core.sync.mutex : Mutex;

  /// The Nursery completes without a value.
  alias Value = void;
  private {
    /// Operations that were added through `run` and have not yet reported completion.
    Node[] operations;
    /// Pairs a connected operation state with the id its NurseryReceiver reports back with.
    struct Node {
      OperationalStateBase state;
      void start() @safe nothrow {
        state.start();
      }
      size_t id;
    }
    /// Guards `operations`, `receiver`, `throwable` and `stopCallback`.
    Mutex mutex;
    shared size_t busy = 1; // we start at 1 to denote the Nursery is open for tasks
    shared size_t counter = 0; // source of operation ids, incremented for every `run`
    Throwable throwable; // first throwable from sender, if any
    ReceiverObject receiver; // set while the Nursery is being awaited, null otherwise
    StopCallback stopCallback; // links the awaiting receiver's stop token to `stop`
    /// Strips `shared` so members can be accessed directly; callers are
    /// responsible for holding `mutex` or using atomics where needed.
    Nursery assumeThreadSafe() @trusted shared nothrow {
      return cast(Nursery)this;
    }
  }

  /// Constructs an open Nursery: calls `resetScheduler` and allocates the mutex.
  this() @safe shared {
    import concurrency.utils : resetScheduler;
    resetScheduler();
    with(assumeThreadSafe) mutex = new Mutex();
  }

  /// Requests a stop through the StopSource base class.
  /// When `super.stop()` reports true, the count that `busy` started with is
  /// released by calling `done(-1)`; -1 converts to size_t.max, which is not
  /// expected to match the id of any registered operation.
  /// Returns: whatever `super.stop()` returned.
  override bool stop() nothrow @trusted {
    auto result = super.stop();

    if (result)
      (cast(shared)this).done(-1);

    return result;
  }

  /// Shared overload; forwards to the unshared `stop`.
  override bool stop() nothrow @trusted shared {
    return (cast(Nursery)this).stop();
  }

  /// Returns: a StopToken constructed from this Nursery.
  StopToken getStopToken() nothrow @trusted shared {
    return StopToken(cast(Nursery)this);
  }

  /// Returns: the scheduler of the receiver that is awaiting this Nursery.
  /// NOTE(review): `receiver` is null until the Nursery is awaited and again
  /// after it completed; presumably this is only called by running senders - confirm.
  private auto getScheduler() nothrow @trusted shared {
    return (cast()receiver).getScheduler();
  }

  /// Called by a NurseryReceiver when its sender failed.
  /// Keeps `e` only if no throwable was stored before, marks operation `id`
  /// as finished and then requests a stop of the Nursery.
  private void setError(Throwable e, size_t id) nothrow @safe shared {
    import core.atomic : cas;
    with(assumeThreadSafe) cas(&throwable, cast(Throwable)null, e); // store throwable if not already
    done(id);
    stop();
  }

  /// Marks operation `id` as finished and completes the awaiting receiver
  /// when the Nursery is done.
  ///
  /// Under the mutex: removes the operation with a matching id (if any),
  /// decrements `busy`, and decides the Nursery is done when `busy` reached
  /// zero or no operations are left. When done, the stored throwable, receiver
  /// and stop callback are cleared (the callback is disposed).
  /// After unlocking: the receiver that was set at that moment, if any, gets
  /// `setError` when a throwable was stored, `setDone` when a stop was
  /// requested, and `setValue` otherwise.
  private void done(size_t id) nothrow @trusted shared {
    import std.algorithm : countUntil, remove;
    import core.atomic : atomicOp;

    with (assumeThreadSafe) {
      mutex.lock_nothrow();
      auto idx = operations.countUntil!(o => o.id == id);
      if (idx != -1)
        operations = operations.remove(idx);
      bool isDone = atomicOp!"-="(busy,1) == 0 || operations.length == 0;
      // take copies while holding the lock; the fields are reset below
      auto localReceiver = receiver;
      auto localThrowable = throwable;
      if (isDone) {
        throwable = null;
        receiver = null;
        if (stopCallback)
          stopCallback.dispose();
        stopCallback = null;
      }
      mutex.unlock_nothrow();

      // the receiver is signalled after the mutex has been released
      if (isDone && localReceiver !is null) {
        if (localThrowable !is null) {
          localReceiver.setError(localThrowable);
        } else if (isStopRequested()) {
          localReceiver.setDone();
        } else {
          try {
            localReceiver.setValue();
          } catch (Exception e) {
            localReceiver.setError(e);
          }
        }
      }
    }
  }

  /// Runs the wrapped sender in the Nursery; does nothing when `sender` is null.
  void run(Sender)(Nullable!Sender sender) shared if (isSender!Sender) {
    if (!sender.isNull)
      run(sender.get());
  }

  /// Adds `sender` to the Nursery.
  ///
  /// The sender is connected to a NurseryReceiver carrying a fresh id and
  /// registered in `operations`. It is started right away when the Nursery is
  /// currently being awaited; otherwise it is started by `setReceiver`.
  /// Null class or interface senders are ignored.
  ///
  /// Throws: Exception when `busy` is zero, i.e. the Nursery no longer accepts work.
  void run(Sender)(Sender sender) shared @trusted {
    import concepts;
    static assert (models!(Sender, isSender));
    import core.atomic : atomicOp, atomicLoad;
    import concurrency.sender : connectHeap;

    static if (is(Sender == class) || is(Sender == interface))
      if (sender is null)
        return;

    // cheap early exit that avoids connecting the sender at all
    if (busy.atomicLoad() == 0)
      throw new Exception("This nursery is already stopped, it cannot accept more work.");

    size_t id = atomicOp!"+="(counter, 1);
    auto op = sender.connectHeap(NurseryReceiver!(Sender.Value)(this, id));

    mutex.lock_nothrow();

    // `done` decrements `busy` while holding the mutex, so the check has to be
    // repeated here: the Nursery may have finished since the unlocked check
    // above, in which case the operation would be registered but never
    // started nor reported.
    if (busy.atomicLoad() == 0) {
      mutex.unlock_nothrow();
      throw new Exception("This nursery is already stopped, it cannot accept more work.");
    }

    operations ~= cast(shared) Node(op, id);
    atomicOp!"+="(busy, 1);
    bool hasStarted = this.receiver !is null;
    mutex.unlock_nothrow();

    // started outside the lock; `done` takes the same mutex
    if (hasStarted)
      op.start();
  }

  /// Unshared overload; forwards to the shared `connect`.
  auto connect(Receiver)(return Receiver receiver) @trusted scope {
    return (cast(shared)this).connect(receiver);
  }

  /// Connects the Nursery to `receiver`.
  ///
  /// The receiver is wrapped in a class implementing ReceiverObject, and a
  /// callback is registered on the receiver's stop token that calls `stop` on
  /// this Nursery.
  /// Returns: a NurseryOp holding the Nursery, the callback and the wrapped receiver.
  auto connect(Receiver)(Receiver receiver) shared scope @safe {
    final class ReceiverImpl : ReceiverObject {
      Receiver receiver;
      this(Receiver receiver) { this.receiver = receiver; }
      void setValue() @safe { receiver.setValue(); }
      void setDone() nothrow @safe { receiver.setDone(); }
      void setError(Throwable e) nothrow @safe { receiver.setError(e); }
      SchedulerObjectBase getScheduler() nothrow @safe {
        import concurrency.scheduler : toSchedulerObject;
        return receiver.getScheduler().toSchedulerObject();
      }
    }
    auto stopToken = receiver.getStopToken();
    auto cb = (()@trusted => stopToken.onStop(() shared nothrow @trusted => cast(void)this.stop()))();
    return NurseryOp(this, cb, new ReceiverImpl(receiver));
  }

  /// Stores the awaiting receiver and its stop callback, then starts every
  /// operation that was registered so far.
  /// The operations are copied under the mutex and started after unlocking.
  private void setReceiver(ReceiverObject r, StopCallback cb) nothrow @safe shared {
    with(assumeThreadSafe) {
      mutex.lock_nothrow();
      assert(this.receiver is null, "Cannot await a nursery twice.");
      receiver = r;
      stopCallback = cb;
      auto ops = operations.dup();
      mutex.unlock_nothrow();

      // start all work
      foreach(op; ops)
        op.start();
    }
  }
}

/// Interface through which the Nursery holds the receiver that awaits it,
/// independent of the receiver's concrete type.
private interface ReceiverObject {
  void setValue() @safe;
  void setDone() nothrow @safe;
  void setError(Throwable e) nothrow @safe;
  SchedulerObjectBase getScheduler() nothrow @safe;
}

/// Receiver that `Nursery.run` connects to every sender.
/// It reports completion of operation `id` back to the Nursery; a value
/// produced by the sender is not forwarded.
private struct NurseryReceiver(Value) {
  shared Nursery nursery;
  size_t id;
  this(shared Nursery nursery, size_t id) {
    this.nursery = nursery;
    this.id = id;
  }

  static if (is(Value == void)) {
    /// Completion without a value; reported to the Nursery via `setDone`.
    void setValue() shared @safe {
      (cast() this).setDone();
    }
    /// ditto
    void setValue() @safe {
      (cast() this).setDone();
    }
  } else {
    /// Completion with a value; `val` is ignored and the operation is marked as finished.
    void setValue(Value val) shared @trusted {
      (cast() this).setDone();
    }
    /// ditto
    void setValue(Value val) @safe {
      nursery.done(id);
    }
  }

  /// Marks this operation as finished in the Nursery.
  void setDone() nothrow @safe {
    nursery.done(id);
  }

  /// Hands the error to the Nursery via `Nursery.setError`.
  void setError(Throwable e) nothrow @safe {
    nursery.setError(e, id);
  }

  /// Returns: the Nursery's stop token.
  auto getStopToken() @safe {
    return nursery.getStopToken();
  }

  /// Returns: the scheduler as reported by the Nursery.
  auto getScheduler() @safe {
    return nursery.getScheduler();
  }
}

/// Operation state returned by `Nursery.connect`. Not copyable.
private struct NurseryOp {
  shared Nursery nursery;
  StopCallback cb;
  ReceiverObject receiver;
  @disable this(ref return scope typeof(this) rhs);
  @disable this(this);
  this(return shared Nursery n, StopCallback cb, ReceiverObject r) @safe scope return {
    nursery = n;
    this.cb = cb;
    receiver = r;
  }
  /// Starts awaiting the Nursery.
  /// When `busy` is already zero the receiver is completed with `setDone`
  /// immediately; otherwise the receiver and stop callback are handed to the
  /// Nursery, which starts all registered operations.
  void start() nothrow scope @trusted {
    import core.atomic : atomicLoad;
    if (nursery.busy.atomicLoad == 0) {
      // The callback is never handed to the Nursery on this path, so nothing
      // else would dispose it. Dispose before signalling the receiver.
      if (cb !is null)
        cb.dispose();
      receiver.setDone();
    } else
      nursery.setReceiver(receiver, cb);
  }
}