module ut.concurrency.stream;

import concurrency.stream;
import concurrency;
import unit_threaded;
import concurrency.stoptoken;
import core.atomic;
import concurrency.thread : ThreadSender;

// TODO: it would be good if the Sender that .collect returns could be scoped when the delegates are.

// Collects every element of [1,2,3] and expects the accumulated sum to be 6.
@("arrayStream")
@safe unittest {
  shared int p = 0;
  [1,2,3].arrayStream().collect((int t) shared { p.atomicOp!"+="(t); }).syncWait().isOk.should == true;
  p.should == 6;
}

// Runs a 5ms interval stream limited to two items on a ManualTimeWorker. The driver
// expects the next event to be 5ms away before each of the two advances, and no
// pending event afterwards.
@("intervalStream")
@safe unittest {
  import core.time;
  import concurrency.operations : withScheduler, whenAll;
  import concurrency.sender : justFrom;
  import concurrency.scheduler : ManualTimeWorker;

  auto worker = new shared ManualTimeWorker();
  auto interval = 5.msecs.intervalStream()
    .take(2)
    .collect(() shared {})
    .withScheduler(worker.getScheduler);

  auto driver = justFrom(() shared {
    worker.timeUntilNextEvent().should == 5.msecs;
    worker.advance(5.msecs);

    worker.timeUntilNextEvent().should == 5.msecs;
    worker.advance(5.msecs);

    worker.timeUntilNextEvent().should == null;
  });

  whenAll(interval, driver).syncWait().isOk.should == true;
}


// Adds the emitted value (5) to `g` until it reaches 15, then triggers the StopSource.
// Expects the collect to complete as cancelled with g == 15.
@("infiniteStream.stop")
@safe unittest {
  import concurrency.operations : withStopSource;
  shared int g = 0;
  auto source = new shared StopSource();
  infiniteStream(5).collect((int n) shared {
    // Read the shared counter atomically, consistent with the atomic update below.
    if (g.atomicLoad < 14)
      g.atomicOp!"+="(n);
    else
      source.stop();
  })
    .withStopSource(source).syncWait.isCancelled.should == true;
  g.should == 15;
}

// Takes five items from infiniteStream(4) and expects them to sum to 20.
@("infiniteStream.take")
@safe unittest {
  shared int g = 0;
  infiniteStream(4).take(5).collect((int n) shared { g.atomicOp!"+="(n); }).syncWait().isOk.should == true;
  g.should == 20;
}

// Collects iotaStream(0, 5) and expects the values to sum to 10.
@("iotaStream")
@safe unittest {
  shared int g = 0;
  iotaStream(0, 5).collect((int n) shared { g.atomicOp!"+="(n); }).syncWait().isOk.should == true;
  g.should == 10;
}

// Builds a stream from a user-defined struct whose `loop` member emits b..e.
// Expects Loop(0,4) to produce values summing to 6.
@("loopStream")
@safe unittest {
  struct Loop {
    size_t b,e;
    // The stop token is accepted but not consulted by this simple loop.
    void loop(DG, StopToken)(DG emit, StopToken stopToken) {
      foreach(i; b..e)
        emit(i);
    }
  }
  shared int g = 0;
  Loop(0,4).loopStream!size_t.collect((size_t n) shared { g.atomicOp!"+="(n); }).syncWait().isOk.should == true;
  g.should == 6;
}

// Converts an array stream with toStreamObject so it can be returned as
// StreamObjectBase!int, then collects it and expects the sum 6.
@("toStreamObject")
@safe unittest {
  import core.atomic : atomicOp;

  static StreamObjectBase!int getStream() {
    return [1,2,3].arrayStream().toStreamObject();
  }
  shared int p;

  getStream().collect((int i) @safe shared { p.atomicOp!"+="(i); }).syncWait().isOk.should == true;

  p.should == 6;
}


// Applies take(2) to a StreamObjectBase!int and expects only 1 + 2 to be collected.
@("toStreamObject.take")
@safe unittest {
  static StreamObjectBase!int getStream() {
    return [1,2,3].arrayStream().toStreamObject();
  }
  shared int p;

  getStream().take(2).collect((int i) shared { p.atomicOp!"+="(i); }).syncWait().isOk.should == true;

  p.should == 3;
}

// Converts a value-less (interval) stream with toStreamObject and expects the
// collect delegate to run once.
@("toStreamObject.void")
@safe unittest {
  import core.time : msecs;
  shared bool p = false;

  // The flag is shared, so it is written and read through core.atomic.
  1.msecs.intervalStream().toStreamObject().take(1).collect(() shared { p.atomicStore(true); }).syncWait().isOk.should == true;

  p.atomicLoad.should == true;
}

// Multiplies each element by 3 via transform; expects 3 + 6 + 9 == 18.
@("transform.int.double")
@safe unittest {
  shared int p = 0;
  [1,2,3].arrayStream().transform((int i) => i * 3).collect((int t) shared { p.atomicOp!"+="(t); }).syncWait().isOk.should == true;
  p.should == 18;
}

// Transforms ints into bools (is-even) and counts the true values; expects 1.
@("transform.int.bool")
@safe unittest {
  shared int p = 0;
  [1,2,3].arrayStream().transform((int i) => i % 2 == 0).collect((bool t) shared { if (t) p.atomicOp!"+="(1); }).syncWait().isOk.should == true;
  p.should == 1;
}

// Running sum via scan with seed 0; the collected values are expected to total 10.
@("scan")
@safe unittest {
  shared int p = 0;
  [1,2,3].arrayStream().scan((int acc, int i) => acc += i, 0).collect((int t) shared { p.atomicOp!"+="(t); }).syncWait().isOk.should == true;
  p.should == 10;
}

// scan over a value-less interval stream: the accumulator is incremented per tick.
// With take(3) the collected values are expected to total 6.
@("scan.void-value")
@safe unittest {
  import core.time;
  shared int p = 0;
  5.msecs.intervalStream.scan((int acc) => acc += 1, 0).take(3).collect((int t) shared { p.atomicOp!"+="(t); }).syncWait().isOk.should == true;
  p.should == 6;
}

// take(2) on a three-element stream; expects only 1 + 2 to be collected.
@("take.enough")
@safe unittest {
  shared int p = 0;

  [1,2,3].arrayStream.take(2).collect((int i) shared { p.atomicOp!"+="(i); }).syncWait.isOk.should == true;
  p.should == 3;
}

// take(4) on a three-element stream; expects all elements and an ok completion.
@("take.too-few")
@safe unittest {
  shared int p = 0;

  [1,2,3].arrayStream.take(4).collect((int i) shared { p.atomicOp!"+="(i); }).syncWait.isOk.should == true;
  p.should == 6;
}

// take on a doneStream is expected to complete as cancelled.
@("take.donestream")
@safe unittest {
  doneStream().take(1).collect(()shared{}).syncWait.isCancelled.should == true;
}

// take on an errorStream is expected to surface the original exception.
@("take.errorstream")
@safe unittest {
  errorStream(new Exception("Too bad")).take(1).collect(()shared{}).syncWait.assumeOk.shouldThrowWithMessage("Too bad");
}

// Samples a 7ms counter with a 10ms trigger limited to three items; expects the
// whole pipeline to complete ok once the trigger stream ends.
@("sample.trigger.stop")
@safe unittest {
  import core.time;
  7.msecs.intervalStream()
    .scan((int acc) => acc+1, 0)
    .sample(10.msecs.intervalStream().take(3))
    .collect((int i) shared {})
    .syncWait().isOk.should == true;
}

// Samples a 7ms counter with a slower 10ms trigger on a ManualTimeWorker. The driver
// advances time step by step and checks the accumulated sum and the time until the
// next scheduled event after each step.
@("sample.slower")
@safe unittest {
  import core.time;
  import concurrency.operations : withScheduler, whenAll;
  import concurrency.sender : justFrom;
  import concurrency.scheduler : ManualTimeWorker;

  shared int p = 0;

  auto worker = new shared ManualTimeWorker();

  auto sampler = 7.msecs
    .intervalStream()
    .scan((int acc) => acc+1, 0)
    .sample(10.msecs.intervalStream())
    .take(3)
    .collect((int i) shared { p.atomicOp!"+="(i); })
    .withScheduler(worker.getScheduler);

  auto driver = justFrom(() shared {
    worker.advance(7.msecs);
    p.atomicLoad.should == 0;
    worker.timeUntilNextEvent().should == 3.msecs;

    worker.advance(3.msecs);
    p.atomicLoad.should == 1;
    worker.timeUntilNextEvent().should == 4.msecs;

    worker.advance(4.msecs);
    p.atomicLoad.should == 1;
    worker.timeUntilNextEvent().should == 6.msecs;

    worker.advance(6.msecs);
    p.atomicLoad.should == 3;
    worker.timeUntilNextEvent().should == 1.msecs;

    worker.advance(1.msecs);
    p.atomicLoad.should == 3;
    worker.timeUntilNextEvent().should == 7.msecs;

    worker.advance(7.msecs);
    p.atomicLoad.should == 3;
    worker.timeUntilNextEvent().should == 2.msecs;

    worker.advance(2.msecs);
    p.atomicLoad.should == 7;
    worker.timeUntilNextEvent().should == null;
  });

  whenAll(sampler, driver).syncWait().isOk.should == true;

  p.should == 7;
}

// Samples a 7ms counter with a faster 3ms trigger on a ManualTimeWorker. The driver
// advances time step by step and checks the accumulated sum and the time until the
// next scheduled event after each step.
@("sample.faster")
@safe unittest {
  import core.time;
  import concurrency.operations : withScheduler, whenAll;
  import concurrency.sender : justFrom;
  import concurrency.scheduler : ManualTimeWorker;

  shared int p = 0;

  auto worker = new shared ManualTimeWorker();

  auto sampler = 7.msecs
    .intervalStream()
    .scan((int acc) => acc+1, 0)
    .sample(3.msecs.intervalStream())
    .take(3)
    .collect((int i) shared { p.atomicOp!"+="(i); })
    .withScheduler(worker.getScheduler);

  auto driver = justFrom(() shared {
    worker.advance(3.msecs);
    p.atomicLoad.should == 0;
    worker.timeUntilNextEvent().should == 3.msecs;

    worker.advance(3.msecs);
    p.atomicLoad.should == 0;
    worker.timeUntilNextEvent().should == 1.msecs;

    worker.advance(1.msecs);
    p.atomicLoad.should == 0;
    worker.timeUntilNextEvent().should == 2.msecs;

    worker.advance(2.msecs);
    p.atomicLoad.should == 1;
    worker.timeUntilNextEvent().should == 3.msecs;

    worker.advance(3.msecs);
    p.atomicLoad.should == 1;
    worker.timeUntilNextEvent().should == 2.msecs;

    worker.advance(2.msecs);
    p.atomicLoad.should == 1;
    worker.timeUntilNextEvent().should == 1.msecs;

    worker.advance(1.msecs);
    p.atomicLoad.should == 3;
    worker.timeUntilNextEvent().should == 3.msecs;

    worker.advance(3.msecs);
    p.atomicLoad.should == 3;
    worker.timeUntilNextEvent().should == 3.msecs;

    worker.advance(3.msecs);
    // NOTE: normally `p` ought to be 6 since 3*7 msecs have already elapsed.
    // but due to the way slots work in the timingwheels implementation
    // timers that are added later are executed earlier.
    // see https://github.com/symmetryinvestments/concurrency/issues/35
    p.atomicLoad.should == 3;
    worker.timeUntilNextEvent().should == 3.msecs;

    worker.advance(3.msecs);
    p.atomicLoad.should == 6;
    worker.timeUntilNextEvent().should == null;
  });

  whenAll(sampler, driver).syncWait().assumeOk;

  p.should == 6;
}

// Emits two values into a sharedStream from a separate thread while a collector sums
// them; the two are raced and the sum is expected to be 18.
@("sharedStream")
@safe unittest {
  import concurrency.operations : then, race;

  auto source = sharedStream!int;

  shared int p = 0;

  auto emitter = ThreadSender().then(() shared {
    source.emit(6);
    source.emit(12);
  });
  auto collector = source.collect((int t) shared { p.atomicOp!"+="(t); });

  race(collector, emitter).syncWait().isOk.should == true;

  p.atomicLoad.should == 18;
}

// throttleLast(3ms) over a 1ms counter, six items taken; runs against real time, so
// only a lower bound on the accumulated sum is asserted.
@("throttling.throttleLast")
@safe unittest {
  import core.time;

  shared int p = 0;

  1.msecs
    .intervalStream()
    .scan((int acc) => acc+1, 0)
    .throttleLast(3.msecs)
    .take(6)
    .collect((int i) shared { p.atomicOp!"+="(i); })
    .syncWait().isOk.should == true;

  p.atomicLoad.shouldBeGreaterThan(40);
}

// throttleLast(30ms) over an array stream; expects only the value 3 to be collected.
@("throttling.throttleLast.arrayStream")
@safe unittest {
  import core.time;

  shared int p = 0;

  [1,2,3].arrayStream()
    .throttleLast(30.msecs)
    .collect((int i) shared { p.atomicOp!"+="(i); })
    .syncWait().isOk.should == true;

  p.atomicLoad.should == 3;
}

// An exception thrown by the collect delegate behind throttleLast is expected to
// surface from syncWait.
@("throttling.throttleLast.exception")
@safe unittest {
  import core.time;

  1.msecs
    .intervalStream()
    .throttleLast(10.msecs)
    .collect(() shared { throw new Exception("Bla"); })
    .syncWait.assumeOk.shouldThrowWithMessage("Bla");
}

// Same as "throttling.throttleLast", with the source routed through via(ThreadSender()).
@("throttling.throttleLast.thread")
@safe unittest {
  import core.time;

  shared int p = 0;

  1.msecs
    .intervalStream()
    .via(ThreadSender())
    .scan((int acc) => acc+1, 0)
    .throttleLast(3.msecs)
    .take(6)
    .collect((int i) shared { p.atomicOp!"+="(i); })
    .syncWait().isOk.should == true;

  p.atomicLoad.shouldBeGreaterThan(40);
}

// Same as "throttling.throttleLast.arrayStream", with the source routed through
// via(ThreadSender()).
@("throttling.throttleLast.thread.arrayStream")
@safe unittest {
  import core.time;

  shared int p = 0;

  [1,2,3].arrayStream()
    .via(ThreadSender())
    .throttleLast(30.msecs)
    .collect((int i) shared { p.atomicOp!"+="(i); })
    .syncWait().isOk.should == true;

  p.atomicLoad.should == 3;
}

// Same as "throttling.throttleLast.exception", with the source routed through
// via(ThreadSender()).
@("throttling.throttleLast.thread.exception")
@safe unittest {
  import core.time;

  1.msecs
    .intervalStream()
    .via(ThreadSender())
    .throttleLast(10.msecs)
    .collect(() shared { throw new Exception("Bla"); })
    .syncWait.assumeOk.shouldThrowWithMessage("Bla");
}

// throttleFirst(3ms) over a 1ms counter on a ManualTimeWorker, two items taken.
// Expects the first value (1) after 1ms and the next one (4) after 4ms, for a sum of 5.
@("throttling.throttleFirst")
@safe unittest {
  import core.time;
  import concurrency.scheduler : ManualTimeWorker;
  import concurrency.operations : withScheduler, whenAll;
  import concurrency.sender : justFrom;

  shared int p = 0;
  auto worker = new shared ManualTimeWorker();

  auto throttled = 1.msecs
    .intervalStream()
    .scan((int acc) => acc+1, 0)
    .throttleFirst(3.msecs)
    .take(2)
    .collect((int i) shared { p.atomicOp!"+="(i); })
    .withScheduler(worker.getScheduler);

  auto driver = justFrom(() shared {
    p.atomicLoad.should == 0;

    worker.advance(1.msecs);
    p.atomicLoad.should == 1;

    worker.advance(1.msecs);
    p.atomicLoad.should == 1;

    worker.advance(1.msecs);
    p.atomicLoad.should == 1;

    worker.advance(1.msecs);
    p.atomicLoad.should == 5;

    worker.timeUntilNextEvent().should == null;
  });
  whenAll(throttled, driver).syncWait().isOk.should == true;

  p.should == 5;
}

// debounce(3ms) on a sharedStream driven by a ManualTimeWorker. Each emit is expected
// to (re)schedule an event 3ms out; only values that survive a full 3ms are collected
// (1 and 4 here), for a sum of 5.
@("throttling.debounce")
@safe unittest {
  import core.time;
  import concurrency.scheduler : ManualTimeWorker;
  import concurrency.operations : withScheduler, whenAll;
  import concurrency.sender : justFrom;

  shared int p = 0;
  auto worker = new shared ManualTimeWorker();
  auto source = sharedStream!int;

  auto throttled = source
    .debounce(3.msecs)
    .take(2)
    .collect((int i) shared { p.atomicOp!"+="(i); })
    .withScheduler(worker.getScheduler);

  auto driver = justFrom(() shared {
    source.emit(1);
    p.atomicLoad.should == 0;
    worker.timeUntilNextEvent().should == 3.msecs;

    worker.advance(3.msecs);
    p.atomicLoad.should == 1;

    source.emit(2);
    p.atomicLoad.should == 1;
    worker.timeUntilNextEvent().should == 3.msecs;

    source.emit(3);
    p.atomicLoad.should == 1;
    worker.timeUntilNextEvent().should == 3.msecs;

    worker.advance(1.msecs);
    p.atomicLoad.should == 1;
    worker.timeUntilNextEvent().should == 2.msecs;

    source.emit(4);
    p.atomicLoad.should == 1;
    worker.timeUntilNextEvent().should == 3.msecs;

    worker.advance(3.msecs);
    p.atomicLoad.should == 5;

    worker.timeUntilNextEvent().should == null;
  });
  whenAll(throttled, driver).syncWait().isOk.should == true;

  p.should == 5;
}

// slide(3) over 1..7: the sums of all emitted windows are expected to total 60.
// A source shorter than the window is expected to add nothing (p stays 60).
@("slide")
@safe unittest {
  import std.algorithm : sum;
  shared int p;

  [1,2,3,4,5,6,7].arrayStream
    .slide(3)
    .collect((int[] a) @safe shared { p.atomicOp!"+="(a.sum); })
    .syncWait.isOk.should == true;

  p.should == 60;

  [1,2].arrayStream
    .slide(3)
    .collect((int[] a) @safe shared { p.atomicOp!"+="(a.sum); })
    .syncWait.isOk.should == true;

  p.should == 60;
}

// slide(3, 2) over 1..7: the window sums are expected to total 36. A following
// slide(2, 2) over [1,2] is expected to add 3, giving 39.
@("slide.step")
@safe unittest {
  import std.algorithm : sum;
  shared int p;

  [1,2,3,4,5,6,7].arrayStream
    .slide(3, 2)
    .collect((int[] a) @safe shared { p.atomicOp!"+="(a.sum); })
    .syncWait.isOk.should == true;

  p.should == 36;

  [1,2].arrayStream
    .slide(2, 2)
    .collect((int[] a) @safe shared { p.atomicOp!"+="(a.sum); })
    .syncWait.isOk.should == true;

  p.should == 39;
}

// toList is expected to gather every element of the stream into an array.
@("toList.arrayStream")
@safe unittest {
  [1,2,3].arrayStream.toList.syncWait.value.should == [1,2,3];
}

// Two toList senders combined with whenAll are expected to yield a tuple of both arrays.
@("toList.arrayStream.whenAll")
@safe unittest {
  import concurrency.operations : whenAll;
  import std.typecons : tuple;
  auto s1 = [1,2,3].arrayStream.toList;
  auto s2 = [2,3,4].arrayStream.toList;
  whenAll(s1,s2).syncWait.value.should == tuple([1,2,3],[2,3,4]);
}

// filter is expected to keep only the even elements.
// NOTE(review): unlike the other tests this one is not marked @safe — confirm whether
// that is intentional.
@("filter")
unittest {
  [1,2,3,4].arrayStream
    .filter((int i) => i % 2 == 0)
    .toList
    .syncWait
    .value.should == [2,4];
}

// cycleStream is expected to repeat its input; six items from a four-character string
// wrap around to the first two characters.
// NOTE(review): unlike the other tests this one is not marked @safe — confirm whether
// that is intentional.
@("cycle")
unittest {
  "-/|\\".cycleStream().take(6).toList.syncWait.value.should == "-/|\\-/";
}