1 module ut.concurrency.operations;
2 
3 import concurrency;
4 import concurrency.sender;
5 import concurrency.thread;
6 import concurrency.operations;
7 import concurrency.receiver;
8 import concurrency.stoptoken;
9 import concurrency.nursery;
10 import unit_threaded;
11 import core.time;
12 import core.thread;
13 import std.typecons;
14 
/// Test helper: a sender whose operational state completes the receiver from a
/// newly created thread instead of from the caller of `start`.
///
/// Used to test that senders keep the operational state alive until one of the
/// receiver's terminal functions has been called.
struct OutOfBandValueSender(T) {
  alias Value = T;
  /// The value handed to the receiver on completion.
  T value;
  /// Operational state returned by `connect`.
  struct Op(Receiver) {
    Receiver receiver;
    T value;
    /// Thread entry point: passes the stored value to the receiver.
    void run() {
      receiver.setValue(value);
    }
    /// Creates and starts a thread that executes `run`.
    /// The thread handle is deliberately not kept; nothing in this type joins it.
    void start() @trusted scope {
      new Thread(&this.run).start();
    }
  }
  /// Builds the operational state from `receiver` and a copy of `value`.
  auto connect(Receiver)(return Receiver receiver) @safe scope return {
    // ensure NRVO
    auto op = Op!(Receiver)(receiver, value);
    return op;
  }
}
35 
@("ignoreErrors.syncWait.value")
@safe unittest {
  // A continuation that always throws.
  bool delegate() @safe shared thrower = () shared {
    throw new Exception("Exceptions are rethrown");
  };
  // With ignoreError() applied, the failure is observed as a cancellation.
  auto sender = ThreadSender().then(thrower).ignoreError();
  sender.syncWait.isCancelled.should == true;
}
44 
@("oob")
unittest {
  // The value is delivered from a different thread; syncWait still returns it.
  auto sender = OutOfBandValueSender!int(43);
  sender.syncWait.value.should == 43;
}
50 
@("race")
unittest {
  // Two immediately completing senders: the first argument's value is the result.
  race(ValueSender!int(4), ValueSender!int(5))
    .syncWait.value.should == 4;

  auto quick = ThreadSender().then(() shared => 1);
  auto delayed = ThreadSender().then(() shared @trusted {
      Thread.sleep(50.msecs);
      return 2;
    });
  // Argument order does not matter: the sender without the sleep wins.
  race(quick, delayed).syncWait.value.should == 1;
  race(delayed, quick).syncWait.value.should == 1;
}
59 
@("race.multiple")
unittest {
  // Three immediately completing senders: the first argument's value is the result.
  auto contenders = race(ValueSender!int(4), ValueSender!int(5), ValueSender!int(6));
  contenders.syncWait.value.should == 4;
}
64 
@("race.exception.single")
unittest {
  // A failing contender does not stop the other contender's value from winning.
  race(ThrowingSender(), ValueSender!int(5))
    .syncWait.value.should == 5;
  // When every contender fails, assumeOk throws.
  race(ThrowingSender(), ThrowingSender())
    .syncWait.assumeOk.shouldThrow();
}
70 
@("race.exception.double")
unittest {
  auto failsLate = ThreadSender().then(() shared @trusted {
      Thread.sleep(50.msecs);
      throw new Exception("Slow");
    });
  auto failsEarly = ThreadSender().then(() shared {
      throw new Exception("Fast");
    });
  // Both contenders throw; the error from the one that does not sleep is reported.
  race(failsLate, failsEarly).syncWait.assumeOk.shouldThrowWithMessage("Fast");
}
77 
@("race.cancel-other")
unittest {
  // Spins until a stop is requested, so it only returns after being asked to stop.
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) {
        Thread.yield();
      }
    });
  // The value sender wins the race and the test still terminates.
  race(spinner, ValueSender!int(88)).syncWait.value.get.should == 88;
}
85 
@("race.cancel")
unittest {
  // Spins until a stop is requested.
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) {
        Thread.yield();
      }
    });
  auto nursery = new shared Nursery();
  // Neither contender of this race finishes without a stop request...
  nursery.run(race(spinner, spinner));
  // ...so a second task stops the nursery after a short pause.
  nursery.run(ThreadSender().then(() @trusted shared {
        Thread.sleep(50.msecs);
        nursery.stop();
      }));
  nursery.syncWait.isCancelled.should == true;
}
96 
@("via")
unittest {
  import std.typecons : tuple;
  // Both sides produce a value: the result is a tuple with the via-argument's value first.
  ValueSender!int(3).via(ValueSender!int(6))
    .syncWait.value.should == tuple(6,3);
  // When one side is void, only the other side's value is returned.
  ValueSender!int(5).via(VoidSender())
    .syncWait.value.should == 5;
  VoidSender().via(ValueSender!int(4))
    .syncWait.value.should == 4;
}
104 
@("then.value.delegate")
@safe unittest {
  // Continuation written as a `shared` lambda; it receives the upstream value.
  auto tripled = ValueSender!int(3).then((int i) shared => i*3);
  tripled.syncWait.value.shouldEqual(9);
}
109 
@("then.value.function")
@safe unittest {
  // Continuation written as a plain (non-shared) lambda.
  auto tripled = ValueSender!int(3).then((int i) => i*3);
  tripled.syncWait.value.shouldEqual(9);
}
114 
@("then.oob")
@safe unittest {
  // The upstream value arrives from another thread before the continuation runs.
  auto tripled = OutOfBandValueSender!int(46).then((int i) shared => i*3);
  tripled.syncWait.value.shouldEqual(138);
}
119 
@("then.tuple")
@safe unittest {
  // The continuation may take the multi-value result as a single Tuple.
  auto first = just(1,2,3).then((Tuple!(int,int,int) t) shared => t[0]);
  first.syncWait.value.shouldEqual(1);
}
124 
@("then.tuple.expand")
@safe unittest {
  // The continuation may also take the multi-value result as separate parameters.
  auto sum = just(1,2,3).then((int a,int b,int c) shared => a+b);
  sum.syncWait.value.shouldEqual(3);
}
129 
@("finally")
unittest {
  // Value completions are replaced by the finally_ result, given either as a
  // callable or as a plain value.
  ValueSender!int(1).finally_(() => 4).syncWait.value.should == 4;
  ValueSender!int(2).finally_(3).syncWait.value.should == 3;
  // Error completions are replaced in the same way.
  ThrowingSender().finally_(() => 4).syncWait.value.should == 4;
  ThrowingSender().finally_(3).syncWait.value.should == 3;
  // Cancellation is passed through unchanged.
  DoneSender().finally_(() => 4).syncWait.isCancelled.should == true;
  DoneSender().finally_(3).syncWait.isCancelled.should == true;
}
140 
@("whenAll")
unittest {
  // All senders produce values: the results are combined into a tuple.
  whenAll(ValueSender!int(1), ValueSender!int(2))
    .syncWait.value.should == tuple(1,2);
  whenAll(ValueSender!int(1), ValueSender!int(2), ValueSender!int(3))
    .syncWait.value.should == tuple(1,2,3);

  // Void senders contribute nothing to the result.
  whenAll(VoidSender(), ValueSender!int(2)).syncWait.value.should == 2;
  whenAll(ValueSender!int(1), VoidSender()).syncWait.value.should == 1;
  whenAll(VoidSender(), VoidSender()).syncWait.isOk.should == true;

  // An error is reported from either argument position.
  whenAll(ValueSender!int(1), ThrowingSender())
    .syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender");
  whenAll(ThrowingSender(), ValueSender!int(1))
    .syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender");

  // A cancellation is reported from either argument position.
  whenAll(ValueSender!int(1), DoneSender()).syncWait.isCancelled.should == true;
  whenAll(DoneSender(), ValueSender!int(1)).syncWait.isCancelled.should == true;

  // One sender cancels and the other throws: the outcome follows the first argument.
  whenAll(DoneSender(), ThrowingSender()).syncWait.isCancelled.should == true;
  whenAll(ThrowingSender(), DoneSender())
    .syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender");
}
156 
@("whenAll.cancel")
unittest {
  // Void sender that only returns after a stop has been requested.
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) {
        Thread.yield();
      }
    });
  whenAll(spinner, DoneSender()).syncWait.isCancelled.should == true;
  whenAll(ThrowingSender(), spinner).syncWait.assumeOk.shouldThrow;
  whenAll(spinner, ThrowingSender()).syncWait.assumeOk.shouldThrow;

  // The same three scenarios with a spinner that returns a value.
  auto spinnerInt = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) {
        Thread.yield();
      }
      return 42;
    });
  whenAll(spinnerInt, DoneSender()).syncWait.isCancelled.should == true;
  whenAll(ThrowingSender(), spinnerInt).syncWait.assumeOk.shouldThrow;
  whenAll(spinnerInt, ThrowingSender()).syncWait.assumeOk.shouldThrow;
}
173 
@("whenAll.stop")
unittest {
  // Only returns after a stop has been requested.
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) {
        Thread.yield();
      }
    });
  auto source = new StopSource();
  // Triggers the stop source that is attached to the whenAll below.
  auto stopper = just(source).then((StopSource src) shared => src.stop());
  whenAll(spinner, stopper)
    .withStopSource(source)
    .syncWait.isCancelled.should == true;
}
183 
@("retry")
unittest {
  // A sender that succeeds right away still yields its value under retry.
  ValueSender!int(5).retry(Times(5)).syncWait.value.should == 5;

  int failuresBeforeSuccess = 3;
  int connectCount = 0;
  // Each connect bumps `connectCount`; the operation fails while the count
  // (before the bump) is below `failuresBeforeSuccess`, and succeeds afterwards.
  struct FlakySender {
    alias Value = void;
    static struct Op(Receiver) {
      Receiver receiver;
      bool fail;
      void start() @safe nothrow {
        if (!fail) {
          receiver.setValue();
          return;
        }
        receiver.setError(new Exception("Fail fail fail"));
      }
    }
    auto connect(Receiver)(return Receiver receiver) @safe scope return {
      // ensure NRVO
      auto op = Op!(Receiver)(receiver, connectCount++ < failuresBeforeSuccess);
      return op;
    }
  }

  // Three failures followed by one success: four connects in total.
  FlakySender().retry(Times(5)).syncWait.isOk.should == true;
  connectCount.should == 4;
  connectCount = 0;

  // Only two attempts allowed: both fail and the error is reported.
  FlakySender().retry(Times(2)).syncWait.assumeOk.shouldThrowWithMessage("Fail fail fail");
  connectCount.should == 2;

  // A continuation that always throws runs once per allowed attempt.
  shared int attempts = 0;
  ThreadSender().then(() shared {
      import core.atomic;
      attempts.atomicOp!("+=")(1);
      throw new Exception("Failed");
    }).retry(Times(5)).syncWait.assumeOk.shouldThrowWithMessage("Failed");
  attempts.should == 5;
}
217 
@("whenAll.oob")
unittest {
  // One value comes from another thread, the other is available immediately.
  auto fromThread = OutOfBandValueSender!int(43);
  auto immediate = ValueSender!int(11);
  whenAll(fromThread, immediate)
    .syncWait.value.should == tuple(43, 11);
}
224 
@("withStopToken.oob")
unittest {
  // The callback receives the stop token plus the value delivered from another thread.
  auto sender = OutOfBandValueSender!int(44);
  sender
    .withStopToken((StopToken stopToken, int t) => t)
    .syncWait.value.should == 44;
}
230 
@("withStopSource.oob")
unittest {
  // Attaching a fresh stop source does not alter the value delivered from another thread.
  auto sender = OutOfBandValueSender!int(45);
  sender
    .withStopSource(new StopSource())
    .syncWait.value.should == 45;
}
236 
// Named after the operation actually under test (withStopToken).
@("withStopToken.tuple")
unittest {
  // A multi-value sender hands its values to the callback as a single Tuple,
  // after the stop token.
  just(14, 53)
    .withStopToken((StopToken s, Tuple!(int, int) t) => t[0]*t[1])
    .syncWait.value.should == 742;
}
241 
@("value.withstoptoken.via.thread")
@safe unittest {
  // An exception thrown inside the withStopToken callback is reported when the
  // chain is run via a ThreadSender.
  ValueSender!int(4)
    .withStopToken((StopToken s, int i) { throw new Exception("Badness");})
    .via(ThreadSender())
    .syncWait.assumeOk.shouldThrowWithMessage("Badness");
}
246 
@("completewithcancellation")
@safe unittest {
  // A successful void completion is reported as cancelled.
  auto sender = ValueSender!void().completeWithCancellation;
  sender.syncWait.isCancelled.should == true;
}
251 
@("raceAll")
@safe unittest {
  // Only returns after a stop has been requested.
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) {
        Thread.yield();
      }
    });
  // Whatever the second contender completes with becomes the overall result.
  raceAll(spinner, DoneSender()).syncWait.isCancelled.should == true;
  raceAll(spinner, just(42)).syncWait.value.should == 42;
  raceAll(spinner, ThrowingSender()).syncWait.isError.should == true;
}
261 
@("on.ManualTimeWorker")
@safe unittest {
  import concurrency.scheduler : ManualTimeWorker;

  auto worker = new shared ManualTimeWorker();
  // Advances the manual clock in two 5ms steps and checks the remaining time
  // until the next event after each step.
  auto driver = just(worker).then((shared ManualTimeWorker w) shared {
      w.timeUntilNextEvent().should == 10.msecs;
      w.advance(5.msecs);
      w.timeUntilNextEvent().should == 5.msecs;
      w.advance(5.msecs);
      w.timeUntilNextEvent().should == null;
    });
  // A 10ms delay scheduled on the manually driven worker.
  auto timer = DelaySender(10.msecs).withScheduler(worker.getScheduler);

  whenAll(timer, driver).syncWait().isOk.should == true;
}
278 
@("on.ManualTimeWorker.cancel")
@safe unittest {
  import concurrency.scheduler : ManualTimeWorker;

  auto worker = new shared ManualTimeWorker();
  auto source = new StopSource();
  // Stops the source while the timer is pending, then checks that the worker
  // has no event left.
  auto driver = just(source).then((StopSource src) shared {
      worker.timeUntilNextEvent().should == 10.msecs;
      src.stop();
      worker.timeUntilNextEvent().should == null;
    });
  // A 10ms delay scheduled on the manually driven worker.
  auto timer = DelaySender(10.msecs).withScheduler(worker.getScheduler);

  whenAll(timer, driver)
    .syncWait(source)
    .isCancelled.should == true;
}
294 
@("then.stack.no-leak")
@safe unittest {
  struct Target {
    void fun(int i) shared {
    }
  }
  shared Target target;
  // It is perfectly fine for the continuation to be a delegate to a
  // stack-allocated object...
  auto sender = just(42).then(&target.fun);

  sender.syncWait();

  void disappearSender(Sender)(Sender escaped) @safe;
  // ...but passing that sender on to another @safe function must not compile.
  static assert(!__traits(compiles, disappearSender(sender)));
}
311 
@("forwardOn")
@safe unittest {
  auto pool = stdTaskPool(2);

  // Each kind of completion is reported unchanged after forwardOn.
  VoidSender()
    .forwardOn(pool.getScheduler)
    .syncWait.isOk.should == true;
  ErrorSender(new Exception("bad news"))
    .forwardOn(pool.getScheduler)
    .syncWait.isError.should == true;
  DoneSender()
    .forwardOn(pool.getScheduler)
    .syncWait.isCancelled.should == true;
  just(42)
    .forwardOn(pool.getScheduler)
    .syncWait.value.should == 42;
}
321 
@("toSingleton")
@safe unittest {
  import std.typecons : tuple;
  import concurrency.scheduler : ManualTimeWorker;
  import core.atomic : atomicOp;

  shared int counter;

  auto worker = new shared ManualTimeWorker();

  // Increments the counter after a 2ms delay; wrapped with toSingleton on the
  // manually driven scheduler.
  auto single = delay(2.msecs)
    .then(() shared => counter.atomicOp!"+="(1))
    .toSingleton(worker.getScheduler);

  // Moves the manual clock forward by the length of the delay.
  auto driver = justFrom(() shared => worker.advance(2.msecs));

  // Both uses of `single` within one whenAll see the same value, and the
  // counter goes up by one per round.
  whenAll(single, single, driver).syncWait.value.should == tuple(1,1);
  whenAll(single, single, driver).syncWait.value.should == tuple(2,2);
}
339 
@("stopOn")
@safe unittest {
  auto innerSource = new shared StopSource();
  auto outerSource = new shared StopSource();

  shared bool b;
  // The delayed sender listens to the inner source only; the whenAll as a whole
  // is waited on with the outer source.
  // NOTE(review): `just(() => ...)` appears to yield the lambda as a value
  // without invoking it -- confirm whether outerSource is ever stopped here.
  whenAll(
    delay(5.msecs).then(() shared => b = true).stopOn(StopToken(innerSource)),
    just(() => outerSource.stop())
  ).syncWait(outerSource).assumeOk;
  b.should == true;

  shared bool d;
  // NOTE(review): this continuation also assigns `b`; nothing in this test
  // assigns `d`, so the final check holds regardless of cancellation --
  // confirm the intent.
  whenAll(
    delay(5.msecs).then(() shared => b = true).stopOn(StopToken(innerSource)),
    just(() => innerSource.stop())
  ).syncWait(outerSource).assumeOk;
  d.should == false;
}
357 
@("withChild")
@safe unittest {
  import core.atomic;

  // Shared state used to sequence the parent and child tasks with two events.
  class State {
    import core.sync.event : Event;
    // Written by the parent: true when it saw no stop request after the child
    // had already seen one.
    bool parentAfterChild;
    Event childEvent, parentEvent;
    this() shared @trusted {
      (cast()childEvent).initialize(false, false);
      (cast()parentEvent).initialize(false, false);
    }
    void signalChild() shared @trusted {
      (cast()childEvent).set();
    }
    void waitChild() shared @trusted {
      (cast()childEvent).wait();
    }
    void signalParent() shared @trusted {
      (cast()parentEvent).set();
    }
    void waitParent() shared @trusted {
      (cast()parentEvent).wait();
    }
  }
  auto state = new shared State();
  auto source = new shared StopSource;

  // Child: spins until its token reports a stop, wakes the parent, then waits
  // for the parent to finish its check.
  auto child = just(state).withStopToken((StopToken token, shared State state) @trusted {
      while(!token.isStopRequested) {}
      state.signalParent();
      state.waitChild();
    }).via(ThreadSender());

  // Parent: woken by the child, records whether its own token is still
  // un-stopped at that point, then releases the child.
  auto parent = just(state).withStopToken((StopToken token, shared State state){
      state.waitParent();
      state.parentAfterChild.atomicStore(token.isStopRequested == false);
      state.signalChild();
    }).via(ThreadSender());

  // NOTE(review): the isCancelled result is evaluated but not asserted --
  // confirm whether a `.should == ...` check was intended here.
  whenAll(parent.withChild(child).withStopSource(source), just(source).then((shared StopSource s) => s.stop())).syncWait.isCancelled;

  state.parentAfterChild.atomicLoad.should == true;
}
403 
@("onTermination.value")
@safe unittest {
  import core.atomic : atomicOp;
  shared int calls = 0;
  // The callback runs once when the sender completes with a value.
  just(42)
    .onTermination(() @safe shared => calls.atomicOp!"+="(1))
    .syncWait.assumeOk;
  calls.should == 1;
}
411 
@("onTermination.done")
@safe unittest {
  import core.atomic : atomicOp;
  shared int calls = 0;
  // The callback runs once when the sender is cancelled.
  DoneSender()
    .onTermination(() @safe shared => calls.atomicOp!"+="(1))
    .syncWait.isCancelled.should == true;
  calls.should == 1;
}
419 
@("onTermination.error")
@safe unittest {
  import core.atomic : atomicOp;
  shared int calls = 0;
  // The callback runs once when the sender fails.
  ThrowingSender()
    .onTermination(() @safe shared => calls.atomicOp!"+="(1))
    .syncWait.isError.should == true;
  calls.should == 1;
}
427 
@("onError.value")
@safe unittest {
  import core.atomic : atomicOp;
  shared int calls = 0;
  // A value completion does not invoke the error callback.
  just(42)
    .onError((Exception e) @safe shared => calls.atomicOp!"+="(1))
    .syncWait.assumeOk;
  calls.should == 0;
}
435 
@("onError.done")
@safe unittest {
  import core.atomic : atomicOp;
  shared int calls = 0;
  // A cancellation does not invoke the error callback.
  DoneSender()
    .onError((Exception e) @safe shared => calls.atomicOp!"+="(1))
    .syncWait.isCancelled.should == true;
  calls.should == 0;
}
443 
@("onError.error")
@safe unittest {
  import core.atomic : atomicOp;
  shared int calls = 0;
  // A failure invokes the error callback once and is still reported as an error.
  ThrowingSender()
    .onError((Exception e) @safe shared => calls.atomicOp!"+="(1))
    .syncWait.isError.should == true;
  calls.should == 1;
}
451 
@("onError.throw")
@safe unittest {
  // When the error callback itself throws, the new exception is the reported
  // error and the original one is reachable through `next`.
  auto err = ThrowingSender()
    .onError((Exception e) @safe shared { throw new Exception("in onError"); })
    .syncWait.error;
  err.msg.should == "in onError";
  err.next.msg.should == "ThrowingSender";
}