1 module ut.concurrency.asyncscope; 2 3 import concurrency.asyncscope; 4 import concurrency : syncWait; 5 import concurrency.sender : VoidSender, DoneSender, ThrowingSender; 6 import concurrency.stoptoken : StopToken; 7 import unit_threaded; 8 9 @("cleanup.empty") 10 @safe unittest { 11 auto s = asyncScope(); 12 s.cleanup.syncWait.assumeOk; 13 s.cleanup.syncWait.assumeOk; // test twice 14 } 15 16 @("cleanup.voidsender.single") 17 @safe unittest { 18 auto s = asyncScope(); 19 s.spawn(VoidSender()).should == true; 20 s.cleanup.syncWait.assumeOk; 21 s.cleanup.syncWait.assumeOk; // test twice 22 } 23 24 @("cleanup.voidsender.triple") 25 @safe unittest { 26 auto s = asyncScope(); 27 s.spawn(VoidSender()).should == true; 28 s.spawn(VoidSender()).should == true; 29 s.spawn(VoidSender()).should == true; 30 s.cleanup.syncWait.assumeOk; 31 s.cleanup.syncWait.assumeOk; // test twice 32 } 33 34 @("cleanup.waitingsender.single") 35 @safe unittest { 36 auto s = asyncScope(); 37 s.spawn(waitingTask).should == true; 38 s.cleanup.syncWait.assumeOk; 39 s.cleanup.syncWait.assumeOk; // test twice 40 } 41 42 @("cleanup.waitingsender.triple") 43 @safe unittest { 44 auto s = asyncScope(); 45 s.spawn(waitingTask).should == true; 46 s.spawn(waitingTask).should == true; 47 s.spawn(waitingTask).should == true; 48 s.cleanup.syncWait.assumeOk; 49 s.cleanup.syncWait.assumeOk; // test twice 50 } 51 52 @("spawn.stopped") 53 @safe unittest { 54 auto s = asyncScope(); 55 s.cleanup.syncWait.assumeOk; 56 s.spawn(VoidSender()).should == false; 57 } 58 59 @("spawn.error") 60 @safe unittest { 61 auto s = asyncScope(); 62 s.spawn(ThrowingSender()).should == true; 63 s.cleanup.syncWait.assumeOk.shouldThrow; 64 s.cleanup.syncWait.assumeOk.shouldThrow; // test twice 65 } 66 67 @("spawn.reentry") 68 @safe unittest { 69 import concurrency.sender : justFrom; 70 auto s = asyncScope(); 71 s.spawn(justFrom(() shared { s.spawn(VoidSender()); })).should == true; 72 s.cleanup.syncWait.assumeOk; 73 } 74 75 @("spawn.value.transform") 76 @safe unittest { 77 import concurrency.sender : just; 78 import concurrency.operations : then; 79 auto s = asyncScope(); 80 s.spawn(just(42).then((int) {})).should == true; 81 s.cleanup.syncWait.assumeOk; 82 s.cleanup.syncWait.assumeOk; // test twice 83 } 84 85 @("cleanup.scoped") 86 @safe unittest { 87 import concurrency.operations : onTermination; 88 import core.atomic : atomicStore; 89 shared bool p; 90 { 91 auto s = asyncScope(); 92 s.spawn(waitingTask().onTermination(() shared { p.atomicStore(true); })); 93 } 94 p.should == true; 95 } 96 97 @("cleanup.nested.struct") 98 @safe unittest { 99 import concurrency.operations : onTermination; 100 import core.atomic : atomicStore; 101 shared bool p; 102 static struct S { 103 shared AsyncScope s; 104 } 105 { 106 S s = S(asyncScope); 107 s.s.spawn(waitingTask().onTermination(() shared { p.atomicStore(true); })); 108 } 109 p.should == true; 110 } 111 112 @("cleanup.nested.class") 113 @trusted unittest { 114 import concurrency.operations : onTermination; 115 import core.atomic : atomicStore; 116 shared bool p; 117 static class S { 118 shared AsyncScope s; 119 this() { 120 s = asyncScope(); 121 } 122 } 123 auto s = new S(); 124 s.s.spawn(waitingTask().onTermination(() shared { p.atomicStore(true); })); 125 destroy(s); 126 p.should == true; 127 } 128 129 @("spawn.assert.thread") 130 @safe unittest { 131 import concurrency.thread : ThreadSender; 132 import concurrency.operations : then; 133 auto fail = ThreadSender().then(() shared { 134 assert(false, "bad things happen"); 135 }); 136 auto s = asyncScope(); 137 138 s.spawn(fail).should == true; 139 s.cleanup.syncWait.shouldThrow!Throwable; 140 } 141 142 @("spawn.assert.inline") 143 @trusted unittest { 144 import concurrency.thread : ThreadSender; 145 import concurrency.sender : justFrom; 146 147 auto fail = justFrom(() shared { 148 assert(0, "bad things happen 2"); 149 }); 150 auto s = asyncScope(); 151 152 s.spawn(fail).shouldThrow!Throwable; 153 s.cleanup.syncWait.shouldThrow!Throwable; 154 } 155 156 @("cleanup.assert.then") 157 @safe unittest { 158 import concurrency.thread : ThreadSender; 159 import concurrency.operations : then; 160 auto s = asyncScope(); 161 162 s.cleanup.then(() shared { assert(false, "Ohh no!"); }).syncWait.shouldThrow!Throwable; 163 } 164 165 auto waitingTask() { 166 import concurrency.thread : ThreadSender; 167 import concurrency.operations : withStopToken; 168 169 return ThreadSender().withStopToken((StopToken token) @trusted { 170 import core.thread : Thread; 171 while (!token.isStopRequested) { Thread.yield(); } 172 }); 173 } 174 175 @("withScheduler") 176 @safe unittest { 177 import concurrency.sender : VoidSender; 178 import concurrency.operations : withScheduler; 179 import concurrency.scheduler : localThreadScheduler; 180 auto s = asyncScope(); 181 182 s.spawn(VoidSender().withScheduler(localThreadScheduler)); 183 s.cleanup.syncWait.assumeOk; 184 }