1 module concurrency.stream;
2 
3 import concurrency.stoptoken;
4 import concurrency.receiver;
5 public import concurrency.stream.stream;
6 public import concurrency.stream.filter;
7 public import concurrency.stream.take;
8 public import concurrency.stream.transform;
9 public import concurrency.stream.scan;
10 public import concurrency.stream.sample;
11 public import concurrency.stream.tolist;
12 public import concurrency.stream.slide;
13 public import concurrency.stream.throttling;
14 public import concurrency.stream.cycle;
15 public import concurrency.stream.cron;
16 public import concurrency.stream.defer;
17 public import concurrency.stream.flatmapconcat;
18 public import concurrency.stream.flatmaplatest;
19 import concurrency.sender : isSender, OpType;
20 import concepts;
21 import std.traits : hasFunctionAttributes;
22 
23 /*
24   catch?
25   combineLatest
26   count
27   debounce
28   distinctUntilChanged
29   drop
30   dropWhile
31   filter
32   first
33   firstOrNull
34   flatMapConcat
35   flatMapLatest
36   flatMapMerge
37   fold
38   map
39   mapLatest
40   merge
41   onEach
42   onEmpty
43   onStart
44   onSubscription
45   reduce (fold with no seed)
46   retry
47   retryWhen
48   runningReduce
49   sample
50   scan (like runningReduce but with initial value)
51   take
52   takeWhile
53   toList
54   transform
55   transformLatest
56   zip
57 */
58 
/// Stream that emits the same value over and over until a stop is requested.
///
/// Params:
///   t = the value handed to the collect delegate on every iteration
auto infiniteStream(T)(T t) {
  alias Emit = CollectDelegate!(T);
  struct Loop {
    T value;
    void loop(Token)(Emit emit, Token stopToken) {
      for (;;) {
        // The token is consulted before every emission, so a token that is
        // already stopped results in no emission at all.
        if (stopToken.isStopRequested)
          return;
        emit(value);
      }
    }
  }
  return loopStream!T(Loop(t));
}
71 
/// Stream that emits every value of the half-open range `start .. end`, in
/// order, stopping early once a stop has been requested.
///
/// Params:
///   start = first value to emit
///   end   = exclusive upper bound of the range
auto iotaStream(T)(T start, T end) {
  alias Emit = CollectDelegate!(T);
  struct Loop {
    T lower, upper;
    void loop(Token)(Emit emit, Token stopToken) {
      foreach (current; lower .. upper) {
        emit(current);
        // The stop check happens after the emission, so the first value is
        // emitted even when the token was already stopped.
        if (stopToken.isStopRequested)
          return;
      }
    }
  }
  return loopStream!T(Loop(start, end));
}
87 
/// Stream that emits the elements of `arr` front to back, stopping early once
/// a stop has been requested.
///
/// Params:
///   arr = the slice whose elements are emitted; it is stored, not duplicated
auto arrayStream(T)(T[] arr) {
  alias Emit = CollectDelegate!(T);
  struct Loop {
    T[] items;
    void loop(Token)(Emit emit, Token stopToken) @safe {
      foreach (element; items) {
        emit(element);
        // Checked after emitting: the first element goes out even when the
        // token was stopped beforehand.
        if (stopToken.isStopRequested)
          return;
      }
    }
  }
  return loopStream!T(Loop(arr));
}
103 
104 import core.time : Duration;
105 
/// Stream of `void` elements that invokes the collect delegate each time a
/// `scheduleAfter(duration)` sender obtained from the receiver's scheduler
/// completes, and then re-arms itself.
///
/// Params:
///   duration    = delay passed to the scheduler's `scheduleAfter` for every tick
///   emitAtStart = when true, the delegate is also invoked once from `start`
///                 before the first delay is scheduled
auto intervalStream(Duration duration, bool emitAtStart = false) {
  alias Tick = CollectDelegate!(void);

  // Receiver connected to the scheduler's delay sender. Each completion of
  // that sender produces one tick on the owning operation state.
  static struct ItemReceiver(Op) {
    Op* op;
    void setValue() @safe {
      // Stop requested while waiting: finish without invoking the delegate.
      if (stopRequested()) {
        op.receiver.setDone();
        return;
      }
      try {
        op.dg();
        // Re-arm the timer unless a stop came in while the delegate ran.
        if (!stopRequested())
          op.load();
        else
          op.receiver.setDone();
      } catch (Exception e) {
        op.receiver.setError(e);
      }
    }
    void setDone() @safe nothrow {
      op.receiver.setDone();
    }
    void setError(Throwable t) @safe nothrow {
      op.receiver.setError(t);
    }
    auto getStopToken() @safe {
      return op.receiver.getStopToken();
    }
    auto getScheduler() @safe {
      return op.receiver.getScheduler();
    }
    // Asks the downstream receiver's stop token whether a stop was requested.
    private auto stopRequested() @safe {
      return op.receiver.getStopToken.isStopRequested;
    }
  }

  // Operation state: owns the delegate, the downstream receiver and the
  // currently armed delay operation.
  static struct Op(Receiver) {
    import std.traits : ReturnType;
    Duration duration;
    Tick dg;
    Receiver receiver;
    alias SchedulerAfterSender = ReturnType!(SchedulerType!(Receiver).scheduleAfter);
    alias TimerReceiver = ItemReceiver!(typeof(this));
    alias TimerOp = OpType!(SchedulerAfterSender, TimerReceiver);
    TimerOp timerOp;
    bool emitAtStart;
    // Not copyable: ItemReceiver keeps a pointer to this operation state.
    @disable this(this);
    @disable this(ref return scope typeof(this) rhs);
    this(Duration duration, Tick dg, Receiver receiver, bool emitAtStart) {
      this.duration = duration;
      this.dg = dg;
      this.receiver = receiver;
      this.emitAtStart = emitAtStart;
    }
    void start() @trusted nothrow scope {
      try {
        if (emitAtStart) {
          // Clear the flag first so the immediate tick happens only once.
          emitAtStart = false;
          dg();
          auto stopped = receiver.getStopToken.isStopRequested;
          if (stopped) {
            receiver.setDone();
            return;
          }
        }
        load();
      } catch (Exception e) {
        receiver.setError(e);
      }
    }
    // Connects and starts a fresh delay operation for the next tick.
    private void load() @trusted nothrow {
      try {
        timerOp = receiver.getScheduler().scheduleAfter(duration).connect(TimerReceiver(&this));
        timerOp.start();
      } catch (Exception e) {
        receiver.setError(e);
      }
    }
  }

  // Sender returned by `collect`; connecting it yields the operation state.
  static struct Sender {
    alias Value = void;
    Duration duration;
    Tick dg;
    bool emitAtStart;
    auto connect(Receiver)(return Receiver receiver) @safe return scope {
      // ensure NRVO (the operation state cannot be copied)
      auto state = Op!(Receiver)(duration, dg, receiver, emitAtStart);
      return state;
    }
  }

  static struct IntervalStream {
    alias ElementType = void;
    Duration duration;
    bool emitAtStart = false;
    auto collect(Tick dg) @safe {
      return Sender(duration, dg, emitAtStart);
    }
  }

  return IntervalStream(duration, emitAtStart);
}
201 
/// Wraps `stream` so that the sender produced by its `collect` is combined
/// with `sender` through the sender-level `via` operation before being
/// connected to the receiver.
///
/// Params:
///   stream = the stream whose collect sender is wrapped
///   sender = the sender passed as second argument to the sender-level `via`
///            (presumably the context the collection runs through — confirm
///            against concurrency.operations.via)
auto via(Stream, Sender)(Stream stream, Sender sender) if (models!(Sender, isSender) && models!(Stream, isStream)) {
  alias Props = StreamProperties!Stream;
  alias Emit = Props.DG;
  static struct ViaStreamOp(Receiver) {
    import std.traits : ReturnType;
    import concurrency.operations.via : senderVia = via;
    alias CollectSender = Props.Sender;
    alias InnerOp = OpType!(ReturnType!(senderVia!(CollectSender, Sender)), Receiver);
    InnerOp op;
    // The operation state must stay in place once constructed.
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    this(Stream stream, Sender sender, Emit dg, Receiver receiver) @trusted {
      op = senderVia(stream.collect(dg), sender).connect(receiver);
    }
    void start() nothrow @safe {
      op.start();
    }
  }
  return fromStreamOp!(Props.ElementType, Props.Value, ViaStreamOp)(stream, sender);
}
221 
/// Stream of `void` elements that never invokes the collect delegate and
/// signals `setDone` to the receiver as soon as it is started.
auto doneStream() {
  alias Emit = CollectDelegate!void;
  static struct DoneStreamOp(Receiver) {
    Receiver receiver;
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    // The delegate is accepted to match the operation constructor shape but
    // is never stored or called.
    this(Emit dg, Receiver downstream) {
      receiver = downstream;
    }
    void start() nothrow @safe {
      receiver.setDone();
    }
  }
  return fromStreamOp!(void, void, DoneStreamOp)();
}
237 
/// Stream of `void` elements that never invokes the collect delegate and
/// signals `setError(e)` to the receiver as soon as it is started.
///
/// Params:
///   e = the exception forwarded to the receiver
auto errorStream(Exception e) {
  alias Emit = CollectDelegate!void;
  static struct ErrorStreamOp(Receiver) {
    Exception e;
    Receiver receiver;
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    // The delegate is accepted to match the operation constructor shape but
    // is never stored or called.
    this(Exception failure, Emit dg, Receiver downstream) {
      e = failure;
      receiver = downstream;
    }
    void start() nothrow @safe {
      receiver.setError(e);
    }
  }
  return fromStreamOp!(void, void, ErrorStreamOp)(e);
}
255 
/// Creates a `SharedStream!T` backed by a freshly allocated, empty shared
/// subscriber list.
///
/// A SharedStream is used for broadcasting values to zero or more receivers.
/// Receivers can be added and removed at any time. The stream itself never
/// completes, so receivers should themselves terminate their connection.
auto sharedStream(T)() {
  import concurrency.slist;
  alias Subscriber = SharedStream!(T).SubscriberDG;
  return SharedStream!(T)(new shared SList!(Subscriber));
}
262 
/// Broadcast stream: every value passed to `emit` is handed to each subscriber
/// delegate currently held in the shared list. Subscribers are registered when
/// an operation obtained through `collect` is started, and removed when that
/// operation is stopped or its delegate throws.
shared struct SharedStream(T) {
  alias ElementType = T;
  alias SubscriberDG = void delegate(T) nothrow @safe shared;
  import concurrency.slist;
  private {
    alias DG = CollectDelegate!T;

    // Operation state for a single subscription.
    static struct Op(Receiver) {
      shared SharedStream!T source;
      DG dg;
      Receiver receiver;
      StopCallback cb;
      // Delegates pointing at this instance are handed out, so it must not
      // be copied.
      @disable this(ref return scope typeof(this) rhs);
      @disable this(this);
      void start() nothrow @trusted {
        auto token = receiver.getStopToken();
        cb = token.onStop(&(cast(shared)this).onStop);
        if (!token.isStopRequested) {
          // Normal path: subscribe to the broadcast list.
          source.add(&(cast(shared)this).onItem);
          return;
        }
        // Stop already requested: never subscribe, just complete.
        // NOTE(review): if `onStop` runs the callback synchronously for an
        // already-stopped token, `setDone` could be reached twice — confirm
        // against the stop token implementation.
        cb.dispose();
        receiver.setDone();
      }
      // Stop callback: unsubscribe, then complete the receiver.
      void onStop() nothrow @safe shared {
        with(unshared) {
          source.remove(&this.onItem);
          receiver.setDone();
        }
      }
      // Called by `emit` for every broadcast value.
      void onItem(T element) nothrow @safe shared {
        with(unshared) {
          try {
            dg(element);
          } catch (Exception e) {
            // A throwing delegate ends the subscription with an error.
            source.remove(&this.onItem);
            cb.dispose();
            receiver.setError(e);
          }
        }
      }
      // Strips the `shared` qualifier so the members can be used directly.
      private auto ref unshared() nothrow @trusted shared {
        return cast()this;
      }
    }

    // Sender returned by `collect`.
    static struct SharedStreamSender {
      alias Value = void;
      shared SharedStream!T source;
      DG dg;
      auto connect(Receiver)(return Receiver receiver) @safe return scope {
        // ensure NRVO (the operation state cannot be copied)
        auto state = Op!(Receiver)(source, dg, receiver);
        return state;
      }
    }

    shared SList!SubscriberDG dgs;
  }
  this(shared SList!SubscriberDG dgs) {
    this.dgs = dgs;
  }
  /// Hands `t` to every delegate currently in the subscriber list.
  void emit(T t) nothrow @trusted {
    foreach (subscriber; dgs[]) {
      subscriber(t);
    }
  }
  private void remove(SubscriberDG dg) nothrow @trusted {
    dgs.remove(dg);
  }
  private void add(SubscriberDG dg) nothrow @trusted {
    dgs.pushBack(dg);
  }
  /// Returns a sender that, once connected and started, subscribes `dg`.
  auto collect(DG dg) @safe {
    return SharedStreamSender(this, dg);
  }
}