1 module ut.concurrency.asyncscope;
2 
3 import concurrency.asyncscope;
4 import concurrency : syncWait;
5 import concurrency.sender : VoidSender, DoneSender, ThrowingSender;
6 import concurrency.stoptoken : StopToken;
7 import unit_threaded;
8 
@("cleanup.empty")
@safe unittest {
  // A scope that never had anything spawned into it: cleanup is awaited
  // twice, and both rounds are required to report success.
  auto sc = asyncScope();
  foreach (_; 0 .. 2)
    sc.cleanup.syncWait.assumeOk;
}
15 
@("cleanup.voidsender.single")
@safe unittest {
  auto sc = asyncScope();

  // One VoidSender is spawned and must be accepted by the scope.
  auto accepted = sc.spawn(VoidSender());
  accepted.should == true;

  // Cleanup is awaited twice; both rounds must succeed.
  foreach (_; 0 .. 2)
    sc.cleanup.syncWait.assumeOk;
}
23 
@("cleanup.voidsender.triple")
@safe unittest {
  auto sc = asyncScope();

  // Three VoidSenders are spawned one after another; each must be accepted.
  foreach (_; 0 .. 3)
    sc.spawn(VoidSender()).should == true;

  // Cleanup is awaited twice; both rounds must succeed.
  foreach (_; 0 .. 2)
    sc.cleanup.syncWait.assumeOk;
}
33 
@("cleanup.waitingsender.single")
@safe unittest {
  auto sc = asyncScope();

  // waitingTask only returns once its stop token reports a stop request,
  // so this checks that cleanup completes with such a task in the scope.
  auto accepted = sc.spawn(waitingTask);
  accepted.should == true;

  // Cleanup is awaited twice; both rounds must succeed.
  foreach (_; 0 .. 2)
    sc.cleanup.syncWait.assumeOk;
}
41 
@("cleanup.waitingsender.triple")
@safe unittest {
  auto sc = asyncScope();

  // Three waiting tasks are spawned; each spawn must be accepted.
  foreach (_; 0 .. 3)
    sc.spawn(waitingTask).should == true;

  // Cleanup is awaited twice; both rounds must succeed.
  foreach (_; 0 .. 2)
    sc.cleanup.syncWait.assumeOk;
}
51 
@("spawn.stopped")
@safe unittest {
  auto sc = asyncScope();

  // Finish cleanup first ...
  sc.cleanup.syncWait.assumeOk;

  // ... then a later spawn is expected to be rejected.
  auto accepted = sc.spawn(VoidSender());
  accepted.should == false;
}
58 
@("spawn.error")
@safe unittest {
  auto sc = asyncScope();

  // The spawn itself is accepted even though the sender is a ThrowingSender.
  auto accepted = sc.spawn(ThrowingSender());
  accepted.should == true;

  // Unwrapping the cleanup result must throw, on the first and on the
  // second attempt alike.
  foreach (_; 0 .. 2)
    sc.cleanup.syncWait.assumeOk.shouldThrow;
}
66 
@("spawn.reentry")
@safe unittest {
  import concurrency.sender : justFrom;

  auto sc = asyncScope();

  // The spawned work itself spawns another sender into the same scope.
  auto accepted = sc.spawn(justFrom(() shared { sc.spawn(VoidSender()); }));
  accepted.should == true;

  sc.cleanup.syncWait.assumeOk;
}
74 
@("spawn.value.transform")
@safe unittest {
  import concurrency.operations : then;
  import concurrency.sender : just;

  auto sc = asyncScope();

  // A value-producing sender is chained with a continuation that takes the
  // int and returns nothing before it is spawned.
  auto accepted = sc.spawn(just(42).then((int) {}));
  accepted.should == true;

  // Cleanup is awaited twice; both rounds must succeed.
  foreach (_; 0 .. 2)
    sc.cleanup.syncWait.assumeOk;
}
84 
@("cleanup.scoped")
@safe unittest {
  import concurrency.operations : onTermination;
  import core.atomic : atomicLoad, atomicStore;

  // Flag set by the spawned task's termination callback.
  shared bool p;
  {
    auto s = asyncScope();
    // Check that the spawn was accepted, as the other spawn tests do.
    auto accepted = s.spawn(waitingTask().onTermination(() shared { p.atomicStore(true); }));
    accepted.should == true;
  }
  // The scope variable is gone here; the termination callback must have run.
  // The flag is written with atomicStore, so read it with atomicLoad too.
  p.atomicLoad.should == true;
}
96 
@("cleanup.nested.struct")
@safe unittest {
  import concurrency.operations : onTermination;
  import core.atomic : atomicLoad, atomicStore;

  // Flag set by the spawned task's termination callback.
  shared bool p;
  // Wrapper holding the async scope as a struct member.
  static struct S {
    shared AsyncScope s;
  }
  {
    S s = S(asyncScope);
    // Check that the spawn was accepted, as the other spawn tests do.
    auto accepted = s.s.spawn(waitingTask().onTermination(() shared { p.atomicStore(true); }));
    accepted.should == true;
  }
  // The wrapping struct is gone here; the termination callback must have run.
  // The flag is written with atomicStore, so read it with atomicLoad too.
  p.atomicLoad.should == true;
}
111 
@("cleanup.nested.class")
@trusted unittest {
  import concurrency.operations : onTermination;
  import core.atomic : atomicLoad, atomicStore;

  // Flag set by the spawned task's termination callback.
  shared bool p;
  // Wrapper holding the async scope as a class member.
  static class S {
    shared AsyncScope s;
    this() {
      s = asyncScope();
    }
  }
  auto s = new S();
  // Check that the spawn was accepted, as the other spawn tests do.
  auto accepted = s.s.spawn(waitingTask().onTermination(() shared { p.atomicStore(true); }));
  accepted.should == true;
  // Explicitly destroy the owning object; afterwards the termination
  // callback must have run.
  destroy(s);
  // The flag is written with atomicStore, so read it with atomicLoad too.
  p.atomicLoad.should == true;
}
128 
@("spawn.assert.thread")
@safe unittest {
  import concurrency.operations : then;
  import concurrency.thread : ThreadSender;

  // Sender whose continuation trips an assert after ThreadSender.
  auto failing = ThreadSender().then(() shared {
      assert(false, "bad things happen");
    });
  auto sc = asyncScope();

  // The spawn is accepted; the failure only shows up when awaiting cleanup.
  auto accepted = sc.spawn(failing);
  accepted.should == true;
  sc.cleanup.syncWait.shouldThrow!Throwable;
}
141 
@("spawn.assert.inline")
@trusted unittest {
  import concurrency.sender : justFrom;

  // Sender whose body trips an assert; no ThreadSender is involved here.
  auto fail = justFrom(() shared {
      assert(0, "bad things happen 2");
    });
  auto s = asyncScope();

  // Here the spawn call itself is expected to throw ...
  s.spawn(fail).shouldThrow!Throwable;
  // ... and awaiting cleanup afterwards is expected to throw as well.
  s.cleanup.syncWait.shouldThrow!Throwable;
}
155 
@("cleanup.assert.then")
@safe unittest {
  import concurrency.operations : then;

  auto s = asyncScope();

  // A continuation chained onto cleanup trips an assert; awaiting the
  // chained sender must surface that as a Throwable.
  s.cleanup.then(() shared { assert(false, "Ohh no!"); }).syncWait.shouldThrow!Throwable;
}
164 
// Builds a sender from ThreadSender whose body spins, yielding the thread on
// each iteration, until its stop token reports that a stop was requested.
auto waitingTask() {
  import concurrency.operations : withStopToken;
  import concurrency.thread : ThreadSender;

  return ThreadSender().withStopToken((StopToken token) @trusted {
      import core.thread : Thread;
      for (;;) {
        if (token.isStopRequested)
          break;
        Thread.yield();
      }
    });
}
174 
@("withScheduler")
@safe unittest {
  import concurrency.operations : withScheduler;
  import concurrency.scheduler : localThreadScheduler;

  auto s = asyncScope();

  // A sender with a scheduler attached is spawned into a fresh scope; the
  // spawn result is checked as in the other spawn tests.
  auto accepted = s.spawn(VoidSender().withScheduler(localThreadScheduler));
  accepted.should == true;

  s.cleanup.syncWait.assumeOk;
}