1 module concurrency.operations.withstoptoken;
2 
3 import concurrency;
4 import concurrency.receiver;
5 import concurrency.sender;
6 import concurrency.stoptoken;
7 import concepts;
8 import std.traits;
9 
/// Pairs `sender` with `fun` by wrapping both in an `STSender`.
///
/// Params:
///   sender = the upstream sender to wrap
///   fun = callable stored alongside the sender; `STReceiver` later invokes it
///         with the downstream receiver's stop token, followed by the value
///         produced by `sender` when that value is not `void`
/// Returns: an `STSender!(Sender, Fun)` holding both arguments.
auto withStopToken(Sender, Fun)(Sender sender, Fun fun) {
  alias Wrapped = STSender!(Sender, Fun);
  return Wrapped(sender, fun);
}
13 
/// Receiver adapter placed between the wrapped sender and the downstream
/// `receiver`.
///
/// When a value arrives, `fun` is called with the downstream receiver's stop
/// token (plus the value, if `Value` is not `void`), and the outcome is
/// forwarded downstream:
///   - `fun` returns non-void: its result is handed to `setValueOrError`.
///   - `fun` returns void: the stop token is queried again after the call; a
///     requested stop completes with `setDone`, otherwise `setValueOrError()`
///     is called without arguments.
/// `setDone` and `setError` are forwarded unchanged.
///
/// NOTE(review): `setValueOrError` and `ForwardExtensionPoints` are defined
/// outside this file; their exact behaviour is not visible here.
private struct STReceiver(Receiver, Value, Fun) {
  Receiver receiver;
  Fun fun;
  static if (!is(Value == void)) {
    import std.typecons : isTuple;
    // True when `Value` is a tuple AND `fun` accepts the stop token followed
    // by the tuple's expanded fields; in that case the tuple is spread over
    // `fun`'s parameters instead of being passed as a single argument.
    enum isExpandable = isTuple!Value && __traits(compiles, {fun(StopToken.init, Value.init.expand);});
    /// Value-carrying completion: runs `fun(stopToken, value...)` and forwards
    /// the outcome to the downstream receiver.
    void setValue(Value value) @safe {
      static if (!is(ReturnType!Fun == void)) {
        // `fun` produces a result: compute it, then pass it downstream.
        static if (isExpandable)
          auto result = fun(receiver.getStopToken, value.expand);
        else
          auto result = fun(receiver.getStopToken, value);
        receiver.setValueOrError(result);
      } else {
        // `fun` produces nothing: run it purely for its effects.
        static if (isExpandable)
          fun(receiver.getStopToken, value.expand);
        else
          fun(receiver.getStopToken, value);
        // Re-query the stop token after the call to pick the completion signal.
        if (!receiver.getStopToken.isStopRequested) {
          receiver.setValueOrError();
          return;
        }
        receiver.setDone();
      }
    }
  } else {
    /// Value-less completion: runs `fun(stopToken)` and forwards the outcome
    /// to the downstream receiver.
    void setValue() @safe {
      static if (!is(ReturnType!Fun == void)) {
        // `fun` produces a result: pass it straight downstream.
        receiver.setValueOrError(fun(receiver.getStopToken));
      } else {
        fun(receiver.getStopToken);
        // Re-query the stop token after the call to pick the completion signal.
        if (!receiver.getStopToken.isStopRequested) {
          receiver.setValueOrError();
          return;
        }
        receiver.setDone();
      }
    }
  }
  /// Forwards the done signal to the downstream receiver.
  void setDone() nothrow @safe {
    receiver.setDone();
  }
  /// Forwards the error `e` to the downstream receiver.
  void setError(Throwable e) nothrow @safe {
    receiver.setError(e);
  }
  mixin ForwardExtensionPoints!receiver;
}
58 
/// Sender produced by `withStopToken`: holds an upstream `sender` together
/// with the callable `fun`.
///
/// `Value` is the return type of `Fun`. Connecting a receiver wraps it in an
/// `STReceiver` (which calls `fun` when the upstream sender delivers its
/// value) and connects that wrapper to the upstream sender.
struct STSender(Sender, Fun) if (models!(Sender, isSender)) {
  static assert(models!(typeof(this), isSender));
  /// The type this sender completes with: whatever `Fun` returns.
  alias Value = ReturnType!Fun;
  Sender sender;
  Fun fun;
  /// Connects `receiver` to the upstream sender through an `STReceiver`
  /// adapter and returns the operation state produced by `sender.connect`.
  ///
  /// NOTE(review): recent D compilers distinguish `scope return` from
  /// `return scope`; the original attribute order is kept as-is — confirm it
  /// expresses the intended lifetime contract.
  auto connect(Receiver)(return Receiver receiver) @safe scope return {
    alias Adapter = STReceiver!(Receiver, Sender.Value, Fun);
    // Bind to a named local and return it by name to ensure NRVO.
    auto operation = sender.connect(Adapter(receiver, fun));
    return operation;
  }
}