1 module ut.concurrency.operations;
2 
3 import concurrency;
4 import concurrency.sender;
5 import concurrency.thread;
6 import concurrency.operations;
7 import concurrency.receiver;
8 import concurrency.stoptoken;
9 import concurrency.nursery;
10 import unit_threaded;
11 import core.time;
12 import core.thread;
13 import std.typecons;
14 
/// Test helper: a Sender that delivers `value` to the receiver from a newly
/// spawned thread instead of inline from `start()`.
/// Used to test that Senders keep the operational state alive until one of the
/// receiver's terminal functions is called.
struct OutOfBandValueSender(T) {
  alias Value = T;
  T value;
  /// Operational state; copying is disabled because the spawned thread holds a
  /// delegate (`&this.run`) that points into this instance.
  struct Op(Receiver) {
    Receiver receiver;
    T value;
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    /// Thread entry point: completes the receiver with the stored value.
    void run() {
      receiver.setValue(value);
    }
    /// Spawns the thread that will call `run`; returns without waiting for it.
    void start() @trusted scope {
      // The thread handle is intentionally discarded (it used to be bound to an
      // unused local named `value`, which shadowed the field of the same name).
      new Thread(&this.run).start();
    }
  }
  /// Builds the operational state for `receiver`, carrying a copy of `value`.
  auto connect(Receiver)(return Receiver receiver) @safe return scope {
    // ensure NRVO
    auto op = Op!(Receiver)(receiver, value);
    return op;
  }
}
37 
// A throwing continuation followed by ignoreError() is expected to be reported
// as cancelled by syncWait.
@("ignoreErrors.syncWait.value")
@safe unittest {
  bool delegate() @safe shared thrower = () shared { throw new Exception("Exceptions are rethrown"); };
  auto guarded = ThreadSender().then(thrower).ignoreError();
  guarded.syncWait.isCancelled.should == true;
}
46 
// syncWait must receive the value even though it is delivered from another thread.
@("oob")
@safe unittest {
  auto outOfBand = OutOfBandValueSender!int(43);
  outOfBand.syncWait.value.should == 43;
}
52 
@("race")
@safe unittest {
  // Two immediately-completing senders: the first listed value is expected.
  auto immediate = race(ValueSender!int(4), ValueSender!int(5));
  immediate.syncWait.value.should == 4;

  // With threads, the quick one is expected to win regardless of argument order.
  auto quick = ThreadSender().then(() shared => 1);
  auto sluggish = ThreadSender().then(() shared @trusted { Thread.sleep(50.msecs); return 2; });
  race(quick, sluggish).syncWait.value.should == 1;
  race(sluggish, quick).syncWait.value.should == 1;
}
61 
// race accepts more than two senders; the first listed value is expected.
@("race.multiple")
@safe unittest {
  auto contenders = race(ValueSender!int(4), ValueSender!int(5), ValueSender!int(6));
  contenders.syncWait.value.should == 4;
}
66 
@("race.exception.single")
@safe unittest {
  // One failing contender does not prevent the other one's value from winning.
  auto recovered = race(ThrowingSender(), ValueSender!int(5));
  recovered.syncWait.value.should == 5;

  // When every contender fails, the race itself fails.
  auto allFailing = race(ThrowingSender(), ThrowingSender());
  allFailing.syncWait.assumeOk.shouldThrow();
}
72 
// When both contenders throw, the error of the one that fails first is expected.
@("race.exception.double")
@safe unittest {
  auto late = ThreadSender().then(() shared @trusted { Thread.sleep(50.msecs); throw new Exception("Slow"); });
  auto early = ThreadSender().then(() shared { throw new Exception("Fast"); });
  auto contest = race(late, early);
  contest.syncWait.assumeOk.shouldThrowWithMessage("Fast");
}
79 
// The spinning contender only returns once a stop is requested, so this test
// can only finish if race stops it after the value sender completes.
@("race.cancel-other")
@safe unittest {
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
    });
  auto contest = race(spinner, ValueSender!int(88));
  contest.syncWait.value.get.should == 88;
}
87 
// Stopping the nursery from another thread is expected to cancel a race whose
// contenders only return on a stop request.
@("race.cancel")
@safe unittest {
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
    });
  auto nursery = new shared Nursery();
  nursery.run(race(spinner, spinner));

  // stops the nursery after a short delay
  auto stopper = ThreadSender().then(() @trusted shared { Thread.sleep(50.msecs); nursery.stop(); });
  nursery.run(stopper);

  nursery.syncWait.isCancelled.should == true;
}
98 
// race also accepts an array of senders.
@("race.array.just")
@safe unittest {
  auto contenders = [just(4), just(5)];
  race(contenders).syncWait.value.should == 4;
}
103 
// An array of void senders can be raced as well.
@("race.array.void")
@safe unittest {
  auto contenders = [VoidSender(), VoidSender()];
  race(contenders).syncWait.assumeOk;
}
108 
@("via")
@safe unittest {
  import std.typecons : tuple;

  // both sides carry a value: the result is a tuple, with the via-argument's value first
  auto both = ValueSender!int(3).via(ValueSender!int(6));
  both.syncWait.value.should == tuple(6,3);

  // only one side carries a value: that value is passed through as-is
  auto onlySource = ValueSender!int(5).via(VoidSender());
  onlySource.syncWait.value.should == 5;
  auto onlyVia = VoidSender().via(ValueSender!int(4));
  onlyVia.syncWait.value.should == 4;
}
116 
// then() with a shared delegate literal transforms the value.
@("then.value.delegate")
@safe unittest {
  auto tripled = ValueSender!int(3).then((int i) shared => i*3);
  tripled.syncWait.value.shouldEqual(9);
}
121 
// then() with a plain (non-shared) function literal transforms the value.
@("then.value.function")
@safe unittest {
  auto tripled = ValueSender!int(3).then((int i) => i*3);
  tripled.syncWait.value.shouldEqual(9);
}
126 
// then() also works when the upstream value arrives from another thread.
@("then.oob")
@safe unittest {
  auto tripled = OutOfBandValueSender!int(46).then((int i) shared => i*3);
  tripled.syncWait.value.shouldEqual(138);
}
131 
// A multi-value just() can be consumed as a single Tuple parameter.
@("then.tuple")
@safe unittest {
  auto firstOfThree = just(1,2,3).then((Tuple!(int,int,int) t) shared => t[0]);
  firstOfThree.syncWait.value.shouldEqual(1);
}
136 
// A multi-value just() can also be consumed as separate parameters.
@("then.tuple.expand")
@safe unittest {
  auto summed = just(1,2,3).then((int a,int b,int c) shared => a+b);
  summed.syncWait.value.shouldEqual(3);
}
141 
@("whenAll.basic")
@safe unittest {
  auto one = ValueSender!int(1);
  auto two = ValueSender!int(2);

  // values are combined into a tuple
  whenAll(one, two).syncWait.value.should == tuple(1,2);
  whenAll(one, two, ValueSender!int(3)).syncWait.value.should == tuple(1,2,3);

  // void senders contribute nothing to the result
  whenAll(VoidSender(), two).syncWait.value.should == 2;
  whenAll(one, VoidSender()).syncWait.value.should == 1;
  whenAll(VoidSender(), VoidSender()).syncWait.assumeOk;

  // an error on either side fails the whole
  whenAll(one, ThrowingSender()).syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender");
  whenAll(ThrowingSender(), one).syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender");

  // a done on either side cancels the whole
  whenAll(one, DoneSender()).syncWait.isCancelled.should == true;
  whenAll(DoneSender(), one).syncWait.isCancelled.should == true;

  // done combined with error: the one listed first determines the outcome
  whenAll(DoneSender(), ThrowingSender()).syncWait.isCancelled.should == true;
  whenAll(ThrowingSender(), DoneSender()).syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender");
}
157 
// The spinning senders only return on a stop request, so each line below can
// only complete if whenAll stops the remaining sender once the other one is
// done or has failed.
@("whenAll.cancel")
@safe unittest {
  // void-returning spinner
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
    });
  whenAll(spinner, DoneSender()).syncWait.isCancelled.should == true;
  whenAll(ThrowingSender(), spinner).syncWait.assumeOk.shouldThrow;
  whenAll(spinner, ThrowingSender()).syncWait.assumeOk.shouldThrow;

  // same checks with a spinner that produces a value
  auto spinnerInt = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
      return 42;
    });
  whenAll(spinnerInt, DoneSender()).syncWait.isCancelled.should == true;
  whenAll(ThrowingSender(), spinnerInt).syncWait.assumeOk.shouldThrow;
  whenAll(spinnerInt, ThrowingSender()).syncWait.assumeOk.shouldThrow;
}
174 
// One branch triggers the StopSource attached to the whole whenAll; the
// spinning branch is expected to observe it and the result to be cancelled.
@("whenAll.stop")
@safe unittest {
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
    });
  auto source = new StopSource();
  auto stopper = just(source).then((StopSource src) shared => src.stop());
  auto combined = whenAll(spinner, stopper).withStopSource(source);
  combined.syncWait.isCancelled.should == true;
}
184 
// whenAll over an array of senders yields an array of values.
@("whenAll.array.just")
@safe unittest {
  auto senders = [just(4), just(5)];
  whenAll(senders).syncWait.value.should == [4,5];
}
189 
// whenAll over an array of void senders completes successfully.
@("whenAll.array.void")
@safe unittest {
  auto senders = [VoidSender(), VoidSender()];
  whenAll(senders).syncWait.assumeOk;
}
194 
@("retry")
@safe unittest {
  // a sender that succeeds right away is passed through
  ValueSender!int(5).retry(Times(5)).syncWait.value.should == 5;

  int t = 3;
  int n = 0;
  // Every connect increments `n`; the resulting operation fails as long as the
  // pre-increment count was below `t`.
  struct Sender {
    alias Value = void;
    static struct Op(Receiver) {
      Receiver receiver;
      bool fail;
      @disable this(ref return scope typeof(this) rhs);
      @disable this(this);
      void start() @safe nothrow {
        if (fail) {
          receiver.setError(new Exception("Fail fail fail"));
        } else {
          receiver.setValue();
        }
      }
    }
    auto connect(Receiver)(return Receiver receiver) @safe return scope {
      // ensure NRVO
      auto state = Op!(Receiver)(receiver, n++ < t);
      return state;
    }
  }

  // three failures followed by a success: four connects in total
  Sender().retry(Times(5)).syncWait.assumeOk;
  n.should == 4;
  n = 0;

  // only two attempts allowed: the last error surfaces
  Sender().retry(Times(2)).syncWait.assumeOk.shouldThrowWithMessage("Fail fail fail");
  n.should == 2;

  // a continuation that always throws is attempted exactly Times(5) times
  shared int attempts = 0;
  auto alwaysFailing = ThreadSender()
    .then(() shared { import core.atomic; attempts.atomicOp!("+=")(1); throw new Exception("Failed"); })
    .retry(Times(5));
  alwaysFailing.syncWait.assumeOk.shouldThrowWithMessage("Failed");
  attempts.should == 5;
}
230 
// With a source that succeeds, retryWhen simply completes successfully.
@("retryWhen.immediate.success")
@safe unittest {
  // retry logic whose failure handler returns an immediately-completing sender
  static struct Immediate {
    auto failure(Exception e) {
      return VoidSender();
    }
  }

  auto guarded = VoidSender().retryWhen(Immediate());
  guarded.syncWait.assumeOk;
}
241 
/// Test helper: a Sender whose value is the number of times it has been
/// connected before (0 on the first connect, then 1, 2, ...).
struct ConnectCounter {
  alias Value = int;
  int counter = 0;
  /// Delegates to a ValueSender carrying the current count, then bumps the count.
  auto connect(Receiver)(return Receiver receiver) @safe return scope {
    // ensure NRVO
    auto state = ValueSender!int(counter++).connect(receiver);
    return state;
  }
}
251 
// The continuation throws until the connect count reaches 3, so the final
// value shows that retryWhen reconnected the source three times.
@("retryWhen.immediate.retries")
@safe unittest {
  // retry logic whose failure handler returns an immediately-completing sender
  static struct Immediate {
    auto failure(Exception e) {
      return VoidSender();
    }
  }

  auto flaky = ConnectCounter()
    .then((int c) { if (c < 3) throw new Exception("jada"); return c; });
  flaky.retryWhen(Immediate()).syncWait.value.should == 3;
}
264 
// Same as the immediate variant, but each retry is preceded by a 3ms delay
// that is driven by manually advancing a ManualTimeWorker.
@("retryWhen.wait.retries")
@safe unittest {
  import core.time : msecs;
  import concurrency.scheduler : ManualTimeWorker;

  // retry logic whose failure handler waits 3ms before the next attempt
  static struct Wait {
    auto failure(Exception e) @safe {
      return delay(3.msecs);
    }
  }

  auto timeWorker = new shared ManualTimeWorker();
  auto flaky = ConnectCounter()
    .then((int c) { if (c < 3) throw new Exception("jada"); return c; })
    .retryWhen(Wait())
    .withScheduler(timeWorker.getScheduler);

  // three failures mean three pending 3ms timers, one after the other
  auto driver = just(timeWorker).then((shared ManualTimeWorker w) {
      w.timeUntilNextEvent().should == 3.msecs;
      w.advance(3.msecs);
      w.timeUntilNextEvent().should == 3.msecs;
      w.advance(3.msecs);
      w.timeUntilNextEvent().should == 3.msecs;
      w.advance(3.msecs);
      // nothing is scheduled anymore after the successful attempt
      w.timeUntilNextEvent().should == null;
    });

  whenAll(flaky, driver).syncWait.value.should == 3;
}
294 
// When the failure handler's sender itself errors, that inner error is the
// one reported, not the original one.
@("retryWhen.throw")
@safe unittest {
  static struct Throw {
    auto failure(Exception t) @safe {
      return ErrorSender(new Exception("inner"));
    }
  }

  auto failing = ErrorSender(new Exception("outer")).retryWhen(Throw());
  failing.syncWait.assumeOk.shouldThrowWithMessage("inner");
}
305 
// whenAll combines a value delivered from another thread with an inline one.
@("whenAll.oob")
@safe unittest {
  auto outOfBand = OutOfBandValueSender!int(43);
  auto inline = ValueSender!int(11);
  auto combined = whenAll(outOfBand, inline);
  combined.syncWait.value.should == tuple(43, 11);
}
312 
// withStopToken passes the upstream value along even when it arrives from another thread.
@("withStopToken.oob")
@safe unittest {
  auto outOfBand = OutOfBandValueSender!int(44);
  auto passthrough = outOfBand.withStopToken((StopToken stopToken, int t) => t);
  passthrough.syncWait.value.should == 44;
}
318 
// Attaching a StopSource does not interfere with a value delivered from another thread.
@("withStopSource.oob")
@safe unittest {
  auto outOfBand = OutOfBandValueSender!int(45);
  auto stoppable = outOfBand.withStopSource(new StopSource());
  stoppable.syncWait.value.should == 45;
}
324 
// withStopToken hands a multi-value just() to the callback as a Tuple after
// the StopToken. The test was previously named "withStopSource.tuple" although
// it exercises withStopToken.
@("withStopToken.tuple")
@safe unittest {
  just(14, 53).withStopToken((StopToken s, Tuple!(int, int) t) => t[0]*t[1]).syncWait.value.should == 742;
}
329 
// An exception thrown inside a withStopToken callback surfaces through via(ThreadSender()).
@("value.withstoptoken.via.thread")
@safe unittest {
  auto failing = ValueSender!int(4)
    .withStopToken((StopToken s, int i) { throw new Exception("Badness");})
    .via(ThreadSender());
  failing.syncWait.assumeOk.shouldThrowWithMessage("Badness");
}
334 
// A successful void sender is turned into a cancelled result.
@("completewithcancellation")
@safe unittest {
  auto cancelled = ValueSender!void().completeWithCancellation;
  cancelled.syncWait.isCancelled.should == true;
}
339 
// With raceAll the first completion of any kind (done, value, error)
// determines the outcome; the spinner only returns once it is stopped.
@("raceAll")
@safe unittest {
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
    });
  raceAll(spinner, DoneSender()).syncWait.isCancelled.should == true;
  raceAll(spinner, just(42)).syncWait.value.should == 42;
  raceAll(spinner, ThrowingSender()).syncWait.isError.should == true;
}
349 
// A 10ms DelaySender scheduled on a ManualTimeWorker completes once the worker
// has been advanced by 10ms in total.
@("on.ManualTimeWorker")
@safe unittest {
  import concurrency.scheduler : ManualTimeWorker;

  auto timeWorker = new shared ManualTimeWorker();
  auto timer = DelaySender(10.msecs).withScheduler(timeWorker.getScheduler);

  // advances the clock in two 5ms steps, checking the remaining time in between
  auto driver = just(timeWorker).then((shared ManualTimeWorker w) shared {
      w.timeUntilNextEvent().should == 10.msecs;
      w.advance(5.msecs);
      w.timeUntilNextEvent().should == 5.msecs;
      w.advance(5.msecs);
      w.timeUntilNextEvent().should == null;
    });

  whenAll(timer, driver).syncWait().assumeOk;
}
366 
// Triggering the StopSource removes the pending timer from the
// ManualTimeWorker and cancels the whole operation.
@("on.ManualTimeWorker.cancel")
@safe unittest {
  import concurrency.scheduler : ManualTimeWorker;

  auto timeWorker = new shared ManualTimeWorker();
  auto source = new StopSource();
  auto timer = DelaySender(10.msecs).withScheduler(timeWorker.getScheduler);

  // stops instead of advancing the clock; afterwards no event may be pending
  auto driver = just(source).then((StopSource src) shared {
      timeWorker.timeUntilNextEvent().should == 10.msecs;
      src.stop();
      timeWorker.timeUntilNextEvent().should == null;
    });

  whenAll(timer, driver).syncWait(source).isCancelled.should == true;
}
382 
// A sender built from a delegate to stack data can be awaited in place, but
// must be rejected by the compiler when passed to a function that could let
// it escape.
@("then.stack.no-leak")
@safe unittest {
  struct S {
    void fun(int i) shared {
    }
  }
  shared S s;
  // it's perfectly OK to point to a function on the stack...
  auto boundSender = just(42).then(&s.fun);

  boundSender.syncWait();

  void disappearSender(Sender)(Sender s) @safe;
  // ...but the sender must not be able to leak out of this scope
  static assert(!__traits(compiles, disappearSender(boundSender)));
}
399 
// forwardOn must pass every kind of completion (value-less success, error,
// done, value) through the pool's scheduler unchanged.
@("forwardOn")
@safe unittest {
  auto pool = stdTaskPool(2);

  // success without a value
  VoidSender().forwardOn(pool.getScheduler).syncWait.assumeOk;
  // error
  ErrorSender(new Exception("bad news")).forwardOn(pool.getScheduler).syncWait.isError.should == true;
  // cancellation
  DoneSender().forwardOn(pool.getScheduler).syncWait.isCancelled.should == true;
  // success with a value
  just(42).forwardOn(pool.getScheduler).syncWait.value.should == 42;
}
409 
// Two concurrent consumers of a singleton share one run of the underlying
// sender (both see the same counter value); a later round runs it again.
@("toSingleton")
@safe unittest {
  import std.typecons : tuple;
  import concurrency.scheduler : ManualTimeWorker;
  import core.atomic : atomicOp;

  shared int g;

  auto timeWorker = new shared ManualTimeWorker();

  // increments `g` after a 2ms delay on the manual clock
  auto counting = delay(2.msecs).then(() shared => g.atomicOp!"+="(1));
  auto singleton = counting.toSingleton(timeWorker.getScheduler);

  // moves the manual clock past the delay
  auto driver = justFrom(() shared => timeWorker.advance(2.msecs));

  whenAll(singleton, singleton, driver).syncWait.value.should == tuple(1,1);
  whenAll(singleton, singleton, driver).syncWait.value.should == tuple(2,2);
}
427 
@("stopOn")
@safe unittest {
  auto sourceInner = new shared StopSource();
  auto sourceOuter = new shared StopSource();

  // First round: the delayed sender listens to the inner source only.
  shared bool b;
  auto listensToInner = delay(5.msecs).then(() shared => b = true).stopOn(StopToken(sourceInner));
  // NOTE(review): the lambda is handed to `just`, which presumably sends the
  // delegate as a value without calling it -- so sourceOuter.stop() may never
  // run here. Elsewhere in this file `justFrom` is used to invoke a callable.
  // Confirm the intent.
  whenAll(listensToInner,
          just(() => sourceOuter.stop())
          ).syncWait(sourceOuter).assumeOk;
  b.should == true;

  // Second round.
  shared bool d;
  // NOTE(review): this continuation assigns `b`, not `d`, so the final check on
  // `d` holds regardless of whether the delayed sender was stopped. Looks like
  // a copy-paste slip; the same `just` caveat as above applies to
  // sourceInner.stop(). Confirm before changing, since fixing either one
  // alters what this test expects.
  auto stoppedByInner = delay(5.msecs).then(() shared => b = true).stopOn(StopToken(sourceInner));
  whenAll(stoppedByInner,
          just(() => sourceInner.stop())
          ).syncWait(sourceOuter).assumeOk;
  d.should == false;
}
445 
// Checks the stop ordering of withChild: the child waits until its stop token
// fires and then wakes the parent, which records whether its own token is
// still untriggered at that moment. The test expects that to be the case and
// the overall result to be cancelled.
@("withChild")
@safe unittest {
  import core.atomic;

  // Shared hand-off state between the parent and child threads.
  class State {
    import core.sync.event : Event;
    // set by the parent: true when its token was not yet stopped on wake-up
    bool parentAfterChild;
    Event childEvent, parentEvent;
    this() shared @trusted {
      (cast()childEvent).initialize(false, false);
      (cast()parentEvent).initialize(false, false);
    }
    void signalChild() shared @trusted {
      (cast()childEvent).set();
    }
    void waitChild() shared @trusted {
      (cast()childEvent).wait();
    }
    void signalParent() shared @trusted {
      (cast()parentEvent).set();
    }
    void waitParent() shared @trusted {
      (cast()parentEvent).wait();
    }
  }
  auto state = new shared State();
  auto source = new shared StopSource;

  auto child = just(state).withStopToken((StopToken token, shared State state) @trusted {
      // yield while spinning, like the other stop-token loops in this file
      while (!token.isStopRequested) { Thread.yield(); }
      state.signalParent();
      // keep the child alive until the parent has recorded its observation
      state.waitChild();
    }).via(ThreadSender());

  auto parent = just(state).withStopToken((StopToken token, shared State state){
      state.waitParent();
      state.parentAfterChild.atomicStore(token.isStopRequested == false);
      state.signalChild();
    }).via(ThreadSender());

  whenAll(parent.withChild(child).withStopSource(source), just(source).then((shared StopSource s) => s.stop())).syncWait.isCancelled.should == true;

  state.parentAfterChild.atomicLoad.should == true;
}
491 
// The onTermination callback runs once when the source completes with a value.
@("onTermination.value")
@safe unittest {
  import core.atomic : atomicOp;
  shared int calls = 0;
  auto observed = just(42).onTermination(() @safe shared => calls.atomicOp!"+="(1));
  observed.syncWait.assumeOk;
  calls.should == 1;
}
499 
// The onTermination callback runs once when the source is cancelled.
@("onTermination.done")
@safe unittest {
  import core.atomic : atomicOp;
  shared int calls = 0;
  auto observed = DoneSender().onTermination(() @safe shared => calls.atomicOp!"+="(1));
  observed.syncWait.isCancelled.should == true;
  calls.should == 1;
}
507 
// The onTermination callback runs once when the source fails.
@("onTermination.error")
@safe unittest {
  import core.atomic : atomicOp;
  shared int calls = 0;
  auto observed = ThrowingSender().onTermination(() @safe shared => calls.atomicOp!"+="(1));
  observed.syncWait.isError.should == true;
  calls.should == 1;
}
515 
// The onError callback is not invoked when the source completes with a value.
@("onError.value")
@safe unittest {
  import core.atomic : atomicOp;
  shared int calls = 0;
  auto observed = just(42).onError((Exception e) @safe shared => calls.atomicOp!"+="(1));
  observed.syncWait.assumeOk;
  calls.should == 0;
}
523 
// The onError callback is not invoked when the source is cancelled.
@("onError.done")
@safe unittest {
  import core.atomic : atomicOp;
  shared int calls = 0;
  auto observed = DoneSender().onError((Exception e) @safe shared => calls.atomicOp!"+="(1));
  observed.syncWait.isCancelled.should == true;
  calls.should == 0;
}
531 
// The onError callback runs once when the source fails; the error still propagates.
@("onError.error")
@safe unittest {
  import core.atomic : atomicOp;
  shared int calls = 0;
  auto observed = ThrowingSender().onError((Exception e) @safe shared => calls.atomicOp!"+="(1));
  observed.syncWait.isError.should == true;
  calls.should == 1;
}
539 
// When the onError callback itself throws, the resulting error is the
// callback's exception with the original error chained as `next`.
// (An unused `core.exception : AssertError` import was dropped.)
@("onError.throw")
@safe unittest {
  auto err = ThrowingSender().onError((Exception e) @safe shared { throw new Exception("in onError"); }).syncWait.get!Exception;
  err.msg.should == "in onError";
  err.next.msg.should == "ThrowingSender";
}
547 
// Once the trigger fires, the source is stopped and its value is the result.
@("stopWhen.source.value")
@safe unittest {
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
      return 43;
    });
  auto timer = delay(100.msecs);
  auto bounded = spinner.stopWhen(timer);
  bounded.syncWait().value.should == 43;
}
557 
// Once the trigger fires, the source is stopped; the error it throws is the result.
@("stopWhen.source.error")
@safe unittest {
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
      throw new Exception("Upside down");
    });
  auto timer = delay(100.msecs);
  auto bounded = spinner.stopWhen(timer);
  bounded.syncWait().assumeOk.shouldThrowWithMessage("Upside down");
}
567 
// Once the trigger fires, the source is stopped; since it completes with
// cancellation, so does the whole.
@("stopWhen.source.cancelled")
@safe unittest {
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
    }).completeWithCancellation;
  auto timer = delay(100.msecs);
  auto bounded = spinner.stopWhen(timer);
  bounded.syncWait().isCancelled.should == true;
}
576 
// When the trigger fails, its error is the one reported, even though the
// source throws as well after being stopped.
@("stopWhen.trigger.error")
@safe unittest {
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
      throw new Exception("This occurres later, so the other one gets propagated");
    });
  auto failingTrigger = ThrowingSender();
  auto bounded = spinner.stopWhen(failingTrigger);
  bounded.syncWait().assumeOk.shouldThrowWithMessage("ThrowingSender");
}
586 
// When the trigger completes with cancellation, the whole is cancelled even
// though the source would have produced a value.
@("stopWhen.trigger.cancelled.value")
@safe unittest {
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
      return 42;
    });
  auto cancellingTrigger = delay(100.msecs).completeWithCancellation;
  auto bounded = spinner.stopWhen(cancellingTrigger);
  bounded.syncWait().isCancelled.should == true;
}
596 
// A successful sender is turned into a failure carrying the given exception.
@("completewitherror.basic")
@safe unittest {
  auto failing = ValueSender!void().completeWithError(new Exception("hello"));
  failing.syncWait.assumeOk.shouldThrowWithMessage("hello");
}
601 
// An upstream Exception is replaced by the one given to completeWithError.
@("completewitherror.exception.base")
@safe unittest {
  auto failing = ErrorSender(new Exception("not you")).completeWithError(new Exception("overridden"));
  failing.syncWait.assumeOk.shouldThrowWithMessage!Throwable("overridden");
}
606 
// An upstream bare Throwable takes precedence over the replacement exception.
@("completewitherror.throwable.base")
@safe unittest {
  auto failing = ErrorSender(new Throwable("precedence")).completeWithError(new Exception("hello"));
  failing.syncWait.assumeOk.shouldThrowWithMessage!Throwable("precedence");
}
611 
// An upstream Error takes precedence over the replacement exception.
@("completewitherror.error.base")
@safe unittest {
  auto failing = ErrorSender(new Error("precedence")).completeWithError(new Exception("hello"));
  failing.syncWait.assumeOk.shouldThrowWithMessage!Error("precedence");
}
616 
// The onCompletion callback runs once when the source completes with a value.
@("onCompletion.value")
@safe unittest {
  import core.atomic : atomicOp;
  shared int calls = 0;
  auto observed = just(42).onCompletion(() @safe shared => calls.atomicOp!"+="(1));
  observed.syncWait.assumeOk;
  calls.should == 1;
}
624 
// The onCompletion callback runs once when the source is cancelled.
@("onCompletion.done")
@safe unittest {
  import core.atomic : atomicOp;
  shared int calls = 0;
  auto observed = DoneSender().onCompletion(() @safe shared => calls.atomicOp!"+="(1));
  observed.syncWait.isCancelled.should == true;
  calls.should == 1;
}
632 
// The onCompletion callback is not invoked when the source fails.
@("onCompletion.error")
@safe unittest {
  import core.atomic : atomicOp;
  shared int calls = 0;
  auto observed = ThrowingSender().onCompletion(() @safe shared => calls.atomicOp!"+="(1));
  observed.syncWait.isError.should == true;
  calls.should == 0;
}
640 
// onResult and tee each invoke the callback once for a value completion.
@("onResult.value")
@safe unittest {
  import core.atomic : atomicOp;
  shared int calls = 0;

  auto viaOnResult = just(42).onResult((Result!int r) @safe shared => calls.atomicOp!"+="(1));
  viaOnResult.syncWait.assumeOk;

  auto viaTee = just(42).tee((Result!int r) @safe shared => calls.atomicOp!"+="(1));
  viaTee.syncWait.assumeOk;

  calls.should == 2;
}
649 
// onResult and tee each invoke the callback once for a cancelled completion.
@("onResult.done")
@safe unittest {
  import core.atomic : atomicOp;
  shared int calls = 0;

  auto viaOnResult = DoneSender().onResult((Result!void r) @safe shared => calls.atomicOp!"+="(1));
  viaOnResult.syncWait.isCancelled.should == true;

  auto viaTee = DoneSender().tee((Result!void r) @safe shared => calls.atomicOp!"+="(1));
  viaTee.syncWait.isCancelled.should == true;

  calls.should == 2;
}
658 
// onResult and tee each invoke the callback once for an error completion.
@("onResult.error")
@safe unittest {
  import core.atomic : atomicOp;
  shared int calls = 0;

  auto viaOnResult = ThrowingSender().onResult((Result!void r) @safe shared => calls.atomicOp!"+="(1));
  viaOnResult.syncWait.isError.should == true;

  auto viaTee = ThrowingSender().tee((Result!void r) @safe shared => calls.atomicOp!"+="(1));
  viaTee.syncWait.isError.should == true;

  calls.should == 2;
}
667 
// A repeated 1ms delay is raced against a driver that advances the manual
// clock twice; the counter shows exactly two iterations ran before the driver
// finished and won the race.
@("repeat.race")
@safe unittest {
  import core.atomic : atomicOp;
  import concurrency.scheduler : ManualTimeWorker;
  shared int ticks = 0;

  auto timeWorker = new shared ManualTimeWorker();

  // bumps `ticks` every 1ms, forever
  auto ticker = delay(1.msecs).then(() shared => cast(void)ticks.atomicOp!"+="(1)).repeat();

  auto driver = just(timeWorker).then((shared ManualTimeWorker w) {
      w.timeUntilNextEvent().should == 1.msecs;
      w.advance(1.msecs);
      w.timeUntilNextEvent().should == 1.msecs;
      w.advance(1.msecs);
      // a third iteration is already scheduled, but never gets to run
      w.timeUntilNextEvent().should == 1.msecs;
    });

  auto contest = race(ticker, driver).withScheduler(timeWorker.getScheduler);
  contest.syncWait().assumeOk;
  ticks.should == 2;
}
689 
// repeat() keeps reconnecting the source until it fails; the failure is then
// the result of the whole.
@("repeat.error")
@safe unittest {
  // operation that either fails with "Bye!" or completes successfully
  static struct CountdownOp(Receiver) {
    Receiver receiver;
    bool fail;
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    void start() @safe nothrow {
      if (fail) {
        receiver.setError(new Exception("Bye!"));
      } else {
        receiver.setValueOrError();
      }
    }
  }

  // Every connect decrements `countdown`; the operation created when the
  // pre-decrement value is zero is the one that fails.
  static struct Countdown {
    alias Value = void;
    int countdown;
    auto connect(Receiver)(return Receiver receiver) @safe return scope {
      // ensure NRVO
      auto state = CountdownOp!(Receiver)(receiver, countdown-- == 0);
      return state;
    }
  }

  // a single run with a non-zero countdown succeeds
  Countdown(3).syncWait().assumeOk();
  // repeating eventually reaches zero and ends in an error
  Countdown(3).repeat().syncWait().isError.should == true;
}