1 module concurrency.sender;
2 
3 import concepts;
4 import std.traits : ReturnType, isCallable;
5 import core.time : Duration;
6 
7 // A Sender represents something that completes with either:
8 // 1. a value (which can be void)
9 // 2. completion, in response to cancellation
10 // 3. an Exception
11 //
12 // Many things can be represented as a Sender.
13 // Threads, Fibers, coroutines, etc. In general, any async operation.
14 //
15 // A Sender is lazy. Work it represents is only started when
16 // the sender is connected to a receiver and explicitly started.
17 //
18 // Senders and Receivers go hand in hand. Senders send a value,
19 // Receivers receive one.
20 //
21 // Senders are useful because many Tasks can be represented as them,
22 // and any operation on top of senders then works on any one of those
23 // Tasks.
24 //
25 // The most common operation is `sync_wait`. It blocks the current
26 // execution context to await the Sender.
27 //
28 // There are many others as well. Like `when_all`, `retry`, `when_any`,
29 // etc. These algorithms can be used on any sender.
30 //
31 // Cancellation happens through StopTokens. A Sender can ask a Receiver
32 // for a StopToken. Default is a NeverStopToken but Receivers can
33 // customize this.
34 //
35 // The StopToken can be polled or a callback can be registered with one.
36 //
37 // Senders enforce Structured Concurrency because work cannot be
38 // started unless it is awaited.
39 //
40 // These concepts are heavily inspired by several C++ proposals
41 // starting with http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2020/p0443r14.html
42 
/// Compile-time probe for the Sender concept.
///
/// The body only compiles when `T.init` can be connected to a minimal
/// receiver (one that offers `setValue`, `setDone`, `setError` and
/// `getScheduler`) and the result of that `connect` call can initialise a
/// variable of type `OpType!(T, Receiver)`.
///
/// When `isValidOp` rejects the operation state a warning is printed at
/// compile time; the check itself still succeeds.
void checkSender(T)() @safe {
  import concurrency.scheduler : SchedulerObjectBase;
  // Smallest receiver that accepts exactly what T produces.
  struct Receiver {
    static if (!is(T.Value == void))
      void setValue(T.Value) {}
    else
      void setValue() {}
    void setDone() nothrow {}
    void setError(Exception e) nothrow {}
    SchedulerObjectBase getScheduler() nothrow { return null; }
  }
  T sender = T.init;
  OpType!(T, Receiver) state = sender.connect(Receiver.init);
  static if (!isValidOp!(T, Receiver))
    pragma(msg, "Warning: ", T, "'s operation state is not returned via the stack");
}
/// True when `checkSender!T` can be instantiated, i.e. T models a Sender.
template isSender(T) {
  enum isSender = is(typeof(checkSender!T));
}
61 
/// Decides whether the operation state produced by connecting Sender to
/// Receiver is acceptable.
///
/// A heap-based state is always fine: a pointer, a class, or the
/// type-erased OperationObject. A stack-based state is accepted only when
/// every `connect` overload returns its result on the stack and the state
/// is not a POD, so that copies are elided as far as we can tell. This is
/// best effort: the compiler may still blit the value.
template isValidOp(Sender, Receiver) {
  import std.meta : allSatisfy;
  import std.traits : isPointer;
  alias Op = OpType!(Sender, Receiver);
  alias overloads = __traits(getOverloads, Sender, "connect", true);
  // Asks the compiler whether a given `connect` overload (instantiated with
  // Receiver when it is a template) returns via the stack.
  template isRVO(alias connect) {
    static if (!__traits(isTemplate, connect))
      enum isRVO = __traits(isReturnOnStack, connect);
    else
      enum isRVO = __traits(isReturnOnStack, connect!Receiver);
  }
  enum isValidOp = isPointer!Op
    || is(Op == OperationObject)
    || is(Op == class)
    || (allSatisfy!(isRVO, overloads) && !__traits(isPOD, Op));
}
76 
/// A Sender that hands one stored value of type T to its receiver.
/// With `T == void` nothing is stored and the receiver is completed
/// without an argument.
struct ValueSender(T) {
  static assert (models!(typeof(this), isSender));
  alias Value = T;
  static if (!is(T == void))
    T value;

  /// Operation state: the receiver plus (for non-void T) a copy of the value.
  static struct Op(Receiver) {
    Receiver receiver;
    static if (!is(T == void))
      T value;
    /// Forwards the value (or nothing, for void) through `setValueOrError`.
    void start() @safe {
      import concurrency.receiver : setValueOrError;
      static if (is(T == void))
        receiver.setValueOrError();
      else
        receiver.setValueOrError(value);
    }
  }

  /// Builds the operation state for `receiver`.
  Op!Receiver connect(Receiver)(return Receiver receiver) @safe scope return {
    // a single named local is returned on purpose: ensure NRVO
    static if (is(T == void))
      auto state = Op!(Receiver)(receiver);
    else
      auto state = Op!(Receiver)(receiver, value);
    return state;
  }
}
104 
/// Wraps `t` in a ValueSender so it can be used wherever a Sender is expected.
ValueSender!T just(T)(T t) {
  auto sender = ValueSender!T(t);
  return sender;
}
108 
/// A Sender whose value is produced by calling a stored callable when the
/// operation is started. The Sender's Value is the callable's return type.
struct JustFromSender(Fun) {
  static assert (models!(typeof(this), isSender));
  alias Value = ReturnType!fun;
  Fun fun;

  /// Operation state: the receiver together with the callable to invoke.
  static struct Op(Receiver) {
    Receiver receiver;
    Fun fun;
    /// Runs the callable and completes the receiver. When Fun is not
    /// `nothrow`, an Exception escaping `set` is routed to `setError`.
    void start() @safe nothrow {
      import std.traits : hasFunctionAttributes;
      static if (!hasFunctionAttributes!(Fun, "nothrow")) {
        try {
          set();
        } catch (Exception e) {
          receiver.setError(e);
        }
      } else {
        set();
      }
    }
    // Calls `fun` and passes its result (if any) on via setValueOrError.
    private void set() @safe {
      import concurrency.receiver : setValueOrError;
      static if (!is(Value == void)) {
        receiver.setValueOrError(fun());
      } else {
        fun();
        receiver.setValueOrError();
      }
    }
  }

  /// Builds the operation state for `receiver`.
  Op!Receiver connect(Receiver)(return Receiver receiver) @safe scope return {
    // a single named local is returned on purpose: ensure NRVO
    auto state = Op!(Receiver)(receiver, fun);
    return state;
  }
}
143 
/// Creates a JustFromSender around `fun`.
///
/// `fun` must be a plain function, a function pointer, or a callable that
/// carries the `shared` attribute; anything else is rejected at compile time.
JustFromSender!(Fun) justFrom(Fun)(Fun fun) if (isCallable!Fun) {
  import std.traits : hasFunctionAttributes, isFunction, isFunctionPointer;
  static assert (
    isFunction!Fun || isFunctionPointer!Fun || hasFunctionAttributes!(Fun, "shared"),
    "Function must be shared");
  return typeof(return)(fun);
}
149 
/// Polymorphic (interface-based) sender producing a value of type T.
///
/// Implementations provide the `connect` overload that takes a
/// `ReceiverObjectBase!T`. The templated overload accepts any receiver,
/// wraps it in an anonymous `ReceiverObjectBase!T` implementation and
/// passes that wrapper on to `connect`.
interface SenderObjectBase(T) {
  import concurrency.receiver;
  import concurrency.scheduler : SchedulerObjectBase;
  import concurrency.stoptoken : StopTokenObject, stopTokenObject;
  static assert (models!(typeof(this), isSender));
  alias Value = T;
  alias Op = OperationObject;

  /// Connects to an already type-erased receiver.
  OperationObject connect(ReceiverObjectBase!(T) receiver) @safe;

  /// Connects to an arbitrary receiver by adapting it to `ReceiverObjectBase!T`.
  OperationObject connect(Receiver)(return Receiver receiver) @trusted scope {
    return connect(new class(receiver) ReceiverObjectBase!T {
      Receiver receiver;
      this(Receiver r) {
        receiver = r;
      }
      // value channel: forwarded through setValueOrError
      static if (!is(T == void)) {
        void setValue(T value) {
          receiver.setValueOrError(value);
        }
      } else {
        void setValue() {
          receiver.setValueOrError();
        }
      }
      // done and error channels are forwarded unchanged
      void setDone() nothrow {
        receiver.setDone();
      }
      void setError(Exception e) nothrow {
        receiver.setError(e);
      }
      // the wrapped receiver's stop token and scheduler are converted to
      // their object forms
      StopTokenObject getStopToken() nothrow {
        return stopTokenObject(receiver.getStopToken());
      }
      SchedulerObjectBase getScheduler() nothrow @safe {
        import concurrency.scheduler : toSchedulerObject;
        return receiver.getScheduler().toSchedulerObject;
      }
    });
  }
}
190 
/// Type-erased operation state handed out by polymorphic senders.
/// It holds nothing but a delegate; `start` invokes that delegate.
struct OperationObject {
  private void delegate() nothrow shared _start;

  /// Invokes the stored delegate.
  void start() nothrow @trusted {
    _start();
  }
}
197 
/// Interface for an operation state that can be started through a
/// class reference.
interface OperationalStateBase {
  /// Starts the operation.
  void start() nothrow @safe;
}
201 
/// Connects `sender` to `receiver` and keeps the resulting operation state
/// inside a freshly allocated class instance, which is returned as an
/// OperationalStateBase.
OperationalStateBase connectHeap(Sender, Receiver)(Sender sender, Receiver receiver) {
  alias Op = typeof(sender.connect(receiver));
  auto holder = new class(sender, receiver) OperationalStateBase {
    Op op;
    // the connect call happens here, so the state is stored in this object
    this(Sender s, Receiver r) {
      op = s.connect(r);
    }
    void start() @safe nothrow {
      op.start();
    }
  };
  return holder;
}
215 
/// Adapter class that exposes any Sender through the SenderObjectBase
/// interface for the Sender's Value type.
class SenderObjectImpl(Sender) : SenderObjectBase!(Sender.Value) {
  import concurrency.receiver : ReceiverObjectBase;
  static assert (models!(typeof(this), isSender));
  private Sender sender;

  /// Stores the sender to be wrapped.
  this(Sender sender) {
    this.sender = sender;
  }

  /// Connects the wrapped sender via `connectHeap` and returns an
  /// OperationObject whose delegate is that heap state's `start`.
  OperationObject connect(ReceiverObjectBase!(Sender.Value) receiver) @trusted {
    alias StartFn = typeof(OperationObject._start);
    auto heapOp = sender.connectHeap(receiver);
    // the delegate type is cast to the one OperationObject stores
    return OperationObject(cast(StartFn) &heapOp.start);
  }

  /// Accepts any receiver by delegating to the interface's `connect`.
  OperationObject connect(Receiver)(Receiver receiver) {
    return (cast(SenderObjectBase!(Sender.Value)) this).connect(receiver);
  }
}
233 
/// Turns any Sender into a polymorphic sender object.
/// A sender that already converts to `SenderObjectBase!(Sender.Value)` is
/// returned as is; anything else is wrapped in a new SenderObjectImpl.
auto toSenderObject(Sender)(Sender sender) {
  static assert(models!(Sender, isSender));
  static if (!is(Sender : SenderObjectBase!(Sender.Value))) {
    alias Erased = SenderObjectBase!(Sender.Value);
    return cast(Erased) new SenderObjectImpl!(Sender)(sender);
  } else {
    return sender;
  }
}
242 
/// A sender that, once started, reports a new Exception with the message
/// "ThrowingSender" to the receiver's `setError`.
struct ThrowingSender {
  alias Value = void;

  /// Operation state holding the receiver.
  static struct Op(Receiver) {
    Receiver receiver;
    /// Creates the exception and hands it to `setError`.
    void start() {
      auto failure = new Exception("ThrowingSender");
      receiver.setError(failure);
    }
  }

  /// Builds the operation state for `receiver`.
  auto connect(Receiver)(return Receiver receiver) @safe scope return {
    // a single named local is returned on purpose: ensure NRVO
    auto state = Op!Receiver(receiver);
    return state;
  }
}
258 
/// Tests whether a Sender, by itself, makes any calls to the receiver's
/// setError function.
///
/// The check connects the Sender to a receiver that has no `setError` at
/// all: if that does not compile, the Sender is considered able to throw.
///
/// Note that a Sender connected to a Receiver with a non-nothrow setValue
/// can still throw, but only Exceptions thrown from that Receiver's
/// setValue function.
template canSenderThrow(Sender) {
  static assert (models!(Sender, isSender));
  // Receiver offering only the value and done channels.
  struct NoErrorReceiver {
    static if (!is(Sender.Value == void))
      void setValue(Sender.Value t) nothrow @safe @nogc {}
    else
      void setValue() nothrow @safe @nogc {}
    void setDone() nothrow @safe @nogc {}
  }
  enum connectsWithoutSetError = __traits(compiles, Sender.init.connect(NoErrorReceiver()));
  enum canSenderThrow = !connectsWithoutSetError;
}

// compile-time sanity checks for the trait
static assert( canSenderThrow!ThrowingSender);
static assert(!canSenderThrow!(ValueSender!int));
278 
/// A sender that, once started, does nothing but call the receiver's
/// `setDone`.
struct DoneSender {
  static assert (models!(typeof(this), isSender));
  alias Value = void;

  /// Operation state holding the receiver.
  static struct DoneOp(Receiver) {
    Receiver receiver;
    /// Signals completion through `setDone`.
    void start() {
      receiver.setDone();
    }
  }

  /// Builds the operation state for `receiver`.
  auto connect(Receiver)(return Receiver receiver) @safe scope return {
    // a single named local is returned on purpose: ensure NRVO
    auto state = DoneOp!(Receiver)(receiver);
    return state;
  }
}
295 
/// A sender that, once started, completes the receiver without a value
/// (through `setValueOrError` with no arguments).
struct VoidSender {
  static assert (models!(typeof(this), isSender));
  alias Value = void;

  /// Operation state holding the receiver.
  struct VoidOp(Receiver) {
    Receiver receiver;
    /// Completes the receiver with no value.
    void start() nothrow @safe {
      import concurrency.receiver : setValueOrError;
      receiver.setValueOrError();
    }
  }

  /// Builds the operation state for `receiver`.
  auto connect(Receiver)(return Receiver receiver) @safe scope return {
    // a single named local is returned on purpose: ensure NRVO
    auto state = VoidOp!Receiver(receiver);
    return state;
  }
}
313 
/// Resolves the operation-state type obtained by connecting Sender to
/// Receiver.
///
/// If the Sender declares a type named `Op`, that type is used directly.
/// Otherwise the return type of the first `connect` overload is taken;
/// a templated `connect` is instantiated with Receiver first.
template OpType(Sender, Receiver) {
  static if (!is(Sender.Op)) {
    import std.meta : staticMap;
    import std.traits : ReturnType;
    alias overloads = __traits(getOverloads, Sender, "connect", true);
    // Return type of one `connect` overload.
    template GetOpType(alias connect) {
      static if (!__traits(isTemplate, connect)) {
        alias GetOpType = ReturnType!(connect);
      } else {
        alias GetOpType = ReturnType!(connect!Receiver);
      }
    }
    alias candidates = staticMap!(GetOpType, overloads);
    alias OpType = candidates[0];
  } else {
    alias OpType = Sender.Op;
  }
}
332 
/// A sender that defers to the receiver's scheduler: connecting asks that
/// scheduler for `scheduleAfter(dur)` and connects the resulting sender to
/// the same receiver.
struct DelaySender {
  alias Value = void;
  /// The duration passed to `scheduleAfter`.
  Duration dur;

  /// Builds the operation state by connecting the scheduler's delayed
  /// sender to `receiver`.
  auto connect(Receiver)(return Receiver receiver) @safe return scope {
    // a single named local is returned on purpose: ensure NRVO
    auto state = receiver
      .getScheduler()
      .scheduleAfter(dur)
      .connect(receiver);
    return state;
  }
}
343 
/// Creates a DelaySender configured with `dur`.
auto delay(Duration dur) {
  auto sender = DelaySender(dur);
  return sender;
}