1 module concurrency.operations.withstoptoken;
2 
3 import concurrency;
4 import concurrency.receiver;
5 import concurrency.sender;
6 import concurrency.stoptoken;
7 import concepts;
8 import std.traits;
9 import concurrency.utils : isThreadSafeFunction;
10 
11 deprecated("function passed is not shared @safe delegate or @safe function.")
12 auto withStopToken(Sender, Fun)(Sender sender, Fun fun) if (!isThreadSafeFunction!Fun) {
13   return STSender!(Sender, Fun)(sender, fun);
14  }
15 
16 auto withStopToken(Sender, Fun)(Sender sender, Fun fun) if (isThreadSafeFunction!Fun) {
17   return STSender!(Sender, Fun)(sender, fun);
18 }
19 
20 private struct STReceiver(Receiver, Value, Fun) {
21   Receiver receiver;
22   Fun fun;
23   static if (is(Value == void)) {
24     void setValue() @safe {
25       static if (is(ReturnType!Fun == void)) {
26         fun(receiver.getStopToken);
27         if (receiver.getStopToken.isStopRequested)
28           receiver.setDone();
29         else
30           receiver.setValueOrError();
31       } else
32         receiver.setValueOrError(fun(receiver.getStopToken));
33     }
34   } else {
35     import std.typecons : isTuple;
36     enum isExpandable = isTuple!Value && __traits(compiles, {fun(StopToken.init, Value.init.expand);});
37     void setValue(Value value) @safe {
38       static if (is(ReturnType!Fun == void)) {
39         static if (isExpandable)
40           fun(receiver.getStopToken, value.expand);
41         else
42           fun(receiver.getStopToken, value);
43         if (receiver.getStopToken.isStopRequested)
44           receiver.setDone();
45         else
46           receiver.setValueOrError();
47       } else {
48         static if (isExpandable)
49           auto r = fun(receiver.getStopToken, value.expand);
50         else
51           auto r = fun(receiver.getStopToken, value);
52         receiver.setValueOrError(r);
53       }
54     }
55   }
56   void setDone() nothrow @safe {
57     receiver.setDone();
58   }
59   void setError(Throwable e) nothrow @safe {
60     receiver.setError(e);
61   }
62   mixin ForwardExtensionPoints!receiver;
63 }
64 
65 struct STSender(Sender, Fun) if (models!(Sender, isSender)) {
66   static assert(models!(typeof(this), isSender));
67   alias Value = ReturnType!fun;
68   Sender sender;
69   Fun fun;
70   auto connect(Receiver)(return Receiver receiver) @safe scope return {
71     alias R = STReceiver!(Receiver, Sender.Value, Fun);
72     // ensure NRVO
73     auto op = sender.connect(R(receiver, fun));
74     return op;
75   }
76 }