1 module ut.concurrency.sender;
2 
3 import concurrency;
4 import concurrency.sender;
5 import concurrency.thread;
6 import concurrency.operations;
7 import concurrency.receiver;
8 import unit_threaded;
9 import core.atomic : atomicOp;
10 
11 @("syncWait.value")
12 @safe unittest {
13   ValueSender!(int)(5).syncWait.value.shouldEqual(5);
14   whenAll(just(5), ThrowingSender()).syncWait.value.shouldThrow();
15   whenAll(just(5), DoneSender()).syncWait.value.shouldThrow();
16 }
17 
18 @("syncWait.assumeOk")
19 @safe unittest {
20   ThrowingSender().syncWait.assumeOk.shouldThrow();
21   DoneSender().syncWait.assumeOk.shouldThrow();
22   ErrorSender(new Exception("Failure")).syncWait.assumeOk.shouldThrow();
23 }
24 
25 @("syncWait.match")
26 @safe unittest {
27   ValueSender!(int)(5).syncWait.match!((int i) => true, (ref t) => false).should == true;
28 }
29 
30 @("syncWait.match.void")
31 @safe unittest {
32   VoidSender().syncWait.match!((typeof(null)) => true, (ref t) => false).should == true;
33 }
34 
35 @("value.start.attributes.1")
36 @safe nothrow @nogc unittest {
37   ValueSender!(int)(5).connect(NullReceiver!int()).start();
38 }
39 
40 @("value.start.attributes.2")
41 @safe nothrow unittest {
42   ValueSender!(int)(5).connect(ThrowingNullReceiver!int()).start();
43 }
44 
45 @("value.void")
46 @safe unittest {
47   ValueSender!void().syncWait().isOk.should == true;
48 }
49 
50 @("syncWait.thread")
51 @safe unittest {
52   ThreadSender().syncWait.isOk.should == true;
53 }
54 
55 @("syncWait.thread.then.value")
56 @safe unittest {
57   ThreadSender().then(() shared => 2*3).syncWait.value.shouldEqual(6);
58 }
59 
60 @("syncWait.thread.then.exception")
61 @safe unittest {
62   bool delegate() @safe shared dg = () shared { throw new Exception("Exceptions are rethrown"); };
63   ThreadSender()
64     .then(dg)
65     .syncWait()
66     .isError.should == true;
67 }
68 
69 @("toSenderObject.value")
70 @safe unittest {
71   ValueSender!(int)(4).toSenderObject.syncWait.value.shouldEqual(4);
72 }
73 
74 @("toSenderObject.thread")
75 @safe unittest {
76   ThreadSender().then(() shared => 2*3+1).toSenderObject.syncWait.value.shouldEqual(7);
77 }
78 
79 @("via.threadsender.error")
80 @safe unittest {
81   ThrowingSender().via(ThreadSender()).syncWait().isError.should == true;
82 }
83 
84 @("toShared.basic")
85 @safe unittest {
86   import std.typecons : tuple;
87 
88   shared int g;
89 
90   auto s = just(1)
91     .then((int i) @trusted shared { return g.atomicOp!"+="(1); })
92     .toShared();
93 
94   whenAll(s, s).syncWait.value.should == tuple(1,1);
95   race(s, s).syncWait.value.should == 1;
96   s.syncWait.value.should == 1;
97   s.syncWait.value.should == 1;
98 
99   s.reset();
100   s.syncWait.value.should == 2;
101   s.syncWait.value.should == 2;
102   whenAll(s, s).syncWait.value.should == tuple(2,2);
103   race(s, s).syncWait.value.should == 2;
104 }
105 
106 @("toShared.via.thread")
107 @safe unittest {
108   import concurrency.operations.toshared;
109 
110   shared int g;
111 
112   auto s = just(1)
113     .then((int i) @trusted shared { return g.atomicOp!"+="(1); })
114     .via(ThreadSender())
115     .toShared();
116 
117   s.syncWait.value.should == 1;
118   s.syncWait.value.should == 1;
119 
120   s.reset();
121   s.syncWait.value.should == 2;
122   s.syncWait.value.should == 2;
123 }
124 
125 @("toShared.error")
126 @safe unittest {
127   shared int g;
128 
129   auto s = VoidSender()
130     .then(() @trusted shared { g.atomicOp!"+="(1); throw new Exception("Error"); })
131     .toShared();
132 
133   s.syncWait.assumeOk.shouldThrowWithMessage("Error");
134   g.should == 1;
135   s.syncWait.assumeOk.shouldThrowWithMessage("Error");
136   g.should == 1;
137 
138   race(s, s).syncWait.assumeOk.shouldThrowWithMessage("Error");
139   g.should == 1;
140 
141   s.reset();
142   s.syncWait.assumeOk.shouldThrowWithMessage("Error");
143   g.should == 2;
144 }
145 
146 @("toShared.done")
147 @safe unittest {
148   shared int g;
149 
150   auto s = DoneSender()
151     .via(VoidSender()
152          .then(() @trusted shared { g.atomicOp!"+="(1); }))
153     .toShared();
154 
155   s.syncWait.isCancelled.should == true;
156   g.should == 1;
157   s.syncWait.isCancelled.should == true;
158   g.should == 1;
159 
160   race(s, s).syncWait.isCancelled.should == true;
161   g.should == 1;
162 
163   s.reset();
164   s.syncWait.isCancelled.should == true;
165   g.should == 2;
166 }
167 
168 @("toShared.stop")
169 @safe unittest {
170   import concurrency.stoptoken;
171   import core.atomic : atomicStore, atomicLoad;
172   shared bool g;
173 
174   auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
175       while (!token.isStopRequested) { }
176       g.atomicStore(true);
177     });
178   auto source = new StopSource();
179   auto stopper = just(source).then((StopSource source) shared { source.stop(); });
180 
181   whenAll(waiting.toShared().withStopSource(source), stopper).syncWait.isCancelled.should == true;
182 
183   g.atomicLoad.should == true;
184 }
185 
186 @("toShared.scheduler")
187 @safe unittest {
188   import core.time : msecs;
189   // by default toShared doesn't support scheduling
190   static assert(!__traits(compiles, { DelaySender(1.msecs).toShared().syncWait().isOk.should == true; }));
191   // have to pass scheduler explicitly
192   import concurrency.scheduler : localThreadScheduler;
193   DelaySender(1.msecs).toShared(localThreadScheduler).syncWait().isOk.should == true;
194 }
195 
196 @("nvro")
197 @safe unittest {
198   static struct Op(Receiver) {
199     Receiver receiver;
200     void* atConstructor;
201     @disable this(ref return scope typeof(this) rhs);
202     this(Receiver receiver) @trusted {
203       this.receiver = receiver;
204       atConstructor = cast(void*)&this;
205     }
206     void start() @trusted nothrow {
207       void* atStart = cast(void*)&this;
208       receiver.setValue(atConstructor == atStart);
209     }
210   }
211   static struct NRVOSender {
212     alias Value = bool;
213     auto connect(Receiver)(return Receiver receiver) @safe scope return {
214       // ensure NRVO
215       auto op = Op!Receiver(receiver);
216       return op;
217     }
218   }
219   NRVOSender().syncWait().isOk.should == true;
220   NRVOSender().via(ThreadSender()).syncWait().isOk.should == true;
221   whenAll(NRVOSender(),VoidSender()).syncWait.isOk.should == true;
222   whenAll(VoidSender(),NRVOSender()).syncWait.isOk.should == true;
223   race(NRVOSender(),NRVOSender()).syncWait.isOk.should == true;
224 }
225 
226 @("justFrom")
227 @safe unittest {
228   justFrom(() shared =>42).syncWait.value.should == 42;
229 }
230 
231 @("delay")
232 @safe unittest {
233   import core.time : msecs;
234 
235   race(delay(2.msecs).then(() shared => 2),
236        delay(1.msecs).then(() shared => 1)).syncWait.value.should == 1;
237 }
238 
239 @("promise.basic")
240 @safe unittest {
241   auto prom = promise!int;
242   auto cont = prom.then((int i) => i * 2);
243   auto runner = justFrom(() shared => prom.fulfill(72));
244   
245   whenAll(cont, runner).syncWait.value.should == 144;
246 }
247 
248 @("promise.double")
249 @safe unittest {
250   import std.typecons : tuple;
251   auto prom = promise!int;
252   auto cont = prom.then((int i) => i * 2);
253   auto runner = justFrom(() shared => prom.fulfill(72));
254   
255   whenAll(cont, cont, runner).syncWait.value.should == tuple(144, 144);
256 }
257 
258 @("promise.scheduler")
259 @safe unittest {
260   import std.typecons : tuple;
261   auto prom = promise!int;
262   auto pool = stdTaskPool(2);
263 
264   auto cont = prom.forwardOn(pool.getScheduler).then((int i) => i * 2);
265   auto runner = justFrom(() shared => prom.fulfill(72)).via(ThreadSender());
266   
267   whenAll(cont, cont, runner).syncWait.value.should == tuple(144, 144);
268 }
269 
270 @("just.tuple")
271 @safe unittest {
272   import std.typecons : tuple;
273   import concurrency.stoptoken;
274   just(14, 52).syncWait.value.should == tuple(14, 52);
275   just(14, 53).then((int a, int b) => a*b).syncWait.value.should == 742;
276   just(14, 54).withStopToken((StopToken s, int a, int b) => a*b).syncWait.value.should == 756;
277 }