1 module ut.concurrency.nursery;
2 
3 import concurrency;
4 import concurrency.sender;
5 import concurrency.thread;
6 import concurrency.operations;
7 import concurrency.nursery;
8 import concurrency.stoptoken;
9 import unit_threaded;
10 
// Running a plain value sender: waiting on the nursery is expected to report
// success, and the nursery's stop token must not report a stop request.
@("run.value")
@safe unittest {
  auto pool = new shared Nursery();
  auto five = ValueSender!(int)(5);
  pool.run(five);
  pool.syncWait.isOk.shouldBeTrue();
  pool.getStopToken().isStopRequested().shouldBeFalse();
}
18 
// Running a ThrowingSender: calling assumeOk on the wait result is expected
// to throw, and afterwards the nursery's stop token reports a stop request.
@("run.exception")
@safe unittest {
  auto pool = new shared Nursery();
  auto failing = ThrowingSender();
  pool.run(failing);
  // The whole wait-and-unwrap chain stays inside shouldThrow.
  pool.syncWait.assumeOk.shouldThrow();
  pool.getStopToken().isStopRequested().shouldBeTrue();
}
26 
// A continuation attached with `then` writes the produced value into a shared
// counter. The test expects the counter to still be 0 right after run() and
// to be 5 once the nursery has been waited on.
@("run.value.then")
@safe unittest {
  shared(int) observed;
  auto pool = new shared Nursery();
  auto recorder = ValueSender!(int)(5).then((int c) shared => observed = c);
  pool.run(recorder);
  observed.shouldEqual(0);
  pool.syncWait.isOk.shouldBeTrue();
  observed.shouldEqual(5);
}
36 
// A ThreadSender continuation schedules additional work on the same nursery.
// The test expects the shared counter to be 0 before the wait, 5 after it,
// a successful wait result, and no stop request on the nursery.
@("run.thread.run")
@safe unittest {
  shared(int) observed;
  auto pool = new shared Nursery();
  // The inner sender is created and submitted from within the continuation.
  auto spawner = ThreadSender().then(() shared @safe {
      pool.run(ValueSender!(int)(5).then((int c) shared @safe {
            observed = c;
          }));
    });
  pool.run(spawner);
  observed.shouldEqual(0);
  pool.syncWait.isOk.shouldBeTrue();
  observed.shouldEqual(5);
  pool.getStopToken().isStopRequested().shouldBeFalse();
}
51 
// A ThreadSender continuation calls stop() on the nursery it runs in. The
// wait result is expected to be "cancelled" and the stop token to be set.
@("run.thread.stop.internal")
@safe unittest {
  auto pool = new shared Nursery();
  auto selfStopper = ThreadSender().then(() shared @safe => pool.stop());
  pool.run(selfStopper);
  pool.syncWait.isCancelled.shouldBeTrue();
  pool.getStopToken().isStopRequested().shouldBeTrue();
}
59 
// The nursery is waited on with an external StopSource, and a ThreadSender
// continuation stops that source. Expected: the wait result is "cancelled"
// and both the nursery's stop token and the external source report a stop.
// NOTE(review): the test is @trusted, presumably because of the cast that
// strips `shared` from the source below - confirm before narrowing it.
@("run.thread.stop.external")
@trusted unittest {
  auto externalSource = new shared StopSource();
  auto pool = new shared Nursery();
  auto externalStopper = ThreadSender().then(() shared @safe => externalSource.stop());
  pool.run(externalStopper);
  pool.syncWait(cast(StopSource)externalSource).isCancelled.shouldBeTrue();
  pool.getStopToken().isStopRequested().shouldBeTrue();
  externalSource.isStopRequested().shouldBeTrue();
}
69 
// Two thread-based senders share a nursery: one spins until the nursery's
// stop token is set, the other calls stop() on the nursery. Expected: the
// wait result is "cancelled" and the stop token reports a stop request.
@("run.thread.stop.internal.sibling")
@safe unittest {
  import core.thread : Thread;
  auto pool = new shared Nursery();
  auto spinner = ThreadSender().then(() shared @trusted {
      auto stopFlag = pool.getStopToken();
      // Yield repeatedly until a stop has been requested.
      for (;;) {
        if (stopFlag.isStopRequested())
          break;
        Thread.yield();
      }
    });
  auto stopper = ThreadSender().then(() shared @safe => pool.stop());
  pool.run(spinner);
  pool.run(stopper);
  pool.syncWait.isCancelled.shouldBeTrue();
  pool.getStopToken().isStopRequested().shouldBeTrue();
}
84 
// One nursery is run inside another, and work is then added to the inner
// one. Expected: the shared counter is 0 before waiting on the outer nursery
// and 99 afterwards, the wait succeeds, and neither nursery reports a stop.
@("run.nested")
@safe unittest {
  shared(int) observed;
  auto outer = new shared Nursery();
  auto inner = new shared Nursery();
  outer.run(inner);
  auto recorder = ValueSender!(int)(99).then((int c) shared => observed = c);
  inner.run(recorder);
  observed.shouldEqual(0);
  outer.syncWait.isOk.shouldBeTrue();
  observed.shouldEqual(99);
  outer.getStopToken().isStopRequested().shouldBeFalse();
  inner.getStopToken().isStopRequested().shouldBeFalse();
}
98 
// Three thread-based senders share a nursery: two spin until a stop is
// requested (one polls the nursery's token, the other the token handed in by
// withStopToken), and the third throws. Expected: no stop is requested before
// waiting, assumeOk on the wait result throws, and a stop is requested after.
@("run.error")
@safe unittest {
  import core.thread : Thread;
  auto pool = new shared Nursery();
  auto pollsNursery = ThreadSender().then(() shared @trusted {
      auto stopFlag = pool.getStopToken();
      for (;;) {
        if (stopFlag.isStopRequested())
          break;
        Thread.yield();
      }
    });
  auto pollsOwnToken = ThreadSender().withStopToken((StopToken token) shared @trusted {
      for (;;) {
        if (token.isStopRequested())
          break;
        Thread.yield();
      }
    });
  auto thrower = ThreadSender().then(() shared @safe {
      throw new Exception("Error should stop everyone");
    });
  pool.run(pollsNursery);
  pool.run(pollsOwnToken);
  pool.run(thrower);
  pool.getStopToken().isStopRequested().shouldBeFalse();
  pool.syncWait.assumeOk.shouldThrow();
  pool.getStopToken().isStopRequested().shouldBeTrue();
}
118 
// A spinning thread sender is bound to its own StopSource via withStopSource,
// and a second sender stops that source. The test expects the nursery's wait
// result to be "ok" in this case.
@("withStopSource.1")
unittest {
  import core.thread : Thread;
  auto pool = new shared Nursery();
  auto ownSource = new StopSource();

  auto spinner = ThreadSender()
    .withStopToken((StopToken flag) shared @trusted {
        for (;;) {
          if (flag.isStopRequested)
            break;
          Thread.yield();
        }
      })
    .withStopSource(ownSource);

  // stop via the source
  auto stopper = ValueSender!StopSource(ownSource).then((StopSource src) shared => src.stop());

  pool.run(spinner);
  pool.run(stopper);

  pool.syncWait.isOk.shouldBeTrue();
}
140 
// A spinning thread sender is bound to its own StopSource via withStopSource,
// but this time a second sender stops the nursery itself. The test expects
// the nursery's wait result to be "cancelled" in this case.
@("withStopSource.2")
unittest {
  import core.thread : Thread;
  auto pool = new shared Nursery();
  auto ownSource = new StopSource();

  auto spinner = ThreadSender()
    .withStopToken((StopToken flag) shared @trusted {
        for (;;) {
          if (flag.isStopRequested)
            break;
          Thread.yield();
        }
      })
    .withStopSource(ownSource);

  // stop via the nursery
  auto stopper = ValueSender!(shared Nursery)(pool).then((shared Nursery target) shared => target.stop());

  pool.run(spinner);
  pool.run(stopper);

  pool.syncWait.isCancelled.shouldBeTrue();
}
162 
// Compile-time style check: Nursery.run must be callable from a `nothrow`
// function literal. The literal is invoked once; nothing else is asserted.
@("nothrow")
@safe unittest {
  auto pool = new shared Nursery();
  auto submit = () nothrow { pool.run(ValueSender!int(21)); };
  submit();
}