1 module ut.concurrency.sender;
2 
3 import concurrency;
4 import concurrency.sender;
5 import concurrency.thread;
6 import concurrency.operations;
7 import concurrency.receiver;
8 import unit_threaded;
9 import core.atomic : atomicOp;
10 
11 @("syncWait.value")
12 @safe unittest {
13   ValueSender!(int)(5).syncWait.value.shouldEqual(5);
14 }
15 
16 @("syncWait.match")
17 @safe unittest {
18   ValueSender!(int)(5).syncWait.match!((int i) => true, (ref t) => false).should == true;
19 }
20 
21 @("syncWait.match.void")
22 @safe unittest {
23   VoidSender().syncWait.match!((typeof(null)) => true, (ref t) => false).should == true;
24 }
25 
26 @("value.start.attributes.1")
27 @safe nothrow @nogc unittest {
28   ValueSender!(int)(5).connect(NullReceiver!int()).start();
29 }
30 
31 @("value.start.attributes.2")
32 @safe nothrow unittest {
33   ValueSender!(int)(5).connect(ThrowingNullReceiver!int()).start();
34 }
35 
36 @("value.void")
37 @safe unittest {
38   ValueSender!void().syncWait().isOk.should == true;
39 }
40 
41 @("syncWait.thread")
42 @safe unittest {
43   ThreadSender().syncWait.isOk.should == true;
44 }
45 
46 @("syncWait.thread.then.value")
47 @safe unittest {
48   ThreadSender().then(() shared => 2*3).syncWait.value.shouldEqual(6);
49 }
50 
51 @("syncWait.thread.then.exception")
52 @safe unittest {
53   bool delegate() @safe shared dg = () shared { throw new Exception("Exceptions are rethrown"); };
54   ThreadSender()
55     .then(dg)
56     .syncWait()
57     .isError.should == true;
58 }
59 
60 @("toSenderObject.value")
61 @safe unittest {
62   ValueSender!(int)(4).toSenderObject.syncWait.value.shouldEqual(4);
63 }
64 
65 @("toSenderObject.thread")
66 @safe unittest {
67   ThreadSender().then(() shared => 2*3+1).toSenderObject.syncWait.value.shouldEqual(7);
68 }
69 
70 @("via.threadsender.error")
71 @safe unittest {
72   ThrowingSender().via(ThreadSender()).syncWait().isError.should == true;
73 }
74 
75 @("toShared.basic")
76 @safe unittest {
77   import std.typecons : tuple;
78 
79   shared int g;
80 
81   auto s = just(1)
82     .then((int i) @trusted shared { return g.atomicOp!"+="(1); })
83     .toShared();
84 
85   whenAll(s, s).syncWait.value.should == tuple(1,1);
86   race(s, s).syncWait.value.should == 1;
87   s.syncWait.value.should == 1;
88   s.syncWait.value.should == 1;
89 
90   s.reset();
91   s.syncWait.value.should == 2;
92   s.syncWait.value.should == 2;
93   whenAll(s, s).syncWait.value.should == tuple(2,2);
94   race(s, s).syncWait.value.should == 2;
95 }
96 
97 @("toShared.via.thread")
98 @safe unittest {
99   import concurrency.operations.toshared;
100 
101   shared int g;
102 
103   auto s = just(1)
104     .then((int i) @trusted shared { return g.atomicOp!"+="(1); })
105     .via(ThreadSender())
106     .toShared();
107 
108   race(s, s).syncWait.value.should == 1;
109   s.reset();
110   race(s, s).syncWait.value.should == 2;
111 }
112 
113 @("toShared.error")
114 @safe unittest {
115   shared int g;
116 
117   auto s = VoidSender()
118     .then(() @trusted shared { g.atomicOp!"+="(1); throw new Exception("Error"); })
119     .toShared();
120 
121   s.syncWait.assumeOk.shouldThrowWithMessage("Error");
122   g.should == 1;
123   s.syncWait.assumeOk.shouldThrowWithMessage("Error");
124   g.should == 1;
125 
126   race(s, s).syncWait.assumeOk.shouldThrowWithMessage("Error");
127   g.should == 1;
128 
129   s.reset();
130   s.syncWait.assumeOk.shouldThrowWithMessage("Error");
131   g.should == 2;
132 }
133 
134 @("toShared.done")
135 @safe unittest {
136   shared int g;
137 
138   auto s = DoneSender()
139     .via(VoidSender()
140          .then(() @trusted shared { g.atomicOp!"+="(1); }))
141     .toShared();
142 
143   s.syncWait.isCancelled.should == true;
144   g.should == 1;
145   s.syncWait.isCancelled.should == true;
146   g.should == 1;
147 
148   race(s, s).syncWait.isCancelled.should == true;
149   g.should == 1;
150 
151   s.reset();
152   s.syncWait.isCancelled.should == true;
153   g.should == 2;
154 }
155 
156 @("toShared.stop")
157 @safe unittest {
158   import concurrency.stoptoken;
159   import core.atomic : atomicStore, atomicLoad;
160   shared bool g;
161 
162   auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
163       while (!token.isStopRequested) { }
164       g.atomicStore(true);
165     });
166   auto source = new StopSource();
167   auto stopper = just(source).then((StopSource source) shared { source.stop(); });
168 
169   whenAll(waiting.toShared().withStopSource(source), stopper).syncWait.isCancelled.should == true;
170 
171   g.atomicLoad.should == true;
172 }
173 
174 @("toShared.scheduler")
175 @safe unittest {
176   import core.time : msecs;
177   // by default toShared doesn't support scheduling
178   static assert(!__traits(compiles, { DelaySender(1.msecs).toShared().syncWait().isOk.should == true; }));
179   // have to pass scheduler explicitly
180   import concurrency.scheduler : localThreadScheduler;
181   DelaySender(1.msecs).toShared(localThreadScheduler).syncWait().isOk.should == true;
182 }
183 
184 @("nvro")
185 @safe unittest {
186   static struct Op(Receiver) {
187     Receiver receiver;
188     void* atConstructor;
189     @disable this(ref return scope typeof(this) rhs);
190     this(Receiver receiver) @trusted {
191       this.receiver = receiver;
192       atConstructor = cast(void*)&this;
193     }
194     void start() @trusted nothrow {
195       void* atStart = cast(void*)&this;
196       receiver.setValue(atConstructor == atStart);
197     }
198   }
199   static struct NRVOSender {
200     alias Value = bool;
201     auto connect(Receiver)(return Receiver receiver) @safe scope return {
202       // ensure NRVO
203       auto op = Op!Receiver(receiver);
204       return op;
205     }
206   }
207   NRVOSender().syncWait().isOk.should == true;
208   NRVOSender().via(ThreadSender()).syncWait().isOk.should == true;
209   whenAll(NRVOSender(),VoidSender()).syncWait.isOk.should == true;
210   whenAll(VoidSender(),NRVOSender()).syncWait.isOk.should == true;
211   race(NRVOSender(),NRVOSender()).syncWait.isOk.should == true;
212 }
213 
214 @("justFrom")
215 @safe unittest {
216   justFrom(() shared =>42).syncWait.value.should == 42;
217 }
218 
219 @("delay")
220 @safe unittest {
221   import core.time : msecs;
222 
223   race(delay(2.msecs).then(() shared => 2),
224        delay(1.msecs).then(() shared => 1)).syncWait.value.should == 1;
225 }