1 module ut.concurrency.sender;
2 
3 import concurrency;
4 import concurrency.sender;
5 import concurrency.thread;
6 import concurrency.operations;
7 import concurrency.receiver;
8 import unit_threaded;
9 import core.atomic : atomicOp;
10 
// `value` on a syncWait result: yields the int produced by the sender, and
// throws when whenAll combines the value with a ThrowingSender or a DoneSender.
@("syncWait.value")
@safe unittest {
  ValueSender!int(5)
    .syncWait
    .value
    .shouldEqual(5);

  whenAll(just(5), ThrowingSender())
    .syncWait
    .value
    .shouldThrow();

  whenAll(just(5), DoneSender())
    .syncWait
    .value
    .shouldThrow();
}
17 
// `assumeOk` is expected to throw for a throwing sender, a done sender and an
// error sender alike.
@("syncWait.assumeOk")
@safe unittest {
  ThrowingSender()
    .syncWait
    .assumeOk
    .shouldThrow();

  DoneSender()
    .syncWait
    .assumeOk
    .shouldThrow();

  ErrorSender(new Exception("Failure"))
    .syncWait
    .assumeOk
    .shouldThrow();
}
24 
// Matching on the result of an int sender must select the `int` handler.
@("syncWait.match")
@safe unittest {
  ValueSender!int(5)
    .syncWait
    .match!((int produced) => true, (ref other) => false)
    .shouldEqual(true);
}
29 
// Matching on the result of a VoidSender must select the `typeof(null)` handler.
@("syncWait.match.void")
@safe unittest {
  VoidSender()
    .syncWait
    .match!((typeof(null)) => true, (ref other) => false)
    .shouldEqual(true);
}
34 
// Nested syncWait: the outer StopSource is stopped from inside the inner
// sender. The inner stop token must then report a stop request, and both the
// inner and the outer syncWait must report cancellation.
@("syncWait.nested.basic")
@safe unittest {
  import concurrency.stoptoken;

  auto outerSource = new shared StopSource();

  justFrom(() shared {
      VoidSender()
        .withStopToken((StopToken innerToken) {
            outerSource.stop();
            innerToken.isStopRequested.shouldEqual(true);
          })
        .syncWait()
        .isCancelled
        .shouldEqual(true);
    })
    .syncWait(outerSource)
    .isCancelled
    .shouldEqual(true);
}
47 
// Same scenario as the basic nested test, but the outer work is moved onto a
// ThreadSender via `via` before being waited on with the outer StopSource.
@("syncWait.nested.thread")
@safe unittest {
  import concurrency.stoptoken;

  auto outerSource = new shared StopSource();

  justFrom(() shared {
      VoidSender()
        .withStopToken((StopToken innerToken) {
            outerSource.stop();
            innerToken.isStopRequested.shouldEqual(true);
          })
        .syncWait()
        .isCancelled
        .shouldEqual(true);
    })
    .via(ThreadSender())
    .syncWait(outerSource)
    .isCancelled
    .shouldEqual(true);
}
60 
// Same scenario as the basic nested test, but the outer work is scheduled on
// a two-worker task pool's scheduler.
@("syncWait.nested.threadpool")
@safe unittest {
  import concurrency.stoptoken;

  auto outerSource = new shared StopSource();
  auto workers = stdTaskPool(2);

  justFrom(() shared {
      VoidSender()
        .withStopToken((StopToken innerToken) {
            outerSource.stop();
            innerToken.isStopRequested.shouldEqual(true);
          })
        .syncWait()
        .isCancelled
        .shouldEqual(true);
    })
    .via(workers.getScheduler().schedule())
    .syncWait(outerSource)
    .isCancelled
    .shouldEqual(true);
}
75 
// Connecting a ValueSender to a NullReceiver and starting the operation must
// compile in a @safe nothrow @nogc context.
@("value.start.attributes.1")
@safe nothrow @nogc unittest {
  auto operation = ValueSender!int(5).connect(NullReceiver!int());
  operation.start();
}
80 
// Connecting a ValueSender to a ThrowingNullReceiver and starting the
// operation must still compile in a @safe nothrow context.
@("value.start.attributes.2")
@safe nothrow unittest {
  auto operation = ValueSender!int(5).connect(ThrowingNullReceiver!int());
  operation.start();
}
85 
// A ValueSender carrying no value (void) must complete successfully.
@("value.void")
@safe unittest {
  auto outcome = ValueSender!void().syncWait();
  outcome.assumeOk;
}
90 
// Waiting on a bare ThreadSender must complete successfully.
@("syncWait.thread")
@safe unittest {
  auto outcome = ThreadSender().syncWait;
  outcome.assumeOk;
}
95 
// The value computed by a continuation chained onto a ThreadSender must be
// delivered to the waiter.
@("syncWait.thread.then.value")
@safe unittest {
  ThreadSender()
    .then(() shared => 2*3)
    .syncWait
    .value
    .should == 6;
}
100 
// An exception thrown by a continuation chained onto a ThreadSender must
// surface as an error result.
@("syncWait.thread.then.exception")
@safe unittest {
  bool delegate() @safe shared thrower = () shared {
    throw new Exception("Exceptions are forwarded");
  };

  ThreadSender().then(thrower).syncWait().isError.shouldEqual(true);
}
109 
// A ValueSender converted with toSenderObject must still deliver its value.
@("toSenderObject.value")
@safe unittest {
  ValueSender!int(4)
    .toSenderObject
    .syncWait
    .value
    .should == 4;
}
114 
// A ThreadSender continuation converted with toSenderObject must still
// deliver the computed value.
@("toSenderObject.thread")
@safe unittest {
  ThreadSender()
    .then(() shared => 2*3+1)
    .toSenderObject
    .syncWait
    .value
    .should == 7;
}
119 
// A ThrowingSender routed through a ThreadSender with `via` must produce an
// error result.
@("via.threadsender.error")
@safe unittest {
  ThrowingSender()
    .via(ThreadSender())
    .syncWait()
    .isError
    .shouldEqual(true);
}
124 
// A shared sender keeps delivering the same value (the counter after a single
// increment) to every consumer -- plain syncWait, whenAll and race -- until
// reset() is called, after which the next value (2) is delivered instead.
@("toShared.basic")
@safe unittest {
  import std.typecons : tuple;

  shared int counter;

  auto sharedSender = just(1)
    .then((int i) @trusted shared { return counter.atomicOp!"+="(1); })
    .toShared();

  // Before reset: everybody observes 1.
  whenAll(sharedSender, sharedSender).syncWait.value.shouldEqual(tuple(1, 1));
  race(sharedSender, sharedSender).syncWait.value.shouldEqual(1);
  sharedSender.syncWait.value.shouldEqual(1);
  sharedSender.syncWait.value.shouldEqual(1);

  sharedSender.reset();

  // After reset: everybody observes 2.
  sharedSender.syncWait.value.shouldEqual(2);
  sharedSender.syncWait.value.shouldEqual(2);
  whenAll(sharedSender, sharedSender).syncWait.value.shouldEqual(tuple(2, 2));
  race(sharedSender, sharedSender).syncWait.value.shouldEqual(2);
}
146 
// Same value-stays-until-reset check as the basic toShared test, with the
// incrementing work routed through a ThreadSender.
@("toShared.via.thread")
@safe unittest {
  import concurrency.operations.toshared;

  shared int counter;

  auto sharedSender = just(1)
    .then((int i) @trusted shared { return counter.atomicOp!"+="(1); })
    .via(ThreadSender())
    .toShared();

  // Before reset: repeated waits observe 1.
  sharedSender.syncWait.value.shouldEqual(1);
  sharedSender.syncWait.value.shouldEqual(1);

  sharedSender.reset();

  // After reset: repeated waits observe 2.
  sharedSender.syncWait.value.shouldEqual(2);
  sharedSender.syncWait.value.shouldEqual(2);
}
165 
// A shared sender whose work throws: every wait must rethrow "Error", while
// the counter shows the work ran only once until reset() and once more after.
@("toShared.error")
@safe unittest {
  shared int attempts;

  auto sharedSender = VoidSender()
    .then(() @trusted shared {
        attempts.atomicOp!"+="(1);
        throw new Exception("Error");
      })
    .toShared();

  sharedSender.syncWait.assumeOk.shouldThrowWithMessage("Error");
  attempts.should == 1;
  sharedSender.syncWait.assumeOk.shouldThrowWithMessage("Error");
  attempts.should == 1;

  race(sharedSender, sharedSender).syncWait.assumeOk.shouldThrowWithMessage("Error");
  attempts.should == 1;

  // Only a reset lets the work run a second time.
  sharedSender.reset();
  sharedSender.syncWait.assumeOk.shouldThrowWithMessage("Error");
  attempts.should == 2;
}
186 
// A shared sender that ends up cancelled: every wait must report
// cancellation, while the counter shows the scheduled work ran only once
// until reset() and once more after.
@("toShared.done")
@safe unittest {
  shared int runs;

  auto sharedSender = DoneSender()
    .via(VoidSender().then(() @trusted shared { runs.atomicOp!"+="(1); }))
    .toShared();

  sharedSender.syncWait.isCancelled.shouldEqual(true);
  runs.should == 1;
  sharedSender.syncWait.isCancelled.shouldEqual(true);
  runs.should == 1;

  race(sharedSender, sharedSender).syncWait.isCancelled.shouldEqual(true);
  runs.should == 1;

  // Only a reset lets the work run a second time.
  sharedSender.reset();
  sharedSender.syncWait.isCancelled.shouldEqual(true);
  runs.should == 2;
}
208 
// A shared sender bound to a StopSource: one sender spins until its stop
// token reports a stop request and then sets a flag, while a second sender
// stops the source. The combined wait must report cancellation and the flag
// must have been set.
@("toShared.stop")
@safe unittest {
  import concurrency.stoptoken;
  import core.atomic : atomicStore, atomicLoad;

  shared bool sawStop;

  auto source = new StopSource();

  // Spins until the stop request arrives, then records that it was seen.
  auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { }
      atomicStore(sawStop, true);
    });

  // Receives the source as a value and triggers the stop.
  auto stopper = just(source).then((StopSource toStop) shared { toStop.stop(); });

  whenAll(waiting.toShared().withStopSource(source), stopper)
    .syncWait
    .isCancelled
    .shouldEqual(true);

  atomicLoad(sawStop).shouldEqual(true);
}
226 
// toShared on a sender that needs scheduling: the no-argument form must not
// compile, while passing a scheduler explicitly must work.
@("toShared.scheduler")
@safe unittest {
  import core.time : msecs;
  import concurrency.scheduler : localThreadScheduler;

  // Without an explicit scheduler the expression is rejected at compile time.
  static assert(!__traits(compiles, { DelaySender(1.msecs).toShared().syncWait().assumeOk; }));

  // With the scheduler handed in, the delayed sender completes successfully.
  DelaySender(1.msecs)
    .toShared(localThreadScheduler)
    .syncWait()
    .assumeOk;
}
236 
// Compile-only check: toShared with a scheduler must instantiate for a
// shared Nursery. The resulting sender is intentionally left unused.
@("toShared.nursery")
@safe unittest {
  import concurrency.nursery;
  import concurrency.scheduler;

  auto nursery = new shared Nursery();
  auto sharedNursery = nursery.toShared(localThreadScheduler());
}
245 
// NRVO check for operation states. The test name was previously misspelled
// "nvro"; it now matches the NRVOSender struct and the "ensure NRVO" comment.
//
// `Op` disables its copy constructor, records its own address when it is
// constructed, and in `start()` passes to the receiver whether it still lives
// at that address. `NRVOSender.connect` returns such an `Op` through a named
// local. The sender is then run on its own, through `via(ThreadSender())`,
// and in both positions of `whenAll` and `race`.
//
// NOTE(review): the bool handed to `setValue` is never compared against
// `true` below; each run only goes through `assumeOk`. Confirm whether the
// value itself should be asserted as well.
@("nrvo")
@safe unittest {
  static struct Op(Receiver) {
    Receiver receiver;
    // Address of this operation state as seen by the constructor.
    void* atConstructor;
    // Copying is forbidden, so any copy of the operation state fails to compile.
    @disable this(ref return scope typeof(this) rhs);
    this(Receiver receiver) @trusted {
      this.receiver = receiver;
      atConstructor = cast(void*)&this;
    }
    void start() @trusted nothrow {
      // Report whether the state is still where it was constructed.
      void* atStart = cast(void*)&this;
      receiver.setValue(atConstructor == atStart);
    }
  }
  static struct NRVOSender {
    alias Value = bool;
    auto connect(Receiver)(return Receiver receiver) @safe scope return {
      // ensure NRVO
      auto op = Op!Receiver(receiver);
      return op;
    }
  }
  NRVOSender().syncWait().assumeOk;
  NRVOSender().via(ThreadSender()).syncWait().assumeOk;
  whenAll(NRVOSender(),VoidSender()).syncWait.assumeOk;
  whenAll(VoidSender(),NRVOSender()).syncWait.assumeOk;
  race(NRVOSender(),NRVOSender()).syncWait.assumeOk;
}
275 
// justFrom must deliver the value returned by the supplied function.
@("justFrom")
@safe unittest {
  justFrom(() shared => 42)
    .syncWait
    .value
    .shouldEqual(42);
}
280 
// An exception thrown by the function given to justFrom must surface as an
// error result.
@("justFrom.exception")
@safe unittest {
  justFrom(() shared { throw new Exception("failure"); })
    .syncWait
    .isError
    .shouldEqual(true);
}
285 
// Racing a 20 ms delay against a 1 ms delay: the value attached to the
// shorter delay is expected to win.
@("delay")
@safe unittest {
  import core.time : msecs;

  auto slow = delay(20.msecs).then(() shared => 2);
  auto fast = delay(1.msecs).then(() shared => 1);

  race(slow, fast).syncWait.value.shouldEqual(1);
}
293 
// A continuation chained onto a promise must receive the fulfilled value:
// fulfilling with 72 and doubling it yields 144.
@("promise.basic")
@safe unittest {
  auto pending = promise!int;
  auto doubled = pending.then((int i) => i * 2);
  auto fulfiller = justFrom(() shared => pending.fulfill(72));

  whenAll(doubled, fulfiller)
    .syncWait
    .value
    .shouldEqual(144);
}
302 
// Two consumers of the same promise continuation must both receive the
// doubled fulfilled value.
@("promise.double")
@safe unittest {
  import std.typecons : tuple;

  auto pending = promise!int;
  auto doubled = pending.then((int i) => i * 2);
  auto fulfiller = justFrom(() shared => pending.fulfill(72));

  whenAll(doubled, doubled, fulfiller)
    .syncWait
    .value
    .shouldEqual(tuple(144, 144));
}
312 
// Same as the double-consumer promise test, but the promise is forwarded onto
// a task pool scheduler and fulfilled via a ThreadSender.
@("promise.scheduler")
@safe unittest {
  import std.typecons : tuple;

  auto pending = promise!int;
  auto workers = stdTaskPool(2);

  auto doubled = pending.forwardOn(workers.getScheduler).then((int i) => i * 2);
  auto fulfiller = justFrom(() shared => pending.fulfill(72)).via(ThreadSender());

  whenAll(doubled, doubled, fulfiller)
    .syncWait
    .value
    .shouldEqual(tuple(144, 144));
}
324 
// `just` with two values: waiting on it yields a tuple, while `then` and
// `withStopToken` receive the two values as separate arguments.
@("just.tuple")
@safe unittest {
  import std.typecons : tuple;
  import concurrency.stoptoken;

  just(14, 52)
    .syncWait
    .value
    .shouldEqual(tuple(14, 52));

  just(14, 53)
    .then((int lhs, int rhs) => lhs*rhs)
    .syncWait
    .value
    .shouldEqual(742);

  just(14, 54)
    .withStopToken((StopToken token, int lhs, int rhs) => lhs*rhs)
    .syncWait
    .value
    .shouldEqual(756);
}