1 module ut.concurrency.operations; 2 3 import concurrency; 4 import concurrency.sender; 5 import concurrency.thread; 6 import concurrency.operations; 7 import concurrency.receiver; 8 import concurrency.stoptoken; 9 import concurrency.nursery; 10 import unit_threaded; 11 import core.time; 12 import core.thread; 13 import std.typecons; 14 15 /// Used to test that Senders keep the operational state alive until one receiver's terminal is called 16 struct OutOfBandValueSender(T) { 17 alias Value = T; 18 T value; 19 struct Op(Receiver) { 20 Receiver receiver; 21 T value; 22 void run() { 23 receiver.setValue(value); 24 } 25 void start() @trusted scope { 26 auto value = new Thread(&this.run).start(); 27 } 28 } 29 auto connect(Receiver)(return Receiver receiver) @safe scope return { 30 // ensure NRVO 31 auto op = Op!(Receiver)(receiver, value); 32 return op; 33 } 34 } 35 36 @("ignoreErrors.syncWait.value") 37 @safe unittest { 38 bool delegate() @safe shared dg = () shared { throw new Exception("Exceptions are rethrown"); }; 39 ThreadSender() 40 .then(dg) 41 .ignoreError() 42 .syncWait.isCancelled.should == true; 43 } 44 45 @("oob") 46 unittest { 47 auto oob = OutOfBandValueSender!int(43); 48 oob.syncWait.value.should == 43; 49 } 50 51 @("race") 52 unittest { 53 race(ValueSender!int(4), ValueSender!int(5)).syncWait.value.should == 4; 54 auto fastThread = ThreadSender().then(() shared => 1); 55 auto slowThread = ThreadSender().then(() shared @trusted { Thread.sleep(50.msecs); return 2; }); 56 race(fastThread, slowThread).syncWait.value.should == 1; 57 race(slowThread, fastThread).syncWait.value.should == 1; 58 } 59 60 @("race.multiple") 61 unittest { 62 race(ValueSender!int(4), ValueSender!int(5), ValueSender!int(6)).syncWait.value.should == 4; 63 } 64 65 @("race.exception.single") 66 unittest { 67 race(ThrowingSender(), ValueSender!int(5)).syncWait.value.should == 5; 68 race(ThrowingSender(), ThrowingSender()).syncWait.assumeOk.shouldThrow(); 69 } 70 71 @("race.exception.double") 72 unittest { 73 auto slow = ThreadSender().then(() shared @trusted { Thread.sleep(50.msecs); throw new Exception("Slow"); }); 74 auto fast = ThreadSender().then(() shared { throw new Exception("Fast"); }); 75 race(slow, fast).syncWait.assumeOk.shouldThrowWithMessage("Fast"); 76 } 77 78 @("race.cancel-other") 79 unittest { 80 auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { 81 while (!token.isStopRequested) { Thread.yield(); } 82 }); 83 race(waiting, ValueSender!int(88)).syncWait.value.get.should == 88; 84 } 85 86 @("race.cancel") 87 unittest { 88 auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { 89 while (!token.isStopRequested) { Thread.yield(); } 90 }); 91 auto nursery = new shared Nursery(); 92 nursery.run(race(waiting, waiting)); 93 nursery.run(ThreadSender().then(() @trusted shared { Thread.sleep(50.msecs); nursery.stop(); })); 94 nursery.syncWait.isCancelled.should == true; 95 } 96 97 @("via") 98 unittest { 99 import std.typecons : tuple; 100 ValueSender!int(3).via(ValueSender!int(6)).syncWait.value.should == tuple(6,3); 101 ValueSender!int(5).via(VoidSender()).syncWait.value.should == 5; 102 VoidSender().via(ValueSender!int(4)).syncWait.value.should == 4; 103 } 104 105 @("then.value.delegate") 106 @safe unittest { 107 ValueSender!int(3).then((int i) shared => i*3).syncWait.value.shouldEqual(9); 108 } 109 110 @("then.value.function") 111 @safe unittest { 112 ValueSender!int(3).then((int i) => i*3).syncWait.value.shouldEqual(9); 113 } 114 115 @("then.oob") 116 @safe unittest { 117 OutOfBandValueSender!int(46).then((int i) shared => i*3).syncWait.value.shouldEqual(138); 118 } 119 120 @("finally") 121 unittest { 122 ValueSender!int(1).finally_(() => 4).syncWait.value.should == 4; 123 ValueSender!int(2).finally_(3).syncWait.value.should == 3; 124 ThrowingSender().finally_(3).syncWait.value.should == 3; 125 ThrowingSender().finally_(() => 4).syncWait.value.should == 4; 126 ThrowingSender().finally_(3).syncWait.value.should == 3; 127 DoneSender().finally_(() => 4).syncWait.isCancelled.should == true; 128 DoneSender().finally_(3).syncWait.isCancelled.should == true; 129 } 130 131 @("whenAll") 132 unittest { 133 whenAll(ValueSender!int(1), ValueSender!int(2)).syncWait.value.should == tuple(1,2); 134 whenAll(ValueSender!int(1), ValueSender!int(2), ValueSender!int(3)).syncWait.value.should == tuple(1,2,3); 135 whenAll(VoidSender(), ValueSender!int(2)).syncWait.value.should == 2; 136 whenAll(ValueSender!int(1), VoidSender()).syncWait.value.should == 1; 137 whenAll(VoidSender(), VoidSender()).syncWait.isOk.should == true; 138 whenAll(ValueSender!int(1), ThrowingSender()).syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender"); 139 whenAll(ThrowingSender(), ValueSender!int(1)).syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender"); 140 whenAll(ValueSender!int(1), DoneSender()).syncWait.isCancelled.should == true; 141 whenAll(DoneSender(), ValueSender!int(1)).syncWait.isCancelled.should == true; 142 whenAll(DoneSender(), ThrowingSender()).syncWait.isCancelled.should == true; 143 whenAll(ThrowingSender(), DoneSender()).syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender"); 144 145 } 146 147 @("whenAll.cancel") 148 unittest { 149 auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { 150 while (!token.isStopRequested) { Thread.yield(); } 151 }); 152 whenAll(waiting, DoneSender()).syncWait.isCancelled.should == true; 153 whenAll(ThrowingSender(), waiting).syncWait.assumeOk.shouldThrow; 154 whenAll(waiting, ThrowingSender()).syncWait.assumeOk.shouldThrow; 155 auto waitingInt = ThreadSender().withStopToken((StopToken token) @trusted { 156 while (!token.isStopRequested) { Thread.yield(); } 157 return 42; 158 }); 159 whenAll(waitingInt, DoneSender()).syncWait.isCancelled.should == true; 160 whenAll(ThrowingSender(), waitingInt).syncWait.assumeOk.shouldThrow; 161 whenAll(waitingInt, ThrowingSender()).syncWait.assumeOk.shouldThrow; 162 } 163 164 @("whenAll.stop") 165 unittest { 166 auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { 167 while (!token.isStopRequested) { Thread.yield(); } 168 }); 169 auto source = new StopSource(); 170 auto stopper = just(source).then((StopSource source) shared => source.stop()); 171 whenAll(waiting, stopper).withStopSource(source).syncWait.isCancelled.should == true; 172 } 173 174 @("retry") 175 unittest { 176 ValueSender!int(5).retry(Times(5)).syncWait.value.should == 5; 177 int t = 3; 178 int n = 0; 179 struct Sender { 180 alias Value = void; 181 static struct Op(Receiver) { 182 Receiver receiver; 183 bool fail; 184 void start() @safe nothrow { 185 if (fail) 186 receiver.setError(new Exception("Fail fail fail")); 187 else 188 receiver.setValue(); 189 } 190 } 191 auto connect(Receiver)(return Receiver receiver) @safe scope return { 192 // ensure NRVO 193 auto op = Op!(Receiver)(receiver, n++ < t); 194 return op; 195 } 196 } 197 Sender().retry(Times(5)).syncWait.isOk.should == true; 198 n.should == 4; 199 n = 0; 200 201 Sender().retry(Times(2)).syncWait.assumeOk.shouldThrowWithMessage("Fail fail fail"); 202 n.should == 2; 203 shared int p = 0; 204 ThreadSender().then(()shared { import core.atomic; p.atomicOp!("+=")(1); throw new Exception("Failed"); }).retry(Times(5)).syncWait.assumeOk.shouldThrowWithMessage("Failed"); 205 p.should == 5; 206 } 207 208 @("whenAll.oob") 209 unittest { 210 auto oob = OutOfBandValueSender!int(43); 211 auto value = ValueSender!int(11); 212 whenAll(oob, value).syncWait.value.should == tuple(43, 11); 213 } 214 215 @("withStopToken.oob") 216 unittest { 217 auto oob = OutOfBandValueSender!int(44); 218 oob.withStopToken((StopToken stopToken, int t) => t).syncWait.value.should == 44; 219 } 220 221 @("withStopSource.oob") 222 unittest { 223 auto oob = OutOfBandValueSender!int(45); 224 oob.withStopSource(new StopSource()).syncWait.value.should == 45; 225 } 226 227 @("value.withstoptoken.via.thread") 228 @safe unittest { 229 ValueSender!int(4).withStopToken((StopToken s, int i) { throw new Exception("Badness");}).via(ThreadSender()).syncWait.assumeOk.shouldThrowWithMessage("Badness"); 230 } 231 232 @("completewithcancellation") 233 @safe unittest { 234 ValueSender!void().completeWithCancellation.syncWait.isCancelled.should == true; 235 } 236 237 @("raceAll") 238 @safe unittest { 239 auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { 240 while (!token.isStopRequested) { Thread.yield(); } 241 }); 242 raceAll(waiting, DoneSender()).syncWait.isCancelled.should == true; 243 raceAll(waiting, just(42)).syncWait.value.should == 42; 244 raceAll(waiting, ThrowingSender()).syncWait.isError.should == true; 245 } 246 247 @("on.ManualTimeWorker") 248 @safe unittest { 249 import concurrency.scheduler : ManualTimeWorker; 250 251 auto worker = new shared ManualTimeWorker(); 252 auto driver = just(worker).then((shared ManualTimeWorker worker) shared { 253 worker.timeUntilNextEvent().should == 10.msecs; 254 worker.advance(5.msecs); 255 worker.timeUntilNextEvent().should == 5.msecs; 256 worker.advance(5.msecs); 257 worker.timeUntilNextEvent().should == null; 258 }); 259 auto timer = DelaySender(10.msecs).withScheduler(worker.getScheduler); 260 261 whenAll(timer, driver).syncWait().isOk.should == true; 262 } 263 264 @("on.ManualTimeWorker.cancel") 265 @safe unittest { 266 import concurrency.scheduler : ManualTimeWorker; 267 268 auto worker = new shared ManualTimeWorker(); 269 auto source = new StopSource(); 270 auto driver = just(source).then((StopSource source) shared { 271 worker.timeUntilNextEvent().should == 10.msecs; 272 source.stop(); 273 worker.timeUntilNextEvent().should == null; 274 }); 275 auto timer = DelaySender(10.msecs).withScheduler(worker.getScheduler); 276 277 whenAll(timer, driver).syncWait(source).isCancelled.should == true; 278 } 279 280 @("then.stack.no-leak") 281 @safe unittest { 282 struct S { 283 void fun(int i) shared { 284 } 285 } 286 shared S s; 287 // its perfectly ok to point to a function on the stack 288 auto sender = just(42).then(&s.fun); 289 290 sender.syncWait(); 291 292 void disappearSender(Sender)(Sender s) @safe; 293 // but the sender can't leak now 294 static assert(!__traits(compiles, disappearSender(sender))); 295 } 296 297 @("forwardOn") 298 @safe unittest { 299 auto pool = stdTaskPool(2); 300 301 VoidSender().forwardOn(pool.getScheduler).syncWait.isOk.should == true; 302 ErrorSender(new Exception("bad news")).forwardOn(pool.getScheduler).syncWait.isError.should == true; 303 DoneSender().forwardOn(pool.getScheduler).syncWait.isCancelled.should == true; 304 just(42).forwardOn(pool.getScheduler).syncWait.value.should == 42; 305 }