1 module ut.concurrency.stream; 2 3 import concurrency.stream; 4 import concurrency; 5 import unit_threaded; 6 import concurrency.stoptoken; 7 import core.atomic; 8 import concurrency.thread : ThreadSender; 9 10 // TODO: it would be good if we can get the Sender .collect returns to be scoped if the delegates are. 11 12 @("arrayStream") 13 @safe unittest { 14 shared int p = 0; 15 [1,2,3].arrayStream().collect((int t) shared { p.atomicOp!"+="(t); }).syncWait().isOk.should == true; 16 p.should == 6; 17 } 18 19 @("intervalStream") 20 @safe unittest { 21 import core.time; 22 import concurrency.operations : withScheduler, whenAll; 23 import concurrency.sender : justFrom; 24 import concurrency.scheduler : ManualTimeWorker; 25 26 auto worker = new shared ManualTimeWorker(); 27 auto interval = 5.msecs.intervalStream() 28 .take(2) 29 .collect(() shared {}) 30 .withScheduler(worker.getScheduler); 31 32 auto driver = justFrom(() shared { 33 worker.timeUntilNextEvent().should == 5.msecs; 34 worker.advance(5.msecs); 35 36 worker.timeUntilNextEvent().should == 5.msecs; 37 worker.advance(5.msecs); 38 39 worker.timeUntilNextEvent().should == null; 40 }); 41 42 whenAll(interval, driver).syncWait().isOk.should == true; 43 } 44 45 46 @("infiniteStream.stop") 47 @safe unittest { 48 import concurrency.operations : withStopSource; 49 shared int g = 0; 50 auto source = new shared StopSource(); 51 infiniteStream(5).collect((int n) shared { 52 if (g < 14) 53 g.atomicOp!"+="(n); 54 else 55 source.stop(); 56 }) 57 .withStopSource(source).syncWait.isCancelled.should == true; 58 g.should == 15; 59 }; 60 61 @("infiniteStream.take") 62 @safe unittest { 63 shared int g = 0; 64 infiniteStream(4).take(5).collect((int n) shared { g.atomicOp!"+="(n); }).syncWait().isOk.should == true; 65 g.should == 20; 66 } 67 68 @("iotaStream") 69 @safe unittest { 70 import concurrency.stoptoken; 71 shared int g = 0; 72 iotaStream(0, 5).collect((int n) shared { g.atomicOp!"+="(n); }).syncWait().isOk.should == true; 73 g.should == 10; 74 } 75 76 @("loopStream") 77 @safe unittest { 78 struct Loop { 79 size_t b,e; 80 void loop(DG, StopToken)(DG emit, StopToken stopToken) { 81 foreach(i; b..e) 82 emit(i); 83 } 84 } 85 shared int g = 0; 86 Loop(0,4).loopStream!size_t.collect((size_t n) shared { g.atomicOp!"+="(n); }).syncWait().isOk.should == true; 87 g.should == 6; 88 } 89 90 @("toStreamObject") 91 @safe unittest { 92 import core.atomic : atomicOp; 93 94 static StreamObjectBase!int getStream() { 95 return [1,2,3].arrayStream().toStreamObject(); 96 } 97 shared int p; 98 99 getStream().collect((int i) @safe shared { p.atomicOp!"+="(i); }).syncWait().isOk.should == true; 100 101 p.should == 6; 102 } 103 104 105 @("toStreamObject.take") 106 @safe unittest { 107 static StreamObjectBase!int getStream() { 108 return [1,2,3].arrayStream().toStreamObject(); 109 } 110 shared int p; 111 112 getStream().take(2).collect((int i) shared { p.atomicOp!"+="(i); }).syncWait().isOk.should == true; 113 114 p.should == 3; 115 } 116 117 @("toStreamObject.void") 118 @safe unittest { 119 import core.time : msecs; 120 shared bool p = false; 121 122 1.msecs.intervalStream().toStreamObject().take(1).collect(() shared { p = true; }).syncWait().isOk.should == true; 123 124 p.should == true; 125 } 126 127 @("transform.int.double") 128 @safe unittest { 129 shared int p = 0; 130 [1,2,3].arrayStream().transform((int i) => i * 3).collect((int t) shared { p.atomicOp!"+="(t); }).syncWait().isOk.should == true; 131 p.should == 18; 132 } 133 134 @("transform.int.bool") 135 @safe unittest { 136 shared int p = 0; 137 [1,2,3].arrayStream().transform((int i) => i % 2 == 0).collect((bool t) shared { if (t) p.atomicOp!"+="(1); }).syncWait().isOk.should == true; 138 p.should == 1; 139 } 140 141 @("scan") 142 @safe unittest { 143 shared int p = 0; 144 [1,2,3].arrayStream().scan((int acc, int i) => acc += i, 0).collect((int t) shared { p.atomicOp!"+="(t); }).syncWait().isOk.should == true; 145 p.should == 10; 146 } 147 148 @("scan.void-value") 149 @safe unittest { 150 import core.time; 151 shared int p = 0; 152 5.msecs.intervalStream.scan((int acc) => acc += 1, 0).take(3).collect((int t) shared { p.atomicOp!"+="(t); }).syncWait().isOk.should == true; 153 p.should == 6; 154 } 155 156 @("take.enough") 157 @safe unittest { 158 shared int p = 0; 159 160 [1,2,3].arrayStream.take(2).collect((int i) shared { p.atomicOp!"+="(i); }).syncWait.isOk.should == true; 161 p.should == 3; 162 } 163 164 @("take.too-few") 165 @safe unittest { 166 shared int p = 0; 167 168 [1,2,3].arrayStream.take(4).collect((int i) shared { p.atomicOp!"+="(i); }).syncWait.isOk.should == true; 169 p.should == 6; 170 } 171 172 @("take.donestream") 173 @safe unittest { 174 doneStream().take(1).collect(()shared{}).syncWait.isCancelled.should == true; 175 } 176 177 @("take.errorstream") 178 @safe unittest { 179 errorStream(new Exception("Too bad")).take(1).collect(()shared{}).syncWait.assumeOk.shouldThrowWithMessage("Too bad"); 180 } 181 182 @("sample.trigger.stop") 183 @safe unittest { 184 import core.time; 185 auto sampler = 7.msecs.intervalStream() 186 .scan((int acc) => acc+1, 0) 187 .sample(10.msecs.intervalStream().take(3)) 188 .collect((int i) shared {}) 189 .syncWait().isOk.should == true; 190 } 191 192 @("sample.slower") 193 @safe unittest { 194 import core.time; 195 import concurrency.operations : withScheduler, whenAll; 196 import concurrency.sender : justFrom; 197 198 shared int p = 0; 199 import concurrency.scheduler : ManualTimeWorker; 200 201 auto worker = new shared ManualTimeWorker(); 202 203 auto sampler = 7.msecs 204 .intervalStream() 205 .scan((int acc) => acc+1, 0) 206 .sample(10.msecs.intervalStream()) 207 .take(3) 208 .collect((int i) shared { p.atomicOp!"+="(i); }) 209 .withScheduler(worker.getScheduler); 210 211 auto driver = justFrom(() shared { 212 worker.advance(7.msecs); 213 p.atomicLoad.should == 0; 214 worker.timeUntilNextEvent().should == 3.msecs; 215 216 worker.advance(3.msecs); 217 p.atomicLoad.should == 1; 218 worker.timeUntilNextEvent().should == 4.msecs; 219 220 worker.advance(4.msecs); 221 p.atomicLoad.should == 1; 222 worker.timeUntilNextEvent().should == 6.msecs; 223 224 worker.advance(6.msecs); 225 p.atomicLoad.should == 3; 226 worker.timeUntilNextEvent().should == 1.msecs; 227 228 worker.advance(1.msecs); 229 p.atomicLoad.should == 3; 230 worker.timeUntilNextEvent().should == 7.msecs; 231 232 worker.advance(7.msecs); 233 p.atomicLoad.should == 3; 234 worker.timeUntilNextEvent().should == 2.msecs; 235 236 worker.advance(2.msecs); 237 p.atomicLoad.should == 7; 238 worker.timeUntilNextEvent().should == null; 239 }); 240 241 whenAll(sampler, driver).syncWait().isOk.should == true; 242 243 p.should == 7; 244 } 245 246 @("sample.faster") 247 @safe unittest { 248 import core.time; 249 250 shared int p = 0; 251 252 7.msecs 253 .intervalStream() 254 .scan((int acc) => acc+1, 0) 255 .sample(3.msecs.intervalStream()) 256 .take(3) 257 .collect((int i) shared { p.atomicOp!"+="(i); }) 258 .syncWait().isOk.should == true; 259 260 p.should == 6; 261 } 262 263 @("sharedStream") 264 @safe unittest { 265 import concurrency.operations : then, race; 266 267 auto source = sharedStream!int; 268 269 shared int p = 0; 270 271 auto emitter = ThreadSender().then(() shared { 272 source.emit(6); 273 source.emit(12); 274 }); 275 auto collector = source.collect((int t) shared { p.atomicOp!"+="(t); }); 276 277 race(collector, emitter).syncWait().isOk.should == true; 278 279 p.atomicLoad.should == 18; 280 } 281 282 @("throttling.throttleLast") 283 @safe unittest { 284 import core.time; 285 286 shared int p = 0; 287 288 1.msecs 289 .intervalStream() 290 .scan((int acc) => acc+1, 0) 291 .throttleLast(3.msecs) 292 .take(6) 293 .collect((int i) shared { p.atomicOp!"+="(i); }) 294 .syncWait().isOk.should == true; 295 296 p.atomicLoad.shouldBeGreaterThan(40); 297 } 298 299 @("throttling.throttleLast.arrayStream") 300 @safe unittest { 301 import core.time; 302 303 shared int p = 0; 304 305 [1,2,3].arrayStream() 306 .throttleLast(30.msecs) 307 .collect((int i) shared { p.atomicOp!"+="(i); }) 308 .syncWait().isOk.should == true; 309 310 p.atomicLoad.should == 3; 311 } 312 313 @("throttling.throttleLast.exception") 314 @safe unittest { 315 import core.time; 316 317 1.msecs 318 .intervalStream() 319 .throttleLast(10.msecs) 320 .collect(() shared { throw new Exception("Bla"); }) 321 .syncWait.assumeOk.shouldThrowWithMessage("Bla"); 322 } 323 324 @("throttling.throttleLast.thread") 325 @safe unittest { 326 import core.time; 327 328 shared int p = 0; 329 330 1.msecs 331 .intervalStream() 332 .via(ThreadSender()) 333 .scan((int acc) => acc+1, 0) 334 .throttleLast(3.msecs) 335 .take(6) 336 .collect((int i) shared { p.atomicOp!"+="(i); }) 337 .syncWait().isOk.should == true; 338 339 p.atomicLoad.shouldBeGreaterThan(40); 340 } 341 342 @("throttling.throttleLast.thread.arrayStream") 343 @safe unittest { 344 import core.time; 345 346 shared int p = 0; 347 348 [1,2,3].arrayStream() 349 .via(ThreadSender()) 350 .throttleLast(30.msecs) 351 .collect((int i) shared { p.atomicOp!"+="(i); }) 352 .syncWait().isOk.should == true; 353 354 p.atomicLoad.should == 3; 355 } 356 357 @("throttling.throttleLast.thread.exception") 358 @safe unittest { 359 import core.time; 360 361 1.msecs 362 .intervalStream() 363 .via(ThreadSender()) 364 .throttleLast(10.msecs) 365 .collect(() shared { throw new Exception("Bla"); }) 366 .syncWait.assumeOk.shouldThrowWithMessage("Bla"); 367 } 368 369 @("throttling.throttleFirst") 370 @safe unittest { 371 import core.time; 372 import concurrency.scheduler : ManualTimeWorker; 373 import concurrency.operations : withScheduler, whenAll; 374 import concurrency.sender : justFrom; 375 376 shared int p = 0; 377 auto worker = new shared ManualTimeWorker(); 378 379 auto throttled = 1.msecs 380 .intervalStream() 381 .scan((int acc) => acc+1, 0) 382 .throttleFirst(3.msecs) 383 .take(2) 384 .collect((int i) shared { p.atomicOp!"+="(i); }) 385 .withScheduler(worker.getScheduler); 386 387 auto driver = justFrom(() shared { 388 p.atomicLoad.should == 0; 389 390 worker.advance(1.msecs); 391 p.atomicLoad.should == 1; 392 393 worker.advance(1.msecs); 394 p.atomicLoad.should == 1; 395 396 worker.advance(1.msecs); 397 p.atomicLoad.should == 1; 398 399 worker.advance(1.msecs); 400 p.atomicLoad.should == 5; 401 402 worker.timeUntilNextEvent().should == null; 403 }); 404 whenAll(throttled, driver).syncWait().isOk.should == true; 405 406 p.should == 5; 407 } 408 409 @("throttling.debounce") 410 @safe unittest { 411 import core.time; 412 import concurrency.scheduler : ManualTimeWorker; 413 import concurrency.operations : withScheduler, whenAll; 414 import concurrency.sender : justFrom; 415 416 shared int p = 0; 417 auto worker = new shared ManualTimeWorker(); 418 auto source = sharedStream!int; 419 420 auto throttled = source 421 .debounce(3.msecs) 422 .take(2) 423 .collect((int i) shared { p.atomicOp!"+="(i); }) 424 .withScheduler(worker.getScheduler); 425 426 auto driver = justFrom(() shared { 427 source.emit(1); 428 p.atomicLoad.should == 0; 429 worker.timeUntilNextEvent().should == 3.msecs; 430 431 worker.advance(3.msecs); 432 p.atomicLoad.should == 1; 433 434 source.emit(2); 435 p.atomicLoad.should == 1; 436 worker.timeUntilNextEvent().should == 3.msecs; 437 438 source.emit(3); 439 p.atomicLoad.should == 1; 440 worker.timeUntilNextEvent().should == 3.msecs; 441 442 worker.advance(1.msecs); 443 p.atomicLoad.should == 1; 444 worker.timeUntilNextEvent().should == 2.msecs; 445 446 source.emit(4); 447 p.atomicLoad.should == 1; 448 worker.timeUntilNextEvent().should == 3.msecs; 449 450 worker.advance(3.msecs); 451 p.atomicLoad.should == 5; 452 453 worker.timeUntilNextEvent().should == null; 454 }); 455 whenAll(throttled, driver).syncWait().isOk.should == true; 456 457 p.should == 5; 458 } 459 460 @("slide") 461 @safe unittest { 462 import std.stdio; 463 import std.functional : toDelegate; 464 import std.algorithm : sum; 465 shared int p; 466 467 [1,2,3,4,5,6,7].arrayStream 468 .slide(3) 469 .collect((int[] a) @safe shared { p.atomicOp!"+="(a.sum); }) 470 .syncWait.isOk.should == true; 471 472 p.should == 60; 473 474 [1,2].arrayStream 475 .slide(3) 476 .collect((int[] a) @safe shared { p.atomicOp!"+="(a.sum); }) 477 .syncWait.isOk.should == true; 478 479 p.should == 60; 480 } 481 482 @("slide.step") 483 @safe unittest { 484 import std.stdio; 485 import std.functional : toDelegate; 486 import std.algorithm : sum; 487 shared int p; 488 489 [1,2,3,4,5,6,7].arrayStream 490 .slide(3, 2) 491 .collect((int[] a) @safe shared { p.atomicOp!"+="(a.sum); }) 492 .syncWait.isOk.should == true; 493 494 p.should == 36; 495 496 [1,2].arrayStream 497 .slide(2, 2) 498 .collect((int[] a) @safe shared { p.atomicOp!"+="(a.sum); }) 499 .syncWait.isOk.should == true; 500 501 p.should == 39; 502 } 503 504 @("toList.arrayStream") 505 @safe unittest { 506 [1,2,3].arrayStream.toList.syncWait.value.should == [1,2,3]; 507 } 508 509 @("toList.arrayStream.whenAll") 510 @safe unittest { 511 import concurrency.operations : withScheduler, whenAll; 512 import std.typecons : tuple; 513 auto s1 = [1,2,3].arrayStream.toList; 514 auto s2 = [2,3,4].arrayStream.toList; 515 whenAll(s1,s2).syncWait.value.should == tuple([1,2,3],[2,3,4]); 516 }