module concurrency.operations.withstoptoken;

import concurrency;
import concurrency.receiver;
import concurrency.sender;
import concurrency.stoptoken;
import concepts;
import std.traits;
import concurrency.utils : isThreadSafeFunction;

/// Pairs `sender` with `fun` in an `STSender`.
///
/// This overload is picked when `Fun` does not satisfy `isThreadSafeFunction`
/// and is deprecated; the wrapping itself is the same as in the
/// non-deprecated overload.
deprecated("function passed is not shared @safe delegate or @safe function.")
auto withStopToken(Sender, Fun)(Sender sender, Fun fun) if (!isThreadSafeFunction!Fun) {
  alias Wrapper = STSender!(Sender, Fun);
  return Wrapper(sender, fun);
}

/// Pairs `sender` with `fun` in an `STSender`.
///
/// This overload is picked when `Fun` satisfies `isThreadSafeFunction`.
auto withStopToken(Sender, Fun)(Sender sender, Fun fun) if (isThreadSafeFunction!Fun) {
  alias Wrapper = STSender!(Sender, Fun);
  return Wrapper(sender, fun);
}

/// Receiver that invokes `fun` with the wrapped receiver's stop token (plus
/// the incoming value, when `Value` is not `void`) and then completes the
/// wrapped `receiver`.
///
/// - When `fun` returns `void`, the stop token is queried again after the call:
///   `setDone` is signalled if a stop was requested, otherwise the receiver is
///   completed through `setValueOrError()` without a value.
/// - When `fun` returns a value, that value is handed to `setValueOrError`
///   without looking at the stop token again.
private struct STReceiver(Receiver, Value, Fun) {
  Receiver receiver;
  Fun fun;

  // Evaluated once here; drives every `static if` on the return type below.
  private enum bool funReturnsVoid = is(ReturnType!Fun == void);

  static if (funReturnsVoid) {
    // Shared tail of both void-returning paths. Only declared when `fun`
    // returns void, because the argument-less `setValueOrError()` call is only
    // used on that path.
    private void finishVoidCall() @safe {
      if (!receiver.getStopToken.isStopRequested)
        receiver.setValueOrError();
      else
        receiver.setDone();
    }
  }

  static if (is(Value == void)) {
    /// Upstream produced no value: call `fun` with just the stop token.
    void setValue() @safe {
      static if (funReturnsVoid) {
        fun(receiver.getStopToken);
        finishVoidCall();
      } else {
        receiver.setValueOrError(fun(receiver.getStopToken));
      }
    }
  } else {
    import std.typecons : isTuple;

    // True when `Value` is a tuple AND `fun` can be called with a stop token
    // followed by the tuple's expanded fields.
    enum isExpandable = isTuple!Value && __traits(compiles, {fun(StopToken.init, Value.init.expand);});

    /// Upstream produced `value`: call `fun` with the stop token and either
    /// the value itself or, when `isExpandable`, its expanded fields.
    void setValue(Value value) @safe {
      static if (funReturnsVoid) {
        static if (!isExpandable)
          fun(receiver.getStopToken, value);
        else
          fun(receiver.getStopToken, value.expand);
        finishVoidCall();
      } else {
        // `static if` opens no scope, so `result` is visible after it.
        static if (!isExpandable)
          auto result = fun(receiver.getStopToken, value);
        else
          auto result = fun(receiver.getStopToken, value.expand);
        receiver.setValueOrError(result);
      }
    }
  }

  /// Passes the done signal straight through to the wrapped receiver.
  void setDone() nothrow @safe {
    receiver.setDone();
  }

  /// Passes the error straight through to the wrapped receiver.
  void setError(Throwable e) nothrow @safe {
    receiver.setError(e);
  }

  mixin ForwardExtensionPoints!receiver;
}

/// Sender created by `withStopToken`. Its `Value` is the return type of
/// `fun`; connecting it wraps the given receiver in an `STReceiver` and
/// connects that to the underlying `sender`.
struct STSender(Sender, Fun) if (models!(Sender, isSender)) {
  static assert(models!(typeof(this), isSender));
  alias Value = ReturnType!fun;
  Sender sender;
  Fun fun;

  /// Connects `receiver` (wrapped together with `fun`) to the underlying
  /// sender and returns whatever that `connect` call produces.
  auto connect(Receiver)(return Receiver receiver) @safe scope return {
    alias Wrapped = STReceiver!(Receiver, Sender.Value, Fun);
    // ensure NRVO: bind to a named local and return exactly that local
    auto operation = sender.connect(Wrapped(receiver, fun));
    return operation;
  }
}