1 module concurrency.stream;
2 
3 import concurrency.stoptoken;
4 import concurrency.receiver;
5 public import concurrency.stream.stream;
6 public import concurrency.stream.filter;
7 public import concurrency.stream.take;
8 public import concurrency.stream.transform;
9 public import concurrency.stream.scan;
10 public import concurrency.stream.sample;
11 public import concurrency.stream.tolist;
12 public import concurrency.stream.slide;
13 public import concurrency.stream.throttling;
14 public import concurrency.stream.cycle;
15 import concurrency.sender : isSender, OpType;
16 import concepts;
17 import std.traits : hasFunctionAttributes;
18 
19 /*
20   catch?
21   combineLatest
22   count
23   debounce
24   distinctUntilChanged
25   drop
26   dropWhile
27   filter
28   first
29   firstOrNull
30   flatMapConcat
31   flatMapLatest
32   flatMapMerge
33   fold
34   map
35   mapLatest
36   merge
37   onEach
38   onEmpty
39   onStart
40   onSubscription
41   reduce (fold with no seed)
42   retry
43   retryWhen
44   runningReduce
45   sample
46   scan (like runningReduce but with initial value)
47   take
48   takeWhile
49   toList
50   transform
51   transformLatest
52   zip
53 */
54 
/// Stream that emits the same value over and over until it is cancelled.
///
/// Params:
///   t = the value handed to the collect delegate on every iteration
auto infiniteStream(T)(T t) {
  alias Emitter = CollectDelegate!(T);
  struct Repeater {
    T value;
    void loop(StopToken)(Emitter emit, StopToken stopToken) {
      // The stop token is consulted before every emission, so a stream that
      // is already cancelled emits nothing.
      for (;;) {
        if (stopToken.isStopRequested)
          return;
        emit(value);
      }
    }
  }
  return loopStream!T(Repeater(t));
}
67 
/// Stream that emits every value of the half-open range `start .. end`, in
/// order, or stops early once cancellation is requested.
///
/// Params:
///   start = first value that is emitted
///   end = exclusive upper bound of the emitted values
auto iotaStream(T)(T start, T end) {
  alias Emitter = CollectDelegate!(T);
  struct Counter {
    T first, last;
    void loop(StopToken)(Emitter emit, StopToken stopToken) {
      foreach (current; first .. last) {
        emit(current);
        // Cancellation is only checked after an item went out.
        if (stopToken.isStopRequested)
          return;
      }
    }
  }
  return loopStream!T(Counter(start, end));
}
83 
/// Stream that emits the elements of `arr` one by one, in order, or stops
/// early once cancellation is requested.
///
/// Params:
///   arr = the slice whose elements are emitted
auto arrayStream(T)(T[] arr) {
  alias Emitter = CollectDelegate!(T);
  struct Walker {
    T[] items;
    void loop(StopToken)(Emitter emit, StopToken stopToken) @safe {
      foreach (element; items) {
        emit(element);
        // Cancellation is only checked after an item went out.
        if (stopToken.isStopRequested)
          return;
      }
    }
  }
  return loopStream!T(Walker(arr));
}
99 
100 import core.time : Duration;
101 
/// Stream without a value that invokes the collect delegate every time a
/// timer of `duration`, scheduled through the receiver's scheduler, fires.
///
/// Params:
///   duration = delay handed to `scheduleAfter` before each emission
///   emitAtStart = when true the delegate is additionally invoked once when
///                 the operation is started, before the first timer is
///                 scheduled
///
/// The operation completes with `setDone` once the receiver's stop token
/// reports a stop request, and with `setError` when the delegate or the
/// scheduling throws an `Exception`.
auto intervalStream(Duration duration, bool emitAtStart = false) {
  alias DG = CollectDelegate!(void);

  // Receiver connected to every scheduled timer; it forwards everything to
  // the owning operation state through a pointer.
  static struct TickReceiver(Parent) {
    Parent* parent;
    void setValue() @safe {
      // The timer fired, but a stop may have been requested in the meantime.
      if (parent.receiver.getStopToken.isStopRequested) {
        parent.receiver.setDone();
        return;
      }
      try {
        parent.dg();
        // Only schedule the next timer when nobody asked to stop while the
        // delegate was running.
        if (!parent.receiver.getStopToken.isStopRequested)
          parent.load();
        else
          parent.receiver.setDone();
      } catch (Exception failure) {
        parent.receiver.setError(failure);
      }
    }
    void setDone() @safe nothrow {
      parent.receiver.setDone();
    }
    void setError(Throwable t) @safe nothrow {
      parent.receiver.setError(t);
    }
    auto getStopToken() @safe {
      return parent.receiver.getStopToken();
    }
    auto getScheduler() @safe {
      return parent.receiver.getScheduler();
    }
  }

  // Operation state: owns the currently scheduled timer operation and
  // re-arms it after every emission.
  static struct IntervalOp(Receiver) {
    import std.traits : ReturnType;
    alias SchedulerAfterSender = ReturnType!(SchedulerType!(Receiver).scheduleAfter);
    alias TimerOp = OpType!(SchedulerAfterSender, TickReceiver!(typeof(this)));
    Duration duration;
    DG dg;
    Receiver receiver;
    TimerOp timerOp;
    bool emitAtStart;
    // The timer receiver keeps a pointer to this struct, so copying is
    // disabled.
    @disable this(this);
    @disable this(ref return scope typeof(this) rhs);
    this(Duration duration, DG dg, Receiver receiver, bool emitAtStart) {
      this.emitAtStart = emitAtStart;
      this.receiver = receiver;
      this.dg = dg;
      this.duration = duration;
    }
    void start() @trusted nothrow {
      try {
        const emitNow = emitAtStart;
        emitAtStart = false;
        if (emitNow) {
          dg();
          if (receiver.getStopToken.isStopRequested) {
            receiver.setDone();
            return;
          }
        }
        load();
      } catch (Exception failure) {
        receiver.setError(failure);
      }
    }
    // Schedules the next timer and starts it.
    private void load() @trusted nothrow {
      try {
        timerOp = receiver.getScheduler().scheduleAfter(duration).connect(TickReceiver!(typeof(this))(&this));
        timerOp.start();
      } catch (Exception failure) {
        receiver.setError(failure);
      }
    }
  }

  static struct Sender {
    alias Value = void;
    Duration duration;
    DG dg;
    bool emitAtStart;
    auto connect(Receiver)(return Receiver receiver) @safe scope return {
      // ensure NRVO
      auto state = IntervalOp!(Receiver)(duration, dg, receiver, emitAtStart);
      return state;
    }
  }

  static struct IntervalStream {
    alias ElementType = void;
    Duration duration;
    bool emitAtStart = false;
    auto collect(DG dg) @safe {
      return Sender(duration, dg, emitAtStart);
    }
  }

  return IntervalStream(duration, emitAtStart);
}
197 
/// Returns a stream whose collect-sender is the collect-sender of `stream`
/// combined with `sender` through `concurrency.operations.via`.
///
/// Params:
///   stream = the stream to collect; must model `isStream`
///   sender = the sender handed to the sender-level `via`; must model `isSender`
auto via(Stream, Sender)(Stream stream, Sender sender) if (models!(Sender, isSender) && models!(Stream, isStream)) {
  alias Properties = StreamProperties!Stream;
  alias DG = Properties.DG;
  static struct ViaStreamOp(Receiver) {
    import std.traits : ReturnType;
    // Renamed on import because `via` already names the enclosing function.
    import concurrency.operations.via : senderVia = via;
    alias InnerOp = OpType!(ReturnType!(senderVia!(Properties.Sender, Sender)), Receiver);
    InnerOp inner;
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    this(Stream stream, Sender sender, DG dg, Receiver receiver) {
      inner = senderVia(stream.collect(dg), sender).connect(receiver);
    }
    void start() nothrow @safe {
      inner.start();
    }
  }
  return fromStreamOp!(Properties.ElementType, Properties.Value, ViaStreamOp)(stream, sender);
}
217 
/// Stream without a value that never calls the collect delegate; its
/// operation calls `setDone` on the receiver as soon as it is started.
auto doneStream() {
  alias DG = CollectDelegate!void;
  static struct DoneStreamOp(Receiver) {
    Receiver downstream;
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    // The delegate is accepted to match the expected constructor shape but
    // is never stored or invoked.
    this(DG dg, Receiver receiver) {
      downstream = receiver;
    }
    void start() nothrow @safe {
      downstream.setDone();
    }
  }
  return fromStreamOp!(void, void, DoneStreamOp)();
}
233 
/// Stream without a value that never calls the collect delegate; its
/// operation calls `setError` on the receiver with `e` as soon as it is
/// started.
///
/// Params:
///   e = the exception delivered to the receiver
auto errorStream(Exception e) {
  alias DG = CollectDelegate!void;
  static struct ErrorStreamOp(Receiver) {
    Exception failure;
    Receiver downstream;
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    // The delegate is accepted to match the expected constructor shape but
    // is never stored or invoked.
    this(Exception e, DG dg, Receiver receiver) {
      downstream = receiver;
      failure = e;
    }
    void start() nothrow @safe {
      downstream.setError(failure);
    }
  }
  return fromStreamOp!(void, void, ErrorStreamOp)(e);
}
251 
/// Creates a `SharedStream!T` backed by a freshly allocated shared `SList`
/// of subscriber delegates.
///
/// A SharedStream is used for broadcasting values to zero or more receivers.
/// Receivers can be added and removed at any time. The stream itself never
/// completes, so receivers should terminate their connection themselves.
auto sharedStream(T)() {
  import concurrency.slist;
  return SharedStream!(T)(new shared SList!(SharedStream!(T).SubscriberDG));
}
258 
/// Broadcast stream: `emit` hands a value to every delegate currently held in
/// the subscriber list, and `collect` produces a sender whose operation
/// subscribes to that list when it is started.
shared struct SharedStream(T) {
  alias ElementType = T;
  alias SubscriberDG = void delegate(T) nothrow @safe shared;
  import concurrency.slist;
  private {
    alias DG = CollectDelegate!T;

    // Operation state of a single subscription.
    static struct Op(Receiver) {
      shared SharedStream!T source;
      DG dg;
      Receiver receiver;
      StopCallback cb;
      @disable this(ref return scope typeof(this) rhs);
      @disable this(this);
      void start() nothrow @trusted {
        auto token = receiver.getStopToken();
        // The stop callback is registered before subscribing.
        cb = token.onStop(&(cast(shared)this).onStop);
        if (!token.isStopRequested) {
          source.add(&(cast(shared)this).onItem);
          return;
        }
        // A stop was already requested: never subscribe, just complete.
        cb.dispose();
        receiver.setDone();
      }
      // Stop callback: unsubscribe and complete the receiver.
      void onStop() nothrow @safe shared {
        with(unshared) {
          source.remove(&this.onItem);
          receiver.setDone();
        }
      }
      // Called by the stream for every emitted element.
      void onItem(T element) nothrow @safe shared {
        with(unshared) {
          try {
            dg(element);
          } catch (Exception failure) {
            // A throwing collect delegate ends this subscription with an error.
            source.remove(&this.onItem);
            cb.dispose();
            receiver.setError(failure);
          }
        }
      }
      // Gives access to this operation state with the `shared` qualifier
      // cast away.
      private auto ref unshared() nothrow @trusted shared {
        return cast()this;
      }
    }

    static struct SharedStreamSender {
      alias Value = void;
      shared SharedStream!T source;
      DG dg;
      auto connect(Receiver)(return Receiver receiver) @safe scope return {
        // ensure NRVO
        auto state = Op!(Receiver)(source, dg, receiver);
        return state;
      }
    }

    shared SList!SubscriberDG dgs;
  }

  /// Wraps an existing subscriber list.
  this(shared SList!SubscriberDG dgs) {
    this.dgs = dgs;
  }

  /// Calls every delegate currently in the subscriber list with `t`.
  void emit(T t) nothrow @trusted {
    foreach (subscriber; dgs[]) {
      subscriber(t);
    }
  }

  private void remove(SubscriberDG dg) nothrow @trusted {
    dgs.remove(dg);
  }

  private void add(SubscriberDG dg) nothrow @trusted {
    dgs.pushBack(dg);
  }

  /// Returns a sender that, once connected and started, forwards every
  /// emitted element to `dg`.
  auto collect(DG dg) @safe {
    return SharedStreamSender(this, dg);
  }
}