module concurrency.operations.via;

import concurrency;
import concurrency.receiver;
import concurrency.sender;
import concurrency.stoptoken;
import concepts;
import std.traits;

/**
 * Chains two senders so that `a` only runs after `b` has produced a value.
 *
 * `b` is connected first. When it completes with a value, `a` is connected
 * (through `connectHeap`) and started. Done and error signals from either
 * sender are forwarded to the final receiver.
 *
 * The value delivered to the final receiver is:
 *   - `tuple(valueB, valueA)` when both senders produce a value,
 *   - the single non-void value when only one of them produces a value,
 *   - nothing (`setValue()`) when both are void.
 *
 * Params:
 *   a = sender that is started once `b` has completed with a value
 *   b = sender that is started first
 * Returns: a `ViaSender!(SenderA, SenderB)` wrapping both senders.
 */
auto via(SenderA, SenderB)(SenderA a, SenderB b) {
  return ViaSender!(SenderA, SenderB)(a, b);
}

/// Predicate that is true for every type except `void`; used to drop void
/// value types when computing `ViaSender.Value`.
private enum NoVoid(T) = !is(T == void);

/**
 * Receiver connected to SenderA. It carries the value already produced by
 * SenderB and combines it with SenderA's result before handing it to the
 * downstream `receiver`.
 */
private struct ViaAReceiver(ValueB, ValueA, Receiver) {
  /// Value previously produced by SenderB.
  ValueB valueB;
  /// Downstream receiver that gets the combined result.
  Receiver receiver;
  static if (!is(ValueA == void))
    /// SenderA produced a value: forward both, SenderB's value first.
    void setValue(ValueA valueA) @safe {
      import std.typecons : tuple;
      receiver.setValue(tuple(valueB, valueA));
    }
  else
    /// SenderA produced no value: forward only SenderB's value.
    void setValue() @safe {
      receiver.setValue(valueB);
    }
  /// Forwards the done signal unchanged.
  void setDone() @safe nothrow {
    receiver.setDone();
  }
  /// Forwards the error unchanged.
  void setError(Throwable e) @safe nothrow {
    receiver.setError(e);
  }
  // ForwardExtensionPoints is declared outside this file.
  mixin ForwardExtensionPoints!receiver;
}

/**
 * Receiver connected to SenderB. When SenderB completes with a value it
 * connects and starts SenderA; done and error signals are forwarded directly
 * to the downstream `receiver` and SenderA is never started in those cases.
 */
private struct ViaBReceiver(SenderA, ValueB, Receiver) {
  /// Sender to start once SenderB has completed with a value.
  SenderA senderA;
  /// Downstream receiver.
  Receiver receiver;
  static if (!is(ValueB == void)) {
    // OpType!(SenderA, ViaAReceiver!(ValueB, SenderA.Value, Receiver)) op;
    /// SenderB produced `val`: run SenderA with a receiver that remembers it.
    void setValue(ValueB val) @safe {
      // TODO: tried to allocate this on the stack, but failed...
      auto op = senderA.connectHeap(ViaAReceiver!(ValueB, SenderA.Value, Receiver)(val, receiver));
      op.start();
    }
  } else {
    // OpType!(SenderA, Receiver) op;
    /// SenderB produced no value: SenderA can deliver straight to `receiver`.
    void setValue() @safe {
      // TODO: tried to allocate this on the stack, but failed...
      auto op = senderA.connectHeap(receiver);
      op.start();
    }
  }
  /// Forwards the done signal unchanged.
  void setDone() @safe nothrow {
    receiver.setDone();
  }
  /// Forwards the error unchanged.
  void setError(Throwable e) @safe nothrow {
    receiver.setError(e);
  }
  // ForwardExtensionPoints is declared outside this file.
  mixin ForwardExtensionPoints!receiver;
}

/**
 * Sender returned by `via`. Connecting it connects SenderB to a
 * `ViaBReceiver`, which in turn runs SenderA after SenderB yields a value.
 */
struct ViaSender(SenderA, SenderB) if (models!(SenderA, isSender) && models!(SenderB, isSender)) {
  static assert(models!(typeof(this), isSender));
  import std.meta : Filter, AliasSeq;
  SenderA senderA;
  SenderB senderB;
  // SenderB's value type is listed first because ViaAReceiver delivers
  // `tuple(valueB, valueA)`; listing SenderA first would advertise a Tuple
  // whose field order does not match what receivers actually get whenever
  // the two value types differ.
  alias Values = Filter!(NoVoid, AliasSeq!(SenderB.Value, SenderA.Value));
  static if (Values.length == 0)
    alias Value = void;
  else static if (Values.length == 1)
    alias Value = Values[0];
  else {
    import std.typecons : Tuple;
    alias Value = Tuple!Values;
  }
  /// Connects SenderB to a `ViaBReceiver` holding `senderA` and `receiver`,
  /// and returns the resulting operation state.
  auto connect(Receiver)(return Receiver receiver) @safe scope return {
    // ensure NRVO
    auto op = senderB.connect(ViaBReceiver!(SenderA, SenderB.Value, Receiver)(senderA, receiver));
    return op;
  }
}