1 module concurrency.stream.stream;
2 
3 import concurrency.stoptoken;
4 import concurrency.receiver;
5 import concurrency.sender : isSender, OpType;
6 import concepts;
7 import std.traits : hasFunctionAttributes;
8 
9 /// A Stream is anything that has a `.collect` function that accepts a callable and returns a Sender.
10 /// Once the Sender is connected and started the Stream will call the callable zero or more times before one of the three terminal functions of the Receiver is called.
11 
12 template CollectDelegate(ElementType) {
13   static if (is(ElementType == void)) {
14     alias CollectDelegate = void delegate() @safe shared;
15   } else {
16     alias CollectDelegate = void delegate(ElementType) @safe shared;
17   }
18 }
19 
20 /// checks that T is a Stream
21 void checkStream(T)() {
22   import std.traits : ReturnType;
23   alias DG = CollectDelegate!(T.ElementType);
24   static if (is(typeof(T.collect!DG)))
25     alias Sender = ReturnType!(T.collect!(DG));
26   else
27     alias Sender = ReturnType!(T.collect);
28   static assert (models!(Sender, isSender));
29 }
30 enum isStream(T) = is(typeof(checkStream!T));
31 
32 /// A polymorphic stream with elements of type T
33 interface StreamObjectBase(T) {
34   import concurrency.sender : SenderObjectBase;
35   alias ElementType = T;
36   static assert (models!(typeof(this), isStream));
37   alias DG = CollectDelegate!(ElementType);
38 
39   SenderObjectBase!void collect(DG dg) @safe;
40 }
41 
42 /// A class extending from StreamObjectBase that wraps any Stream
43 class StreamObjectImpl(Stream) : StreamObjectBase!(Stream.ElementType) if (models!(Stream, isStream)) {
44   import concurrency.receiver : ReceiverObjectBase;
45   static assert (models!(typeof(this), isStream));
46   private Stream stream;
47   this(Stream stream) {
48     this.stream = stream;
49   }
50   alias DG = CollectDelegate!(Stream.ElementType);
51 
52   SenderObjectBase!void collect(DG dg) @safe {
53     import concurrency.sender : toSenderObject;
54     return stream.collect(dg).toSenderObject();
55   }
56 }
57 
58 /// Converts any Stream to a polymorphic StreamObject
59 StreamObjectBase!(Stream.ElementType) toStreamObject(Stream)(Stream stream) if (models!(Stream, isStream)) {
60   return new StreamObjectImpl!(Stream)(stream);
61 }
62 
63 template StreamProperties(Stream) {
64   import std.traits : ReturnType;
65   alias ElementType = Stream.ElementType;
66   alias DG = CollectDelegate!(ElementType);
67   alias Sender = ReturnType!(Stream.collect);
68   alias Value = Sender.Value;
69 }
70 
71 auto fromStreamOp(StreamElementType, SenderValue, alias Op, Args...)(Args args) {
72   alias DG = CollectDelegate!(StreamElementType);
73   static struct FromStreamSender {
74     alias Value = SenderValue;
75     Args args;
76     DG dg;
77     auto connect(Receiver)(return Receiver receiver) @safe return scope {
78       // ensure NRVO
79       auto op = Op!(Receiver)(args, dg, receiver);
80       return op;
81     }
82   }
83   static struct FromStream {
84     static assert(models!(typeof(this), isStream));
85     alias ElementType = StreamElementType;
86     Args args;
87     auto collect(DG dg) @safe {
88       return FromStreamSender(args, dg);
89     }
90   }
91   return FromStream(args);
92 }
93 
94 template SchedulerType(Receiver) {
95   import std.traits : ReturnType;
96   alias SchedulerType = ReturnType!(Receiver.getScheduler);
97 }
98 
99 /// Helper to construct a Stream, useful if the Stream you are modeling has a blocking loop
100 template loopStream(E) {
101   alias DG = CollectDelegate!(E);
102   auto loopStream(T)(T t) {
103     static struct LoopStream {
104       static assert(models!(typeof(this), isStream));
105       alias ElementType = E;
106       static struct LoopOp(Receiver) {
107         T t;
108         DG dg;
109         Receiver receiver;
110         @disable this(ref return scope typeof(this) rhs);
111         @disable this(this);
112         this(T t, DG dg, Receiver receiver) {
113           this.t = t;
114           this.dg = dg;
115           this.receiver = receiver;
116         }
117         void start() @trusted nothrow scope {
118           try {
119             t.loop(dg, receiver.getStopToken);
120           } catch (Exception e) {
121             receiver.setError(e);
122           }
123           if (receiver.getStopToken().isStopRequested)
124             receiver.setDone();
125           else
126             receiver.setValueOrError();
127         }
128       }
129       static struct LoopSender {
130         alias Value = void;
131         T t;
132         DG dg;
133         auto connect(Receiver)(return Receiver receiver) @safe return scope {
134           // ensure NRVO
135           auto op = LoopOp!(Receiver)(t, dg, receiver);
136           return op;
137         }
138       }
139       T t;
140       auto collect(DG dg) @safe {
141         return LoopSender(t, dg);
142       }
143     }
144     return LoopStream(t);
145   }
146 }