1 module ut.concurrency.sender;
2 
3 import concurrency;
4 import concurrency.sender;
5 import concurrency.thread;
6 import concurrency.operations;
7 import concurrency.receiver;
8 import unit_threaded;
9 import core.atomic : atomicOp;
10 
// `.value` on a syncWait result: yields the value on success, and is expected
// to throw when one branch of a whenAll errors or completes as done.
@("syncWait.value")
@safe unittest {
  // plain value comes straight through
  ValueSender!(int)(5)
    .syncWait
    .value
    .shouldEqual(5);

  // a throwing branch makes `.value` throw
  whenAll(just(5), ThrowingSender())
    .syncWait
    .value
    .shouldThrow();

  // so does a done branch
  whenAll(just(5), DoneSender())
    .syncWait
    .value
    .shouldThrow();
}
17 
// `assumeOk` is expected to throw for each of the three non-value senders below.
@("syncWait.assumeOk")
@safe unittest {
  ThrowingSender()
    .syncWait
    .assumeOk
    .shouldThrow();

  DoneSender()
    .syncWait
    .assumeOk
    .shouldThrow();

  ErrorSender(new Exception("Failure"))
    .syncWait
    .assumeOk
    .shouldThrow();
}
24 
// Matching on the result of an int sender is expected to yield `true`,
// i.e. the handler taking an `int` is the one selected.
@("syncWait.match")
@safe unittest {
  ValueSender!(int)(5)
    .syncWait
    .match!((int value) => true, "false")
    .shouldEqual(true);
}
29 
// Matching on the result of a void sender: the handler accepting
// `typeof(null)` is expected to be the one that fires.
@("syncWait.match.void")
@safe unittest {
  VoidSender()
    .syncWait
    .match!((typeof(null)) => true, "false")
    .shouldEqual(true);
}
34 
// A syncWait nested inside another syncWait: stopping the outer StopSource
// from within the inner sender must be visible through the inner stop token,
// and both the inner and the outer wait are expected to report cancellation.
@("syncWait.nested.basic")
@safe unittest {
  import concurrency.stoptoken;

  auto stopSource = new shared StopSource();

  justFrom(() shared {
    VoidSender()
      .withStopToken((StopToken stopToken) shared @safe {
        stopSource.stop();
        stopToken.isStopRequested.shouldEqual(true);
      })
      .syncWait()
      .isCancelled
      .shouldEqual(true);
  })
    .syncWait(stopSource)
    .isCancelled
    .shouldEqual(true);
}
47 
// Same nested-syncWait scenario as the basic variant, but the outer work is
// routed through `via(ThreadSender())` before being waited on.
@("syncWait.nested.thread")
@safe unittest {
  import concurrency.stoptoken;

  auto stopSource = new shared StopSource();

  justFrom(() shared {
    VoidSender()
      .withStopToken((StopToken stopToken) shared @safe {
        stopSource.stop();
        stopToken.isStopRequested.shouldEqual(true);
      })
      .syncWait()
      .isCancelled
      .shouldEqual(true);
  })
    .via(ThreadSender())
    .syncWait(stopSource)
    .isCancelled
    .shouldEqual(true);
}
60 
// Same nested-syncWait scenario, with the outer work scheduled on a
// two-worker task pool obtained from `stdTaskPool`.
@("syncWait.nested.threadpool")
@safe unittest {
  import concurrency.stoptoken;

  auto stopSource = new shared StopSource();
  auto taskPool = stdTaskPool(2);

  justFrom(() shared {
    VoidSender()
      .withStopToken((StopToken stopToken) shared @safe {
        stopSource.stop();
        stopToken.isStopRequested.shouldEqual(true);
      })
      .syncWait()
      .isCancelled
      .shouldEqual(true);
  })
    .via(taskPool.getScheduler().schedule())
    .syncWait(stopSource)
    .isCancelled
    .shouldEqual(true);
}
75 
// Compile-time check: connecting a ValueSender to a NullReceiver and starting
// it must be callable from `@safe nothrow @nogc` code.
@("value.start.attributes.1")
@safe nothrow @nogc unittest {
  ValueSender!(int)(5)
    .connect(NullReceiver!int())
    .start();
}
80 
// Compile-time check: even with a ThrowingNullReceiver, connect + start must
// remain callable from `@safe nothrow` code.
@("value.start.attributes.2")
@safe nothrow unittest {
  ValueSender!(int)(5)
    .connect(ThrowingNullReceiver!int())
    .start();
}
85 
// A ValueSender instantiated with `void` can be waited on and completes ok.
@("value.void")
@safe unittest {
  ValueSender!void()
    .syncWait()
    .assumeOk;
}
90 
// Waiting on a bare ThreadSender completes ok.
@("syncWait.thread")
@safe unittest {
  ThreadSender()
    .syncWait
    .assumeOk;
}
95 
// The value computed in a `then` continuation chained onto a ThreadSender
// is what syncWait returns.
@("syncWait.thread.then.value")
@safe unittest {
  ThreadSender()
    .then(() shared => 2*3)
    .syncWait
    .value
    .shouldEqual(6);
}
100 
// An exception thrown from a `then` continuation on a ThreadSender is
// reported as an error result by syncWait.
@("syncWait.thread.then.exception")
@safe unittest {
  // explicitly typed so the delegate has a `bool` return type despite only throwing
  bool delegate() @safe shared thrower = () shared {
    throw new Exception("Exceptions are forwarded");
  };

  ThreadSender().then(thrower).syncWait().isError.shouldEqual(true);
}
109 
// Converting a ValueSender with `toSenderObject` keeps the produced value.
@("toSenderObject.value")
@safe unittest {
  ValueSender!(int)(4)
    .toSenderObject
    .syncWait
    .value
    .shouldEqual(4);
}
114 
// `toSenderObject` applied to a ThreadSender + `then` chain keeps the
// continuation's result.
@("toSenderObject.thread")
@safe unittest {
  ThreadSender()
    .then(() shared => 2*3+1)
    .toSenderObject
    .syncWait
    .value
    .shouldEqual(7);
}
119 
// A ThrowingSender routed through `via(ThreadSender())` still surfaces as an error.
@("via.threadsender.error")
@safe unittest {
  ThrowingSender()
    .via(ThreadSender())
    .syncWait()
    .isError
    .shouldEqual(true);
}
124 
// A sender wrapped with `toShared` keeps yielding the same value to every
// consumer until `reset()` is called; the counter-based assertions below show
// the underlying work runs once before the reset and once after it.
@("toShared.basic")
@safe unittest {
  import std.typecons : tuple;

  shared int counter;

  auto sharedSender = just(1)
    .then((int i) @trusted shared { return counter.atomicOp!"+="(1); })
    .toShared();

  // first round: every consumer observes 1
  whenAll(sharedSender, sharedSender).syncWait.value.shouldEqual(tuple(1, 1));
  race(sharedSender, sharedSender).syncWait.value.shouldEqual(1);
  sharedSender.syncWait.value.shouldEqual(1);
  sharedSender.syncWait.value.shouldEqual(1);

  sharedSender.reset();

  // second round: the increment ran once more, every consumer observes 2
  sharedSender.syncWait.value.shouldEqual(2);
  sharedSender.syncWait.value.shouldEqual(2);
  whenAll(sharedSender, sharedSender).syncWait.value.shouldEqual(tuple(2, 2));
  race(sharedSender, sharedSender).syncWait.value.shouldEqual(2);
}
146 
// Same caching/reset expectations as the basic toShared test, with the
// underlying sender routed through `via(ThreadSender())`.
@("toShared.via.thread")
@safe unittest {
  import concurrency.operations.toshared;

  shared int counter;

  auto sharedSender = just(1)
    .then((int i) @trusted shared { return counter.atomicOp!"+="(1); })
    .via(ThreadSender())
    .toShared();

  // repeated waits observe the same value
  sharedSender.syncWait.value.shouldEqual(1);
  sharedSender.syncWait.value.shouldEqual(1);

  sharedSender.reset();

  // after reset the increment has run once more
  sharedSender.syncWait.value.shouldEqual(2);
  sharedSender.syncWait.value.shouldEqual(2);
}
165 
// An error produced by the wrapped sender is replayed to every consumer of
// the shared sender; the counter shows the throwing continuation runs only
// once per reset cycle.
@("toShared.error")
@safe unittest {
  shared int runs;

  auto sharedSender = VoidSender()
    .then(() @trusted shared { runs.atomicOp!"+="(1); throw new Exception("Error"); })
    .toShared();

  // first wait triggers the continuation
  sharedSender.syncWait.assumeOk.shouldThrowWithMessage("Error");
  runs.should == 1;

  // second wait replays the error without running it again
  sharedSender.syncWait.assumeOk.shouldThrowWithMessage("Error");
  runs.should == 1;

  // same when consumed twice via race
  race(sharedSender, sharedSender).syncWait.assumeOk.shouldThrowWithMessage("Error");
  runs.should == 1;

  // reset allows one more execution
  sharedSender.reset();
  sharedSender.syncWait.assumeOk.shouldThrowWithMessage("Error");
  runs.should == 2;
}
186 
// A cancelled (done) outcome is replayed to every consumer of the shared
// sender; the counter shows the `via` work runs only once per reset cycle.
@("toShared.done")
@safe unittest {
  shared int runs;

  auto sharedSender = DoneSender()
    .via(VoidSender().then(() @trusted shared { runs.atomicOp!"+="(1); }))
    .toShared();

  // first wait runs the counted step
  sharedSender.syncWait.isCancelled.shouldEqual(true);
  runs.should == 1;

  // second wait reports cancellation again without re-running
  sharedSender.syncWait.isCancelled.shouldEqual(true);
  runs.should == 1;

  // same when consumed twice via race
  race(sharedSender, sharedSender).syncWait.isCancelled.shouldEqual(true);
  runs.should == 1;

  // reset allows one more execution
  sharedSender.reset();
  sharedSender.syncWait.isCancelled.shouldEqual(true);
  runs.should == 2;
}
208 
// A stop requested through the StopSource attached to a shared sender must
// reach the wrapped sender's stop token: the spinning task only exits (and
// sets the flag) once it observes the stop request.
@("toShared.stop")
@safe unittest {
  import concurrency.stoptoken;
  import core.atomic : atomicStore, atomicLoad;

  shared bool finished;

  // spins until a stop is requested, then records that it left the loop
  auto spinner = ThreadSender().withStopToken((StopToken stopToken) shared @trusted {
    while (!stopToken.isStopRequested) { }
    finished.atomicStore(true);
  });

  auto stopSource = new StopSource();
  // requests the stop on the very source attached to the shared sender below
  auto trigger = just(stopSource).then((StopSource src) shared { src.stop(); });

  whenAll(spinner.toShared().withStopSource(stopSource), trigger)
    .syncWait
    .isCancelled
    .shouldEqual(true);

  finished.atomicLoad.shouldEqual(true);
}
226 
// `toShared` and senders that need a scheduler.
@("toShared.scheduler")
@safe unittest {
  import core.time : msecs;

  // without an explicit scheduler, waiting on a shared DelaySender must not compile
  static assert(!__traits(compiles, { DelaySender(1.msecs).toShared().syncWait().assumeOk; }));

  // supplying the scheduler to toShared makes it usable
  import concurrency.scheduler : localThreadScheduler;
  DelaySender(1.msecs)
    .toShared(localThreadScheduler)
    .syncWait()
    .assumeOk;
}
236 
// Instantiation smoke test: `toShared` must compile when applied to a shared
// Nursery with an explicit scheduler. The result is intentionally unused.
@("toShared.nursery")
@safe unittest {
  import concurrency.nursery;
  import concurrency.scheduler;

  auto root = new shared Nursery();
  auto sharedRoot = root.toShared(localThreadScheduler());
}
245 
// Checks that an operation state sits at the same address when `start` runs
// as it did when it was constructed, across several sender combinators.
// NOTE(review): the test name looks like a typo for "nrvo"; left unchanged
// because it may be used as a test filter.
@("nvro")
@safe unittest {
  // Operation state that remembers where it was constructed.
  static struct Op(Receiver) {
    Receiver receiver;
    void* atConstructor;
    // copying is forbidden, so the state cannot be duplicated to a new address
    @disable this(ref return scope typeof(this) other);
    this(Receiver receiver) @trusted {
      this.receiver = receiver;
      atConstructor = cast(void*)&this;
    }
    // reports whether the current address matches the construction address
    void start() @trusted nothrow scope {
      void* current = cast(void*)&this;
      receiver.setValue(atConstructor == current);
    }
  }
  static struct NRVOSender {
    alias Value = bool;
    auto connect(Receiver)(return Receiver receiver) @safe return scope {
      // ensure NRVO
      auto op = Op!Receiver(receiver);
      return op;
    }
  }

  NRVOSender()
    .syncWait()
    .value
    .shouldEqual(true);
  NRVOSender()
    .via(ThreadSender())
    .syncWait()
    .value
    .shouldEqual(true);
  whenAll(NRVOSender(), VoidSender())
    .syncWait
    .value
    .shouldEqual(true);
  whenAll(VoidSender(), NRVOSender())
    .syncWait
    .value
    .shouldEqual(true);
  race(NRVOSender(), NRVOSender())
    .syncWait
    .value
    .shouldEqual(true);
}
275 
// `justFrom` turns a callable into a sender that yields the callable's result.
@("justFrom")
@safe unittest {
  justFrom(() shared => 42)
    .syncWait
    .value
    .shouldEqual(42);
}
280 
// An exception thrown by the callable given to `justFrom` becomes an error result.
@("justFrom.exception")
@safe unittest {
  justFrom(() shared { throw new Exception("failure"); })
    .syncWait
    .isError
    .shouldEqual(true);
}
285 
// Races a 20ms delay against a 1ms delay on a manually driven clock and
// expects the 1ms branch (value 1) to win once time is advanced by 1ms.
@("delay")
@safe unittest {
  import core.time : msecs;
  import concurrency.scheduler : ManualTimeWorker;

  auto worker = new shared ManualTimeWorker();

  // both delays are scheduled on the manual worker, so nothing fires until
  // the driver below advances its clock
  auto d = race(delay(20.msecs).then(() shared => 2),
                delay(1.msecs).then(() shared => 1))
    .withScheduler(worker.getScheduler);

  // checks that the next pending event is the 1ms timer, then advances the
  // clock just far enough to trigger it
  auto driver = just(worker).then((shared ManualTimeWorker worker) {
      worker.timeUntilNextEvent().should == 1.msecs;
      worker.advance(1.msecs);
    });

  whenAll(d, driver).syncWait.value.should == 1;
}
305 
// A continuation attached to a promise's sender receives the fulfilled value.
@("promise.basic")
@safe unittest {
  auto p = promise!int;
  auto doubled = p.sender.then((int v) => v * 2);
  // fulfil from a separate sender; the result of fulfill is discarded
  auto fulfiller = justFrom(() shared => cast(void)p.fulfill(72));

  whenAll(doubled, fulfiller)
    .syncWait
    .value
    .shouldEqual(144);
}
314 
// The same promise continuation can be consumed twice in one whenAll; both
// consumers receive the fulfilled value.
@("promise.double")
@safe unittest {
  import std.typecons : tuple;

  auto p = promise!int;
  auto doubled = p.sender.then((int v) => v * 2);
  // fulfil from a separate sender; the result of fulfill is discarded
  auto fulfiller = justFrom(() shared => cast(void)p.fulfill(72));

  whenAll(doubled, doubled, fulfiller)
    .syncWait
    .value
    .shouldEqual(tuple(144, 144));
}
324 
// Promise continuation forwarded onto a task-pool scheduler while the promise
// is fulfilled through `via(ThreadSender())`; both consumers get the value.
@("promise.scheduler")
@safe unittest {
  import std.typecons : tuple;

  auto p = promise!int;
  auto taskPool = stdTaskPool(2);

  auto doubled = p.sender
    .forwardOn(taskPool.getScheduler)
    .then((int v) => v * 2);
  auto fulfiller = justFrom(() shared => cast(void)p.fulfill(72))
    .via(ThreadSender());

  whenAll(doubled, doubled, fulfiller)
    .syncWait
    .value
    .shouldEqual(tuple(144, 144));
}
336 
// Promise fulfilled before the continuation is waited on: the exception
// thrown by the continuation must surface through `assumeOk`.
@("promise.then.exception.inline")
@safe unittest {
  auto p = promise!int;
  auto failing = p.sender.then((int v) { throw new Exception("nope"); });

  p.fulfill(33);

  failing
    .syncWait()
    .assumeOk
    .shouldThrowWithMessage("nope");
}
344 
// Promise fulfilled through `via(ThreadSender())`: the exception thrown by
// the continuation must still surface through `assumeOk`.
@("promise.then.exception.thread")
@safe unittest {
  auto p = promise!int;
  auto failing = p.sender.then((int v) { throw new Exception("nope"); });
  auto fulfiller = justFrom(() shared => cast(void)p.fulfill(72))
    .via(ThreadSender());

  whenAll(failing, fulfiller)
    .syncWait()
    .assumeOk
    .shouldThrowWithMessage("nope");
}
352 
// `just` with several arguments: the result is a tuple, and `then` /
// `withStopToken` callbacks receive the elements as separate parameters.
@("just.tuple")
@safe unittest {
  import std.typecons : tuple;
  import concurrency.stoptoken;

  // raw result is a tuple of the arguments
  just(14, 52)
    .syncWait
    .value
    .shouldEqual(tuple(14, 52));

  // `then` gets the elements unpacked
  just(14, 53)
    .then((int lhs, int rhs) => lhs*rhs)
    .syncWait
    .value
    .shouldEqual(742);

  // `withStopToken` gets the token first, then the unpacked elements
  just(14, 54)
    .withStopToken((StopToken token, int lhs, int rhs) => lhs*rhs)
    .syncWait
    .value
    .shouldEqual(756);
}
361 
// A sender carrying a `scope` pointer may be waited on locally, but passing
// it to a function taking the sender by value must be rejected at compile time.
@("just.scope")
@safe unittest {
  // body-less template: only used inside __traits(compiles) below
  void disappearSender(Sender)(Sender s) @safe;

  int local;
  scope int* ptr = &local;

  // waiting on it locally is fine and yields the same pointer
  just(ptr).syncWait().value.should == ptr;
  just(ptr).retry(Times(5)).syncWait().value.should == ptr;

  // handing the sender to another function must not compile
  static assert(!__traits(compiles, disappearSender(just(ptr))));
  static assert(!__traits(compiles, disappearSender(just(ptr).retry(Times(5)))));
}
372 
// `defer` accepts a pointer to a static function that returns a sender.
@("defer.static.fun")
@safe unittest {
  static auto makeSender() {
    return just(1);
  }

  defer(&makeSender)
    .syncWait()
    .value
    .shouldEqual(1);
}
381 
// `defer` accepts a lambda that returns a sender.
@("defer.delegate")
@safe unittest {
  defer(() => just(1))
    .syncWait()
    .value
    .shouldEqual(1);
}
386 
// `defer` accepts a shared struct instance whose `opCall` returns a sender.
@("defer.opCall")
@safe unittest {
  static struct Factory {
    auto opCall() @safe shared {
      return just(1);
    }
  }

  shared Factory factory;

  defer(factory)
    .syncWait()
    .value
    .shouldEqual(1);
}