module concurrency.sender;

import concepts;
import std.traits : ReturnType, isCallable;
import core.time : Duration;

// A Sender represents something that completes with either:
// 1. a value (which can be void)
// 2. completion, in response to cancellation
// 3. an Exception
//
// Many things can be represented as a Sender.
// Threads, Fibers, coroutines, etc. In general, any async operation.
//
// A Sender is lazy. The work it represents is only started when
// the sender is connected to a receiver and explicitly started.
//
// Senders and Receivers go hand in hand. Senders send a value,
// Receivers receive one.
//
// Senders are useful because many Tasks can be represented as them,
// and any operation on top of senders then works on any one of those
// Tasks.
//
// The most common operation is `sync_wait`. It blocks the current
// execution context to await the Sender.
//
// There are many others as well. Like `when_all`, `retry`, `when_any`,
// etc. These algorithms can be used on any sender.
//
// Cancellation happens through StopTokens. A Sender can ask a Receiver
// for a StopToken. The default is a NeverStopToken but Receivers can
// customize this.
//
// The StopToken can be polled or a callback can be registered with one.
//
// Senders enforce Structured Concurrency because work cannot be
// started unless it is awaited.
//
// These concepts are heavily inspired by several C++ proposals
// starting with http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2020/p0443r14.html

/// Compile-time probe for the Sender concept.
///
/// Instantiating this template succeeds only when `T.init` can be connected
/// to a minimal receiver and the result is assignable to `OpType!(T, Receiver)`.
/// When the resulting operation state does not satisfy `isValidOp`, a warning
/// is printed at compile time (the check itself still passes).
void checkSender(T)() @safe {
  import concurrency.scheduler : SchedulerObjectBase;

  // Smallest receiver the probe connects with: a value channel matching
  // T.Value, a done channel, an error channel and a scheduler accessor.
  struct Receiver {
    static if (!is(T.Value == void))
      void setValue(T.Value) {}
    else
      void setValue() {}
    void setDone() nothrow {}
    void setError(Exception e) nothrow {}
    SchedulerObjectBase getScheduler() nothrow { return null; }
  }

  T candidate = T.init;
  OpType!(T, Receiver) opState = candidate.connect(Receiver.init);

  static if (!isValidOp!(T, Receiver))
    pragma(msg, "Warning: ", T, "'s operation state is not returned via the stack");
}

/// Evaluates to true when `checkSender!T` type-checks, i.e. T is a Sender.
enum isSender(T) = is(typeof(checkSender!T));

/// It is ok for the operation state to be on the heap, but if it is on the stack we need to ensure any copies are elided. We can't be 100% sure (the compiler may still blit), but this is the best we can do.
template isValidOp(Sender, Receiver) {
  import std.traits : isPointer;
  import std.meta : allSatisfy;
  // All `connect` overloads of Sender; the trailing `true` also includes
  // template overloads.
  alias overloads = __traits(getOverloads, Sender, "connect", true);
  // True when the given `connect` overload returns its result on the stack
  // (template overloads are first instantiated with Receiver).
  template isRVO(alias connect) {
    static if (__traits(isTemplate, connect))
      enum isRVO = __traits(isReturnOnStack, connect!Receiver);
    else
      enum isRVO = __traits(isReturnOnStack, connect);
  }
  alias Op = OpType!(Sender, Receiver);
  // Accepted: pointers, the type-erased OperationObject, classes, or a
  // non-POD value type for which every `connect` overload returns on the stack.
  enum isValidOp = isPointer!Op || is(Op == OperationObject) || is(Op == class) || (allSatisfy!(isRVO, overloads) && !__traits(isPOD, Op));
}

/// A Sender that sends a single value of type T
struct ValueSender(T) {
  // Compile-time check that this type satisfies isSender
  // (`models` presumably comes from the imported `concepts` module).
  static assert (models!(typeof(this), isSender));
  alias Value = T;
  /// Operation state: holds the receiver and (for non-void T) the value.
  static struct Op(Receiver) {
    Receiver receiver;
    static if (!is(T == void))
      T value;
    /// Hands the stored value (or nothing, for void) to the receiver.
    void start() @safe {
      // NOTE(review): setValueOrError presumably routes exceptions thrown by
      // setValue to setError - confirm in concurrency.receiver.
      import concurrency.receiver : setValueOrError;
      static if (!is(T == void))
        receiver.setValueOrError(value);
      else
        receiver.setValueOrError();
    }
  }
  static if (!is(T == void))
    T value;
  /// Creates the operation state for `receiver`; nothing runs until `start`.
  Op!Receiver connect(Receiver)(return Receiver receiver) @safe scope return {
    // ensure NRVO
    static if (!is(T == void))
      auto op = Op!(Receiver)(receiver, value);
    else
      auto op = Op!(Receiver)(receiver);
    return op;
  }
}

/// Wraps `t` in a ValueSender.
ValueSender!T just(T)(T t) {
  return ValueSender!T(t);
}

/// A Sender that calls `fun` when started and sends its result
/// (or a void value when `fun` returns void).
struct JustFromSender(Fun) {
  static assert (models!(typeof(this), isSender));
  alias Value = ReturnType!fun;
  /// Operation state: holds the receiver and the callable.
  static struct Op(Receiver) {
    Receiver receiver;
    Fun fun;
    /// Runs `fun` and forwards the outcome to the receiver.
    void start() @safe nothrow {
      import std.traits : hasFunctionAttributes;
      // A nothrow callable needs no guard; otherwise any Exception escaping
      // `set` is forwarded to the receiver's error channel.
      static if (hasFunctionAttributes!(Fun, "nothrow")) {
        set();
      } else {
        try {
          set();
        } catch (Exception e) {
          receiver.setError(e);
        }
      }
    }
    // Invokes `fun` and passes its result (if any) to the receiver.
    private void set() @safe {
      import concurrency.receiver : setValueOrError;
      static if (is(Value == void)) {
        fun();
        receiver.setValueOrError();
      } else
        receiver.setValueOrError(fun());
    }
  }
  Fun fun;
  /// Creates the operation state for `receiver`; `fun` is not called here.
  Op!Receiver connect(Receiver)(return Receiver receiver) @safe scope return {
    // ensure NRVO
    auto op = Op!(Receiver)(receiver, fun);
    return op;
  }
}

/// Wraps a callable in a JustFromSender.
/// Only plain functions, function pointers, or callables carrying the
/// `shared` attribute are accepted; anything else fails the static assert.
JustFromSender!(Fun) justFrom(Fun)(Fun fun) if (isCallable!Fun) {
  import std.traits : hasFunctionAttributes, isFunction, isFunctionPointer;
  static assert (isFunction!Fun || isFunctionPointer!Fun || hasFunctionAttributes!(Fun, "shared"), "Function must be shared");
  return JustFromSender!Fun(fun);
}

/// A polymorphic sender of type T
interface SenderObjectBase(T) {
  import concurrency.receiver;
  import concurrency.scheduler : SchedulerObjectBase;
  import concurrency.stoptoken : StopTokenObject, stopTokenObject;
  static assert (models!(typeof(this), isSender));
  alias Value = T;
  alias Op = OperationObject;
  /// Connects to a polymorphic receiver; implemented by concrete classes.
  OperationObject connect(ReceiverObjectBase!(T) receiver) @safe;
  /// Connects to any receiver by wrapping it in an anonymous
  /// ReceiverObjectBase!T that forwards every call, then delegating to the
  /// overload above.
  OperationObject connect(Receiver)(return Receiver receiver) @trusted scope {
    return connect(new class(receiver) ReceiverObjectBase!T {
      Receiver receiver;
      this(Receiver receiver) {
        this.receiver = receiver;
      }
      static if (is(T == void)) {
        void setValue() {
          receiver.setValueOrError();
        }
      } else {
        void setValue(T value) {
          receiver.setValueOrError(value);
        }
      }
      void setDone() nothrow {
        receiver.setDone();
      }
      void setError(Exception e) nothrow {
        receiver.setError(e);
      }
      // Converts the wrapped receiver's stop token to its object form.
      StopTokenObject getStopToken() nothrow {
        return stopTokenObject(receiver.getStopToken());
      }
      // Converts the wrapped receiver's scheduler to its object form.
      SchedulerObjectBase getScheduler() nothrow @safe {
        import concurrency.scheduler : toSchedulerObject;
        return receiver.getScheduler().toSchedulerObject;
      }
    });
  }
}

/// Type-erased operational state object
/// used in polymorphic senders
struct OperationObject {
  // Delegate invoked by `start`.
  private void delegate() nothrow shared _start;
  /// Invokes the stored delegate.
  void start() nothrow @trusted { _start(); }
}

/// Minimal interface of a startable operation state.
interface OperationalStateBase {
  void start() @safe nothrow;
}

/// calls connect on the Sender but stores the OperationState on the heap
OperationalStateBase connectHeap(Sender, Receiver)(Sender sender, Receiver receiver) {
  alias State = typeof(sender.connect(receiver));
  // The anonymous class owns the operation state as a field; its
  // constructor performs the connect and `start` forwards to the state.
  return new class(sender, receiver) OperationalStateBase {
    State state;
    this(Sender sender, Receiver receiver) {
      state = sender.connect(receiver);
    }
    void start() @safe nothrow {
      state.start();
    }
  };
}

/// A class extending from SenderObjectBase that wraps any Sender
class SenderObjectImpl(Sender) : SenderObjectBase!(Sender.Value) {
  import concurrency.receiver : ReceiverObjectBase;
  static assert (models!(typeof(this), isSender));
  private Sender sender;
  this(Sender sender) {
    this.sender = sender;
  }
  /// Connects the wrapped sender through connectHeap and wraps the state's
  /// `start` method in an OperationObject.
  OperationObject connect(ReceiverObjectBase!(Sender.Value) receiver) @trusted {
    auto state = sender.connectHeap(receiver);
    // The delegate is cast to the delegate type stored by OperationObject.
    return OperationObject(cast(typeof(OperationObject._start))&state.start);
  }
  /// Forwards to the templated `connect` declared on SenderObjectBase.
  OperationObject connect(Receiver)(Receiver receiver) {
    auto base = cast(SenderObjectBase!(Sender.Value))this;
    return base.connect(receiver);
  }
}

/// Converts any Sender to a polymorphic SenderObject
auto toSenderObject(Sender)(Sender sender) {
  static assert(models!(Sender, isSender));
  // A sender that already converts to SenderObjectBase is returned as-is;
  // anything else is wrapped in a SenderObjectImpl.
  static if (is(Sender : SenderObjectBase!(Sender.Value))) {
    return sender;
  } else
    return cast(SenderObjectBase!(Sender.Value))new SenderObjectImpl!(Sender)(sender);
}

/// A sender that always sets an error
struct ThrowingSender {
  alias Value = void;
  /// Operation state: holds the receiver only.
  static struct Op(Receiver) {
    Receiver receiver;
    /// Calls the receiver's setError with a new Exception.
    void start() {
      receiver.setError(new Exception("ThrowingSender"));
    }
  }
  /// Creates the operation state for `receiver`.
  auto connect(Receiver)(return Receiver receiver) @safe scope return {
    // ensure NRVO
    auto op = Op!Receiver(receiver);
    return op;
  }
}

/// This tests whether a Sender, by itself, makes any calls to the
/// setError function.
/// If a Sender is connected to a Receiver that has a non-nothrow
/// setValue function, a Sender can still throw, but only Exceptions
/// thrown from that Receiver's setValue function.
template canSenderThrow(Sender) {
  static assert (models!(Sender, isSender));
  // Receiver that deliberately has no setError member.
  struct NoErrorReceiver {
    void setDone() nothrow @safe @nogc {}
    static if (is(Sender.Value == void))
      void setValue() nothrow @safe @nogc {}
    else
      void setValue(Sender.Value t) nothrow @safe @nogc {}
  }
  // True when connecting to NoErrorReceiver does not compile.
  enum canSenderThrow = !__traits(compiles, Sender.init.connect(NoErrorReceiver()));
}

static assert( canSenderThrow!ThrowingSender);
static assert(!canSenderThrow!(ValueSender!int));

/// A sender that always calls setDone
struct DoneSender {
  static assert (models!(typeof(this), isSender));
  alias Value = void;
  /// Operation state: holds the receiver only.
  static struct DoneOp(Receiver) {
    Receiver receiver;
    /// Calls the receiver's setDone.
    void start() {
      receiver.setDone();
    }
  }
  /// Creates the operation state for `receiver`.
  auto connect(Receiver)(return Receiver receiver) @safe scope return {
    // ensure NRVO
    auto op = DoneOp!(Receiver)(receiver);
    return op;
  }
}

/// A sender that always calls setValue with no args
struct VoidSender {
  static assert (models!(typeof(this), isSender));
  alias Value = void;
  /// Operation state: holds the receiver only.
  struct VoidOp(Receiver) {
    Receiver receiver;
    /// Passes a void value to the receiver via setValueOrError.
    void start() nothrow @safe {
      import concurrency.receiver : setValueOrError;
      receiver.setValueOrError();
    }
  }
  /// Creates the operation state for `receiver`.
  auto connect(Receiver)(return Receiver receiver) @safe scope return{
    // ensure NRVO
    auto op = VoidOp!Receiver(receiver);
    return op;
  }
}

/// Resolves the operation-state type obtained when connecting Sender to Receiver.
/// Uses `Sender.Op` when that names a type; otherwise takes the return type
/// of the first `connect` overload.
template OpType(Sender, Receiver) {
  static if (is(Sender.Op)) {
    alias OpType = Sender.Op;
  } else {
    import std.traits : ReturnType;
    import std.meta : staticMap;
    // Return type of one `connect` overload; template overloads are
    // instantiated with Receiver first.
    template GetOpType(alias connect) {
      static if (__traits(isTemplate, connect)) {
        alias GetOpType = ReturnType!(connect!Receiver);//(Receiver.init));
      } else {
        alias GetOpType = ReturnType!(connect);//(Receiver.init));
      }
    }
    // The trailing `true` makes getOverloads include template overloads.
    alias overloads = __traits(getOverloads, Sender, "connect", true);
    alias opTypes = staticMap!(GetOpType, overloads);
    // Only the first overload's return type is used.
    alias OpType = opTypes[0];
  }
}

/// A sender that delays before calling setValue
struct DelaySender {
  alias Value = void;
  Duration dur;
  /// Asks the receiver's scheduler for a sender via `scheduleAfter(dur)` and
  /// connects that sender to the same receiver.
  auto connect(Receiver)(return Receiver receiver) @safe return scope {
    // ensure NRVO
    auto op = receiver.getScheduler().scheduleAfter(dur).connect(receiver);
    return op;
  }
}

/// Creates a DelaySender for the given duration.
auto delay(Duration dur) {
  return DelaySender(dur);
}