/// The `then` operation: wraps a sender so that a user-supplied function is
/// invoked with the value the sender produces, and that function's result is
/// what gets delivered to the downstream receiver.
module concurrency.operations.then;

import concurrency;
import concurrency.receiver;
import concurrency.sender;
import concurrency.stoptoken;
import concepts;

/// Builds a `ThenSender` that pairs `sender` with `fun`.
///
/// Compilation fails unless `isThreadSafeFunction!Fun` (as defined by
/// `concurrency.utils`) holds.
///
/// Params:
///   sender = the upstream sender whose value is fed to `fun`
///   fun    = the callable applied to the upstream value
/// Returns: a `ThenSender!(Sender, Fun)` holding both arguments.
auto then(Sender, Fun)(Sender sender, Fun fun) {
  import concurrency.utils;
  static assert (isThreadSafeFunction!Fun);
  alias Wrapped = ThenSender!(Sender, Fun);
  return Wrapped(sender, fun);
}

/// Receiver adaptor used by `ThenSender.connect`.
///
/// On `setValue` it calls `fun` and passes the outcome on to the wrapped
/// `receiver`; `setDone` and `setError` are passed through untouched.
/// `setValue` is not `nothrow` and contains no try/catch, so anything `fun`
/// throws leaves `setValue` as an exception.
private struct ThenReceiver(Receiver, Value, Fun) {
  import std.traits : ReturnType;

  Receiver receiver;
  Fun fun;

  static if (is(Value == void)) {
    /// Upstream produced no value: call `fun` without arguments.
    void setValue() @safe {
      enum bool producesNothing = is(ReturnType!Fun == void);
      static if (!producesNothing) {
        receiver.setValue(fun());
      } else {
        // `fun` yields nothing, so downstream is signalled without a value.
        fun();
        receiver.setValue();
      }
    }
  } else {
    import std.typecons : isTuple;

    /// True when `Value` is a tuple AND `fun` accepts its expanded fields;
    /// in that case the tuple is spread over `fun`'s parameters.
    enum isExpandable =
      isTuple!Value && __traits(compiles, {fun(Value.init.expand);});

    /// Upstream produced `value`: hand it (expanded if possible) to `fun`.
    void setValue(Value value) @safe {
      enum bool producesNothing = is(ReturnType!Fun == void);
      static if (isExpandable) {
        static if (producesNothing) {
          fun(value.expand);
          receiver.setValue();
        } else {
          auto result = fun(value.expand);
          receiver.setValue(result);
        }
      } else {
        static if (producesNothing) {
          fun(value);
          receiver.setValue();
        } else {
          auto result = fun(value);
          receiver.setValue(result);
        }
      }
    }
  }

  /// Passes the done signal straight to the wrapped receiver.
  void setDone() @safe nothrow {
    receiver.setDone();
  }

  /// Passes the error straight to the wrapped receiver.
  void setError(Throwable e) @safe nothrow {
    receiver.setError(e);
  }

  // NOTE(review): presumably forwards the remaining receiver extension points
  // to `receiver` - confirm against the definition of ForwardExtensionPoints.
  mixin ForwardExtensionPoints!receiver;
}

/// Sender produced by `then`. Its `Value` is the return type of `fun`.
///
/// Only instantiable when `Sender` satisfies `isSender`; the struct also
/// statically asserts that it models `isSender` itself.
struct ThenSender(Sender, Fun) if (models!(Sender, isSender)) {
  import std.traits : ReturnType;

  static assert(models!(typeof(this), isSender));

  alias Value = ReturnType!fun;

  Sender sender;
  Fun fun;

  /// Wraps `receiver` in a `ThenReceiver` carrying `fun` and connects that
  /// wrapper to the upstream `sender`.
  ///
  /// Returns: whatever `sender.connect` returns for the wrapped receiver.
  auto connect(Receiver)(return Receiver receiver) @safe scope return {
    alias Adaptor = ThenReceiver!(Receiver, Sender.Value, Fun);
    // Bind to a named local and return it, to ensure NRVO.
    auto operation = sender.connect(Adaptor(receiver, fun));
    return operation;
  }
}