1 module concurrency.operations.via;
2 
3 import concurrency;
4 import concurrency.receiver;
5 import concurrency.sender;
6 import concurrency.stoptoken;
7 import concepts;
8 import std.traits;
9 
/// Combines two senders into a single `ViaSender`.
///
/// Params:
///   a = sender that is connected and started once `b` has produced its value
///   b = sender that the resulting `ViaSender` connects to first
/// Returns: a `ViaSender!(SenderA, SenderB)` holding copies of both senders.
auto via(SenderA, SenderB)(SenderA a, SenderB b) {
  alias Combined = ViaSender!(SenderA, SenderB);
  return Combined(a, b);
}
13 
/// Predicate template: evaluates to `true` for every type except `void`.
/// Intended for use with `std.meta.Filter` to drop `void` value types.
private template NoVoid(T) {
  enum NoVoid = !is(T == void);
}
15 
/// Receiver handed to sender A once sender B has already produced `valueB`.
///
/// It carries B's value along and, when A completes, forwards the combined
/// result to the downstream `receiver`:
///   - A yields a value: downstream gets `tuple(valueB, valueA)` (B first, A second)
///   - A yields `void`:  downstream gets just `valueB`
/// Done and error signals are passed through unchanged.
private struct ViaAReceiver(ValueB, ValueA, Receiver) {
  /// Value previously produced by sender B.
  ValueB valueB;
  /// Downstream receiver that gets the final result.
  Receiver receiver;

  static if (is(ValueA == void)) {
    /// Sender A completed without a value; only B's value is forwarded.
    void setValue() @safe {
      receiver.setValue(valueB);
    }
  } else {
    /// Sender A completed with `valueA`; forward both values, B's first.
    void setValue(ValueA valueA) @safe {
      import std.typecons : tuple;
      receiver.setValue(tuple(valueB, valueA));
    }
  }

  /// Forwards the done signal downstream.
  void setDone() @safe nothrow {
    receiver.setDone();
  }

  /// Forwards the error `e` downstream.
  void setError(Throwable e) @safe nothrow {
    receiver.setError(e);
  }

  mixin ForwardExtensionPoints!receiver;
}
36 
/// Receiver connected to sender B. When B completes with a value (or with
/// `void`), it connects sender A and starts it, so that A runs after B.
///
/// Done and error signals from B are forwarded straight to `receiver`;
/// sender A is not started in those cases.
private struct ViaBReceiver(SenderA, ValueB, Receiver) {
  /// Sender to launch once B has completed successfully.
  SenderA senderA;
  /// Downstream receiver that gets the final result.
  Receiver receiver;

  static if (is(ValueB == void)) {
    /// B produced no value: A is connected directly to the downstream receiver.
    void setValue() @safe {
      // TODO: the operation state is obtained through connectHeap because an
      // earlier attempt to keep it on the stack did not work out.
      senderA.connectHeap(receiver).start();
    }
  } else {
    /// Receiver type that remembers B's value while A is running.
    private alias InnerReceiver = ViaAReceiver!(ValueB, SenderA.Value, Receiver);

    /// B produced `val`: wrap it together with the downstream receiver and
    /// run A against that wrapper.
    void setValue(ValueB val) @safe {
      // TODO: the operation state is obtained through connectHeap because an
      // earlier attempt to keep it on the stack did not work out.
      senderA.connectHeap(InnerReceiver(val, receiver)).start();
    }
  }

  /// Forwards the done signal downstream.
  void setDone() @safe nothrow {
    receiver.setDone();
  }

  /// Forwards the error `e` downstream.
  void setError(Throwable e) @safe nothrow {
    receiver.setError(e);
  }

  mixin ForwardExtensionPoints!receiver;
}
63 
/// Sender that runs `senderB` first and then `senderA`, combining their values.
///
/// `Value` is derived from the non-void value types of the two senders:
///   - both void:      `void`
///   - exactly one:    that sender's value type
///   - both non-void:  `Tuple!(SenderB.Value, SenderA.Value)`
///
/// The tuple order is B first, A second, because that is the order in which
/// `ViaAReceiver.setValue` builds the tuple it hands to the downstream
/// receiver (`tuple(valueB, valueA)`).
struct ViaSender(SenderA, SenderB) if (models!(SenderA, isSender) && models!(SenderB, isSender)) {
  static assert(models!(typeof(this), isSender));
  import std.meta : Filter, AliasSeq;
  SenderA senderA;
  SenderB senderB;
  // B's value type is listed before A's so that the declared `Value` matches
  // the tuple actually delivered at runtime. Listing A first would declare
  // Tuple!(A, B) while Tuple!(B, A) is sent whenever the two types differ.
  alias Values = Filter!(NoVoid, AliasSeq!(SenderB.Value, SenderA.Value));
  static if (Values.length == 0)
    alias Value = void;
  else static if (Values.length == 1)
    alias Value = Values[0];
  else {
    import std.typecons : Tuple;
    alias Value = Tuple!Values;
  }

  /// Connects `receiver` to this sender.
  ///
  /// `senderB` is connected to a `ViaBReceiver` that holds `senderA` and
  /// `receiver`; the resulting operation state is returned to the caller.
  auto connect(Receiver)(return Receiver receiver) @safe scope return {
    // keep the result in a named local and return it, to ensure NRVO
    auto op = senderB.connect(ViaBReceiver!(SenderA, SenderB.Value, Receiver)(senderA, receiver));
    return op;
  }
}
84