module concurrency.nursery;

import concurrency.stoptoken : StopSource, StopToken, StopCallback, onStop;
import concurrency.thread : LocalThreadExecutor;
import concurrency.receiver : getStopToken;
import concurrency.scheduler : SchedulerObjectBase;
import std.typecons : Nullable;

/// A Nursery is a place for senders to be run in, while being a Sender itself.
/// Stopping the Nursery cancels all senders.
/// When any Sender completes with an Error all Senders are canceled as well.
/// Cancellation is signaled with a StopToken.
/// Senders themselves bear the responsibility to respond to stop requests.
/// When cancellation happens all Senders are waited on for completion.
/// Senders can be added to the Nursery at any time.
/// Senders are only started when the Nursery itself is being awaited on.
class Nursery : StopSource {
  import concurrency.sender : isSender, OperationalStateBase;
  import core.sync.mutex : Mutex;

  alias Value = void;
  private {
    /// Operations that were added through `run` and have not completed yet.
    Node[] operations;
    /// Pairs a connected operational state with the id handed to its receiver.
    struct Node {
      OperationalStateBase state;
      void start() @safe nothrow {
        state.start();
      }
      size_t id;
    }
    /// Guards `operations`, `receiver` and `stopCallback`.
    Mutex mutex;
    shared size_t busy = 1; // we start at 1 to denote the Nursery is open for tasks
    shared size_t counter = 0; // source of operation ids; incremented once per `run`
    Throwable throwable; // first throwable from sender, if any
    /// Receiver of whoever awaits the nursery; null until `setReceiver` is called
    /// and reset to null again once the nursery has completed.
    ReceiverObject receiver;
    /// Callback registered on the awaiting receiver's stop token (see `connect`).
    StopCallback stopCallback;
    /// Strips `shared` so members can be accessed directly; callers are
    /// responsible for doing their own synchronization.
    Nursery assumeThreadSafe() @trusted shared nothrow {
      return cast(Nursery)this;
    }
  }

  /// Creates an open nursery: calls `resetScheduler` and allocates the mutex.
  this() @safe shared {
    import concurrency.utils : resetScheduler;
    resetScheduler();
    with(assumeThreadSafe) mutex = new Mutex();
  }

  /// Requests a stop on the underlying StopSource.
  /// When `super.stop()` returns true, `done(-1)` is called to give up the
  /// count that `busy` starts with; the id -1 is not expected to match any
  /// registered operation.
  /// Returns: whatever `StopSource.stop` returned.
  override bool stop() nothrow @trusted {
    auto result = super.stop();

    if (result)
      (cast(shared)this).done(-1);

    return result;
  }

  /// Shared overload; forwards to the unshared `stop`.
  override bool stop() nothrow @trusted shared {
    return (cast(Nursery)this).stop();
  }

  /// Returns: a StopToken backed by this nursery.
  StopToken getStopToken() nothrow @trusted shared {
    return StopToken(cast(Nursery)this);
  }

  /// Returns: the scheduler of the awaiting receiver.
  /// NOTE(review): dereferences `receiver` without a null check, so this
  /// presumably must only be called while the nursery is being awaited - confirm.
  private auto getScheduler() nothrow @trusted shared {
    return (cast()receiver).getScheduler();
  }

  /// Called by a NurseryReceiver when its sender failed.
  /// Keeps `e` only if no throwable was stored before, marks operation `id`
  /// as done and then stops the nursery.
  private void setError(Throwable e, size_t id) nothrow @safe shared {
    import core.atomic : cas;
    with(assumeThreadSafe) cas(&throwable, cast(Throwable)null, e); // store throwable if not already
    done(id);
    stop();
  }

  /// Marks operation `id` as finished and completes the awaiting receiver once
  /// nothing is left to wait for.
  ///
  /// Under the mutex: removes the operation with the given id (if present) and
  /// decrements `busy`. The nursery counts as done when `busy` drops to zero or
  /// when no operations remain; in that case `throwable`, `receiver` and
  /// `stopCallback` are cleared and the stop callback is disposed.
  ///
  /// Outside the mutex the captured receiver (if any) is signalled: `setError`
  /// when a throwable was stored, `setDone` when a stop was requested, and
  /// `setValue` otherwise. An Exception thrown by `setValue` is forwarded to
  /// `setError`.
  private void done(size_t id) nothrow @trusted shared {
    import std.algorithm : countUntil, remove;
    import core.atomic : atomicOp;

    with (assumeThreadSafe) {
      mutex.lock_nothrow();
      auto idx = operations.countUntil!(o => o.id == id);
      if (idx != -1)
        operations = operations.remove(idx);
      bool isDone = atomicOp!"-="(busy,1) == 0 || operations.length == 0;
      // take local copies so the receiver can be called after unlocking
      auto localReceiver = receiver;
      auto localThrowable = throwable;
      if (isDone) {
        throwable = null;
        receiver = null;
        if (stopCallback)
          stopCallback.dispose();
        stopCallback = null;
      }
      mutex.unlock_nothrow();

      if (isDone && localReceiver !is null) {
        if (localThrowable !is null) {
          localReceiver.setError(localThrowable);
        } else if (isStopRequested()) {
          localReceiver.setDone();
        } else {
          try {
            localReceiver.setValue();
          } catch (Exception e) {
            localReceiver.setError(e);
          }
        }
      }
    }
  }

  /// Adds the wrapped sender to the nursery; does nothing when `sender` is null.
  void run(Sender)(Nullable!Sender sender) shared if (isSender!Sender) {
    if (!sender.isNull)
      run(sender.get());
  }

  /// Adds `sender` to the nursery.
  ///
  /// The sender is connected to a NurseryReceiver carrying a fresh id and the
  /// resulting operation is registered. It is started right away when the
  /// nursery already has a receiver (i.e. is being awaited); otherwise it is
  /// started later by `setReceiver`.
  /// A null class or interface sender is silently ignored.
  ///
  /// Throws: Exception when the `busy` counter has already reached zero.
  void run(Sender)(Sender sender) shared @trusted {
    import concepts;
    static assert (models!(Sender, isSender));
    import std.typecons : Nullable;
    import core.atomic : atomicOp, atomicLoad;
    import concurrency.sender : connectHeap;

    static if (is(Sender == class) || is(Sender == interface))
      if (sender is null)
        return;

    if (busy.atomicLoad() == 0)
      throw new Exception("This nursery is already stopped, it cannot accept more work.");

    size_t id = atomicOp!"+="(counter, 1);
    auto op = sender.connectHeap(NurseryReceiver!(Sender.Value)(this, id));

    mutex.lock_nothrow();

    operations ~= cast(shared) Node(op, id);
    atomicOp!"+="(busy, 1);
    bool hasStarted = this.receiver !is null;
    mutex.unlock_nothrow();

    // start outside the lock; `done` takes the same mutex
    if (hasStarted)
      op.start();
  }

  /// Unshared overload; forwards to the shared `connect`.
  auto connect(Receiver)(return Receiver receiver) @trusted scope {
    return (cast(shared)this).connect(receiver);
  }

  /// Connects the nursery to `receiver`.
  ///
  /// The receiver is wrapped in a ReceiverObject implementation and a callback
  /// is registered on the receiver's stop token that stops this nursery.
  /// Returns: a NurseryOp holding the nursery, the stop callback and the
  /// wrapped receiver.
  auto connect(Receiver)(Receiver receiver) shared scope @safe {
    final class ReceiverImpl : ReceiverObject {
      Receiver receiver;
      SchedulerObjectBase scheduler; // created on first use by getScheduler
      this(Receiver receiver) { this.receiver = receiver; }
      void setValue() @safe { receiver.setValue(); }
      void setDone() nothrow @safe { receiver.setDone(); }
      void setError(Throwable e) nothrow @safe { receiver.setError(e); }
      SchedulerObjectBase getScheduler() nothrow @safe {
        import concurrency.scheduler : toSchedulerObject;
        if (scheduler is null)
          scheduler = receiver.getScheduler().toSchedulerObject();
        return scheduler;
      }
    }
    auto stopToken = receiver.getStopToken();
    auto cb = (()@trusted => stopToken.onStop(() shared nothrow @trusted => cast(void)this.stop()))();
    return NurseryOp(this, cb, new ReceiverImpl(receiver));
  }

  /// Installs the awaiting receiver and its stop callback, then starts every
  /// operation registered so far. The operations are copied under the mutex
  /// and started after it has been released.
  /// Asserts that no receiver was installed before.
  private void setReceiver(ReceiverObject r, StopCallback cb) nothrow @safe shared {
    with(assumeThreadSafe) {
      mutex.lock_nothrow();
      assert(this.receiver is null, "Cannot await a nursery twice.");
      receiver = r;
      stopCallback = cb;
      auto ops = operations.dup();
      mutex.unlock_nothrow();

      // start all work
      foreach(op; ops)
        op.start();
    }
  }
}

/// Type-erased receiver of whoever awaits the nursery.
private interface ReceiverObject {
  void setValue() @safe;
  void setDone() nothrow @safe;
  void setError(Throwable e) nothrow @safe;
  SchedulerObjectBase getScheduler() nothrow @safe;
}

/// Receiver connected to every sender that runs inside a nursery.
/// It reports completion of operation `id` back to the nursery; a produced
/// value is discarded.
private struct NurseryReceiver(Value) {
  shared Nursery nursery;
  size_t id;
  this(shared Nursery nursery, size_t id) {
    this.nursery = nursery;
    this.id = id;
  }

  static if (is(Value == void)) {
    void setValue() shared @safe {
      (cast() this).setDone();
    }
    void setValue() @safe {
      (cast() this).setDone();
    }
  } else {
    // the value itself is not used; only completion is reported
    void setValue(Value val) shared @trusted {
      (cast() this).setDone();
    }
    void setValue(Value val) @safe {
      nursery.done(id);
    }
  }

  /// Reports operation `id` as finished to the nursery.
  void setDone() nothrow @safe {
    nursery.done(id);
  }

  /// Hands the error to the nursery, which marks operation `id` as finished
  /// and stops itself.
  void setError(Throwable e) nothrow @safe {
    nursery.setError(e, id);
  }

  /// Returns: the nursery's stop token.
  auto getStopToken() @safe {
    return nursery.getStopToken();
  }

  /// Returns: the scheduler of the receiver awaiting the nursery.
  auto getScheduler() @safe {
    return nursery.getScheduler();
  }
}

/// Operational state returned by `Nursery.connect`. Not copyable.
private struct NurseryOp {
  shared Nursery nursery;
  StopCallback cb;
  ReceiverObject receiver;
  @disable this(ref return scope typeof(this) rhs);
  @disable this(this);
  this(return shared Nursery n, StopCallback cb, ReceiverObject r) @safe scope return {
    nursery = n;
    this.cb = cb;
    receiver = r;
  }
  /// Starts awaiting the nursery.
  ///
  /// When the nursery's `busy` counter is already zero the receiver is
  /// completed with `setDone` right away. Otherwise the receiver and stop
  /// callback are handed to the nursery, which starts all registered work.
  void start() nothrow scope @trusted {
    import core.atomic : atomicLoad;
    if (nursery.busy.atomicLoad == 0) {
      // The callback is never handed to the nursery on this path, so nobody
      // else would dispose it. Release it before completing the receiver,
      // mirroring what Nursery.done does on the regular completion path.
      if (cb !is null)
        cb.dispose();
      receiver.setDone();
    } else
      nursery.setReceiver(receiver, cb);
  }
}