1 module ut.concurrency.stream;
2 
3 import concurrency.stream;
4 import concurrency;
5 import unit_threaded;
6 import concurrency.stoptoken;
7 import core.atomic;
8 import concurrency.thread : ThreadSender;
9 
10 // TODO: it would be good if we can get the Sender .collect returns to be scoped if the delegates are.
11 
12 @("arrayStream")
13 @safe unittest {
14   shared int p = 0;
15   [1,2,3].arrayStream().collect((int t) shared { p.atomicOp!"+="(t); }).syncWait().assumeOk;
16   p.should == 6;
17 }
18 
19 @("intervalStream")
20 @safe unittest {
21   import core.time;
22   import concurrency.operations : withScheduler, whenAll;
23   import concurrency.sender : justFrom;
24   import concurrency.scheduler : ManualTimeWorker;
25 
26   auto worker = new shared ManualTimeWorker();
27   auto interval = 5.msecs.intervalStream()
28     .take(2)
29     .collect(() shared {})
30     .withScheduler(worker.getScheduler);
31 
32   auto driver = justFrom(() shared {
33       worker.timeUntilNextEvent().should == 5.msecs;
34       worker.advance(5.msecs);
35 
36       worker.timeUntilNextEvent().should == 5.msecs;
37       worker.advance(5.msecs);
38 
39       worker.timeUntilNextEvent().should == null;
40     });
41 
42   whenAll(interval, driver).syncWait().assumeOk;
43 }
44 
45 
46 @("infiniteStream.stop")
47 @safe unittest {
48   import concurrency.operations : withStopSource;
49   shared int g = 0;
50   auto source = new shared StopSource();
51   infiniteStream(5).collect((int n) shared {
52       if (g < 14)
53         g.atomicOp!"+="(n);
54       else
55         source.stop();
56     })
57     .withStopSource(source).syncWait.isCancelled.should == true;
58   g.should == 15;
59 };
60 
61 @("infiniteStream.take")
62 @safe unittest {
63   shared int g = 0;
64   infiniteStream(4).take(5).collect((int n) shared { g.atomicOp!"+="(n); }).syncWait().assumeOk;
65   g.should == 20;
66 }
67 
68 @("iotaStream")
69 @safe unittest {
70   import concurrency.stoptoken;
71   shared int g = 0;
72   iotaStream(0, 5).collect((int n) shared { g.atomicOp!"+="(n); }).syncWait().assumeOk;
73   g.should == 10;
74 }
75 
76 @("loopStream")
77 @safe unittest {
78   struct Loop {
79     size_t b,e;
80     void loop(DG, StopToken)(DG emit, StopToken stopToken) {
81       foreach(i; b..e)
82         emit(i);
83     }
84   }
85   shared int g = 0;
86   Loop(0,4).loopStream!size_t.collect((size_t n) shared { g.atomicOp!"+="(n); }).syncWait().assumeOk;
87   g.should == 6;
88 }
89 
90 @("toStreamObject")
91 @safe unittest {
92   import core.atomic : atomicOp;
93 
94   static StreamObjectBase!int getStream() {
95     return [1,2,3].arrayStream().toStreamObject();
96   }
97   shared int p;
98 
99   getStream().collect((int i) @safe shared { p.atomicOp!"+="(i); }).syncWait().assumeOk;
100 
101   p.should == 6;
102 }
103 
104 
105 @("toStreamObject.take")
106 @safe unittest {
107   static StreamObjectBase!int getStream() {
108     return [1,2,3].arrayStream().toStreamObject();
109   }
110   shared int p;
111 
112   getStream().take(2).collect((int i) shared { p.atomicOp!"+="(i); }).syncWait().assumeOk;
113 
114   p.should == 3;
115 }
116 
117 @("toStreamObject.void")
118 @safe unittest {
119   import core.time : msecs;
120   shared bool p = false;
121 
122   1.msecs.intervalStream().toStreamObject().take(1).collect(() shared { p = true; }).syncWait().assumeOk;
123 
124   p.should == true;
125 }
126 
127 @("transform.int.double")
128 @safe unittest {
129   shared int p = 0;
130   [1,2,3].arrayStream().transform((int i) => i * 3).collect((int t) shared { p.atomicOp!"+="(t); }).syncWait().assumeOk;
131   p.should == 18;
132 }
133 
134 @("transform.int.bool")
135 @safe unittest {
136   shared int p = 0;
137   [1,2,3].arrayStream().transform((int i) => i % 2 == 0).collect((bool t) shared { if (t) p.atomicOp!"+="(1); }).syncWait().assumeOk;
138   p.should == 1;
139 }
140 
141 @("scan")
142 @safe unittest {
143   shared int p = 0;
144   [1,2,3].arrayStream().scan((int acc, int i) => acc += i, 0).collect((int t) shared { p.atomicOp!"+="(t); }).syncWait().assumeOk;
145   p.should == 10;
146 }
147 
148 @("scan.void-value")
149 @safe unittest {
150   import core.time;
151   shared int p = 0;
152   5.msecs.intervalStream.scan((int acc) => acc += 1, 0).take(3).collect((int t) shared { p.atomicOp!"+="(t); }).syncWait().assumeOk;
153   p.should == 6;
154 }
155 
156 @("take.enough")
157 @safe unittest {
158   shared int p = 0;
159 
160   [1,2,3].arrayStream.take(2).collect((int i) shared { p.atomicOp!"+="(i); }).syncWait.assumeOk;
161   p.should == 3;
162 }
163 
164 @("take.too-few")
165 @safe unittest {
166   shared int p = 0;
167 
168   [1,2,3].arrayStream.take(4).collect((int i) shared { p.atomicOp!"+="(i); }).syncWait.assumeOk;
169   p.should == 6;
170 }
171 
172 @("take.donestream")
173 @safe unittest {
174   doneStream().take(1).collect(()shared{}).syncWait.isCancelled.should == true;
175 }
176 
177 @("take.errorstream")
178 @safe unittest {
179   errorStream(new Exception("Too bad")).take(1).collect(()shared{}).syncWait.assumeOk.shouldThrowWithMessage("Too bad");
180 }
181 
182 @("sample.trigger.stop")
183 @safe unittest {
184   import core.time;
185   7.msecs.intervalStream()
186     .scan((int acc) => acc+1, 0)
187     .sample(10.msecs.intervalStream().take(3))
188     .collect((int i) shared {})
189     .syncWait().assumeOk;
190 }
191 
192 @("sample.slower")
193 @safe unittest {
194   import core.time;
195   import concurrency.operations : withScheduler, whenAll;
196   import concurrency.sender : justFrom;
197 
198   shared int p = 0;
199   import concurrency.scheduler : ManualTimeWorker;
200 
201   auto worker = new shared ManualTimeWorker();
202 
203   auto sampler = 7.msecs
204     .intervalStream()
205     .scan((int acc) => acc+1, 0)
206     .sample(10.msecs.intervalStream())
207     .take(3)
208     .collect((int i) shared { p.atomicOp!"+="(i); })
209     .withScheduler(worker.getScheduler);
210 
211   auto driver = justFrom(() shared {
212       worker.advance(7.msecs);
213       p.atomicLoad.should == 0;
214       worker.timeUntilNextEvent().should == 3.msecs;
215 
216       worker.advance(3.msecs);
217       p.atomicLoad.should == 1;
218       worker.timeUntilNextEvent().should == 4.msecs;
219 
220       worker.advance(4.msecs);
221       p.atomicLoad.should == 1;
222       worker.timeUntilNextEvent().should == 6.msecs;
223 
224       worker.advance(6.msecs);
225       p.atomicLoad.should == 3;
226       worker.timeUntilNextEvent().should == 1.msecs;
227 
228       worker.advance(1.msecs);
229       p.atomicLoad.should == 3;
230       worker.timeUntilNextEvent().should == 7.msecs;
231 
232       worker.advance(7.msecs);
233       p.atomicLoad.should == 3;
234       worker.timeUntilNextEvent().should == 2.msecs;
235 
236       worker.advance(2.msecs);
237       p.atomicLoad.should == 7;
238       worker.timeUntilNextEvent().should == null;
239     });
240 
241   whenAll(sampler, driver).syncWait().assumeOk;
242 
243   p.should == 7;
244 }
245 
246 @("sample.faster")
247 @safe unittest {
248   import core.time;
249   import concurrency.operations : withScheduler, whenAll;
250   import concurrency.sender : justFrom;
251 
252   shared int p = 0;
253   import concurrency.scheduler : ManualTimeWorker;
254 
255   auto worker = new shared ManualTimeWorker();
256 
257   auto sampler = 7.msecs
258     .intervalStream()
259     .scan((int acc) => acc+1, 0)
260     .sample(3.msecs.intervalStream())
261     .take(3)
262     .collect((int i) shared { p.atomicOp!"+="(i); })
263     .withScheduler(worker.getScheduler);
264 
265   auto driver = justFrom(() shared {
266       worker.advance(3.msecs);
267       p.atomicLoad.should == 0;
268       worker.timeUntilNextEvent().should == 3.msecs;
269 
270       worker.advance(3.msecs);
271       p.atomicLoad.should == 0;
272       worker.timeUntilNextEvent().should == 1.msecs;
273 
274       worker.advance(1.msecs);
275       p.atomicLoad.should == 0;
276       worker.timeUntilNextEvent().should == 2.msecs;
277 
278       worker.advance(2.msecs);
279       p.atomicLoad.should == 1;
280       worker.timeUntilNextEvent().should == 3.msecs;
281 
282       worker.advance(3.msecs);
283       p.atomicLoad.should == 1;
284       worker.timeUntilNextEvent().should == 2.msecs;
285 
286       worker.advance(2.msecs);
287       p.atomicLoad.should == 1;
288       worker.timeUntilNextEvent().should == 1.msecs;
289 
290       worker.advance(1.msecs);
291       p.atomicLoad.should == 3;
292       worker.timeUntilNextEvent().should == 3.msecs;
293 
294       worker.advance(3.msecs);
295       p.atomicLoad.should == 3;
296       worker.timeUntilNextEvent().should == 3.msecs;
297 
298       worker.advance(3.msecs);
299       p.atomicLoad.should == 6;
300       worker.timeUntilNextEvent().should == null;
301     });
302 
303   whenAll(sampler, driver).syncWait().assumeOk;
304 
305   p.should == 6;
306 }
307 
308 @("sharedStream")
309 @safe unittest {
310   import concurrency.operations : then, race;
311 
312   auto source = sharedStream!int;
313 
314   shared int p = 0;
315 
316   auto emitter = ThreadSender().then(() shared {
317       source.emit(6);
318       source.emit(12);
319     });
320   auto collector = source.collect((int t) shared { p.atomicOp!"+="(t); });
321 
322   race(collector, emitter).syncWait().assumeOk;
323 
324   p.atomicLoad.should == 18;
325 }
326 
327 @("throttling.throttleLast")
328 @safe unittest {
329   import core.time;
330   import concurrency.scheduler : ManualTimeWorker;
331   import concurrency.operations : withScheduler, whenAll;
332   import concurrency.sender : justFrom;
333 
334   shared int p = 0;
335   auto worker = new shared ManualTimeWorker();
336 
337   auto throttled = 1.msecs
338     .intervalStream(true)
339     .scan((int acc) => acc+1, 0)
340     .throttleLast(3.msecs)
341     .take(4)
342     .collect((int i) shared { p.atomicOp!"+="(i); })
343     .withScheduler(worker.getScheduler);
344 
345   auto driver = justFrom(() shared {
346       p.atomicLoad.should == 0;
347       worker.timeUntilNextEvent().should == 1.msecs;
348 
349       foreach(expected; [0,0,3,3,3,9,9,9,18,18,18,30]) {
350         worker.advance(1.msecs);
351         p.atomicLoad.should == expected;
352       }
353 
354       worker.timeUntilNextEvent().should == null;
355   });
356 
357   whenAll(throttled, driver).syncWait().assumeOk;
358 
359   p.atomicLoad.should == 30;
360 }
361 
362 @("throttling.throttleLast.arrayStream")
363 @safe unittest {
364   import core.time;
365 
366   shared int p = 0;
367 
368   [1,2,3].arrayStream()
369     .throttleLast(30.msecs)
370     .collect((int i) shared { p.atomicOp!"+="(i); })
371     .syncWait().assumeOk;
372 
373   p.atomicLoad.should == 3;
374 }
375 
376 @("throttling.throttleLast.exception")
377 @safe unittest {
378   import core.time;
379 
380   1.msecs
381     .intervalStream()
382     .throttleLast(10.msecs)
383     .collect(() shared { throw new Exception("Bla"); })
384     .syncWait.assumeOk.shouldThrowWithMessage("Bla");
385 }
386 
387 @("throttling.throttleLast.thread.arrayStream")
388 @safe unittest {
389   import core.time;
390 
391   shared int p = 0;
392 
393   [1,2,3].arrayStream()
394     .via(ThreadSender())
395     .throttleLast(30.msecs)
396     .collect((int i) shared { p.atomicOp!"+="(i); })
397     .syncWait().assumeOk;
398 
399   p.atomicLoad.should == 3;
400 }
401 
402 @("throttling.throttleLast.thread.exception")
403 @safe unittest {
404   import core.time;
405 
406   1.msecs
407     .intervalStream()
408     .via(ThreadSender())
409     .throttleLast(10.msecs)
410     .collect(() shared { throw new Exception("Bla"); })
411     .syncWait.assumeOk.shouldThrowWithMessage("Bla");
412 }
413 
414 @("throttling.throttleFirst")
415 @safe unittest {
416   import core.time;
417   import concurrency.scheduler : ManualTimeWorker;
418   import concurrency.operations : withScheduler, whenAll;
419   import concurrency.sender : justFrom;
420 
421   shared int p = 0;
422   auto worker = new shared ManualTimeWorker();
423 
424   auto throttled = 1.msecs
425     .intervalStream()
426     .scan((int acc) => acc+1, 0)
427     .throttleFirst(3.msecs)
428     .take(2)
429     .collect((int i) shared { p.atomicOp!"+="(i); })
430     .withScheduler(worker.getScheduler);
431 
432   auto driver = justFrom(() shared {
433       p.atomicLoad.should == 0;
434 
435       worker.advance(1.msecs);
436       p.atomicLoad.should == 1;
437 
438       worker.advance(1.msecs);
439       p.atomicLoad.should == 1;
440 
441       worker.advance(1.msecs);
442       p.atomicLoad.should == 1;
443 
444       worker.advance(1.msecs);
445       p.atomicLoad.should == 5;
446 
447       worker.timeUntilNextEvent().should == null;
448     });
449   whenAll(throttled, driver).syncWait().assumeOk;
450 
451   p.should == 5;
452 }
453 
454 @("throttling.debounce")
455 @safe unittest {
456   import core.time;
457   import concurrency.scheduler : ManualTimeWorker;
458   import concurrency.operations : withScheduler, whenAll;
459   import concurrency.sender : justFrom;
460 
461   shared int p = 0;
462   auto worker = new shared ManualTimeWorker();
463   auto source = sharedStream!int;
464 
465   auto throttled = source
466     .debounce(3.msecs)
467     .take(2)
468     .collect((int i) shared { p.atomicOp!"+="(i); })
469     .withScheduler(worker.getScheduler);
470 
471   auto driver = justFrom(() shared {
472       source.emit(1);
473       p.atomicLoad.should == 0;
474       worker.timeUntilNextEvent().should == 3.msecs;
475 
476       worker.advance(3.msecs);
477       p.atomicLoad.should == 1;
478 
479       source.emit(2);
480       p.atomicLoad.should == 1;
481       worker.timeUntilNextEvent().should == 3.msecs;
482 
483       source.emit(3);
484       p.atomicLoad.should == 1;
485       worker.timeUntilNextEvent().should == 3.msecs;
486 
487       worker.advance(1.msecs);
488       p.atomicLoad.should == 1;
489       worker.timeUntilNextEvent().should == 2.msecs;
490 
491       source.emit(4);
492       p.atomicLoad.should == 1;
493       worker.timeUntilNextEvent().should == 3.msecs;
494 
495       worker.advance(3.msecs);
496       p.atomicLoad.should == 5;
497 
498       worker.timeUntilNextEvent().should == null;
499     });
500   whenAll(throttled, driver).syncWait().assumeOk;
501 
502   p.should == 5;
503 }
504 
505 @("slide.basic")
506 @safe unittest {
507   [1,2,3,4,5,6,7].arrayStream
508     .slide(3)
509     .transform((int[] a) => a.dup)
510     .toList
511     .syncWait.value.should == [[1,2,3],[2,3,4],[3,4,5],[4,5,6],[5,6,7]];
512 
513   [1,2].arrayStream
514     .slide(3)
515     .toList
516     .syncWait.value.length.should == 0;
517 }
518 
519 @("slide.step")
520 @safe unittest {
521   [1,2,3,4,5,6,7].arrayStream
522     .slide(3, 2)
523     .transform((int[] a) => a.dup)
524     .toList
525     .syncWait.value.should == [[1,2,3],[3,4,5],[5,6,7]];
526 
527   [1,2].arrayStream
528     .slide(2, 2)
529     .transform((int[] a) => a.dup)
530     .toList
531     .syncWait.value.should == [[1,2]];
532 
533   [1,2,3,4,5,6,7].arrayStream
534     .slide(2, 2)
535     .transform((int[] a) => a.dup)
536     .toList
537     .syncWait.value.should == [[1,2],[3,4],[5,6]];
538 
539   [1,2,3,4,5,6,7].arrayStream
540     .slide(2, 3)
541     .transform((int[] a) => a.dup)
542     .toList
543     .syncWait.value.should == [[1,2],[4,5]];
544 
545   [1,2,3,4,5,6,7,8,9,10].arrayStream
546     .slide(2, 4)
547     .transform((int[] a) => a.dup)
548     .toList
549     .syncWait.value.should == [[1,2],[5,6],[9,10]];
550 }
551 
552 @("toList.arrayStream")
553 @safe unittest {
554   [1,2,3].arrayStream.toList.syncWait.value.should == [1,2,3];
555 }
556 
557 @("toList.arrayStream.whenAll")
558 @safe unittest {
559   import concurrency.operations : withScheduler, whenAll;
560   import std.typecons : tuple;
561   auto s1 = [1,2,3].arrayStream.toList;
562   auto s2 = [2,3,4].arrayStream.toList;
563   whenAll(s1,s2).syncWait.value.should == tuple([1,2,3],[2,3,4]);
564 }
565 
566 @("filter")
567 unittest {
568   [1,2,3,4].arrayStream
569     .filter((int i) => i % 2 == 0)
570     .toList
571     .syncWait
572     .value.should == [2,4];
573 }
574 
575 @("cycle")
576 unittest {
577   "-/|\\".cycleStream().take(6).toList.syncWait.value.should == "-/|\\-/";
578 }
579 
580 @("flatmap.concat.just")
581 @safe unittest {
582   import concurrency.sender : just;
583 
584   [1,2,3].arrayStream
585     .flatMapConcat((int i) => just(i))
586     .toList
587     .syncWait
588     .value
589     .should == [1,2,3];
590 }
591 
592 @("flatmap.concat.thread")
593 @safe unittest {
594   import concurrency.sender : just;
595   import concurrency.operations : via;
596 
597   [1,2,3].arrayStream
598     .flatMapConcat((int i) => just(i).via(ThreadSender()))
599     .toList
600     .syncWait
601     .value
602     .should == [1,2,3];
603 }
604 
605 @("flatmap.concat.error")
606 @safe unittest {
607   import concurrency.sender : just, ErrorSender;
608   import concurrency.operations : via;
609 
610   [1,2,3].arrayStream
611     .flatMapConcat((int i) => ErrorSender())
612     .collect(()shared{})
613     .syncWait
614     .assumeOk
615     .shouldThrow();
616 }
617 
618 @("flatmap.concat.thread.on.thread")
619 @safe unittest {
620   import concurrency.sender : just;
621   import concurrency.operations : via;
622 
623   [1,2,3].arrayStream
624     .flatMapConcat((int i) => just(i).via(ThreadSender()))
625     .toList
626     .via(ThreadSender())
627     .syncWait
628     .value
629     .should == [1,2,3];
630 }
631 
632 @("flatmap.latest.just")
633 @safe unittest {
634   import concurrency.sender : just;
635 
636   [1,2,3].arrayStream
637     .flatMapLatest((int i) => just(i))
638     .toList
639     .syncWait
640     .value
641     .should == [1,2,3];
642 }
643 
644 @("flatmap.latest.delay")
645 @safe unittest {
646   import concurrency.sender : just, delay;
647   import concurrency.operations : via, onTermination;
648   import core.time;
649 
650   import std.stdio;
651   [1,2,3].arrayStream
652     .flatMapLatest((int i) => just(i).via(delay(50.msecs)))
653     .toList
654     .via(ThreadSender())
655     .syncWait
656     .value
657     .should == [3];
658 }
659 
660 @("flatmap.latest.error")
661 @safe unittest {
662   import concurrency.sender : just, ErrorSender;
663   import concurrency.operations : via;
664 
665   [1,2,3].arrayStream
666     .flatMapLatest((int i) => ErrorSender())
667     .collect(()shared{})
668     .syncWait
669     .assumeOk
670     .shouldThrow();
671 }
672 
673 @("flatmap.latest.justfrom.exception")
674 @safe unittest {
675   import concurrency.sender : justFrom;
676 
677   import core.time;
678 
679   1.msecs
680     .intervalStream()
681     .flatMapLatest(() => justFrom(() { throw new Exception("oops"); }))
682     .collect(()shared{})
683     .syncWait
684     .assumeOk
685     .shouldThrow();
686 }
687 
688 @("flatmap.latest.exception")
689 @safe unittest {
690   import concurrency.sender : VoidSender;
691 
692   import core.time;
693 
694   1.msecs
695     .intervalStream()
696     .flatMapLatest(function VoidSender(){
697         throw new Exception("oops");
698       })
699     .collect(()shared{})
700     .syncWait
701     .assumeOk
702     .shouldThrow();
703 }
704 
705 @("flatmap.latest.intervalStream.overlap.delay")
706 @safe unittest {
707   import concurrency.sender : delay;
708   import core.time;
709 
710   1.msecs
711     .intervalStream()
712     .take(2)
713     .flatMapLatest(() => 2.msecs.delay())
714     .collect(()shared{})
715     .syncWait
716     .assumeOk();
717 }
718 
719 @("flatmap.latest.intervalStream.intervalStream.take")
720 @safe unittest {
721   import concurrency.sender : delay;
722   import concurrency.scheduler : ManualTimeWorker;
723   import concurrency.operations : withScheduler, whenAll;
724   import concurrency.sender : justFrom;
725   import core.time;
726 
727   import core.atomic;
728   shared int p;
729 
730   auto worker = new shared ManualTimeWorker();
731   auto sender = 5.msecs
732     .intervalStream()
733     .take(2)
734     .flatMapLatest(() shared {
735         return 1.msecs
736           .intervalStream(true)
737           .take(5)
738           .collect(() shared { p.atomicOp!"+="(1); });
739       })
740     .collect(()shared{})
741     .withScheduler(worker.getScheduler);
742 
743   auto driver = justFrom(() shared {
744       p.atomicLoad.should == 0;
745       worker.timeUntilNextEvent().should == 5.msecs;
746 
747       worker.advance(5.msecs);
748       p.atomicLoad.should == 1;
749       worker.timeUntilNextEvent().should == 1.msecs;
750 
751       worker.advance(1.msecs);
752       p.atomicLoad.should == 2;
753 
754       worker.advance(1.msecs);
755       p.atomicLoad.should == 3;
756 
757       worker.advance(1.msecs);
758       p.atomicLoad.should == 4;
759 
760       worker.advance(1.msecs);
761       p.atomicLoad.should == 5;
762 
763       worker.advance(1.msecs);
764       p.atomicLoad.should == 6;
765 
766       worker.advance(1.msecs);
767       p.atomicLoad.should == 7;
768 
769       worker.advance(1.msecs);
770       p.atomicLoad.should == 8;
771 
772       worker.advance(1.msecs);
773       p.atomicLoad.should == 9;
774 
775       worker.advance(1.msecs);
776       p.atomicLoad.should == 10;
777 
778       worker.timeUntilNextEvent().should == null;
779     });
780   whenAll(sender, driver).syncWait().assumeOk;
781 
782   p.atomicLoad.should == 10;
783 }
784 
785 @("flatmap.latest.intervalStream.intervalStream.sample")
786 @safe unittest {
787   import concurrency.sender : delay;
788   import concurrency.scheduler : ManualTimeWorker;
789   import concurrency.operations : withScheduler, whenAll;
790   import concurrency.sender : justFrom;
791   import core.time;
792 
793   import core.atomic;
794   shared int p;
795 
796   auto worker = new shared ManualTimeWorker();
797   auto sender = 5.msecs
798     .intervalStream()
799     .take(2)
800     .flatMapLatest(() shared {
801         return 1.msecs
802           .intervalStream(true)
803           .scan((int i) => i + 1, 0)
804           .sample(2.msecs.intervalStream())
805           .take(10)
806           .collect((int i) shared { p.atomicOp!"+="(1); });
807       })
808     .collect(()shared{})
809     .withScheduler(worker.getScheduler);
810 
811   auto driver = justFrom(() shared {
812       p.atomicLoad.should == 0;
813       worker.timeUntilNextEvent().should == 5.msecs;
814 
815       worker.advance(5.msecs);
816       p.atomicLoad.should == 0;
817       worker.timeUntilNextEvent().should == 1.msecs;
818 
819       worker.advance(1.msecs);
820       p.atomicLoad.should == 0;
821 
822       worker.advance(1.msecs);
823       p.atomicLoad.should == 1;
824 
825       worker.advance(1.msecs);
826       p.atomicLoad.should == 1;
827 
828       worker.advance(1.msecs);
829       p.atomicLoad.should == 2;
830 
831       worker.advance(1.msecs);
832       p.atomicLoad.should == 2;
833 
834       worker.advance(2.msecs);
835       p.atomicLoad.should == 3;
836 
837       worker.advance(2.msecs);
838       p.atomicLoad.should == 4;
839 
840       worker.advance(2.msecs);
841       p.atomicLoad.should == 5;
842 
843       worker.advance(2.msecs);
844       p.atomicLoad.should == 6;
845 
846       worker.advance(2.msecs);
847       p.atomicLoad.should == 7;
848 
849       worker.advance(2.msecs);
850       p.atomicLoad.should == 8;
851 
852       worker.advance(2.msecs);
853       p.atomicLoad.should == 9;
854 
855       worker.advance(2.msecs);
856       p.atomicLoad.should == 10;
857 
858       worker.advance(2.msecs);
859       p.atomicLoad.should == 11;
860 
861       worker.advance(2.msecs);
862       p.atomicLoad.should == 12;
863 
864       worker.timeUntilNextEvent().should == null;
865     });
866   whenAll(sender, driver).syncWait().assumeOk;
867 
868   p.atomicLoad.should == 12;
869 }
870 
871 @("deferStream.function")
872 @safe unittest {
873   import concurrency.stream.defer;
874   static auto getSender() @safe {
875     import concurrency.sender;
876     return just(1);
877   }
878   deferStream(&getSender).take(3).toList().syncWait().value.should == [1,1,1];
879 }
880 
881 @("deferStream.callable")
882 @safe unittest {
883   import concurrency.stream.defer;
884   static struct S {
885     auto opCall() shared @safe {
886       import concurrency.sender;
887       return just(1);
888     }
889   }
890   shared S s;
891   deferStream(s).take(3).toList().syncWait().value.should == [1,1,1];
892 }
893 
894 @("cron.timeTillNextMinute.Always")
895 @safe unittest {
896   import concurrency.stream.cron;
897   import core.time;
898   import std.datetime : SysTime, DateTime;
899 
900   auto spec = Always().Spec;
901   spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 30, 0))).should == 1.minutes;
902   spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 30, 59))).should == 1.seconds;
903 }
904 
905 @("cron.timeTillNextMinute.Exact")
906 @safe unittest {
907   import concurrency.stream.cron;
908   import core.time;
909   import std.datetime : SysTime, DateTime;
910 
911   auto spec = Exact(5).Spec;
912   spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 30, 0))).should == 35.minutes;
913   spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 30, 59))).should == 34.minutes + 1.seconds;
914   spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 0, 0))).should == 5.minutes;
915   spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 4, 59))).should == 1.seconds;
916 }
917 
918 @("cron.timeTillNextMinute.Every.basic")
919 @safe unittest {
920   import concurrency.stream.cron;
921   import core.time;
922   import std.datetime : SysTime, DateTime;
923 
924   auto spec = Every(5).Spec;
925   spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 30, 0))).should == 5.minutes;
926   spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 30, 59))).should == 4.minutes + 1.seconds;
927   spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 0, 0))).should == 5.minutes;
928   spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 4, 59))).should == 1.seconds;
929 }
930 
931 @("cron.timeTillNextMinute.Every.offset")
932 @safe unittest {
933   import concurrency.stream.cron;
934   import core.time;
935   import std.datetime : SysTime, DateTime;
936 
937   auto spec = Every(5, 3).Spec;
938   spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 30, 0))).should == 3.minutes;
939   spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 30, 59))).should == 2.minutes + 1.seconds;
940   spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 0, 0))).should == 3.minutes;
941   spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 4, 59))).should == 3.minutes + 1.seconds;
942 }
943 
944 @("cron.timeTillNextMinute.Each")
945 @safe unittest {
946   import concurrency.stream.cron;
947   import core.time;
948   import std.datetime : SysTime, DateTime;
949 
950   auto spec = Each([1,15,19,44]).Spec;
951   spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 30, 0))).should == 14.minutes;
952   spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 30, 59))).should == 13.minutes + 1.seconds;
953   spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 0, 0))).should == 1.minutes;
954   spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 4, 59))).should == 10.minutes + 1.seconds;
955 }
956 
957 @("cron.timeTillNextTrigger.Always")
958 @safe unittest {
959   import concurrency.stream.cron;
960   import core.time;
961   import std.datetime : SysTime, DateTime;
962 
963   auto spec = CronSpec(Spec(Always()), Spec(Always()));
964   spec.timeTillNextTrigger(SysTime(DateTime(2018, 1, 1, 10, 30, 0))).should == 1.minutes;
965   spec.timeTillNextTrigger(SysTime(DateTime(2018, 1, 1, 10, 30, 59))).should == 1.seconds;
966 }
967 
968 @("cron.timeTillNextTrigger.5.over.every.hour")
969 @safe unittest {
970   import concurrency.stream.cron;
971   import core.time;
972   import std.datetime : SysTime, DateTime;
973 
974   auto spec = CronSpec(Spec(Always()), Spec(Exact(5)));
975   spec.timeTillNextTrigger(SysTime(DateTime(2018, 1, 1, 10, 30, 0))).should == 35.minutes;
976   spec.timeTillNextTrigger(SysTime(DateTime(2018, 1, 1, 10, 30, 59))).should == 34.minutes + 1.seconds;
977 }
978 
979 @("cron.timeTillNextTrigger.5.over.5.hour")
980 @safe unittest {
981   import concurrency.stream.cron;
982   import core.time;
983   import std.datetime : SysTime, DateTime;
984   import std.datetime.timezone : UTC;
985 
986   auto spec = CronSpec(Spec(Exact(5)), Spec(Exact(5)));
987   spec.timeTillNextTrigger(SysTime(DateTime(2018, 1, 1, 5, 5, 0), UTC())).should == 24.hours;
988   spec.timeTillNextTrigger(SysTime(DateTime(2018, 1, 1, 10, 30, 0), UTC())).should == 18.hours + 35.minutes;
989   spec.timeTillNextTrigger(SysTime(DateTime(2018, 1, 1, 10, 30, 59), UTC())).should == 18.hours + 34.minutes + 1.seconds;
990 }
991 
992 @("cron.timeTillNextTrigger.5.over.every.2.hours")
993 @safe unittest {
994   import concurrency.stream.cron;
995   import core.time;
996   import std.datetime : SysTime, DateTime;
997   import std.datetime.timezone : UTC;
998 
999   auto spec = CronSpec(Spec(Every(2)), Spec(Exact(5)));
1000   spec.timeTillNextTrigger(SysTime(DateTime(2018, 1, 1, 6, 5, 0), UTC())).should == 2.hours;
1001   spec.timeTillNextTrigger(SysTime(DateTime(2018, 1, 1, 5, 5, 0), UTC())).should == 1.hours;
1002   spec.timeTillNextTrigger(SysTime(DateTime(2018, 1, 1, 10, 30, 0), UTC())).should == 1.hours + 35.minutes;
1003   spec.timeTillNextTrigger(SysTime(DateTime(2018, 1, 1, 10, 30, 59), UTC())).should == 1.hours + 34.minutes + 1.seconds;
1004 }
1005 
1006 @("cron.timeTillNextTrigger.every.5.over.every.2.hours")
1007 @safe unittest {
1008   import concurrency.stream.cron;
1009   import core.time;
1010   import std.datetime : SysTime, DateTime;
1011   import std.datetime.timezone : UTC;
1012 
1013   auto spec = CronSpec(Spec(Every(2)), Spec(Every(5)));
1014   spec.timeTillNextTrigger(SysTime(DateTime(2018, 1, 1, 6, 5, 0), UTC())).should == 5.minutes;
1015   spec.timeTillNextTrigger(SysTime(DateTime(2018, 1, 1, 5, 5, 0), UTC())).should == 1.hours;
1016   spec.timeTillNextTrigger(SysTime(DateTime(2018, 1, 1, 10, 30, 0), UTC())).should == 5.minutes;
1017   spec.timeTillNextTrigger(SysTime(DateTime(2018, 1, 1, 10, 30, 59), UTC())).should == 4.minutes + 1.seconds;
1018   spec.timeTillNextTrigger(SysTime(DateTime(2018, 1, 1, 10, 59, 59), UTC())).should == 1.hours + 5.minutes + 1.seconds;
1019 }