module concurrency.stream;

import concurrency.stoptoken;
import concurrency.receiver;
import concurrency.sender : isSender, OpType;
import concepts;
import std.traits : hasFunctionAttributes;

/// A Stream is anything that has a `.collect` function that accepts a callable and returns a Sender.
/// Once the Sender is connected and started the Stream will call the callable zero or more times before one of the three terminal functions of the Receiver is called.

/// The delegate type passed to a Stream's `.collect`.
/// It takes no argument when `ElementType` is `void`, otherwise a single `ElementType`.
template CollectDelegate(ElementType) {
  static if (is(ElementType == void)) {
    alias CollectDelegate = void delegate() @safe shared;
  } else {
    alias CollectDelegate = void delegate(ElementType) @safe shared;
  }
}

/// checks that T is a Stream
///
/// T must expose `ElementType` and a `collect` (templated on the delegate type or not)
/// whose return type models `isSender`.
void checkStream(T)() {
  import std.traits : ReturnType;
  alias DG = CollectDelegate!(T.ElementType);
  // prefer a `collect` templated on the delegate type, fall back to a plain member function
  static if (is(typeof(T.collect!DG)))
    alias Sender = ReturnType!(T.collect!(DG));
  else
    alias Sender = ReturnType!(T.collect);
  static assert (models!(Sender, isSender));
}
/// true when `checkStream!T` compiles
enum isStream(T) = is(typeof(checkStream!T));

/// A polymorphic stream with elements of type T
interface StreamObjectBase(T) {
  import concurrency.sender : SenderObjectBase;
  alias ElementType = T;
  static assert (models!(typeof(this), isStream));
  alias DG = CollectDelegate!(ElementType);

  /// Returns a type-erased Sender that, once started, calls `dg` for the stream's elements.
  SenderObjectBase!void collect(DG dg) @safe;
}

/// A class extending from StreamObjectBase that wraps any Stream
class StreamObjectImpl(Stream) : StreamObjectBase!(Stream.ElementType) if (models!(Stream, isStream)) {
  import concurrency.receiver : ReceiverObjectBase;
  // imported explicitly so the signature of `collect` below does not depend on the
  // import that is scoped inside the StreamObjectBase interface
  import concurrency.sender : SenderObjectBase;
  static assert (models!(typeof(this), isStream));
  private Stream stream;
  this(Stream stream) {
    this.stream = stream;
  }
  alias DG = CollectDelegate!(Stream.ElementType);

  /// Forwards to the wrapped stream's `collect` and type-erases the resulting Sender.
  SenderObjectBase!void collect(DG dg) @safe {
    import concurrency.sender : toSenderObject;
    return stream.collect(dg).toSenderObject();
  }
}

///
/// Converts any Stream to a polymorphic StreamObject
StreamObjectBase!(Stream.ElementType) toStreamObject(Stream)(Stream stream) if (models!(Stream, isStream)) {
  return new StreamObjectImpl!(Stream)(stream);
}

/*
  catch?
  combineLatest
  count
  debounce
  distinctUntilChanged
  drop
  dropWhile
  filter
  first
  firstOrNull
  flatMapConcat
  flatMapLatest
  flatMapMerge
  fold
  map
  mapLatest
  merge
  onEach
  onEmpty
  onStart
  onSubscription
  reduce (fold with no seed)
  retry
  retryWhen
  runningReduce
  sample
  scan (like runningReduce but with initial value)
  take
  takeWhile
  toList
  transform
  transformLatest
  zip
*/

/// Helper to construct a Stream, useful if the Stream you are modeling has a blocking loop
///
/// `t` must provide `loop(dg, stopToken)`; it is called once from the operation's `start`.
template loopStream(E) {
  alias DG = CollectDelegate!(E);
  auto loopStream(T)(T t) {
    static struct LoopStream {
      static assert(models!(typeof(this), isStream));
      alias ElementType = E;
      static struct LoopOp(Receiver) {
        T t;
        DG dg;
        Receiver receiver;
        @disable this(ref return scope typeof(this) rhs);
        @disable this(this);
        this(T t, DG dg, Receiver receiver) {
          this.t = t;
          this.dg = dg;
          this.receiver = receiver;
        }
        /// Runs the loop to completion and then signals the receiver exactly once.
        void start() @trusted nothrow scope {
          try {
            t.loop(dg, receiver.getStopToken);
          } catch (Exception e) {
            receiver.setError(e);
            // the receiver has been completed with the error; falling through
            // would complete it a second time with setDone/setValue
            return;
          }
          if (receiver.getStopToken().isStopRequested)
            receiver.setDone();
          else
            receiver.setValueOrError();
        }
      }
      static struct LoopSender {
        alias Value = void;
        T t;
        DG dg;
        auto connect(Receiver)(return Receiver receiver) @safe scope return {
          // ensure NRVO
          auto op = LoopOp!(Receiver)(t, dg, receiver);
          return op;
        }
      }
      T t;
      auto collect(DG dg) @safe {
        return LoopSender(t, dg);
      }
    }
    return LoopStream(t);
  }
}

/// Stream that emits the same value until cancelled
auto infiniteStream(T)(T t) {
  alias DG = CollectDelegate!(T);
  struct Loop {
    T val;
    void loop(StopToken)(DG emit, StopToken stopToken) {
      while(!stopToken.isStopRequested)
        emit(val);
    }
  }
  return Loop(t).loopStream!T;
}

/// Stream that emits from start..end or until cancelled
auto iotaStream(T)(T start, T end) {
  alias DG = CollectDelegate!(T);
  struct Loop {
    T b,e;
    void loop(StopToken)(DG emit, StopToken stopToken) {
      foreach(i; b..e) {
        emit(i);
        // cancellation is checked after each emit, so at least one value is produced when b < e
        if (stopToken.isStopRequested)
          break;
      }
    }
  }
  return Loop(start, end).loopStream!T;
}

/// Stream that emits each value from the array or until cancelled
auto arrayStream(T)(T[] arr) {
  alias DG = CollectDelegate!(T);
  struct Loop {
    T[] arr;
    void loop(StopToken)(DG emit, StopToken stopToken) @safe {
      foreach(item; arr) {
        emit(item);
        if (stopToken.isStopRequested)
          break;
      }
    }
  }
  return Loop(arr).loopStream!T;
}

import core.time : Duration;

/// Stream of `void` elements: each round schedules `scheduleAfter(duration)` on the
/// receiver's scheduler, calls the collect delegate when it fires and then schedules again.
/// It ends with setDone once the receiver's stop token is triggered.
auto intervalStream(Duration duration) {
  alias DG = CollectDelegate!(void);
  static struct ItemReceiver(Op) {
    Op* op;
    void setValue() @safe {
      if (op.receiver.getStopToken.isStopRequested) {
        op.receiver.setDone();
        return;
      }
      try {
        op.dg();
        // the delegate may have triggered cancellation; check again before rescheduling
        if (op.receiver.getStopToken.isStopRequested) {
          op.receiver.setDone();
          return;
        }
        op.start();
      } catch (Exception e) {
        op.receiver.setError(e);
      }
    }
    void setDone() @safe nothrow {
      op.receiver.setDone();
    }
    void setError(Exception e) @safe nothrow {
      op.receiver.setError(e);
    }
    auto getStopToken() @safe {
      return op.receiver.getStopToken();
    }
    auto getScheduler() @safe {
      return op.receiver.getScheduler();
    }
  }
  static struct Op(Receiver) {
    import std.traits : ReturnType;
    Duration duration;
    DG dg;
    Receiver receiver;
    alias SchedulerAfterSender = ReturnType!(SchedulerType!(Receiver).scheduleAfter);
    alias Op = OpType!(SchedulerAfterSender, ItemReceiver!(typeof(this)));
    Op op;
    @disable this(this);
    @disable this(ref return scope typeof(this) rhs);
    this(Duration duration, DG dg, Receiver receiver) {
      this.duration = duration;
      this.dg = dg;
      this.receiver = receiver;
    }
    /// (Re)connects a timer sender to an ItemReceiver pointing back at this operation and starts it.
    void start() @trusted nothrow {
      try {
        op = receiver.getScheduler().scheduleAfter(duration).connect(ItemReceiver!(typeof(this))(&this));
        op.start();
      } catch (Exception e) {
        receiver.setError(e);
      }
    }
  }
  static struct Sender {
    alias Value = void;
    Duration duration;
    DG dg;
    auto connect(Receiver)(return Receiver receiver) @safe scope return {
      // ensure NRVO
      auto op = Op!(Receiver)(duration, dg, receiver);
      return op;
    }
  }
  static struct IntervalStream {
    alias ElementType = void;
    Duration duration;
    auto collect(DG dg) @safe {
      return Sender(duration, dg);
    }
  }
  return IntervalStream(duration);
}

/// Convenience aliases derived from a Stream type: its element type, collect delegate,
/// the Sender returned by `collect` and that Sender's `Value`.
template StreamProperties(Stream) {
  import std.traits : ReturnType;
  alias ElementType = Stream.ElementType;
  alias DG = CollectDelegate!(ElementType);
  alias Sender = ReturnType!(Stream.collect);
  alias Value = Sender.Value;
}

/// takes the first n values from a stream or until cancelled
///
/// Throws: Exception (via `enforce`) when `n` is 0.
auto take(Stream)(Stream stream, size_t n) if (models!(Stream, isStream)) {
  alias Properties = StreamProperties!Stream;
  static struct TakeReceiver(Receiver) {
    Receiver receiver;
    StopSource stopSource;
    static if (is(Properties.Sender.Value == void))
      void setValue() @safe { receiver.setValue(); }
    else
      void setValue(Properties.Sender.Value e) @safe { receiver.setValue(e); }
    void setDone() nothrow @safe {
      import concurrency.receiver : setValueOrError;
      static if (is(Properties.Sender.Value == void)) {
        // a stop requested on our own stop source means n items were taken,
        // which is reported as a successful completion rather than a cancellation
        if (stopSource.isStopRequested)
          receiver.setValueOrError();
        else
          receiver.setDone();
      } else
        receiver.setDone();
    }
    void setError(Exception e) nothrow @safe {
      receiver.setError(e);
    }
    mixin ForwardExtensionPoints!receiver;
  }
  static struct TakeOp(Receiver) {
    import concurrency.operations : withStopSource;
    import std.traits : ReturnType;
    alias SS = ReturnType!(withStopSource!(Properties.Sender));
    alias Op = OpType!(SS, TakeReceiver!Receiver);
    size_t n;
    Properties.DG dg;
    StopSource stopSource;
    Op op;
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    private this(return Stream stream, size_t n, Properties.DG dg, return Receiver receiver) @trusted scope {
      stopSource = new StopSource();
      this.dg = dg;
      this.n = n;
      op = stream.collect(cast(Properties.DG)&item).withStopSource(stopSource).connect(TakeReceiver!Receiver(receiver, stopSource));
    }
    static if (is(Properties.ElementType == void)) {
      private void item() {
        dg();
        /// TODO: this implies the stream will only call emit from a single execution context, we might need to enforce that
        n--;
        if (n == 0)
          stopSource.stop();
      }
    } else {
      private void item(Properties.ElementType t) {
        dg(t);
        n--;
        if (n == 0)
          stopSource.stop();
      }
    }
    void start() nothrow @trusted scope {
      op.start();
    }
  }
  import std.exception : enforce;
  enforce(n > 0, "cannot take 0");
  return fromStreamOp!(Properties.ElementType, Properties.Value, TakeOp)(stream, n);
}

/// Applies `fun` to every element of the stream and emits the result.
/// The resulting stream's element type is `ReturnType!Fun`.
auto transform(Stream, Fun)(Stream stream, Fun fun) if (models!(Stream, isStream)) {
  import std.traits : ReturnType;
  alias Properties = StreamProperties!Stream;
  alias DG = CollectDelegate!(ReturnType!Fun);
  static struct TransformStreamOp(Receiver) {
    alias Op = OpType!(Properties.Sender, Receiver);
    Fun fun;
    DG dg;
    Op op;
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    this(Stream stream, Fun fun, DG dg, Receiver receiver) @trusted {
      this.fun = fun;
      this.dg = dg;
      op = stream.collect(cast(Properties.DG)&item).connect(receiver);
    }
    static if (is(Properties.ElementType == void))
      void item() {
        dg(fun());
      }
    else
      void item(Properties.ElementType t) {
        dg(fun(t));
      }
    void start() nothrow @safe {
      op.start();
    }
  }
  return fromStreamOp!(ReturnType!Fun, Properties.Value, TransformStreamOp)(stream, fun);
}

/// Builds a Stream out of an operation-state template.
///
/// `Op!(Receiver)` is constructed with `args`, followed by the collect delegate and the receiver.
/// `StreamElementType` is the resulting stream's element type and `SenderValue` the
/// `Value` of the Sender returned from `collect`.
auto fromStreamOp(StreamElementType, SenderValue, alias Op, Args...)(Args args) {
  alias DG = CollectDelegate!(StreamElementType);
  static struct FromStreamSender {
    alias Value = SenderValue;
    Args args;
    DG dg;
    auto connect(Receiver)(return Receiver receiver) @safe scope return {
      // ensure NRVO
      auto op = Op!(Receiver)(args, dg, receiver);
      return op;
    }
  }
  static struct FromStream {
    static assert(models!(typeof(this), isStream));
    alias ElementType = StreamElementType;
    Args args;
    auto collect(DG dg) @safe {
      return FromStreamSender(args, dg);
    }
  }
  return FromStream(args);
}

/// Applies an accumulator to each value from the source
///
/// Emits the accumulator after every element; `seed` is the initial accumulator and is not emitted itself.
auto scan(Stream, ScanFn, Seed)(Stream stream, scope ScanFn scanFn, Seed seed) if (models!(Stream, isStream)) {
  import std.traits : ReturnType;
  alias Properties = StreamProperties!Stream;
  alias DG = CollectDelegate!(Seed);
  static struct ScanStreamOp(Receiver) {
    alias Op = OpType!(Properties.Sender, Receiver);
    ScanFn scanFn;
    Seed acc;
    DG dg;
    Op op;
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    this(Stream stream, ScanFn scanFn, Seed seed, DG dg, Receiver receiver) @trusted {
      this.scanFn = scanFn;
      this.acc = seed;
      this.dg = dg;
      op = stream.collect(cast(Properties.DG)&item).connect(receiver);
    }
    static if (is(Properties.ElementType == void))
      void item() {
        acc = scanFn(acc);
        dg(acc);
      }
    else
      void item(Properties.ElementType t) {
        acc = scanFn(acc, t);
        dg(acc);
      }
    void start() nothrow @safe {
      op.start();
    }
  }
  return fromStreamOp!(Seed, Properties.Value, ScanStreamOp)(stream, scanFn, seed);
}

/// Forwards the latest value from the base stream every time the trigger stream produces a value. If the base stream hasn't produced a (new) value the trigger is ignored
auto sample(StreamBase, StreamTrigger)(StreamBase base, StreamTrigger trigger) if (models!(StreamBase, isStream) && models!(StreamTrigger, isStream)) {
  import concurrency.operations.raceall;
  import concurrency.bitfield : SharedBitField;
  enum Flags : size_t {
    locked = 0x1,
    valid = 0x2
  }
  alias PropertiesBase = StreamProperties!StreamBase;
  alias PropertiesTrigger = StreamProperties!StreamTrigger;
  static assert(!is(PropertiesBase.ElementType == void), "No point in sampling a stream that procudes no values. Might as well use trigger directly");
  alias DG = PropertiesBase.DG;
  static struct SampleStreamOp(Receiver) {
    import std.traits : ReturnType;
    alias RaceAllSender = ReturnType!(raceAll!(PropertiesBase.Sender, PropertiesTrigger.Sender));
    alias Op = OpType!(RaceAllSender, Receiver);
    DG dg;
    Op op;
    PropertiesBase.ElementType element;
    shared SharedBitField!Flags state;
    shared size_t sampleState;
    @disable this(ref return scope inout typeof(this) rhs);
    @disable this(this);
    this(StreamBase base, StreamTrigger trigger, DG dg, return Receiver receiver) @trusted scope {
      this.dg = dg;
      op = raceAll(base.collect(cast(PropertiesBase.DG)&item),
                   trigger.collect(cast(PropertiesTrigger.DG)&this.trigger)).connect(receiver);
    }
    /// Called by the base stream: stores the element and marks it valid.
    void item(PropertiesBase.ElementType t) {
      import core.atomic : atomicOp;
      with(state.lock(Flags.valid)) {
        element = t;
      }
    }
    /// Called by the trigger stream: emits the stored element when one is marked valid
    /// and clears the valid flag so it is emitted only once.
    void trigger() {
      import core.atomic : atomicOp;
      with(state.lock()) {
        if (was(Flags.valid)) {
          // copy before releasing so the delegate runs outside the lock
          auto localElement = element;
          release(Flags.valid);
          dg(localElement);
        }
      }
    }
    void start() {
      op.start();
    }
  }
  return fromStreamOp!(PropertiesBase.ElementType, PropertiesBase.Value, SampleStreamOp)(base, trigger);
}

/// Stream whose collect Sender is combined with `sender` through `concurrency.operations.via`.
/// NOTE(review): presumably this runs the stream on the execution context `sender` completes on - confirm against `via`.
auto via(Stream, Sender)(Stream stream, Sender sender) if (models!(Sender, isSender) && models!(Stream, isStream)) {
  alias Properties = StreamProperties!Stream;
  alias DG = Properties.DG;
  static struct ViaStreamOp(Receiver) {
    import std.traits : ReturnType;
    import concurrency.operations.via : senderVia = via;
    alias Op = OpType!(ReturnType!(senderVia!(Properties.Sender, Sender)), Receiver);
    Op op;
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    this(Stream stream, Sender sender, DG dg, Receiver receiver) {
      op = stream.collect(dg).senderVia(sender).connect(receiver);
    }
    void start() nothrow @safe {
      op.start();
    }
  }
  return fromStreamOp!(Properties.ElementType, Properties.Value, ViaStreamOp)(stream, sender);
}

/// Stream that never emits and calls `setDone` on the receiver as soon as it is started
auto doneStream() {
  alias DG = CollectDelegate!void;
  static struct DoneStreamOp(Receiver) {
    Receiver receiver;
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    this(DG dg, Receiver receiver) {
      this.receiver = receiver;
    }
    void start() nothrow @safe {
      receiver.setDone();
    }
  }
  return fromStreamOp!(void, void, DoneStreamOp)();
}

/// Stream that never emits and calls `setError(e)` on the receiver as soon as it is started
auto errorStream(Exception e) {
  alias DG = CollectDelegate!void;
  static struct ErrorStreamOp(Receiver) {
    Exception e;
    Receiver receiver;
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    this(Exception e, DG dg, Receiver receiver) {
      this.e = e;
      this.receiver = receiver;
    }
    void start() nothrow @safe {
      receiver.setError(e);
    }
  }
  return fromStreamOp!(void, void, ErrorStreamOp)(e);
}

/// A SharedStream is used for broadcasting values to zero or more receivers. Receivers can be added and removed at any time. The stream itself never completes, so receivers should themselves terminate their connection.
auto sharedStream(T)() {
  import concurrency.slist;
  alias DG = CollectDelegate!(T);
  alias Subscribers = SList!(SharedStream!(T).SubscriberDG);
  return SharedStream!(T)(new shared Subscribers);
}

/// Broadcasting stream: every value passed to `emit` is handed to all currently subscribed delegates.
shared struct SharedStream(T) {
  alias ElementType = T;
  alias SubscriberDG = void delegate(T) nothrow @safe shared;
  import concurrency.slist;
  private {
    alias DG = CollectDelegate!T;
    /// Operation state for one subscriber: subscribes on start, unsubscribes on stop or on a throwing delegate.
    static struct Op(Receiver) {
      shared SharedStream!T source;
      DG dg;
      Receiver receiver;
      StopCallback cb;
      @disable this(ref return scope typeof(this) rhs);
      @disable this(this);
      void start() nothrow @trusted {
        auto token = receiver.getStopToken();
        cb = token.onStop(&(cast(shared)this).onStop);
        if (!token.isStopRequested) {
          // regular path: subscribe and wait for items or for the stop callback
          source.add(&(cast(shared)this).onItem);
          return;
        }
        // stop was already requested: drop the callback and complete right away
        cb.dispose();
        receiver.setDone();
      }
      /// Stop callback: unsubscribes and completes the receiver with setDone.
      void onStop() nothrow @safe shared {
        with(unshared) {
          source.remove(&this.onItem);
          receiver.setDone();
        }
      }
      /// Called for every emitted element; a throwing delegate ends the subscription with setError.
      void onItem(T element) nothrow @safe shared {
        with(unshared) {
          try {
            dg(element);
          } catch (Exception failure) {
            source.remove(&this.onItem);
            cb.dispose();
            receiver.setError(failure);
          }
        }
      }
      /// Strips `shared` from `this` so the fields can be used inside the shared callbacks.
      private auto ref unshared() nothrow @trusted shared {
        return cast()this;
      }
    }
    static struct SharedStreamSender {
      alias Value = void;
      shared SharedStream!T source;
      DG dg;
      auto connect(Receiver)(return Receiver receiver) @safe scope return {
        // ensure NRVO
        auto op = Op!(Receiver)(source, dg, receiver);
        return op;
      }
    }
    shared SList!SubscriberDG dgs;
  }
  this(shared SList!SubscriberDG dgs) {
    this.dgs = dgs;
  }
  /// Calls every subscribed delegate with `t`.
  void emit(T t) nothrow @trusted {
    foreach(subscriber; dgs[])
      subscriber(t);
  }
  private void remove(SubscriberDG dg) nothrow @trusted {
    dgs.remove(dg);
  }
  private void add(SubscriberDG dg) nothrow @trusted {
    dgs.pushBack(dg);
  }
  auto collect(DG dg) @safe {
    auto sender = SharedStreamSender(this, dg);
    return sender;
  }
}

/// The type returned by `Receiver.getScheduler`.
template SchedulerType(Receiver) {
  import std.traits : ReturnType;
  alias SchedulerType = ReturnType!(Receiver.getScheduler);
}

/// Bit layout of the state word used by `throttling`.
private enum ThrottleFlags : size_t {
  locked               = 1 << 0,
  value_produced       = 1 << 1,
  doneOrError_produced = 1 << 2,
  timerArmed           = 1 << 3,
  timerRearming        = 1 << 4,
  counter              = 1 << 5
}

enum ThrottleEmitLogic: uint {
  first, // emit the first item in the window
  last // emit the last item in the window
}

enum ThrottleTimerLogic: uint {
  noop, // don't reset the timer on new items
  rearm // reset the timer on new items
}

/// throttleFirst forwards one item and then enters a cooldown period during which it ignores items
auto throttleFirst(Stream)(Stream s, Duration d) {
  alias impl = throttling!(Stream, ThrottleEmitLogic.first, ThrottleTimerLogic.noop);
  return impl(s, d);
}

/// throttleLast starts a cooldown period when it receives an item, after which it forwards the latest value from the cooldown period
auto throttleLast(Stream)(Stream s, Duration d) {
  alias impl = throttling!(Stream, ThrottleEmitLogic.last, ThrottleTimerLogic.noop);
  return impl(s, d);
}

/// debounce skips all items which are succeeded by another within the duration.
/// Effectively it only emits items after a duration of silence
auto debounce(Stream)(Stream s, Duration d) {
  return throttling!(Stream, ThrottleEmitLogic.last, ThrottleTimerLogic.rearm)(s, d);
}

/// Shared implementation behind throttleFirst, throttleLast and debounce.
///
/// Params:
///   emitLogic = whether the first or the last item of a window is forwarded
///   timerLogic = whether items arriving while the timer is armed restart the timer
///   stream = the source stream
///   dur = the length of the window, scheduled with `scheduleAfter` on the receiver's scheduler
auto throttling(Stream, ThrottleEmitLogic emitLogic, ThrottleTimerLogic timerLogic)(Stream stream, Duration dur) if (models!(Stream, isStream)) {
  import std.traits : ReturnType;
  import concurrency.bitfield : SharedBitField;
  import core.atomic : MemoryOrder;
  alias Properties = StreamProperties!Stream;
  alias DG = Properties.DG;
  /// Receiver connected to the source stream's Sender.
  static struct SenderReceiver(Op) {
    Op* state;
    static if (is(Properties.Value == void))
      void setValue() {
        with (state.flags.update(ThrottleFlags.value_produced, ThrottleFlags.counter)) {
          state.process(newState);
        }
      }
    else
      void setValue(Properties.Value value) {
        with (state.flags.lock(ThrottleFlags.value_produced, ThrottleFlags.counter)) {
          state.value = value;
          release();
          state.process(newState);
        }
      }
    void setDone() {
      with (state.flags.update(ThrottleFlags.doneOrError_produced, ThrottleFlags.counter)) {
        state.process(newState);
      }
    }
    void setError(Exception e) nothrow @safe {
      state.setError(e);
    }
    auto getStopToken() {
      return StopToken(state.stopSource);
    }
    auto getScheduler() {
      return state.receiver.getScheduler();
    }
  }
  /// Receiver connected to the `scheduleAfter` timer Sender.
  static struct TimerReceiver(Op) {
    Op* state;
    void setValue() @safe {
      with (state.flags.lock()) {
        // a timer that fires while it is being rearmed is ignored
        if (was(ThrottleFlags.timerRearming))
          return;

        // copy the pending item before the timerArmed flag is released
        static if (!is(Properties.ElementType == void) && emitLogic == ThrottleEmitLogic.last)
          auto item = state.item;
        release(ThrottleFlags.timerArmed);
        static if (emitLogic == ThrottleEmitLogic.last) {
          static if (!is(Properties.ElementType == void))
            state.push(item);
          else
            state.push();
        }
      }
    }
    void setDone() @safe nothrow {
      // TODO: would be nice if we can merge in next update...
      if ((state.flags.load!(MemoryOrder.acq) & ThrottleFlags.timerRearming) > 0)
        return;
      with (state.flags.update(ThrottleFlags.doneOrError_produced, ThrottleFlags.counter)) {
        state.process(newState);
      }
    }
    void setError(Exception e) nothrow @safe {
      // TODO: would be nice if we can merge in next lock...
      if ((state.flags.load!(MemoryOrder.acq) & ThrottleFlags.timerRearming) > 0)
        return;
      state.setError(e);
    }
    auto getStopToken() {
      return StopToken(state.timerStopSource);
    }
    auto getScheduler() {
      return state.receiver.getScheduler();
    }
  }
  template ThrottleStreamOp(Stream) {
    static struct ThrottleStreamOp(Receiver) {
      Duration dur;
      DG dg;
      Receiver receiver;
      // the pending item is only stored when the last item of a window is the one emitted
      static if (emitLogic == ThrottleEmitLogic.last)
        static if (!is(Properties.ElementType == void))
          Properties.ElementType item;
      static if (!is(Properties.Value == void))
        Properties.Value value;
      alias SchedulerAfterSender = ReturnType!(SchedulerType!(Receiver).scheduleAfter);
      StopSource stopSource;
      StopSource timerStopSource;
      StopCallback cb;
      Exception exception;
      alias Op = OpType!(Properties.Sender, SenderReceiver!(typeof(this)));
      alias TimerOp = OpType!(SchedulerAfterSender, TimerReceiver!(typeof(this)));
      Op op;
      TimerOp timerOp;
      shared SharedBitField!ThrottleFlags flags;
      @disable this(ref return scope inout typeof(this) rhs);
      @disable this(this);
      this(return Stream stream, Duration dur, DG dg, Receiver receiver) @trusted scope {
        this.dur = dur;
        this.dg = dg;
        this.receiver = receiver;
        stopSource = new StopSource();
        timerStopSource = new StopSource();
        op = stream.collect(cast(Properties.DG)&onItem).connect(SenderReceiver!(typeof(this))(&this));
      }
      static if (is(Properties.ElementType == void)) {
        private void onItem() {
          with (flags.update(ThrottleFlags.timerArmed)) {
            if ((oldState & ThrottleFlags.timerArmed) == 0) {
              static if (emitLogic == ThrottleEmitLogic.first) {
                // void elements carry no value: push takes no argument in this branch
                if (!push())
                  return;
              }
              armTimer();
            } else {
              static if (timerLogic == ThrottleTimerLogic.rearm) {
                // release();
                rearmTimer();
              }
            }
          }
        }
        /// Calls the collect delegate; returns false and records the exception when it throws.
        private bool push() {
          try {
            dg();
            return true;
          } catch (Exception e) {
            with (flags.lock(ThrottleFlags.doneOrError_produced)) {
              // only the first error is kept
              if ((oldState & ThrottleFlags.doneOrError_produced) == 0) {
                exception = e;
              }
              release();
              process(newState);
            }
            return false;
          }
        }
      } else {
        private void onItem(Properties.ElementType t) {
          with (flags.lock(ThrottleFlags.timerArmed)) {
            static if (emitLogic == ThrottleEmitLogic.last)
              item = t;
            release();
            if ((oldState & ThrottleFlags.timerArmed) == 0) {
              static if (emitLogic == ThrottleEmitLogic.first) {
                if (!push(t))
                  return;
              }
              armTimer();
            } else {
              static if (timerLogic == ThrottleTimerLogic.rearm) {
                rearmTimer();
              }
            }
          }
        }
        /// Calls the collect delegate with `t`; returns false and records the exception when it throws.
        private bool push(Properties.ElementType t) {
          try {
            dg(t);
            return true;
          } catch (Exception e) {
            with (flags.lock(ThrottleFlags.doneOrError_produced)) {
              // only the first error is kept
              if ((oldState & ThrottleFlags.doneOrError_produced) == 0) {
                exception = e;
              }
              release();
              process(newState);
            }
            return false;
          }
        }
      }
      private void setError(Exception e) {
        with (flags.lock(ThrottleFlags.doneOrError_produced, ThrottleFlags.counter)) {
          if ((oldState & ThrottleFlags.doneOrError_produced) == 0) {
            exception = e;
          }
          release();
          process(newState);
        }
      }
      void armTimer() {
        timerOp = receiver.getScheduler().scheduleAfter(dur).connect(TimerReceiver!(typeof(this))(&this));
        timerOp.start();
      }
      void rearmTimer() @trusted {
        flags.update(ThrottleFlags.timerRearming);
        timerStopSource.stop();

        auto localFlags = flags.load!(MemoryOrder.acq);
        // if old timer happens to trigger anyway (or the source is done) we can stop
        if ((localFlags & ThrottleFlags.timerArmed) == 0 || (localFlags / ThrottleFlags.counter) > 0)
          return;

        timerStopSource.reset();

        flags.update(0,0,ThrottleFlags.timerRearming);
        timerOp = receiver.getScheduler().scheduleAfter(dur).connect(TimerReceiver!(typeof(this))(&this));
        timerOp.start();
      }
      /// Decides, from the state word, whether the operation is finished and if so completes the receiver.
      void process(size_t newState) {
        // everything at or above the `counter` bit is read as a count of completions;
        // presumably bumped by the update/lock calls that pass ThrottleFlags.counter
        auto count = newState / ThrottleFlags.counter;
        bool isDone = count == 2 || (count == 1 && (newState & ThrottleFlags.timerArmed) == 0);

        if (!isDone) {
          // not finished yet: request stop on both sides and wait for the next completion
          stopSource.stop();
          timerStopSource.stop();
          return;
        }

        cb.dispose();

        if (receiver.getStopToken().isStopRequested)
          receiver.setDone();
        else if ((newState & ThrottleFlags.value_produced) > 0) {
          static if (emitLogic == ThrottleEmitLogic.last) {
            // the timer was still armed, so the pending item is emitted before completing
            if ((newState & ThrottleFlags.timerArmed) > 0) {
              try {
                static if (!is(Properties.ElementType == void))
                  dg(item);
                else
                  dg();
              } catch (Exception e) {
                receiver.setError(e);
                return;
              }
            }
          }
          import concurrency.receiver : setValueOrError;
          static if (is(Properties.Value == void))
            receiver.setValueOrError();
          else
            receiver.setValueOrError(value);
        } else if ((newState & ThrottleFlags.doneOrError_produced) > 0) {
          if (exception)
            receiver.setError(exception);
          else
            receiver.setDone();
        }
      }
      private void stop() @trusted nothrow {
        stopSource.stop();
        timerStopSource.stop();
      }
      void start() @trusted nothrow scope {
        cb = receiver.getStopToken().onStop(cast(void delegate() nothrow @safe shared)&this.stop); // butt ugly cast, but it won't take the second overload
        op.start();
      }
    }
  }
  return fromStreamOp!(Properties.ElementType, Properties.Value, ThrottleStreamOp!(Stream))(stream, dur);
}

/// slides a window over a stream, emitting all items in the window as an array.
/// The array is reused so you must duplicate if you want to access it beyond the stream.
///
/// Params:
///   stream = the source stream, its ElementType must not be void
///   window = number of items per emitted array, must be greater than 0
///   step = how many items the window advances between emissions, at most `window`
/// Throws: Exception (via `enforce`) when `window` is 0 or `step` exceeds `window`.
auto slide(Stream)(Stream stream, size_t window, size_t step = 1) if (models!(Stream, isStream)) {
  import std.traits : ReturnType;
  alias Properties = StreamProperties!Stream;
  static assert(!is(Properties.ElementType == void), "Need ElementType to be able to slide, void wont do.");
  import std.exception : enforce;
  enforce(window > 0, "window must be greater than 0.");
  enforce(step <= window, "step can't be bigger than window.");
  alias DG = CollectDelegate!(Properties.ElementType[]);
  static struct SlideStreamOp(Receiver) {
    alias Op = OpType!(Properties.Sender, Receiver);
    size_t window, step;
    Properties.ElementType[] arr;
    DG dg;
    Op op;
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    this(Stream stream, size_t window, size_t step, DG dg, Receiver receiver) @trusted {
      this.window = window;
      this.step = step;
      this.arr.reserve(window);
      this.dg = dg;
      op = stream.collect(cast(Properties.DG)&item).connect(receiver);
    }
    /// Adds `t` to the window and emits the window once it holds `window` items.
    void item(Properties.ElementType t) {
      import std.algorithm : moveAll;
      if (arr.length == window) {
        // only reached when the window advances one item at a time (or window == 1):
        // the last slot was freed by the shift below
        arr[window-1] = t;
      } else {
        arr ~= t;
        if (arr.length < window)
          return;
      }
      dg(arr);
      if (step != window) {
        // shift the items that stay in the window to the front
        moveAll(arr[step .. $], arr[0..$-step]);
        if (step > 1)
          arr.length -= step;
      } else if (window > 1) {
        // non-overlapping windows: nothing carries over, so start the next window empty.
        // Without this the full buffer would only get its last slot overwritten and
        // every later emission would repeat the first window's leading items.
        arr.length = 0;
      }
      // NOTE(review): step == 0 passes the checks above and never advances the window - confirm whether that should be rejected
    }
    void start() nothrow @safe {
      op.start();
    }
  }
  return fromStreamOp!(Properties.ElementType[], Properties.Value, SlideStreamOp)(stream, window, step);
}

/// toList collects all the stream's values and emits the array as a Sender
auto toList(Stream)(Stream stream) if (models!(Stream, isStream)) {
  alias Properties = StreamProperties!Stream;
  static assert(is(Properties.Value == void), "sender must produce void for toList to work");
  /// Receiver connected to the stream's Sender; turns its void completion into the collected array.
  static struct ToListReceiver(Op) {
    Op* op;
    void setValue() @safe {
      op.receiver.setValue(op.arr);
    }
    void setDone() @safe nothrow {
      op.receiver.setDone();
    }
    void setError(Exception e) nothrow @safe {
      op.receiver.setError(e);
    }
    auto getStopToken() nothrow @safe {
      return op.receiver.getStopToken();
    }
    auto getScheduler() nothrow @safe {
      return op.receiver.getScheduler();
    }
  }
  static struct ToListOp(Receiver) {
    alias Op = OpType!(Properties.Sender, ToListReceiver!(typeof(this)));
    Op op;
    Receiver receiver;
    Properties.ElementType[] arr;
    @disable this(this);
    @disable this(ref return scope typeof(this) rhs);
    this(Stream stream, return Receiver receiver) @trusted scope return {
      this.receiver = receiver;
      op = stream.collect(cast(Properties.DG)&item).connect(ToListReceiver!(typeof(this))(&this));
    }
    /// Appends every element the stream emits.
    void item(Properties.ElementType t) {
      arr ~= t;
    }
    void start() nothrow @safe {
      op.start();
    }
  }
  static struct ToListSender {
    alias Value = Properties.ElementType[];
    Stream stream;
    auto connect(Receiver)(return Receiver receiver) @safe scope return {
      // ensure NRVO
      auto op = ToListOp!(Receiver)(stream, receiver);
      return op;
    }
  }
  return ToListSender(stream);
}