1 module concurrency.sender; 2 3 import concepts; 4 import std.traits : ReturnType, isCallable; 5 import core.time : Duration; 6 7 // A Sender represents something that completes with either: 8 // 1. a value (which can be void) 9 // 2. completion, in response to cancellation 10 // 3. an Exception 11 // 12 // Many things can be represented as a Sender. 13 // Threads, Fibers, coroutines, etc. In general, any async operation. 14 // 15 // A Sender is lazy. Work it represents is only started when 16 // the sender is connected to a receiver and explicitly started. 17 // 18 // Senders and Receivers go hand in hand. Senders send a value, 19 // Receivers receive one. 20 // 21 // Senders are useful because many Tasks can be represented as them, 22 // and any operation on top of senders then works on any one of those 23 // Tasks. 24 // 25 // The most common operation is `sync_wait`. It blocks the current 26 // execution context to await the Sender. 27 // 28 // There are many others as well. Like `when_all`, `retry`, `when_any`, 29 // etc. These algorithms can be used on any sender. 30 // 31 // Cancellation happens through StopTokens. A Sender can ask a Receiver 32 // for a StopToken. Default is a NeverStopToken but Receiver's can 33 // customize this. 34 // 35 // The StopToken can be polled or a callback can be registered with one. 36 // 37 // Senders enforce Structured Concurrency because work cannot be 38 // started unless it is awaited. 39 // 40 // These concepts are heavily inspired by several C++ proposals 41 // starting with http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2020/p0443r14.html 42 43 /// checks that T is a Sender 44 void checkSender(T)() @safe { 45 import concurrency.scheduler : SchedulerObjectBase; 46 import concurrency.stoptoken : StopToken; 47 T t = T.init; 48 struct Receiver { 49 static if (is(T.Value == void)) 50 void setValue() {} 51 else 52 void setValue(T.Value) {} 53 void setDone() nothrow {} 54 void setError(Exception e) nothrow {} 55 StopToken getStopToken() nothrow { return StopToken.init; } 56 SchedulerObjectBase getScheduler() nothrow { return null; } 57 } 58 OpType!(T, Receiver) op = t.connect(Receiver.init); 59 static if (!isValidOp!(T, Receiver)) 60 pragma(msg, "Warning: ", T, "'s operation state is not returned via the stack"); 61 } 62 enum isSender(T) = is(typeof(checkSender!T)); 63 64 /// It is ok for the operation state to be on the heap, but if it is on the stack we need to ensure any copies are elided. We can't be 100% sure (the compiler may still blit), but this is the best we can do. 65 template isValidOp(Sender, Receiver) { 66 import std.traits : isPointer; 67 import std.meta : allSatisfy; 68 alias overloads = __traits(getOverloads, Sender, "connect", true); 69 template isRVO(alias connect) { 70 static if (__traits(isTemplate, connect)) 71 enum isRVO = __traits(isReturnOnStack, connect!Receiver); 72 else 73 enum isRVO = __traits(isReturnOnStack, connect); 74 } 75 alias Op = OpType!(Sender, Receiver); 76 enum isValidOp = isPointer!Op || is(Op == OperationObject) || is(Op == class) || (allSatisfy!(isRVO, overloads) && !__traits(isPOD, Op)); 77 } 78 79 /// A Sender that sends a single value of type T 80 struct ValueSender(T) { 81 static assert (models!(typeof(this), isSender)); 82 alias Value = T; 83 static struct Op(Receiver) { 84 Receiver receiver; 85 static if (!is(T == void)) 86 T value; 87 void start() nothrow @trusted scope { 88 import concurrency.receiver : setValueOrError; 89 static if (!is(T == void)) 90 receiver.setValueOrError(value); 91 else 92 receiver.setValueOrError(); 93 } 94 } 95 static if (!is(T == void)) 96 T value; 97 Op!Receiver connect(Receiver)(return Receiver receiver) @safe scope return { 98 // ensure NRVO 99 static if (!is(T == void)) 100 auto op = Op!(Receiver)(receiver, value); 101 else 102 auto op = Op!(Receiver)(receiver); 103 return op; 104 } 105 } 106 107 auto just(T...)(T t) { 108 import std.typecons : tuple, Tuple; 109 static if (T.length == 1) 110 return ValueSender!(T[0])(t); 111 else 112 return ValueSender!(Tuple!T)(tuple(t)); 113 } 114 115 struct JustFromSender(Fun) { 116 static assert (models!(typeof(this), isSender)); 117 alias Value = ReturnType!fun; 118 static struct Op(Receiver) { 119 Receiver receiver; 120 Fun fun; 121 void start() @safe nothrow { 122 import std.traits : hasFunctionAttributes; 123 static if (hasFunctionAttributes!(Fun, "nothrow")) { 124 set(); 125 } else { 126 try { 127 set(); 128 } catch (Exception e) { 129 receiver.setError(e); 130 } 131 } 132 } 133 private void set() @safe { 134 import concurrency.receiver : setValueOrError; 135 static if (is(Value == void)) { 136 fun(); 137 receiver.setValueOrError(); 138 } else 139 receiver.setValueOrError(fun()); 140 } 141 } 142 Fun fun; 143 Op!Receiver connect(Receiver)(return Receiver receiver) @safe scope return { 144 // ensure NRVO 145 auto op = Op!(Receiver)(receiver, fun); 146 return op; 147 } 148 } 149 150 JustFromSender!(Fun) justFrom(Fun)(Fun fun) if (isCallable!Fun) { 151 import std.traits : hasFunctionAttributes, isFunction, isFunctionPointer; 152 static assert (isFunction!Fun || isFunctionPointer!Fun || hasFunctionAttributes!(Fun, "shared"), "Function must be shared"); 153 return JustFromSender!Fun(fun); 154 } 155 156 /// A polymorphic sender of type T 157 interface SenderObjectBase(T) { 158 import concurrency.receiver; 159 import concurrency.scheduler : SchedulerObjectBase; 160 import concurrency.stoptoken : StopToken, stopTokenObject; 161 static assert (models!(typeof(this), isSender)); 162 alias Value = T; 163 alias Op = OperationObject; 164 OperationObject connect(ReceiverObjectBase!(T) receiver) @safe; 165 OperationObject connect(Receiver)(return Receiver receiver) @trusted scope { 166 return connect(new class(receiver) ReceiverObjectBase!T { 167 Receiver receiver; 168 this(Receiver receiver) { 169 this.receiver = receiver; 170 } 171 static if (is(T == void)) { 172 void setValue() { 173 receiver.setValueOrError(); 174 } 175 } else { 176 void setValue(T value) { 177 receiver.setValueOrError(value); 178 } 179 } 180 void setDone() nothrow { 181 receiver.setDone(); 182 } 183 void setError(Exception e) nothrow { 184 receiver.setError(e); 185 } 186 StopToken getStopToken() nothrow { 187 return stopTokenObject(receiver.getStopToken()); 188 } 189 SchedulerObjectBase getScheduler() nothrow @safe { 190 import concurrency.scheduler : toSchedulerObject; 191 return receiver.getScheduler().toSchedulerObject; 192 } 193 }); 194 } 195 } 196 197 /// Type-erased operational state object 198 /// used in polymorphic senders 199 struct OperationObject { 200 private void delegate() nothrow shared _start; 201 void start() nothrow @trusted { _start(); } 202 } 203 204 interface OperationalStateBase { 205 void start() @safe nothrow; 206 } 207 208 /// calls connect on the Sender but stores the OperationState on the heap 209 OperationalStateBase connectHeap(Sender, Receiver)(Sender sender, Receiver receiver) { 210 alias State = typeof(sender.connect(receiver)); 211 return new class(sender, receiver) OperationalStateBase { 212 State state; 213 this(Sender sender, Receiver receiver) { 214 state = sender.connect(receiver); 215 } 216 void start() @safe nothrow { 217 state.start(); 218 } 219 }; 220 } 221 222 /// A class extending from SenderObjectBase that wraps any Sender 223 class SenderObjectImpl(Sender) : SenderObjectBase!(Sender.Value) { 224 import concurrency.receiver : ReceiverObjectBase; 225 static assert (models!(typeof(this), isSender)); 226 private Sender sender; 227 this(Sender sender) { 228 this.sender = sender; 229 } 230 OperationObject connect(ReceiverObjectBase!(Sender.Value) receiver) @trusted { 231 auto state = sender.connectHeap(receiver); 232 return OperationObject(cast(typeof(OperationObject._start))&state.start); 233 } 234 OperationObject connect(Receiver)(Receiver receiver) { 235 auto base = cast(SenderObjectBase!(Sender.Value))this; 236 return base.connect(receiver); 237 } 238 } 239 240 /// Converts any Sender to a polymorphic SenderObject 241 auto toSenderObject(Sender)(Sender sender) { 242 static assert(models!(Sender, isSender)); 243 static if (is(Sender : SenderObjectBase!(Sender.Value))) { 244 return sender; 245 } else 246 return cast(SenderObjectBase!(Sender.Value))new SenderObjectImpl!(Sender)(sender); 247 } 248 249 /// A sender that always sets an error 250 struct ThrowingSender { 251 alias Value = void; 252 static struct Op(Receiver) { 253 Receiver receiver; 254 void start() { 255 receiver.setError(new Exception("ThrowingSender")); 256 } 257 } 258 auto connect(Receiver)(return Receiver receiver) @safe scope return { 259 // ensure NRVO 260 auto op = Op!Receiver(receiver); 261 return op; 262 } 263 } 264 265 /// This tests whether a Sender, by itself, makes any calls to the 266 /// setError function. 267 /// If a Sender is connected to a Receiver that has a non-nothrow 268 /// setValue function, a Sender can still throw, but only Exceptions 269 /// throw from that Receiver's setValue function. 270 template canSenderThrow(Sender) { 271 static assert (models!(Sender, isSender)); 272 struct NoErrorReceiver { 273 void setDone() nothrow @safe @nogc {} 274 static if (is(Sender.Value == void)) 275 void setValue() nothrow @safe @nogc {} 276 else 277 void setValue(Sender.Value t) nothrow @safe @nogc {} 278 } 279 enum canSenderThrow = !__traits(compiles, Sender.init.connect(NoErrorReceiver())); 280 } 281 282 static assert( canSenderThrow!ThrowingSender); 283 static assert(!canSenderThrow!(ValueSender!int)); 284 285 /// A sender that always calls setDone 286 struct DoneSender { 287 static assert (models!(typeof(this), isSender)); 288 alias Value = void; 289 static struct DoneOp(Receiver) { 290 Receiver receiver; 291 void start() nothrow @trusted scope { 292 receiver.setDone(); 293 } 294 } 295 auto connect(Receiver)(return Receiver receiver) @safe scope return { 296 // ensure NRVO 297 auto op = DoneOp!(Receiver)(receiver); 298 return op; 299 } 300 } 301 302 /// A sender that always calls setValue with no args 303 struct VoidSender { 304 static assert (models!(typeof(this), isSender)); 305 alias Value = void; 306 struct VoidOp(Receiver) { 307 Receiver receiver; 308 void start() nothrow @trusted scope { 309 import concurrency.receiver : setValueOrError; 310 receiver.setValueOrError(); 311 } 312 } 313 auto connect(Receiver)(return Receiver receiver) @safe scope return{ 314 // ensure NRVO 315 auto op = VoidOp!Receiver(receiver); 316 return op; 317 } 318 } 319 /// A sender that always calls setError 320 struct ErrorSender { 321 static assert (models!(typeof(this), isSender)); 322 alias Value = void; 323 Exception exception; 324 static struct ErrorOp(Receiver) { 325 Receiver receiver; 326 Exception exception; 327 void start() nothrow @trusted scope { 328 receiver.setError(exception); 329 } 330 } 331 auto connect(Receiver)(return Receiver receiver) @safe scope return { 332 // ensure NRVO 333 auto op = ErrorOp!(Receiver)(receiver, exception); 334 return op; 335 } 336 } 337 338 template OpType(Sender, Receiver) { 339 static if (is(Sender.Op)) { 340 alias OpType = Sender.Op; 341 } else { 342 import std.traits : ReturnType; 343 import std.meta : staticMap; 344 template GetOpType(alias connect) { 345 static if (__traits(isTemplate, connect)) { 346 alias GetOpType = ReturnType!(connect!Receiver);//(Receiver.init)); 347 } else { 348 alias GetOpType = ReturnType!(connect);//(Receiver.init)); 349 } 350 } 351 alias overloads = __traits(getOverloads, Sender, "connect", true); 352 alias opTypes = staticMap!(GetOpType, overloads); 353 alias OpType = opTypes[0]; 354 } 355 } 356 357 /// A sender that delays before calling setValue 358 struct DelaySender { 359 alias Value = void; 360 Duration dur; 361 auto connect(Receiver)(return Receiver receiver) @safe return scope { 362 // ensure NRVO 363 auto op = receiver.getScheduler().scheduleAfter(dur).connect(receiver); 364 return op; 365 } 366 } 367 368 auto delay(Duration dur) { 369 return DelaySender(dur); 370 } 371 372 struct PromiseSenderOp(T, Receiver) { 373 import concurrency.stoptoken; 374 alias Sender = PromiseSender!T; 375 alias InternalValue = Sender.InternalValue; 376 shared Sender parent; 377 Receiver receiver; 378 StopCallback cb; 379 void start() nothrow @trusted scope { 380 parent.add(&(cast(shared)this).onValue); 381 cb = receiver.getStopToken.onStop(&(cast(shared)this).onStop); 382 } 383 void onStop() nothrow @trusted shared { 384 with(unshared) { 385 parent.remove(&(cast(shared)this).onValue); 386 receiver.setDone(); 387 } 388 } 389 void onValue(InternalValue value) nothrow @safe shared { 390 import mir.algebraic : match; 391 with(unshared) { 392 value.match!((Sender.ValueRep v){ 393 try { 394 static if (is(Value == void)) 395 receiver.setValue(); 396 else 397 receiver.setValue(v); 398 } catch (Exception e) { 399 /// TODO: dispose needs to be called in all cases, except 400 /// this onValue can sometimes be called immediately, 401 /// leaving no room to set cb.dispose... 402 cb.dispose(); 403 receiver.setError(e); 404 } 405 }, (Exception e){ 406 receiver.setError(e); 407 }, (Sender.Done d){ 408 receiver.setDone(); 409 }); 410 } 411 } 412 private auto ref unshared() @trusted nothrow shared { 413 return cast()this; 414 } 415 } 416 417 class PromiseSender(T) { 418 import std.traits : ReturnType; 419 import concurrency.slist; 420 import concurrency.bitfield; 421 import mir.algebraic : Algebraic, match, Nullable; 422 static assert(models!(typeof(this), isSender)); 423 alias Value = T; 424 static if (is(Value == void)) { 425 static struct ValueRep{} 426 } else 427 alias ValueRep = Value; 428 static struct Done{} 429 alias InternalValue = Algebraic!(Exception, ValueRep, Done); 430 alias DG = void delegate(InternalValue) nothrow @safe shared; 431 private { 432 shared SList!DG dgs; 433 Nullable!InternalValue value; 434 enum Flags { 435 locked = 0x1, 436 completed = 0x2 437 } 438 SharedBitField!Flags counter; 439 void add(DG dg) @safe nothrow shared { 440 with(unshared) { 441 with(counter.lock()) { 442 if (was(Flags.completed)) { 443 auto val = value.get; 444 release(); // release early 445 dg(val); 446 } else { 447 dgs.pushBack(dg); 448 } 449 } 450 } 451 } 452 void remove(DG dg) @safe nothrow shared { 453 with (counter.lock()) { 454 if (was(Flags.completed)) { 455 release(); // release early 456 } else { 457 dgs.remove(dg); 458 } 459 } 460 } 461 private auto ref unshared() @trusted nothrow shared { 462 return cast()this; 463 } 464 } 465 private void pushImpl(P)(P t) @safe shared { 466 import std.exception : enforce; 467 with (counter.lock(Flags.completed)) { 468 enforce(!was(Flags.completed), "Can only complete once"); 469 InternalValue val = InternalValue(t); 470 (cast()value) = val; 471 auto localDgs = dgs.release(); 472 release(); 473 foreach(dg; localDgs) 474 dg(val); 475 } 476 } 477 void cancel() @safe shared { 478 pushImpl(Done()); 479 } 480 void error(Exception e) @safe shared { 481 pushImpl(e); 482 } 483 void fulfill(T t) @safe shared { 484 pushImpl(t); 485 } 486 bool isCompleted() @trusted shared { 487 import core.atomic : MemoryOrder; 488 return (counter.load!(MemoryOrder.acq) & Flags.completed) > 0; 489 } 490 this() { 491 this.dgs = new shared SList!DG; 492 } 493 auto connect(Receiver)(return Receiver receiver) @trusted scope { 494 // ensure NRVO 495 auto op = (cast(shared)this).connect(receiver); 496 return op; 497 } 498 auto connect(Receiver)(return Receiver receiver) @safe shared scope return { 499 // ensure NRVO 500 auto op = PromiseSenderOp!(T, Receiver)(this, receiver); 501 return op; 502 } 503 } 504 505 shared(PromiseSender!T) promise(T)() { 506 return new shared PromiseSender!T(); 507 }