module concurrency.stream.defer;

import concurrency.sender : isSender;
import concurrency.stream.stream;
import concepts;
import std.traits : ReturnType;

// Builds a stream from a sender factory. `fun` is called to obtain a Sender;
// the value that Sender completes with is passed to the stream's collect
// delegate, after which `fun` is called again for the next element. The cycle
// ends when a stop has been requested on the downstream receiver, or when a
// Sender completes with an error or with done.
//
// Compile-time requirements:
//  - the return type of `Fun` must model `isSender` (template constraint);
//  - `Fun` must satisfy `isThreadSafeCallable` (static assert).
//
// The stream itself is produced by `fromStreamOp`, instantiated with the
// Sender's `Value` type, `void`, and `DeferStreamOp!Fun`.
auto deferStream(Fun)(Fun fun) if (models!(ReturnType!Fun, isSender)) {
  import concurrency.utils : isThreadSafeCallable;

  alias ProducedSender = ReturnType!Fun;
  static assert(isThreadSafeCallable!Fun);

  return fromStreamOp!(ProducedSender.Value, void, DeferStreamOp!Fun)(fun);
}

// Operation state used by `deferStream`, parameterised first on the factory
// type `Fun` and then on the downstream `Receiver`.
template DeferStreamOp(Fun) {
  import concurrency.sender : OpType;
  alias Sender = ReturnType!Fun;
  alias DG = CollectDelegate!(Sender.Value);

  struct DeferStreamOp(Receiver) {
    // Receiver handed to every Sender produced by `fun`.
    alias InnerReceiver = DeferReceiver!(Sender.Value, Receiver);
    // Operation state obtained by connecting a `Sender` to `InnerReceiver`.
    alias Op = OpType!(Sender, InnerReceiver);

    Fun fun;           // factory producing one Sender per stream element
    DG dg;             // collect delegate invoked with each produced value
    Receiver receiver; // downstream receiver for error/done and stop token
    Op op;             // operation state of the Sender currently in flight

    // Copying is disabled (both copy constructor and postblit).
    // NOTE(review): presumably because `start` hands out `&start`, a delegate
    // bound to this instance's address - confirm.
    @disable this(ref return scope inout typeof(this) rhs);
    @disable this(this);

    this(Fun fun, DG dg, return Receiver receiver) @trusted scope {
      this.fun = fun;
      this.dg = dg;
      this.receiver = receiver;
    }

    // Obtains a fresh Sender from `fun`, connects it to an `InnerReceiver`
    // and starts the resulting operation. `&start` is passed along as the
    // receiver's `reset` callback, so each delivered value re-enters this
    // method to produce the next one.
    // NOTE(review): if a Sender completes synchronously inside `op.start()`,
    // the nested start -> setValue -> reset -> start calls would stack up per
    // element - TODO confirm whether that can happen in practice.
    void start() @trusted nothrow scope {
      op = fun().connect(InnerReceiver(dg, receiver, &start));
      op.start();
    }
  }
}

// Receiver connected to each Sender produced by the deferred factory.
// A value is forwarded to the collect delegate `dg`; afterwards either
// `reset` is invoked (no stop requested) or the downstream receiver is
// completed with done. Errors and done signals are forwarded to the
// downstream receiver unchanged.
struct DeferReceiver(Value, Receiver) {
  import concurrency.receiver;
  alias DG = CollectDelegate!(Value);

  // Field order is significant: instances are built positionally as
  // `DeferReceiver!(...)(dg, receiver, reset)`.
  DG dg;
  Receiver receiver;
  void delegate() @safe nothrow reset;

  static if (is(Value : void)) {
    // Value-less completion: notify the collect delegate, then continue.
    void setValue() @safe {
      dg();
      continueOrFinish();
    }
  } else {
    // Completion with a value: hand it to the collect delegate, then continue.
    void setValue(Value value) @safe {
      dg(value);
      continueOrFinish();
    }
  }

  void setError(Throwable t) @safe nothrow {
    receiver.setError(t);
  }

  void setDone() @safe nothrow {
    receiver.setDone();
  }

  mixin ForwardExtensionPoints!(receiver);

  // Shared tail of both `setValue` variants: completes downstream with done
  // when a stop has been requested, otherwise invokes `reset`.
  // NOTE(review): `reset` is bound to `DeferStreamOp.start`, which reassigns
  // its `op` field; presumably this receiver is stored inside that op, so
  // nothing here touches `this` after `reset()` returns - confirm.
  private void continueOrFinish() @safe {
    if (receiver.getStopToken.isStopRequested) {
      setDone();
      return;
    }
    reset();
  }
}