1 module concurrency.operations.then;
2 
3 import concurrency;
4 import concurrency.receiver;
5 import concurrency.sender;
6 import concurrency.stoptoken;
7 import concepts;
8 
/// Builds a `ThenSender` that pairs `sender` with the callable `fun`.
///
/// Params:
///   sender = the upstream sender to wrap
///   fun    = the callable stored alongside it; rejected at compile time
///            unless `isThreadSafeFunction!Fun` holds
/// Returns: a `ThenSender!(Sender, Fun)` holding both arguments.
auto then(Sender, Fun)(Sender sender, Fun fun) {
  import concurrency.utils;
  // Compile-time gate on the callable's type.
  static assert(isThreadSafeFunction!Fun);
  alias Chained = ThenSender!(Sender, Fun);
  return Chained(sender, fun);
}
14 
/// Receiver adapter: invokes `fun` when a value arrives and passes the outcome
/// on to the wrapped `receiver`. Done and error signals are relayed unchanged.
///
/// Template params:
///   Receiver = type of the wrapped downstream receiver
///   Value    = value type delivered to this receiver (`void` for none)
///   Fun      = type of the callable applied to the value
private struct ThenReceiver(Receiver, Value, Fun) {
  import std.traits : ReturnType;
  Receiver receiver;
  Fun fun;
  static if (!is(Value == void)) {
    import std.typecons : isTuple;
    // A tuple value is spread over `fun`'s parameters when such a call compiles.
    enum isExpandable = isTuple!Value && __traits(compiles, {fun(Value.init.expand);});
    /// Applies `fun` to `value` (expanded if possible) and completes `receiver`
    /// with the result, or with no value when `fun` returns `void`.
    void setValue(Value value) @safe {
      enum bool yieldsNothing = is(ReturnType!Fun == void);
      static if (isExpandable) {
        static if (yieldsNothing) {
          fun(value.expand);
          receiver.setValue();
        } else {
          auto result = fun(value.expand);
          receiver.setValue(result);
        }
      } else {
        static if (yieldsNothing) {
          fun(value);
          receiver.setValue();
        } else {
          auto result = fun(value);
          receiver.setValue(result);
        }
      }
    }
  } else {
    /// Calls `fun` with no arguments and completes `receiver` with the result,
    /// or with no value when `fun` returns `void`.
    void setValue() @safe {
      enum bool yieldsNothing = is(ReturnType!Fun == void);
      static if (yieldsNothing) {
        fun();
        receiver.setValue();
      } else {
        receiver.setValue(fun());
      }
    }
  }
  /// Relays the done signal to the wrapped receiver.
  void setDone() @safe nothrow {
    receiver.setDone();
  }
  /// Relays the error `e` to the wrapped receiver.
  void setError(Throwable e) @safe nothrow {
    receiver.setError(e);
  }
  // NOTE(review): mixin is defined outside this file; presumably it forwards
  // the remaining receiver queries to `receiver` — confirm.
  mixin ForwardExtensionPoints!receiver;
}
54 
/// Sender that stores an upstream `sender` together with a callable `fun`.
/// Connecting it wraps the given receiver in a `ThenReceiver` and connects
/// that wrapper to the upstream sender.
struct ThenSender(Sender, Fun) if (models!(Sender, isSender)) {
  import std.traits : ReturnType;
  /// The value type this sender advertises: whatever `fun` returns.
  alias Value = ReturnType!fun;
  static assert(models!(typeof(this), isSender));
  Sender sender;
  Fun fun;
  /// Connects `receiver` to the upstream sender through a `ThenReceiver`.
  ///
  /// Returns: the operation produced by `sender.connect`.
  auto connect(Receiver)(return Receiver receiver) @safe scope return {
    alias Wrapped = ThenReceiver!(Receiver, Sender.Value, Fun);
    // Keep the result in a named local and return it, so NRVO applies.
    auto operation = sender.connect(Wrapped(receiver, fun));
    return operation;
  }
}