module ut.concurrency.operations;

import concurrency;
import concurrency.sender;
import concurrency.thread;
import concurrency.operations;
import concurrency.receiver;
import concurrency.stoptoken;
import concurrency.nursery;
import unit_threaded;
import core.time;
import core.thread;
import std.typecons;

/// Used to test that Senders keep the operational state alive until one receiver's terminal is called
struct OutOfBandValueSender(T) {
  alias Value = T;
  T value;
  // Operational state: holds the receiver and the value to deliver.
  struct Op(Receiver) {
    Receiver receiver;
    T value;
    // Executed on the spawned thread; completes the receiver with the stored value.
    void run() {
      receiver.setValue(value);
    }
    // Spawns a new thread that calls `run`; `start` itself returns without
    // joining it, so completion happens out-of-band relative to the caller.
    void start() @trusted scope {
      new Thread(&this.run).start();
    }
  }
  auto connect(Receiver)(return Receiver receiver) @safe scope return {
    // ensure NRVO
    auto op = Op!(Receiver)(receiver, value);
    return op;
  }
}

// A throwing continuation followed by ignoreError is expected to surface as a cancellation.
@("ignoreErrors.syncWait.value")
@safe unittest {
  bool delegate() @safe shared dg = () shared { throw new Exception("Exceptions are rethrown"); };
  ThreadSender()
    .then(dg)
    .ignoreError()
    .syncWait.isCancelled.should == true;
}

// syncWait on the out-of-band sender must still observe the value set from the other thread.
@("oob")
unittest {
  auto oob = OutOfBandValueSender!int(43);
  oob.syncWait.value.should == 43;
}

// The expected winner is the first immediate sender, or the thread that does not sleep,
// regardless of argument order.
@("race")
unittest {
  race(ValueSender!int(4), ValueSender!int(5)).syncWait.value.should == 4;
  auto fastThread = ThreadSender().then(() shared => 1);
  auto slowThread = ThreadSender().then(() shared @trusted { Thread.sleep(50.msecs); return 2; });
  race(fastThread, slowThread).syncWait.value.should == 1;
  race(slowThread, fastThread).syncWait.value.should == 1;
}

@("race.multiple")
unittest {
  race(ValueSender!int(4), ValueSender!int(5), ValueSender!int(6)).syncWait.value.should == 4;
}

// A single failing contender does not fail the race; the error only surfaces when all contenders throw.
@("race.exception.single")
unittest {
  race(ThrowingSender(), ValueSender!int(5)).syncWait.value.should == 5;
  race(ThrowingSender(), ThrowingSender()).syncWait.assumeOk.shouldThrow();
}

@("race.exception.double")
unittest {
  auto slow = ThreadSender().then(() shared @trusted { Thread.sleep(50.msecs); throw new Exception("Slow"); });
  auto fast = ThreadSender().then(() shared { throw new Exception("Fast"); });
  // When both contenders fail, the expected error is the one from the sender that does not sleep.
  race(slow, fast).syncWait.assumeOk.shouldThrowWithMessage("Fast");
}

@("race.cancel-other")
unittest {
  // `waiting` only returns once its stop token is triggered, so this test can
  // only terminate if something requests stop on it.
  auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
    while (!token.isStopRequested) { Thread.yield(); }
  });
  race(waiting, ValueSender!int(88)).syncWait.value.get.should == 88;
}

@("race.cancel")
unittest {
  auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
    while (!token.isStopRequested) { Thread.yield(); }
  });
  auto nursery = new shared Nursery();
  nursery.run(race(waiting, waiting));
  // Stop the nursery from another thread after a short delay.
  nursery.run(ThreadSender().then(() @trusted shared { Thread.sleep(50.msecs); nursery.stop(); }));
  nursery.syncWait.isCancelled.should == true;
}

@("race.array.just")
@safe unittest {
  race([just(4), just(5)]).syncWait.value.should == 4;
}

@("race.array.void")
@safe unittest {
  race([VoidSender(), VoidSender()]).syncWait.assumeOk;
}

@("via")
unittest {
  import std.typecons : tuple;
  ValueSender!int(3).via(ValueSender!int(6)).syncWait.value.should == tuple(6,3);
  ValueSender!int(5).via(VoidSender()).syncWait.value.should == 5;
  VoidSender().via(ValueSender!int(4)).syncWait.value.should == 4;
}

@("then.value.delegate")
@safe unittest {
  ValueSender!int(3).then((int i) shared => i*3).syncWait.value.shouldEqual(9);
}

@("then.value.function")
@safe unittest {
  ValueSender!int(3).then((int i) => i*3).syncWait.value.shouldEqual(9);
}

@("then.oob")
@safe unittest {
  OutOfBandValueSender!int(46).then((int i) shared => i*3).syncWait.value.shouldEqual(138);
}

// The continuation may take the whole tuple...
@("then.tuple")
@safe unittest {
  just(1,2,3).then((Tuple!(int,int,int) t) shared => t[0]).syncWait.value.shouldEqual(1);
}

// ...or the tuple's fields as separate parameters.
@("then.tuple.expand")
@safe unittest {
  just(1,2,3).then((int a,int b,int c) shared => a+b).syncWait.value.shouldEqual(3);
}

// finally_ replaces a value or an error with the given result (constant or
// callable), while a cancellation is asserted to stay a cancellation.
@("finally")
unittest {
  ValueSender!int(1).finally_(() => 4).syncWait.value.should == 4;
  ValueSender!int(2).finally_(3).syncWait.value.should == 3;
  ThrowingSender().finally_(3).syncWait.value.should == 3;
  ThrowingSender().finally_(() => 4).syncWait.value.should == 4;
  DoneSender().finally_(() => 4).syncWait.isCancelled.should == true;
  DoneSender().finally_(3).syncWait.isCancelled.should == true;
}

@("whenAll.basic")
unittest {
  // Values are combined into a tuple; void senders contribute nothing to the result.
  whenAll(ValueSender!int(1), ValueSender!int(2)).syncWait.value.should == tuple(1,2);
  whenAll(ValueSender!int(1), ValueSender!int(2), ValueSender!int(3)).syncWait.value.should == tuple(1,2,3);
  whenAll(VoidSender(), ValueSender!int(2)).syncWait.value.should == 2;
  whenAll(ValueSender!int(1), VoidSender()).syncWait.value.should == 1;
  whenAll(VoidSender(), VoidSender()).syncWait.assumeOk;
  // Error and cancellation cases.
  whenAll(ValueSender!int(1), ThrowingSender()).syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender");
  whenAll(ThrowingSender(), ValueSender!int(1)).syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender");
  whenAll(ValueSender!int(1), DoneSender()).syncWait.isCancelled.should == true;
  whenAll(DoneSender(), ValueSender!int(1)).syncWait.isCancelled.should == true;
  whenAll(DoneSender(), ThrowingSender()).syncWait.isCancelled.should == true;
  whenAll(ThrowingSender(), DoneSender()).syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender");
}

@("whenAll.cancel")
unittest {
  // Both waiting senders only return after a stop request, so each assertion
  // below can only complete if whenAll stops the sibling that is still waiting.
  auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
    while (!token.isStopRequested) { Thread.yield(); }
  });
  whenAll(waiting, DoneSender()).syncWait.isCancelled.should == true;
  whenAll(ThrowingSender(), waiting).syncWait.assumeOk.shouldThrow;
  whenAll(waiting, ThrowingSender()).syncWait.assumeOk.shouldThrow;
  auto waitingInt = ThreadSender().withStopToken((StopToken token) @trusted {
    while (!token.isStopRequested) { Thread.yield(); }
    return 42;
  });
  whenAll(waitingInt, DoneSender()).syncWait.isCancelled.should == true;
  whenAll(ThrowingSender(), waitingInt).syncWait.assumeOk.shouldThrow;
  whenAll(waitingInt, ThrowingSender()).syncWait.assumeOk.shouldThrow;
}

@("whenAll.stop")
unittest {
  auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
    while (!token.isStopRequested) { Thread.yield(); }
  });
  auto source = new StopSource();
  // `stopper` triggers the same stop source that is attached to the whenAll below.
  auto stopper = just(source).then((StopSource source) shared => source.stop());
  whenAll(waiting, stopper).withStopSource(source).syncWait.isCancelled.should == true;
}

@("whenAll.array.just")
unittest {
  whenAll([just(4), just(5)]).syncWait.value.should == [4,5];
}

@("whenAll.array.void")
unittest {
  whenAll([VoidSender(), VoidSender()]).syncWait.assumeOk;
}

@("retry")
unittest {
  ValueSender!int(5).retry(Times(5)).syncWait.value.should == 5;
  int t = 3;
  int n = 0;
  // Local sender whose operation fails while the connect counter `n` is below `t`.
  // `n` is incremented on every connect, so it counts the attempts made.
  struct Sender {
    alias Value = void;
    static struct Op(Receiver) {
      Receiver receiver;
      bool fail;
      void start() @safe nothrow {
        if (fail)
          receiver.setError(new Exception("Fail fail fail"));
        else
          receiver.setValue();
      }
    }
    auto connect(Receiver)(return Receiver receiver) @safe scope return {
      // ensure NRVO
      auto op = Op!(Receiver)(receiver, n++ < t);
      return op;
    }
  }
  // Three failures followed by one success: four connects in total.
  Sender().retry(Times(5)).syncWait.assumeOk;
  n.should == 4;
  n = 0;

  // With only two attempts allowed the last failure is propagated.
  Sender().retry(Times(2)).syncWait.assumeOk.shouldThrowWithMessage("Fail fail fail");
  n.should == 2;
  shared int p = 0;
  ThreadSender().then(()shared { import core.atomic; p.atomicOp!("+=")(1); throw new Exception("Failed"); }).retry(Times(5)).syncWait.assumeOk.shouldThrowWithMessage("Failed");
  p.should == 5;
}

@("retryWhen.immediate.success")
unittest {
  static struct Immediate {
    auto failure(Exception e) {
      return VoidSender();
    }
  }

  VoidSender().retryWhen(Immediate()).syncWait.assumeOk;
}

// Sender that yields the number of times it has been connected before (0, 1, 2, ...).
struct ConnectCounter {
  alias Value = int;
  int counter = 0;
  auto connect(Receiver)(return Receiver receiver) @trusted scope return {
    // ensure NRVO
    auto op = ValueSender!int(counter++).connect(receiver);
    return op;
  }
}

@("retryWhen.immediate.retries")
unittest {
  static struct Immediate {
    auto failure(Exception e) {
      return VoidSender();
    }
  }
  // The continuation throws for connect counts 0..2, so the fourth connect (value 3) succeeds.
  ConnectCounter()
    .then((int c) { if (c < 3) throw new Exception("jada"); return c; })
    .retryWhen(Immediate())
    .syncWait.value.should == 3;
}

@("retryWhen.wait.retries")
unittest {
  import core.time : msecs;
  import concurrency.scheduler : ManualTimeWorker;

  static struct Wait {
    auto failure(Exception e) @safe {
      return delay(3.msecs);
    }
  }

  auto worker = new shared ManualTimeWorker();
  auto sender = ConnectCounter()
    .then((int c) { if (c < 3) throw new Exception("jada"); return c; })
    .retryWhen(Wait())
    .withScheduler(worker.getScheduler);

  // Manually advances time past each of the three 3ms retry delays, then
  // checks that no further timer is pending.
  auto driver = just(worker).then((shared ManualTimeWorker worker) {
    worker.timeUntilNextEvent().should == 3.msecs;
    worker.advance(3.msecs);
    worker.timeUntilNextEvent().should == 3.msecs;
    worker.advance(3.msecs);
    worker.timeUntilNextEvent().should == 3.msecs;
    worker.advance(3.msecs);
    worker.timeUntilNextEvent().should == null;
  });

  whenAll(sender, driver).syncWait.value.should == 3;
}

@("retryWhen.throw")
unittest {
  static struct Throw {
    auto failure(Exception t) @safe {
      return ErrorSender(new Exception("inner"));
    }
  }

  // The error produced by the retry logic ("inner") is the one that is propagated.
  ErrorSender(new Exception("outer")).retryWhen(Throw()).syncWait.assumeOk.shouldThrowWithMessage("inner");
}

@("whenAll.oob")
unittest {
  auto oob = OutOfBandValueSender!int(43);
  auto value = ValueSender!int(11);
  whenAll(oob, value).syncWait.value.should == tuple(43, 11);
}

@("withStopToken.oob")
unittest {
  auto oob = OutOfBandValueSender!int(44);
  oob.withStopToken((StopToken stopToken, int t) => t).syncWait.value.should == 44;
}

@("withStopSource.oob")
unittest {
  auto oob = OutOfBandValueSender!int(45);
  oob.withStopSource(new StopSource()).syncWait.value.should == 45;
}

// Named after the operation it exercises (withStopToken); it was previously
// labelled "withStopSource.tuple".
@("withStopToken.tuple")
unittest {
  just(14, 53).withStopToken((StopToken s, Tuple!(int, int) t) => t[0]*t[1]).syncWait.value.should == 742;
}

@("value.withstoptoken.via.thread")
@safe unittest {
  ValueSender!int(4).withStopToken((StopToken s, int i) { throw new Exception("Badness");}).via(ThreadSender()).syncWait.assumeOk.shouldThrowWithMessage("Badness");
}

@("completewithcancellation")
@safe unittest {
  ValueSender!void().completeWithCancellation.syncWait.isCancelled.should == true;
}

@("raceAll")
@safe unittest {
  auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
    while (!token.isStopRequested) { Thread.yield(); }
  });
  // The immediately completing contender determines the outcome in each case:
  // cancellation, value and error respectively.
  raceAll(waiting, DoneSender()).syncWait.isCancelled.should == true;
  raceAll(waiting, just(42)).syncWait.value.should == 42;
  raceAll(waiting, ThrowingSender()).syncWait.isError.should == true;
}

@("on.ManualTimeWorker")
@safe unittest {
  import concurrency.scheduler : ManualTimeWorker;

  auto worker = new shared ManualTimeWorker();
  // Advances the manual clock in two 5ms steps to fire the 10ms timer.
  auto driver = just(worker).then((shared ManualTimeWorker worker) shared {
    worker.timeUntilNextEvent().should == 10.msecs;
    worker.advance(5.msecs);
    worker.timeUntilNextEvent().should == 5.msecs;
    worker.advance(5.msecs);
    worker.timeUntilNextEvent().should == null;
  });
  auto timer = DelaySender(10.msecs).withScheduler(worker.getScheduler);

  whenAll(timer, driver).syncWait().assumeOk;
}

@("on.ManualTimeWorker.cancel")
@safe unittest {
  import concurrency.scheduler : ManualTimeWorker;

  auto worker = new shared ManualTimeWorker();
  auto source = new StopSource();
  // Stopping the source is expected to remove the pending timer from the worker.
  auto driver = just(source).then((StopSource source) shared {
    worker.timeUntilNextEvent().should == 10.msecs;
    source.stop();
    worker.timeUntilNextEvent().should == null;
  });
  auto timer = DelaySender(10.msecs).withScheduler(worker.getScheduler);

  whenAll(timer, driver).syncWait(source).isCancelled.should == true;
}

@("then.stack.no-leak")
@safe unittest {
  struct S {
    void fun(int i) shared {
    }
  }
  shared S s;
  // it's perfectly ok to point to a function on the stack
  auto sender = just(42).then(&s.fun);

  sender.syncWait();

  void disappearSender(Sender)(Sender s) @safe;
  // but the sender can't leak now
  static assert(!__traits(compiles, disappearSender(sender)));
}

@("forwardOn")
@safe unittest {
  auto pool = stdTaskPool(2);

  // Value, error and cancellation are all expected to pass through forwardOn unchanged.
  VoidSender().forwardOn(pool.getScheduler).syncWait.assumeOk;
  ErrorSender(new Exception("bad news")).forwardOn(pool.getScheduler).syncWait.isError.should == true;
  DoneSender().forwardOn(pool.getScheduler).syncWait.isCancelled.should == true;
  just(42).forwardOn(pool.getScheduler).syncWait.value.should == 42;
}

@("toSingleton")
@safe unittest {
  import std.typecons : tuple;
  import concurrency.scheduler : ManualTimeWorker;
  import core.atomic : atomicOp;

  shared int g;

  auto worker = new shared ManualTimeWorker();

  auto single = delay(2.msecs).then(() shared => g.atomicOp!"+="(1)).toSingleton(worker.getScheduler);

  auto driver = justFrom(() shared => worker.advance(2.msecs));

  // Both uses of `single` within one whenAll observe the same increment of `g`;
  // the second whenAll observes the next one.
  whenAll(single, single, driver).syncWait.value.should == tuple(1,1);
  whenAll(single, single, driver).syncWait.value.should == tuple(2,2);
}

@("stopOn")
@safe unittest {
  auto sourceInner = new shared StopSource();
  auto sourceOuter = new shared StopSource();

  // NOTE(review): `just(() => ...)` presumably forwards the lambda as a value
  // without invoking it (compare `justFrom` in the toSingleton test), in which
  // case neither stop source is ever triggered in this test - TODO confirm.
  shared bool b;
  whenAll(delay(5.msecs).then(() shared => b = true).stopOn(StopToken(sourceInner)),
          just(() => sourceOuter.stop())
          ).syncWait(sourceOuter).assumeOk;
  b.should == true;

  // NOTE(review): the continuation below assigns `b`, not `d`, so the final
  // assertion on `d` cannot fail. Looks like a copy-paste slip; left unchanged
  // because the intended outcome depends on the note above - TODO confirm.
  shared bool d;
  whenAll(delay(5.msecs).then(() shared => b = true).stopOn(StopToken(sourceInner)),
          just(() => sourceInner.stop())
          ).syncWait(sourceOuter).assumeOk;
  d.should == false;
}

@("withChild")
@safe unittest {
  import core.atomic;

  // Pair of events used to order the child and parent bodies, plus the flag
  // recording what the parent observed.
  class State {
    import core.sync.event : Event;
    bool parentAfterChild;
    Event childEvent, parentEvent;
    this() shared @trusted {
      (cast()childEvent).initialize(false, false);
      (cast()parentEvent).initialize(false, false);
    }
    void signalChild() shared @trusted {
      (cast()childEvent).set();
    }
    void waitChild() shared @trusted {
      (cast()childEvent).wait();
    }
    void signalParent() shared @trusted {
      (cast()parentEvent).set();
    }
    void waitParent() shared @trusted {
      (cast()parentEvent).wait();
    }
  }
  auto state = new shared State();
  auto source = new shared StopSource;

  // The child spins until it sees the stop request, wakes the parent, and then
  // blocks until the parent has recorded its observation.
  auto child = just(state).withStopToken((StopToken token, shared State state) @trusted {
    while(!token.isStopRequested) { Thread.yield(); }
    state.signalParent();
    state.waitChild();
  }).via(ThreadSender());

  // The parent records whether its own token is still untriggered at the
  // moment the child has already observed the stop.
  auto parent = just(state).withStopToken((StopToken token, shared State state){
    state.waitParent();
    state.parentAfterChild.atomicStore(token.isStopRequested == false);
    state.signalChild();
  }).via(ThreadSender());

  whenAll(parent.withChild(child).withStopSource(source), just(source).then((shared StopSource s) => s.stop())).syncWait.isCancelled.should == true;

  state.parentAfterChild.atomicLoad.should == true;
}

// onTermination: the callback is expected to run for value, cancellation and error alike.
@("onTermination.value")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  just(42).onTermination(() @safe shared => g.atomicOp!"+="(1)).syncWait.assumeOk;
  g.should == 1;
}

@("onTermination.done")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  DoneSender().onTermination(() @safe shared => g.atomicOp!"+="(1)).syncWait.isCancelled.should == true;
  g.should == 1;
}

@("onTermination.error")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  ThrowingSender().onTermination(() @safe shared => g.atomicOp!"+="(1)).syncWait.isError.should == true;
  g.should == 1;
}

// onError: the callback is expected to run only when the sender completes with an error.
@("onError.value")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  just(42).onError((Exception e) @safe shared => g.atomicOp!"+="(1)).syncWait.assumeOk;
  g.should == 0;
}

@("onError.done")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  DoneSender().onError((Exception e) @safe shared => g.atomicOp!"+="(1)).syncWait.isCancelled.should == true;
  g.should == 0;
}

@("onError.error")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  ThrowingSender().onError((Exception e) @safe shared => g.atomicOp!"+="(1)).syncWait.isError.should == true;
  g.should == 1;
}

@("onError.throw")
@safe unittest {
  // An exception thrown from the handler becomes the reported error, with the
  // original error chained as `next`.
  auto err = ThrowingSender().onError((Exception e) @safe shared { throw new Exception("in onError"); }).syncWait.get!Exception;
  err.msg.should == "in onError";
  err.next.msg.should == "ThrowingSender";
}

// stopWhen.source.*: the trigger fires after 100ms and stops the source; the
// source's own completion (value, error or cancellation) is what gets reported.
@("stopWhen.source.value")
@safe unittest {
  auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
    while (!token.isStopRequested) { Thread.yield(); }
    return 43;
  });
  auto trigger = delay(100.msecs);
  waiting.stopWhen(trigger).syncWait().value.should == 43;
}

@("stopWhen.source.error")
@safe unittest {
  auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
    while (!token.isStopRequested) { Thread.yield(); }
    throw new Exception("Upside down");
  });
  auto trigger = delay(100.msecs);
  waiting.stopWhen(trigger).syncWait().assumeOk.shouldThrowWithMessage("Upside down");
}

@("stopWhen.source.cancelled")
@safe unittest {
  auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
    while (!token.isStopRequested) { Thread.yield(); }
  }).completeWithCancellation;
  auto trigger = delay(100.msecs);
  waiting.stopWhen(trigger).syncWait().isCancelled.should == true;
}

// stopWhen.trigger.*: when the trigger itself fails or is cancelled, that
// outcome is the one reported.
@("stopWhen.trigger.error")
@safe unittest {
  auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
    while (!token.isStopRequested) { Thread.yield(); }
    throw new Exception("This occurres later, so the other one gets propagated");
  });
  auto trigger = ThrowingSender();
  waiting.stopWhen(trigger).syncWait().assumeOk.shouldThrowWithMessage("ThrowingSender");
}

@("stopWhen.trigger.cancelled.value")
@safe unittest {
  auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
    while (!token.isStopRequested) { Thread.yield(); }
    return 42;
  });
  auto trigger = delay(100.msecs).completeWithCancellation;
  waiting.stopWhen(trigger).syncWait().isCancelled.should == true;
}

@("completewitherror.basic")
@safe unittest {
  ValueSender!void().completeWithError(new Exception("hello")).syncWait.assumeOk.shouldThrowWithMessage("hello");
}

// An upstream Exception is replaced by the supplied one...
@("completewitherror.exception.base")
@safe unittest {
  ErrorSender(new Exception("not you")).completeWithError(new Exception("overridden")).syncWait.assumeOk.shouldThrowWithMessage!Throwable("overridden");
}

// ...whereas an upstream Throwable or Error is expected to take precedence.
@("completewitherror.throwable.base")
@safe unittest {
  ErrorSender(new Throwable("precedence")).completeWithError(new Exception("hello")).syncWait.assumeOk.shouldThrowWithMessage!Throwable("precedence");
}

@("completewitherror.error.base")
@safe unittest {
  ErrorSender(new Error("precedence")).completeWithError(new Exception("hello")).syncWait.assumeOk.shouldThrowWithMessage!Error("precedence");
}

// onCompletion: the callback is expected to run on value and on cancellation, but not on error.
@("onCompletion.value")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  just(42).onCompletion(() @safe shared => g.atomicOp!"+="(1)).syncWait.assumeOk;
  g.should == 1;
}

@("onCompletion.done")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  DoneSender().onCompletion(() @safe shared => g.atomicOp!"+="(1)).syncWait.isCancelled.should == true;
  g.should == 1;
}

@("onCompletion.error")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  ThrowingSender().onCompletion(() @safe shared => g.atomicOp!"+="(1)).syncWait.isError.should == true;
  g.should == 0;
}