module concurrency.stream.stream;

import concurrency.stoptoken;
import concurrency.receiver;
import concurrency.sender : isSender, OpType;
import concepts;
import std.traits : hasFunctionAttributes;

/// A Stream is anything that has a `.collect` function that accepts a callable and returns a Sender.
/// Once the Sender is connected and started the Stream will call the callable zero or more times
/// before one of the three terminal functions of the Receiver is called.
///
/// `CollectDelegate` is the type of that callable: a `@safe shared` delegate returning `void`
/// that takes a single `ElementType` argument, or no argument when `ElementType` is `void`.
template CollectDelegate(ElementType) {
    static if (!is(ElementType == void)) {
        alias CollectDelegate = void delegate(ElementType) @safe shared;
    } else {
        alias CollectDelegate = void delegate() @safe shared;
    }
}

/// Compile-time check that T is a Stream: the return type of `T.collect` must model `isSender`.
/// When `collect` is a template it is instantiated with the collect delegate for `T.ElementType`.
void checkStream(T)() {
    import std.traits : ReturnType;

    alias Callback = CollectDelegate!(T.ElementType);
    // A non-template `collect` cannot be instantiated with `!Callback`, so probe for that first.
    static if (!is(typeof(T.collect!Callback)))
        alias Collected = ReturnType!(T.collect);
    else
        alias Collected = ReturnType!(T.collect!(Callback));
    static assert(models!(Collected, isSender));
}

/// True when `checkStream!T` has a valid type.
enum isStream(T) = is(typeof(checkStream!T));

/// A polymorphic stream with elements of type T
interface StreamObjectBase(T) {
    import concurrency.sender : SenderObjectBase;

    /// The element type handed to the collect delegate.
    alias ElementType = T;
    static assert(models!(typeof(this), isStream));
    /// The delegate type accepted by `collect`.
    alias DG = CollectDelegate!(ElementType);

    /// Returns a type-erased sender for collecting this stream with `dg`.
    SenderObjectBase!void collect(DG dg) @safe;
}

/// A class extending from StreamObjectBase that wraps any Stream
class StreamObjectImpl(Stream) : StreamObjectBase!(Stream.ElementType)
        if (models!(Stream, isStream)) {
    import concurrency.receiver : ReceiverObjectBase;

    static assert(models!(typeof(this), isStream));

    /// The delegate type accepted by `collect`.
    alias DG = CollectDelegate!(Stream.ElementType);

    // The concrete stream every `collect` call is forwarded to.
    private Stream stream;

    /// Stores `stream` as the wrapped stream.
    this(Stream stream) {
        this.stream = stream;
    }

    /// Calls the wrapped stream's `collect` with `dg` and converts the result with `toSenderObject`.
    SenderObjectBase!void collect(DG dg) @safe {
        import concurrency.sender : toSenderObject;

        return stream.collect(dg).toSenderObject();
    }
}
/// Converts any Stream to a polymorphic StreamObject
StreamObjectBase!(Stream.ElementType) toStreamObject(Stream)(Stream stream)
        if (models!(Stream, isStream)) {
    return new StreamObjectImpl!(Stream)(stream);
}

/// Compile-time properties of a Stream type:
/// `ElementType` (the stream's element type), `DG` (its collect delegate type),
/// `Sender` (the return type of its `collect`) and `Value` (that sender's `Value`).
template StreamProperties(Stream) {
    import std.traits : ReturnType;

    alias ElementType = Stream.ElementType;
    alias DG = CollectDelegate!(ElementType);
    // Mirror checkStream: a templated `collect` has to be instantiated with the
    // delegate type before its return type can be taken; a plain `collect` is used as-is.
    static if (is(typeof(Stream.collect!DG)))
        alias Sender = ReturnType!(Stream.collect!(DG));
    else
        alias Sender = ReturnType!(Stream.collect);
    alias Value = Sender.Value;
}

/// Builds a Stream out of an operation-state template.
///
/// The returned stream has `ElementType == StreamElementType`; its `collect(dg)` returns a sender
/// with `Value == SenderValue` whose `connect(receiver)` constructs `Op!(Receiver)(args, dg, receiver)`.
auto fromStreamOp(StreamElementType, SenderValue, alias Op, Args...)(Args args) {
    alias DG = CollectDelegate!(StreamElementType);

    static struct FromStreamSender {
        alias Value = SenderValue;
        Args args;
        DG dg;
        auto connect(Receiver)(return Receiver receiver) @safe return scope {
            // ensure NRVO
            auto op = Op!(Receiver)(args, dg, receiver);
            return op;
        }
    }

    static struct FromStream {
        static assert(models!(typeof(this), isStream));
        alias ElementType = StreamElementType;
        Args args;
        auto collect(DG dg) @safe {
            return FromStreamSender(args, dg);
        }
    }

    return FromStream(args);
}

/// The return type of `Receiver.getScheduler`.
template SchedulerType(Receiver) {
    import std.traits : ReturnType;

    alias SchedulerType = ReturnType!(Receiver.getScheduler);
}

/// Helper to construct a Stream, useful if the Stream you are modeling has a blocking loop
///
/// `E` is the stream's element type. The value `t` passed to `loopStream` must provide
/// `loop(dg, stopToken)`; it is called when the operation state is started.
/// Exactly one terminal function is called on the receiver afterwards:
/// `setError` if `loop` threw an `Exception`, otherwise `setDone` if the stop token reports
/// a stop request, otherwise `setValueOrError`.
template loopStream(E) {
    alias DG = CollectDelegate!(E);

    auto loopStream(T)(T t) {
        static struct LoopStream {
            static assert(models!(typeof(this), isStream));
            alias ElementType = E;

            static struct LoopOp(Receiver) {
                T t;
                DG dg;
                Receiver receiver;

                // The operation state is neither copyable nor postblit-able.
                @disable this(ref return scope typeof(this) rhs);
                @disable this(this);

                this(T t, DG dg, Receiver receiver) {
                    this.t = t;
                    this.dg = dg;
                    this.receiver = receiver;
                }

                void start() @trusted nothrow scope {
                    try {
                        t.loop(dg, receiver.getStopToken);
                    } catch (Exception e) {
                        receiver.setError(e);
                        // setError is a terminal signal: stop here so the receiver is not
                        // also completed with setDone/setValue below.
                        return;
                    }

                    if (receiver.getStopToken().isStopRequested)
                        receiver.setDone();
                    else
                        receiver.setValueOrError();
                }
            }

            static struct LoopSender {
                alias Value = void;
                T t;
                DG dg;
                auto connect(Receiver)(return Receiver receiver) @safe return scope {
                    // ensure NRVO
                    auto op = LoopOp!(Receiver)(t, dg, receiver);
                    return op;
                }
            }

            T t;

            auto collect(DG dg) @safe {
                return LoopSender(t, dg);
            }
        }

        return LoopStream(t);
    }
}