1 module ut.concurrency.operations;
2 
3 import concurrency;
4 import concurrency.sender;
5 import concurrency.thread;
6 import concurrency.operations;
7 import concurrency.receiver;
8 import concurrency.stoptoken;
9 import concurrency.nursery;
10 import unit_threaded;
11 import core.time;
12 import core.thread;
13 import std.typecons;
14 
/// Sender used to test that operations keep the operational state alive until
/// one of the receiver's terminal functions is called. The value is handed to
/// the receiver from a newly created thread instead of from within `start`.
struct OutOfBandValueSender(T) {
  alias Value = T;
  T value;
  /// Operational state: stores the receiver together with the value to deliver.
  struct Op(Receiver) {
    Receiver receiver;
    T value;
    /// Completes the receiver with the stored value.
    void run() {
      receiver.setValue(value);
    }
    /// Starts a new thread that executes `run`; does not join it.
    void start() @trusted scope {
      // The Thread handle is intentionally not kept; binding it to a local
      // named `value` would only shadow the field above.
      new Thread(&this.run).start();
    }
  }
  /// Creates the operational state for `receiver`, copying this sender's value into it.
  auto connect(Receiver)(return Receiver receiver) @safe scope return {
    // ensure NRVO
    auto op = Op!(Receiver)(receiver, value);
    return op;
  }
}
35 
@("ignoreErrors.syncWait.value")
@safe unittest {
  // The callback always throws; with `ignoreError` applied the test expects
  // the overall result to be reported as cancelled.
  bool delegate() @safe shared thrower = () shared { throw new Exception("Exceptions are rethrown"); };
  auto sender = ThreadSender().then(thrower).ignoreError();
  sender.syncWait.isCancelled.should == true;
}
44 
@("oob")
unittest {
  // The value is delivered from another thread; syncWait must still see it.
  auto sender = OutOfBandValueSender!int(43);
  auto result = sender.syncWait;
  result.value.should == 43;
}
50 
@("race")
unittest {
  // Two immediately completing senders: the test expects the first one's value.
  auto immediate = race(ValueSender!int(4), ValueSender!int(5));
  immediate.syncWait.value.should == 4;

  // The sender that sleeps is expected to lose regardless of argument order.
  auto quick = ThreadSender().then(() shared => 1);
  auto sluggish = ThreadSender().then(() shared @trusted { Thread.sleep(50.msecs); return 2; });
  race(quick, sluggish).syncWait.value.should == 1;
  race(sluggish, quick).syncWait.value.should == 1;
}
59 
@("race.multiple")
unittest {
  // race accepts more than two senders; the first value is expected to win.
  auto contenders = race(ValueSender!int(4), ValueSender!int(5), ValueSender!int(6));
  contenders.syncWait.value.should == 4;
}
64 
@("race.exception.single")
unittest {
  // One failing sender does not prevent the other's value from being produced.
  auto recovered = race(ThrowingSender(), ValueSender!int(5));
  recovered.syncWait.value.should == 5;

  // When every sender fails, the failure surfaces through assumeOk.
  auto allFailing = race(ThrowingSender(), ThrowingSender());
  allFailing.syncWait.assumeOk.shouldThrow();
}
70 
@("race.exception.double")
unittest {
  // Both senders throw; the test expects the message of the one that does not sleep.
  auto fast = ThreadSender().then(() shared { throw new Exception("Fast"); });
  auto slow = ThreadSender().then(() shared @trusted { Thread.sleep(50.msecs); throw new Exception("Slow"); });
  auto contest = race(slow, fast);
  contest.syncWait.assumeOk.shouldThrowWithMessage("Fast");
}
77 
@("race.cancel-other")
unittest {
  // Spins until a stop is requested; the test only terminates if the race
  // requests a stop on this sender once the other one has produced 88.
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
    });
  auto contest = race(spinner, ValueSender!int(88));
  contest.syncWait.value.get.should == 88;
}
85 
@("race.cancel")
unittest {
  auto nursery = new shared Nursery();
  // Both racers spin until a stop is requested.
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
    });
  nursery.run(race(spinner, spinner));

  // A second task stops the nursery after a short sleep.
  auto stopper = ThreadSender().then(() @trusted shared { Thread.sleep(50.msecs); nursery.stop(); });
  nursery.run(stopper);

  nursery.syncWait.isCancelled.should == true;
}
96 
@("race.array.just")
@safe unittest {
  // race also accepts an array of senders.
  auto contest = race([just(4), just(5)]);
  contest.syncWait.value.should == 4;
}
101 
@("race.array.void")
@safe unittest {
  // An array of void senders must complete without error.
  auto contest = race([VoidSender(), VoidSender()]);
  contest.syncWait.assumeOk;
}
106 
@("via")
unittest {
  import std.typecons : tuple;

  // Two value senders: the expected tuple lists the `via` argument's value first.
  auto both = ValueSender!int(3).via(ValueSender!int(6));
  both.syncWait.value.should == tuple(6,3);

  // When one side is void, only the other side's value is expected.
  auto valueViaVoid = ValueSender!int(5).via(VoidSender());
  valueViaVoid.syncWait.value.should == 5;
  auto voidViaValue = VoidSender().via(ValueSender!int(4));
  voidViaValue.syncWait.value.should == 4;
}
114 
@("then.value.delegate")
@safe unittest {
  // `then` with a shared lambda transforms the upstream value.
  auto tripled = ValueSender!int(3).then((int i) shared => i*3);
  tripled.syncWait.value.shouldEqual(9);
}
119 
@("then.value.function")
@safe unittest {
  // Same as the delegate variant, but the lambda carries no `shared` qualifier.
  auto tripled = ValueSender!int(3).then((int i) => i*3);
  tripled.syncWait.value.shouldEqual(9);
}
124 
@("then.oob")
@safe unittest {
  // The upstream value is delivered from another thread before `then` maps it.
  auto tripled = OutOfBandValueSender!int(46).then((int i) shared => i*3);
  tripled.syncWait.value.shouldEqual(138);
}
129 
@("then.tuple")
@safe unittest {
  // A multi-value `just` can be consumed as a single Tuple argument.
  auto firstField = just(1,2,3).then((Tuple!(int,int,int) t) shared => t[0]);
  firstField.syncWait.value.shouldEqual(1);
}
134 
@("then.tuple.expand")
@safe unittest {
  // A multi-value `just` can also be consumed as separate arguments.
  auto summed = just(1,2,3).then((int a,int b,int c) shared => a+b);
  summed.syncWait.value.shouldEqual(3);
}
139 
@("finally")
unittest {
  // After a value, finally_ replaces the result with its own
  // (either a callable's return value or a plain value).
  ValueSender!int(1).finally_(() => 4).syncWait.value.should == 4;
  ValueSender!int(2).finally_(3).syncWait.value.should == 3;

  // After an error, finally_ also yields its own result.
  ThrowingSender().finally_(3).syncWait.value.should == 3;
  ThrowingSender().finally_(() => 4).syncWait.value.should == 4;

  // After cancellation, the result stays cancelled.
  DoneSender().finally_(() => 4).syncWait.isCancelled.should == true;
  DoneSender().finally_(3).syncWait.isCancelled.should == true;
}
150 
@("whenAll.basic")
unittest {
  // Values of all senders are gathered into a tuple, in argument order.
  auto pair = whenAll(ValueSender!int(1), ValueSender!int(2));
  pair.syncWait.value.should == tuple(1,2);
  auto triple = whenAll(ValueSender!int(1), ValueSender!int(2), ValueSender!int(3));
  triple.syncWait.value.should == tuple(1,2,3);

  // Void senders do not show up in the expected value.
  whenAll(VoidSender(), ValueSender!int(2)).syncWait.value.should == 2;
  whenAll(ValueSender!int(1), VoidSender()).syncWait.value.should == 1;
  whenAll(VoidSender(), VoidSender()).syncWait.assumeOk;

  // An error from any sender is expected to surface.
  whenAll(ValueSender!int(1), ThrowingSender()).syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender");
  whenAll(ThrowingSender(), ValueSender!int(1)).syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender");

  // A cancellation from any sender is expected to cancel the whole.
  whenAll(ValueSender!int(1), DoneSender()).syncWait.isCancelled.should == true;
  whenAll(DoneSender(), ValueSender!int(1)).syncWait.isCancelled.should == true;

  // Cancellation combined with an error: the expected outcome follows the first argument.
  whenAll(DoneSender(), ThrowingSender()).syncWait.isCancelled.should == true;
  whenAll(ThrowingSender(), DoneSender()).syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender");
}
166 
@("whenAll.cancel")
unittest {
  // Void-valued sender that spins until a stop is requested.
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
    });
  // These only terminate if the spinning sender receives a stop request
  // once the other sender has completed with done or with an error.
  whenAll(spinner, DoneSender()).syncWait.isCancelled.should == true;
  whenAll(ThrowingSender(), spinner).syncWait.assumeOk.shouldThrow;
  whenAll(spinner, ThrowingSender()).syncWait.assumeOk.shouldThrow;

  // Same checks with a spinner that returns a value after the stop request.
  auto spinnerInt = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
      return 42;
    });
  whenAll(spinnerInt, DoneSender()).syncWait.isCancelled.should == true;
  whenAll(ThrowingSender(), spinnerInt).syncWait.assumeOk.shouldThrow;
  whenAll(spinnerInt, ThrowingSender()).syncWait.assumeOk.shouldThrow;
}
183 
@("whenAll.stop")
unittest {
  auto source = new StopSource();
  // One sender spins until stopped; the other triggers the stop on `source`.
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
    });
  auto stopper = just(source).then((StopSource source) shared => source.stop());
  auto combined = whenAll(spinner, stopper).withStopSource(source);
  combined.syncWait.isCancelled.should == true;
}
193 
@("whenAll.array.just")
unittest {
  // An array of value senders is expected to yield an array of values.
  auto gathered = whenAll([just(4), just(5)]);
  gathered.syncWait.value.should == [4,5];
}
198 
@("whenAll.array.void")
unittest {
  // An array of void senders must complete without error.
  auto gathered = whenAll([VoidSender(), VoidSender()]);
  gathered.syncWait.assumeOk;
}
203 
@("retry")
unittest {
  // A sender that succeeds right away is unaffected by retry.
  ValueSender!int(5).retry(Times(5)).syncWait.value.should == 5;

  int failuresBeforeSuccess = 3;
  int connects = 0;
  // Void sender whose operation fails while the number of prior connects is
  // below `failuresBeforeSuccess`; every connect increments `connects`.
  struct FlakySender {
    alias Value = void;
    static struct Op(Receiver) {
      Receiver receiver;
      bool shouldFail;
      void start() @safe nothrow {
        if (!shouldFail)
          receiver.setValue();
        else
          receiver.setError(new Exception("Fail fail fail"));
      }
    }
    auto connect(Receiver)(return Receiver receiver) @safe scope return {
      // ensure NRVO
      auto op = Op!(Receiver)(receiver, connects++ < failuresBeforeSuccess);
      return op;
    }
  }

  // Three failures followed by a success: four connects in total.
  FlakySender().retry(Times(5)).syncWait.assumeOk;
  connects.should == 4;
  connects = 0;

  // With only two attempts allowed the error surfaces after two connects.
  FlakySender().retry(Times(2)).syncWait.assumeOk.shouldThrowWithMessage("Fail fail fail");
  connects.should == 2;

  // A callback that always throws is expected to run once per allowed attempt.
  shared int invocations = 0;
  auto alwaysFailing = ThreadSender().then(()shared { import core.atomic; invocations.atomicOp!("+=")(1); throw new Exception("Failed"); });
  alwaysFailing.retry(Times(5)).syncWait.assumeOk.shouldThrowWithMessage("Failed");
  invocations.should == 5;
}
237 
@("whenAll.oob")
unittest {
  // Combines a sender completing from another thread with an immediate one.
  auto outOfBand = OutOfBandValueSender!int(43);
  auto immediate = ValueSender!int(11);
  auto combined = whenAll(outOfBand, immediate);
  combined.syncWait.value.should == tuple(43, 11);
}
244 
@("withStopToken.oob")
unittest {
  // The callback receives the stop token plus the upstream value and passes the value on.
  auto outOfBand = OutOfBandValueSender!int(44);
  auto passthrough = outOfBand.withStopToken((StopToken stopToken, int t) => t);
  passthrough.syncWait.value.should == 44;
}
250 
@("withStopSource.oob")
unittest {
  // Attaching a fresh stop source must not disturb the delivered value.
  auto outOfBand = OutOfBandValueSender!int(45);
  auto withSource = outOfBand.withStopSource(new StopSource());
  withSource.syncWait.value.should == 45;
}
256 
@("withStopSource.tuple")
unittest {
  // A multi-value `just` reaches the withStopToken callback as a Tuple.
  auto product = just(14, 53).withStopToken((StopToken s, Tuple!(int, int) t) => t[0]*t[1]);
  product.syncWait.value.should == 742;
}
261 
@("value.withstoptoken.via.thread")
@safe unittest {
  // An exception thrown inside the withStopToken callback must surface
  // after the sender is routed through a ThreadSender.
  auto failing = ValueSender!int(4)
    .withStopToken((StopToken s, int i) { throw new Exception("Badness");})
    .via(ThreadSender());
  failing.syncWait.assumeOk.shouldThrowWithMessage("Badness");
}
266 
@("completewithcancellation")
@safe unittest {
  // A successful void sender is reported as cancelled after completeWithCancellation.
  auto cancelled = ValueSender!void().completeWithCancellation;
  cancelled.syncWait.isCancelled.should == true;
}
271 
@("raceAll")
@safe unittest {
  // Spins until a stop is requested, so the other sender always finishes first.
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
    });

  // The expected outcome mirrors the first completion: done, value or error.
  auto againstDone = raceAll(spinner, DoneSender());
  againstDone.syncWait.isCancelled.should == true;
  auto againstValue = raceAll(spinner, just(42));
  againstValue.syncWait.value.should == 42;
  auto againstError = raceAll(spinner, ThrowingSender());
  againstError.syncWait.isError.should == true;
}
281 
@("on.ManualTimeWorker")
@safe unittest {
  import concurrency.scheduler : ManualTimeWorker;

  auto worker = new shared ManualTimeWorker();
  // A 10ms delay scheduled on the manually driven worker.
  auto timer = DelaySender(10.msecs).withScheduler(worker.getScheduler);
  // Advances the worker's clock in two 5ms steps, checking the remaining
  // time before the next event after each step.
  auto driver = just(worker).then((shared ManualTimeWorker worker) shared {
      worker.timeUntilNextEvent().should == 10.msecs;
      worker.advance(5.msecs);
      worker.timeUntilNextEvent().should == 5.msecs;
      worker.advance(5.msecs);
      worker.timeUntilNextEvent().should == null;
    });

  auto combined = whenAll(timer, driver);
  combined.syncWait().assumeOk;
}
298 
@("on.ManualTimeWorker.cancel")
@safe unittest {
  import concurrency.scheduler : ManualTimeWorker;

  auto source = new StopSource();
  auto worker = new shared ManualTimeWorker();
  // A 10ms delay scheduled on the manually driven worker.
  auto timer = DelaySender(10.msecs).withScheduler(worker.getScheduler);
  // Stops the source instead of advancing time; afterwards the worker is
  // expected to have no pending event.
  auto driver = just(source).then((StopSource source) shared {
      worker.timeUntilNextEvent().should == 10.msecs;
      source.stop();
      worker.timeUntilNextEvent().should == null;
    });

  auto combined = whenAll(timer, driver);
  combined.syncWait(source).isCancelled.should == true;
}
314 
@("then.stack.no-leak")
@safe unittest {
  // Struct with a shared member function, used as the callback target below.
  struct S {
    void fun(int i) shared {
    }
  }
  // Lives in this unittest's stack frame.
  shared S s;
  // it's perfectly ok to point to a function on the stack
  auto sender = just(42).then(&s.fun);

  // Running the sender in place is allowed.
  sender.syncWait();

  // Declaration only (no body): a @safe function taking any sender by value.
  void disappearSender(Sender)(Sender s) @safe;
  // but the sender can't leak now: passing it to that function must not compile
  static assert(!__traits(compiles, disappearSender(sender)));
}
331 
@("forwardOn")
@safe unittest {
  auto pool = stdTaskPool(2);

  // Each kind of completion must come through forwardOn unchanged.
  auto forwardedVoid = VoidSender().forwardOn(pool.getScheduler);
  forwardedVoid.syncWait.assumeOk;

  auto forwardedError = ErrorSender(new Exception("bad news")).forwardOn(pool.getScheduler);
  forwardedError.syncWait.isError.should == true;

  auto forwardedDone = DoneSender().forwardOn(pool.getScheduler);
  forwardedDone.syncWait.isCancelled.should == true;

  auto forwardedValue = just(42).forwardOn(pool.getScheduler);
  forwardedValue.syncWait.value.should == 42;
}
341 
@("toSingleton")
@safe unittest {
  import std.typecons : tuple;
  import concurrency.scheduler : ManualTimeWorker;
  import core.atomic : atomicOp;

  shared int g;
  auto worker = new shared ManualTimeWorker();

  // Moves the manual clock forward so the 2ms delay can fire.
  auto driver = justFrom(() shared => worker.advance(2.msecs));
  // Increments `g` after a 2ms delay; wrapped as a singleton on the worker's scheduler.
  auto single = delay(2.msecs).then(() shared => g.atomicOp!"+="(1)).toSingleton(worker.getScheduler);

  // Within one round both uses of `single` are expected to observe the same
  // counter value; a second round is expected to increment it once more.
  auto firstRound = whenAll(single, single, driver);
  firstRound.syncWait.value.should == tuple(1,1);
  auto secondRound = whenAll(single, single, driver);
  secondRound.syncWait.value.should == tuple(2,2);
}
359 
@("stopOn")
@safe unittest {
  auto sourceInner = new shared StopSource();
  auto sourceOuter = new shared StopSource();

  // Round one: the delayed sender is tied to the inner source's token while
  // syncWait runs with the outer source; `b` is expected to be set.
  shared bool b;
  auto firstDelayed = delay(5.msecs).then(() shared => b = true).stopOn(StopToken(sourceInner));
  // NOTE(review): `just` is handed a lambda here; presumably it carries the
  // lambda as a value without calling it, so `stop()` may never run — confirm.
  auto firstCompanion = just(() => sourceOuter.stop());
  whenAll(firstDelayed, firstCompanion).syncWait(sourceOuter).assumeOk;
  b.should == true;

  // Round two.
  // NOTE(review): the callback below writes `b`, not `d`, so nothing in this
  // test ever assigns `d` and the final check looks vacuous — confirm intent.
  shared bool d;
  auto secondDelayed = delay(5.msecs).then(() shared => b = true).stopOn(StopToken(sourceInner));
  auto secondCompanion = just(() => sourceInner.stop());
  whenAll(secondDelayed, secondCompanion).syncWait(sourceOuter).assumeOk;
  d.should == false;
}
377 
@("withChild")
@safe unittest {
  import core.atomic;

  // Shared state used to hand control back and forth between the parent and
  // the child task via two events.
  class State {
    import core.sync.event : Event;
    bool parentAfterChild;
    Event childEvent, parentEvent;
    this() shared @trusted {
      (cast()childEvent).initialize(false, false);
      (cast()parentEvent).initialize(false, false);
    }
    void signalChild() shared @trusted {
      (cast()childEvent).set();
    }
    void waitChild() shared @trusted {
      (cast()childEvent).wait();
    }
    void signalParent() shared @trusted {
      (cast()parentEvent).set();
    }
    void waitParent() shared @trusted {
      (cast()parentEvent).wait();
    }
  }
  auto state = new shared State();
  auto source = new shared StopSource;

  // Child: spins until its token reports a stop, wakes the parent, then
  // blocks until the parent signals back.
  auto child = just(state).withStopToken((StopToken token, shared State state) @trusted {
      // Yield while spinning, like the other spin loops in this file.
      while(!token.isStopRequested) { Thread.yield(); }
      state.signalParent();
      state.waitChild();
    }).via(ThreadSender());

  // Parent: once woken by the child, records whether its own token is still
  // un-stopped, then releases the child.
  auto parent = just(state).withStopToken((StopToken token, shared State state){
      state.waitParent();
      state.parentAfterChild.atomicStore(token.isStopRequested == false);
      state.signalChild();
    }).via(ThreadSender());

  // NOTE(review): the value of `isCancelled` is discarded here, so only the
  // check on `parentAfterChild` below verifies anything — confirm whether an
  // assertion on the result was intended.
  whenAll(parent.withChild(child).withStopSource(source), just(source).then((shared StopSource s) => s.stop())).syncWait.isCancelled;

  // The parent must not have seen a stop request at the moment the child
  // had already observed one.
  state.parentAfterChild.atomicLoad.should == true;
}
423 
@("onTermination.value")
@safe unittest {
  import core.atomic : atomicOp;
  // Counts how often the termination callback runs.
  shared int calls = 0;
  auto observed = just(42).onTermination(() @safe shared => calls.atomicOp!"+="(1));
  observed.syncWait.assumeOk;
  calls.should == 1;
}
431 
@("onTermination.done")
@safe unittest {
  import core.atomic : atomicOp;
  // The termination callback must also run when the sender is cancelled.
  shared int calls = 0;
  auto observed = DoneSender().onTermination(() @safe shared => calls.atomicOp!"+="(1));
  observed.syncWait.isCancelled.should == true;
  calls.should == 1;
}
439 
@("onTermination.error")
@safe unittest {
  import core.atomic : atomicOp;
  // The termination callback must also run when the sender fails.
  shared int calls = 0;
  auto observed = ThrowingSender().onTermination(() @safe shared => calls.atomicOp!"+="(1));
  observed.syncWait.isError.should == true;
  calls.should == 1;
}
447 
@("onError.value")
@safe unittest {
  import core.atomic : atomicOp;
  // The error callback must not run when the sender succeeds.
  shared int calls = 0;
  auto observed = just(42).onError((Exception e) @safe shared => calls.atomicOp!"+="(1));
  observed.syncWait.assumeOk;
  calls.should == 0;
}
455 
@("onError.done")
@safe unittest {
  import core.atomic : atomicOp;
  // The error callback must not run when the sender is cancelled.
  shared int calls = 0;
  auto observed = DoneSender().onError((Exception e) @safe shared => calls.atomicOp!"+="(1));
  observed.syncWait.isCancelled.should == true;
  calls.should == 0;
}
463 
@("onError.error")
@safe unittest {
  import core.atomic : atomicOp;
  // The error callback runs exactly once when the sender fails, and the
  // result is still reported as an error.
  shared int calls = 0;
  auto observed = ThrowingSender().onError((Exception e) @safe shared => calls.atomicOp!"+="(1));
  observed.syncWait.isError.should == true;
  calls.should == 1;
}
471 
@("onError.throw")
@safe unittest {
  // When the onError callback itself throws, the resulting error is expected
  // to be the callback's exception with the original failure chained as `next`.
  auto err = ThrowingSender().onError((Exception e) @safe shared { throw new Exception("in onError"); }).syncWait.error;
  err.msg.should == "in onError";
  err.next.msg.should == "ThrowingSender";
}
479 
@("stopWhen.source.value")
@safe unittest {
  // The trigger fires after 100ms; the source then returns its value, which
  // is expected to be the overall result.
  auto trigger = delay(100.msecs);
  auto source = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
      return 43;
    });
  auto guarded = source.stopWhen(trigger);
  guarded.syncWait().value.should == 43;
}
489 
@("stopWhen.source.error")
@safe unittest {
  // The source throws once it has been asked to stop; that error is
  // expected to be the overall result.
  auto trigger = delay(100.msecs);
  auto source = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
      throw new Exception("Upside down");
    });
  auto guarded = source.stopWhen(trigger);
  guarded.syncWait().assumeOk.shouldThrowWithMessage("Upside down");
}
499 
@("stopWhen.source.cancelled")
@safe unittest {
  // The source completes with cancellation once it has been asked to stop.
  auto trigger = delay(100.msecs);
  auto source = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
    }).completeWithCancellation;
  auto guarded = source.stopWhen(trigger);
  guarded.syncWait().isCancelled.should == true;
}
508 
@("stopWhen.trigger.error")
@safe unittest {
  // The trigger fails immediately; its error is expected to win over the
  // one the source throws after being stopped.
  auto trigger = ThrowingSender();
  auto source = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
      throw new Exception("This occurres later, so the other one gets propagated");
    });
  auto guarded = source.stopWhen(trigger);
  guarded.syncWait().assumeOk.shouldThrowWithMessage("ThrowingSender");
}
518 
@("stopWhen.trigger.cancelled.value")
@safe unittest {
  // The trigger completes with cancellation after 100ms; the overall result
  // is expected to be cancelled even though the source returns a value.
  auto trigger = delay(100.msecs).completeWithCancellation;
  auto source = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
      return 42;
    });
  auto guarded = source.stopWhen(trigger);
  guarded.syncWait().isCancelled.should == true;
}