1 module concurrency.sender;
2 
3 import concepts;
4 import std.traits : ReturnType, isCallable;
5 import core.time : Duration;
6 
7 // A Sender represents something that completes with either:
8 // 1. a value (which can be void)
9 // 2. completion, in response to cancellation
10 // 3. an Exception
11 //
12 // Many things can be represented as a Sender.
13 // Threads, Fibers, coroutines, etc. In general, any async operation.
14 //
15 // A Sender is lazy. Work it represents is only started when
16 // the sender is connected to a receiver and explicitly started.
17 //
18 // Senders and Receivers go hand in hand. Senders send a value,
19 // Receivers receive one.
20 //
21 // Senders are useful because many Tasks can be represented as them,
22 // and any operation on top of senders then works on any one of those
23 // Tasks.
24 //
25 // The most common operation is `sync_wait`. It blocks the current
26 // execution context to await the Sender.
27 //
28 // There are many others as well. Like `when_all`, `retry`, `when_any`,
29 // etc. These algorithms can be used on any sender.
30 //
31 // Cancellation happens through StopTokens. A Sender can ask a Receiver
32 // for a StopToken. Default is a NeverStopToken but Receiver's can
33 // customize this.
34 //
35 // The StopToken can be polled or a callback can be registered with one.
36 //
37 // Senders enforce Structured Concurrency because work cannot be
38 // started unless it is awaited.
39 //
40 // These concepts are heavily inspired by several C++ proposals
41 // starting with http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2020/p0443r14.html
42 
43 /// checks that T is a Sender
44 void checkSender(T)() @safe {
45   import concurrency.scheduler : SchedulerObjectBase;
46   import concurrency.stoptoken : StopToken;
47   T t = T.init;
48   struct Receiver {
49     static if (is(T.Value == void))
50       void setValue() {}
51     else
52       void setValue(T.Value) {}
53     void setDone() nothrow {}
54     void setError(Exception e) nothrow {}
55     StopToken getStopToken() nothrow { return StopToken.init; }
56     SchedulerObjectBase getScheduler() nothrow { return null; }
57   }
58   OpType!(T, Receiver) op = t.connect(Receiver.init);
59   static if (!isValidOp!(T, Receiver))
60     pragma(msg, "Warning: ", T, "'s operation state is not returned via the stack");
61 }
62 enum isSender(T) = is(typeof(checkSender!T));
63 
64 /// It is ok for the operation state to be on the heap, but if it is on the stack we need to ensure any copies are elided. We can't be 100% sure (the compiler may still blit), but this is the best we can do.
65 template isValidOp(Sender, Receiver) {
66   import std.traits : isPointer;
67   import std.meta : allSatisfy;
68   alias overloads = __traits(getOverloads, Sender, "connect", true);
69   template isRVO(alias connect) {
70     static if (__traits(isTemplate, connect))
71       enum isRVO = __traits(isReturnOnStack, connect!Receiver);
72     else
73       enum isRVO = __traits(isReturnOnStack, connect);
74   }
75   alias Op = OpType!(Sender, Receiver);
76   enum isValidOp = isPointer!Op || is(Op == OperationObject) || is(Op == class) || (allSatisfy!(isRVO, overloads) && !__traits(isPOD, Op));
77 }
78 
79 /// A Sender that sends a single value of type T
80 struct ValueSender(T) {
81   static assert (models!(typeof(this), isSender));
82   alias Value = T;
83   static struct Op(Receiver) {
84     Receiver receiver;
85     static if (!is(T == void))
86       T value;
87     void start() nothrow @trusted scope {
88       import concurrency.receiver : setValueOrError;
89       static if (!is(T == void))
90         receiver.setValueOrError(value);
91       else
92         receiver.setValueOrError();
93     }
94   }
95   static if (!is(T == void))
96     T value;
97   Op!Receiver connect(Receiver)(return Receiver receiver) @safe scope return {
98     // ensure NRVO
99     static if (!is(T == void))
100       auto op = Op!(Receiver)(receiver, value);
101     else
102       auto op = Op!(Receiver)(receiver);
103     return op;
104   }
105 }
106 
107 auto just(T...)(T t) {
108   import std.typecons : tuple, Tuple;
109   static if (T.length == 1)
110     return ValueSender!(T[0])(t);
111   else
112     return ValueSender!(Tuple!T)(tuple(t));
113 }
114 
115 struct JustFromSender(Fun) {
116   static assert (models!(typeof(this), isSender));
117   alias Value = ReturnType!fun;
118   static struct Op(Receiver) {
119     Receiver receiver;
120     Fun fun;
121     void start() @safe nothrow {
122       import std.traits : hasFunctionAttributes;
123       static if (hasFunctionAttributes!(Fun, "nothrow")) {
124         set();
125       } else {
126         try {
127           set();
128         } catch (Exception e) {
129           receiver.setError(e);
130         }
131       }
132     }
133     private void set() @safe {
134       import concurrency.receiver : setValueOrError;
135       static if (is(Value == void)) {
136         fun();
137         receiver.setValueOrError();
138       } else
139         receiver.setValueOrError(fun());
140     }
141   }
142   Fun fun;
143   Op!Receiver connect(Receiver)(return Receiver receiver) @safe scope return {
144     // ensure NRVO
145     auto op = Op!(Receiver)(receiver, fun);
146     return op;
147   }
148 }
149 
150 JustFromSender!(Fun) justFrom(Fun)(Fun fun) if (isCallable!Fun) {
151   import std.traits : hasFunctionAttributes, isFunction, isFunctionPointer;
152   static assert (isFunction!Fun || isFunctionPointer!Fun || hasFunctionAttributes!(Fun, "shared"), "Function must be shared");
153   return JustFromSender!Fun(fun);
154 }
155 
156 /// A polymorphic sender of type T
157 interface SenderObjectBase(T) {
158   import concurrency.receiver;
159   import concurrency.scheduler : SchedulerObjectBase;
160   import concurrency.stoptoken : StopToken, stopTokenObject;
161   static assert (models!(typeof(this), isSender));
162   alias Value = T;
163   alias Op = OperationObject;
164   OperationObject connect(ReceiverObjectBase!(T) receiver) @safe;
165   OperationObject connect(Receiver)(return Receiver receiver) @trusted scope {
166     return connect(new class(receiver) ReceiverObjectBase!T {
167       Receiver receiver;
168       this(Receiver receiver) {
169         this.receiver = receiver;
170       }
171       static if (is(T == void)) {
172         void setValue() {
173           receiver.setValueOrError();
174         }
175       } else {
176         void setValue(T value) {
177           receiver.setValueOrError(value);
178         }
179       }
180       void setDone() nothrow {
181         receiver.setDone();
182       }
183       void setError(Exception e) nothrow {
184         receiver.setError(e);
185       }
186       StopToken getStopToken() nothrow {
187         return stopTokenObject(receiver.getStopToken());
188       }
189       SchedulerObjectBase getScheduler() nothrow @safe {
190         import concurrency.scheduler : toSchedulerObject;
191         return receiver.getScheduler().toSchedulerObject;
192       }
193     });
194   }
195 }
196 
197 /// Type-erased operational state object
198 /// used in polymorphic senders
199 struct OperationObject {
200   private void delegate() nothrow shared _start;
201   void start() nothrow @trusted { _start(); }
202 }
203 
204 interface OperationalStateBase {
205   void start() @safe nothrow;
206 }
207 
208 /// calls connect on the Sender but stores the OperationState on the heap
209 OperationalStateBase connectHeap(Sender, Receiver)(Sender sender, Receiver receiver) {
210   alias State = typeof(sender.connect(receiver));
211   return new class(sender, receiver) OperationalStateBase {
212     State state;
213     this(Sender sender, Receiver receiver) {
214       state = sender.connect(receiver);
215     }
216     void start() @safe nothrow {
217       state.start();
218     }
219   };
220 }
221 
222 /// A class extending from SenderObjectBase that wraps any Sender
223 class SenderObjectImpl(Sender) : SenderObjectBase!(Sender.Value) {
224   import concurrency.receiver : ReceiverObjectBase;
225   static assert (models!(typeof(this), isSender));
226   private Sender sender;
227   this(Sender sender) {
228     this.sender = sender;
229   }
230   OperationObject connect(ReceiverObjectBase!(Sender.Value) receiver) @trusted {
231     auto state = sender.connectHeap(receiver);
232     return OperationObject(cast(typeof(OperationObject._start))&state.start);
233   }
234   OperationObject connect(Receiver)(Receiver receiver) {
235     auto base = cast(SenderObjectBase!(Sender.Value))this;
236     return base.connect(receiver);
237   }
238 }
239 
240 /// Converts any Sender to a polymorphic SenderObject
241 auto toSenderObject(Sender)(Sender sender) {
242   static assert(models!(Sender, isSender));
243   static if (is(Sender : SenderObjectBase!(Sender.Value))) {
244     return sender;
245   } else
246     return cast(SenderObjectBase!(Sender.Value))new SenderObjectImpl!(Sender)(sender);
247 }
248 
249 /// A sender that always sets an error
250 struct ThrowingSender {
251   alias Value = void;
252   static struct Op(Receiver) {
253     Receiver receiver;
254     void start() {
255       receiver.setError(new Exception("ThrowingSender"));
256     }
257   }
258   auto connect(Receiver)(return Receiver receiver) @safe scope return {
259     // ensure NRVO
260     auto op = Op!Receiver(receiver);
261     return op;
262   }
263 }
264 
265 /// This tests whether a Sender, by itself, makes any calls to the
266 /// setError function.
267 /// If a Sender is connected to a Receiver that has a non-nothrow
268 /// setValue function, a Sender can still throw, but only Exceptions
269 /// throw from that Receiver's setValue function.
270 template canSenderThrow(Sender) {
271   static assert (models!(Sender, isSender));
272   struct NoErrorReceiver {
273     void setDone() nothrow @safe @nogc {}
274     static if (is(Sender.Value == void))
275       void setValue() nothrow @safe @nogc {}
276     else
277       void setValue(Sender.Value t) nothrow @safe @nogc {}
278   }
279   enum canSenderThrow = !__traits(compiles, Sender.init.connect(NoErrorReceiver()));
280 }
281 
282 static assert( canSenderThrow!ThrowingSender);
283 static assert(!canSenderThrow!(ValueSender!int));
284 
285 /// A sender that always calls setDone
286 struct DoneSender {
287   static assert (models!(typeof(this), isSender));
288   alias Value = void;
289   static struct DoneOp(Receiver) {
290     Receiver receiver;
291     void start() nothrow @trusted scope {
292       receiver.setDone();
293     }
294   }
295   auto connect(Receiver)(return Receiver receiver) @safe scope return {
296     // ensure NRVO
297     auto op = DoneOp!(Receiver)(receiver);
298     return op;
299   }
300 }
301 
302 /// A sender that always calls setValue with no args
303 struct VoidSender {
304   static assert (models!(typeof(this), isSender));
305   alias Value = void;
306   struct VoidOp(Receiver) {
307     Receiver receiver;
308     void start() nothrow @trusted scope {
309       import concurrency.receiver : setValueOrError;
310       receiver.setValueOrError();
311     }
312   }
313   auto connect(Receiver)(return Receiver receiver) @safe scope return{
314     // ensure NRVO
315     auto op = VoidOp!Receiver(receiver);
316     return op;
317   }
318 }
319 /// A sender that always calls setError
320 struct ErrorSender {
321   static assert (models!(typeof(this), isSender));
322   alias Value = void;
323   Exception exception;
324   static struct ErrorOp(Receiver) {
325     Receiver receiver;
326     Exception exception;
327     void start() nothrow @trusted scope {
328       receiver.setError(exception);
329     }
330   }
331   auto connect(Receiver)(return Receiver receiver) @safe scope return {
332     // ensure NRVO
333     auto op = ErrorOp!(Receiver)(receiver, exception);
334     return op;
335   }
336 }
337 
338 template OpType(Sender, Receiver) {
339   static if (is(Sender.Op)) {
340     alias OpType = Sender.Op;
341   } else {
342     import std.traits : ReturnType;
343     import std.meta : staticMap;
344     template GetOpType(alias connect) {
345       static if (__traits(isTemplate, connect)) {
346         alias GetOpType = ReturnType!(connect!Receiver);//(Receiver.init));
347       } else {
348         alias GetOpType = ReturnType!(connect);//(Receiver.init));
349       }
350     }
351     alias overloads = __traits(getOverloads, Sender, "connect", true);
352     alias opTypes = staticMap!(GetOpType, overloads);
353     alias OpType = opTypes[0];
354   }
355 }
356 
357 /// A sender that delays before calling setValue
358 struct DelaySender {
359   alias Value = void;
360   Duration dur;
361   auto connect(Receiver)(return Receiver receiver) @safe return scope {
362     // ensure NRVO
363     auto op = receiver.getScheduler().scheduleAfter(dur).connect(receiver);
364     return op;
365   }
366 }
367 
368 auto delay(Duration dur) {
369   return DelaySender(dur);
370 }
371 
372 struct PromiseSenderOp(T, Receiver) {
373   import concurrency.stoptoken;
374   alias Sender = PromiseSender!T;
375   alias InternalValue = Sender.InternalValue;
376   shared Sender parent;
377   Receiver receiver;
378   StopCallback cb;
379   void start() nothrow @trusted scope {
380     parent.add(&(cast(shared)this).onValue);
381     cb = receiver.getStopToken.onStop(&(cast(shared)this).onStop);
382   }
383   void onStop() nothrow @trusted shared {
384     with(unshared) {
385       parent.remove(&(cast(shared)this).onValue);
386       receiver.setDone();
387     }
388   }
389   void onValue(InternalValue value) nothrow @safe shared {
390     import mir.algebraic : match;
391     with(unshared) {
392       value.match!((Sender.ValueRep v){
393           try {
394             static if (is(Value == void))
395               receiver.setValue();
396             else
397               receiver.setValue(v);
398           } catch (Exception e) {
399             /// TODO: dispose needs to be called in all cases, except
400             /// this onValue can sometimes be called immediately,
401             /// leaving no room to set cb.dispose...
402             cb.dispose();
403             receiver.setError(e);
404           }
405         }, (Exception e){
406           receiver.setError(e);
407         }, (Sender.Done d){
408           receiver.setDone();
409         });
410     }
411   }
412   private auto ref unshared() @trusted nothrow shared {
413     return cast()this;
414   }
415 }
416 
417 class PromiseSender(T) {
418   import std.traits : ReturnType;
419   import concurrency.slist;
420   import concurrency.bitfield;
421   import mir.algebraic : Algebraic, match, Nullable;
422   static assert(models!(typeof(this), isSender));
423   alias Value = T;
424   static if (is(Value == void)) {
425     static struct ValueRep{}
426   } else
427     alias ValueRep = Value;
428   static struct Done{}
429   alias InternalValue = Algebraic!(Exception, ValueRep, Done);
430   alias DG = void delegate(InternalValue) nothrow @safe shared;
431   private {
432     shared SList!DG dgs;
433     Nullable!InternalValue value;
434     enum Flags {
435       locked = 0x1,
436       completed = 0x2
437     }
438     SharedBitField!Flags counter;
439     void add(DG dg) @safe nothrow shared {
440       with(unshared) {
441         with(counter.lock()) {
442           if (was(Flags.completed)) {
443             auto val = value.get;
444             release(); // release early
445             dg(val);
446           } else {
447             dgs.pushBack(dg);
448           }
449         }
450       }
451     }
452     void remove(DG dg) @safe nothrow shared {
453       with (counter.lock()) {
454         if (was(Flags.completed)) {
455           release(); // release early
456         } else {
457           dgs.remove(dg);
458         }
459       }
460     }
461     private auto ref unshared() @trusted nothrow shared {
462       return cast()this;
463     }
464   }
465   private void pushImpl(P)(P t) @safe shared {
466     import std.exception : enforce;
467     with (counter.lock(Flags.completed)) {
468       enforce(!was(Flags.completed), "Can only complete once");
469       InternalValue val = InternalValue(t);
470       (cast()value) = val;
471       auto localDgs = dgs.release();
472       release();
473       foreach(dg; localDgs)
474         dg(val);
475     }
476   }
477   void cancel() @safe shared {
478     pushImpl(Done());
479   }
480   void error(Exception e) @safe shared {
481     pushImpl(e);
482   }
483   void fulfill(T t) @safe shared {
484     pushImpl(t);
485   }
486   bool isCompleted() @trusted shared {
487     import core.atomic : MemoryOrder;
488     return (counter.load!(MemoryOrder.acq) & Flags.completed) > 0;
489   }
490   this() {
491     this.dgs = new shared SList!DG;
492   }
493   auto connect(Receiver)(return Receiver receiver) @trusted scope {
494     // ensure NRVO
495     auto op = (cast(shared)this).connect(receiver);
496     return op;
497   }
498   auto connect(Receiver)(return Receiver receiver) @safe shared scope return {
499     // ensure NRVO
500     auto op = PromiseSenderOp!(T, Receiver)(this, receiver);
501     return op;
502   }
503 }
504 
505 shared(PromiseSender!T) promise(T)() {
506   return new shared PromiseSender!T();
507 }