module ut.concurrency.stream;

import concurrency.stream;
import concurrency;
import unit_threaded;
import concurrency.stoptoken;
import core.atomic;
import concurrency.thread : ThreadSender;

// TODO: it would be good if we can get the Sender .collect returns to be scoped if the delegates are.

// Collects every element of an array stream into an atomic sum.
@("arrayStream")
@safe unittest {
  shared int p = 0;
  [1,2,3].arrayStream().collect((int t) shared { p.atomicOp!"+="(t); }).syncWait().assumeOk;
  p.should == 6;
}

// Runs a 5ms interval stream limited to two ticks on a manually advanced
// scheduler; after the second tick no further event must be pending.
@("intervalStream")
@safe unittest {
  import core.time;
  import concurrency.operations : withScheduler, whenAll;
  import concurrency.sender : justFrom;
  import concurrency.scheduler : ManualTimeWorker;

  auto worker = new shared ManualTimeWorker();
  auto interval = 5.msecs.intervalStream()
    .take(2)
    .collect(() shared {})
    .withScheduler(worker.getScheduler);

  // The driver advances the manual clock and checks the scheduler's timeline.
  auto driver = justFrom(() shared {
      worker.timeUntilNextEvent().should == 5.msecs;
      worker.advance(5.msecs);

      worker.timeUntilNextEvent().should == 5.msecs;
      worker.advance(5.msecs);

      worker.timeUntilNextEvent().should == null;
    });

  whenAll(interval, driver).syncWait().assumeOk;
}

// Stops an infinite stream from inside the collect delegate through a
// StopSource; the collect sender must then report cancellation.
@("infiniteStream.stop")
@safe unittest {
  import concurrency.operations : withStopSource;
  shared int g = 0;
  auto source = new shared StopSource();
  infiniteStream(5).collect((int n) shared {
      // Read the shared counter atomically, consistent with the other tests.
      if (g.atomicLoad < 14)
        g.atomicOp!"+="(n);
      else
        source.stop();
    })
    .withStopSource(source).syncWait.isCancelled.should == true;
  // Three additions of 5 happen before the threshold of 14 is crossed.
  g.should == 15;
}

// take(5) bounds an infinite stream of 4s: 5 * 4 == 20.
@("infiniteStream.take")
@safe unittest {
  shared int g = 0;
  infiniteStream(4).take(5).collect((int n) shared { g.atomicOp!"+="(n); }).syncWait().assumeOk;
  g.should == 20;
}

// The collected sum of iotaStream(0, 5) is expected to be 10 (0+1+2+3+4).
@("iotaStream")
@safe unittest {
  shared int g = 0;
  iotaStream(0, 5).collect((int n) shared { g.atomicOp!"+="(n); }).syncWait().assumeOk;
  g.should == 10;
}

// Builds a stream from a user-defined struct exposing a `loop` method that
// pushes values through the supplied `emit` callable.
@("loopStream")
@safe unittest {
  struct Loop {
    size_t b,e;
    void loop(DG, StopToken)(DG emit, StopToken stopToken) {
      foreach(i; b..e)
        emit(i);
    }
  }
  shared int g = 0;
  Loop(0,4).loopStream!size_t.collect((size_t n) shared { g.atomicOp!"+="(n); }).syncWait().assumeOk;
  g.should == 6;
}

// A stream converted with toStreamObject can be returned as
// StreamObjectBase!int and still be collected.
@("toStreamObject")
@safe unittest {
  static StreamObjectBase!int getStream() {
    return [1,2,3].arrayStream().toStreamObject();
  }
  shared int p;

  getStream().collect((int i) @safe shared { p.atomicOp!"+="(i); }).syncWait().assumeOk;

  p.should == 6;
}

// take works on a stream object: only 1 and 2 are collected.
@("toStreamObject.take")
@safe unittest {
  static StreamObjectBase!int getStream() {
    return [1,2,3].arrayStream().toStreamObject();
  }
  shared int p;

  getStream().take(2).collect((int i) shared { p.atomicOp!"+="(i); }).syncWait().assumeOk;

  p.should == 3;
}

// A stream object over a value-less (interval) stream invokes a
// parameterless collect delegate.
@("toStreamObject.void")
@safe unittest {
  import core.time : msecs;
  shared bool p = false;

  // Write the shared flag atomically, consistent with the other tests.
  1.msecs.intervalStream().toStreamObject().take(1).collect(() shared { p.atomicStore(true); }).syncWait().assumeOk;

  p.should == true;
}

// transform maps each element (here i * 3): 3 + 6 + 9 == 18.
@("transform.int.double")
@safe unittest {
  shared int p = 0;
  [1,2,3].arrayStream().transform((int i) => i * 3).collect((int t) shared { p.atomicOp!"+="(t); }).syncWait().assumeOk;
  p.should == 18;
}

// transform may change the element type (int -> bool); only 2 is even.
@("transform.int.bool")
@safe unittest {
  shared int p = 0;
  [1,2,3].arrayStream().transform((int i) => i % 2 == 0).collect((bool t) shared { if (t) p.atomicOp!"+="(1); }).syncWait().assumeOk;
  p.should == 1;
}

// scan emits the running accumulator: 1 + 3 + 6 == 10.
@("scan")
@safe unittest {
  shared int p = 0;
  [1,2,3].arrayStream().scan((int acc, int i) => acc += i, 0).collect((int t) shared { p.atomicOp!"+="(t); }).syncWait().assumeOk;
  p.should == 10;
}

// scan over a value-less stream takes only the accumulator: 1 + 2 + 3 == 6.
@("scan.void-value")
@safe unittest {
  import core.time;
  shared int p = 0;
  5.msecs.intervalStream.scan((int acc) => acc += 1, 0).take(3).collect((int t) shared { p.atomicOp!"+="(t); }).syncWait().assumeOk;
  p.should == 6;
}

// take(2) on three elements collects only the first two.
@("take.enough")
@safe unittest {
  shared int p = 0;

  [1,2,3].arrayStream.take(2).collect((int i) shared { p.atomicOp!"+="(i); }).syncWait.assumeOk;
  p.should == 3;
}

// take(4) on three elements completes normally with all three.
@("take.too-few")
@safe unittest {
  shared int p = 0;

  [1,2,3].arrayStream.take(4).collect((int i) shared { p.atomicOp!"+="(i); }).syncWait.assumeOk;
  p.should == 6;
}

// take on a doneStream yields a cancelled result.
@("take.donestream")
@safe unittest {
  doneStream().take(1).collect(()shared{}).syncWait.isCancelled.should == true;
}

// take on an errorStream surfaces the stream's exception.
@("take.errorstream")
@safe unittest {
  errorStream(new Exception("Too bad")).take(1).collect(()shared{}).syncWait.assumeOk.shouldThrowWithMessage("Too bad");
}

// The sampled stream completes once the bounded trigger stream (take(3)) ends.
@("sample.trigger.stop")
@safe unittest {
  import core.time;
  7.msecs.intervalStream()
    .scan((int acc) => acc+1, 0)
    .sample(10.msecs.intervalStream().take(3))
    .collect((int i) shared {})
    .syncWait().assumeOk;
}

// Samples a 7ms counter with a slower 10ms trigger on a manual clock and
// checks the collected sum after every scheduler event.
@("sample.slower")
@safe unittest {
  import core.time;
  import concurrency.operations : withScheduler, whenAll;
  import concurrency.sender : justFrom;
  import concurrency.scheduler : ManualTimeWorker;

  shared int p = 0;

  auto worker = new shared ManualTimeWorker();

  auto sampler = 7.msecs
    .intervalStream()
    .scan((int acc) => acc+1, 0)
    .sample(10.msecs.intervalStream())
    .take(3)
    .collect((int i) shared { p.atomicOp!"+="(i); })
    .withScheduler(worker.getScheduler);

  auto driver = justFrom(() shared {
      worker.advance(7.msecs);
      p.atomicLoad.should == 0;
      worker.timeUntilNextEvent().should == 3.msecs;

      worker.advance(3.msecs);
      p.atomicLoad.should == 1;
      worker.timeUntilNextEvent().should == 4.msecs;

      worker.advance(4.msecs);
      p.atomicLoad.should == 1;
      worker.timeUntilNextEvent().should == 6.msecs;

      worker.advance(6.msecs);
      p.atomicLoad.should == 3;
      worker.timeUntilNextEvent().should == 1.msecs;

      worker.advance(1.msecs);
      p.atomicLoad.should == 3;
      worker.timeUntilNextEvent().should == 7.msecs;

      worker.advance(7.msecs);
      p.atomicLoad.should == 3;
      worker.timeUntilNextEvent().should == 2.msecs;

      worker.advance(2.msecs);
      p.atomicLoad.should == 7;
      worker.timeUntilNextEvent().should == null;
    });

  whenAll(sampler, driver).syncWait().assumeOk;

  p.should == 7;
}

// Samples a 7ms counter with a faster 3ms trigger on a manual clock and
// checks the collected sum after every scheduler event.
@("sample.faster")
@safe unittest {
  import core.time;
  import concurrency.operations : withScheduler, whenAll;
  import concurrency.sender : justFrom;
  import concurrency.scheduler : ManualTimeWorker;

  shared int p = 0;

  auto worker = new shared ManualTimeWorker();

  auto sampler = 7.msecs
    .intervalStream()
    .scan((int acc) => acc+1, 0)
    .sample(3.msecs.intervalStream())
    .take(3)
    .collect((int i) shared { p.atomicOp!"+="(i); })
    .withScheduler(worker.getScheduler);

  auto driver = justFrom(() shared {
      worker.advance(3.msecs);
      p.atomicLoad.should == 0;
      worker.timeUntilNextEvent().should == 3.msecs;

      worker.advance(3.msecs);
      p.atomicLoad.should == 0;
      worker.timeUntilNextEvent().should == 1.msecs;

      worker.advance(1.msecs);
      p.atomicLoad.should == 0;
      worker.timeUntilNextEvent().should == 2.msecs;

      worker.advance(2.msecs);
      p.atomicLoad.should == 1;
      worker.timeUntilNextEvent().should == 3.msecs;

      worker.advance(3.msecs);
      p.atomicLoad.should == 1;
      worker.timeUntilNextEvent().should == 2.msecs;

      worker.advance(2.msecs);
      p.atomicLoad.should == 1;
      worker.timeUntilNextEvent().should == 1.msecs;

      worker.advance(1.msecs);
      p.atomicLoad.should == 3;
      worker.timeUntilNextEvent().should == 3.msecs;

      worker.advance(3.msecs);
      p.atomicLoad.should == 3;
      worker.timeUntilNextEvent().should == 3.msecs;

      worker.advance(3.msecs);
      p.atomicLoad.should == 6;
      worker.timeUntilNextEvent().should == null;
    });

  whenAll(sampler, driver).syncWait().assumeOk;

  p.should == 6;
}

// Values emitted into a sharedStream from another thread reach the collector.
@("sharedStream")
@safe unittest {
  import concurrency.operations : then, race;

  auto source = sharedStream!int;

  shared int p = 0;

  auto emitter = ThreadSender().then(() shared {
      source.emit(6);
      source.emit(12);
    });
  auto collector = source.collect((int t) shared { p.atomicOp!"+="(t); });

  race(collector, emitter).syncWait().assumeOk;

  p.atomicLoad.should == 18;
}

// throttleLast on a 1ms counter with a 3ms window; runs on the real clock,
// so only a lower bound on the collected sum is asserted.
@("throttling.throttleLast")
@safe unittest {
  import core.time;

  shared int p = 0;

  1.msecs
    .intervalStream()
    .scan((int acc) => acc+1, 0)
    .throttleLast(3.msecs)
    .take(6)
    .collect((int i) shared { p.atomicOp!"+="(i); })
    .syncWait().assumeOk;

  p.atomicLoad.shouldBeGreaterThan(40);
}

// throttleLast over an array stream: the collected sum is expected to be 3.
@("throttling.throttleLast.arrayStream")
@safe unittest {
  import core.time;

  shared int p = 0;

  [1,2,3].arrayStream()
    .throttleLast(30.msecs)
    .collect((int i) shared { p.atomicOp!"+="(i); })
    .syncWait().assumeOk;

  p.atomicLoad.should == 3;
}

// An exception thrown by the collect delegate propagates through throttleLast.
@("throttling.throttleLast.exception")
@safe unittest {
  import core.time;

  1.msecs
    .intervalStream()
    .throttleLast(10.msecs)
    .collect(() shared { throw new Exception("Bla"); })
    .syncWait.assumeOk.shouldThrowWithMessage("Bla");
}

// Same as throttling.throttleLast, with the source routed via ThreadSender.
@("throttling.throttleLast.thread")
@safe unittest {
  import core.time;

  shared int p = 0;

  1.msecs
    .intervalStream()
    .via(ThreadSender())
    .scan((int acc) => acc+1, 0)
    .throttleLast(3.msecs)
    .take(6)
    .collect((int i) shared { p.atomicOp!"+="(i); })
    .syncWait().assumeOk;

  p.atomicLoad.shouldBeGreaterThan(40);
}

// Same as throttling.throttleLast.arrayStream, with the source routed via
// ThreadSender.
@("throttling.throttleLast.thread.arrayStream")
@safe unittest {
  import core.time;

  shared int p = 0;

  [1,2,3].arrayStream()
    .via(ThreadSender())
    .throttleLast(30.msecs)
    .collect((int i) shared { p.atomicOp!"+="(i); })
    .syncWait().assumeOk;

  p.atomicLoad.should == 3;
}

// Same as throttling.throttleLast.exception, with the source routed via
// ThreadSender.
@("throttling.throttleLast.thread.exception")
@safe unittest {
  import core.time;

  1.msecs
    .intervalStream()
    .via(ThreadSender())
    .throttleLast(10.msecs)
    .collect(() shared { throw new Exception("Bla"); })
    .syncWait.assumeOk.shouldThrowWithMessage("Bla");
}

// throttleFirst with a 3ms window on a manual clock: the values at 1ms and
// 4ms are collected (1 + 4 == 5); the ticks in between are dropped.
@("throttling.throttleFirst")
@safe unittest {
  import core.time;
  import concurrency.scheduler : ManualTimeWorker;
  import concurrency.operations : withScheduler, whenAll;
  import concurrency.sender : justFrom;

  shared int p = 0;
  auto worker = new shared ManualTimeWorker();

  auto throttled = 1.msecs
    .intervalStream()
    .scan((int acc) => acc+1, 0)
    .throttleFirst(3.msecs)
    .take(2)
    .collect((int i) shared { p.atomicOp!"+="(i); })
    .withScheduler(worker.getScheduler);

  auto driver = justFrom(() shared {
      p.atomicLoad.should == 0;

      worker.advance(1.msecs);
      p.atomicLoad.should == 1;

      worker.advance(1.msecs);
      p.atomicLoad.should == 1;

      worker.advance(1.msecs);
      p.atomicLoad.should == 1;

      worker.advance(1.msecs);
      p.atomicLoad.should == 5;

      worker.timeUntilNextEvent().should == null;
    });
  whenAll(throttled, driver).syncWait().assumeOk;

  p.should == 5;
}

// debounce with a 3ms window on a manual clock: a value is only collected
// once 3ms pass without a newer emit, and each emit restarts the timer.
@("throttling.debounce")
@safe unittest {
  import core.time;
  import concurrency.scheduler : ManualTimeWorker;
  import concurrency.operations : withScheduler, whenAll;
  import concurrency.sender : justFrom;

  shared int p = 0;
  auto worker = new shared ManualTimeWorker();
  auto source = sharedStream!int;

  auto throttled = source
    .debounce(3.msecs)
    .take(2)
    .collect((int i) shared { p.atomicOp!"+="(i); })
    .withScheduler(worker.getScheduler);

  auto driver = justFrom(() shared {
      source.emit(1);
      p.atomicLoad.should == 0;
      worker.timeUntilNextEvent().should == 3.msecs;

      worker.advance(3.msecs);
      p.atomicLoad.should == 1;

      source.emit(2);
      p.atomicLoad.should == 1;
      worker.timeUntilNextEvent().should == 3.msecs;

      source.emit(3);
      p.atomicLoad.should == 1;
      worker.timeUntilNextEvent().should == 3.msecs;

      worker.advance(1.msecs);
      p.atomicLoad.should == 1;
      worker.timeUntilNextEvent().should == 2.msecs;

      source.emit(4);
      p.atomicLoad.should == 1;
      worker.timeUntilNextEvent().should == 3.msecs;

      worker.advance(3.msecs);
      p.atomicLoad.should == 5;

      worker.timeUntilNextEvent().should == null;
    });
  whenAll(throttled, driver).syncWait().assumeOk;

  p.should == 5;
}

// slide(3) emits every window of three consecutive elements:
// 6 + 9 + 12 + 15 + 18 == 60. A source shorter than the window emits nothing.
@("slide")
@safe unittest {
  import std.algorithm : sum;
  shared int p;

  [1,2,3,4,5,6,7].arrayStream
    .slide(3)
    .collect((int[] a) @safe shared { p.atomicOp!"+="(a.sum); })
    .syncWait.assumeOk;

  p.should == 60;

  [1,2].arrayStream
    .slide(3)
    .collect((int[] a) @safe shared { p.atomicOp!"+="(a.sum); })
    .syncWait.assumeOk;

  // Unchanged: the two-element source never fills a window of three.
  p.should == 60;
}

// slide(3, 2) advances the window by two: 6 + 12 + 18 == 36.
// slide(2, 2) over [1,2] then adds a single window of sum 3.
@("slide.step")
@safe unittest {
  import std.algorithm : sum;
  shared int p;

  [1,2,3,4,5,6,7].arrayStream
    .slide(3, 2)
    .collect((int[] a) @safe shared { p.atomicOp!"+="(a.sum); })
    .syncWait.assumeOk;

  p.should == 36;

  [1,2].arrayStream
    .slide(2, 2)
    .collect((int[] a) @safe shared { p.atomicOp!"+="(a.sum); })
    .syncWait.assumeOk;

  p.should == 39;
}

// toList gathers all stream elements into an array.
@("toList.arrayStream")
@safe unittest {
  [1,2,3].arrayStream.toList.syncWait.value.should == [1,2,3];
}

// Two toList senders combined with whenAll yield a tuple of both arrays.
@("toList.arrayStream.whenAll")
@safe unittest {
  import concurrency.operations : whenAll;
  import std.typecons : tuple;
  auto s1 = [1,2,3].arrayStream.toList;
  auto s2 = [2,3,4].arrayStream.toList;
  whenAll(s1,s2).syncWait.value.should == tuple([1,2,3],[2,3,4]);
}

// filter keeps only elements matching the predicate.
// NOTE(review): unlike the other tests this one is not marked @safe - confirm
// whether that is intentional.
@("filter")
unittest {
  [1,2,3,4].arrayStream
    .filter((int i) => i % 2 == 0)
    .toList
    .syncWait
    .value.should == [2,4];
}

// cycleStream repeats its input; six elements wrap around the 4-char string.
// NOTE(review): unlike the other tests this one is not marked @safe - confirm
// whether that is intentional.
@("cycle")
unittest {
  "-/|\\".cycleStream().take(6).toList.syncWait.value.should == "-/|\\-/";
}

// flatMapConcat with an immediately completing inner sender preserves order.
@("flatmap.concat.just")
@safe unittest {
  import concurrency.sender : just;

  [1,2,3].arrayStream
    .flatMapConcat((int i) => just(i))
    .toList
    .syncWait
    .value
    .should == [1,2,3];
}

// flatMapConcat preserves order when each inner sender runs via ThreadSender.
@("flatmap.concat.thread")
@safe unittest {
  import concurrency.sender : just;
  import concurrency.operations : via;

  [1,2,3].arrayStream
    .flatMapConcat((int i) => just(i).via(ThreadSender()))
    .toList
    .syncWait
    .value
    .should == [1,2,3];
}

// An error from the inner sender propagates out of flatMapConcat.
@("flatmap.concat.error")
@safe unittest {
  import concurrency.sender : ErrorSender;

  [1,2,3].arrayStream
    .flatMapConcat((int i) => ErrorSender())
    .collect(()shared{})
    .syncWait
    .assumeOk
    .shouldThrow();
}

// Same as flatmap.concat.thread, with the outer toList sender also run via
// ThreadSender.
@("flatmap.concat.thread.on.thread")
@safe unittest {
  import concurrency.sender : just;
  import concurrency.operations : via;

  [1,2,3].arrayStream
    .flatMapConcat((int i) => just(i).via(ThreadSender()))
    .toList
    .via(ThreadSender())
    .syncWait
    .value
    .should == [1,2,3];
}

// flatMapLatest with immediately completing inner senders yields every value.
@("flatmap.latest.just")
@safe unittest {
  import concurrency.sender : just;

  [1,2,3].arrayStream
    .flatMapLatest((int i) => just(i))
    .toList
    .syncWait
    .value
    .should == [1,2,3];
}

// With delayed inner senders only the value for the last source element (3)
// is expected in the result.
@("flatmap.latest.delay")
@safe unittest {
  import concurrency.sender : just, delay;
  import concurrency.operations : via;
  import core.time;

  [1,2,3].arrayStream
    .flatMapLatest((int i) => just(i).via(delay(50.msecs)))
    .toList
    .via(ThreadSender())
    .syncWait
    .value
    .should == [3];
}

// An error from the inner sender propagates out of flatMapLatest.
@("flatmap.latest.error")
@safe unittest {
  import concurrency.sender : ErrorSender;

  [1,2,3].arrayStream
    .flatMapLatest((int i) => ErrorSender())
    .collect(()shared{})
    .syncWait
    .assumeOk
    .shouldThrow();
}