1 module concurrency.operations.withstoptoken; 2 3 import concurrency; 4 import concurrency.receiver; 5 import concurrency.sender; 6 import concurrency.stoptoken; 7 import concepts; 8 import std.traits; 9 10 auto withStopToken(Sender, Fun)(Sender sender, Fun fun) { 11 return STSender!(Sender, Fun)(sender, fun); 12 } 13 14 private struct STReceiver(Receiver, Value, Fun) { 15 Receiver receiver; 16 Fun fun; 17 static if (is(Value == void)) { 18 void setValue() @safe { 19 static if (is(ReturnType!Fun == void)) { 20 fun(receiver.getStopToken); 21 if (receiver.getStopToken.isStopRequested) 22 receiver.setDone(); 23 else 24 receiver.setValueOrError(); 25 } else 26 receiver.setValueOrError(fun(receiver.getStopToken)); 27 } 28 } else { 29 import std.typecons : isTuple; 30 enum isExpandable = isTuple!Value && __traits(compiles, {fun(StopToken.init, Value.init.expand);}); 31 void setValue(Value value) @safe { 32 static if (is(ReturnType!Fun == void)) { 33 static if (isExpandable) 34 fun(receiver.getStopToken, value.expand); 35 else 36 fun(receiver.getStopToken, value); 37 if (receiver.getStopToken.isStopRequested) 38 receiver.setDone(); 39 else 40 receiver.setValueOrError(); 41 } else { 42 static if (isExpandable) 43 auto r = fun(receiver.getStopToken, value.expand); 44 else 45 auto r = fun(receiver.getStopToken, value); 46 receiver.setValueOrError(r); 47 } 48 } 49 } 50 void setDone() nothrow @safe { 51 receiver.setDone(); 52 } 53 void setError(Throwable e) nothrow @safe { 54 receiver.setError(e); 55 } 56 mixin ForwardExtensionPoints!receiver; 57 } 58 59 struct STSender(Sender, Fun) if (models!(Sender, isSender)) { 60 static assert(models!(typeof(this), isSender)); 61 alias Value = ReturnType!fun; 62 Sender sender; 63 Fun fun; 64 auto connect(Receiver)(return Receiver receiver) @safe scope return { 65 alias R = STReceiver!(Receiver, Sender.Value, Fun); 66 // ensure NRVO 67 auto op = sender.connect(R(receiver, fun)); 68 return op; 69 } 70 }