1 module ut.concurrency.nursery; 2 3 import concurrency; 4 import concurrency.sender; 5 import concurrency.thread; 6 import concurrency.operations; 7 import concurrency.nursery; 8 import concurrency.stoptoken; 9 import unit_threaded; 10 11 @("run.stopped") 12 @safe unittest { 13 auto nursery = new shared Nursery(); 14 nursery.stop(); 15 nursery.syncWait().isCancelled.should == true; 16 } 17 18 @("run.empty") 19 @safe unittest { 20 auto nursery = new shared Nursery(); 21 auto stop = justFrom(() shared => nursery.stop()); 22 whenAll(nursery, stop).syncWait().isCancelled.should == true; 23 } 24 25 @("run.value") 26 @safe unittest { 27 auto nursery = new shared Nursery(); 28 nursery.run(ValueSender!(int)(5)); 29 nursery.syncWait.assumeOk; 30 nursery.getStopToken().isStopRequested().shouldBeFalse(); 31 } 32 33 @("run.exception") 34 @safe unittest { 35 auto nursery = new shared Nursery(); 36 nursery.run(ThrowingSender()); 37 nursery.syncWait.assumeOk.shouldThrow(); 38 nursery.getStopToken().isStopRequested().shouldBeTrue(); 39 } 40 41 @("run.value.then") 42 @safe unittest { 43 auto nursery = new shared Nursery(); 44 shared(int) global; 45 nursery.run(ValueSender!(int)(5).then((int c) shared => global = c)); 46 global.shouldEqual(0); 47 nursery.syncWait.assumeOk; 48 global.shouldEqual(5); 49 } 50 51 @("run.thread.run") 52 @safe unittest { 53 auto nursery = new shared Nursery(); 54 shared(int) global; 55 nursery.run(ThreadSender().then(() shared @safe { 56 nursery.run(ValueSender!(int)(5).then((int c) shared @safe { 57 global = c; 58 })); 59 })); 60 global.shouldEqual(0); 61 nursery.syncWait.assumeOk; 62 global.shouldEqual(5); 63 nursery.getStopToken().isStopRequested().shouldBeFalse(); 64 } 65 66 @("run.thread.stop.internal") 67 @safe unittest { 68 auto nursery = new shared Nursery(); 69 nursery.run(ThreadSender().then(() shared @safe => nursery.stop())); 70 nursery.syncWait.isCancelled.should == true; 71 nursery.getStopToken().isStopRequested().shouldBeTrue(); 72 } 73 74 @("run.thread.stop.external") 75 @trusted unittest { 76 auto nursery = new shared Nursery(); 77 auto stopSource = new shared StopSource(); 78 nursery.run(ThreadSender().then(() shared @safe => stopSource.stop())); 79 nursery.syncWait(cast(StopSource)stopSource).isCancelled.should == true; 80 nursery.getStopToken().isStopRequested().shouldBeTrue(); 81 stopSource.isStopRequested().shouldBeTrue(); 82 } 83 84 @("run.thread.stop.internal.sibling") 85 @safe unittest { 86 import core.thread : Thread; 87 auto nursery = new shared Nursery(); 88 auto thread1 = ThreadSender().then(() shared @trusted { 89 auto token = nursery.getStopToken(); 90 while (!token.isStopRequested()) Thread.yield(); 91 }); 92 auto thread2 = ThreadSender().then(() shared @safe => nursery.stop()); 93 nursery.run(thread1); 94 nursery.run(thread2); 95 nursery.syncWait.isCancelled.should == true; 96 nursery.getStopToken().isStopRequested().shouldBeTrue(); 97 } 98 99 @("run.nested") 100 @safe unittest { 101 auto nursery1 = new shared Nursery(); 102 auto nursery2 = new shared Nursery(); 103 shared(int) global; 104 nursery1.run(nursery2); 105 nursery2.run(ValueSender!(int)(99).then((int c) shared => global = c)); 106 global.shouldEqual(0); 107 nursery1.syncWait.assumeOk; 108 global.shouldEqual(99); 109 nursery1.getStopToken().isStopRequested().shouldBeFalse(); 110 nursery2.getStopToken().isStopRequested().shouldBeFalse(); 111 } 112 113 @("run.error") 114 @safe unittest { 115 import core.thread : Thread; 116 auto nursery = new shared Nursery(); 117 auto thread1 = ThreadSender().then(() shared @trusted { 118 auto token = nursery.getStopToken(); 119 while (!token.isStopRequested()) Thread.yield(); 120 }); 121 auto thread2 = ThreadSender().withStopToken((StopToken token) shared @trusted { 122 while (!token.isStopRequested()) Thread.yield(); 123 }); 124 auto thread3 = ThreadSender().then(() shared @safe { throw new Exception("Error should stop everyone"); }); 125 nursery.run(thread1); 126 nursery.run(thread2); 127 nursery.run(thread3); 128 nursery.getStopToken().isStopRequested().shouldBeFalse(); 129 nursery.syncWait.assumeOk.shouldThrow(); 130 nursery.getStopToken().isStopRequested().shouldBeTrue(); 131 } 132 133 @("withStopSource.1") 134 unittest { 135 import core.thread : Thread; 136 auto stopSource = new StopSource(); 137 auto nursery = new shared Nursery(); 138 139 auto thread1 = ThreadSender() 140 .withStopToken((StopToken stopToken) shared @trusted { 141 while(!stopToken.isStopRequested) 142 Thread.yield(); 143 }) 144 .withStopSource(stopSource); 145 146 // stop via the source 147 auto stopper = ValueSender!StopSource(stopSource).then((StopSource stopSource) shared => stopSource.stop()); 148 149 nursery.run(thread1); 150 nursery.run(stopper); 151 152 nursery.syncWait.assumeOk; 153 } 154 155 @("withStopSource.2") 156 unittest { 157 import core.thread : Thread; 158 auto stopSource = new StopSource(); 159 auto nursery = new shared Nursery(); 160 161 auto thread1 = ThreadSender() 162 .withStopToken((StopToken stopToken) shared @trusted { 163 while(!stopToken.isStopRequested) 164 Thread.yield(); 165 }) 166 .withStopSource(stopSource); 167 168 // stop via the nursery 169 auto stopper = ValueSender!(shared Nursery)(nursery).then((shared Nursery nursery) shared => nursery.stop()); 170 171 nursery.run(thread1); 172 nursery.run(stopper); 173 174 nursery.syncWait.isCancelled.should == true; 175 }