1 module ut.concurrency.nursery;
2 
3 import concurrency;
4 import concurrency.sender;
5 import concurrency.thread;
6 import concurrency.operations;
7 import concurrency.nursery;
8 import concurrency.stoptoken;
9 import unit_threaded;
10 
@("run.stopped")
@safe unittest {
  // Stop the nursery before anything is run in it, then wait on it:
  // the wait is expected to report cancellation.
  auto stoppedNursery = new shared Nursery();
  stoppedNursery.stop();

  auto outcome = stoppedNursery.syncWait;
  outcome.isCancelled.should == true;
}
17 
@("run.empty")
@safe unittest {
  // Nothing is run inside the nursery; it is combined through whenAll with
  // a sender whose only job is to call stop() on that same nursery.
  auto emptyNursery = new shared Nursery();
  auto stopper = justFrom(() shared => emptyNursery.stop());

  // The combined wait is expected to end up cancelled.
  whenAll(emptyNursery, stopper).syncWait.isCancelled.should == true;
}
24 
@("run.value")
@safe unittest {
  // Run a sender that just produces a value; waiting must succeed and the
  // nursery's stop token must remain untriggered afterwards.
  auto valueNursery = new shared Nursery();
  valueNursery.run(ValueSender!int(5));

  valueNursery.syncWait().assumeOk;

  auto stopToken = valueNursery.getStopToken;
  stopToken.isStopRequested.shouldBeFalse();
}
32 
@("run.exception")
@safe unittest {
  // Run a sender that fails; assumeOk on the wait result is expected to
  // throw, and the nursery's stop token is expected to be triggered.
  auto failingNursery = new shared Nursery();
  failingNursery.run(ThrowingSender());

  // The whole wait expression is handed to shouldThrow in one piece.
  failingNursery.syncWait().assumeOk.shouldThrow();

  auto stopToken = failingNursery.getStopToken;
  stopToken.isStopRequested.shouldBeTrue();
}
40 
@("run.value.then")
@safe unittest {
  // A continuation attached with then() stores the produced value into a
  // shared variable; the test checks the variable before and after waiting.
  auto chainNursery = new shared Nursery();
  shared int observed;

  chainNursery.run(ValueSender!int(5).then((int value) shared => observed = value));

  // Still at its initial value right after run() ...
  observed.shouldEqual(0);

  chainNursery.syncWait().assumeOk;

  // ... and holding the sender's value once the wait has succeeded.
  observed.shouldEqual(5);
}
50 
@("run.thread.run")
@safe unittest {
  // From a ThreadSender continuation, schedule a second sender into the same
  // nursery; that second sender writes its value into a shared variable.
  auto outerNursery = new shared Nursery();
  shared int observed;

  outerNursery.run(ThreadSender().then(() shared @safe {
    outerNursery.run(ValueSender!int(5).then((int value) shared @safe {
      observed = value;
    }));
  }));

  // Checked before waiting: the variable is expected to be untouched.
  observed.shouldEqual(0);

  outerNursery.syncWait().assumeOk;

  // After a successful wait the inner sender's value must be visible and
  // no stop must have been requested on the nursery.
  observed.shouldEqual(5);
  outerNursery.getStopToken.isStopRequested.shouldBeFalse();
}
65 
@("run.thread.stop.internal")
@safe unittest {
  // A ThreadSender continuation calls stop() on the nursery it runs in.
  auto selfStopping = new shared Nursery();
  selfStopping.run(ThreadSender().then(() shared @safe => selfStopping.stop()));

  // The wait is expected to be cancelled and the stop token triggered.
  selfStopping.syncWait().isCancelled.should == true;
  selfStopping.getStopToken.isStopRequested.shouldBeTrue();
}
73 
@("run.thread.stop.external")
@trusted unittest {
  // The nursery is awaited with an externally supplied StopSource, and a
  // ThreadSender continuation triggers that source.
  auto watched = new shared Nursery();
  auto externalSource = new shared StopSource();

  watched.run(ThreadSender().then(() shared @safe => externalSource.stop()));

  // syncWait is given the source with its shared qualifier cast away.
  watched.syncWait(cast(StopSource) externalSource).isCancelled.should == true;

  // Both the nursery's own token and the external source must report a stop.
  watched.getStopToken.isStopRequested.shouldBeTrue();
  externalSource.isStopRequested.shouldBeTrue();
}
83 
@("run.thread.stop.internal.sibling")
@safe unittest {
  import core.thread : Thread;

  auto family = new shared Nursery();

  // First sibling: spins, yielding the thread, until the nursery's stop
  // token reports a stop request.
  auto waiter = ThreadSender().then(() shared @trusted {
    auto stopToken = family.getStopToken;
    while (!stopToken.isStopRequested) {
      Thread.yield();
    }
  });

  // Second sibling: stops the nursery.
  auto stopper = ThreadSender().then(() shared @safe => family.stop());

  family.run(waiter);
  family.run(stopper);

  // The wait is expected to be cancelled with the stop token triggered.
  family.syncWait().isCancelled.should == true;
  family.getStopToken.isStopRequested.shouldBeTrue();
}
98 
@("run.nested")
@safe unittest {
  // One nursery is run inside another; work is then added to the inner one
  // and only the outer one is awaited.
  auto outer = new shared Nursery();
  auto inner = new shared Nursery();
  shared int observed;

  outer.run(inner);
  inner.run(ValueSender!int(99).then((int value) shared => observed = value));

  // Checked before waiting: the variable is expected to be untouched.
  observed.shouldEqual(0);

  outer.syncWait().assumeOk;

  // The inner sender's value must be visible, and neither nursery may have
  // had a stop requested.
  observed.shouldEqual(99);
  outer.getStopToken.isStopRequested.shouldBeFalse();
  inner.getStopToken.isStopRequested.shouldBeFalse();
}
112 
@("run.error")
@safe unittest {
  import core.thread : Thread;

  auto group = new shared Nursery();

  // Spins until the nursery's own stop token reports a stop request.
  auto pollsNursery = ThreadSender().then(() shared @trusted {
    auto stopToken = group.getStopToken;
    while (!stopToken.isStopRequested) {
      Thread.yield();
    }
  });

  // Spins until the stop token handed in by withStopToken reports a stop.
  auto pollsToken = ThreadSender().withStopToken((StopToken stopToken) shared @trusted {
    while (!stopToken.isStopRequested) {
      Thread.yield();
    }
  });

  // Fails with an exception.
  auto thrower = ThreadSender().then(() shared @safe {
    throw new Exception("Error should stop everyone");
  });

  group.run(pollsNursery);
  group.run(pollsToken);
  group.run(thrower);

  // Asserted right after scheduling: no stop request is expected yet.
  group.getStopToken.isStopRequested.shouldBeFalse();

  // The whole wait expression is handed to shouldThrow in one piece.
  group.syncWait().assumeOk.shouldThrow();

  group.getStopToken.isStopRequested.shouldBeTrue();
}
132 
@("withStopSource.1")
unittest {
  import core.thread : Thread;

  auto ownSource = new StopSource();
  auto host = new shared Nursery();

  // Spins until its stop token reports a stop request; the sender is
  // attached to ownSource through withStopSource.
  auto spinner = ThreadSender()
    .withStopToken((StopToken token) shared @trusted {
      while (!token.isStopRequested) {
        Thread.yield();
      }
    })
    .withStopSource(ownSource);

  // stop via the source
  auto stopper = ValueSender!StopSource(ownSource)
    .then((StopSource source) shared => source.stop());

  host.run(spinner);
  host.run(stopper);

  // The nursery wait is asserted to succeed.
  host.syncWait().assumeOk;
}
154 
@("withStopSource.2")
unittest {
  import core.thread : Thread;

  auto ownSource = new StopSource();
  auto host = new shared Nursery();

  // Spins until its stop token reports a stop request; the sender is
  // attached to ownSource through withStopSource.
  auto spinner = ThreadSender()
    .withStopToken((StopToken token) shared @trusted {
      while (!token.isStopRequested) {
        Thread.yield();
      }
    })
    .withStopSource(ownSource);

  // stop via the nursery
  auto stopper = ValueSender!(shared Nursery)(host)
    .then((shared Nursery target) shared => target.stop());

  host.run(spinner);
  host.run(stopper);

  // The nursery wait is asserted to be cancelled.
  host.syncWait().isCancelled.should == true;
}