1 module ut.concurrency.operations;
2 
3 import concurrency;
4 import concurrency.sender;
5 import concurrency.thread;
6 import concurrency.operations;
7 import concurrency.receiver;
8 import concurrency.stoptoken;
9 import concurrency.nursery;
10 import unit_threaded;
11 import core.time;
12 import core.thread;
13 import std.typecons;
14 
/// Test-only sender that completes its receiver from a freshly spawned thread,
/// i.e. out-of-band with respect to the thread that called `start`.
/// Used to test that Senders keep the operational state alive until one of the
/// receiver's terminal functions is called.
struct OutOfBandValueSender(T) {
  alias Value = T;
  /// The value that is handed to the receiver's `setValue`.
  T value;
  /// Operational state: holds the receiver and the value to deliver.
  struct Op(Receiver) {
    Receiver receiver;
    T value;
    /// Thread entry point: completes the receiver with the stored value.
    void run() {
      receiver.setValue(value);
    }
    /// Spawns a new thread that executes `run` on this operational state.
    void start() @trusted scope {
      // The delegate refers to `this`, so this operational state has to
      // outlive the spawned thread's call into the receiver.
      new Thread(&this.run).start();
    }
  }
  /// Creates the operational state connecting this sender to `receiver`.
  auto connect(Receiver)(return Receiver receiver) @safe scope return {
    // ensure NRVO
    auto op = Op!(Receiver)(receiver, value);
    return op;
  }
}
35 
// A `then` callback that throws, followed by `ignoreError`, is expected to be
// reported by `syncWait` as cancelled.
@("ignoreErrors.syncWait.value")
@safe unittest {
  bool delegate() @safe shared thrower = () shared { throw new Exception("Exceptions are rethrown"); };
  auto sender = ThreadSender().then(thrower).ignoreError();
  sender.syncWait.isCancelled.should == true;
}
44 
// The out-of-band sender's value is expected to arrive through `syncWait`.
@("oob")
unittest {
  auto sender = OutOfBandValueSender!int(43);
  sender
    .syncWait
    .value
    .should == 43;
}
50 
// `race` is expected to produce the value of whichever sender finishes first.
@("race")
unittest {
  // both complete immediately: the first argument's value is expected
  auto immediate = race(ValueSender!int(4), ValueSender!int(5));
  immediate.syncWait.value.should == 4;

  auto quick = ThreadSender().then(() shared => 1);
  auto sluggish = ThreadSender().then(() shared @trusted {
      Thread.sleep(50.msecs);
      return 2;
    });
  // the quick sender is expected to win in either argument position
  race(quick, sluggish).syncWait.value.should == 1;
  race(sluggish, quick).syncWait.value.should == 1;
}
59 
// `race` accepts more than two senders; the first one's value is expected.
@("race.multiple")
unittest {
  auto contest = race(ValueSender!int(4), ValueSender!int(5), ValueSender!int(6));
  contest.syncWait.value.should == 4;
}
64 
// Racing an out-of-band sender against an immediate value sender: the
// immediate value (11) is the expected result.
@("race.oob")
unittest {
  auto contest = race(OutOfBandValueSender!int(43), ValueSender!int(11));
  contest.syncWait.value.should == 11;
}
71 
// A single failing sender is expected not to fail the race when another
// sender yields a value; when every sender fails, `assumeOk` should throw.
@("race.exception.single")
unittest {
  auto recovered = race(ThrowingSender(), ValueSender!int(5));
  recovered.syncWait.value.should == 5;

  race(ThrowingSender(), ThrowingSender())
    .syncWait
    .assumeOk
    .shouldThrow();
}
77 
// When both senders fail, the exception of the one that fails first ("Fast")
// is the one expected to be reported.
@("race.exception.double")
unittest {
  auto late = ThreadSender().then(() shared @trusted {
      Thread.sleep(50.msecs);
      throw new Exception("Slow");
    });
  auto early = ThreadSender().then(() shared {
      throw new Exception("Fast");
    });
  race(late, early).syncWait.assumeOk.shouldThrowWithMessage("Fast");
}
84 
// The spinning sender only returns once its stop token is triggered, so this
// test can only finish if the race requests a stop after the value arrives.
@("race.cancel-other")
unittest {
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested)
        Thread.yield();
    });
  auto contest = race(spinner, ValueSender!int(88));
  contest.syncWait.value.get.should == 88;
}
92 
// Two spinning senders are raced inside a nursery; a second task stops the
// nursery after 50ms and the nursery is expected to finish as cancelled.
@("race.cancel")
unittest {
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested)
        Thread.yield();
    });
  auto nursery = new shared Nursery();
  nursery.run(race(spinner, spinner));
  nursery.run(
    ThreadSender().then(() @trusted shared {
        Thread.sleep(50.msecs);
        nursery.stop();
      })
  );
  nursery.syncWait.isCancelled.should == true;
}
103 
// `a.via(b)`: the expected result lists b's value before a's; void senders
// contribute nothing to the result.
@("via")
unittest {
  import std.typecons : tuple;

  auto bothValues = ValueSender!int(3).via(ValueSender!int(6));
  bothValues.syncWait.value.should == tuple(6,3);

  auto viaVoid = ValueSender!int(5).via(VoidSender());
  viaVoid.syncWait.value.should == 5;

  auto voidVia = VoidSender().via(ValueSender!int(4));
  voidVia.syncWait.value.should == 4;
}
111 
// `then` maps the sender's value through the supplied function.
@("then.value")
@safe unittest {
  auto tripled = ValueSender!int(3).then((int i) shared => i * 3);
  tripled.syncWait.value.shouldEqual(9);
}
116 
// `then` applied to a sender that completes from another thread.
@("then.oob")
@safe unittest {
  auto tripled = OutOfBandValueSender!int(46).then((int i) shared => i * 3);
  tripled.syncWait.value.shouldEqual(138);
}
121 
// `finally_` is expected to replace both a value and an error completion with
// the supplied result (given directly or produced by a callable), while a
// done sender is expected to stay cancelled.
@("finally")
unittest {
  // value completion is replaced
  ValueSender!int(1).finally_(() => 4).syncWait.value.should == 4;
  ValueSender!int(2).finally_(3).syncWait.value.should == 3;
  // error completion is replaced
  ThrowingSender().finally_(() => 4).syncWait.value.should == 4;
  ThrowingSender().finally_(3).syncWait.value.should == 3;
  // cancellation is passed through
  DoneSender().finally_(() => 4).syncWait.isCancelled.should == true;
  DoneSender().finally_(3).syncWait.isCancelled.should == true;
}
132 
// Expected outcomes of `whenAll` for combinations of value, void, throwing
// and done senders.
@("whenAll")
unittest {
  // every sender succeeds: values are collected, void senders add nothing
  auto pair = whenAll(ValueSender!int(1), ValueSender!int(2));
  pair.syncWait.value.should == tuple(1,2);
  auto triple = whenAll(ValueSender!int(1), ValueSender!int(2), ValueSender!int(3));
  triple.syncWait.value.should == tuple(1,2,3);
  whenAll(VoidSender(), ValueSender!int(2)).syncWait.value.should == 2;
  whenAll(ValueSender!int(1), VoidSender()).syncWait.value.should == 1;
  whenAll(VoidSender(), VoidSender()).syncWait.isOk.should == true;

  // one sender throws
  whenAll(ValueSender!int(1), ThrowingSender())
    .syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender");
  whenAll(ThrowingSender(), ValueSender!int(1))
    .syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender");

  // one sender is done
  whenAll(ValueSender!int(1), DoneSender()).syncWait.isCancelled.should == true;
  whenAll(DoneSender(), ValueSender!int(1)).syncWait.isCancelled.should == true;

  // done and throwing mixed: the outcome follows the first argument
  whenAll(DoneSender(), ThrowingSender()).syncWait.isCancelled.should == true;
  whenAll(ThrowingSender(), DoneSender())
    .syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender");
}
148 
// The spinning senders only return once a stop is requested, so each case can
// only finish if `whenAll` stops them when the other sender is done or fails.
@("whenAll.cancel")
unittest {
  // spinner without a result
  auto spinVoid = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested)
        Thread.yield();
    });
  whenAll(spinVoid, DoneSender()).syncWait.isCancelled.should == true;
  whenAll(ThrowingSender(), spinVoid).syncWait.assumeOk.shouldThrow;
  whenAll(spinVoid, ThrowingSender()).syncWait.assumeOk.shouldThrow;

  // spinner that produces an int after being stopped
  auto spinInt = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested)
        Thread.yield();
      return 42;
    });
  whenAll(spinInt, DoneSender()).syncWait.isCancelled.should == true;
  whenAll(ThrowingSender(), spinInt).syncWait.assumeOk.shouldThrow;
  whenAll(spinInt, ThrowingSender()).syncWait.assumeOk.shouldThrow;
}
165 
// One branch of the `whenAll` triggers the stop source the whole operation is
// attached to; the result is expected to be cancelled.
@("whenAll.stop")
unittest {
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested)
        Thread.yield();
    });
  auto source = new StopSource();
  auto stopper = just(source).then((StopSource src) shared => src.stop());
  auto combined = whenAll(spinner, stopper).withStopSource(source);
  combined.syncWait.isCancelled.should == true;
}
175 
// Exercises `retry(Times(k))` against an immediately succeeding sender, a
// local sender that fails a fixed number of connects, and a sender that
// always throws on another thread.
@("retry")
unittest {
  // a sender that succeeds right away just yields its value
  ValueSender!int(5).retry(Times(5)).syncWait.value.should == 5;
  // t: number of connects that are made to fail; n: number of connects so far
  int t = 3;
  int n = 0;
  // Local sender (reads and bumps the enclosing `n`/`t`): each `connect`
  // decides whether the resulting operation fails or succeeds.
  struct Sender {
    alias Value = void;
    static struct Op(Receiver) {
      Receiver receiver;
      bool fail;
      void start() @safe nothrow {
        if (fail)
          receiver.setError(new Exception("Fail fail fail"));
        else
          receiver.setValue();
      }
    }
    auto connect(Receiver)(return Receiver receiver) @safe scope return {
      // ensure NRVO
      // fails while the pre-increment count is below t, i.e. connects 0..2
      auto op = Op!(Receiver)(receiver, n++ < t);
      return op;
    }
  }
  // three failing connects followed by one successful connect
  Sender().retry(Times(5)).syncWait.isOk.should == true;
  n.should == 4;
  n = 0;

  // only two connects are expected, both failing, so the error surfaces
  Sender().retry(Times(2)).syncWait.assumeOk.shouldThrowWithMessage("Fail fail fail");
  n.should == 2;
  // counts how often the always-throwing callback was invoked
  shared int p = 0;
  ThreadSender().then(()shared { import core.atomic; p.atomicOp!("+=")(1); throw new Exception("Failed"); }).retry(Times(5)).syncWait.assumeOk.shouldThrowWithMessage("Failed");
  p.should == 5;
}
209 
// `whenAll` combining an out-of-band sender with an immediate value sender.
@("whenAll.oob")
unittest {
  auto combined = whenAll(OutOfBandValueSender!int(43), ValueSender!int(11));
  combined.syncWait.value.should == tuple(43, 11);
}
216 
// `withStopToken` on an out-of-band sender: the callback receives the stop
// token plus the value and returns the value unchanged.
@("withStopToken.oob")
unittest {
  auto passthrough = OutOfBandValueSender!int(44)
    .withStopToken((StopToken stopToken, int t) => t);
  passthrough.syncWait.value.should == 44;
}
222 
// `withStopSource` on an out-of-band sender still delivers the value.
@("withStopSource.oob")
unittest {
  auto sender = OutOfBandValueSender!int(45);
  sender
    .withStopSource(new StopSource())
    .syncWait
    .value
    .should == 45;
}
228 
// An exception thrown inside a `withStopToken` callback that runs via a
// thread sender is expected to be rethrown by `assumeOk`.
@("value.withstoptoken.via.thread")
@safe unittest {
  ValueSender!int(4)
    .withStopToken((StopToken s, int i) { throw new Exception("Badness"); })
    .via(ThreadSender())
    .syncWait
    .assumeOk
    .shouldThrowWithMessage("Badness");
}
233 
// `completeWithCancellation` turns a successful void sender into a cancelled one.
@("completewithcancellation")
@safe unittest {
  auto sender = ValueSender!void().completeWithCancellation;
  sender.syncWait.isCancelled.should == true;
}
238 
// `raceAll` with a sender that spins until stopped, paired in turn with a
// done, a value and a throwing sender.
// NOTE(review): the done and throwing cases assert `isOk`, whereas the
// `whenAll` tests in this file expect `isCancelled` / a thrown exception for
// those senders — confirm this is the intended `raceAll` contract.
@("raceAll")
@safe unittest {
  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested)
        Thread.yield();
    });

  auto againstDone = raceAll(spinner, DoneSender());
  againstDone.syncWait.isOk.should == true;

  auto againstValue = raceAll(spinner, just(42));
  againstValue.syncWait.value.should == 42;

  auto againstError = raceAll(spinner, ThrowingSender());
  againstError.syncWait.isOk.should == true;
}
248 
// A 10ms delay is scheduled on a manually driven worker; the driver advances
// the worker's time in two 5ms steps and checks the time left before each.
@("on.ManualTimeWorker")
@safe unittest {
  import concurrency.scheduler : ManualTimeWorker;

  auto worker = new shared ManualTimeWorker();
  auto driver = just(worker).then((shared ManualTimeWorker clock) shared {
      clock.timeUntilNextEvent().should == 10.msecs;
      clock.advance(5.msecs);
      clock.timeUntilNextEvent().should == 5.msecs;
      clock.advance(5.msecs);
      // nothing is pending any more
      clock.timeUntilNextEvent().should == null;
    });
  auto timer = DelaySender(10.msecs).withScheduler(worker.getScheduler);

  auto combined = whenAll(timer, driver);
  combined.syncWait().isOk.should == true;
}
265 
// A 10ms delay is scheduled on a manually driven worker; the driver triggers
// the stop source instead of advancing time and then expects no pending event.
@("on.ManualTimeWorker.cancel")
@safe unittest {
  import concurrency.scheduler : ManualTimeWorker;

  auto worker = new shared ManualTimeWorker();
  auto source = new StopSource();
  auto driver = just(source).then((StopSource src) shared {
      worker.timeUntilNextEvent().should == 10.msecs;
      src.stop();
      worker.timeUntilNextEvent().should == null;
    });
  auto timer = DelaySender(10.msecs).withScheduler(worker.getScheduler);

  auto combined = whenAll(timer, driver);
  combined.syncWait(source).isCancelled.should == true;
}
281 
// Compile-time check: a sender whose `then` callback points at a
// stack-allocated object can be awaited locally, but must not be passable to
// a @safe function taking it by value.
@("then.stack.no-leak")
@safe unittest {
  struct S {
    void fun(int i) shared {
    }
  }
  shared S s;
  // it's perfectly OK to point to a member function of an object on the stack
  auto sender = just(42).then(&s.fun);

  sender.syncWait();

  // declaration only; it is never called, just used in the compile check below
  void disappearSender(Sender)(Sender s) @safe;
  // but the sender can't leak now
  static assert(!__traits(compiles, disappearSender(sender)));
}