1 module ut.concurrency.operations;
2 
3 import concurrency;
4 import concurrency.sender;
5 import concurrency.thread;
6 import concurrency.operations;
7 import concurrency.receiver;
8 import concurrency.stoptoken;
9 import concurrency.nursery;
10 import unit_threaded;
11 import core.time;
12 import core.thread;
13 import std.typecons;
14 
15 /// Used to test that Senders keep the operational state alive until one receiver's terminal is called
16 struct OutOfBandValueSender(T) {
17   alias Value = T;
18   T value;
19   struct Op(Receiver) {
20     Receiver receiver;
21     T value;
22     void run() {
23       receiver.setValue(value);
24     }
25     void start() @trusted scope {
26       auto value = new Thread(&this.run).start();
27     }
28   }
29   auto connect(Receiver)(return Receiver receiver) @safe scope return {
30     // ensure NRVO
31     auto op = Op!(Receiver)(receiver, value);
32     return op;
33   }
34 }
35 
36 @("ignoreErrors.syncWait.value")
37 @safe unittest {
38   bool delegate() @safe shared dg = () shared { throw new Exception("Exceptions are rethrown"); };
39   ThreadSender()
40     .then(dg)
41     .ignoreError()
42     .syncWait.isCancelled.should == true;
43 }
44 
45 @("oob")
46 unittest {
47   auto oob = OutOfBandValueSender!int(43);
48   oob.syncWait.value.should == 43;
49 }
50 
51 @("race")
52 unittest {
53   race(ValueSender!int(4), ValueSender!int(5)).syncWait.value.should == 4;
54   auto fastThread = ThreadSender().then(() shared => 1);
55   auto slowThread = ThreadSender().then(() shared @trusted { Thread.sleep(50.msecs); return 2; });
56   race(fastThread, slowThread).syncWait.value.should == 1;
57   race(slowThread, fastThread).syncWait.value.should == 1;
58 }
59 
60 @("race.multiple")
61 unittest {
62   race(ValueSender!int(4), ValueSender!int(5), ValueSender!int(6)).syncWait.value.should == 4;
63 }
64 
65 @("race.exception.single")
66 unittest {
67   race(ThrowingSender(), ValueSender!int(5)).syncWait.value.should == 5;
68   race(ThrowingSender(), ThrowingSender()).syncWait.assumeOk.shouldThrow();
69 }
70 
71 @("race.exception.double")
72 unittest {
73   auto slow = ThreadSender().then(() shared @trusted { Thread.sleep(50.msecs); throw new Exception("Slow"); });
74   auto fast = ThreadSender().then(() shared { throw new Exception("Fast"); });
75   race(slow, fast).syncWait.assumeOk.shouldThrowWithMessage("Fast");
76 }
77 
78 @("race.cancel-other")
79 unittest {
80   auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
81       while (!token.isStopRequested) { Thread.yield(); }
82     });
83   race(waiting, ValueSender!int(88)).syncWait.value.get.should == 88;
84 }
85 
86 @("race.cancel")
87 unittest {
88   auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
89       while (!token.isStopRequested) { Thread.yield(); }
90     });
91   auto nursery = new shared Nursery();
92   nursery.run(race(waiting, waiting));
93   nursery.run(ThreadSender().then(() @trusted shared { Thread.sleep(50.msecs); nursery.stop(); }));
94   nursery.syncWait.isCancelled.should == true;
95 }
96 
97 @("via")
98 unittest {
99   import std.typecons : tuple;
100   ValueSender!int(3).via(ValueSender!int(6)).syncWait.value.should == tuple(6,3);
101   ValueSender!int(5).via(VoidSender()).syncWait.value.should == 5;
102   VoidSender().via(ValueSender!int(4)).syncWait.value.should == 4;
103 }
104 
105 @("then.value.delegate")
106 @safe unittest {
107   ValueSender!int(3).then((int i) shared => i*3).syncWait.value.shouldEqual(9);
108 }
109 
110 @("then.value.function")
111 @safe unittest {
112   ValueSender!int(3).then((int i) => i*3).syncWait.value.shouldEqual(9);
113 }
114 
115 @("then.oob")
116 @safe unittest {
117   OutOfBandValueSender!int(46).then((int i) shared => i*3).syncWait.value.shouldEqual(138);
118 }
119 
120 @("finally")
121 unittest {
122   ValueSender!int(1).finally_(() => 4).syncWait.value.should == 4;
123   ValueSender!int(2).finally_(3).syncWait.value.should == 3;
124   ThrowingSender().finally_(3).syncWait.value.should == 3;
125   ThrowingSender().finally_(() => 4).syncWait.value.should == 4;
126   ThrowingSender().finally_(3).syncWait.value.should == 3;
127   DoneSender().finally_(() => 4).syncWait.isCancelled.should == true;
128   DoneSender().finally_(3).syncWait.isCancelled.should == true;
129 }
130 
131 @("whenAll")
132 unittest {
133   whenAll(ValueSender!int(1), ValueSender!int(2)).syncWait.value.should == tuple(1,2);
134   whenAll(ValueSender!int(1), ValueSender!int(2), ValueSender!int(3)).syncWait.value.should == tuple(1,2,3);
135   whenAll(VoidSender(), ValueSender!int(2)).syncWait.value.should == 2;
136   whenAll(ValueSender!int(1), VoidSender()).syncWait.value.should == 1;
137   whenAll(VoidSender(), VoidSender()).syncWait.isOk.should == true;
138   whenAll(ValueSender!int(1), ThrowingSender()).syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender");
139   whenAll(ThrowingSender(), ValueSender!int(1)).syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender");
140   whenAll(ValueSender!int(1), DoneSender()).syncWait.isCancelled.should == true;
141   whenAll(DoneSender(), ValueSender!int(1)).syncWait.isCancelled.should == true;
142   whenAll(DoneSender(), ThrowingSender()).syncWait.isCancelled.should == true;
143   whenAll(ThrowingSender(), DoneSender()).syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender");
144 
145 }
146 
147 @("whenAll.cancel")
148 unittest {
149   auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
150       while (!token.isStopRequested) { Thread.yield(); }
151     });
152   whenAll(waiting, DoneSender()).syncWait.isCancelled.should == true;
153   whenAll(ThrowingSender(), waiting).syncWait.assumeOk.shouldThrow;
154   whenAll(waiting, ThrowingSender()).syncWait.assumeOk.shouldThrow;
155   auto waitingInt = ThreadSender().withStopToken((StopToken token) @trusted {
156       while (!token.isStopRequested) { Thread.yield(); }
157       return 42;
158     });
159   whenAll(waitingInt, DoneSender()).syncWait.isCancelled.should == true;
160   whenAll(ThrowingSender(), waitingInt).syncWait.assumeOk.shouldThrow;
161   whenAll(waitingInt, ThrowingSender()).syncWait.assumeOk.shouldThrow;
162 }
163 
164 @("whenAll.stop")
165 unittest {
166   auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
167       while (!token.isStopRequested) { Thread.yield(); }
168     });
169   auto source = new StopSource();
170   auto stopper = just(source).then((StopSource source) shared => source.stop());
171   whenAll(waiting, stopper).withStopSource(source).syncWait.isCancelled.should == true;
172 }
173 
174 @("retry")
175 unittest {
176   ValueSender!int(5).retry(Times(5)).syncWait.value.should == 5;
177   int t = 3;
178   int n = 0;
179   struct Sender {
180     alias Value = void;
181     static struct Op(Receiver) {
182       Receiver receiver;
183       bool fail;
184       void start() @safe nothrow {
185         if (fail)
186           receiver.setError(new Exception("Fail fail fail"));
187         else
188           receiver.setValue();
189       }
190     }
191     auto connect(Receiver)(return Receiver receiver) @safe scope return {
192       // ensure NRVO
193       auto op = Op!(Receiver)(receiver, n++ < t);
194       return op;
195     }
196   }
197   Sender().retry(Times(5)).syncWait.isOk.should == true;
198   n.should == 4;
199   n = 0;
200 
201   Sender().retry(Times(2)).syncWait.assumeOk.shouldThrowWithMessage("Fail fail fail");
202   n.should == 2;
203   shared int p = 0;
204   ThreadSender().then(()shared { import core.atomic; p.atomicOp!("+=")(1); throw new Exception("Failed"); }).retry(Times(5)).syncWait.assumeOk.shouldThrowWithMessage("Failed");
205   p.should == 5;
206 }
207 
208 @("whenAll.oob")
209 unittest {
210   auto oob = OutOfBandValueSender!int(43);
211   auto value = ValueSender!int(11);
212   whenAll(oob, value).syncWait.value.should == tuple(43, 11);
213 }
214 
215 @("withStopToken.oob")
216 unittest {
217   auto oob = OutOfBandValueSender!int(44);
218   oob.withStopToken((StopToken stopToken, int t) => t).syncWait.value.should == 44;
219 }
220 
221 @("withStopSource.oob")
222 unittest {
223   auto oob = OutOfBandValueSender!int(45);
224   oob.withStopSource(new StopSource()).syncWait.value.should == 45;
225 }
226 
227 @("value.withstoptoken.via.thread")
228 @safe unittest {
229   ValueSender!int(4).withStopToken((StopToken s, int i) { throw new Exception("Badness");}).via(ThreadSender()).syncWait.assumeOk.shouldThrowWithMessage("Badness");
230 }
231 
232 @("completewithcancellation")
233 @safe unittest {
234   ValueSender!void().completeWithCancellation.syncWait.isCancelled.should == true;
235 }
236 
237 @("raceAll")
238 @safe unittest {
239   auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
240       while (!token.isStopRequested) { Thread.yield(); }
241     });
242   raceAll(waiting, DoneSender()).syncWait.isCancelled.should == true;
243   raceAll(waiting, just(42)).syncWait.value.should == 42;
244   raceAll(waiting, ThrowingSender()).syncWait.isError.should == true;
245 }
246 
247 @("on.ManualTimeWorker")
248 @safe unittest {
249   import concurrency.scheduler : ManualTimeWorker;
250 
251   auto worker = new shared ManualTimeWorker();
252   auto driver = just(worker).then((shared ManualTimeWorker worker) shared {
253       worker.timeUntilNextEvent().should == 10.msecs;
254       worker.advance(5.msecs);
255       worker.timeUntilNextEvent().should == 5.msecs;
256       worker.advance(5.msecs);
257       worker.timeUntilNextEvent().should == null;
258     });
259   auto timer = DelaySender(10.msecs).withScheduler(worker.getScheduler);
260 
261   whenAll(timer, driver).syncWait().isOk.should == true;
262 }
263 
264 @("on.ManualTimeWorker.cancel")
265 @safe unittest {
266   import concurrency.scheduler : ManualTimeWorker;
267 
268   auto worker = new shared ManualTimeWorker();
269   auto source = new StopSource();
270   auto driver = just(source).then((StopSource source) shared {
271       worker.timeUntilNextEvent().should == 10.msecs;
272       source.stop();
273       worker.timeUntilNextEvent().should == null;
274     });
275   auto timer = DelaySender(10.msecs).withScheduler(worker.getScheduler);
276 
277   whenAll(timer, driver).syncWait(source).isCancelled.should == true;
278 }
279 
280 @("then.stack.no-leak")
281 @safe unittest {
282   struct S {
283     void fun(int i) shared {
284     }
285   }
286   shared S s;
287   // its perfectly ok to point to a function on the stack
288   auto sender = just(42).then(&s.fun);
289 
290   sender.syncWait();
291 
292   void disappearSender(Sender)(Sender s) @safe;
293   // but the sender can't leak now
294   static assert(!__traits(compiles, disappearSender(sender)));
295 }
296 
297 @("forwardOn")
298 @safe unittest {
299   auto pool = stdTaskPool(2);
300 
301   VoidSender().forwardOn(pool.getScheduler).syncWait.isOk.should == true;
302   ErrorSender(new Exception("bad news")).forwardOn(pool.getScheduler).syncWait.isError.should == true;
303 DoneSender().forwardOn(pool.getScheduler).syncWait.isCancelled.should == true;
304  just(42).forwardOn(pool.getScheduler).syncWait.value.should == 42;
305 }