1 module ut.concurrency.stream;
2 
3 import concurrency.stream;
4 import concurrency;
5 import unit_threaded;
6 import concurrency.stoptoken;
7 import core.atomic;
8 import concurrency.thread : ThreadSender;
9 
// TODO: it would be good if the Sender returned by .collect could be scoped whenever the delegates passed to it are.
11 
@("arrayStream")
@safe unittest {
  // Accumulates every element emitted by the array stream.
  shared int total = 0;

  [1,2,3]
    .arrayStream()
    .collect((int value) shared { total.atomicOp!"+="(value); })
    .syncWait()
    .isOk.should == true;

  // 1 + 2 + 3
  total.should == 6;
}
18 
@("intervalStream")
@safe unittest {
  import concurrency.operations : whenAll, withScheduler;
  import concurrency.scheduler : ManualTimeWorker;
  import concurrency.sender : justFrom;
  import core.time;

  // Time only moves when the script below calls `advance` on this worker.
  auto clock = new shared ManualTimeWorker();

  // Two ticks of a 5ms interval, scheduled through the manual worker.
  auto ticks = 5.msecs
    .intervalStream()
    .take(2)
    .collect(() shared {})
    .withScheduler(clock.getScheduler);

  // Checks the pending timer before each step, then steps the clock.
  auto script = justFrom(() shared {
      clock.timeUntilNextEvent().should == 5.msecs;
      clock.advance(5.msecs);

      clock.timeUntilNextEvent().should == 5.msecs;
      clock.advance(5.msecs);

      // After both ticks nothing is expected to remain scheduled.
      clock.timeUntilNextEvent().should == null;
    });

  whenAll(ticks, script).syncWait().isOk.should == true;
}
44 
45 
@("infiniteStream.stop")
@safe unittest {
  import concurrency.operations : withStopSource;

  // Running total of the values received from the stream.
  shared int g = 0;
  auto source = new shared StopSource();

  // Keep adding the emitted value (5) until the total reaches at least 14,
  // then request a stop through the stop source attached to the sender.
  infiniteStream(5).collect((int n) shared {
      // `g` is shared, so read it atomically rather than with a plain load.
      if (g.atomicLoad < 14)
        g.atomicOp!"+="(n);
      else
        source.stop();
    })
    .withStopSource(source).syncWait.isCancelled.should == true;

  // 5 + 5 + 5: the fourth emission sees 15 >= 14 and triggers the stop.
  g.atomicLoad.should == 15;
}
60 
@("infiniteStream.take")
@safe unittest {
  // Accumulates the values that make it through `take`.
  shared int total = 0;

  infiniteStream(4)
    .take(5)
    .collect((int value) shared { total.atomicOp!"+="(value); })
    .syncWait()
    .isOk.should == true;

  // Five emissions of 4.
  total.should == 20;
}
67 
@("iotaStream")
@safe unittest {
  // Accumulates every value emitted by the stream.
  shared int g = 0;

  iotaStream(0, 5).collect((int n) shared { g.atomicOp!"+="(n); }).syncWait().isOk.should == true;

  // 0 + 1 + 2 + 3 + 4
  g.should == 10;
}
75 
@("loopStream")
@safe unittest {
  // Emits every index of the half-open range [begin, end) through `emit`.
  // The stop token is accepted but not consulted.
  struct Loop {
    size_t begin, end;
    void loop(DG, StopToken)(DG emit, StopToken stopToken) {
      for (size_t index = begin; index < end; ++index)
        emit(index);
    }
  }

  shared int total = 0;

  Loop(0,4)
    .loopStream!size_t
    .collect((size_t value) shared { total.atomicOp!"+="(value); })
    .syncWait()
    .isOk.should == true;

  // 0 + 1 + 2 + 3
  total.should == 6;
}
89 
@("toStreamObject")
@safe unittest {
  import core.atomic : atomicOp;

  // Returns the array stream through the StreamObjectBase!int interface.
  static StreamObjectBase!int makeStream() {
    return [1,2,3].arrayStream().toStreamObject();
  }

  shared int total;

  makeStream()
    .collect((int value) @safe shared { total.atomicOp!"+="(value); })
    .syncWait()
    .isOk.should == true;

  // 1 + 2 + 3
  total.should == 6;
}
103 
104 
@("toStreamObject.take")
@safe unittest {
  // Returns the array stream through the StreamObjectBase!int interface.
  static StreamObjectBase!int makeStream() {
    return [1,2,3].arrayStream().toStreamObject();
  }

  shared int total;

  makeStream()
    .take(2)
    .collect((int value) shared { total.atomicOp!"+="(value); })
    .syncWait()
    .isOk.should == true;

  // Only the first two elements are expected: 1 + 2.
  total.should == 3;
}
116 
@("toStreamObject.void")
@safe unittest {
  import core.time : msecs;

  // Set once the collect delegate has run.
  shared bool p = false;

  // `p` is shared, so it is written with an atomic store rather than a plain
  // assignment, matching how the other tests in this file touch shared state.
  1.msecs.intervalStream().toStreamObject().take(1).collect(() shared { p.atomicStore(true); }).syncWait().isOk.should == true;

  p.atomicLoad.should == true;
}
126 
@("transform.int.double")
@safe unittest {
  shared int total = 0;

  // Every element is multiplied by 3 before it is accumulated.
  [1,2,3]
    .arrayStream()
    .transform((int x) => x * 3)
    .collect((int value) shared { total.atomicOp!"+="(value); })
    .syncWait()
    .isOk.should == true;

  // 3 + 6 + 9
  total.should == 18;
}
133 
@("transform.int.bool")
@safe unittest {
  // Counts how many elements were mapped to `true`.
  shared int evenCount = 0;

  [1,2,3]
    .arrayStream()
    .transform((int x) => x % 2 == 0)
    .collect((bool isEven) shared {
        if (isEven)
          evenCount.atomicOp!"+="(1);
      })
    .syncWait()
    .isOk.should == true;

  // Only 2 is even.
  evenCount.should == 1;
}
140 
@("scan")
@safe unittest {
  shared int total = 0;

  // Running sum seeded with 0; each intermediate sum is accumulated.
  [1,2,3]
    .arrayStream()
    .scan((int acc, int x) => acc + x, 0)
    .collect((int value) shared { total.atomicOp!"+="(value); })
    .syncWait()
    .isOk.should == true;

  // Expected running sums 1, 3, 6 add up to 10.
  total.should == 10;
}
147 
@("scan.void-value")
@safe unittest {
  import core.time;

  shared int total = 0;

  // The interval stream carries no value, so the scan function only
  // receives the accumulator and increments it on every tick.
  5.msecs
    .intervalStream
    .scan((int acc) => acc + 1, 0)
    .take(3)
    .collect((int value) shared { total.atomicOp!"+="(value); })
    .syncWait()
    .isOk.should == true;

  // Expected counters 1, 2, 3 add up to 6.
  total.should == 6;
}
155 
@("take.enough")
@safe unittest {
  shared int total = 0;

  // The stream has more elements (3) than are taken (2).
  [1,2,3]
    .arrayStream
    .take(2)
    .collect((int value) shared { total.atomicOp!"+="(value); })
    .syncWait
    .isOk.should == true;

  // 1 + 2
  total.should == 3;
}
163 
@("take.too-few")
@safe unittest {
  shared int total = 0;

  // The stream has fewer elements (3) than are requested (4);
  // the result is still expected to be ok.
  [1,2,3]
    .arrayStream
    .take(4)
    .collect((int value) shared { total.atomicOp!"+="(value); })
    .syncWait
    .isOk.should == true;

  // 1 + 2 + 3
  total.should == 6;
}
171 
@("take.donestream")
@safe unittest {
  // `take` on a done stream is expected to report cancellation.
  doneStream()
    .take(1)
    .collect(() shared {})
    .syncWait
    .isCancelled.should == true;
}
176 
@("take.errorstream")
@safe unittest {
  // The exception carried by the error stream is expected to resurface,
  // with its message intact, when the result is unwrapped via `assumeOk`.
  errorStream(new Exception("Too bad"))
    .take(1)
    .collect(() shared {})
    .syncWait
    .assumeOk
    .shouldThrowWithMessage("Too bad");
}
181 
@("sample.trigger.stop")
@safe unittest {
  import core.time;

  // The trigger stream is limited to 3 ticks while the sampled stream is
  // unbounded; the whole pipeline is still expected to complete with ok.
  // The assertion is a plain statement: its result is not bound to a local.
  7.msecs.intervalStream()
    .scan((int acc) => acc+1, 0)
    .sample(10.msecs.intervalStream().take(3))
    .collect((int i) shared {})
    .syncWait().isOk.should == true;
}
191 
@("sample.slower")
@safe unittest {
  import concurrency.operations : whenAll, withScheduler;
  import concurrency.scheduler : ManualTimeWorker;
  import concurrency.sender : justFrom;
  import core.time;

  // Sum of all sampled values that reached the collector.
  shared int total = 0;

  // Time only moves when the script below calls `advance` on this worker.
  auto clock = new shared ManualTimeWorker();

  // A counter ticking every 7ms, sampled by a 10ms trigger, limited to 3 samples.
  auto pipeline = 7.msecs
    .intervalStream()
    .scan((int acc) => acc+1, 0)
    .sample(10.msecs.intervalStream())
    .take(3)
    .collect((int value) shared { total.atomicOp!"+="(value); })
    .withScheduler(clock.getScheduler);

  auto script = justFrom(() shared {
      // t = 7ms
      clock.advance(7.msecs);
      total.atomicLoad.should == 0;
      clock.timeUntilNextEvent().should == 3.msecs;

      // t = 10ms
      clock.advance(3.msecs);
      total.atomicLoad.should == 1;
      clock.timeUntilNextEvent().should == 4.msecs;

      // t = 14ms
      clock.advance(4.msecs);
      total.atomicLoad.should == 1;
      clock.timeUntilNextEvent().should == 6.msecs;

      // t = 20ms
      clock.advance(6.msecs);
      total.atomicLoad.should == 3;
      clock.timeUntilNextEvent().should == 1.msecs;

      // t = 21ms
      clock.advance(1.msecs);
      total.atomicLoad.should == 3;
      clock.timeUntilNextEvent().should == 7.msecs;

      // t = 28ms
      clock.advance(7.msecs);
      total.atomicLoad.should == 3;
      clock.timeUntilNextEvent().should == 2.msecs;

      // t = 30ms: third sample taken, nothing is expected to stay scheduled.
      clock.advance(2.msecs);
      total.atomicLoad.should == 7;
      clock.timeUntilNextEvent().should == null;
    });

  whenAll(pipeline, script).syncWait().isOk.should == true;

  total.should == 7;
}
245 
@("sample.faster")
@safe unittest {
  import concurrency.operations : whenAll, withScheduler;
  import concurrency.scheduler : ManualTimeWorker;
  import concurrency.sender : justFrom;
  import core.time;

  // Sum of all sampled values that reached the collector.
  shared int total = 0;

  // Time only moves when the script below calls `advance` on this worker.
  auto clock = new shared ManualTimeWorker();

  // A counter ticking every 7ms, sampled by a 3ms trigger, limited to 3 samples.
  auto pipeline = 7.msecs
    .intervalStream()
    .scan((int acc) => acc+1, 0)
    .sample(3.msecs.intervalStream())
    .take(3)
    .collect((int value) shared { total.atomicOp!"+="(value); })
    .withScheduler(clock.getScheduler);

  auto script = justFrom(() shared {
      // t = 3ms
      clock.advance(3.msecs);
      total.atomicLoad.should == 0;
      clock.timeUntilNextEvent().should == 3.msecs;

      // t = 6ms
      clock.advance(3.msecs);
      total.atomicLoad.should == 0;
      clock.timeUntilNextEvent().should == 1.msecs;

      // t = 7ms
      clock.advance(1.msecs);
      total.atomicLoad.should == 0;
      clock.timeUntilNextEvent().should == 2.msecs;

      // t = 9ms
      clock.advance(2.msecs);
      total.atomicLoad.should == 1;
      clock.timeUntilNextEvent().should == 3.msecs;

      // t = 12ms
      clock.advance(3.msecs);
      total.atomicLoad.should == 1;
      clock.timeUntilNextEvent().should == 2.msecs;

      // t = 14ms
      clock.advance(2.msecs);
      total.atomicLoad.should == 1;
      clock.timeUntilNextEvent().should == 1.msecs;

      // t = 15ms
      clock.advance(1.msecs);
      total.atomicLoad.should == 3;
      clock.timeUntilNextEvent().should == 3.msecs;

      // t = 18ms
      clock.advance(3.msecs);
      total.atomicLoad.should == 3;
      clock.timeUntilNextEvent().should == 3.msecs;

      // t = 21ms
      clock.advance(3.msecs);
      // NOTE: normally the total ought to be 6 since 3*7 msecs have already elapsed.
      // but due to the way slots work in the timingwheels implementation
      // timers that are added later are executed earlier.
      // see https://github.com/symmetryinvestments/concurrency/issues/35
      total.atomicLoad.should == 3;
      clock.timeUntilNextEvent().should == 3.msecs;

      // t = 24ms: third sample taken, nothing is expected to stay scheduled.
      clock.advance(3.msecs);
      total.atomicLoad.should == 6;
      clock.timeUntilNextEvent().should == null;
    });

  whenAll(pipeline, script).syncWait().assumeOk;

  total.should == 6;
}
315 
@("sharedStream")
@safe unittest {
  import concurrency.operations : race, then;

  auto hub = sharedStream!int;

  // Sum of every value the collector received.
  shared int total = 0;

  // Pushes two values into the shared stream from a ThreadSender continuation.
  auto producer = ThreadSender().then(() shared {
      hub.emit(6);
      hub.emit(12);
    });

  auto consumer = hub.collect((int value) shared { total.atomicOp!"+="(value); });

  race(consumer, producer).syncWait().isOk.should == true;

  // 6 + 12
  total.atomicLoad.should == 18;
}
334 
@("throttling.throttleLast")
@safe unittest {
  import core.time;

  // Sum of the values that passed the throttle.
  shared int total = 0;

  // A counter ticking every 1ms, throttled with a 3ms window, limited to 6 values.
  1.msecs
    .intervalStream()
    .scan((int acc) => acc+1, 0)
    .throttleLast(3.msecs)
    .take(6)
    .collect((int value) shared { total.atomicOp!"+="(value); })
    .syncWait()
    .isOk.should == true;

  // Only a lower bound is asserted on the accumulated sum.
  total.atomicLoad.shouldBeGreaterThan(40);
}
351 
@("throttling.throttleLast.arrayStream")
@safe unittest {
  import core.time;

  // Sum of the values that passed the throttle.
  shared int total = 0;

  [1,2,3]
    .arrayStream()
    .throttleLast(30.msecs)
    .collect((int value) shared { total.atomicOp!"+="(value); })
    .syncWait()
    .isOk.should == true;

  // Presumably only the final element (3) makes it through the 30ms window.
  total.atomicLoad.should == 3;
}
365 
@("throttling.throttleLast.exception")
@safe unittest {
  import core.time;

  // An exception thrown by the collect delegate is expected to resurface,
  // with its message intact, when the result is unwrapped via `assumeOk`.
  1.msecs
    .intervalStream()
    .throttleLast(10.msecs)
    .collect(() shared { throw new Exception("Bla"); })
    .syncWait
    .assumeOk
    .shouldThrowWithMessage("Bla");
}
376 
@("throttling.throttleLast.thread")
@safe unittest {
  import core.time;

  // Sum of the values that passed the throttle.
  shared int total = 0;

  // Same pipeline as the throttleLast test, with the interval stream
  // routed through a ThreadSender via `via`.
  1.msecs
    .intervalStream()
    .via(ThreadSender())
    .scan((int acc) => acc+1, 0)
    .throttleLast(3.msecs)
    .take(6)
    .collect((int value) shared { total.atomicOp!"+="(value); })
    .syncWait()
    .isOk.should == true;

  // Only a lower bound is asserted on the accumulated sum.
  total.atomicLoad.shouldBeGreaterThan(40);
}
394 
@("throttling.throttleLast.thread.arrayStream")
@safe unittest {
  import core.time;

  // Sum of the values that passed the throttle.
  shared int total = 0;

  [1,2,3]
    .arrayStream()
    .via(ThreadSender())
    .throttleLast(30.msecs)
    .collect((int value) shared { total.atomicOp!"+="(value); })
    .syncWait()
    .isOk.should == true;

  // Presumably only the final element (3) makes it through the 30ms window.
  total.atomicLoad.should == 3;
}
409 
@("throttling.throttleLast.thread.exception")
@safe unittest {
  import core.time;

  // An exception thrown by the collect delegate is expected to resurface,
  // with its message intact, even when the stream runs via a ThreadSender.
  1.msecs
    .intervalStream()
    .via(ThreadSender())
    .throttleLast(10.msecs)
    .collect(() shared { throw new Exception("Bla"); })
    .syncWait
    .assumeOk
    .shouldThrowWithMessage("Bla");
}
421 
@("throttling.throttleFirst")
@safe unittest {
  import concurrency.operations : whenAll, withScheduler;
  import concurrency.scheduler : ManualTimeWorker;
  import concurrency.sender : justFrom;
  import core.time;

  // Sum of the values that passed the throttle.
  shared int total = 0;

  // Time only moves when the script below calls `advance` on this worker.
  auto clock = new shared ManualTimeWorker();

  // A counter ticking every 1ms, throttled with a 3ms window, limited to 2 values.
  auto pipeline = 1.msecs
    .intervalStream()
    .scan((int acc) => acc+1, 0)
    .throttleFirst(3.msecs)
    .take(2)
    .collect((int value) shared { total.atomicOp!"+="(value); })
    .withScheduler(clock.getScheduler);

  auto script = justFrom(() shared {
      total.atomicLoad.should == 0;

      // t = 1ms: the first value (1) is expected to pass.
      clock.advance(1.msecs);
      total.atomicLoad.should == 1;

      // t = 2ms: total is expected to be unchanged.
      clock.advance(1.msecs);
      total.atomicLoad.should == 1;

      // t = 3ms: total is expected to be unchanged.
      clock.advance(1.msecs);
      total.atomicLoad.should == 1;

      // t = 4ms: the fourth value (4) is expected to pass, giving 1 + 4.
      clock.advance(1.msecs);
      total.atomicLoad.should == 5;

      // Two values taken: nothing is expected to remain scheduled.
      clock.timeUntilNextEvent().should == null;
    });

  whenAll(pipeline, script).syncWait().isOk.should == true;

  total.should == 5;
}
461 
@("throttling.debounce")
@safe unittest {
  import concurrency.operations : whenAll, withScheduler;
  import concurrency.scheduler : ManualTimeWorker;
  import concurrency.sender : justFrom;
  import core.time;

  // Sum of the values that passed the debounce.
  shared int total = 0;

  // Time only moves when the script below calls `advance` on this worker.
  auto clock = new shared ManualTimeWorker();

  // Values are pushed into this stream by hand from the script below.
  auto input = sharedStream!int;

  auto pipeline = input
    .debounce(3.msecs)
    .take(2)
    .collect((int value) shared { total.atomicOp!"+="(value); })
    .withScheduler(clock.getScheduler);

  auto script = justFrom(() shared {
      // Emitting does not deliver right away; a 3ms timer is pending instead.
      input.emit(1);
      total.atomicLoad.should == 0;
      clock.timeUntilNextEvent().should == 3.msecs;

      // Once the 3ms have elapsed the value 1 is expected at the collector.
      clock.advance(3.msecs);
      total.atomicLoad.should == 1;

      input.emit(2);
      total.atomicLoad.should == 1;
      clock.timeUntilNextEvent().should == 3.msecs;

      // A second emission before any time passes still shows 3ms pending.
      input.emit(3);
      total.atomicLoad.should == 1;
      clock.timeUntilNextEvent().should == 3.msecs;

      clock.advance(1.msecs);
      total.atomicLoad.should == 1;
      clock.timeUntilNextEvent().should == 2.msecs;

      // Emitting with 2ms left puts the pending time back at 3ms.
      input.emit(4);
      total.atomicLoad.should == 1;
      clock.timeUntilNextEvent().should == 3.msecs;

      // Only the last value (4) is expected to be delivered: 1 + 4.
      clock.advance(3.msecs);
      total.atomicLoad.should == 5;

      // Two values taken: nothing is expected to remain scheduled.
      clock.timeUntilNextEvent().should == null;
    });

  whenAll(pipeline, script).syncWait().isOk.should == true;

  total.should == 5;
}
512 
@("slide")
@safe unittest {
  import std.algorithm : sum;

  // Sum of the sums of every window that reached the collector.
  shared int p;

  [1,2,3,4,5,6,7].arrayStream
    .slide(3)
    .collect((int[] a) @safe shared { p.atomicOp!"+="(a.sum); })
    .syncWait.isOk.should == true;

  // Windows of 3 with step 1: 6 + 9 + 12 + 15 + 18.
  p.should == 60;

  [1,2].arrayStream
    .slide(3)
    .collect((int[] a) @safe shared { p.atomicOp!"+="(a.sum); })
    .syncWait.isOk.should == true;

  // Fewer elements than the window size: the total is expected to be unchanged.
  p.should == 60;
}
534 
@("slide.step")
@safe unittest {
  import std.algorithm : sum;

  // Sum of the sums of every window that reached the collector.
  shared int p;

  [1,2,3,4,5,6,7].arrayStream
    .slide(3, 2)
    .collect((int[] a) @safe shared { p.atomicOp!"+="(a.sum); })
    .syncWait.isOk.should == true;

  // Windows of 3 with step 2: [1,2,3] + [3,4,5] + [5,6,7] = 6 + 12 + 18.
  p.should == 36;

  [1,2].arrayStream
    .slide(2, 2)
    .collect((int[] a) @safe shared { p.atomicOp!"+="(a.sum); })
    .syncWait.isOk.should == true;

  // One more window, [1,2], adds 3 to the running total.
  p.should == 39;
}
556 
@("toList.arrayStream")
@safe unittest {
  // `toList` is expected to yield the emitted elements in order.
  [1,2,3]
    .arrayStream
    .toList
    .syncWait
    .value.should == [1,2,3];
}
561 
@("toList.arrayStream.whenAll")
@safe unittest {
  import concurrency.operations : whenAll;
  import std.typecons : tuple;

  // Two independent list senders combined by whenAll; the combined value
  // is expected to be a tuple holding both lists in argument order.
  auto s1 = [1,2,3].arrayStream.toList;
  auto s2 = [2,3,4].arrayStream.toList;
  whenAll(s1,s2).syncWait.value.should == tuple([1,2,3],[2,3,4]);
}
570 
@("filter")
unittest {
  // Only the even elements are expected to survive the filter.
  [1,2,3,4]
    .arrayStream
    .filter((int x) => x % 2 == 0)
    .toList
    .syncWait
    .value
    .should == [2,4];
}
579 
@("cycle")
unittest {
  // Six elements from a four-character cycle: the sequence is expected
  // to wrap around to its start after the fourth.
  "-/|\\"
    .cycleStream()
    .take(6)
    .toList
    .syncWait
    .value.should == "-/|\\-/";
}