1 module concurrency.stream;
2 
3 import concurrency.stoptoken;
4 import concurrency.receiver;
5 public import concurrency.stream.stream;
6 public import concurrency.stream.filter;
7 public import concurrency.stream.take;
8 public import concurrency.stream.transform;
9 public import concurrency.stream.scan;
10 public import concurrency.stream.sample;
11 public import concurrency.stream.tolist;
12 public import concurrency.stream.slide;
13 public import concurrency.stream.throttling;
14 public import concurrency.stream.cycle;
15 public import concurrency.stream.flatmapconcat;
16 public import concurrency.stream.flatmaplatest;
17 import concurrency.sender : isSender, OpType;
18 import concepts;
19 import std.traits : hasFunctionAttributes;
20 
21 /*
22   catch?
23   combineLatest
24   count
25   debounce
26   distinctUntilChanged
27   drop
28   dropWhile
29   filter
30   first
31   firstOrNull
32   flatMapConcat
33   flatMapLatest
34   flatMapMerge
35   fold
36   map
37   mapLatest
38   merge
39   onEach
40   onEmpty
41   onStart
42   onSubscription
43   reduce (fold with no seed)
44   retry
45   retryWhen
46   runningReduce
47   sample
48   scan (like runningReduce but with initial value)
49   take
50   takeWhile
51   toList
52   transform
53   transformLatest
54   zip
55 */
56 
/// Stream that keeps emitting the same value until it is cancelled.
///
/// Params:
///   t = the value handed to the collect delegate on every iteration
auto infiniteStream(T)(T t) {
  alias DG = CollectDelegate!(T);
  struct Loop {
    T value;
    // The stop token is polled before each emission, so the loop only ends
    // once a stop has been requested.
    void loop(StopToken)(DG emit, StopToken stopToken) {
      for (;;) {
        if (stopToken.isStopRequested)
          return;
        emit(value);
      }
    }
  }
  return loopStream!T(Loop(t));
}
69 
/// Stream that emits every value of the half-open range `start .. end`, or
/// stops early once cancellation has been requested.
///
/// Params:
///   start = first value to emit
///   end = exclusive upper bound of the range
auto iotaStream(T)(T start, T end) {
  alias DG = CollectDelegate!(T);
  struct Loop {
    T b,e;
    void loop(StopToken)(DG emit, StopToken stopToken) {
      foreach(i; b..e) {
        // Poll the stop token *before* emitting (as infiniteStream does) so a
        // stream that is already cancelled emits nothing at all. Checking
        // before item N+1 is equivalent to checking after item N, so the
        // behaviour between items is unchanged.
        if (stopToken.isStopRequested)
          break;
        emit(i);
      }
    }
  }
  return Loop(start, end).loopStream!T;
}
85 
/// Stream that emits each element of `arr` in order, or stops early once
/// cancellation has been requested.
///
/// Params:
///   arr = the elements to emit; the slice is stored, not copied
auto arrayStream(T)(T[] arr) {
  alias DG = CollectDelegate!(T);
  struct Loop {
    T[] arr;
    void loop(StopToken)(DG emit, StopToken stopToken) @safe {
      foreach(item; arr) {
        // Poll the stop token *before* emitting (as infiniteStream does) so a
        // stream that is already cancelled emits nothing at all. Checking
        // before item N+1 is equivalent to checking after item N, so the
        // behaviour between items is unchanged.
        if (stopToken.isStopRequested)
          break;
        emit(item);
      }
    }
  }
  return Loop(arr).loopStream!T;
}
101 
102 import core.time : Duration;
103 
/// Stream that calls the collect delegate once per `duration`, waiting between
/// calls by means of the receiver's scheduler (`scheduleAfter`).
///
/// Params:
///   duration = delay handed to `scheduleAfter` before each call
///   emitAtStart = when true, the delegate is additionally called once right
///                 at start, before the first delay is scheduled
auto intervalStream(Duration duration, bool emitAtStart = false) {
  alias DG = CollectDelegate!(void);

  // Receiver connected to every scheduled timer sender. When the timer fires
  // it runs one tick of the stream and asks the operation state to re-arm.
  static struct ItemReceiver(State) {
    State* state;
    void setValue() @safe {
      // Cancelled while waiting: complete without calling the delegate.
      if (state.receiver.getStopToken.isStopRequested) {
        state.receiver.setDone();
        return;
      }
      try {
        state.dg();
        // The delegate itself may have triggered the stop; re-check before
        // scheduling the next tick.
        if (!state.receiver.getStopToken.isStopRequested)
          state.load();
        else
          state.receiver.setDone();
      } catch (Exception ex) {
        state.receiver.setError(ex);
      }
    }
    void setDone() @safe nothrow {
      state.receiver.setDone();
    }
    void setError(Throwable t) @safe nothrow {
      state.receiver.setError(t);
    }
    // Stop token and scheduler are simply those of the downstream receiver.
    auto getStopToken() @safe {
      return state.receiver.getStopToken();
    }
    auto getScheduler() @safe {
      return state.receiver.getScheduler();
    }
  }

  // Operation state: owns the downstream receiver plus the currently armed
  // timer operation.
  static struct Op(Receiver) {
    import std.traits : ReturnType;
    Duration duration;
    DG dg;
    Receiver receiver;
    alias SchedulerAfterSender = ReturnType!(SchedulerType!(Receiver).scheduleAfter);
    alias TickReceiver = ItemReceiver!(typeof(this));
    alias TimerOp = OpType!(SchedulerAfterSender, TickReceiver);
    TimerOp timer;
    bool emitAtStart;
    // The tick receiver keeps a pointer to this struct, so it must not be copied.
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    this(Duration duration, DG dg, Receiver receiver, bool emitAtStart) {
      this.duration = duration;
      this.dg = dg;
      this.receiver = receiver;
      this.emitAtStart = emitAtStart;
    }
    void start() @trusted nothrow {
      try {
        if (!emitAtStart) {
          load();
          return;
        }
        // One immediate call, then continue with the regular schedule unless
        // a stop was requested in the meantime.
        emitAtStart = false;
        dg();
        if (receiver.getStopToken.isStopRequested)
          receiver.setDone();
        else
          load();
      } catch (Exception ex) {
        receiver.setError(ex);
      }
    }
    // Arms the next tick: schedules `duration` on the receiver's scheduler and
    // starts the resulting operation.
    private void load() @trusted nothrow {
      try {
        timer = receiver.getScheduler()
          .scheduleAfter(duration)
          .connect(TickReceiver(&this));
        timer.start();
      } catch (Exception ex) {
        receiver.setError(ex);
      }
    }
  }

  // Sender returned by `collect`; connecting it yields the operation state above.
  static struct Sender {
    alias Value = void;
    Duration duration;
    DG dg;
    bool emitAtStart;
    auto connect(Receiver)(return Receiver receiver) @safe scope return {
      // ensure NRVO
      auto state = Op!(Receiver)(duration, dg, receiver, emitAtStart);
      return state;
    }
  }

  static struct IntervalStream {
    alias ElementType = void;
    Duration duration;
    bool emitAtStart = false;
    auto collect(DG dg) @safe {
      return Sender(duration, dg, emitAtStart);
    }
  }

  return IntervalStream(duration, emitAtStart);
}
199 
/// Stream counterpart of the sender-level `via` operation: the sender obtained
/// from `stream.collect` is combined with `sender` through
/// `concurrency.operations.via` before it is connected to the receiver.
///
/// Params:
///   stream = the stream whose collect-sender is wrapped
///   sender = the sender passed as second argument to the sender-level `via`
auto via(Stream, Sender)(Stream stream, Sender sender)
  if (models!(Sender, isSender) && models!(Stream, isStream))
{
  alias Props = StreamProperties!Stream;
  alias DG = Props.DG;

  // Operation state that just owns and forwards to the combined operation.
  static struct ViaStreamOp(Receiver) {
    import std.traits : ReturnType;
    import concurrency.operations.via : senderVia = via;
    alias Combined = ReturnType!(senderVia!(Props.Sender, Sender));
    alias InnerOp = OpType!(Combined, Receiver);
    InnerOp inner;
    @disable this(this);
    @disable this(ref return scope typeof(this) rhs);
    this(Stream stream, Sender sender, DG dg, Receiver receiver) {
      inner = senderVia(stream.collect(dg), sender).connect(receiver);
    }
    void start() nothrow @safe {
      inner.start();
    }
  }

  return fromStreamOp!(Props.ElementType, Props.Value, ViaStreamOp)(stream, sender);
}
219 
/// Stream that emits nothing: when started it immediately calls `setDone` on
/// the receiver. The collect delegate is never invoked.
auto doneStream() {
  alias DG = CollectDelegate!void;

  static struct DoneStreamOp(Receiver) {
    Receiver receiver;
    @disable this(this);
    @disable this(ref return scope typeof(this) rhs);
    // The delegate is accepted to satisfy the operation-state constructor
    // shape, but it is not stored.
    this(DG unusedDg, Receiver downstream) {
      receiver = downstream;
    }
    void start() nothrow @safe {
      receiver.setDone();
    }
  }

  return fromStreamOp!(void, void, DoneStreamOp)();
}
235 
/// Stream that emits nothing: when started it immediately calls `setError`
/// on the receiver with the given exception. The collect delegate is never
/// invoked.
///
/// Params:
///   e = the exception forwarded to the receiver's `setError`
auto errorStream(Exception e) {
  alias DG = CollectDelegate!void;

  static struct ErrorStreamOp(Receiver) {
    Exception e;
    Receiver receiver;
    @disable this(this);
    @disable this(ref return scope typeof(this) rhs);
    // The delegate is accepted to satisfy the operation-state constructor
    // shape, but it is not stored.
    this(Exception failure, DG unusedDg, Receiver downstream) {
      e = failure;
      receiver = downstream;
    }
    void start() nothrow @safe {
      receiver.setError(e);
    }
  }

  return fromStreamOp!(void, void, ErrorStreamOp)(e);
}
253 
/// Creates a `SharedStream`, used for broadcasting values to zero or more
/// receivers. Receivers can be added and removed at any time. The stream
/// itself never completes, so receivers must terminate their own connection
/// (for example by requesting a stop).
///
/// Returns: a `SharedStream!T` backed by a freshly allocated, empty
/// subscriber list.
auto sharedStream(T)() {
  import concurrency.slist;
  return SharedStream!(T)(new shared SList!(SharedStream!(T).SubscriberDG));
}
260 
/// Broadcast stream: every value passed to `emit` is forwarded to each
/// delegate currently held in the subscriber list. Subscriptions are created
/// through `collect` and removed when the subscriber's stop token fires or its
/// delegate throws.
shared struct SharedStream(T) {
  alias ElementType = T;
  alias SubscriberDG = void delegate(T) nothrow @safe shared;
  import concurrency.slist;
  private {
    alias DG = CollectDelegate!T;

    // Operation state of a single subscription.
    static struct Op(Receiver) {
      shared SharedStream!T source;
      DG dg;
      Receiver receiver;
      StopCallback cb;
      // Delegates pointing at this struct are registered with the stop token
      // and with the subscriber list, so it must not be copied.
      @disable this(this);
      @disable this(ref return scope typeof(this) rhs);

      // Registers the stop callback first and only then subscribes, unless a
      // stop has already been requested.
      // NOTE(review): if `onStop` runs the callback synchronously when a stop
      // was already requested, `receiver.setDone()` might be reached twice
      // (once in `onStop`, once below) - confirm against the stop token
      // implementation.
      void start() nothrow @trusted {
        auto token = receiver.getStopToken();
        cb = token.onStop(&(cast(shared)this).onStop);
        if (!token.isStopRequested) {
          source.add(&(cast(shared)this).onItem);
          return;
        }
        cb.dispose();
        receiver.setDone();
      }

      // Stop callback: unsubscribe, then complete the receiver with done.
      void onStop() nothrow @safe shared {
        with(unshared) {
          source.remove(&this.onItem);
          receiver.setDone();
        }
      }

      // Called by `emit` for every broadcast value. A throwing delegate ends
      // the subscription and is reported through `setError`.
      void onItem(T element) nothrow @safe shared {
        with(unshared) {
          try {
            dg(element);
          } catch (Exception ex) {
            source.remove(&this.onItem);
            cb.dispose();
            receiver.setError(ex);
          }
        }
      }

      // Strips `shared` from `this` so the fields can be used directly.
      private auto ref unshared() nothrow @trusted shared {
        return cast()this;
      }
    }

    // Sender returned by `collect`; connecting it yields a subscription.
    static struct SharedStreamSender {
      alias Value = void;
      shared SharedStream!T source;
      DG dg;
      auto connect(Receiver)(return Receiver receiver) @safe scope return {
        // ensure NRVO
        auto subscription = Op!(Receiver)(source, dg, receiver);
        return subscription;
      }
    }

    shared SList!SubscriberDG dgs;
  }

  this(shared SList!SubscriberDG dgs) {
    this.dgs = dgs;
  }

  /// Forwards `t` to every delegate currently in the subscriber list.
  void emit(T t) nothrow @trusted {
    foreach(subscriber; dgs[]) {
      subscriber(t);
    }
  }

  private void remove(SubscriberDG dg) nothrow @trusted {
    dgs.remove(dg);
  }

  private void add(SubscriberDG dg) nothrow @trusted {
    dgs.pushBack(dg);
  }

  /// Returns a sender that, once connected and started, subscribes `dg` to
  /// this stream.
  auto collect(DG dg) @safe {
    return SharedStreamSender(this, dg);
  }
}