1 module ut.concurrency.stream;
2 
3 import concurrency.stream;
4 import concurrency;
5 import unit_threaded;
6 import concurrency.stoptoken;
7 import core.atomic;
8 import concurrency.thread : ThreadSender;
9 
10 // TODO: it would be good if we can get the Sender .collect returns to be scoped if the delegates are.
11 
12 @("arrayStream")
13 @safe unittest {
14   shared int p = 0;
15   [1,2,3].arrayStream().collect((int t) shared { p.atomicOp!"+="(t); }).syncWait().isOk.should == true;
16   p.should == 6;
17 }
18 
19 @("intervalStream")
20 @safe unittest {
21   import core.time;
22   import concurrency.operations : withScheduler, whenAll;
23   import concurrency.sender : justFrom;
24   import concurrency.scheduler : ManualTimeWorker;
25 
26   auto worker = new shared ManualTimeWorker();
27   auto interval = 5.msecs.intervalStream()
28     .take(2)
29     .collect(() shared {})
30     .withScheduler(worker.getScheduler);
31 
32   auto driver = justFrom(() shared {
33       worker.timeUntilNextEvent().should == 5.msecs;
34       worker.advance(5.msecs);
35 
36       worker.timeUntilNextEvent().should == 5.msecs;
37       worker.advance(5.msecs);
38 
39       worker.timeUntilNextEvent().should == null;
40     });
41 
42   whenAll(interval, driver).syncWait().isOk.should == true;
43 }
44 
45 
46 @("infiniteStream.stop")
47 @safe unittest {
48   import concurrency.operations : withStopSource;
49   shared int g = 0;
50   auto source = new shared StopSource();
51   infiniteStream(5).collect((int n) shared {
52       if (g < 14)
53         g.atomicOp!"+="(n);
54       else
55         source.stop();
56     })
57     .withStopSource(source).syncWait.isCancelled.should == true;
58   g.should == 15;
59 };
60 
61 @("infiniteStream.take")
62 @safe unittest {
63   shared int g = 0;
64   infiniteStream(4).take(5).collect((int n) shared { g.atomicOp!"+="(n); }).syncWait().isOk.should == true;
65   g.should == 20;
66 }
67 
68 @("iotaStream")
69 @safe unittest {
70   import concurrency.stoptoken;
71   shared int g = 0;
72   iotaStream(0, 5).collect((int n) shared { g.atomicOp!"+="(n); }).syncWait().isOk.should == true;
73   g.should == 10;
74 }
75 
76 @("loopStream")
77 @safe unittest {
78   struct Loop {
79     size_t b,e;
80     void loop(DG, StopToken)(DG emit, StopToken stopToken) {
81       foreach(i; b..e)
82         emit(i);
83     }
84   }
85   shared int g = 0;
86   Loop(0,4).loopStream!size_t.collect((size_t n) shared { g.atomicOp!"+="(n); }).syncWait().isOk.should == true;
87   g.should == 6;
88 }
89 
90 @("toStreamObject")
91 @safe unittest {
92   import core.atomic : atomicOp;
93 
94   static StreamObjectBase!int getStream() {
95     return [1,2,3].arrayStream().toStreamObject();
96   }
97   shared int p;
98 
99   getStream().collect((int i) @safe shared { p.atomicOp!"+="(i); }).syncWait().isOk.should == true;
100 
101   p.should == 6;
102 }
103 
104 
105 @("toStreamObject.take")
106 @safe unittest {
107   static StreamObjectBase!int getStream() {
108     return [1,2,3].arrayStream().toStreamObject();
109   }
110   shared int p;
111 
112   getStream().take(2).collect((int i) shared { p.atomicOp!"+="(i); }).syncWait().isOk.should == true;
113 
114   p.should == 3;
115 }
116 
117 @("toStreamObject.void")
118 @safe unittest {
119   import core.time : msecs;
120   shared bool p = false;
121 
122   1.msecs.intervalStream().toStreamObject().take(1).collect(() shared { p = true; }).syncWait().isOk.should == true;
123 
124   p.should == true;
125 }
126 
127 @("transform.int.double")
128 @safe unittest {
129   shared int p = 0;
130   [1,2,3].arrayStream().transform((int i) => i * 3).collect((int t) shared { p.atomicOp!"+="(t); }).syncWait().isOk.should == true;
131   p.should == 18;
132 }
133 
134 @("transform.int.bool")
135 @safe unittest {
136   shared int p = 0;
137   [1,2,3].arrayStream().transform((int i) => i % 2 == 0).collect((bool t) shared { if (t) p.atomicOp!"+="(1); }).syncWait().isOk.should == true;
138   p.should == 1;
139 }
140 
141 @("scan")
142 @safe unittest {
143   shared int p = 0;
144   [1,2,3].arrayStream().scan((int acc, int i) => acc += i, 0).collect((int t) shared { p.atomicOp!"+="(t); }).syncWait().isOk.should == true;
145   p.should == 10;
146 }
147 
148 @("scan.void-value")
149 @safe unittest {
150   import core.time;
151   shared int p = 0;
152   5.msecs.intervalStream.scan((int acc) => acc += 1, 0).take(3).collect((int t) shared { p.atomicOp!"+="(t); }).syncWait().isOk.should == true;
153   p.should == 6;
154 }
155 
156 @("take.enough")
157 @safe unittest {
158   shared int p = 0;
159 
160   [1,2,3].arrayStream.take(2).collect((int i) shared { p.atomicOp!"+="(i); }).syncWait.isOk.should == true;
161   p.should == 3;
162 }
163 
164 @("take.too-few")
165 @safe unittest {
166   shared int p = 0;
167 
168   [1,2,3].arrayStream.take(4).collect((int i) shared { p.atomicOp!"+="(i); }).syncWait.isOk.should == true;
169   p.should == 6;
170 }
171 
172 @("take.donestream")
173 @safe unittest {
174   doneStream().take(1).collect(()shared{}).syncWait.isCancelled.should == true;
175 }
176 
177 @("take.errorstream")
178 @safe unittest {
179   errorStream(new Exception("Too bad")).take(1).collect(()shared{}).syncWait.assumeOk.shouldThrowWithMessage("Too bad");
180 }
181 
182 @("sample.trigger.stop")
183 @safe unittest {
184   import core.time;
185   auto sampler = 7.msecs.intervalStream()
186     .scan((int acc) => acc+1, 0)
187     .sample(10.msecs.intervalStream().take(3))
188     .collect((int i) shared {})
189     .syncWait().isOk.should == true;
190 }
191 
192 @("sample.slower")
193 @safe unittest {
194   import core.time;
195   import concurrency.operations : withScheduler, whenAll;
196   import concurrency.sender : justFrom;
197 
198   shared int p = 0;
199   import concurrency.scheduler : ManualTimeWorker;
200 
201   auto worker = new shared ManualTimeWorker();
202 
203   auto sampler = 7.msecs
204     .intervalStream()
205     .scan((int acc) => acc+1, 0)
206     .sample(10.msecs.intervalStream())
207     .take(3)
208     .collect((int i) shared { p.atomicOp!"+="(i); })
209     .withScheduler(worker.getScheduler);
210 
211   auto driver = justFrom(() shared {
212       worker.advance(7.msecs);
213       p.atomicLoad.should == 0;
214       worker.timeUntilNextEvent().should == 3.msecs;
215 
216       worker.advance(3.msecs);
217       p.atomicLoad.should == 1;
218       worker.timeUntilNextEvent().should == 4.msecs;
219 
220       worker.advance(4.msecs);
221       p.atomicLoad.should == 1;
222       worker.timeUntilNextEvent().should == 6.msecs;
223 
224       worker.advance(6.msecs);
225       p.atomicLoad.should == 3;
226       worker.timeUntilNextEvent().should == 1.msecs;
227 
228       worker.advance(1.msecs);
229       p.atomicLoad.should == 3;
230       worker.timeUntilNextEvent().should == 7.msecs;
231 
232       worker.advance(7.msecs);
233       p.atomicLoad.should == 3;
234       worker.timeUntilNextEvent().should == 2.msecs;
235 
236       worker.advance(2.msecs);
237       p.atomicLoad.should == 7;
238       worker.timeUntilNextEvent().should == null;
239     });
240 
241   whenAll(sampler, driver).syncWait().isOk.should == true;
242 
243   p.should == 7;
244 }
245 
246 @("sample.faster")
247 @safe unittest {
248   import core.time;
249 
250   shared int p = 0;
251 
252   7.msecs
253     .intervalStream()
254     .scan((int acc) => acc+1, 0)
255     .sample(3.msecs.intervalStream())
256     .take(3)
257     .collect((int i) shared { p.atomicOp!"+="(i); })
258     .syncWait().isOk.should == true;
259 
260   p.should == 6;
261 }
262 
263 @("sharedStream")
264 @safe unittest {
265   import concurrency.operations : then, race;
266 
267   auto source = sharedStream!int;
268 
269   shared int p = 0;
270 
271   auto emitter = ThreadSender().then(() shared {
272       source.emit(6);
273       source.emit(12);
274     });
275   auto collector = source.collect((int t) shared { p.atomicOp!"+="(t); });
276 
277   race(collector, emitter).syncWait().isOk.should == true;
278 
279   p.atomicLoad.should == 18;
280 }
281 
282 @("throttling.throttleLast")
283 @safe unittest {
284   import core.time;
285 
286   shared int p = 0;
287 
288   1.msecs
289     .intervalStream()
290     .scan((int acc) => acc+1, 0)
291     .throttleLast(3.msecs)
292     .take(6)
293     .collect((int i) shared { p.atomicOp!"+="(i); })
294     .syncWait().isOk.should == true;
295 
296   p.atomicLoad.shouldBeGreaterThan(40);
297 }
298 
299 @("throttling.throttleLast.arrayStream")
300 @safe unittest {
301   import core.time;
302 
303   shared int p = 0;
304 
305   [1,2,3].arrayStream()
306     .throttleLast(30.msecs)
307     .collect((int i) shared { p.atomicOp!"+="(i); })
308     .syncWait().isOk.should == true;
309 
310   p.atomicLoad.should == 3;
311 }
312 
313 @("throttling.throttleLast.exception")
314 @safe unittest {
315   import core.time;
316 
317   1.msecs
318     .intervalStream()
319     .throttleLast(10.msecs)
320     .collect(() shared { throw new Exception("Bla"); })
321     .syncWait.assumeOk.shouldThrowWithMessage("Bla");
322 }
323 
324 @("throttling.throttleLast.thread")
325 @safe unittest {
326   import core.time;
327 
328   shared int p = 0;
329 
330   1.msecs
331     .intervalStream()
332     .via(ThreadSender())
333     .scan((int acc) => acc+1, 0)
334     .throttleLast(3.msecs)
335     .take(6)
336     .collect((int i) shared { p.atomicOp!"+="(i); })
337     .syncWait().isOk.should == true;
338 
339   p.atomicLoad.shouldBeGreaterThan(40);
340 }
341 
342 @("throttling.throttleLast.thread.arrayStream")
343 @safe unittest {
344   import core.time;
345 
346   shared int p = 0;
347 
348   [1,2,3].arrayStream()
349     .via(ThreadSender())
350     .throttleLast(30.msecs)
351     .collect((int i) shared { p.atomicOp!"+="(i); })
352     .syncWait().isOk.should == true;
353 
354   p.atomicLoad.should == 3;
355 }
356 
357 @("throttling.throttleLast.thread.exception")
358 @safe unittest {
359   import core.time;
360 
361   1.msecs
362     .intervalStream()
363     .via(ThreadSender())
364     .throttleLast(10.msecs)
365     .collect(() shared { throw new Exception("Bla"); })
366     .syncWait.assumeOk.shouldThrowWithMessage("Bla");
367 }
368 
369 @("throttling.throttleFirst")
370 @safe unittest {
371   import core.time;
372   import concurrency.scheduler : ManualTimeWorker;
373   import concurrency.operations : withScheduler, whenAll;
374   import concurrency.sender : justFrom;
375 
376   shared int p = 0;
377   auto worker = new shared ManualTimeWorker();
378 
379   auto throttled = 1.msecs
380     .intervalStream()
381     .scan((int acc) => acc+1, 0)
382     .throttleFirst(3.msecs)
383     .take(2)
384     .collect((int i) shared { p.atomicOp!"+="(i); })
385     .withScheduler(worker.getScheduler);
386 
387   auto driver = justFrom(() shared {
388       p.atomicLoad.should == 0;
389 
390       worker.advance(1.msecs);
391       p.atomicLoad.should == 1;
392 
393       worker.advance(1.msecs);
394       p.atomicLoad.should == 1;
395 
396       worker.advance(1.msecs);
397       p.atomicLoad.should == 1;
398 
399       worker.advance(1.msecs);
400       p.atomicLoad.should == 5;
401 
402       worker.timeUntilNextEvent().should == null;
403     });
404   whenAll(throttled, driver).syncWait().isOk.should == true;
405 
406   p.should == 5;
407 }
408 
409 @("throttling.debounce")
410 @safe unittest {
411   import core.time;
412   import concurrency.scheduler : ManualTimeWorker;
413   import concurrency.operations : withScheduler, whenAll;
414   import concurrency.sender : justFrom;
415 
416   shared int p = 0;
417   auto worker = new shared ManualTimeWorker();
418   auto source = sharedStream!int;
419 
420   auto throttled = source
421     .debounce(3.msecs)
422     .take(2)
423     .collect((int i) shared { p.atomicOp!"+="(i); })
424     .withScheduler(worker.getScheduler);
425 
426   auto driver = justFrom(() shared {
427       source.emit(1);
428       p.atomicLoad.should == 0;
429       worker.timeUntilNextEvent().should == 3.msecs;
430 
431       worker.advance(3.msecs);
432       p.atomicLoad.should == 1;
433 
434       source.emit(2);
435       p.atomicLoad.should == 1;
436       worker.timeUntilNextEvent().should == 3.msecs;
437 
438       source.emit(3);
439       p.atomicLoad.should == 1;
440       worker.timeUntilNextEvent().should == 3.msecs;
441 
442       worker.advance(1.msecs);
443       p.atomicLoad.should == 1;
444       worker.timeUntilNextEvent().should == 2.msecs;
445 
446       source.emit(4);
447       p.atomicLoad.should == 1;
448       worker.timeUntilNextEvent().should == 3.msecs;
449 
450       worker.advance(3.msecs);
451       p.atomicLoad.should == 5;
452 
453       worker.timeUntilNextEvent().should == null;
454     });
455   whenAll(throttled, driver).syncWait().isOk.should == true;
456 
457   p.should == 5;
458 }
459 
460 @("slide")
461 @safe unittest {
462   import std.stdio;
463   import std.functional : toDelegate;
464   import std.algorithm : sum;
465   shared int p;
466 
467   [1,2,3,4,5,6,7].arrayStream
468     .slide(3)
469     .collect((int[] a) @safe shared { p.atomicOp!"+="(a.sum); })
470     .syncWait.isOk.should == true;
471 
472   p.should == 60;
473 
474   [1,2].arrayStream
475     .slide(3)
476     .collect((int[] a) @safe shared { p.atomicOp!"+="(a.sum); })
477     .syncWait.isOk.should == true;
478 
479   p.should == 60;
480 }
481 
482 @("slide.step")
483 @safe unittest {
484   import std.stdio;
485   import std.functional : toDelegate;
486   import std.algorithm : sum;
487   shared int p;
488 
489   [1,2,3,4,5,6,7].arrayStream
490     .slide(3, 2)
491     .collect((int[] a) @safe shared { p.atomicOp!"+="(a.sum); })
492     .syncWait.isOk.should == true;
493 
494   p.should == 36;
495 
496   [1,2].arrayStream
497     .slide(2, 2)
498     .collect((int[] a) @safe shared { p.atomicOp!"+="(a.sum); })
499     .syncWait.isOk.should == true;
500 
501   p.should == 39;
502 }
503 
504 @("toList.arrayStream")
505 @safe unittest {
506   [1,2,3].arrayStream.toList.syncWait.value.should == [1,2,3];
507 }
508 
509 @("toList.arrayStream.whenAll")
510 @safe unittest {
511   import concurrency.operations : withScheduler, whenAll;
512   import std.typecons : tuple;
513   auto s1 = [1,2,3].arrayStream.toList;
514   auto s2 = [2,3,4].arrayStream.toList;
515   whenAll(s1,s2).syncWait.value.should == tuple([1,2,3],[2,3,4]);
516 }