1 module concurrency.operations.withstoptoken;
2 
3 import concurrency;
4 import concurrency.receiver;
5 import concurrency.sender;
6 import concurrency.stoptoken;
7 import concepts;
8 import std.traits;
9 import concurrency.utils : isThreadSafeFunction;
10 
/// Deprecated overload, chosen when `Fun` does not satisfy
/// `isThreadSafeFunction`. Pairs `sender` with `fun` in an `STSender`.
///
/// Params:
///   sender = the sender whose completion is intercepted
///   fun    = callable stored alongside the sender
/// Returns: an `STSender!(Sender, Fun)` holding both arguments.
deprecated("function passed is not shared @safe delegate or @safe function.")
auto withStopToken(Sender, Fun)(Sender sender, Fun fun)
if (!isThreadSafeFunction!Fun) {
  alias Wrapped = STSender!(Sender, Fun);
  return Wrapped(sender, fun);
}
15 
/// Pairs `sender` with `fun` in an `STSender`. This overload is chosen when
/// `Fun` satisfies `isThreadSafeFunction`.
///
/// Params:
///   sender = the sender whose completion is intercepted
///   fun    = callable stored alongside the sender
/// Returns: an `STSender!(Sender, Fun)` holding both arguments.
auto withStopToken(Sender, Fun)(Sender sender, Fun fun)
if (isThreadSafeFunction!Fun) {
  alias Wrapped = STSender!(Sender, Fun);
  return Wrapped(sender, fun);
}
19 
/// Receiver adapter created by `STSender.connect`.
///
/// When a value arrives it calls `fun` with the wrapped receiver's stop token
/// (followed by the value, if there is one) and then completes the wrapped
/// receiver. `setDone` and `setError` are forwarded unchanged.
private struct STReceiver(Receiver, Value, Fun) {
  /// Result type of the stored callable; `void` selects the "check stop token
  /// afterwards" completion path in `setValue`.
  private alias FunResult = ReturnType!Fun;

  Receiver receiver;
  Fun fun;

  /// Stores the wrapped receiver and the callable.
  this(return scope Receiver receiver, return Fun fun) @safe return scope {
    this.receiver = receiver;
    this.fun = fun;
  }

  static if (is(Value == void)) {
    /// Upstream completed without a value: call `fun` with the stop token only.
    void setValue() @safe {
      static if (!is(FunResult == void)) {
        // `fun` yields a result: hand it straight to the wrapped receiver.
        receiver.setValueOrError(fun(receiver.getStopToken));
      } else {
        fun(receiver.getStopToken);
        // A stop request observed after `fun` ran turns completion into "done".
        if (receiver.getStopToken.isStopRequested) {
          receiver.setDone();
          return;
        }
        receiver.setValueOrError();
      }
    }
  } else {
    import std.typecons : isTuple;

    /// True when `Value` is a tuple and `fun` accepts its expanded fields
    /// after the stop token.
    enum isExpandable = isTuple!Value && __traits(compiles, {fun(StopToken.init, Value.init.expand);});

    /// Upstream completed with `value`: call `fun` with the stop token and the
    /// value (expanded when `isExpandable`).
    void setValue(Value value) @safe {
      static if (!is(FunResult == void)) {
        // `static if` introduces no scope, so `produced` is visible below.
        static if (isExpandable)
          auto produced = fun(receiver.getStopToken, value.expand);
        else
          auto produced = fun(receiver.getStopToken, value);
        receiver.setValueOrError(produced);
      } else {
        static if (isExpandable)
          fun(receiver.getStopToken, value.expand);
        else
          fun(receiver.getStopToken, value);
        // A stop request observed after `fun` ran turns completion into "done".
        if (receiver.getStopToken.isStopRequested) {
          receiver.setDone();
          return;
        }
        receiver.setValueOrError();
      }
    }
  }

  /// Forwards the done signal to the wrapped receiver.
  void setDone() nothrow @safe {
    receiver.setDone();
  }

  /// Forwards the error to the wrapped receiver.
  void setError(Throwable e) nothrow @safe {
    receiver.setError(e);
  }

  mixin ForwardExtensionPoints!receiver;
}
68 
/// Sender returned by `withStopToken`: holds an upstream `sender` and a
/// callable `fun`, and on `connect` wraps the given receiver in an
/// `STReceiver` before connecting it to the upstream sender.
struct STSender(Sender, Fun) if (models!(Sender, isSender)) {
  static assert(models!(typeof(this), isSender));

  /// The value type this sender produces: whatever `fun` returns.
  alias Value = ReturnType!fun;

  Sender sender;
  Fun fun;

  /// Connects `receiver` to the upstream sender through an `STReceiver`.
  ///
  /// Returns: the result of the upstream sender's `connect`.
  auto connect(Receiver)(return Receiver receiver) @safe return scope {
    // Bind the result to a named local and return that local: ensures NRVO.
    auto operation = sender.connect(
      STReceiver!(Receiver, Sender.Value, Fun)(receiver, fun));
    return operation;
  }
}