1 module ut.concurrency.operations; 2 3 import concurrency; 4 import concurrency.sender; 5 import concurrency.thread; 6 import concurrency.operations; 7 import concurrency.receiver; 8 import concurrency.stoptoken; 9 import concurrency.nursery; 10 import unit_threaded; 11 import core.time; 12 import core.thread; 13 import std.typecons; 14 15 /// Used to test that Senders keep the operational state alive until one receiver's terminal is called 16 struct OutOfBandValueSender(T) { 17 alias Value = T; 18 T value; 19 struct Op(Receiver) { 20 Receiver receiver; 21 T value; 22 void run() { 23 receiver.setValue(value); 24 } 25 void start() @trusted scope { 26 auto value = new Thread(&this.run).start(); 27 } 28 } 29 auto connect(Receiver)(return Receiver receiver) @safe scope return { 30 // ensure NRVO 31 auto op = Op!(Receiver)(receiver, value); 32 return op; 33 } 34 } 35 36 @("ignoreErrors.syncWait.value") 37 @safe unittest { 38 bool delegate() @safe shared dg = () shared { throw new Exception("Exceptions are rethrown"); }; 39 ThreadSender() 40 .then(dg) 41 .ignoreError() 42 .syncWait.isCancelled.should == true; 43 } 44 45 @("oob") 46 unittest { 47 auto oob = OutOfBandValueSender!int(43); 48 oob.syncWait.value.should == 43; 49 } 50 51 @("race") 52 unittest { 53 race(ValueSender!int(4), ValueSender!int(5)).syncWait.value.should == 4; 54 auto fastThread = ThreadSender().then(() shared => 1); 55 auto slowThread = ThreadSender().then(() shared @trusted { Thread.sleep(50.msecs); return 2; }); 56 race(fastThread, slowThread).syncWait.value.should == 1; 57 race(slowThread, fastThread).syncWait.value.should == 1; 58 } 59 60 @("race.multiple") 61 unittest { 62 race(ValueSender!int(4), ValueSender!int(5), ValueSender!int(6)).syncWait.value.should == 4; 63 } 64 65 @("race.exception.single") 66 unittest { 67 race(ThrowingSender(), ValueSender!int(5)).syncWait.value.should == 5; 68 race(ThrowingSender(), ThrowingSender()).syncWait.assumeOk.shouldThrow(); 69 } 70 71 @("race.exception.double") 72 unittest { 73 auto slow = ThreadSender().then(() shared @trusted { Thread.sleep(50.msecs); throw new Exception("Slow"); }); 74 auto fast = ThreadSender().then(() shared { throw new Exception("Fast"); }); 75 race(slow, fast).syncWait.assumeOk.shouldThrowWithMessage("Fast"); 76 } 77 78 @("race.cancel-other") 79 unittest { 80 auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { 81 while (!token.isStopRequested) { Thread.yield(); } 82 }); 83 race(waiting, ValueSender!int(88)).syncWait.value.get.should == 88; 84 } 85 86 @("race.cancel") 87 unittest { 88 auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { 89 while (!token.isStopRequested) { Thread.yield(); } 90 }); 91 auto nursery = new shared Nursery(); 92 nursery.run(race(waiting, waiting)); 93 nursery.run(ThreadSender().then(() @trusted shared { Thread.sleep(50.msecs); nursery.stop(); })); 94 nursery.syncWait.isCancelled.should == true; 95 } 96 97 @("via") 98 unittest { 99 import std.typecons : tuple; 100 ValueSender!int(3).via(ValueSender!int(6)).syncWait.value.should == tuple(6,3); 101 ValueSender!int(5).via(VoidSender()).syncWait.value.should == 5; 102 VoidSender().via(ValueSender!int(4)).syncWait.value.should == 4; 103 } 104 105 @("then.value.delegate") 106 @safe unittest { 107 ValueSender!int(3).then((int i) shared => i*3).syncWait.value.shouldEqual(9); 108 } 109 110 @("then.value.function") 111 @safe unittest { 112 ValueSender!int(3).then((int i) => i*3).syncWait.value.shouldEqual(9); 113 } 114 115 @("then.oob") 116 @safe unittest { 117 OutOfBandValueSender!int(46).then((int i) shared => i*3).syncWait.value.shouldEqual(138); 118 } 119 120 @("then.tuple") 121 @safe unittest { 122 just(1,2,3).then((Tuple!(int,int,int) t) shared => t[0]).syncWait.value.shouldEqual(1); 123 } 124 125 @("then.tuple.expand") 126 @safe unittest { 127 just(1,2,3).then((int a,int b,int c) shared => a+b).syncWait.value.shouldEqual(3); 128 } 129 130 @("finally") 131 unittest { 132 ValueSender!int(1).finally_(() => 4).syncWait.value.should == 4; 133 ValueSender!int(2).finally_(3).syncWait.value.should == 3; 134 ThrowingSender().finally_(3).syncWait.value.should == 3; 135 ThrowingSender().finally_(() => 4).syncWait.value.should == 4; 136 ThrowingSender().finally_(3).syncWait.value.should == 3; 137 DoneSender().finally_(() => 4).syncWait.isCancelled.should == true; 138 DoneSender().finally_(3).syncWait.isCancelled.should == true; 139 } 140 141 @("whenAll") 142 unittest { 143 whenAll(ValueSender!int(1), ValueSender!int(2)).syncWait.value.should == tuple(1,2); 144 whenAll(ValueSender!int(1), ValueSender!int(2), ValueSender!int(3)).syncWait.value.should == tuple(1,2,3); 145 whenAll(VoidSender(), ValueSender!int(2)).syncWait.value.should == 2; 146 whenAll(ValueSender!int(1), VoidSender()).syncWait.value.should == 1; 147 whenAll(VoidSender(), VoidSender()).syncWait.isOk.should == true; 148 whenAll(ValueSender!int(1), ThrowingSender()).syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender"); 149 whenAll(ThrowingSender(), ValueSender!int(1)).syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender"); 150 whenAll(ValueSender!int(1), DoneSender()).syncWait.isCancelled.should == true; 151 whenAll(DoneSender(), ValueSender!int(1)).syncWait.isCancelled.should == true; 152 whenAll(DoneSender(), ThrowingSender()).syncWait.isCancelled.should == true; 153 whenAll(ThrowingSender(), DoneSender()).syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender"); 154 155 } 156 157 @("whenAll.cancel") 158 unittest { 159 auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { 160 while (!token.isStopRequested) { Thread.yield(); } 161 }); 162 whenAll(waiting, DoneSender()).syncWait.isCancelled.should == true; 163 whenAll(ThrowingSender(), waiting).syncWait.assumeOk.shouldThrow; 164 whenAll(waiting, ThrowingSender()).syncWait.assumeOk.shouldThrow; 165 auto waitingInt = ThreadSender().withStopToken((StopToken token) @trusted { 166 while (!token.isStopRequested) { Thread.yield(); } 167 return 42; 168 }); 169 whenAll(waitingInt, DoneSender()).syncWait.isCancelled.should == true; 170 whenAll(ThrowingSender(), waitingInt).syncWait.assumeOk.shouldThrow; 171 whenAll(waitingInt, ThrowingSender()).syncWait.assumeOk.shouldThrow; 172 } 173 174 @("whenAll.stop") 175 unittest { 176 auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { 177 while (!token.isStopRequested) { Thread.yield(); } 178 }); 179 auto source = new StopSource(); 180 auto stopper = just(source).then((StopSource source) shared => source.stop()); 181 whenAll(waiting, stopper).withStopSource(source).syncWait.isCancelled.should == true; 182 } 183 184 @("retry") 185 unittest { 186 ValueSender!int(5).retry(Times(5)).syncWait.value.should == 5; 187 int t = 3; 188 int n = 0; 189 struct Sender { 190 alias Value = void; 191 static struct Op(Receiver) { 192 Receiver receiver; 193 bool fail; 194 void start() @safe nothrow { 195 if (fail) 196 receiver.setError(new Exception("Fail fail fail")); 197 else 198 receiver.setValue(); 199 } 200 } 201 auto connect(Receiver)(return Receiver receiver) @safe scope return { 202 // ensure NRVO 203 auto op = Op!(Receiver)(receiver, n++ < t); 204 return op; 205 } 206 } 207 Sender().retry(Times(5)).syncWait.isOk.should == true; 208 n.should == 4; 209 n = 0; 210 211 Sender().retry(Times(2)).syncWait.assumeOk.shouldThrowWithMessage("Fail fail fail"); 212 n.should == 2; 213 shared int p = 0; 214 ThreadSender().then(()shared { import core.atomic; p.atomicOp!("+=")(1); throw new Exception("Failed"); }).retry(Times(5)).syncWait.assumeOk.shouldThrowWithMessage("Failed"); 215 p.should == 5; 216 } 217 218 @("whenAll.oob") 219 unittest { 220 auto oob = OutOfBandValueSender!int(43); 221 auto value = ValueSender!int(11); 222 whenAll(oob, value).syncWait.value.should == tuple(43, 11); 223 } 224 225 @("withStopToken.oob") 226 unittest { 227 auto oob = OutOfBandValueSender!int(44); 228 oob.withStopToken((StopToken stopToken, int t) => t).syncWait.value.should == 44; 229 } 230 231 @("withStopSource.oob") 232 unittest { 233 auto oob = OutOfBandValueSender!int(45); 234 oob.withStopSource(new StopSource()).syncWait.value.should == 45; 235 } 236 237 @("withStopSource.tuple") 238 unittest { 239 just(14, 53).withStopToken((StopToken s, Tuple!(int, int) t) => t[0]*t[1]).syncWait.value.should == 742; 240 } 241 242 @("value.withstoptoken.via.thread") 243 @safe unittest { 244 ValueSender!int(4).withStopToken((StopToken s, int i) { throw new Exception("Badness");}).via(ThreadSender()).syncWait.assumeOk.shouldThrowWithMessage("Badness"); 245 } 246 247 @("completewithcancellation") 248 @safe unittest { 249 ValueSender!void().completeWithCancellation.syncWait.isCancelled.should == true; 250 } 251 252 @("raceAll") 253 @safe unittest { 254 auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { 255 while (!token.isStopRequested) { Thread.yield(); } 256 }); 257 raceAll(waiting, DoneSender()).syncWait.isCancelled.should == true; 258 raceAll(waiting, just(42)).syncWait.value.should == 42; 259 raceAll(waiting, ThrowingSender()).syncWait.isError.should == true; 260 } 261 262 @("on.ManualTimeWorker") 263 @safe unittest { 264 import concurrency.scheduler : ManualTimeWorker; 265 266 auto worker = new shared ManualTimeWorker(); 267 auto driver = just(worker).then((shared ManualTimeWorker worker) shared { 268 worker.timeUntilNextEvent().should == 10.msecs; 269 worker.advance(5.msecs); 270 worker.timeUntilNextEvent().should == 5.msecs; 271 worker.advance(5.msecs); 272 worker.timeUntilNextEvent().should == null; 273 }); 274 auto timer = DelaySender(10.msecs).withScheduler(worker.getScheduler); 275 276 whenAll(timer, driver).syncWait().isOk.should == true; 277 } 278 279 @("on.ManualTimeWorker.cancel") 280 @safe unittest { 281 import concurrency.scheduler : ManualTimeWorker; 282 283 auto worker = new shared ManualTimeWorker(); 284 auto source = new StopSource(); 285 auto driver = just(source).then((StopSource source) shared { 286 worker.timeUntilNextEvent().should == 10.msecs; 287 source.stop(); 288 worker.timeUntilNextEvent().should == null; 289 }); 290 auto timer = DelaySender(10.msecs).withScheduler(worker.getScheduler); 291 292 whenAll(timer, driver).syncWait(source).isCancelled.should == true; 293 } 294 295 @("then.stack.no-leak") 296 @safe unittest { 297 struct S { 298 void fun(int i) shared { 299 } 300 } 301 shared S s; 302 // its perfectly ok to point to a function on the stack 303 auto sender = just(42).then(&s.fun); 304 305 sender.syncWait(); 306 307 void disappearSender(Sender)(Sender s) @safe; 308 // but the sender can't leak now 309 static assert(!__traits(compiles, disappearSender(sender))); 310 } 311 312 @("forwardOn") 313 @safe unittest { 314 auto pool = stdTaskPool(2); 315 316 VoidSender().forwardOn(pool.getScheduler).syncWait.isOk.should == true; 317 ErrorSender(new Exception("bad news")).forwardOn(pool.getScheduler).syncWait.isError.should == true; 318 DoneSender().forwardOn(pool.getScheduler).syncWait.isCancelled.should == true; 319 just(42).forwardOn(pool.getScheduler).syncWait.value.should == 42; 320 } 321 322 @("toSingleton") 323 @safe unittest { 324 import std.typecons : tuple; 325 import concurrency.scheduler : ManualTimeWorker; 326 import core.atomic : atomicOp; 327 328 shared int g; 329 330 auto worker = new shared ManualTimeWorker(); 331 332 auto single = delay(2.msecs).then(() shared => g.atomicOp!"+="(1)).toSingleton(worker.getScheduler); 333 334 auto driver = justFrom(() shared => worker.advance(2.msecs)); 335 336 whenAll(single, single, driver).syncWait.value.should == tuple(1,1); 337 whenAll(single, single, driver).syncWait.value.should == tuple(2,2); 338 } 339 340 @("stopOn") 341 @safe unittest { 342 auto sourceInner = new shared StopSource(); 343 auto sourceOuter = new shared StopSource(); 344 345 shared bool b; 346 whenAll(delay(5.msecs).then(() shared => b = true).stopOn(StopToken(sourceInner)), 347 just(() => sourceOuter.stop()) 348 ).syncWait(sourceOuter).assumeOk; 349 b.should == true; 350 351 shared bool d; 352 whenAll(delay(5.msecs).then(() shared => b = true).stopOn(StopToken(sourceInner)), 353 just(() => sourceInner.stop()) 354 ).syncWait(sourceOuter).assumeOk; 355 d.should == false; 356 } 357 358 @("withChild") 359 @safe unittest { 360 import core.atomic; 361 362 class State { 363 import core.sync.event : Event; 364 bool parentAfterChild; 365 Event childEvent, parentEvent; 366 this() shared @trusted { 367 (cast()childEvent).initialize(false, false); 368 (cast()parentEvent).initialize(false, false); 369 } 370 void signalChild() shared @trusted { 371 (cast()childEvent).set(); 372 } 373 void waitChild() shared @trusted { 374 (cast()childEvent).wait(); 375 } 376 void signalParent() shared @trusted { 377 (cast()parentEvent).set(); 378 } 379 void waitParent() shared @trusted { 380 (cast()parentEvent).wait(); 381 } 382 } 383 auto state = new shared State(); 384 auto source = new shared StopSource; 385 386 import std.stdio; 387 auto child = just(state).withStopToken((StopToken token, shared State state) @trusted { 388 while(!token.isStopRequested) {} 389 state.signalParent(); 390 state.waitChild(); 391 }).via(ThreadSender()); 392 393 auto parent = just(state).withStopToken((StopToken token, shared State state){ 394 state.waitParent(); 395 state.parentAfterChild.atomicStore(token.isStopRequested == false); 396 state.signalChild(); 397 }).via(ThreadSender()); 398 399 whenAll(parent.withChild(child).withStopSource(source), just(source).then((shared StopSource s) => s.stop())).syncWait.isCancelled; 400 401 state.parentAfterChild.atomicLoad.should == true; 402 } 403 404 @("onTermination.value") 405 @safe unittest { 406 import core.atomic : atomicOp; 407 shared int g = 0; 408 just(42).onTermination(() @safe shared => g.atomicOp!"+="(1)).syncWait.assumeOk; 409 g.should == 1; 410 } 411 412 @("onTermination.done") 413 @safe unittest { 414 import core.atomic : atomicOp; 415 shared int g = 0; 416 DoneSender().onTermination(() @safe shared => g.atomicOp!"+="(1)).syncWait.isCancelled.should == true; 417 g.should == 1; 418 } 419 420 @("onTermination.error") 421 @safe unittest { 422 import core.atomic : atomicOp; 423 shared int g = 0; 424 ThrowingSender().onTermination(() @safe shared => g.atomicOp!"+="(1)).syncWait.isError.should == true; 425 g.should == 1; 426 } 427 428 @("onError.value") 429 @safe unittest { 430 import core.atomic : atomicOp; 431 shared int g = 0; 432 just(42).onError((Exception e) @safe shared => g.atomicOp!"+="(1)).syncWait.assumeOk; 433 g.should == 0; 434 } 435 436 @("onError.done") 437 @safe unittest { 438 import core.atomic : atomicOp; 439 shared int g = 0; 440 DoneSender().onError((Exception e) @safe shared => g.atomicOp!"+="(1)).syncWait.isCancelled.should == true; 441 g.should == 0; 442 } 443 444 @("onError.error") 445 @safe unittest { 446 import core.atomic : atomicOp; 447 shared int g = 0; 448 ThrowingSender().onError((Exception e) @safe shared => g.atomicOp!"+="(1)).syncWait.isError.should == true; 449 g.should == 1; 450 } 451 452 @("onError.throw") 453 @safe unittest { 454 import core.exception : AssertError; 455 auto err = ThrowingSender().onError((Exception e) @safe shared { throw new Exception("in onError"); }).syncWait.error; 456 err.msg.should == "in onError"; 457 err.next.msg.should == "ThrowingSender"; 458 }