module concurrency.operations.withstoptoken;

import concurrency;
import concurrency.receiver;
import concurrency.sender;
import concurrency.stoptoken;
import concepts;
import std.traits;
import concurrency.utils : isThreadSafeFunction;

/// Overload chosen when `Fun` does not satisfy `isThreadSafeFunction`.
/// Builds the same `STSender` as the non-deprecated overload; the only
/// difference is the deprecation notice attached to it.
deprecated("function passed is not shared @safe delegate or @safe function.")
auto withStopToken(Sender, Fun)(Sender sender, Fun fun)
if (!isThreadSafeFunction!Fun) {
  alias Wrapped = STSender!(Sender, Fun);
  return Wrapped(sender, fun);
}

/// Wraps `sender` in an `STSender` that, once connected, calls `fun` with the
/// downstream receiver's stop token (followed by the upstream value, if any)
/// when the upstream sender produces a value.
auto withStopToken(Sender, Fun)(Sender sender, Fun fun)
if (isThreadSafeFunction!Fun) {
  alias Wrapped = STSender!(Sender, Fun);
  return Wrapped(sender, fun);
}

/// Receiver adapter sitting between the upstream sender and the downstream
/// `receiver`. `Value` is the upstream value type; `Fun` is the callable that
/// is handed the stop token.
private struct STReceiver(Receiver, Value, Fun) {
  Receiver receiver;
  Fun fun;

  /// True when `fun` yields nothing, in which case the downstream receiver is
  /// completed without a value.
  private enum bool funReturnsVoid = is(ReturnType!Fun == void);

  this(return scope Receiver receiver, return Fun fun) @safe return scope {
    this.receiver = receiver;
    this.fun = fun;
  }

  static if (funReturnsVoid) {
    /// Completion step shared by both `setValue` variants after a
    /// void-returning `fun` has run: signal done if a stop was requested,
    /// otherwise forward a value-less completion.
    private void completeAfterVoidCall() @safe {
      if (!receiver.getStopToken.isStopRequested) {
        receiver.setValueOrError();
      } else {
        receiver.setDone();
      }
    }
  }

  static if (is(Value == void)) {
    /// Upstream produced no value: call `fun` with just the stop token.
    void setValue() @safe {
      static if (funReturnsVoid) {
        fun(receiver.getStopToken);
        completeAfterVoidCall();
      } else {
        // A value-returning `fun` is forwarded as-is; the stop token is not
        // re-checked on this path.
        receiver.setValueOrError(fun(receiver.getStopToken));
      }
    }
  } else {
    import std.typecons : isTuple;

    /// True when `Value` is a tuple whose fields `fun` accepts as separate
    /// arguments after the stop token.
    enum isExpandable = isTuple!Value
      && __traits(compiles, { fun(StopToken.init, Value.init.expand); });

    /// Upstream produced `value`: call `fun` with the stop token and the
    /// value (expanded into separate arguments when `isExpandable`).
    void setValue(Value value) @safe {
      static if (funReturnsVoid) {
        static if (isExpandable) {
          fun(receiver.getStopToken, value.expand);
        } else {
          fun(receiver.getStopToken, value);
        }
        completeAfterVoidCall();
      } else {
        // `static if` introduces no scope, so `result` is visible below
        // regardless of which branch declared it.
        static if (isExpandable) {
          auto result = fun(receiver.getStopToken, value.expand);
        } else {
          auto result = fun(receiver.getStopToken, value);
        }
        receiver.setValueOrError(result);
      }
    }
  }

  /// Forwards the done signal to the downstream receiver unchanged.
  void setDone() nothrow @safe {
    receiver.setDone();
  }

  /// Forwards the error to the downstream receiver unchanged.
  void setError(Throwable e) nothrow @safe {
    receiver.setError(e);
  }

  mixin ForwardExtensionPoints!receiver;
}

/// Sender produced by `withStopToken`. Its value type is whatever `fun`
/// returns; connecting it wraps the given receiver in an `STReceiver` and
/// connects that to the underlying `sender`.
struct STSender(Sender, Fun) if (models!(Sender, isSender)) {
  static assert(models!(typeof(this), isSender));
  alias Value = ReturnType!fun;
  Sender sender;
  Fun fun;

  auto connect(Receiver)(return Receiver receiver) @safe return scope {
    alias Adapter = STReceiver!(Receiver, Sender.Value, Fun);
    // Keep the result in a named local and return that local: ensures NRVO.
    auto operation = sender.connect(Adapter(receiver, fun));
    return operation;
  }
}