1 module ut.concurrency.stream;
2 
3 import concurrency.stream;
4 import concurrency;
5 import unit_threaded;
6 import concurrency.stoptoken;
7 import core.atomic;
8 import concurrency.thread : ThreadSender;
9 
// TODO: it would be good if the Sender returned by .collect could be scoped whenever the delegates passed to it are.
11 
// arrayStream emits each element of the array once; collecting them into a
// shared counter must yield 1 + 2 + 3.
@("arrayStream")
@safe unittest {
  shared int total = 0;
  [1,2,3]
    .arrayStream()
    .collect((int value) shared { total.atomicOp!"+="(value); })
    .syncWait()
    .isOk.should == true;
  total.should == 6;
}
18 
// Runs a 10ms and a 3ms interval stream side by side. The first tick of the
// slow stream stops the shared StopSource, so the combined operation must end
// up cancelled, with the slow callback having run exactly once and the fast
// callback more than once.
@("timerStream")
@safe unittest {
  // `via` was imported here but never used inside this block.
  import concurrency.operations : withStopSource, whenAll;
  import core.time : msecs;
  shared int s = 0, f = 0;
  auto source = new shared StopSource();
  auto slow = 10.msecs.intervalStream()
    .collect(() shared { s.atomicOp!"+="(1); source.stop(); })
    .withStopSource(source);
  auto fast = 3.msecs.intervalStream()
    .collect(() shared { f.atomicOp!"+="(1); });
  whenAll(slow, fast).syncWait(source).isCancelled.should == true;
  s.should == 1;
  f.shouldBeGreaterThan(1);
}
31 
32 
// Collects from infiniteStream(5) until the running total reaches the
// threshold, then stops the StopSource from inside the callback. The
// operation must report cancellation and the total must be 15.
@("infiniteStream.stop")
@safe unittest {
  import concurrency.operations : withStopSource;
  shared int g = 0;
  auto source = new shared StopSource();
  infiniteStream(5).collect((int n) shared {
      // g is shared: read it with an atomic load instead of a plain read.
      if (g.atomicLoad < 14)
        g.atomicOp!"+="(n);
      else
        source.stop();
    })
    .withStopSource(source).syncWait.isCancelled.should == true;
  g.atomicLoad.should == 15;
}
47 
// take(5) bounds the otherwise endless infiniteStream(4): five values are
// collected and the operation completes normally with a total of 20.
@("infiniteStream.take")
@safe unittest {
  shared int total = 0;
  infiniteStream(4)
    .take(5)
    .collect((int value) shared { total.atomicOp!"+="(value); })
    .syncWait()
    .isOk.should == true;
  total.should == 20;
}
54 
// iotaStream(0, 5) is expected to emit 0, 1, 2, 3, 4; their sum is 10.
@("iotaStream")
@safe unittest {
  // The local `import concurrency.stoptoken;` was dropped: nothing in this
  // block uses it and the module already imports it at file scope.
  shared int g = 0;
  iotaStream(0, 5).collect((int n) shared { g.atomicOp!"+="(n); }).syncWait().isOk.should == true;
  g.should == 10;
}
62 
// loopStream adapts a struct with a `loop(emit, stopToken)` method into a
// stream. The local Loop emits every index in [lo, hi); for 0..4 the
// collected sum is 0 + 1 + 2 + 3.
@("loopStream")
@safe unittest {
  struct Loop {
    size_t lo, hi;
    void loop(DG, StopToken)(DG emit, StopToken stopToken) {
      // The stop token is accepted but not consulted by this simple loop.
      for (size_t idx = lo; idx < hi; ++idx)
        emit(idx);
    }
  }
  shared int total = 0;
  Loop(0,4)
    .loopStream!size_t
    .collect((size_t value) shared { total.atomicOp!"+="(value); })
    .syncWait()
    .isOk.should == true;
  total.should == 6;
}
76 
// toStreamObject wraps a concrete stream so it can be returned as a
// StreamObjectBase!int; collecting from the wrapped stream must still see
// all three values.
@("toStreamObject")
@safe unittest {
  import core.atomic : atomicOp;

  // Helper whose declared return type hides the concrete stream type.
  static StreamObjectBase!int makeStream() {
    return [1,2,3].arrayStream().toStreamObject();
  }
  shared int total;

  makeStream()
    .collect((int value) @safe shared { total.atomicOp!"+="(value); })
    .syncWait()
    .isOk.should == true;

  total.should == 6;
}
90 
91 
// take(2) applied to a StreamObjectBase!int must only let the first two
// values (1 and 2) through.
@("toStreamObject.take")
@safe unittest {
  static StreamObjectBase!int makeStream() {
    return [1,2,3].arrayStream().toStreamObject();
  }
  shared int total;

  makeStream()
    .take(2)
    .collect((int value) shared { total.atomicOp!"+="(value); })
    .syncWait()
    .isOk.should == true;

  total.should == 3;
}
103 
// A value-less (interval) stream can also be wrapped with toStreamObject;
// after take(1) the callback must have run and set the flag.
@("toStreamObject.void")
@safe unittest {
  import core.time : msecs;
  shared bool p = false;

  // The flag is shared and written from the stream callback, so use an atomic
  // store/load pair instead of plain accesses to a shared variable.
  1.msecs.intervalStream().toStreamObject().take(1).collect(() shared { p.atomicStore(true); }).syncWait().isOk.should == true;

  p.atomicLoad.should == true;
}
113 
// transform maps every element through the given function; here each value
// is multiplied by three, so the collected sum is 3 + 6 + 9.
@("transform.int.double")
@safe unittest {
  shared int total = 0;
  [1,2,3]
    .arrayStream()
    .transform((int x) => x * 3)
    .collect((int value) shared { total.atomicOp!"+="(value); })
    .syncWait()
    .isOk.should == true;
  total.should == 18;
}
120 
// transform may change the element type: ints become bools flagging even
// numbers. Only one of 1, 2, 3 is even, so the counter ends at 1.
@("transform.int.bool")
@safe unittest {
  shared int evens = 0;
  [1,2,3]
    .arrayStream()
    .transform((int x) => x % 2 == 0)
    .collect((bool isEven) shared {
        if (isEven)
          evens.atomicOp!"+="(1);
      })
    .syncWait()
    .isOk.should == true;
  evens.should == 1;
}
127 
// scan folds the stream with an accumulator seeded at 0 and emits each
// intermediate result; for 1, 2, 3 the expected total of those results
// (1 + 3 + 6) is 10.
@("scan")
@safe unittest {
  shared int total = 0;
  [1,2,3]
    .arrayStream()
    .scan((int acc, int x) => acc + x, 0)
    .collect((int value) shared { total.atomicOp!"+="(value); })
    .syncWait()
    .isOk.should == true;
  total.should == 10;
}
134 
// scan over a value-less interval stream: the fold function only receives the
// accumulator. Counting three ticks and summing the counts gives 1 + 2 + 3.
@("scan.void-value")
@safe unittest {
  import core.time;
  shared int total = 0;
  5.msecs
    .intervalStream
    .scan((int acc) => acc + 1, 0)
    .take(3)
    .collect((int value) shared { total.atomicOp!"+="(value); })
    .syncWait()
    .isOk.should == true;
  total.should == 6;
}
142 
// take(2) on a three-element stream: only 1 and 2 are collected.
@("take.enough")
@safe unittest {
  shared int total = 0;

  [1,2,3]
    .arrayStream
    .take(2)
    .collect((int value) shared { total.atomicOp!"+="(value); })
    .syncWait
    .isOk.should == true;
  total.should == 3;
}
150 
// take(4) on a three-element stream: the source runs out first, and the
// operation still completes normally with all three values collected.
@("take.too-few")
@safe unittest {
  shared int total = 0;

  [1,2,3]
    .arrayStream
    .take(4)
    .collect((int value) shared { total.atomicOp!"+="(value); })
    .syncWait
    .isOk.should == true;
  total.should == 6;
}
158 
// take must pass the cancellation of a doneStream through to the result.
@("take.donestream")
@safe unittest {
  doneStream()
    .take(1)
    .collect(() shared {})
    .syncWait
    .isCancelled.should == true;
}
163 
// take must pass the error of an errorStream through: assumeOk on the
// result rethrows the original exception message.
@("take.errorstream")
@safe unittest {
  errorStream(new Exception("Too bad"))
    .take(1)
    .collect(() shared {})
    .syncWait
    .assumeOk
    .shouldThrowWithMessage("Too bad");
}
168 
// When the trigger stream passed to sample finishes (here after take(3)),
// the sampled stream must complete normally as well.
@("sample.trigger.stop")
@safe unittest {
  import core.time;
  // The original bound this whole expression to an unused local named
  // `sampler`, which actually held the result of the `==` check rather than a
  // sender. The assertion is now a plain statement.
  7.msecs.intervalStream()
    .scan((int acc) => acc+1, 0)
    .sample(10.msecs.intervalStream().take(3))
    .collect((int i) shared {})
    .syncWait().isOk.should == true;
}
178 
// Samples a 7ms counter with a slower 10ms trigger under a manually driven
// clock. The driver advances time step by step and checks both the running
// total and the delay until the next scheduled timer after every step.
@("sample.slower")
@safe unittest {
  import core.time;
  import concurrency.operations : withScheduler, whenAll;
  import concurrency.scheduler : ManualTimeWorker;
  import concurrency.sender : justFrom;

  shared int total = 0;
  auto clock = new shared ManualTimeWorker();

  auto sampled = 7.msecs
    .intervalStream()
    .scan((int acc) => acc+1, 0)
    .sample(10.msecs.intervalStream())
    .take(3)
    .collect((int value) shared { total.atomicOp!"+="(value); })
    .withScheduler(clock.getScheduler);

  auto steps = justFrom(() shared {
      // Nothing has been sampled after the first 7ms.
      clock.advance(7.msecs);
      total.atomicLoad.should == 0;
      clock.timeUntilNextEvent().should == 3.msecs;

      // First sample adds 1.
      clock.advance(3.msecs);
      total.atomicLoad.should == 1;
      clock.timeUntilNextEvent().should == 4.msecs;

      clock.advance(4.msecs);
      total.atomicLoad.should == 1;
      clock.timeUntilNextEvent().should == 6.msecs;

      // Second sample adds 2.
      clock.advance(6.msecs);
      total.atomicLoad.should == 3;
      clock.timeUntilNextEvent().should == 1.msecs;

      clock.advance(1.msecs);
      total.atomicLoad.should == 3;
      clock.timeUntilNextEvent().should == 7.msecs;

      clock.advance(7.msecs);
      total.atomicLoad.should == 3;
      clock.timeUntilNextEvent().should == 2.msecs;

      // Third and last sample adds 4; afterwards no timers remain scheduled.
      clock.advance(2.msecs);
      total.atomicLoad.should == 7;
      clock.timeUntilNextEvent().should == null;
    });

  whenAll(sampled, steps).syncWait().isOk.should == true;

  total.should == 7;
}
232 
// Samples a 7ms counter with a faster 3ms trigger. The first three sampled
// values are expected to add up to 6.
@("sample.faster")
@safe unittest {
  import core.time;

  shared int total = 0;

  7.msecs
    .intervalStream()
    .scan((int acc) => acc+1, 0)
    .sample(3.msecs.intervalStream())
    .take(3)
    .collect((int value) shared { total.atomicOp!"+="(value); })
    .syncWait()
    .isOk
    .should == true;

  total.should == 6;
}
249 
// A sharedStream is fed from another thread while a collector sums what it
// receives. Both are raced; once the producer finishes, the two emitted
// values (6 and 12) must have been collected.
@("sharedStream")
@safe unittest {
  import concurrency.operations : then, race;

  auto source = sharedStream!int;

  shared int total = 0;

  auto producer = ThreadSender().then(() shared {
      source.emit(6);
      source.emit(12);
    });
  auto consumer = source.collect((int value) shared { total.atomicOp!"+="(value); });

  race(consumer, producer).syncWait().isOk.should == true;

  total.atomicLoad.should == 18;
}
268 
// throttleLast(3ms) over a 1ms counter: the six values that get through
// must add up to more than 40. Only a lower bound is asserted because the
// test runs on real time.
@("throttling.throttleLast")
@safe unittest {
  import core.time;

  shared int total = 0;

  1.msecs
    .intervalStream()
    .scan((int acc) => acc+1, 0)
    .throttleLast(3.msecs)
    .take(6)
    .collect((int value) shared { total.atomicOp!"+="(value); })
    .syncWait()
    .isOk
    .should == true;

  total.atomicLoad.shouldBeGreaterThan(40);
}
285 
// An array stream throttled with throttleLast(30ms): the expected total
// equals the last element, 3.
@("throttling.throttleLast.arrayStream")
@safe unittest {
  import core.time;

  shared int total = 0;

  [1,2,3]
    .arrayStream()
    .throttleLast(30.msecs)
    .collect((int value) shared { total.atomicOp!"+="(value); })
    .syncWait()
    .isOk
    .should == true;

  total.atomicLoad.should == 3;
}
299 
// An exception thrown from the collect callback behind throttleLast must
// surface through the result: assumeOk rethrows it with the same message.
@("throttling.throttleLast.exception")
@safe unittest {
  import core.time;

  1.msecs
    .intervalStream()
    .throttleLast(10.msecs)
    .collect(() shared { throw new Exception("Bla"); })
    .syncWait
    .assumeOk
    .shouldThrowWithMessage("Bla");
}
310 
// Same scenario as the plain throttleLast test, but with the interval stream
// routed through `via(ThreadSender())` first.
@("throttling.throttleLast.thread")
@safe unittest {
  import core.time;

  shared int total = 0;

  1.msecs
    .intervalStream()
    .via(ThreadSender())
    .scan((int acc) => acc+1, 0)
    .throttleLast(3.msecs)
    .take(6)
    .collect((int value) shared { total.atomicOp!"+="(value); })
    .syncWait()
    .isOk
    .should == true;

  total.atomicLoad.shouldBeGreaterThan(40);
}
328 
// Array stream routed through `via(ThreadSender())` and throttled with
// throttleLast(30ms): the expected total equals the last element, 3.
@("throttling.throttleLast.thread.arrayStream")
@safe unittest {
  import core.time;

  shared int total = 0;

  [1,2,3]
    .arrayStream()
    .via(ThreadSender())
    .throttleLast(30.msecs)
    .collect((int value) shared { total.atomicOp!"+="(value); })
    .syncWait()
    .isOk
    .should == true;

  total.atomicLoad.should == 3;
}
343 
// Exception propagation through throttleLast when the interval stream is
// routed through `via(ThreadSender())`: assumeOk must rethrow "Bla".
@("throttling.throttleLast.thread.exception")
@safe unittest {
  import core.time;

  1.msecs
    .intervalStream()
    .via(ThreadSender())
    .throttleLast(10.msecs)
    .collect(() shared { throw new Exception("Bla"); })
    .syncWait
    .assumeOk
    .shouldThrowWithMessage("Bla");
}
355 
// throttleFirst(3ms) over a 1ms counter, driven by a manual clock. The first
// value (1) is collected at the first tick; the total only changes again at
// the fourth tick, where it becomes 5. take(2) then ends the stream.
@("throttling.throttleFirst")
@safe unittest {
  import core.time;
  import concurrency.operations : withScheduler, whenAll;
  import concurrency.scheduler : ManualTimeWorker;
  import concurrency.sender : justFrom;

  shared int total = 0;
  auto clock = new shared ManualTimeWorker();

  auto throttled = 1.msecs
    .intervalStream()
    .scan((int acc) => acc+1, 0)
    .throttleFirst(3.msecs)
    .take(2)
    .collect((int value) shared { total.atomicOp!"+="(value); })
    .withScheduler(clock.getScheduler);

  auto steps = justFrom(() shared {
      total.atomicLoad.should == 0;

      clock.advance(1.msecs);
      total.atomicLoad.should == 1;

      // The total stays unchanged for the next two ticks.
      clock.advance(1.msecs);
      total.atomicLoad.should == 1;

      clock.advance(1.msecs);
      total.atomicLoad.should == 1;

      clock.advance(1.msecs);
      total.atomicLoad.should == 5;

      // No timers may remain scheduled once the stream is done.
      clock.timeUntilNextEvent().should == null;
    });
  whenAll(throttled, steps).syncWait().isOk.should == true;

  total.should == 5;
}
395 
// debounce(3ms) on a manually fed sharedStream under a manual clock. The
// driver checks that a value only reaches the collector after 3ms without a
// newer emission, and that each new emission restarts the 3ms timer.
@("throttling.debounce")
@safe unittest {
  import core.time;
  import concurrency.operations : withScheduler, whenAll;
  import concurrency.scheduler : ManualTimeWorker;
  import concurrency.sender : justFrom;

  shared int total = 0;
  auto clock = new shared ManualTimeWorker();
  auto source = sharedStream!int;

  auto debounced = source
    .debounce(3.msecs)
    .take(2)
    .collect((int value) shared { total.atomicOp!"+="(value); })
    .withScheduler(clock.getScheduler);

  auto steps = justFrom(() shared {
      // A single emission is held back until 3ms have passed.
      source.emit(1);
      total.atomicLoad.should == 0;
      clock.timeUntilNextEvent().should == 3.msecs;

      clock.advance(3.msecs);
      total.atomicLoad.should == 1;

      // A burst of emissions: every new value restarts the timer.
      source.emit(2);
      total.atomicLoad.should == 1;
      clock.timeUntilNextEvent().should == 3.msecs;

      source.emit(3);
      total.atomicLoad.should == 1;
      clock.timeUntilNextEvent().should == 3.msecs;

      clock.advance(1.msecs);
      total.atomicLoad.should == 1;
      clock.timeUntilNextEvent().should == 2.msecs;

      source.emit(4);
      total.atomicLoad.should == 1;
      clock.timeUntilNextEvent().should == 3.msecs;

      // Only the last value of the burst (4) gets through.
      clock.advance(3.msecs);
      total.atomicLoad.should == 5;

      clock.timeUntilNextEvent().should == null;
    });
  whenAll(debounced, steps).syncWait().isOk.should == true;

  total.should == 5;
}
446 
// slide(3) emits windows of three consecutive elements. For 1..7 the window
// sums are 6, 9, 12, 15 and 18, totalling 60. A source shorter than the
// window emits nothing, so the total stays at 60 after the second run.
@("slide")
@safe unittest {
  // Unused imports (std.stdio, std.functional : toDelegate) were removed.
  import std.algorithm : sum;
  shared int p;

  [1,2,3,4,5,6,7].arrayStream
    .slide(3)
    .collect((int[] a) @safe shared { p.atomicOp!"+="(a.sum); })
    .syncWait.isOk.should == true;

  p.should == 60;

  [1,2].arrayStream
    .slide(3)
    .collect((int[] a) @safe shared { p.atomicOp!"+="(a.sum); })
    .syncWait.isOk.should == true;

  p.should == 60;
}
468 
// slide(3, 2) emits windows of three elements, advancing two elements each
// time. For 1..7 the window sums are 6, 12 and 18, totalling 36. The second
// run, slide(2, 2) over [1, 2], adds one window with sum 3.
@("slide.step")
@safe unittest {
  // Unused imports (std.stdio, std.functional : toDelegate) were removed.
  import std.algorithm : sum;
  shared int p;

  [1,2,3,4,5,6,7].arrayStream
    .slide(3, 2)
    .collect((int[] a) @safe shared { p.atomicOp!"+="(a.sum); })
    .syncWait.isOk.should == true;

  p.should == 36;

  [1,2].arrayStream
    .slide(2, 2)
    .collect((int[] a) @safe shared { p.atomicOp!"+="(a.sum); })
    .syncWait.isOk.should == true;

  p.should == 39;
}
490 
// toList gathers every element of the stream into an array, in order.
@("toList.arrayStream")
@safe unittest {
  [1,2,3]
    .arrayStream
    .toList
    .syncWait
    .value
    .should == [1,2,3];
}
495 
// Two toList senders combined with whenAll must yield a tuple holding both
// lists, in argument order.
@("toList.arrayStream.whenAll")
@safe unittest {
  // `withScheduler` was imported here but never used inside this block.
  import concurrency.operations : whenAll;
  import std.typecons : tuple;
  auto s1 = [1,2,3].arrayStream.toList;
  auto s2 = [2,3,4].arrayStream.toList;
  whenAll(s1,s2).syncWait.value.should == tuple([1,2,3],[2,3,4]);
}