module ut.concurrency.stream;

import concurrency.stream;
import concurrency;
import unit_threaded;
import concurrency.stoptoken;
import core.atomic;
import concurrency.thread : ThreadSender;

// TODO: it would be good if the Sender returned by .collect could be scoped when the delegates are.

// NOTE: every `shared` counter in this file is read and written through
// core.atomic (atomicOp / atomicLoad / atomicStore) so the tests never touch
// shared data directly.

// Collecting an arrayStream over [1,2,3] completes ok and the callback sums to 6.
@("arrayStream")
@safe unittest {
  shared int p = 0;
  [1,2,3].arrayStream().collect((int t) shared { p.atomicOp!"+="(t); }).syncWait().isOk.should == true;
  p.atomicLoad.should == 6;
}

// Runs a 10ms and a 3ms interval stream together. The slow stream's callback
// stops the StopSource, so the combined wait must report cancellation after
// exactly one slow tick and more than one fast tick.
@("timerStream")
@safe unittest {
  import concurrency.operations : withStopSource, whenAll, via;
  import core.time : msecs;
  shared int s = 0, f = 0;
  auto source = new shared StopSource();
  auto slow = 10.msecs.intervalStream().collect(() shared { s.atomicOp!"+="(1); source.stop(); }).withStopSource(source);
  auto fast = 3.msecs.intervalStream().collect(() shared { f.atomicOp!"+="(1); });
  whenAll(slow, fast).syncWait(source).isCancelled.should == true;
  s.atomicLoad.should == 1;
  f.atomicLoad.shouldBeGreaterThan(1);
}

// The callback accumulates while the running total is below 14 and otherwise
// stops the source; the stream must end cancelled with a total of 15.
@("infiniteStream.stop")
@safe unittest {
  import concurrency.operations : withStopSource;
  shared int g = 0;
  auto source = new shared StopSource();
  infiniteStream(5).collect((int n) shared {
      if (g.atomicLoad < 14)
        g.atomicOp!"+="(n);
      else
        source.stop();
    })
    .withStopSource(source).syncWait.isCancelled.should == true;
  g.atomicLoad.should == 15;
}

// take(5) bounds infiniteStream(4); the run completes ok with a total of 20.
@("infiniteStream.take")
@safe unittest {
  shared int g = 0;
  infiniteStream(4).take(5).collect((int n) shared { g.atomicOp!"+="(n); }).syncWait().isOk.should == true;
  g.atomicLoad.should == 20;
}

// iotaStream(0, 5) completes ok and the collected values sum to 10.
@("iotaStream")
@safe unittest {
  shared int g = 0;
  iotaStream(0, 5).collect((int n) shared { g.atomicOp!"+="(n); }).syncWait().isOk.should == true;
  g.atomicLoad.should == 10;
}

// loopStream wraps a user struct whose `loop` method emits every index in
// [b, e); for Loop(0,4) the collected sum is 6.
@("loopStream")
@safe unittest {
  struct Loop {
    size_t b,e;
    void loop(DG, StopToken)(DG emit, StopToken stopToken) {
      foreach(i; b..e)
        emit(i);
    }
  }
  shared int g = 0;
  Loop(0,4).loopStream!size_t.collect((size_t n) shared { g.atomicOp!"+="(n); }).syncWait().isOk.should == true;
  g.atomicLoad.should == 6;
}

// A stream converted with toStreamObject can be returned as StreamObjectBase!int
// and still be collected: the sum over [1,2,3] is 6.
@("toStreamObject")
@safe unittest {
  static StreamObjectBase!int getStream() {
    return [1,2,3].arrayStream().toStreamObject();
  }
  shared int p;

  getStream().collect((int i) @safe shared { p.atomicOp!"+="(i); }).syncWait().isOk.should == true;

  p.atomicLoad.should == 6;
}

// take(2) applied to a StreamObjectBase!int over [1,2,3] yields a sum of 3.
@("toStreamObject.take")
@safe unittest {
  static StreamObjectBase!int getStream() {
    return [1,2,3].arrayStream().toStreamObject();
  }
  shared int p;

  getStream().take(2).collect((int i) shared { p.atomicOp!"+="(i); }).syncWait().isOk.should == true;

  p.atomicLoad.should == 3;
}

// toStreamObject also works for a stream whose callback takes no value
// (an interval stream): after take(1) the callback must have run.
@("toStreamObject.void")
@safe unittest {
  import core.time : msecs;
  shared bool p = false;

  1.msecs.intervalStream().toStreamObject().take(1).collect(() shared { p.atomicStore(true); }).syncWait().isOk.should == true;

  p.atomicLoad.should == true;
}

// transform maps each int through `i * 3`; the collected sum is 18.
@("transform.int.double")
@safe unittest {
  shared int p = 0;
  [1,2,3].arrayStream().transform((int i) => i * 3).collect((int t) shared { p.atomicOp!"+="(t); }).syncWait().isOk.should == true;
  p.atomicLoad.should == 18;
}

// transform may change the element type (int -> bool); exactly one element
// of [1,2,3] is even, so the counter ends at 1.
@("transform.int.bool")
@safe unittest {
  shared int p = 0;
  [1,2,3].arrayStream().transform((int i) => i % 2 == 0).collect((bool t) shared { if (t) p.atomicOp!"+="(1); }).syncWait().isOk.should == true;
  p.atomicLoad.should == 1;
}

// scan with an additive accumulator seeded at 0 over [1,2,3]; the collected
// values sum to 10.
@("scan")
@safe unittest {
  shared int p = 0;
  [1,2,3].arrayStream().scan((int acc, int i) => acc += i, 0).collect((int t) shared { p.atomicOp!"+="(t); }).syncWait().isOk.should == true;
  p.atomicLoad.should == 10;
}

// scan over a value-less interval stream: the accumulator function only
// receives `acc`. Three scanned values sum to 6.
@("scan.void-value")
@safe unittest {
  import core.time;
  shared int p = 0;
  5.msecs.intervalStream.scan((int acc) => acc += 1, 0).take(3).collect((int t) shared { p.atomicOp!"+="(t); }).syncWait().isOk.should == true;
  p.atomicLoad.should == 6;
}

// take(2) on a three-element stream completes ok with a collected sum of 3.
@("take.enough")
@safe unittest {
  shared int p = 0;

  [1,2,3].arrayStream.take(2).collect((int i) shared { p.atomicOp!"+="(i); }).syncWait.isOk.should == true;
  p.atomicLoad.should == 3;
}

// take(4) on a three-element stream still completes ok; all elements are
// collected (sum 6).
@("take.too-few")
@safe unittest {
  shared int p = 0;

  [1,2,3].arrayStream.take(4).collect((int i) shared { p.atomicOp!"+="(i); }).syncWait.isOk.should == true;
  p.atomicLoad.should == 6;
}

// take on a doneStream reports cancellation.
@("take.donestream")
@safe unittest {
  doneStream().take(1).collect(()shared{}).syncWait.isCancelled.should == true;
}

// take on an errorStream surfaces the original exception message.
@("take.errorstream")
@safe unittest {
  errorStream(new Exception("Too bad")).take(1).collect(()shared{}).syncWait.assumeOk.shouldThrowWithMessage("Too bad");
}

// When the trigger stream of `sample` is bounded by take(3), the sampled
// stream completes ok.
@("sample.trigger.stop")
@safe unittest {
  import core.time;
  7.msecs.intervalStream()
    .scan((int acc) => acc+1, 0)
    .sample(10.msecs.intervalStream().take(3))
    .collect((int i) shared {})
    .syncWait().isOk.should == true;
}

// Samples a 7ms counter with a 10ms trigger on a ManualTimeWorker. The driver
// advances the clock step by step and checks both the running total and the
// time until the next scheduled event after every step.
@("sample.slower")
@safe unittest {
  import core.time;
  import concurrency.operations : withScheduler, whenAll;
  import concurrency.sender : justFrom;

  shared int p = 0;
  import concurrency.scheduler : ManualTimeWorker;

  auto worker = new shared ManualTimeWorker();

  auto sampler = 7.msecs
    .intervalStream()
    .scan((int acc) => acc+1, 0)
    .sample(10.msecs.intervalStream())
    .take(3)
    .collect((int i) shared { p.atomicOp!"+="(i); })
    .withScheduler(worker.getScheduler);

  auto driver = justFrom(() shared {
      worker.advance(7.msecs);
      p.atomicLoad.should == 0;
      worker.timeUntilNextEvent().should == 3.msecs;

      worker.advance(3.msecs);
      p.atomicLoad.should == 1;
      worker.timeUntilNextEvent().should == 4.msecs;

      worker.advance(4.msecs);
      p.atomicLoad.should == 1;
      worker.timeUntilNextEvent().should == 6.msecs;

      worker.advance(6.msecs);
      p.atomicLoad.should == 3;
      worker.timeUntilNextEvent().should == 1.msecs;

      worker.advance(1.msecs);
      p.atomicLoad.should == 3;
      worker.timeUntilNextEvent().should == 7.msecs;

      worker.advance(7.msecs);
      p.atomicLoad.should == 3;
      worker.timeUntilNextEvent().should == 2.msecs;

      worker.advance(2.msecs);
      p.atomicLoad.should == 7;
      worker.timeUntilNextEvent().should == null;
    });

  whenAll(sampler, driver).syncWait().isOk.should == true;

  p.atomicLoad.should == 7;
}

// Samples a 7ms counter with a faster 3ms trigger on the real clock; three
// sampled values sum to 6.
@("sample.faster")
@safe unittest {
  import core.time;

  shared int p = 0;

  7.msecs
    .intervalStream()
    .scan((int acc) => acc+1, 0)
    .sample(3.msecs.intervalStream())
    .take(3)
    .collect((int i) shared { p.atomicOp!"+="(i); })
    .syncWait().isOk.should == true;

  p.atomicLoad.should == 6;
}

// Values emitted into a sharedStream from another thread reach the collector;
// racing collector against emitter completes ok with a total of 18.
@("sharedStream")
@safe unittest {
  import concurrency.operations : then, race;

  auto source = sharedStream!int;

  shared int p = 0;

  auto emitter = ThreadSender().then(() shared {
      source.emit(6);
      source.emit(12);
    });
  auto collector = source.collect((int t) shared { p.atomicOp!"+="(t); });

  race(collector, emitter).syncWait().isOk.should == true;

  p.atomicLoad.should == 18;
}

// throttleLast(3ms) over a 1ms counter: six throttled values must sum to
// more than 40. Runs on the real clock, so only a lower bound is asserted.
@("throttling.throttleLast")
@safe unittest {
  import core.time;

  shared int p = 0;

  1.msecs
    .intervalStream()
    .scan((int acc) => acc+1, 0)
    .throttleLast(3.msecs)
    .take(6)
    .collect((int i) shared { p.atomicOp!"+="(i); })
    .syncWait().isOk.should == true;

  p.atomicLoad.shouldBeGreaterThan(40);
}

// throttleLast over a finite array stream completes ok with a collected
// total of 3 (presumably just the final element, given the operator's name).
@("throttling.throttleLast.arrayStream")
@safe unittest {
  import core.time;

  shared int p = 0;

  [1,2,3].arrayStream()
    .throttleLast(30.msecs)
    .collect((int i) shared { p.atomicOp!"+="(i); })
    .syncWait().isOk.should == true;

  p.atomicLoad.should == 3;
}

// An exception thrown from the collect callback downstream of throttleLast
// is surfaced by syncWait with its original message.
@("throttling.throttleLast.exception")
@safe unittest {
  import core.time;

  1.msecs
    .intervalStream()
    .throttleLast(10.msecs)
    .collect(() shared { throw new Exception("Bla"); })
    .syncWait.assumeOk.shouldThrowWithMessage("Bla");
}

// Same as "throttling.throttleLast" but with the source stream routed
// through `via(ThreadSender())`. Runs on the real clock, so only a lower
// bound on the total is asserted.
@("throttling.throttleLast.thread")
@safe unittest {
  import core.time;

  shared int p = 0;

  1.msecs
    .intervalStream()
    .via(ThreadSender())
    .scan((int acc) => acc+1, 0)
    .throttleLast(3.msecs)
    .take(6)
    .collect((int i) shared { p.atomicOp!"+="(i); })
    .syncWait().isOk.should == true;

  p.atomicLoad.shouldBeGreaterThan(40);
}

// throttleLast over a finite array stream routed through
// `via(ThreadSender())` completes ok with a collected total of 3.
@("throttling.throttleLast.thread.arrayStream")
@safe unittest {
  import core.time;

  shared int p = 0;

  [1,2,3].arrayStream()
    .via(ThreadSender())
    .throttleLast(30.msecs)
    .collect((int i) shared { p.atomicOp!"+="(i); })
    .syncWait().isOk.should == true;

  p.atomicLoad.should == 3;
}

// An exception thrown from the collect callback is still surfaced when the
// source stream is routed through `via(ThreadSender())`.
@("throttling.throttleLast.thread.exception")
@safe unittest {
  import core.time;

  1.msecs
    .intervalStream()
    .via(ThreadSender())
    .throttleLast(10.msecs)
    .collect(() shared { throw new Exception("Bla"); })
    .syncWait.assumeOk.shouldThrowWithMessage("Bla");
}

// throttleFirst(3ms) over a 1ms counter driven by a ManualTimeWorker. The
// driver advances the clock 1ms at a time and checks the running total after
// each step; after two collected values (take(2)) no timers remain.
@("throttling.throttleFirst")
@safe unittest {
  import core.time;
  import concurrency.scheduler : ManualTimeWorker;
  import concurrency.operations : withScheduler, whenAll;
  import concurrency.sender : justFrom;

  shared int p = 0;
  auto worker = new shared ManualTimeWorker();

  auto throttled = 1.msecs
    .intervalStream()
    .scan((int acc) => acc+1, 0)
    .throttleFirst(3.msecs)
    .take(2)
    .collect((int i) shared { p.atomicOp!"+="(i); })
    .withScheduler(worker.getScheduler);

  auto driver = justFrom(() shared {
      p.atomicLoad.should == 0;

      worker.advance(1.msecs);
      p.atomicLoad.should == 1;

      worker.advance(1.msecs);
      p.atomicLoad.should == 1;

      worker.advance(1.msecs);
      p.atomicLoad.should == 1;

      worker.advance(1.msecs);
      p.atomicLoad.should == 5;

      worker.timeUntilNextEvent().should == null;
    });
  whenAll(throttled, driver).syncWait().isOk.should == true;

  p.atomicLoad.should == 5;
}

// debounce(3ms) over a sharedStream driven by a ManualTimeWorker. The driver
// interleaves emits with clock advances and checks, after each step, the
// running total and the time until the next scheduled event.
@("throttling.debounce")
@safe unittest {
  import core.time;
  import concurrency.scheduler : ManualTimeWorker;
  import concurrency.operations : withScheduler, whenAll;
  import concurrency.sender : justFrom;

  shared int p = 0;
  auto worker = new shared ManualTimeWorker();
  auto source = sharedStream!int;

  auto throttled = source
    .debounce(3.msecs)
    .take(2)
    .collect((int i) shared { p.atomicOp!"+="(i); })
    .withScheduler(worker.getScheduler);

  auto driver = justFrom(() shared {
      source.emit(1);
      p.atomicLoad.should == 0;
      worker.timeUntilNextEvent().should == 3.msecs;

      worker.advance(3.msecs);
      p.atomicLoad.should == 1;

      source.emit(2);
      p.atomicLoad.should == 1;
      worker.timeUntilNextEvent().should == 3.msecs;

      source.emit(3);
      p.atomicLoad.should == 1;
      worker.timeUntilNextEvent().should == 3.msecs;

      worker.advance(1.msecs);
      p.atomicLoad.should == 1;
      worker.timeUntilNextEvent().should == 2.msecs;

      source.emit(4);
      p.atomicLoad.should == 1;
      worker.timeUntilNextEvent().should == 3.msecs;

      worker.advance(3.msecs);
      p.atomicLoad.should == 5;

      worker.timeUntilNextEvent().should == null;
    });
  whenAll(throttled, driver).syncWait().isOk.should == true;

  p.atomicLoad.should == 5;
}

// slide(3) hands int[] windows to the callback. Over [1..7] the window sums
// total 60; a two-element stream adds nothing to the total.
@("slide")
@safe unittest {
  import std.algorithm : sum;
  shared int p;

  [1,2,3,4,5,6,7].arrayStream
    .slide(3)
    .collect((int[] a) @safe shared { p.atomicOp!"+="(a.sum); })
    .syncWait.isOk.should == true;

  p.atomicLoad.should == 60;

  [1,2].arrayStream
    .slide(3)
    .collect((int[] a) @safe shared { p.atomicOp!"+="(a.sum); })
    .syncWait.isOk.should == true;

  p.atomicLoad.should == 60;
}

// slide with an explicit second argument (a step, judging by the test name):
// slide(3, 2) over [1..7] totals 36, then slide(2, 2) over [1,2] brings the
// running total to 39.
@("slide.step")
@safe unittest {
  import std.algorithm : sum;
  shared int p;

  [1,2,3,4,5,6,7].arrayStream
    .slide(3, 2)
    .collect((int[] a) @safe shared { p.atomicOp!"+="(a.sum); })
    .syncWait.isOk.should == true;

  p.atomicLoad.should == 36;

  [1,2].arrayStream
    .slide(2, 2)
    .collect((int[] a) @safe shared { p.atomicOp!"+="(a.sum); })
    .syncWait.isOk.should == true;

  p.atomicLoad.should == 39;
}

// toList gathers every element of the stream into an array.
@("toList.arrayStream")
@safe unittest {
  [1,2,3].arrayStream.toList.syncWait.value.should == [1,2,3];
}