1 module ut.concurrency.stream;
2 
3 import concurrency.stream;
4 import concurrency;
5 import unit_threaded;
6 import concurrency.stoptoken;
7 import core.atomic;
8 import concurrency.thread : ThreadSender;
9 
10 // TODO: it would be good if we can get the Sender .collect returns to be scoped if the delegates are.
11 
12 @("arrayStream")
13 @safe unittest {
14   shared int p = 0;
15   [1,2,3].arrayStream().collect((int t) shared { p.atomicOp!"+="(t); }).syncWait().assumeOk;
16   p.should == 6;
17 }
18 
19 @("intervalStream")
20 @safe unittest {
21   import core.time;
22   import concurrency.operations : withScheduler, whenAll;
23   import concurrency.sender : justFrom;
24   import concurrency.scheduler : ManualTimeWorker;
25 
26   auto worker = new shared ManualTimeWorker();
27   auto interval = 5.msecs.intervalStream()
28     .take(2)
29     .collect(() shared {})
30     .withScheduler(worker.getScheduler);
31 
32   auto driver = justFrom(() shared {
33       worker.timeUntilNextEvent().should == 5.msecs;
34       worker.advance(5.msecs);
35 
36       worker.timeUntilNextEvent().should == 5.msecs;
37       worker.advance(5.msecs);
38 
39       worker.timeUntilNextEvent().should == null;
40     });
41 
42   whenAll(interval, driver).syncWait().assumeOk;
43 }
44 
45 
46 @("infiniteStream.stop")
47 @safe unittest {
48   import concurrency.operations : withStopSource;
49   shared int g = 0;
50   auto source = new shared StopSource();
51   infiniteStream(5).collect((int n) shared {
52       if (g < 14)
53         g.atomicOp!"+="(n);
54       else
55         source.stop();
56     })
57     .withStopSource(source).syncWait.isCancelled.should == true;
58   g.should == 15;
59 };
60 
61 @("infiniteStream.take")
62 @safe unittest {
63   shared int g = 0;
64   infiniteStream(4).take(5).collect((int n) shared { g.atomicOp!"+="(n); }).syncWait().assumeOk;
65   g.should == 20;
66 }
67 
68 @("iotaStream")
69 @safe unittest {
70   import concurrency.stoptoken;
71   shared int g = 0;
72   iotaStream(0, 5).collect((int n) shared { g.atomicOp!"+="(n); }).syncWait().assumeOk;
73   g.should == 10;
74 }
75 
76 @("loopStream")
77 @safe unittest {
78   struct Loop {
79     size_t b,e;
80     void loop(DG, StopToken)(DG emit, StopToken stopToken) {
81       foreach(i; b..e)
82         emit(i);
83     }
84   }
85   shared int g = 0;
86   Loop(0,4).loopStream!size_t.collect((size_t n) shared { g.atomicOp!"+="(n); }).syncWait().assumeOk;
87   g.should == 6;
88 }
89 
90 @("toStreamObject")
91 @safe unittest {
92   import core.atomic : atomicOp;
93 
94   static StreamObjectBase!int getStream() {
95     return [1,2,3].arrayStream().toStreamObject();
96   }
97   shared int p;
98 
99   getStream().collect((int i) @safe shared { p.atomicOp!"+="(i); }).syncWait().assumeOk;
100 
101   p.should == 6;
102 }
103 
104 
105 @("toStreamObject.take")
106 @safe unittest {
107   static StreamObjectBase!int getStream() {
108     return [1,2,3].arrayStream().toStreamObject();
109   }
110   shared int p;
111 
112   getStream().take(2).collect((int i) shared { p.atomicOp!"+="(i); }).syncWait().assumeOk;
113 
114   p.should == 3;
115 }
116 
117 @("toStreamObject.void")
118 @safe unittest {
119   import core.time : msecs;
120   shared bool p = false;
121 
122   1.msecs.intervalStream().toStreamObject().take(1).collect(() shared { p = true; }).syncWait().assumeOk;
123 
124   p.should == true;
125 }
126 
127 @("transform.int.double")
128 @safe unittest {
129   shared int p = 0;
130   [1,2,3].arrayStream().transform((int i) => i * 3).collect((int t) shared { p.atomicOp!"+="(t); }).syncWait().assumeOk;
131   p.should == 18;
132 }
133 
134 @("transform.int.bool")
135 @safe unittest {
136   shared int p = 0;
137   [1,2,3].arrayStream().transform((int i) => i % 2 == 0).collect((bool t) shared { if (t) p.atomicOp!"+="(1); }).syncWait().assumeOk;
138   p.should == 1;
139 }
140 
141 @("scan")
142 @safe unittest {
143   shared int p = 0;
144   [1,2,3].arrayStream().scan((int acc, int i) => acc += i, 0).collect((int t) shared { p.atomicOp!"+="(t); }).syncWait().assumeOk;
145   p.should == 10;
146 }
147 
148 @("scan.void-value")
149 @safe unittest {
150   import core.time;
151   shared int p = 0;
152   5.msecs.intervalStream.scan((int acc) => acc += 1, 0).take(3).collect((int t) shared { p.atomicOp!"+="(t); }).syncWait().assumeOk;
153   p.should == 6;
154 }
155 
156 @("take.enough")
157 @safe unittest {
158   shared int p = 0;
159 
160   [1,2,3].arrayStream.take(2).collect((int i) shared { p.atomicOp!"+="(i); }).syncWait.assumeOk;
161   p.should == 3;
162 }
163 
164 @("take.too-few")
165 @safe unittest {
166   shared int p = 0;
167 
168   [1,2,3].arrayStream.take(4).collect((int i) shared { p.atomicOp!"+="(i); }).syncWait.assumeOk;
169   p.should == 6;
170 }
171 
172 @("take.donestream")
173 @safe unittest {
174   doneStream().take(1).collect(()shared{}).syncWait.isCancelled.should == true;
175 }
176 
177 @("take.errorstream")
178 @safe unittest {
179   errorStream(new Exception("Too bad")).take(1).collect(()shared{}).syncWait.assumeOk.shouldThrowWithMessage("Too bad");
180 }
181 
182 @("sample.trigger.stop")
183 @safe unittest {
184   import core.time;
185   7.msecs.intervalStream()
186     .scan((int acc) => acc+1, 0)
187     .sample(10.msecs.intervalStream().take(3))
188     .collect((int i) shared {})
189     .syncWait().assumeOk;
190 }
191 
192 @("sample.slower")
193 @safe unittest {
194   import core.time;
195   import concurrency.operations : withScheduler, whenAll;
196   import concurrency.sender : justFrom;
197 
198   shared int p = 0;
199   import concurrency.scheduler : ManualTimeWorker;
200 
201   auto worker = new shared ManualTimeWorker();
202 
203   auto sampler = 7.msecs
204     .intervalStream()
205     .scan((int acc) => acc+1, 0)
206     .sample(10.msecs.intervalStream())
207     .take(3)
208     .collect((int i) shared { p.atomicOp!"+="(i); })
209     .withScheduler(worker.getScheduler);
210 
211   auto driver = justFrom(() shared {
212       worker.advance(7.msecs);
213       p.atomicLoad.should == 0;
214       worker.timeUntilNextEvent().should == 3.msecs;
215 
216       worker.advance(3.msecs);
217       p.atomicLoad.should == 1;
218       worker.timeUntilNextEvent().should == 4.msecs;
219 
220       worker.advance(4.msecs);
221       p.atomicLoad.should == 1;
222       worker.timeUntilNextEvent().should == 6.msecs;
223 
224       worker.advance(6.msecs);
225       p.atomicLoad.should == 3;
226       worker.timeUntilNextEvent().should == 1.msecs;
227 
228       worker.advance(1.msecs);
229       p.atomicLoad.should == 3;
230       worker.timeUntilNextEvent().should == 7.msecs;
231 
232       worker.advance(7.msecs);
233       p.atomicLoad.should == 3;
234       worker.timeUntilNextEvent().should == 2.msecs;
235 
236       worker.advance(2.msecs);
237       p.atomicLoad.should == 7;
238       worker.timeUntilNextEvent().should == null;
239     });
240 
241   whenAll(sampler, driver).syncWait().assumeOk;
242 
243   p.should == 7;
244 }
245 
246 @("sample.faster")
247 @safe unittest {
248   import core.time;
249   import concurrency.operations : withScheduler, whenAll;
250   import concurrency.sender : justFrom;
251 
252   shared int p = 0;
253   import concurrency.scheduler : ManualTimeWorker;
254 
255   auto worker = new shared ManualTimeWorker();
256 
257   auto sampler = 7.msecs
258     .intervalStream()
259     .scan((int acc) => acc+1, 0)
260     .sample(3.msecs.intervalStream())
261     .take(3)
262     .collect((int i) shared { p.atomicOp!"+="(i); })
263     .withScheduler(worker.getScheduler);
264 
265   auto driver = justFrom(() shared {
266       worker.advance(3.msecs);
267       p.atomicLoad.should == 0;
268       worker.timeUntilNextEvent().should == 3.msecs;
269 
270       worker.advance(3.msecs);
271       p.atomicLoad.should == 0;
272       worker.timeUntilNextEvent().should == 1.msecs;
273 
274       worker.advance(1.msecs);
275       p.atomicLoad.should == 0;
276       worker.timeUntilNextEvent().should == 2.msecs;
277 
278       worker.advance(2.msecs);
279       p.atomicLoad.should == 1;
280       worker.timeUntilNextEvent().should == 3.msecs;
281 
282       worker.advance(3.msecs);
283       p.atomicLoad.should == 1;
284       worker.timeUntilNextEvent().should == 2.msecs;
285 
286       worker.advance(2.msecs);
287       p.atomicLoad.should == 1;
288       worker.timeUntilNextEvent().should == 1.msecs;
289 
290       worker.advance(1.msecs);
291       p.atomicLoad.should == 3;
292       worker.timeUntilNextEvent().should == 3.msecs;
293 
294       worker.advance(3.msecs);
295       p.atomicLoad.should == 3;
296       worker.timeUntilNextEvent().should == 3.msecs;
297 
298       worker.advance(3.msecs);
299       p.atomicLoad.should == 6;
300       worker.timeUntilNextEvent().should == null;
301     });
302 
303   whenAll(sampler, driver).syncWait().assumeOk;
304 
305   p.should == 6;
306 }
307 
308 @("sharedStream")
309 @safe unittest {
310   import concurrency.operations : then, race;
311 
312   auto source = sharedStream!int;
313 
314   shared int p = 0;
315 
316   auto emitter = ThreadSender().then(() shared {
317       source.emit(6);
318       source.emit(12);
319     });
320   auto collector = source.collect((int t) shared { p.atomicOp!"+="(t); });
321 
322   race(collector, emitter).syncWait().assumeOk;
323 
324   p.atomicLoad.should == 18;
325 }
326 
327 @("throttling.throttleLast")
328 @safe unittest {
329   import core.time;
330 
331   shared int p = 0;
332 
333   1.msecs
334     .intervalStream()
335     .scan((int acc) => acc+1, 0)
336     .throttleLast(3.msecs)
337     .take(6)
338     .collect((int i) shared { p.atomicOp!"+="(i); })
339     .syncWait().assumeOk;
340 
341   p.atomicLoad.shouldBeGreaterThan(40);
342 }
343 
344 @("throttling.throttleLast.arrayStream")
345 @safe unittest {
346   import core.time;
347 
348   shared int p = 0;
349 
350   [1,2,3].arrayStream()
351     .throttleLast(30.msecs)
352     .collect((int i) shared { p.atomicOp!"+="(i); })
353     .syncWait().assumeOk;
354 
355   p.atomicLoad.should == 3;
356 }
357 
358 @("throttling.throttleLast.exception")
359 @safe unittest {
360   import core.time;
361 
362   1.msecs
363     .intervalStream()
364     .throttleLast(10.msecs)
365     .collect(() shared { throw new Exception("Bla"); })
366     .syncWait.assumeOk.shouldThrowWithMessage("Bla");
367 }
368 
369 @("throttling.throttleLast.thread")
370 @safe unittest {
371   import core.time;
372 
373   shared int p = 0;
374 
375   1.msecs
376     .intervalStream()
377     .via(ThreadSender())
378     .scan((int acc) => acc+1, 0)
379     .throttleLast(3.msecs)
380     .take(6)
381     .collect((int i) shared { p.atomicOp!"+="(i); })
382     .syncWait().assumeOk;
383 
384   p.atomicLoad.shouldBeGreaterThan(40);
385 }
386 
387 @("throttling.throttleLast.thread.arrayStream")
388 @safe unittest {
389   import core.time;
390 
391   shared int p = 0;
392 
393   [1,2,3].arrayStream()
394     .via(ThreadSender())
395     .throttleLast(30.msecs)
396     .collect((int i) shared { p.atomicOp!"+="(i); })
397     .syncWait().assumeOk;
398 
399   p.atomicLoad.should == 3;
400 }
401 
402 @("throttling.throttleLast.thread.exception")
403 @safe unittest {
404   import core.time;
405 
406   1.msecs
407     .intervalStream()
408     .via(ThreadSender())
409     .throttleLast(10.msecs)
410     .collect(() shared { throw new Exception("Bla"); })
411     .syncWait.assumeOk.shouldThrowWithMessage("Bla");
412 }
413 
414 @("throttling.throttleFirst")
415 @safe unittest {
416   import core.time;
417   import concurrency.scheduler : ManualTimeWorker;
418   import concurrency.operations : withScheduler, whenAll;
419   import concurrency.sender : justFrom;
420 
421   shared int p = 0;
422   auto worker = new shared ManualTimeWorker();
423 
424   auto throttled = 1.msecs
425     .intervalStream()
426     .scan((int acc) => acc+1, 0)
427     .throttleFirst(3.msecs)
428     .take(2)
429     .collect((int i) shared { p.atomicOp!"+="(i); })
430     .withScheduler(worker.getScheduler);
431 
432   auto driver = justFrom(() shared {
433       p.atomicLoad.should == 0;
434 
435       worker.advance(1.msecs);
436       p.atomicLoad.should == 1;
437 
438       worker.advance(1.msecs);
439       p.atomicLoad.should == 1;
440 
441       worker.advance(1.msecs);
442       p.atomicLoad.should == 1;
443 
444       worker.advance(1.msecs);
445       p.atomicLoad.should == 5;
446 
447       worker.timeUntilNextEvent().should == null;
448     });
449   whenAll(throttled, driver).syncWait().assumeOk;
450 
451   p.should == 5;
452 }
453 
454 @("throttling.debounce")
455 @safe unittest {
456   import core.time;
457   import concurrency.scheduler : ManualTimeWorker;
458   import concurrency.operations : withScheduler, whenAll;
459   import concurrency.sender : justFrom;
460 
461   shared int p = 0;
462   auto worker = new shared ManualTimeWorker();
463   auto source = sharedStream!int;
464 
465   auto throttled = source
466     .debounce(3.msecs)
467     .take(2)
468     .collect((int i) shared { p.atomicOp!"+="(i); })
469     .withScheduler(worker.getScheduler);
470 
471   auto driver = justFrom(() shared {
472       source.emit(1);
473       p.atomicLoad.should == 0;
474       worker.timeUntilNextEvent().should == 3.msecs;
475 
476       worker.advance(3.msecs);
477       p.atomicLoad.should == 1;
478 
479       source.emit(2);
480       p.atomicLoad.should == 1;
481       worker.timeUntilNextEvent().should == 3.msecs;
482 
483       source.emit(3);
484       p.atomicLoad.should == 1;
485       worker.timeUntilNextEvent().should == 3.msecs;
486 
487       worker.advance(1.msecs);
488       p.atomicLoad.should == 1;
489       worker.timeUntilNextEvent().should == 2.msecs;
490 
491       source.emit(4);
492       p.atomicLoad.should == 1;
493       worker.timeUntilNextEvent().should == 3.msecs;
494 
495       worker.advance(3.msecs);
496       p.atomicLoad.should == 5;
497 
498       worker.timeUntilNextEvent().should == null;
499     });
500   whenAll(throttled, driver).syncWait().assumeOk;
501 
502   p.should == 5;
503 }
504 
505 @("slide")
506 @safe unittest {
507   import std.stdio;
508   import std.functional : toDelegate;
509   import std.algorithm : sum;
510   shared int p;
511 
512   [1,2,3,4,5,6,7].arrayStream
513     .slide(3)
514     .collect((int[] a) @safe shared { p.atomicOp!"+="(a.sum); })
515     .syncWait.assumeOk;
516 
517   p.should == 60;
518 
519   [1,2].arrayStream
520     .slide(3)
521     .collect((int[] a) @safe shared { p.atomicOp!"+="(a.sum); })
522     .syncWait.assumeOk;
523 
524   p.should == 60;
525 }
526 
527 @("slide.step")
528 @safe unittest {
529   import std.stdio;
530   import std.functional : toDelegate;
531   import std.algorithm : sum;
532   shared int p;
533 
534   [1,2,3,4,5,6,7].arrayStream
535     .slide(3, 2)
536     .collect((int[] a) @safe shared { p.atomicOp!"+="(a.sum); })
537     .syncWait.assumeOk;
538 
539   p.should == 36;
540 
541   [1,2].arrayStream
542     .slide(2, 2)
543     .collect((int[] a) @safe shared { p.atomicOp!"+="(a.sum); })
544     .syncWait.assumeOk;
545 
546   p.should == 39;
547 }
548 
549 @("toList.arrayStream")
550 @safe unittest {
551   [1,2,3].arrayStream.toList.syncWait.value.should == [1,2,3];
552 }
553 
554 @("toList.arrayStream.whenAll")
555 @safe unittest {
556   import concurrency.operations : withScheduler, whenAll;
557   import std.typecons : tuple;
558   auto s1 = [1,2,3].arrayStream.toList;
559   auto s2 = [2,3,4].arrayStream.toList;
560   whenAll(s1,s2).syncWait.value.should == tuple([1,2,3],[2,3,4]);
561 }
562 
563 @("filter")
564 unittest {
565   [1,2,3,4].arrayStream
566     .filter((int i) => i % 2 == 0)
567     .toList
568     .syncWait
569     .value.should == [2,4];
570 }
571 
572 @("cycle")
573 unittest {
574   "-/|\\".cycleStream().take(6).toList.syncWait.value.should == "-/|\\-/";
575 }
576 
577 @("flatmap.concat.just")
578 @safe unittest {
579   import concurrency.sender : just;
580 
581   [1,2,3].arrayStream
582     .flatMapConcat((int i) => just(i))
583     .toList
584     .syncWait
585     .value
586     .should == [1,2,3];
587 }
588 
589 @("flatmap.concat.thread")
590 @safe unittest {
591   import concurrency.sender : just;
592   import concurrency.operations : via;
593 
594   [1,2,3].arrayStream
595     .flatMapConcat((int i) => just(i).via(ThreadSender()))
596     .toList
597     .syncWait
598     .value
599     .should == [1,2,3];
600 }
601 
602 @("flatmap.concat.error")
603 @safe unittest {
604   import concurrency.sender : just, ErrorSender;
605   import concurrency.operations : via;
606 
607   [1,2,3].arrayStream
608     .flatMapConcat((int i) => ErrorSender())
609     .collect(()shared{})
610     .syncWait
611     .assumeOk
612     .shouldThrow();
613 }
614 
615 @("flatmap.concat.thread.on.thread")
616 @safe unittest {
617   import concurrency.sender : just;
618   import concurrency.operations : via;
619 
620   [1,2,3].arrayStream
621     .flatMapConcat((int i) => just(i).via(ThreadSender()))
622     .toList
623     .via(ThreadSender())
624     .syncWait
625     .value
626     .should == [1,2,3];
627 }
628 
629 @("flatmap.latest.just")
630 @safe unittest {
631   import concurrency.sender : just;
632 
633   [1,2,3].arrayStream
634     .flatMapLatest((int i) => just(i))
635     .toList
636     .syncWait
637     .value
638     .should == [1,2,3];
639 }
640 
641 @("flatmap.latest.delay")
642 @safe unittest {
643   import concurrency.sender : just, delay;
644   import concurrency.operations : via, onTermination;
645   import core.time;
646 
647   import std.stdio;
648   [1,2,3].arrayStream
649     .flatMapLatest((int i) => just(i).via(delay(50.msecs)))
650     .toList
651     .via(ThreadSender())
652     .syncWait
653     .value
654     .should == [3];
655 }
656 
657 @("flatmap.latest.error")
658 @safe unittest {
659   import concurrency.sender : just, ErrorSender;
660   import concurrency.operations : via;
661 
662   [1,2,3].arrayStream
663     .flatMapLatest((int i) => ErrorSender())
664     .collect(()shared{})
665     .syncWait
666     .assumeOk
667     .shouldThrow();
668 }