// Unit tests for running senders inside a `Nursery`: each test schedules one
// or more senders with `run`, blocks on `syncWait`, and then checks the
// reported outcome and the state of the nursery's stop token.
module ut.concurrency.nursery;

import concurrency;
import concurrency.sender;
import concurrency.thread;
import concurrency.operations;
import concurrency.nursery;
import concurrency.stoptoken;
import unit_threaded;

// A single value sender: the wait reports ok and no stop is requested.
@("run.value")
@safe unittest {
  auto pool = new shared Nursery();
  pool.run(ValueSender!int(5));
  pool.syncWait().isOk.should == true;
  pool.getStopToken.isStopRequested.shouldBeFalse();
}

// A ThrowingSender: `assumeOk` on the wait result throws and the nursery's
// stop token reports a stop request afterwards.
@("run.exception")
@safe unittest {
  auto pool = new shared Nursery();
  pool.run(ThrowingSender());
  pool.syncWait.assumeOk.shouldThrow();
  pool.getStopToken.isStopRequested.shouldBeTrue();
}

// The continuation attached with `then` must not have stored the value before
// the nursery is waited on; afterwards the value is visible.
@("run.value.then")
@safe unittest {
  shared(int) observed;
  auto pool = new shared Nursery();
  auto storeFive = ValueSender!int(5).then((int value) shared => observed = value);
  pool.run(storeFive);
  observed.shouldEqual(0);
  pool.syncWait().isOk.should == true;
  observed.shouldEqual(5);
}

// Work running on a ThreadSender schedules further work on the same nursery;
// the wait still reports ok and the nested work's value is observed.
@("run.thread.run")
@safe unittest {
  auto pool = new shared Nursery();
  shared(int) observed;
  pool.run(ThreadSender().then(() shared @safe {
        pool.run(ValueSender!int(5).then((int value) shared @safe {
              observed = value;
            }));
      }));
  observed.shouldEqual(0);
  pool.syncWait().isOk.should == true;
  observed.shouldEqual(5);
  pool.getStopToken.isStopRequested.shouldBeFalse();
}

// Work inside the nursery calls `stop` on the nursery itself: the wait
// reports cancellation.
@("run.thread.stop.internal")
@safe unittest {
  auto pool = new shared Nursery();
  pool.run(ThreadSender().then(() shared @safe => pool.stop()));
  pool.syncWait().isCancelled.should == true;
  pool.getStopToken.isStopRequested.shouldBeTrue();
}

// The stop comes from a separate StopSource that is handed to `syncWait`:
// both the nursery's token and the external source report the stop.
@("run.thread.stop.external")
@trusted unittest {
  auto pool = new shared Nursery();
  auto externalSource = new shared StopSource();
  pool.run(ThreadSender().then(() shared @safe => externalSource.stop()));
  // syncWait is given the unshared view of the source, hence the cast.
  pool.syncWait(cast(StopSource)externalSource).isCancelled.should == true;
  pool.getStopToken.isStopRequested.shouldBeTrue();
  externalSource.isStopRequested.shouldBeTrue();
}

// One task spins until the nursery's token reports a stop; a sibling task
// stops the nursery. The wait reports cancellation.
@("run.thread.stop.internal.sibling")
@safe unittest {
  import core.thread : Thread;
  auto pool = new shared Nursery();
  auto waiter = ThreadSender().then(() shared @trusted {
      auto stopToken = pool.getStopToken();
      while (!stopToken.isStopRequested()) {
        Thread.yield();
      }
    });
  auto canceller = ThreadSender().then(() shared @safe => pool.stop());
  pool.run(waiter);
  pool.run(canceller);
  pool.syncWait().isCancelled.should == true;
  pool.getStopToken.isStopRequested.shouldBeTrue();
}

// A nursery is itself run inside another nursery; work added to the inner one
// is completed by waiting on the outer one, and neither gets a stop request.
@("run.nested")
@safe unittest {
  shared(int) observed;
  auto outer = new shared Nursery();
  auto inner = new shared Nursery();
  outer.run(inner);
  inner.run(ValueSender!int(99).then((int value) shared => observed = value));
  observed.shouldEqual(0);
  outer.syncWait().isOk.should == true;
  observed.shouldEqual(99);
  outer.getStopToken.isStopRequested.shouldBeFalse();
  inner.getStopToken.isStopRequested.shouldBeFalse();
}

// Two tasks spin until a stop is requested (one polls the nursery's token,
// the other the token handed to it by `withStopToken`); a third task throws.
// The wait result throws on `assumeOk` and the nursery ends up stopped.
@("run.error")
@safe unittest {
  import core.thread : Thread;
  auto pool = new shared Nursery();
  auto nurseryPoller = ThreadSender().then(() shared @trusted {
      auto stopToken = pool.getStopToken();
      while (!stopToken.isStopRequested()) {
        Thread.yield();
      }
    });
  auto tokenPoller = ThreadSender().withStopToken((StopToken stopToken) shared @trusted {
      while (!stopToken.isStopRequested()) {
        Thread.yield();
      }
    });
  auto thrower = ThreadSender().then(() shared @safe { throw new Exception("Error should stop everyone"); });
  pool.run(nurseryPoller);
  pool.run(tokenPoller);
  pool.run(thrower);
  pool.getStopToken.isStopRequested.shouldBeFalse();
  pool.syncWait.assumeOk.shouldThrow();
  pool.getStopToken.isStopRequested.shouldBeTrue();
}

// A spinning task is bound to its own StopSource via `withStopSource`;
// stopping that source lets the task finish and the wait reports ok.
@("withStopSource.1")
unittest {
  import core.thread : Thread;
  auto source = new StopSource();
  auto pool = new shared Nursery();

  auto spinner = ThreadSender()
    .withStopToken((StopToken token) shared @trusted {
        while (!token.isStopRequested()) {
          Thread.yield();
        }
      })
    .withStopSource(source);

  // the stop request is issued on the dedicated source
  auto stopper = ValueSender!StopSource(source).then((StopSource src) shared => src.stop());

  pool.run(spinner);
  pool.run(stopper);

  pool.syncWait().isOk.should == true;
}

// Same setup as above, but the stop is issued on the nursery instead of the
// dedicated source; the wait reports cancellation.
@("withStopSource.2")
unittest {
  import core.thread : Thread;
  auto source = new StopSource();
  auto pool = new shared Nursery();

  auto spinner = ThreadSender()
    .withStopToken((StopToken token) shared @trusted {
        while (!token.isStopRequested()) {
          Thread.yield();
        }
      })
    .withStopSource(source);

  // the stop request is issued on the nursery
  auto stopper = ValueSender!(shared Nursery)(pool).then((shared Nursery target) shared => target.stop());

  pool.run(spinner);
  pool.run(stopper);

  pool.syncWait().isCancelled.should == true;
}

// Compile-time check: `run` must be callable from a nothrow context.
@("nothrow")
@safe unittest {
  auto pool = new shared Nursery();
  auto runWithoutThrowing = () nothrow => pool.run(ValueSender!int(21));
  runWithoutThrowing();
}