1 module ut.concurrency.stream; 2 3 import concurrency.stream; 4 import concurrency; 5 import unit_threaded; 6 import concurrency.stoptoken; 7 import core.atomic; 8 import concurrency.thread : ThreadSender; 9 10 // TODO: it would be good if we can get the Sender .collect returns to be scoped if the delegates are. 11 12 @("arrayStream") 13 @safe unittest { 14 shared int p = 0; 15 [1,2,3].arrayStream().collect((int t) shared { p.atomicOp!"+="(t); }).syncWait().assumeOk; 16 p.should == 6; 17 } 18 19 @("intervalStream") 20 @safe unittest { 21 import core.time; 22 import concurrency.operations : withScheduler, whenAll; 23 import concurrency.sender : justFrom; 24 import concurrency.scheduler : ManualTimeWorker; 25 26 auto worker = new shared ManualTimeWorker(); 27 auto interval = 5.msecs.intervalStream() 28 .take(2) 29 .collect(() shared {}) 30 .withScheduler(worker.getScheduler); 31 32 auto driver = justFrom(() shared { 33 worker.timeUntilNextEvent().should == 5.msecs; 34 worker.advance(5.msecs); 35 36 worker.timeUntilNextEvent().should == 5.msecs; 37 worker.advance(5.msecs); 38 39 worker.timeUntilNextEvent().should == null; 40 }); 41 42 whenAll(interval, driver).syncWait().assumeOk; 43 } 44 45 46 @("infiniteStream.stop") 47 @safe unittest { 48 import concurrency.operations : withStopSource; 49 shared int g = 0; 50 auto source = new shared StopSource(); 51 infiniteStream(5).collect((int n) shared { 52 if (g < 14) 53 g.atomicOp!"+="(n); 54 else 55 source.stop(); 56 }) 57 .withStopSource(source).syncWait.isCancelled.should == true; 58 g.should == 15; 59 }; 60 61 @("infiniteStream.take") 62 @safe unittest { 63 shared int g = 0; 64 infiniteStream(4).take(5).collect((int n) shared { g.atomicOp!"+="(n); }).syncWait().assumeOk; 65 g.should == 20; 66 } 67 68 @("iotaStream") 69 @safe unittest { 70 import concurrency.stoptoken; 71 shared int g = 0; 72 iotaStream(0, 5).collect((int n) shared { g.atomicOp!"+="(n); }).syncWait().assumeOk; 73 g.should == 10; 74 } 75 76 @("loopStream") 77 @safe unittest { 78 struct Loop { 79 size_t b,e; 80 void loop(DG, StopToken)(DG emit, StopToken stopToken) { 81 foreach(i; b..e) 82 emit(i); 83 } 84 } 85 shared int g = 0; 86 Loop(0,4).loopStream!size_t.collect((size_t n) shared { g.atomicOp!"+="(n); }).syncWait().assumeOk; 87 g.should == 6; 88 } 89 90 @("toStreamObject") 91 @safe unittest { 92 import core.atomic : atomicOp; 93 94 static StreamObjectBase!int getStream() { 95 return [1,2,3].arrayStream().toStreamObject(); 96 } 97 shared int p; 98 99 getStream().collect((int i) @safe shared { p.atomicOp!"+="(i); }).syncWait().assumeOk; 100 101 p.should == 6; 102 } 103 104 105 @("toStreamObject.take") 106 @safe unittest { 107 static StreamObjectBase!int getStream() { 108 return [1,2,3].arrayStream().toStreamObject(); 109 } 110 shared int p; 111 112 getStream().take(2).collect((int i) shared { p.atomicOp!"+="(i); }).syncWait().assumeOk; 113 114 p.should == 3; 115 } 116 117 @("toStreamObject.void") 118 @safe unittest { 119 import core.time : msecs; 120 shared bool p = false; 121 122 1.msecs.intervalStream().toStreamObject().take(1).collect(() shared { p = true; }).syncWait().assumeOk; 123 124 p.should == true; 125 } 126 127 @("transform.int.double") 128 @safe unittest { 129 shared int p = 0; 130 [1,2,3].arrayStream().transform((int i) => i * 3).collect((int t) shared { p.atomicOp!"+="(t); }).syncWait().assumeOk; 131 p.should == 18; 132 } 133 134 @("transform.int.bool") 135 @safe unittest { 136 shared int p = 0; 137 [1,2,3].arrayStream().transform((int i) => i % 2 == 0).collect((bool t) shared { if (t) p.atomicOp!"+="(1); }).syncWait().assumeOk; 138 p.should == 1; 139 } 140 141 @("scan") 142 @safe unittest { 143 shared int p = 0; 144 [1,2,3].arrayStream().scan((int acc, int i) => acc += i, 0).collect((int t) shared { p.atomicOp!"+="(t); }).syncWait().assumeOk; 145 p.should == 10; 146 } 147 148 @("scan.void-value") 149 @safe unittest { 150 import core.time; 151 shared int p = 0; 152 5.msecs.intervalStream.scan((int acc) => acc += 1, 0).take(3).collect((int t) shared { p.atomicOp!"+="(t); }).syncWait().assumeOk; 153 p.should == 6; 154 } 155 156 @("take.enough") 157 @safe unittest { 158 shared int p = 0; 159 160 [1,2,3].arrayStream.take(2).collect((int i) shared { p.atomicOp!"+="(i); }).syncWait.assumeOk; 161 p.should == 3; 162 } 163 164 @("take.too-few") 165 @safe unittest { 166 shared int p = 0; 167 168 [1,2,3].arrayStream.take(4).collect((int i) shared { p.atomicOp!"+="(i); }).syncWait.assumeOk; 169 p.should == 6; 170 } 171 172 @("take.donestream") 173 @safe unittest { 174 doneStream().take(1).collect(()shared{}).syncWait.isCancelled.should == true; 175 } 176 177 @("take.errorstream") 178 @safe unittest { 179 errorStream(new Exception("Too bad")).take(1).collect(()shared{}).syncWait.assumeOk.shouldThrowWithMessage("Too bad"); 180 } 181 182 @("sample.trigger.stop") 183 @safe unittest { 184 import core.time; 185 7.msecs.intervalStream() 186 .scan((int acc) => acc+1, 0) 187 .sample(10.msecs.intervalStream().take(3)) 188 .collect((int i) shared {}) 189 .syncWait().assumeOk; 190 } 191 192 @("sample.slower") 193 @safe unittest { 194 import core.time; 195 import concurrency.operations : withScheduler, whenAll; 196 import concurrency.sender : justFrom; 197 198 shared int p = 0; 199 import concurrency.scheduler : ManualTimeWorker; 200 201 auto worker = new shared ManualTimeWorker(); 202 203 auto sampler = 7.msecs 204 .intervalStream() 205 .scan((int acc) => acc+1, 0) 206 .sample(10.msecs.intervalStream()) 207 .take(3) 208 .collect((int i) shared { p.atomicOp!"+="(i); }) 209 .withScheduler(worker.getScheduler); 210 211 auto driver = justFrom(() shared { 212 worker.advance(7.msecs); 213 p.atomicLoad.should == 0; 214 worker.timeUntilNextEvent().should == 3.msecs; 215 216 worker.advance(3.msecs); 217 p.atomicLoad.should == 1; 218 worker.timeUntilNextEvent().should == 4.msecs; 219 220 worker.advance(4.msecs); 221 p.atomicLoad.should == 1; 222 worker.timeUntilNextEvent().should == 6.msecs; 223 224 worker.advance(6.msecs); 225 p.atomicLoad.should == 3; 226 worker.timeUntilNextEvent().should == 1.msecs; 227 228 worker.advance(1.msecs); 229 p.atomicLoad.should == 3; 230 worker.timeUntilNextEvent().should == 7.msecs; 231 232 worker.advance(7.msecs); 233 p.atomicLoad.should == 3; 234 worker.timeUntilNextEvent().should == 2.msecs; 235 236 worker.advance(2.msecs); 237 p.atomicLoad.should == 7; 238 worker.timeUntilNextEvent().should == null; 239 }); 240 241 whenAll(sampler, driver).syncWait().assumeOk; 242 243 p.should == 7; 244 } 245 246 @("sample.faster") 247 @safe unittest { 248 import core.time; 249 import concurrency.operations : withScheduler, whenAll; 250 import concurrency.sender : justFrom; 251 252 shared int p = 0; 253 import concurrency.scheduler : ManualTimeWorker; 254 255 auto worker = new shared ManualTimeWorker(); 256 257 auto sampler = 7.msecs 258 .intervalStream() 259 .scan((int acc) => acc+1, 0) 260 .sample(3.msecs.intervalStream()) 261 .take(3) 262 .collect((int i) shared { p.atomicOp!"+="(i); }) 263 .withScheduler(worker.getScheduler); 264 265 auto driver = justFrom(() shared { 266 worker.advance(3.msecs); 267 p.atomicLoad.should == 0; 268 worker.timeUntilNextEvent().should == 3.msecs; 269 270 worker.advance(3.msecs); 271 p.atomicLoad.should == 0; 272 worker.timeUntilNextEvent().should == 1.msecs; 273 274 worker.advance(1.msecs); 275 p.atomicLoad.should == 0; 276 worker.timeUntilNextEvent().should == 2.msecs; 277 278 worker.advance(2.msecs); 279 p.atomicLoad.should == 1; 280 worker.timeUntilNextEvent().should == 3.msecs; 281 282 worker.advance(3.msecs); 283 p.atomicLoad.should == 1; 284 worker.timeUntilNextEvent().should == 2.msecs; 285 286 worker.advance(2.msecs); 287 p.atomicLoad.should == 1; 288 worker.timeUntilNextEvent().should == 1.msecs; 289 290 worker.advance(1.msecs); 291 p.atomicLoad.should == 3; 292 worker.timeUntilNextEvent().should == 3.msecs; 293 294 worker.advance(3.msecs); 295 p.atomicLoad.should == 3; 296 worker.timeUntilNextEvent().should == 3.msecs; 297 298 worker.advance(3.msecs); 299 p.atomicLoad.should == 6; 300 worker.timeUntilNextEvent().should == null; 301 }); 302 303 whenAll(sampler, driver).syncWait().assumeOk; 304 305 p.should == 6; 306 } 307 308 @("sharedStream") 309 @safe unittest { 310 import concurrency.operations : then, race; 311 312 auto source = sharedStream!int; 313 314 shared int p = 0; 315 316 auto emitter = ThreadSender().then(() shared { 317 source.emit(6); 318 source.emit(12); 319 }); 320 auto collector = source.collect((int t) shared { p.atomicOp!"+="(t); }); 321 322 race(collector, emitter).syncWait().assumeOk; 323 324 p.atomicLoad.should == 18; 325 } 326 327 @("throttling.throttleLast") 328 @safe unittest { 329 import core.time; 330 import concurrency.scheduler : ManualTimeWorker; 331 import concurrency.operations : withScheduler, whenAll; 332 import concurrency.sender : justFrom; 333 334 shared int p = 0; 335 auto worker = new shared ManualTimeWorker(); 336 337 auto throttled = 1.msecs 338 .intervalStream(true) 339 .scan((int acc) => acc+1, 0) 340 .throttleLast(3.msecs) 341 .take(4) 342 .collect((int i) shared { p.atomicOp!"+="(i); }) 343 .withScheduler(worker.getScheduler); 344 345 auto driver = justFrom(() shared { 346 p.atomicLoad.should == 0; 347 worker.timeUntilNextEvent().should == 1.msecs; 348 349 foreach(expected; [0,0,3,3,3,9,9,9,18,18,18,30]) { 350 worker.advance(1.msecs); 351 p.atomicLoad.should == expected; 352 } 353 354 worker.timeUntilNextEvent().should == null; 355 }); 356 357 whenAll(throttled, driver).syncWait().assumeOk; 358 359 p.atomicLoad.should == 30; 360 } 361 362 @("throttling.throttleLast.arrayStream") 363 @safe unittest { 364 import core.time; 365 366 shared int p = 0; 367 368 [1,2,3].arrayStream() 369 .throttleLast(30.msecs) 370 .collect((int i) shared { p.atomicOp!"+="(i); }) 371 .syncWait().assumeOk; 372 373 p.atomicLoad.should == 3; 374 } 375 376 @("throttling.throttleLast.exception") 377 @safe unittest { 378 import core.time; 379 380 1.msecs 381 .intervalStream() 382 .throttleLast(10.msecs) 383 .collect(() shared { throw new Exception("Bla"); }) 384 .syncWait.assumeOk.shouldThrowWithMessage("Bla"); 385 } 386 387 @("throttling.throttleLast.thread.arrayStream") 388 @safe unittest { 389 import core.time; 390 391 shared int p = 0; 392 393 [1,2,3].arrayStream() 394 .via(ThreadSender()) 395 .throttleLast(30.msecs) 396 .collect((int i) shared { p.atomicOp!"+="(i); }) 397 .syncWait().assumeOk; 398 399 p.atomicLoad.should == 3; 400 } 401 402 @("throttling.throttleLast.thread.exception") 403 @safe unittest { 404 import core.time; 405 406 1.msecs 407 .intervalStream() 408 .via(ThreadSender()) 409 .throttleLast(10.msecs) 410 .collect(() shared { throw new Exception("Bla"); }) 411 .syncWait.assumeOk.shouldThrowWithMessage("Bla"); 412 } 413 414 @("throttling.throttleFirst") 415 @safe unittest { 416 import core.time; 417 import concurrency.scheduler : ManualTimeWorker; 418 import concurrency.operations : withScheduler, whenAll; 419 import concurrency.sender : justFrom; 420 421 shared int p = 0; 422 auto worker = new shared ManualTimeWorker(); 423 424 auto throttled = 1.msecs 425 .intervalStream() 426 .scan((int acc) => acc+1, 0) 427 .throttleFirst(3.msecs) 428 .take(2) 429 .collect((int i) shared { p.atomicOp!"+="(i); }) 430 .withScheduler(worker.getScheduler); 431 432 auto driver = justFrom(() shared { 433 p.atomicLoad.should == 0; 434 435 worker.advance(1.msecs); 436 p.atomicLoad.should == 1; 437 438 worker.advance(1.msecs); 439 p.atomicLoad.should == 1; 440 441 worker.advance(1.msecs); 442 p.atomicLoad.should == 1; 443 444 worker.advance(1.msecs); 445 p.atomicLoad.should == 5; 446 447 worker.timeUntilNextEvent().should == null; 448 }); 449 whenAll(throttled, driver).syncWait().assumeOk; 450 451 p.should == 5; 452 } 453 454 @("throttling.debounce") 455 @safe unittest { 456 import core.time; 457 import concurrency.scheduler : ManualTimeWorker; 458 import concurrency.operations : withScheduler, whenAll; 459 import concurrency.sender : justFrom; 460 461 shared int p = 0; 462 auto worker = new shared ManualTimeWorker(); 463 auto source = sharedStream!int; 464 465 auto throttled = source 466 .debounce(3.msecs) 467 .take(2) 468 .collect((int i) shared { p.atomicOp!"+="(i); }) 469 .withScheduler(worker.getScheduler); 470 471 auto driver = justFrom(() shared { 472 source.emit(1); 473 p.atomicLoad.should == 0; 474 worker.timeUntilNextEvent().should == 3.msecs; 475 476 worker.advance(3.msecs); 477 p.atomicLoad.should == 1; 478 479 source.emit(2); 480 p.atomicLoad.should == 1; 481 worker.timeUntilNextEvent().should == 3.msecs; 482 483 source.emit(3); 484 p.atomicLoad.should == 1; 485 worker.timeUntilNextEvent().should == 3.msecs; 486 487 worker.advance(1.msecs); 488 p.atomicLoad.should == 1; 489 worker.timeUntilNextEvent().should == 2.msecs; 490 491 source.emit(4); 492 p.atomicLoad.should == 1; 493 worker.timeUntilNextEvent().should == 3.msecs; 494 495 worker.advance(3.msecs); 496 p.atomicLoad.should == 5; 497 498 worker.timeUntilNextEvent().should == null; 499 }); 500 whenAll(throttled, driver).syncWait().assumeOk; 501 502 p.should == 5; 503 } 504 505 @("slide.basic") 506 @safe unittest { 507 [1,2,3,4,5,6,7].arrayStream 508 .slide(3) 509 .transform((int[] a) => a.dup) 510 .toList 511 .syncWait.value.should == [[1,2,3],[2,3,4],[3,4,5],[4,5,6],[5,6,7]]; 512 513 [1,2].arrayStream 514 .slide(3) 515 .toList 516 .syncWait.value.length.should == 0; 517 } 518 519 @("slide.step") 520 @safe unittest { 521 [1,2,3,4,5,6,7].arrayStream 522 .slide(3, 2) 523 .transform((int[] a) => a.dup) 524 .toList 525 .syncWait.value.should == [[1,2,3],[3,4,5],[5,6,7]]; 526 527 [1,2].arrayStream 528 .slide(2, 2) 529 .transform((int[] a) => a.dup) 530 .toList 531 .syncWait.value.should == [[1,2]]; 532 533 [1,2,3,4,5,6,7].arrayStream 534 .slide(2, 2) 535 .transform((int[] a) => a.dup) 536 .toList 537 .syncWait.value.should == [[1,2],[3,4],[5,6]]; 538 539 [1,2,3,4,5,6,7].arrayStream 540 .slide(2, 3) 541 .transform((int[] a) => a.dup) 542 .toList 543 .syncWait.value.should == [[1,2],[4,5]]; 544 545 [1,2,3,4,5,6,7,8,9,10].arrayStream 546 .slide(2, 4) 547 .transform((int[] a) => a.dup) 548 .toList 549 .syncWait.value.should == [[1,2],[5,6],[9,10]]; 550 } 551 552 @("toList.arrayStream") 553 @safe unittest { 554 [1,2,3].arrayStream.toList.syncWait.value.should == [1,2,3]; 555 } 556 557 @("toList.arrayStream.whenAll") 558 @safe unittest { 559 import concurrency.operations : withScheduler, whenAll; 560 import std.typecons : tuple; 561 auto s1 = [1,2,3].arrayStream.toList; 562 auto s2 = [2,3,4].arrayStream.toList; 563 whenAll(s1,s2).syncWait.value.should == tuple([1,2,3],[2,3,4]); 564 } 565 566 @("filter") 567 unittest { 568 [1,2,3,4].arrayStream 569 .filter((int i) => i % 2 == 0) 570 .toList 571 .syncWait 572 .value.should == [2,4]; 573 } 574 575 @("cycle") 576 unittest { 577 "-/|\\".cycleStream().take(6).toList.syncWait.value.should == "-/|\\-/"; 578 } 579 580 @("flatmap.concat.just") 581 @safe unittest { 582 import concurrency.sender : just; 583 584 [1,2,3].arrayStream 585 .flatMapConcat((int i) => just(i)) 586 .toList 587 .syncWait 588 .value 589 .should == [1,2,3]; 590 } 591 592 @("flatmap.concat.thread") 593 @safe unittest { 594 import concurrency.sender : just; 595 import concurrency.operations : via; 596 597 [1,2,3].arrayStream 598 .flatMapConcat((int i) => just(i).via(ThreadSender())) 599 .toList 600 .syncWait 601 .value 602 .should == [1,2,3]; 603 } 604 605 @("flatmap.concat.error") 606 @safe unittest { 607 import concurrency.sender : just, ErrorSender; 608 import concurrency.operations : via; 609 610 [1,2,3].arrayStream 611 .flatMapConcat((int i) => ErrorSender()) 612 .collect(()shared{}) 613 .syncWait 614 .assumeOk 615 .shouldThrow(); 616 } 617 618 @("flatmap.concat.thread.on.thread") 619 @safe unittest { 620 import concurrency.sender : just; 621 import concurrency.operations : via; 622 623 [1,2,3].arrayStream 624 .flatMapConcat((int i) => just(i).via(ThreadSender())) 625 .toList 626 .via(ThreadSender()) 627 .syncWait 628 .value 629 .should == [1,2,3]; 630 } 631 632 @("flatmap.latest.just") 633 @safe unittest { 634 import concurrency.sender : just; 635 636 [1,2,3].arrayStream 637 .flatMapLatest((int i) => just(i)) 638 .toList 639 .syncWait 640 .value 641 .should == [1,2,3]; 642 } 643 644 @("flatmap.latest.delay") 645 @safe unittest { 646 import concurrency.sender : just, delay; 647 import concurrency.operations : via, onTermination; 648 import core.time; 649 650 import std.stdio; 651 [1,2,3].arrayStream 652 .flatMapLatest((int i) => just(i).via(delay(50.msecs))) 653 .toList 654 .via(ThreadSender()) 655 .syncWait 656 .value 657 .should == [3]; 658 } 659 660 @("flatmap.latest.error") 661 @safe unittest { 662 import concurrency.sender : just, ErrorSender; 663 import concurrency.operations : via; 664 665 [1,2,3].arrayStream 666 .flatMapLatest((int i) => ErrorSender()) 667 .collect(()shared{}) 668 .syncWait 669 .assumeOk 670 .shouldThrow(); 671 } 672 673 @("flatmap.latest.justfrom.exception") 674 @safe unittest { 675 import concurrency.sender : justFrom; 676 677 import core.time; 678 679 1.msecs 680 .intervalStream() 681 .flatMapLatest(() => justFrom(() { throw new Exception("oops"); })) 682 .collect(()shared{}) 683 .syncWait 684 .assumeOk 685 .shouldThrow(); 686 } 687 688 @("flatmap.latest.exception") 689 @safe unittest { 690 import concurrency.sender : VoidSender; 691 692 import core.time; 693 694 1.msecs 695 .intervalStream() 696 .flatMapLatest(function VoidSender(){ 697 throw new Exception("oops"); 698 }) 699 .collect(()shared{}) 700 .syncWait 701 .assumeOk 702 .shouldThrow(); 703 } 704 705 @("flatmap.latest.intervalStream.overlap.delay") 706 @safe unittest { 707 import concurrency.sender : delay; 708 import core.time; 709 710 1.msecs 711 .intervalStream() 712 .take(2) 713 .flatMapLatest(() => 2.msecs.delay()) 714 .collect(()shared{}) 715 .syncWait 716 .assumeOk(); 717 } 718 719 @("flatmap.latest.intervalStream.intervalStream.take") 720 @safe unittest { 721 import concurrency.sender : delay; 722 import concurrency.scheduler : ManualTimeWorker; 723 import concurrency.operations : withScheduler, whenAll; 724 import concurrency.sender : justFrom; 725 import core.time; 726 727 import core.atomic; 728 shared int p; 729 730 auto worker = new shared ManualTimeWorker(); 731 auto sender = 5.msecs 732 .intervalStream() 733 .take(2) 734 .flatMapLatest(() shared { 735 return 1.msecs 736 .intervalStream(true) 737 .take(5) 738 .collect(() shared { p.atomicOp!"+="(1); }); 739 }) 740 .collect(()shared{}) 741 .withScheduler(worker.getScheduler); 742 743 auto driver = justFrom(() shared { 744 p.atomicLoad.should == 0; 745 worker.timeUntilNextEvent().should == 5.msecs; 746 747 worker.advance(5.msecs); 748 p.atomicLoad.should == 1; 749 worker.timeUntilNextEvent().should == 1.msecs; 750 751 worker.advance(1.msecs); 752 p.atomicLoad.should == 2; 753 754 worker.advance(1.msecs); 755 p.atomicLoad.should == 3; 756 757 worker.advance(1.msecs); 758 p.atomicLoad.should == 4; 759 760 worker.advance(1.msecs); 761 p.atomicLoad.should == 5; 762 763 worker.advance(1.msecs); 764 p.atomicLoad.should == 6; 765 766 worker.advance(1.msecs); 767 p.atomicLoad.should == 7; 768 769 worker.advance(1.msecs); 770 p.atomicLoad.should == 8; 771 772 worker.advance(1.msecs); 773 p.atomicLoad.should == 9; 774 775 worker.advance(1.msecs); 776 p.atomicLoad.should == 10; 777 778 worker.timeUntilNextEvent().should == null; 779 }); 780 whenAll(sender, driver).syncWait().assumeOk; 781 782 p.atomicLoad.should == 10; 783 } 784 785 @("flatmap.latest.intervalStream.intervalStream.sample") 786 @safe unittest { 787 import concurrency.sender : delay; 788 import concurrency.scheduler : ManualTimeWorker; 789 import concurrency.operations : withScheduler, whenAll; 790 import concurrency.sender : justFrom; 791 import core.time; 792 793 import core.atomic; 794 shared int p; 795 796 auto worker = new shared ManualTimeWorker(); 797 auto sender = 5.msecs 798 .intervalStream() 799 .take(2) 800 .flatMapLatest(() shared { 801 return 1.msecs 802 .intervalStream(true) 803 .scan((int i) => i + 1, 0) 804 .sample(2.msecs.intervalStream()) 805 .take(10) 806 .collect((int i) shared { p.atomicOp!"+="(1); }); 807 }) 808 .collect(()shared{}) 809 .withScheduler(worker.getScheduler); 810 811 auto driver = justFrom(() shared { 812 p.atomicLoad.should == 0; 813 worker.timeUntilNextEvent().should == 5.msecs; 814 815 worker.advance(5.msecs); 816 p.atomicLoad.should == 0; 817 worker.timeUntilNextEvent().should == 1.msecs; 818 819 worker.advance(1.msecs); 820 p.atomicLoad.should == 0; 821 822 worker.advance(1.msecs); 823 p.atomicLoad.should == 1; 824 825 worker.advance(1.msecs); 826 p.atomicLoad.should == 1; 827 828 worker.advance(1.msecs); 829 p.atomicLoad.should == 2; 830 831 worker.advance(1.msecs); 832 p.atomicLoad.should == 2; 833 834 worker.advance(2.msecs); 835 p.atomicLoad.should == 3; 836 837 worker.advance(2.msecs); 838 p.atomicLoad.should == 4; 839 840 worker.advance(2.msecs); 841 p.atomicLoad.should == 5; 842 843 worker.advance(2.msecs); 844 p.atomicLoad.should == 6; 845 846 worker.advance(2.msecs); 847 p.atomicLoad.should == 7; 848 849 worker.advance(2.msecs); 850 p.atomicLoad.should == 8; 851 852 worker.advance(2.msecs); 853 p.atomicLoad.should == 9; 854 855 worker.advance(2.msecs); 856 p.atomicLoad.should == 10; 857 858 worker.advance(2.msecs); 859 p.atomicLoad.should == 11; 860 861 worker.advance(2.msecs); 862 p.atomicLoad.should == 12; 863 864 worker.timeUntilNextEvent().should == null; 865 }); 866 whenAll(sender, driver).syncWait().assumeOk; 867 868 p.atomicLoad.should == 12; 869 } 870 871 @("deferStream.function") 872 @safe unittest { 873 import concurrency.stream.defer; 874 static auto getSender() @safe { 875 import concurrency.sender; 876 return just(1); 877 } 878 deferStream(&getSender).take(3).toList().syncWait().value.should == [1,1,1]; 879 } 880 881 @("deferStream.callable") 882 @safe unittest { 883 import concurrency.stream.defer; 884 static struct S { 885 auto opCall() shared @safe { 886 import concurrency.sender; 887 return just(1); 888 } 889 } 890 shared S s; 891 deferStream(s).take(3).toList().syncWait().value.should == [1,1,1]; 892 } 893 894 @("cron.timeTillNextMinute.Always") 895 @safe unittest { 896 import concurrency.stream.cron; 897 import core.time; 898 import std.datetime : SysTime, DateTime; 899 900 auto spec = Always().Spec; 901 spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 30, 0))).should == 1.minutes; 902 spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 30, 59))).should == 1.seconds; 903 } 904 905 @("cron.timeTillNextMinute.Exact") 906 @safe unittest { 907 import concurrency.stream.cron; 908 import core.time; 909 import std.datetime : SysTime, DateTime; 910 911 auto spec = Exact(5).Spec; 912 spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 30, 0))).should == 35.minutes; 913 spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 30, 59))).should == 34.minutes + 1.seconds; 914 spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 0, 0))).should == 5.minutes; 915 spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 4, 59))).should == 1.seconds; 916 } 917 918 @("cron.timeTillNextMinute.Every.basic") 919 @safe unittest { 920 import concurrency.stream.cron; 921 import core.time; 922 import std.datetime : SysTime, DateTime; 923 924 auto spec = Every(5).Spec; 925 spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 30, 0))).should == 5.minutes; 926 spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 30, 59))).should == 4.minutes + 1.seconds; 927 spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 0, 0))).should == 5.minutes; 928 spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 4, 59))).should == 1.seconds; 929 } 930 931 @("cron.timeTillNextMinute.Every.offset") 932 @safe unittest { 933 import concurrency.stream.cron; 934 import core.time; 935 import std.datetime : SysTime, DateTime; 936 937 auto spec = Every(5, 3).Spec; 938 spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 30, 0))).should == 3.minutes; 939 spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 30, 59))).should == 2.minutes + 1.seconds; 940 spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 0, 0))).should == 3.minutes; 941 spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 4, 59))).should == 3.minutes + 1.seconds; 942 } 943 944 @("cron.timeTillNextMinute.Each") 945 @safe unittest { 946 import concurrency.stream.cron; 947 import core.time; 948 import std.datetime : SysTime, DateTime; 949 950 auto spec = Each([1,15,19,44]).Spec; 951 spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 30, 0))).should == 14.minutes; 952 spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 30, 59))).should == 13.minutes + 1.seconds; 953 spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 0, 0))).should == 1.minutes; 954 spec.timeTillNextMinute(SysTime(DateTime(2018, 1, 1, 10, 4, 59))).should == 10.minutes + 1.seconds; 955 } 956 957 @("cron.timeTillNextTrigger.Always") 958 @safe unittest { 959 import concurrency.stream.cron; 960 import core.time; 961 import std.datetime : SysTime, DateTime; 962 963 auto spec = CronSpec(Spec(Always()), Spec(Always())); 964 spec.timeTillNextTrigger(SysTime(DateTime(2018, 1, 1, 10, 30, 0))).should == 1.minutes; 965 spec.timeTillNextTrigger(SysTime(DateTime(2018, 1, 1, 10, 30, 59))).should == 1.seconds; 966 } 967 968 @("cron.timeTillNextTrigger.5.over.every.hour") 969 @safe unittest { 970 import concurrency.stream.cron; 971 import core.time; 972 import std.datetime : SysTime, DateTime; 973 974 auto spec = CronSpec(Spec(Always()), Spec(Exact(5))); 975 spec.timeTillNextTrigger(SysTime(DateTime(2018, 1, 1, 10, 30, 0))).should == 35.minutes; 976 spec.timeTillNextTrigger(SysTime(DateTime(2018, 1, 1, 10, 30, 59))).should == 34.minutes + 1.seconds; 977 } 978 979 @("cron.timeTillNextTrigger.5.over.5.hour") 980 @safe unittest { 981 import concurrency.stream.cron; 982 import core.time; 983 import std.datetime : SysTime, DateTime; 984 import std.datetime.timezone : UTC; 985 986 auto spec = CronSpec(Spec(Exact(5)), Spec(Exact(5))); 987 spec.timeTillNextTrigger(SysTime(DateTime(2018, 1, 1, 5, 5, 0), UTC())).should == 24.hours; 988 spec.timeTillNextTrigger(SysTime(DateTime(2018, 1, 1, 10, 30, 0), UTC())).should == 18.hours + 35.minutes; 989 spec.timeTillNextTrigger(SysTime(DateTime(2018, 1, 1, 10, 30, 59), UTC())).should == 18.hours + 34.minutes + 1.seconds; 990 } 991 992 @("cron.timeTillNextTrigger.5.over.every.2.hours") 993 @safe unittest { 994 import concurrency.stream.cron; 995 import core.time; 996 import std.datetime : SysTime, DateTime; 997 import std.datetime.timezone : UTC; 998 999 auto spec = CronSpec(Spec(Every(2)), Spec(Exact(5))); 1000 spec.timeTillNextTrigger(SysTime(DateTime(2018, 1, 1, 6, 5, 0), UTC())).should == 2.hours; 1001 spec.timeTillNextTrigger(SysTime(DateTime(2018, 1, 1, 5, 5, 0), UTC())).should == 1.hours; 1002 spec.timeTillNextTrigger(SysTime(DateTime(2018, 1, 1, 10, 30, 0), UTC())).should == 1.hours + 35.minutes; 1003 spec.timeTillNextTrigger(SysTime(DateTime(2018, 1, 1, 10, 30, 59), UTC())).should == 1.hours + 34.minutes + 1.seconds; 1004 } 1005 1006 @("cron.timeTillNextTrigger.every.5.over.every.2.hours") 1007 @safe unittest { 1008 import concurrency.stream.cron; 1009 import core.time; 1010 import std.datetime : SysTime, DateTime; 1011 import std.datetime.timezone : UTC; 1012 1013 auto spec = CronSpec(Spec(Every(2)), Spec(Every(5))); 1014 spec.timeTillNextTrigger(SysTime(DateTime(2018, 1, 1, 6, 5, 0), UTC())).should == 5.minutes; 1015 spec.timeTillNextTrigger(SysTime(DateTime(2018, 1, 1, 5, 5, 0), UTC())).should == 1.hours; 1016 spec.timeTillNextTrigger(SysTime(DateTime(2018, 1, 1, 10, 30, 0), UTC())).should == 5.minutes; 1017 spec.timeTillNextTrigger(SysTime(DateTime(2018, 1, 1, 10, 30, 59), UTC())).should == 4.minutes + 1.seconds; 1018 spec.timeTillNextTrigger(SysTime(DateTime(2018, 1, 1, 10, 59, 59), UTC())).should == 1.hours + 5.minutes + 1.seconds; 1019 }