1 module concurrency.stream.defer;
2 
3 import concurrency.sender : isSender;
4 import concurrency.stream.stream;
5 import concepts;
6 import std.traits : ReturnType;
7 
// Builds a stream out of a callable that returns a Sender.
//
// The stream's element type is the `Value` type of the Sender returned by
// `Fun`; the stream operation itself is implemented by `DeferStreamOp!Fun`.
//
// Params:
//   fun = callable whose return type models `isSender`; it must also satisfy
//         `isThreadSafeCallable`, which is enforced at compile time.
// Returns: whatever `fromStreamOp` produces for the given element type and
//          operation template.
auto deferStream(Fun)(Fun fun) if (models!(ReturnType!Fun, isSender)) {
  import concurrency.utils : isThreadSafeCallable;

  // Reject callables that are not thread-safe before instantiating anything.
  static assert(isThreadSafeCallable!Fun);

  alias Produced = ReturnType!Fun;
  alias Element = Produced.Value;
  return fromStreamOp!(Element, void, DeferStreamOp!Fun)(fun);
}
15 
// Operation state template backing `deferStream`.
//
// Instantiated first with the callable type `Fun`, then with the downstream
// `Receiver`. Each call to `start` invokes `fun`, connects the Sender it
// returns to a `DeferReceiver`, and starts the resulting operation.
template DeferStreamOp(Fun) {
  import concurrency.sender : OpType;

  alias Produced = ReturnType!Fun;
  alias Value = Produced.Value;
  alias Collector = CollectDelegate!(Value);

  struct DeferStreamOp(Receiver) {
    // Type of the operation obtained by connecting one produced Sender to a
    // DeferReceiver wrapping the downstream receiver.
    alias Op = OpType!(Produced, DeferReceiver!(Value, Receiver));

    Fun fun;            // callable producing a Sender on every invocation
    Collector dg;       // delegate handed to the DeferReceiver
    Receiver receiver;  // downstream receiver
    Op op;              // operation currently in flight

    // Copying is disabled, both through the postblit and the copy constructor.
    @disable this(this);
    @disable this(ref return scope inout typeof(this) rhs);

    // Stores the callable, the collecting delegate and the downstream
    // receiver; `op` is left default-initialised until `start` runs.
    this(Fun fun, DG_ dg, return Receiver receiver) @trusted scope {
      this.receiver = receiver;
      this.dg = dg;
      this.fun = fun;
    }

    // Obtains a fresh Sender from `fun`, connects it and starts it.
    //
    // `&start` is passed as the third field of the DeferReceiver (its `reset`
    // delegate), so the receiver can call back into this method.
    // NOTE(review): if a produced Sender completes synchronously inside
    // `op.start()`, that callback would re-enter `start` on the same stack and
    // reassign `op` while the previous `op.start()` is still running — confirm
    // this is acceptable for the Senders used with deferStream.
    void start() @trusted nothrow scope {
      op = fun().connect(
        DeferReceiver!(Value, Receiver)(dg, receiver, &start)
      );
      op.start();
    }

    // Local spelling of the collector type used in the constructor signature.
    private alias DG_ = Collector;
  }
}
40 
// Receiver that feeds each incoming value to a collecting delegate and then
// either asks for another round (via `reset`) or completes the downstream
// receiver, depending on the downstream stop token.
struct DeferReceiver(Value, Receiver) {
  import concurrency.receiver;

  alias DG = CollectDelegate!(Value);

  DG dg;                                   // invoked with every received value
  Receiver receiver;                       // downstream receiver
  void delegate() @safe nothrow reset;     // called to continue after a value

  static if (is(Value : void)) {
    // Value-less variant: notify the delegate, then continue or finish.
    void setValue() @safe {
      dg();
      continueOrFinish();
    }
  } else {
    // Hands `value` to the delegate, then continues or finishes.
    void setValue(Value value) @safe {
      dg(value);
      continueOrFinish();
    }
  }

  // Errors are passed straight through to the downstream receiver.
  void setError(Throwable t) @safe nothrow {
    receiver.setError(t);
  }

  // Completion is passed straight through to the downstream receiver.
  void setDone() @safe nothrow {
    receiver.setDone();
  }

  // Shared tail of both setValue variants: when the downstream stop token
  // reports a stop request, signal done; otherwise invoke `reset`.
  private void continueOrFinish() @safe {
    if (receiver.getStopToken.isStopRequested)
      setDone();
    else
      reset();
  }

  mixin ForwardExtensionPoints!(receiver);
}