1 module ut.concurrency.operations;
2 
3 import concurrency;
4 import concurrency.sender;
5 import concurrency.thread;
6 import concurrency.operations;
7 import concurrency.receiver;
8 import concurrency.stoptoken;
9 import concurrency.nursery;
10 import unit_threaded;
11 import core.time;
12 import core.thread;
13 import std.typecons;
14 
/// Sender that delivers its value from a freshly spawned thread instead of
/// from the thread that called `start`. Used to test that Senders keep the
/// operational state alive until one of the receiver's terminal functions is
/// called.
struct OutOfBandValueSender(T) {
  alias Value = T;
  /// Value handed to the receiver once the operation runs.
  T value;
  /// Operational state pairing the receiver with the value to deliver.
  struct Op(Receiver) {
    Receiver receiver;
    T value;
    /// Thread entry point: completes the receiver with the stored value.
    void run() {
      receiver.setValue(value);
    }
    /// Spawns a thread that executes `run`. The thread is neither stored nor
    /// joined here, so `this` must stay alive until `run` has finished.
    void start() @trusted scope {
      // The Thread handle is deliberately discarded. It used to be bound to
      // an unused local called `value`, which shadowed the member above.
      (new Thread(&this.run)).start();
    }
  }
  /// Builds the operational state for `receiver`, copying `value` into it.
  auto connect(Receiver)(return Receiver receiver) @safe scope return {
    // ensure NRVO
    auto op = Op!(Receiver)(receiver, value);
    return op;
  }
}
35 
@("ignoreErrors.syncWait.value")
@safe unittest {
  // A continuation that always throws. After ignoreError the test expects
  // syncWait to report the outcome as cancelled.
  bool delegate() @safe shared failing = () shared { throw new Exception("Exceptions are rethrown"); };
  auto sender = ThreadSender().then(failing).ignoreError();
  sender.syncWait.isCancelled.should == true;
}
44 
@("oob")
unittest {
  // The value is produced on another thread; syncWait must still return it.
  OutOfBandValueSender!int(43).syncWait.value.should == 43;
}
50 
@("race")
unittest {
  // Two plain value senders: the expected winner is the first argument.
  auto inlineRace = race(ValueSender!int(4), ValueSender!int(5));
  inlineRace.syncWait.value.should == 4;

  auto quick = ThreadSender().then(() shared => 1);
  auto delayed = ThreadSender().then(() shared @trusted { Thread.sleep(50.msecs); return 2; });
  // The quick sender is expected to win whichever position it is passed in.
  race(quick, delayed).syncWait.value.should == 1;
  race(delayed, quick).syncWait.value.should == 1;
}
59 
@("race.multiple")
unittest {
  // With three candidates the expected result is still the first one's value.
  auto contenders = race(ValueSender!int(4), ValueSender!int(5), ValueSender!int(6));
  contenders.syncWait.value.should == 4;
}
64 
@("race.exception.single")
unittest {
  // One failing participant next to a succeeding one: the value is expected.
  auto oneFails = race(ThrowingSender(), ValueSender!int(5));
  oneFails.syncWait.value.should == 5;

  // Every participant fails: assumeOk is expected to throw.
  auto allFail = race(ThrowingSender(), ThrowingSender());
  allFail.syncWait.assumeOk.shouldThrow();
}
70 
@("race.exception.double")
unittest {
  auto failsFirst = ThreadSender().then(() shared { throw new Exception("Fast"); });
  auto failsLater = ThreadSender().then(() shared @trusted { Thread.sleep(50.msecs); throw new Exception("Slow"); });
  // Both participants fail; the error surfaced is expected to be the one
  // thrown without the 50ms sleep.
  race(failsLater, failsFirst).syncWait.assumeOk.shouldThrowWithMessage("Fast");
}
77 
@("race.cancel-other")
unittest {
  // Runs on its own thread and only returns once its stop token is triggered.
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested)
        Thread.yield();
    });
  // Racing it against an immediate value is expected to yield that value.
  race(spinner, ValueSender!int(88)).syncWait.value.get.should == 88;
}
85 
@("race.cancel")
unittest {
  // Only returns once its stop token is triggered.
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested)
        Thread.yield();
    });
  auto nursery = new shared Nursery();
  // A race between two senders that never finish on their own...
  nursery.run(race(spinner, spinner));
  // ...and a task that stops the whole nursery after 50ms.
  nursery.run(ThreadSender().then(() @trusted shared {
        Thread.sleep(50.msecs);
        nursery.stop();
      }));
  nursery.syncWait.isCancelled.should == true;
}
96 
@("race.array.just")
@safe unittest {
  // race also accepts an array of senders of the same type.
  auto candidates = [just(4), just(5)];
  race(candidates).syncWait.value.should == 4;
}
101 
@("race.array.void")
@safe unittest {
  // Array form with senders that produce no value.
  auto candidates = [VoidSender(), VoidSender()];
  race(candidates).syncWait.assumeOk;
}
106 
@("via")
unittest {
  import std.typecons : tuple;

  // Both sides carry a value: the result is a tuple, with the value of the
  // sender passed to `via` first.
  auto both = ValueSender!int(3).via(ValueSender!int(6));
  both.syncWait.value.should == tuple(6,3);

  // Only one side carries a value: that value is the result as-is.
  auto leftOnly = ValueSender!int(5).via(VoidSender());
  leftOnly.syncWait.value.should == 5;
  auto rightOnly = VoidSender().via(ValueSender!int(4));
  rightOnly.syncWait.value.should == 4;
}
114 
@("then.value.delegate")
@safe unittest {
  // Continuation written as a `shared` literal.
  auto tripled = ValueSender!int(3).then((int i) shared => i*3);
  tripled.syncWait.value.shouldEqual(9);
}
119 
@("then.value.function")
@safe unittest {
  // Continuation written as a plain literal without the `shared` attribute.
  auto tripled = ValueSender!int(3).then((int i) => i*3);
  tripled.syncWait.value.shouldEqual(9);
}
124 
@("then.oob")
@safe unittest {
  // The upstream value arrives from another thread before `then` maps it.
  auto tripled = OutOfBandValueSender!int(46).then((int i) shared => i*3);
  tripled.syncWait.value.shouldEqual(138);
}
129 
@("then.tuple")
@safe unittest {
  // The continuation may take the three values packed as one Tuple.
  auto first = just(1,2,3).then((Tuple!(int,int,int) t) shared => t[0]);
  first.syncWait.value.shouldEqual(1);
}
134 
@("then.tuple.expand")
@safe unittest {
  // The continuation may also take the three values as separate parameters.
  auto sum = just(1,2,3).then((int a,int b,int c) shared => a+b);
  sum.syncWait.value.shouldEqual(3);
}
139 
@("finally")
unittest {
  // After a value: the upstream value is replaced, either by the result of
  // the callable or by the constant passed to finally_.
  ValueSender!int(1).finally_(() => 4).syncWait.value.should == 4;
  ValueSender!int(2).finally_(3).syncWait.value.should == 3;

  // After an error: the replacement value is produced as well.
  // (This section used to assert the constant form twice.)
  ThrowingSender().finally_(3).syncWait.value.should == 3;
  ThrowingSender().finally_(() => 4).syncWait.value.should == 4;

  // After a cancellation: the result stays cancelled.
  DoneSender().finally_(() => 4).syncWait.isCancelled.should == true;
  DoneSender().finally_(3).syncWait.isCancelled.should == true;
}
150 
@("whenAll.basic")
unittest {
  // All values are collected into a tuple, in argument order.
  auto pair = whenAll(ValueSender!int(1), ValueSender!int(2));
  pair.syncWait.value.should == tuple(1,2);
  auto triple = whenAll(ValueSender!int(1), ValueSender!int(2), ValueSender!int(3));
  triple.syncWait.value.should == tuple(1,2,3);

  // Void senders contribute nothing to the result.
  whenAll(VoidSender(), ValueSender!int(2)).syncWait.value.should == 2;
  whenAll(ValueSender!int(1), VoidSender()).syncWait.value.should == 1;
  whenAll(VoidSender(), VoidSender()).syncWait.assumeOk;

  // An error from either position surfaces as the overall error.
  whenAll(ValueSender!int(1), ThrowingSender()).syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender");
  whenAll(ThrowingSender(), ValueSender!int(1)).syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender");

  // A cancellation from either position cancels the whole operation.
  whenAll(ValueSender!int(1), DoneSender()).syncWait.isCancelled.should == true;
  whenAll(DoneSender(), ValueSender!int(1)).syncWait.isCancelled.should == true;

  // Cancellation mixed with an error: the first argument decides the outcome.
  whenAll(DoneSender(), ThrowingSender()).syncWait.isCancelled.should == true;
  whenAll(ThrowingSender(), DoneSender()).syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender");
}
166 
@("whenAll.cancel")
unittest {
  // Void flavour: only returns once its stop token is triggered.
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested)
        Thread.yield();
    });
  whenAll(spinner, DoneSender()).syncWait.isCancelled.should == true;
  whenAll(ThrowingSender(), spinner).syncWait.assumeOk.shouldThrow;
  whenAll(spinner, ThrowingSender()).syncWait.assumeOk.shouldThrow;

  // Value flavour: same loop, but returns 42 afterwards. The expected
  // outcomes are identical to the void flavour.
  auto spinnerWithValue = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested)
        Thread.yield();
      return 42;
    });
  whenAll(spinnerWithValue, DoneSender()).syncWait.isCancelled.should == true;
  whenAll(ThrowingSender(), spinnerWithValue).syncWait.assumeOk.shouldThrow;
  whenAll(spinnerWithValue, ThrowingSender()).syncWait.assumeOk.shouldThrow;
}
183 
@("whenAll.stop")
unittest {
  // Only returns once its stop token is triggered.
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested)
        Thread.yield();
    });
  auto source = new StopSource();
  // Second participant: triggers the very stop source the whole operation
  // is attached to below.
  auto stopper = just(source).then((StopSource s) shared => s.stop());
  auto combined = whenAll(spinner, stopper).withStopSource(source);
  combined.syncWait.isCancelled.should == true;
}
193 
@("whenAll.array.just")
unittest {
  // Array form: the results come back as an array in the same order.
  auto senders = [just(4), just(5)];
  whenAll(senders).syncWait.value.should == [4,5];
}
198 
@("whenAll.array.void")
unittest {
  // Array form with senders that produce no value.
  auto senders = [VoidSender(), VoidSender()];
  whenAll(senders).syncWait.assumeOk;
}
203 
@("retry")
unittest {
  // A sender that succeeds right away is simply passed through.
  ValueSender!int(5).retry(Times(5)).syncWait.value.should == 5;

  int failures = 3; // connects made while `connects < failures` will fail
  int connects = 0; // number of connects performed so far
  // Sender whose operations fail for the first `failures` connects.
  struct Flaky {
    alias Value = void;
    static struct Op(Receiver) {
      Receiver receiver;
      bool fail;
      void start() @safe nothrow {
        if (!fail) {
          receiver.setValue();
          return;
        }
        receiver.setError(new Exception("Fail fail fail"));
      }
    }
    auto connect(Receiver)(return Receiver receiver) @safe scope return {
      // ensure NRVO
      auto op = Op!(Receiver)(receiver, connects++ < failures);
      return op;
    }
  }

  // Three failures followed by one success: four connects in total.
  Flaky().retry(Times(5)).syncWait.assumeOk;
  connects.should == 4;

  // With only two attempts allowed the error surfaces after two connects.
  connects = 0;
  Flaky().retry(Times(2)).syncWait.assumeOk.shouldThrowWithMessage("Fail fail fail");
  connects.should == 2;

  // A continuation that always throws runs once per allowed attempt.
  shared int runs = 0;
  auto alwaysFails = ThreadSender().then(()shared { import core.atomic; runs.atomicOp!("+=")(1); throw new Exception("Failed"); });
  alwaysFails.retry(Times(5)).syncWait.assumeOk.shouldThrowWithMessage("Failed");
  runs.should == 5;
}
237 
@("retryWhen.immediate.success")
unittest {
  // Retry logic whose `failure` hook hands back an immediately completing sender.
  static struct RetryAtOnce {
    auto failure(Exception e) {
      return VoidSender();
    }
  }

  // Nothing fails here, so the result is simply ok.
  auto sender = VoidSender().retryWhen(RetryAtOnce());
  sender.syncWait.assumeOk;
}
248 
/// Sender that produces the number of times it has been connected before:
/// the first connect yields 0, the next one 1, and so on.
struct ConnectCounter {
  alias Value = int;
  int counter = 0;
  /// Connects `receiver` to a ValueSender carrying the current count, then
  /// bumps the count for the next connect.
  auto connect(Receiver)(return Receiver receiver) @trusted scope return {
    const current = counter;
    counter += 1;
    // ensure NRVO
    auto op = ValueSender!int(current).connect(receiver);
    return op;
  }
}
258 
@("retryWhen.immediate.retries")
unittest {
  // Retry logic whose `failure` hook hands back an immediately completing sender.
  static struct RetryAtOnce {
    auto failure(Exception e) {
      return VoidSender();
    }
  }

  // Connect counts 0, 1 and 2 are rejected by the continuation; 3 gets through.
  auto sender = ConnectCounter()
    .then((int c) { if (c < 3) throw new Exception("jada"); return c; })
    .retryWhen(RetryAtOnce());
  sender.syncWait.value.should == 3;
}
271 
@("retryWhen.wait.retries")
unittest {
  import core.time : msecs;
  import concurrency.scheduler : ManualTimeWorker;

  // Retry logic that waits 3ms after every failure.
  static struct Wait {
    auto failure(Exception e) @safe {
      return delay(3.msecs);
    }
  }

  auto worker = new shared ManualTimeWorker();
  // Connect counts 0, 1 and 2 are rejected; timers run on the manual worker.
  auto sender = ConnectCounter()
    .then((int c) { if (c < 3) throw new Exception("jada"); return c; })
    .retryWhen(Wait())
    .withScheduler(worker.getScheduler);

  // Drives the manual clock: three times a 3ms timer is expected and
  // advanced past, after which no further event may be pending.
  auto driver = just(worker).then((shared ManualTimeWorker w) {
      foreach (_; 0 .. 3) {
        w.timeUntilNextEvent().should == 3.msecs;
        w.advance(3.msecs);
      }
      w.timeUntilNextEvent().should == null;
    });

  whenAll(sender, driver).syncWait.value.should == 3;
}
301 
@("retryWhen.throw")
unittest {
  // Retry logic whose `failure` hook itself answers with an error.
  static struct Throw {
    auto failure(Exception t) @safe {
      return ErrorSender(new Exception("inner"));
    }
  }

  // The error from the retry logic is the one expected to surface.
  auto sender = ErrorSender(new Exception("outer")).retryWhen(Throw());
  sender.syncWait.assumeOk.shouldThrowWithMessage("inner");
}
312 
@("whenAll.oob")
unittest {
  // One value arrives from another thread, the other one inline.
  auto fromThread = OutOfBandValueSender!int(43);
  auto inline = ValueSender!int(11);
  auto combined = whenAll(fromThread, inline);
  combined.syncWait.value.should == tuple(43, 11);
}
319 
@("withStopToken.oob")
unittest {
  // The continuation receives the stop token followed by the upstream value.
  auto sender = OutOfBandValueSender!int(44)
    .withStopToken((StopToken stopToken, int t) => t);
  sender.syncWait.value.should == 44;
}
325 
@("withStopSource.oob")
unittest {
  // Attaching a stop source that is never triggered leaves the value intact.
  auto sender = OutOfBandValueSender!int(45).withStopSource(new StopSource());
  sender.syncWait.value.should == 45;
}
331 
// Label corrected: this test exercises withStopToken, not withStopSource.
@("withStopToken.tuple")
unittest {
  // Two upstream values reach the continuation packed as one Tuple,
  // after the stop token.
  just(14, 53).withStopToken((StopToken s, Tuple!(int, int) t) => t[0]*t[1]).syncWait.value.should == 742;
}
336 
@("value.withstoptoken.via.thread")
@safe unittest {
  // The continuation throws; the error is expected to surface even though
  // the chain is moved onto a thread with `via`.
  auto sender = ValueSender!int(4)
    .withStopToken((StopToken s, int i) { throw new Exception("Badness");})
    .via(ThreadSender());
  sender.syncWait.assumeOk.shouldThrowWithMessage("Badness");
}
341 
@("completewithcancellation")
@safe unittest {
  // A successful completion is expected to be reported as cancelled.
  auto sender = ValueSender!void().completeWithCancellation;
  sender.syncWait.isCancelled.should == true;
}
346 
@("raceAll")
@safe unittest {
  // Only returns once its stop token is triggered.
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested)
        Thread.yield();
    });
  // Whatever the second participant completes with (done, value or error)
  // is expected to become the overall result.
  raceAll(spinner, DoneSender()).syncWait.isCancelled.should == true;
  raceAll(spinner, just(42)).syncWait.value.should == 42;
  raceAll(spinner, ThrowingSender()).syncWait.isError.should == true;
}
356 
@("on.ManualTimeWorker")
@safe unittest {
  import concurrency.scheduler : ManualTimeWorker;

  auto worker = new shared ManualTimeWorker();
  // A 10ms timer scheduled on the manually driven worker.
  auto timer = DelaySender(10.msecs).withScheduler(worker.getScheduler);
  // Advances the clock in two 5ms steps and checks the remaining time
  // before, between and after them.
  auto driver = just(worker).then((shared ManualTimeWorker w) shared {
      w.timeUntilNextEvent().should == 10.msecs;
      w.advance(5.msecs);
      w.timeUntilNextEvent().should == 5.msecs;
      w.advance(5.msecs);
      w.timeUntilNextEvent().should == null;
    });

  whenAll(timer, driver).syncWait().assumeOk;
}
373 
@("on.ManualTimeWorker.cancel")
@safe unittest {
  import concurrency.scheduler : ManualTimeWorker;

  auto worker = new shared ManualTimeWorker();
  auto source = new StopSource();
  // A 10ms timer scheduled on the manually driven worker.
  auto timer = DelaySender(10.msecs).withScheduler(worker.getScheduler);
  // Triggers the stop source instead of advancing the clock; afterwards the
  // worker is expected to have no pending event left.
  auto driver = just(source).then((StopSource s) shared {
      worker.timeUntilNextEvent().should == 10.msecs;
      s.stop();
      worker.timeUntilNextEvent().should == null;
    });

  whenAll(timer, driver).syncWait(source).isCancelled.should == true;
}
389 
@("then.stack.no-leak")
@safe unittest {
  struct Target {
    void fun(int i) shared {
    }
  }
  shared Target target;
  // It is perfectly ok to hand `then` a delegate bound to a stack variable...
  auto sender = just(42).then(&target.fun);

  sender.syncWait();

  // ...but the resulting sender must not be allowed to escape: passing it
  // by value to a @safe function is required to be rejected at compile time.
  void escape(Sender)(Sender s) @safe;
  static assert(!__traits(compiles, escape(sender)));
}
406 
@("forwardOn")
@safe unittest {
  auto pool = stdTaskPool(2);

  // Each kind of completion is expected to come through unchanged after
  // being forwarded onto the pool's scheduler.
  VoidSender().forwardOn(pool.getScheduler).syncWait.assumeOk;
  ErrorSender(new Exception("bad news")).forwardOn(pool.getScheduler).syncWait.isError.should == true;
  DoneSender().forwardOn(pool.getScheduler).syncWait.isCancelled.should == true;
  just(42).forwardOn(pool.getScheduler).syncWait.value.should == 42;
}
416 
@("toSingleton")
@safe unittest {
  import std.typecons : tuple;
  import concurrency.scheduler : ManualTimeWorker;
  import core.atomic : atomicOp;

  shared int g;
  auto worker = new shared ManualTimeWorker();

  // Increments `g` after a 2ms delay, wrapped with toSingleton.
  auto single = delay(2.msecs)
    .then(() shared => g.atomicOp!"+="(1))
    .toSingleton(worker.getScheduler);
  // Moves the manual clock past the delay.
  auto driver = justFrom(() shared => worker.advance(2.msecs));

  // Within one round both uses of `single` are expected to see the same
  // value; a second round is expected to see the next increment.
  whenAll(single, single, driver).syncWait.value.should == tuple(1,1);
  whenAll(single, single, driver).syncWait.value.should == tuple(2,2);
}
434 
@("stopOn")
@safe unittest {
  auto sourceInner = new shared StopSource();
  auto sourceOuter = new shared StopSource();

  // NOTE(review): `just(...)` is handed a lambda below and nothing visible in
  // this test invokes it, so it looks like `sourceOuter.stop()` never runs.
  // Confirm whether `justFrom` was intended.
  shared bool b;
  auto guardedByInner = delay(5.msecs).then(() shared => b = true).stopOn(StopToken(sourceInner));
  whenAll(guardedByInner, just(() => sourceOuter.stop())).syncWait(sourceOuter).assumeOk;
  b.should == true;

  // NOTE(review): this continuation assigns `b` again, never `d`, so the
  // final check on `d` cannot fail. It presumably should write `d`, with the
  // inner source really being stopped. Verify the intended behaviour before
  // changing it.
  shared bool d;
  auto guardedAgain = delay(5.msecs).then(() shared => b = true).stopOn(StopToken(sourceInner));
  whenAll(guardedAgain, just(() => sourceInner.stop())).syncWait(sourceOuter).assumeOk;
  d.should == false;
}
452 
@("withChild")
@safe unittest {
  import core.atomic;

  // Shared scratch pad: two events used to force a fixed interleaving
  // between parent and child, plus a flag recording what the parent saw.
  class State {
    import core.sync.event : Event;
    bool parentAfterChild;
    Event childEvent, parentEvent;
    this() shared @trusted {
      // Arguments are (manualReset, initialState): auto-reset, not signaled.
      (cast()childEvent).initialize(false, false);
      (cast()parentEvent).initialize(false, false);
    }
    void signalChild() shared @trusted {
      (cast()childEvent).set();
    }
    void waitChild() shared @trusted {
      (cast()childEvent).wait();
    }
    void signalParent() shared @trusted {
      (cast()parentEvent).set();
    }
    void waitParent() shared @trusted {
      (cast()parentEvent).wait();
    }
  }
  auto state = new shared State();
  auto source = new shared StopSource;

  // Child: spins until it sees the stop request, then wakes the parent and
  // blocks until the parent has recorded its observation.
  auto child = just(state).withStopToken((StopToken token, shared State state) @trusted {
      // Yield while spinning, like the other busy-wait loops in this file.
      while(!token.isStopRequested) { Thread.yield(); }
      state.signalParent();
      state.waitChild();
    }).via(ThreadSender());

  // Parent: once woken by the child, records whether its own token is still
  // untriggered, then releases the child.
  auto parent = just(state).withStopToken((StopToken token, shared State state){
      state.waitParent();
      state.parentAfterChild.atomicStore(token.isStopRequested == false);
      state.signalChild();
    }).via(ThreadSender());

  whenAll(parent.withChild(child).withStopSource(source), just(source).then((shared StopSource s) => s.stop())).syncWait.isCancelled.should == true;

  // The parent must not have seen a stop request while the child, which had
  // already seen one, was still running.
  state.parentAfterChild.atomicLoad.should == true;
}
498 
@("onTermination.value")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  // The callback is expected to run exactly once when the sender yields a value.
  auto sender = just(42).onTermination(() @safe shared => g.atomicOp!"+="(1));
  sender.syncWait.assumeOk;
  g.should == 1;
}
506 
@("onTermination.done")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  // The callback is expected to run exactly once when the sender is cancelled.
  auto sender = DoneSender().onTermination(() @safe shared => g.atomicOp!"+="(1));
  sender.syncWait.isCancelled.should == true;
  g.should == 1;
}
514 
@("onTermination.error")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  // The callback is expected to run exactly once when the sender fails.
  auto sender = ThrowingSender().onTermination(() @safe shared => g.atomicOp!"+="(1));
  sender.syncWait.isError.should == true;
  g.should == 1;
}
522 
@("onError.value")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  // A value completion must not invoke the error callback.
  auto sender = just(42).onError((Exception e) @safe shared => g.atomicOp!"+="(1));
  sender.syncWait.assumeOk;
  g.should == 0;
}
530 
@("onError.done")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  // A cancellation must not invoke the error callback.
  auto sender = DoneSender().onError((Exception e) @safe shared => g.atomicOp!"+="(1));
  sender.syncWait.isCancelled.should == true;
  g.should == 0;
}
538 
@("onError.error")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  // A failure invokes the error callback once and is still reported as an error.
  auto sender = ThrowingSender().onError((Exception e) @safe shared => g.atomicOp!"+="(1));
  sender.syncWait.isError.should == true;
  g.should == 1;
}
546 
@("onError.throw")
@safe unittest {
  // The error callback itself throws. The exception it throws is expected to
  // be the reported error, with the original failure chained via `next`.
  // (An unused import of core.exception.AssertError was removed.)
  auto err = ThrowingSender().onError((Exception e) @safe shared { throw new Exception("in onError"); }).syncWait.get!Exception;
  err.msg.should == "in onError";
  err.next.msg.should == "ThrowingSender";
}
554 
@("stopWhen.source.value")
@safe unittest {
  // Only returns, with 43, once its stop token is triggered.
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested)
        Thread.yield();
      return 43;
    });
  auto trigger = delay(100.msecs);
  // After the trigger fires, the source's own value is the expected result.
  auto sender = spinner.stopWhen(trigger);
  sender.syncWait().value.should == 43;
}
564 
@("stopWhen.source.error")
@safe unittest {
  // Only returns, by throwing, once its stop token is triggered.
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested)
        Thread.yield();
      throw new Exception("Upside down");
    });
  auto trigger = delay(100.msecs);
  // After the trigger fires, the source's own error is the expected result.
  auto sender = spinner.stopWhen(trigger);
  sender.syncWait().assumeOk.shouldThrowWithMessage("Upside down");
}
574 
@("stopWhen.source.cancelled")
@safe unittest {
  // Only returns once its stop token is triggered, and then completes as cancelled.
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested)
        Thread.yield();
    }).completeWithCancellation;
  auto trigger = delay(100.msecs);
  auto sender = spinner.stopWhen(trigger);
  sender.syncWait().isCancelled.should == true;
}
583 
@("stopWhen.trigger.error")
@safe unittest {
  // Only returns, by throwing, once its stop token is triggered.
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested)
        Thread.yield();
      throw new Exception("This occurres later, so the other one gets propagated");
    });
  // The trigger fails straight away; its error is the one expected to surface.
  auto trigger = ThrowingSender();
  auto sender = spinner.stopWhen(trigger);
  sender.syncWait().assumeOk.shouldThrowWithMessage("ThrowingSender");
}
593 
@("stopWhen.trigger.cancelled.value")
@safe unittest {
  // Only returns, with 42, once its stop token is triggered.
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested)
        Thread.yield();
      return 42;
    });
  // The trigger completes as cancelled after 100ms; the overall result is
  // expected to be cancelled even though the source returns a value.
  auto trigger = delay(100.msecs).completeWithCancellation;
  auto sender = spinner.stopWhen(trigger);
  sender.syncWait().isCancelled.should == true;
}
603 
@("completewitherror.basic")
@safe unittest {
  // A successful completion is replaced by the supplied exception.
  auto sender = ValueSender!void().completeWithError(new Exception("hello"));
  sender.syncWait.assumeOk.shouldThrowWithMessage("hello");
}
608 
@("completewitherror.exception.base")
@safe unittest {
  // An upstream Exception is expected to be overridden by the supplied one.
  auto sender = ErrorSender(new Exception("not you"))
    .completeWithError(new Exception("overridden"));
  sender.syncWait.assumeOk.shouldThrowWithMessage!Throwable("overridden");
}
613 
@("completewitherror.throwable.base")
@safe unittest {
  // An upstream bare Throwable is expected to win over the supplied Exception.
  auto sender = ErrorSender(new Throwable("precedence"))
    .completeWithError(new Exception("hello"));
  sender.syncWait.assumeOk.shouldThrowWithMessage!Throwable("precedence");
}
618 
@("completewitherror.error.base")
@safe unittest {
  // An upstream Error is expected to win over the supplied Exception.
  auto sender = ErrorSender(new Error("precedence"))
    .completeWithError(new Exception("hello"));
  sender.syncWait.assumeOk.shouldThrowWithMessage!Error("precedence");
}
623 
@("onCompletion.value")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  // The callback is expected to run once when the sender yields a value.
  auto sender = just(42).onCompletion(() @safe shared => g.atomicOp!"+="(1));
  sender.syncWait.assumeOk;
  g.should == 1;
}
631 
@("onCompletion.done")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  // The callback is expected to run once when the sender is cancelled.
  auto sender = DoneSender().onCompletion(() @safe shared => g.atomicOp!"+="(1));
  sender.syncWait.isCancelled.should == true;
  g.should == 1;
}
639 
@("onCompletion.error")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  // A failure must not invoke the completion callback.
  auto sender = ThrowingSender().onCompletion(() @safe shared => g.atomicOp!"+="(1));
  sender.syncWait.isError.should == true;
  g.should == 0;
}