module ut.concurrency.operations;

import concurrency;
import concurrency.sender;
import concurrency.thread;
import concurrency.operations;
import concurrency.receiver;
import concurrency.stoptoken;
import concurrency.nursery;
import unit_threaded;
import core.time;
import core.thread;
import std.typecons;

/// Used to test that Senders keep the operational state alive until one
/// receiver's terminal is called.
///
/// The value is delivered from a freshly started thread rather than from
/// within `start()`, so `start()` can return before `setValue` is called.
struct OutOfBandValueSender(T) {
  alias Value = T;
  T value;
  struct Op(Receiver) {
    Receiver receiver;
    T value;
    // The operational state is neither copyable nor blittable.
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    /// Completes the receiver with the stored value.
    void run() {
      receiver.setValue(value);
    }
    /// Starts a new thread that executes `run`.
    void start() @trusted scope {
      // The thread handle is deliberately not kept; binding it to a local
      // named `value` only shadowed the member of the same name.
      new Thread(&this.run).start();
    }
  }
  /// Builds the operational state for `receiver`, carrying over `value`.
  auto connect(Receiver)(return Receiver receiver) @safe return scope {
    // ensure NRVO
    auto op = Op!(Receiver)(receiver, value);
    return op;
  }
}

// A throwing callback followed by ignoreError() is expected to be reported as cancelled.
@("ignoreErrors.syncWait.value")
@safe unittest {
  bool delegate() @safe shared dg = () shared { throw new Exception("Exceptions are rethrown"); };
  ThreadSender()
    .then(dg)
    .ignoreError()
    .syncWait.isCancelled.should == true;
}

// syncWait must yield the value even though it is delivered from another thread.
@("oob")
@safe unittest {
  auto oob = OutOfBandValueSender!int(43);
  oob.syncWait.value.should == 43;
}

// race yields the first value; with threads the non-sleeping one is expected
// to win regardless of argument order.
@("race")
@safe unittest {
  race(ValueSender!int(4), ValueSender!int(5)).syncWait.value.should == 4;
  auto fastThread = ThreadSender().then(() shared => 1);
  auto slowThread = ThreadSender().then(() shared @trusted { Thread.sleep(50.msecs); return 2; });
  race(fastThread, slowThread).syncWait.value.should == 1;
  race(slowThread, fastThread).syncWait.value.should == 1;
}

// race accepts more than two senders.
@("race.multiple")
@safe unittest {
  race(ValueSender!int(4), ValueSender!int(5), ValueSender!int(6)).syncWait.value.should == 4;
}

// A value wins over an error; when every sender errors, the error surfaces.
@("race.exception.single")
@safe unittest {
  race(ThrowingSender(), ValueSender!int(5)).syncWait.value.should == 5;
  race(ThrowingSender(), ThrowingSender()).syncWait.assumeOk.shouldThrow();
}

// When both senders throw, the exception of the one that does not sleep is expected.
@("race.exception.double")
@safe unittest {
  auto slow = ThreadSender().then(() shared @trusted { Thread.sleep(50.msecs); throw new Exception("Slow"); });
  auto fast = ThreadSender().then(() shared { throw new Exception("Fast"); });
  race(slow, fast).syncWait.assumeOk.shouldThrowWithMessage("Fast");
}

// `waiting` only returns once its stop token is triggered, so this test can
// only finish if race requests a stop on it.
@("race.cancel-other")
@safe unittest {
  auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
    });
  race(waiting, ValueSender!int(88)).syncWait.value.get.should == 88;
}

// Stopping the nursery from another task is expected to cancel a race of two waiting senders.
@("race.cancel")
@safe unittest {
  auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
    });
  auto nursery = new shared Nursery();
  nursery.run(race(waiting, waiting));
  nursery.run(ThreadSender().then(() @trusted shared { Thread.sleep(50.msecs); nursery.stop(); }));
  nursery.syncWait.isCancelled.should == true;
}

// race over an array of value senders.
@("race.array.just")
@safe unittest {
  race([just(4), just(5)]).syncWait.value.should == 4;
}

// race over an array of void senders.
@("race.array.void")
@safe unittest {
  race([VoidSender(), VoidSender()]).syncWait.assumeOk;
}

// via combines both values into a tuple (argument's value first); void sides are dropped.
@("via")
@safe unittest {
  import std.typecons : tuple;
  ValueSender!int(3).via(ValueSender!int(6)).syncWait.value.should == tuple(6,3);
  ValueSender!int(5).via(VoidSender()).syncWait.value.should == 5;
  VoidSender().via(ValueSender!int(4)).syncWait.value.should == 4;
}

// then with a shared delegate.
@("then.value.delegate")
@safe unittest {
  ValueSender!int(3).then((int i) shared => i*3).syncWait.value.shouldEqual(9);
}

// then with a plain function literal.
@("then.value.function")
@safe unittest {
  ValueSender!int(3).then((int i) => i*3).syncWait.value.shouldEqual(9);
}

// then on a sender that completes from another thread.
@("then.oob")
@safe unittest {
  OutOfBandValueSender!int(46).then((int i) shared => i*3).syncWait.value.shouldEqual(138);
}

// then receiving the multi-value result as a single Tuple.
@("then.tuple")
@safe unittest {
  just(1,2,3).then((Tuple!(int,int,int) t) shared => t[0]).syncWait.value.shouldEqual(1);
}

// then receiving the multi-value result expanded into separate parameters.
@("then.tuple.expand")
@safe unittest {
  just(1,2,3).then((int a,int b,int c) shared => a+b).syncWait.value.shouldEqual(3);
}

// whenAll result shapes, plus the outcome for each error/done combination asserted below.
@("whenAll.basic")
@safe unittest {
  whenAll(ValueSender!int(1), ValueSender!int(2)).syncWait.value.should == tuple(1,2);
  whenAll(ValueSender!int(1), ValueSender!int(2), ValueSender!int(3)).syncWait.value.should == tuple(1,2,3);
  whenAll(VoidSender(), ValueSender!int(2)).syncWait.value.should == 2;
  whenAll(ValueSender!int(1), VoidSender()).syncWait.value.should == 1;
  whenAll(VoidSender(), VoidSender()).syncWait.assumeOk;
  whenAll(ValueSender!int(1), ThrowingSender()).syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender");
  whenAll(ThrowingSender(), ValueSender!int(1)).syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender");
  whenAll(ValueSender!int(1), DoneSender()).syncWait.isCancelled.should == true;
  whenAll(DoneSender(), ValueSender!int(1)).syncWait.isCancelled.should == true;
  whenAll(DoneSender(), ThrowingSender()).syncWait.isCancelled.should == true;
  whenAll(ThrowingSender(), DoneSender()).syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender");

}

// The waiting senders only return once stopped, so each line can only finish
// if whenAll requests a stop when the other sender is done or errors.
@("whenAll.cancel")
@safe unittest {
  auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
    });
  whenAll(waiting, DoneSender()).syncWait.isCancelled.should == true;
  whenAll(ThrowingSender(), waiting).syncWait.assumeOk.shouldThrow;
  whenAll(waiting, ThrowingSender()).syncWait.assumeOk.shouldThrow;
  auto waitingInt = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
      return 42;
    });
  whenAll(waitingInt, DoneSender()).syncWait.isCancelled.should == true;
  whenAll(ThrowingSender(), waitingInt).syncWait.assumeOk.shouldThrow;
  whenAll(waitingInt, ThrowingSender()).syncWait.assumeOk.shouldThrow;
}

// Stopping the attached StopSource from inside one branch is expected to cancel the whole whenAll.
@("whenAll.stop")
@safe unittest {
  auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
    });
  auto source = new StopSource();
  auto stopper = just(source).then((StopSource source) shared => source.stop());
  whenAll(waiting, stopper).withStopSource(source).syncWait.isCancelled.should == true;
}

// whenAll over an array of value senders yields an array of values.
@("whenAll.array.just")
@safe unittest {
  whenAll([just(4), just(5)]).syncWait.value.should == [4,5];
}

// whenAll over an array of void senders.
@("whenAll.array.void")
@safe unittest {
  whenAll([VoidSender(), VoidSender()]).syncWait.assumeOk;
}

// retry reconnects the sender after an error, up to the given number of times.
@("retry")
@safe unittest {
  ValueSender!int(5).retry(Times(5)).syncWait.value.should == 5;
  int t = 3; // the first `t` connects produce a failing operation
  int n = 0; // counts how often `connect` was called
  struct Sender {
    alias Value = void;
    static struct Op(Receiver) {
      Receiver receiver;
      bool fail;
      @disable this(ref return scope typeof(this) rhs);
      @disable this(this);
      void start() @safe nothrow {
        if (fail)
          receiver.setError(new Exception("Fail fail fail"));
        else
          receiver.setValue();
      }
    }
    auto connect(Receiver)(return Receiver receiver) @safe return scope {
      // ensure NRVO
      auto op = Op!(Receiver)(receiver, n++ < t);
      return op;
    }
  }
  // three failures followed by one success: four connects in total
  Sender().retry(Times(5)).syncWait.assumeOk;
  n.should == 4;
  n = 0;

  // two attempts are not enough to get past the three failures
  Sender().retry(Times(2)).syncWait.assumeOk.shouldThrowWithMessage("Fail fail fail");
  n.should == 2;
  shared int p = 0;
  ThreadSender().then(()shared { import core.atomic; p.atomicOp!("+=")(1); throw new Exception("Failed"); }).retry(Times(5)).syncWait.assumeOk.shouldThrowWithMessage("Failed");
  p.should == 5;
}

// retryWhen on a sender that succeeds straight away.
@("retryWhen.immediate.success")
@safe unittest {
  static struct Immediate {
    auto failure(Exception e) {
      return VoidSender();
    }
  }

  VoidSender().retryWhen(Immediate()).syncWait.assumeOk;
}

/// Sender that produces the current value of `counter` and increments it on
/// every `connect`, so the produced value equals the number of earlier connects.
struct ConnectCounter {
  alias Value = int;
  int counter = 0;
  auto connect(Receiver)(return Receiver receiver) @safe return scope {
    // ensure NRVO
    auto op = ValueSender!int(counter++).connect(receiver);
    return op;
  }
}

// The callback throws for counter values 0, 1 and 2; the expected result 3
// implies three retries.
@("retryWhen.immediate.retries")
@safe unittest {
  static struct Immediate {
    auto failure(Exception e) {
      return VoidSender();
    }
  }
  ConnectCounter()
    .then((int c) { if (c < 3) throw new Exception("jada"); return c; })
    .retryWhen(Immediate())
    .syncWait.value.should == 3;
}

// As above, but every retry waits on a 3 msecs delay that the driver advances by hand.
@("retryWhen.wait.retries")
@safe unittest {
  import core.time : msecs;
  import concurrency.scheduler : ManualTimeWorker;

  static struct Wait {
    auto failure(Exception e) @safe {
      return delay(3.msecs);
    }
  }

  auto worker = new shared ManualTimeWorker();
  auto sender = ConnectCounter()
    .then((int c) { if (c < 3) throw new Exception("jada"); return c; })
    .retryWhen(Wait())
    .withScheduler(worker.getScheduler);

  auto driver = just(worker).then((shared ManualTimeWorker worker) {
      worker.timeUntilNextEvent().should == 3.msecs;
      worker.advance(3.msecs);
      worker.timeUntilNextEvent().should == 3.msecs;
      worker.advance(3.msecs);
      worker.timeUntilNextEvent().should == 3.msecs;
      worker.advance(3.msecs);
      worker.timeUntilNextEvent().should == null;
    });

  whenAll(sender, driver).syncWait.value.should == 3;
}

// An error produced by the sender returned from `failure` replaces the original error.
@("retryWhen.throw")
@safe unittest {
  static struct Throw {
    auto failure(Exception t) @safe {
      return ErrorSender(new Exception("inner"));
    }
  }

  ErrorSender(new Exception("outer")).retryWhen(Throw()).syncWait.assumeOk.shouldThrowWithMessage("inner");
}

// whenAll with one branch completing from another thread.
@("whenAll.oob")
@safe unittest {
  auto oob = OutOfBandValueSender!int(43);
  auto value = ValueSender!int(11);
  whenAll(oob, value).syncWait.value.should == tuple(43, 11);
}

// withStopToken on a sender that completes from another thread.
@("withStopToken.oob")
@safe unittest {
  auto oob = OutOfBandValueSender!int(44);
  oob.withStopToken((StopToken stopToken, int t) => t).syncWait.value.should == 44;
}

// withStopSource on a sender that completes from another thread.
@("withStopSource.oob")
@safe unittest {
  auto oob = OutOfBandValueSender!int(45);
  oob.withStopSource(new StopSource()).syncWait.value.should == 45;
}

// NOTE(review): the test name says withStopSource but the body exercises
// withStopToken with a tuple value - confirm whether the name should be
// "withStopToken.tuple".
@("withStopSource.tuple")
@safe unittest {
  just(14, 53).withStopToken((StopToken s, Tuple!(int, int) t) => t[0]*t[1]).syncWait.value.should == 742;
}

// An exception thrown inside withStopToken surfaces through via(ThreadSender()).
@("value.withstoptoken.via.thread")
@safe unittest {
  ValueSender!int(4).withStopToken((StopToken s, int i) { throw new Exception("Badness");}).via(ThreadSender()).syncWait.assumeOk.shouldThrowWithMessage("Badness");
}

// completeWithCancellation turns a successful completion into a cancellation.
@("completewithcancellation")
@safe unittest {
  ValueSender!void().completeWithCancellation.syncWait.isCancelled.should == true;
}

// raceAll finishes with the outcome of the second sender (done, value or
// error) while the first only returns once stopped.
@("raceAll")
@safe unittest {
  auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
    });
  raceAll(waiting, DoneSender()).syncWait.isCancelled.should == true;
  raceAll(waiting, just(42)).syncWait.value.should == 42;
  raceAll(waiting, ThrowingSender()).syncWait.isError.should == true;
}

// A 10 msecs DelaySender on a manually driven scheduler: the driver advances
// time in two 5 msecs steps and checks the time remaining after each.
@("on.ManualTimeWorker")
@safe unittest {
  import concurrency.scheduler : ManualTimeWorker;

  auto worker = new shared ManualTimeWorker();
  auto driver = just(worker).then((shared ManualTimeWorker worker) shared {
      worker.timeUntilNextEvent().should == 10.msecs;
      worker.advance(5.msecs);
      worker.timeUntilNextEvent().should == 5.msecs;
      worker.advance(5.msecs);
      worker.timeUntilNextEvent().should == null;
    });
  auto timer = DelaySender(10.msecs).withScheduler(worker.getScheduler);

  whenAll(timer, driver).syncWait().assumeOk;
}

// Stopping the source while a 10 msecs timer is pending removes the pending
// event from the worker and cancels the whole whenAll.
@("on.ManualTimeWorker.cancel")
@safe unittest {
  import concurrency.scheduler : ManualTimeWorker;

  auto worker = new shared ManualTimeWorker();
  auto source = new StopSource();
  auto driver = just(source).then((StopSource source) shared {
      worker.timeUntilNextEvent().should == 10.msecs;
      source.stop();
      worker.timeUntilNextEvent().should == null;
    });
  auto timer = DelaySender(10.msecs).withScheduler(worker.getScheduler);

  whenAll(timer, driver).syncWait(source).isCancelled.should == true;
}

// A sender holding a delegate to a stack variable may be awaited locally but
// must not be passable to another function.
@("then.stack.no-leak")
@safe unittest {
  struct S {
    void fun(int i) shared {
    }
  }
  shared S s;
  // it's perfectly ok to point to a function on the stack
  auto sender = just(42).then(&s.fun);

  sender.syncWait();

  void disappearSender(Sender)(Sender s) @safe;
  // but the sender can't leak now
  static assert(!__traits(compiles, disappearSender(sender)));
}

// forwardOn preserves each kind of completion: void, error, done and value.
@("forwardOn")
@safe unittest {
  auto pool = stdTaskPool(2);

  VoidSender().forwardOn(pool.getScheduler).syncWait.assumeOk;
  ErrorSender(new Exception("bad news")).forwardOn(pool.getScheduler).syncWait.isError.should == true;
  DoneSender().forwardOn(pool.getScheduler).syncWait.isCancelled.should == true;
  just(42).forwardOn(pool.getScheduler).syncWait.value.should == 42;
}

// Awaiting the same singleton twice within one whenAll yields the same value
// for both, i.e. the counter is bumped once per round rather than once per use.
@("toSingleton")
@safe unittest {
  import std.typecons : tuple;
  import concurrency.scheduler : ManualTimeWorker;
  import core.atomic : atomicOp;

  shared int g;

  auto worker = new shared ManualTimeWorker();

  auto single = delay(2.msecs).then(() shared => g.atomicOp!"+="(1)).toSingleton(worker.getScheduler);

  auto driver = justFrom(() shared => worker.advance(2.msecs));

  whenAll(single, single, driver).syncWait.value.should == tuple(1,1);
  whenAll(single, single, driver).syncWait.value.should == tuple(2,2);
}

// NOTE(review): the second delayed sender below writes `b`, not `d`, so `d`
// is never assigned and `d.should == false` passes trivially. In addition,
// `just(() => ...)` passes the delegate along as a value; nothing visible here
// invokes it, so presumably neither stop source is actually stopped. Confirm
// the intended behaviour before tightening this test.
@("stopOn")
@safe unittest {
  auto sourceInner = new shared StopSource();
  auto sourceOuter = new shared StopSource();

  shared bool b;
  whenAll(delay(5.msecs).then(() shared => b = true).stopOn(StopToken(sourceInner)),
          just(() => sourceOuter.stop())
          ).syncWait(sourceOuter).assumeOk;
  b.should == true;

  shared bool d;
  whenAll(delay(5.msecs).then(() shared => b = true).stopOn(StopToken(sourceInner)),
          just(() => sourceInner.stop())
          ).syncWait(sourceOuter).assumeOk;
  d.should == false;
}

// Checks the ordering between parent and child under withChild when a stop is
// requested: the child observes the stop and signals the parent, and the
// parent records whether its own token was still unstopped at that point.
@("withChild")
@safe unittest {
  import core.atomic;

  // Two events used as a handshake between the parent and child senders.
  class State {
    import core.sync.event : Event;
    bool parentAfterChild;
    Event childEvent, parentEvent;
    this() shared @trusted {
      (cast()childEvent).initialize(false, false);
      (cast()parentEvent).initialize(false, false);
    }
    void signalChild() shared @trusted {
      (cast()childEvent).set();
    }
    void waitChild() shared @trusted {
      (cast()childEvent).wait();
    }
    void signalParent() shared @trusted {
      (cast()parentEvent).set();
    }
    void waitParent() shared @trusted {
      (cast()parentEvent).wait();
    }
  }
  auto state = new shared State();
  auto source = new shared StopSource;

  auto child = just(state).withStopToken((StopToken token, shared State state) @trusted {
      // yield while waiting, like the other stop-token wait loops in this file
      while(!token.isStopRequested) { Thread.yield(); }
      state.signalParent();
      state.waitChild();
    }).via(ThreadSender());

  auto parent = just(state).withStopToken((StopToken token, shared State state){
      state.waitParent();
      state.parentAfterChild.atomicStore(token.isStopRequested == false);
      state.signalChild();
    }).via(ThreadSender());

  whenAll(parent.withChild(child).withStopSource(source), just(source).then((shared StopSource s) => s.stop())).syncWait.isCancelled.should == true;

  state.parentAfterChild.atomicLoad.should == true;
}

// onTermination runs its callback on a value completion.
@("onTermination.value")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  just(42).onTermination(() @safe shared => g.atomicOp!"+="(1)).syncWait.assumeOk;
  g.should == 1;
}

// onTermination runs its callback on a done completion.
@("onTermination.done")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  DoneSender().onTermination(() @safe shared => g.atomicOp!"+="(1)).syncWait.isCancelled.should == true;
  g.should == 1;
}

// onTermination runs its callback on an error completion.
@("onTermination.error")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  ThrowingSender().onTermination(() @safe shared => g.atomicOp!"+="(1)).syncWait.isError.should == true;
  g.should == 1;
}

// onError does not run its callback on a value completion.
@("onError.value")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  just(42).onError((Exception e) @safe shared => g.atomicOp!"+="(1)).syncWait.assumeOk;
  g.should == 0;
}

// onError does not run its callback on a done completion.
@("onError.done")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  DoneSender().onError((Exception e) @safe shared => g.atomicOp!"+="(1)).syncWait.isCancelled.should == true;
  g.should == 0;
}

// onError runs its callback on an error completion.
@("onError.error")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  ThrowingSender().onError((Exception e) @safe shared => g.atomicOp!"+="(1)).syncWait.isError.should == true;
  g.should == 1;
}

// An exception thrown from the onError callback becomes the reported error,
// with the original exception chained as `next`.
@("onError.throw")
@safe unittest {
  auto err = ThrowingSender().onError((Exception e) @safe shared { throw new Exception("in onError"); }).syncWait.get!Exception;
  err.msg.should == "in onError";
  err.next.msg.should == "ThrowingSender";
}

// The trigger stops the source; the source's value is still propagated.
@("stopWhen.source.value")
@safe unittest {
  auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
      return 43;
    });
  auto trigger = delay(100.msecs);
  waiting.stopWhen(trigger).syncWait().value.should == 43;
}

// The trigger stops the source; the source's error is still propagated.
@("stopWhen.source.error")
@safe unittest {
  auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
      throw new Exception("Upside down");
    });
  auto trigger = delay(100.msecs);
  waiting.stopWhen(trigger).syncWait().assumeOk.shouldThrowWithMessage("Upside down");
}

// The trigger stops the source; the source's cancellation is propagated.
@("stopWhen.source.cancelled")
@safe unittest {
  auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
    }).completeWithCancellation;
  auto trigger = delay(100.msecs);
  waiting.stopWhen(trigger).syncWait().isCancelled.should == true;
}

// An error from the trigger takes precedence over the error the source throws once stopped.
@("stopWhen.trigger.error")
@safe unittest {
  auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
      throw new Exception("This occurres later, so the other one gets propagated");
    });
  auto trigger = ThrowingSender();
  waiting.stopWhen(trigger).syncWait().assumeOk.shouldThrowWithMessage("ThrowingSender");
}

// A cancelled trigger makes the whole operation cancelled, even though the
// source returns a value once stopped.
@("stopWhen.trigger.cancelled.value")
@safe unittest {
  auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
      return 42;
    });
  auto trigger = delay(100.msecs).completeWithCancellation;
  waiting.stopWhen(trigger).syncWait().isCancelled.should == true;
}

// completeWithError turns a successful completion into the given error.
@("completewitherror.basic")
@safe unittest {
  ValueSender!void().completeWithError(new Exception("hello")).syncWait.assumeOk.shouldThrowWithMessage("hello");
}

// An upstream Exception is replaced by the given one.
@("completewitherror.exception.base")
@safe unittest {
  ErrorSender(new Exception("not you")).completeWithError(new Exception("overridden")).syncWait.assumeOk.shouldThrowWithMessage!Throwable("overridden");
}

// An upstream Throwable takes precedence over the given Exception.
@("completewitherror.throwable.base")
@safe unittest {
  ErrorSender(new Throwable("precedence")).completeWithError(new Exception("hello")).syncWait.assumeOk.shouldThrowWithMessage!Throwable("precedence");
}

// An upstream Error takes precedence over the given Exception.
@("completewitherror.error.base")
@safe unittest {
  ErrorSender(new Error("precedence")).completeWithError(new Exception("hello")).syncWait.assumeOk.shouldThrowWithMessage!Error("precedence");
}

// onCompletion runs its callback on a value completion.
@("onCompletion.value")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  just(42).onCompletion(() @safe shared => g.atomicOp!"+="(1)).syncWait.assumeOk;
  g.should == 1;
}

// onCompletion runs its callback on a done completion.
@("onCompletion.done")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  DoneSender().onCompletion(() @safe shared => g.atomicOp!"+="(1)).syncWait.isCancelled.should == true;
  g.should == 1;
}

// onCompletion does not run its callback on an error completion.
@("onCompletion.error")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  ThrowingSender().onCompletion(() @safe shared => g.atomicOp!"+="(1)).syncWait.isError.should == true;
  g.should == 0;
}

// onResult, and tee used the same way, run the callback on a value completion.
@("onResult.value")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  just(42).onResult((Result!int r) @safe shared => g.atomicOp!"+="(1)).syncWait.assumeOk;
  just(42).tee((Result!int r) @safe shared => g.atomicOp!"+="(1)).syncWait.assumeOk;
  g.should == 2;
}

// onResult, and tee used the same way, run the callback on a done completion.
@("onResult.done")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  DoneSender().onResult((Result!void r) @safe shared => g.atomicOp!"+="(1)).syncWait.isCancelled.should == true;
  DoneSender().tee((Result!void r) @safe shared => g.atomicOp!"+="(1)).syncWait.isCancelled.should == true;
  g.should == 2;
}

// onResult, and tee used the same way, run the callback on an error completion.
@("onResult.error")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  ThrowingSender().onResult((Result!void r) @safe shared => g.atomicOp!"+="(1)).syncWait.isError.should == true;
  ThrowingSender().tee((Result!void r) @safe shared => g.atomicOp!"+="(1)).syncWait.isError.should == true;
  g.should == 2;
}

// A repeated 1 msecs delay raced against a driver that advances time twice:
// the counter is expected to have been bumped exactly twice when the driver finishes.
@("repeat.race")
@safe unittest {
  import core.atomic : atomicOp;
  import concurrency.scheduler : ManualTimeWorker;
  shared int p = 0;

  auto worker = new shared ManualTimeWorker();

  auto base = delay(1.msecs).then(() shared => cast(void)p.atomicOp!"+="(1)).repeat();

  auto driver = just(worker).then((shared ManualTimeWorker worker) {
      worker.timeUntilNextEvent().should == 1.msecs;
      worker.advance(1.msecs);
      worker.timeUntilNextEvent().should == 1.msecs;
      worker.advance(1.msecs);
      worker.timeUntilNextEvent().should == 1.msecs;
    });

  race(base, driver).withScheduler(worker.getScheduler).syncWait().assumeOk;
  p.should == 2;
}

// repeat ends once the repeated sender produces an error.
@("repeat.error")
@safe unittest {
  // Operation that either fails with "Bye!" or completes successfully,
  // depending on the `fail` flag fixed at connect time.
  static struct CountdownOp(Receiver) {
    Receiver receiver;
    bool fail;
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    void start() @safe nothrow {
      if (fail)
        receiver.setError(new Exception("Bye!"));
      else
        receiver.setValueOrError();
    }
  }

  // Sender whose operation fails on the connect that finds `countdown` at
  // zero; every connect decrements the counter.
  static struct Countdown {
    alias Value = void;
    int countdown;
    auto connect(Receiver)(return Receiver receiver) @safe return scope {
      // ensure NRVO
      auto op = CountdownOp!(Receiver)(receiver, countdown-- == 0);
      return op;
    }
  }

  // a single run connects with countdown == 3 and therefore succeeds
  Countdown(3).syncWait().assumeOk();
  // repeated, the countdown eventually reaches zero and the error surfaces
  Countdown(3).repeat().syncWait().isError.should == true;
}