1 module concurrency.operations.then; 2 3 import concurrency; 4 import concurrency.receiver; 5 import concurrency.sender; 6 import concurrency.stoptoken; 7 import concepts; 8 9 auto then(Sender, Fun)(Sender sender, Fun fun) { 10 import std.traits : hasFunctionAttributes, isFunction, isFunctionPointer; 11 static assert (isFunction!Fun || isFunctionPointer!Fun || hasFunctionAttributes!(Fun, "shared"), "Function must be shared"); 12 13 return ThenSender!(Sender, Fun)(sender, fun); 14 } 15 16 private struct ThenReceiver(Receiver, Value, Fun) { 17 import std.traits : ReturnType; 18 Receiver receiver; 19 Fun fun; 20 static if (is(Value == void)) { 21 void setValue() @safe { 22 static if (is(ReturnType!Fun == void)) { 23 fun(); 24 receiver.setValue(); 25 } else 26 receiver.setValue(fun()); 27 } 28 } else { 29 import std.typecons : isTuple; 30 enum isExpandable = isTuple!Value; 31 void setValue(Value value) @safe { 32 static if (is(ReturnType!Fun == void)) { 33 static if (isExpandable) 34 fun(value.expand); 35 else 36 fun(value); 37 receiver.setValue(); 38 } else { 39 static if (isExpandable) 40 auto r = fun(value.expand); 41 else 42 auto r = fun(value); 43 receiver.setValue(r); 44 } 45 } 46 } 47 void setDone() @safe nothrow { 48 receiver.setDone(); 49 } 50 void setError(Exception e) @safe nothrow { 51 receiver.setError(e); 52 } 53 mixin ForwardExtensionPoints!receiver; 54 } 55 56 struct ThenSender(Sender, Fun) if (models!(Sender, isSender)) { 57 import std.traits : ReturnType; 58 static assert(models!(typeof(this), isSender)); 59 alias Value = ReturnType!fun; 60 Sender sender; 61 Fun fun; 62 auto connect(Receiver)(return Receiver receiver) @safe scope return { 63 alias R = ThenReceiver!(Receiver, Sender.Value, Fun); 64 // ensure NRVO 65 auto op = sender.connect(R(receiver, fun)); 66 return op; 67 } 68 }