1 module concurrency.stream; 2 3 import concurrency.stoptoken; 4 import concurrency.receiver; 5 import concurrency.sender : isSender, OpType; 6 import concepts; 7 import std.traits : hasFunctionAttributes; 8 9 /// A Stream is anything that has a `.collect` function that accepts a callable and returns a Sender. 10 /// Once the Sender is connected and started the Stream will call the callable zero or more times before one of the three terminal functions of the Receiver is called. 11 12 template CollectDelegate(ElementType) { 13 static if (is(ElementType == void)) { 14 alias CollectDelegate = void delegate() @safe shared; 15 } else { 16 alias CollectDelegate = void delegate(ElementType) @safe shared; 17 } 18 } 19 20 /// checks that T is a Stream 21 void checkStream(T)() { 22 import std.traits : ReturnType; 23 alias DG = CollectDelegate!(T.ElementType); 24 static if (is(typeof(T.collect!DG))) 25 alias Sender = ReturnType!(T.collect!(DG)); 26 else 27 alias Sender = ReturnType!(T.collect); 28 static assert (models!(Sender, isSender)); 29 } 30 enum isStream(T) = is(typeof(checkStream!T)); 31 32 /// A polymorphic stream with elements of type T 33 interface StreamObjectBase(T) { 34 import concurrency.sender : SenderObjectBase; 35 alias ElementType = T; 36 static assert (models!(typeof(this), isStream)); 37 alias DG = CollectDelegate!(ElementType); 38 39 SenderObjectBase!void collect(DG dg) @safe; 40 } 41 42 /// A class extending from StreamObjectBase that wraps any Stream 43 class StreamObjectImpl(Stream) : StreamObjectBase!(Stream.ElementType) if (models!(Stream, isStream)) { 44 import concurrency.receiver : ReceiverObjectBase; 45 static assert (models!(typeof(this), isStream)); 46 private Stream stream; 47 this(Stream stream) { 48 this.stream = stream; 49 } 50 alias DG = CollectDelegate!(Stream.ElementType); 51 52 SenderObjectBase!void collect(DG dg) @safe { 53 import concurrency.sender : toSenderObject; 54 return stream.collect(dg).toSenderObject(); 55 } 56 } 57 58 /// Converts any Stream to a polymorphic StreamObject 59 StreamObjectBase!(Stream.ElementType) toStreamObject(Stream)(Stream stream) if (models!(Stream, isStream)) { 60 return new StreamObjectImpl!(Stream)(stream); 61 } 62 63 /* 64 catch? 65 combineLatest 66 count 67 debounce 68 distinctUntilChanged 69 drop 70 dropWhile 71 filter 72 first 73 firstOrNull 74 flatMapConcat 75 flatMapLatest 76 flatMapMerge 77 fold 78 map 79 mapLatest 80 merge 81 onEach 82 onEmpty 83 onStart 84 onSubscription 85 reduce (fold with no seed) 86 retry 87 retryWhen 88 runningReduce 89 sample 90 scan (like runningReduce but with initial value) 91 take 92 takeWhile 93 toList 94 transform 95 transformLatest 96 zip 97 */ 98 99 /// Helper to construct a Stream, useful if the Stream you are modeling has a blocking loop 100 template loopStream(E) { 101 alias DG = CollectDelegate!(E); 102 auto loopStream(T)(T t) { 103 static struct LoopStream { 104 static assert(models!(typeof(this), isStream)); 105 alias ElementType = E; 106 static struct LoopOp(Receiver) { 107 T t; 108 DG dg; 109 Receiver receiver; 110 @disable this(ref return scope typeof(this) rhs); 111 @disable this(this); 112 this(T t, DG dg, Receiver receiver) { 113 this.t = t; 114 this.dg = dg; 115 this.receiver = receiver; 116 } 117 void start() @trusted nothrow scope { 118 try { 119 t.loop(dg, receiver.getStopToken); 120 } catch (Exception e) { 121 receiver.setError(e); 122 } 123 if (receiver.getStopToken().isStopRequested) 124 receiver.setDone(); 125 else 126 receiver.setValueOrError(); 127 } 128 } 129 static struct LoopSender { 130 alias Value = void; 131 T t; 132 DG dg; 133 auto connect(Receiver)(return Receiver receiver) @safe scope return { 134 // ensure NRVO 135 auto op = LoopOp!(Receiver)(t, dg, receiver); 136 return op; 137 } 138 } 139 T t; 140 auto collect(DG dg) @safe { 141 return LoopSender(t, dg); 142 } 143 } 144 return LoopStream(t); 145 } 146 } 147 148 /// Stream that emit the same value until cancelled 149 auto infiniteStream(T)(T t) { 150 alias DG = CollectDelegate!(T); 151 struct Loop { 152 T val; 153 void loop(StopToken)(DG emit, StopToken stopToken) { 154 while(!stopToken.isStopRequested) 155 emit(val); 156 } 157 } 158 return Loop(t).loopStream!T; 159 } 160 161 /// Stream that emits from start..end or until cancelled 162 auto iotaStream(T)(T start, T end) { 163 alias DG = CollectDelegate!(T); 164 struct Loop { 165 T b,e; 166 void loop(StopToken)(DG emit, StopToken stopToken) { 167 foreach(i; b..e) { 168 emit(i); 169 if (stopToken.isStopRequested) 170 break; 171 } 172 } 173 } 174 return Loop(start, end).loopStream!T; 175 } 176 177 /// Stream that emits each value from the array or until cancelled 178 auto arrayStream(T)(T[] arr) { 179 alias DG = CollectDelegate!(T); 180 struct Loop { 181 T[] arr; 182 void loop(StopToken)(DG emit, StopToken stopToken) @safe { 183 foreach(item; arr) { 184 emit(item); 185 if (stopToken.isStopRequested) 186 break; 187 } 188 } 189 } 190 return Loop(arr).loopStream!T; 191 } 192 193 import core.time : Duration; 194 195 auto intervalStream(Duration duration) { 196 alias DG = CollectDelegate!(void); 197 static struct ItemReceiver(Op) { 198 Op* op; 199 void setValue() @safe { 200 if (op.receiver.getStopToken.isStopRequested) { 201 op.receiver.setDone(); 202 return; 203 } 204 try { 205 op.dg(); 206 if (op.receiver.getStopToken.isStopRequested) { 207 op.receiver.setDone(); 208 return; 209 } 210 op.start(); 211 } catch (Exception e) { 212 op.receiver.setError(e); 213 } 214 } 215 void setDone() @safe nothrow { 216 op.receiver.setDone(); 217 } 218 void setError(Exception e) @safe nothrow { 219 op.receiver.setError(e); 220 } 221 auto getStopToken() @safe { 222 return op.receiver.getStopToken(); 223 } 224 auto getScheduler() @safe { 225 return op.receiver.getScheduler(); 226 } 227 } 228 static struct Op(Receiver) { 229 import std.traits : ReturnType; 230 Duration duration; 231 DG dg; 232 Receiver receiver; 233 alias SchedulerAfterSender = ReturnType!(SchedulerType!(Receiver).scheduleAfter); 234 alias Op = OpType!(SchedulerAfterSender, ItemReceiver!(typeof(this))); 235 Op op; 236 @disable this(this); 237 @disable this(ref return scope typeof(this) rhs); 238 this(Duration duration, DG dg, Receiver receiver) { 239 this.duration = duration; 240 this.dg = dg; 241 this.receiver = receiver; 242 } 243 void start() @trusted nothrow { 244 try { 245 op = receiver.getScheduler().scheduleAfter(duration).connect(ItemReceiver!(typeof(this))(&this)); 246 op.start(); 247 } catch (Exception e) { 248 receiver.setError(e); 249 } 250 } 251 } 252 static struct Sender { 253 alias Value = void; 254 Duration duration; 255 DG dg; 256 auto connect(Receiver)(return Receiver receiver) @safe scope return { 257 // ensure NRVO 258 auto op = Op!(Receiver)(duration, dg, receiver); 259 return op; 260 } 261 } 262 static struct IntervalStream { 263 alias ElementType = void; 264 Duration duration; 265 auto collect(DG dg) @safe { 266 return Sender(duration, dg); 267 } 268 } 269 return IntervalStream(duration); 270 } 271 272 template StreamProperties(Stream) { 273 import std.traits : ReturnType; 274 alias ElementType = Stream.ElementType; 275 alias DG = CollectDelegate!(ElementType); 276 alias Sender = ReturnType!(Stream.collect); 277 alias Value = Sender.Value; 278 } 279 280 /// takes the first n values from a stream or until cancelled 281 auto take(Stream)(Stream stream, size_t n) if (models!(Stream, isStream)) { 282 alias Properties = StreamProperties!Stream; 283 static struct TakeReceiver(Receiver) { 284 Receiver receiver; 285 StopSource stopSource; 286 static if (is(Properties.Sender.Value == void)) 287 void setValue() @safe { receiver.setValue(); } 288 else 289 void setValue(Properties.Sender.Value e) @safe { receiver.setValue(e); } 290 void setDone() nothrow @safe { 291 import concurrency.receiver : setValueOrError; 292 static if (is(Properties.Sender.Value == void)) { 293 if (stopSource.isStopRequested) 294 receiver.setValueOrError(); 295 else 296 receiver.setDone(); 297 } else 298 receiver.setDone(); 299 } 300 void setError(Exception e) nothrow @safe { 301 receiver.setError(e); 302 } 303 mixin ForwardExtensionPoints!receiver; 304 } 305 static struct TakeOp(Receiver) { 306 import concurrency.operations : withStopSource; 307 import std.traits : ReturnType; 308 alias SS = ReturnType!(withStopSource!(Properties.Sender)); 309 alias Op = OpType!(SS, TakeReceiver!Receiver); 310 size_t n; 311 Properties.DG dg; 312 StopSource stopSource; 313 Op op; 314 @disable this(ref return scope typeof(this) rhs); 315 @disable this(this); 316 private this(return Stream stream, size_t n, Properties.DG dg, return Receiver receiver) @trusted scope { 317 stopSource = new StopSource(); 318 this.dg = dg; 319 this.n = n; 320 op = stream.collect(cast(Properties.DG)&item).withStopSource(stopSource).connect(TakeReceiver!Receiver(receiver, stopSource)); 321 } 322 static if (is(Properties.ElementType == void)) { 323 private void item() { 324 dg(); 325 /// TODO: this implies the stream will only call emit from a single execution context, we might need to enforce that 326 n--; 327 if (n == 0) 328 stopSource.stop(); 329 } 330 } else { 331 private void item(Properties.ElementType t) { 332 dg(t); 333 n--; 334 if (n == 0) 335 stopSource.stop(); 336 } 337 } 338 void start() nothrow @trusted scope { 339 op.start(); 340 } 341 } 342 import std.exception : enforce; 343 enforce(n > 0, "cannot take 0"); 344 return fromStreamOp!(Properties.ElementType, Properties.Value, TakeOp)(stream, n); 345 } 346 347 auto transform(Stream, Fun)(Stream stream, Fun fun) if (models!(Stream, isStream)) { 348 import std.traits : ReturnType; 349 alias Properties = StreamProperties!Stream; 350 alias InnerElementType = ReturnType!Fun; 351 alias DG = CollectDelegate!(InnerElementType); 352 static struct TransformStreamOp(Receiver) { 353 alias Op = OpType!(Properties.Sender, Receiver); 354 Fun fun; 355 DG dg; 356 Op op; 357 @disable this(ref return scope typeof(this) rhs); 358 @disable this(this); 359 this(Stream stream, Fun fun, DG dg, Receiver receiver) @trusted { 360 this.fun = fun; 361 this.dg = dg; 362 op = stream.collect(cast(Properties.DG)&item).connect(receiver); 363 } 364 static if (is(Properties.ElementType == void)) 365 void item() { 366 static if (is(InnerElementType == void)) { 367 fun(); 368 dg(); 369 } else 370 dg(fun()); 371 } 372 else 373 void item(Properties.ElementType t) { 374 static if (is(InnerElementType == void)) { 375 fun(t); 376 dg(); 377 } else 378 dg(fun(t)); 379 } 380 void start() nothrow @safe { 381 op.start(); 382 } 383 } 384 return fromStreamOp!(ReturnType!Fun, Properties.Value, TransformStreamOp)(stream, fun); 385 } 386 387 auto fromStreamOp(StreamElementType, SenderValue, alias Op, Args...)(Args args) { 388 alias DG = CollectDelegate!(StreamElementType); 389 static struct FromStreamSender { 390 alias Value = SenderValue; 391 Args args; 392 DG dg; 393 auto connect(Receiver)(return Receiver receiver) @safe scope return { 394 // ensure NRVO 395 auto op = Op!(Receiver)(args, dg, receiver); 396 return op; 397 } 398 } 399 static struct FromStream { 400 static assert(models!(typeof(this), isStream)); 401 alias ElementType = StreamElementType; 402 Args args; 403 auto collect(DG dg) @safe { 404 return FromStreamSender(args, dg); 405 } 406 } 407 return FromStream(args); 408 } 409 410 /// Applies an accumulator to each value from the source 411 auto scan(Stream, ScanFn, Seed)(Stream stream, scope ScanFn scanFn, Seed seed) if (models!(Stream, isStream)) { 412 import std.traits : ReturnType; 413 alias Properties = StreamProperties!Stream; 414 alias DG = CollectDelegate!(Seed); 415 static struct ScanStreamOp(Receiver) { 416 alias Op = OpType!(Properties.Sender, Receiver); 417 ScanFn scanFn; 418 Seed acc; 419 DG dg; 420 Op op; 421 @disable this(ref return scope typeof(this) rhs); 422 @disable this(this); 423 this(Stream stream, ScanFn scanFn, Seed seed, DG dg, Receiver receiver) @trusted { 424 this.scanFn = scanFn; 425 this.acc = seed; 426 this.dg = dg; 427 op = stream.collect(cast(Properties.DG)&item).connect(receiver); 428 } 429 static if (is(Properties.ElementType == void)) 430 void item() { 431 acc = scanFn(acc); 432 dg(acc); 433 } 434 else 435 void item(Properties.ElementType t) { 436 acc = scanFn(acc, t); 437 dg(acc); 438 } 439 void start() nothrow @safe { 440 op.start(); 441 } 442 } 443 return fromStreamOp!(Seed, Properties.Value, ScanStreamOp)(stream, scanFn, seed); 444 } 445 446 /// Forwards the latest value from the base stream every time the trigger stream produces a value. If the base stream hasn't produces a (new) value the trigger is ignored 447 auto sample(StreamBase, StreamTrigger)(StreamBase base, StreamTrigger trigger) if (models!(StreamBase, isStream) && models!(StreamTrigger, isStream)) { 448 import concurrency.operations.raceall; 449 import concurrency.bitfield : SharedBitField; 450 enum Flags : size_t { 451 locked = 0x1, 452 valid = 0x2 453 } 454 alias PropertiesBase = StreamProperties!StreamBase; 455 alias PropertiesTrigger = StreamProperties!StreamTrigger; 456 static assert(!is(PropertiesBase.ElementType == void), "No point in sampling a stream that procudes no values. Might as well use trigger directly"); 457 alias DG = PropertiesBase.DG; 458 static struct SampleStreamOp(Receiver) { 459 import std.traits : ReturnType; 460 alias RaceAllSender = ReturnType!(raceAll!(PropertiesBase.Sender, PropertiesTrigger.Sender)); 461 alias Op = OpType!(RaceAllSender, Receiver); 462 DG dg; 463 Op op; 464 PropertiesBase.ElementType element; 465 shared SharedBitField!Flags state; 466 shared size_t sampleState; 467 @disable this(ref return scope inout typeof(this) rhs); 468 @disable this(this); 469 this(StreamBase base, StreamTrigger trigger, DG dg, return Receiver receiver) @trusted scope { 470 this.dg = dg; 471 op = raceAll(base.collect(cast(PropertiesBase.DG)&item), 472 trigger.collect(cast(PropertiesTrigger.DG)&this.trigger)).connect(receiver); 473 } 474 void item(PropertiesBase.ElementType t) { 475 import core.atomic : atomicOp; 476 with(state.lock(Flags.valid)) { 477 element = t; 478 } 479 } 480 void trigger() { 481 import core.atomic : atomicOp; 482 with(state.lock()) { 483 if (was(Flags.valid)) { 484 auto localElement = element; 485 release(Flags.valid); 486 dg(localElement); 487 } 488 } 489 } 490 void start() { 491 op.start(); 492 } 493 } 494 return fromStreamOp!(PropertiesBase.ElementType, PropertiesBase.Value, SampleStreamOp)(base, trigger); 495 } 496 497 auto via(Stream, Sender)(Stream stream, Sender sender) if (models!(Sender, isSender) && models!(Stream, isStream)) { 498 alias Properties = StreamProperties!Stream; 499 alias DG = Properties.DG; 500 static struct ViaStreamOp(Receiver) { 501 import std.traits : ReturnType; 502 import concurrency.operations.via : senderVia = via; 503 alias Op = OpType!(ReturnType!(senderVia!(Properties.Sender, Sender)), Receiver); 504 Op op; 505 @disable this(ref return scope typeof(this) rhs); 506 @disable this(this); 507 this(Stream stream, Sender sender, DG dg, Receiver receiver) { 508 op = stream.collect(dg).senderVia(sender).connect(receiver); 509 } 510 void start() nothrow @safe { 511 op.start(); 512 } 513 } 514 return fromStreamOp!(Properties.ElementType, Properties.Value, ViaStreamOp)(stream, sender); 515 } 516 517 auto doneStream() { 518 alias DG = CollectDelegate!void; 519 static struct DoneStreamOp(Receiver) { 520 Receiver receiver; 521 @disable this(ref return scope typeof(this) rhs); 522 @disable this(this); 523 this(DG dg, Receiver receiver) { 524 this.receiver = receiver; 525 } 526 void start() nothrow @safe { 527 receiver.setDone(); 528 } 529 } 530 return fromStreamOp!(void, void, DoneStreamOp)(); 531 } 532 533 auto errorStream(Exception e) { 534 alias DG = CollectDelegate!void; 535 static struct ErrorStreamOp(Receiver) { 536 Exception e; 537 Receiver receiver; 538 @disable this(ref return scope typeof(this) rhs); 539 @disable this(this); 540 this(Exception e, DG dg, Receiver receiver) { 541 this.e = e; 542 this.receiver = receiver; 543 } 544 void start() nothrow @safe { 545 receiver.setError(e); 546 } 547 } 548 return fromStreamOp!(void, void, ErrorStreamOp)(e); 549 } 550 551 /// A SharedStream is used for broadcasting values to zero or more receivers. Receivers can be added and removed at any time. The stream itself never completes, so receivers should themselves terminate their connection. 552 auto sharedStream(T)() { 553 import concurrency.slist; 554 alias DG = CollectDelegate!(T); 555 return SharedStream!(T)(new shared SList!(SharedStream!(T).SubscriberDG)); 556 } 557 558 shared struct SharedStream(T) { 559 alias ElementType = T; 560 alias SubscriberDG = void delegate(T) nothrow @safe shared; 561 import concurrency.slist; 562 private { 563 alias DG = CollectDelegate!T; 564 static struct Op(Receiver) { 565 shared SharedStream!T source; 566 DG dg; 567 Receiver receiver; 568 StopCallback cb; 569 @disable this(ref return scope typeof(this) rhs); 570 @disable this(this); 571 void start() nothrow @trusted { 572 auto stopToken = receiver.getStopToken(); 573 cb = stopToken.onStop(&(cast(shared)this).onStop); 574 if (stopToken.isStopRequested) { 575 cb.dispose(); 576 receiver.setDone(); 577 } else { 578 source.add(&(cast(shared)this).onItem); 579 } 580 } 581 void onStop() nothrow @safe shared { 582 with(unshared) { 583 source.remove(&this.onItem); 584 receiver.setDone(); 585 } 586 } 587 void onItem(T element) nothrow @safe shared { 588 with(unshared) { 589 try { 590 dg(element); 591 } catch (Exception e) { 592 source.remove(&this.onItem); 593 cb.dispose(); 594 receiver.setError(e); 595 } 596 } 597 } 598 private auto ref unshared() nothrow @trusted shared { 599 return cast()this; 600 } 601 } 602 static struct SharedStreamSender { 603 alias Value = void; 604 shared SharedStream!T source; 605 DG dg; 606 auto connect(Receiver)(return Receiver receiver) @safe scope return { 607 // ensure NRVO 608 auto op = Op!(Receiver)(source, dg, receiver); 609 return op; 610 } 611 } 612 shared SList!SubscriberDG dgs; 613 } 614 this(shared SList!SubscriberDG dgs) { 615 this.dgs = dgs; 616 } 617 void emit(T t) nothrow @trusted { 618 foreach(dg; dgs[]) 619 dg(t); 620 } 621 private void remove(SubscriberDG dg) nothrow @trusted { 622 dgs.remove(dg); 623 } 624 private void add(SubscriberDG dg) nothrow @trusted { 625 dgs.pushBack(dg); 626 } 627 auto collect(DG dg) @safe { 628 return SharedStreamSender(this, dg); 629 } 630 } 631 632 template SchedulerType(Receiver) { 633 import std.traits : ReturnType; 634 alias SchedulerType = ReturnType!(Receiver.getScheduler); 635 } 636 637 private enum ThrottleFlags : size_t { 638 locked = 0x1, 639 value_produced = 0x2, 640 doneOrError_produced = 0x4, 641 timerArmed = 0x8, 642 timerRearming = 0x10, 643 counter = 0x20 644 } 645 646 enum ThrottleEmitLogic: uint { 647 first, // emit the first item in the window 648 last // emit the last item in the window 649 }; 650 enum ThrottleTimerLogic: uint { 651 noop, // don't reset the timer on new items 652 rearm // reset the timer on new items 653 }; 654 655 /// throttleFirst forwards one item and then enters a cooldown period during which it ignores items 656 auto throttleFirst(Stream)(Stream s, Duration d) { 657 return throttling!(Stream, ThrottleEmitLogic.first, ThrottleTimerLogic.noop)(s, d); 658 } 659 660 /// throttleLast starts a cooldown period when it receives an item, after which it forwards the lastest value from the cooldown period 661 auto throttleLast(Stream)(Stream s, Duration d) { 662 return throttling!(Stream, ThrottleEmitLogic.last, ThrottleTimerLogic.noop)(s, d); 663 } 664 665 /// debounce skips all items which are succeeded by another within the duration. Effectively it only emits items after a duration of silence 666 auto debounce(Stream)(Stream s, Duration d) { 667 return throttling!(Stream, ThrottleEmitLogic.last, ThrottleTimerLogic.rearm)(s, d); 668 } 669 670 auto throttling(Stream, ThrottleEmitLogic emitLogic, ThrottleTimerLogic timerLogic)(Stream stream, Duration dur) if (models!(Stream, isStream)) { 671 import std.traits : ReturnType; 672 import concurrency.bitfield : SharedBitField; 673 import core.atomic : MemoryOrder; 674 alias Properties = StreamProperties!Stream; 675 alias DG = Properties.DG; 676 static struct SenderReceiver(Op) { 677 Op* state; 678 static if (is(Properties.Value == void)) 679 void setValue() { 680 with (state.flags.update(ThrottleFlags.value_produced, ThrottleFlags.counter)) { 681 state.process(newState); 682 } 683 } 684 else 685 void setValue(Properties.Value value) { 686 with (state.flags.lock(ThrottleFlags.value_produced, ThrottleFlags.counter)) { 687 state.value = value; 688 release(); 689 state.process(newState); 690 } 691 } 692 void setDone() { 693 with (state.flags.update(ThrottleFlags.doneOrError_produced, ThrottleFlags.counter)) { 694 state.process(newState); 695 } 696 } 697 void setError(Exception e) nothrow @safe { 698 state.setError(e); 699 } 700 auto getStopToken() { 701 return StopToken(state.stopSource); 702 } 703 auto getScheduler() { 704 return state.receiver.getScheduler(); 705 } 706 } 707 static struct TimerReceiver(Op) { 708 Op* state; 709 void setValue() @safe { 710 with (state.flags.lock()) { 711 if (was(ThrottleFlags.timerRearming)) 712 return; 713 714 static if (!is(Properties.ElementType == void) && emitLogic == ThrottleEmitLogic.last) 715 auto item = state.item; 716 release(ThrottleFlags.timerArmed); 717 static if (emitLogic == ThrottleEmitLogic.last) { 718 static if (!is(Properties.ElementType == void)) 719 state.push(item); 720 else 721 state.push(); 722 } 723 } 724 } 725 void setDone() @safe nothrow { 726 // TODO: would be nice if we can merge in next update... 727 if ((state.flags.load!(MemoryOrder.acq) & ThrottleFlags.timerRearming) > 0) 728 return; 729 with (state.flags.update(ThrottleFlags.doneOrError_produced, ThrottleFlags.counter)) { 730 state.process(newState); 731 } 732 } 733 void setError(Exception e) nothrow @safe { 734 // TODO: would be nice if we can merge in next lock... 735 if ((state.flags.load!(MemoryOrder.acq) & ThrottleFlags.timerRearming) > 0) 736 return; 737 state.setError(e); 738 } 739 auto getStopToken() { 740 return StopToken(state.timerStopSource); 741 } 742 auto getScheduler() { 743 return state.receiver.getScheduler(); 744 } 745 } 746 template ThrottleStreamOp(Stream) { 747 static struct ThrottleStreamOp(Receiver) { 748 Duration dur; 749 DG dg; 750 Receiver receiver; 751 static if (emitLogic == ThrottleEmitLogic.last) 752 static if (!is(Properties.ElementType == void)) 753 Properties.ElementType item; 754 static if (!is(Properties.Value == void)) 755 Properties.Value value; 756 alias SchedulerAfterSender = ReturnType!(SchedulerType!(Receiver).scheduleAfter); 757 StopSource stopSource; 758 StopSource timerStopSource; 759 StopCallback cb; 760 Exception exception; 761 alias Op = OpType!(Properties.Sender, SenderReceiver!(typeof(this))); 762 alias TimerOp = OpType!(SchedulerAfterSender, TimerReceiver!(typeof(this))); 763 Op op; 764 TimerOp timerOp; 765 shared SharedBitField!ThrottleFlags flags; 766 @disable this(ref return scope inout typeof(this) rhs); 767 @disable this(this); 768 this(return Stream stream, Duration dur, DG dg, Receiver receiver) @trusted scope { 769 this.dur = dur; 770 this.dg = dg; 771 this.receiver = receiver; 772 stopSource = new StopSource(); 773 timerStopSource = new StopSource(); 774 op = stream.collect(cast(Properties.DG)&onItem).connect(SenderReceiver!(typeof(this))(&this)); 775 } 776 static if (is(Properties.ElementType == void)) { 777 private void onItem() { 778 with (flags.update(ThrottleFlags.timerArmed)) { 779 if ((oldState & ThrottleFlags.timerArmed) == 0) { 780 static if (emitLogic == ThrottleEmitLogic.first) { 781 if (!push(t)) 782 return; 783 } 784 armTimer(); 785 } else { 786 static if (timerLogic == ThrottleTimerLogic.rearm) { 787 // release(); 788 rearmTimer(); 789 } 790 } 791 } 792 } 793 private bool push() { 794 try { 795 dg(); 796 return true; 797 } catch (Exception e) { 798 with (flags.lock(ThrottleFlags.doneOrError_produced)) { 799 if ((oldState & ThrottleFlags.doneOrError_produced) == 0) { 800 exception = e; 801 } 802 release(); 803 process(newState); 804 } 805 return false; 806 } 807 } 808 } else { 809 private void onItem(Properties.ElementType t) { 810 with (flags.lock(ThrottleFlags.timerArmed)) { 811 static if (emitLogic == ThrottleEmitLogic.last) 812 item = t; 813 release(); 814 if ((oldState & ThrottleFlags.timerArmed) == 0) { 815 static if (emitLogic == ThrottleEmitLogic.first) { 816 if (!push(t)) 817 return; 818 } 819 armTimer(); 820 } else { 821 static if (timerLogic == ThrottleTimerLogic.rearm) { 822 rearmTimer(); 823 } 824 } 825 } 826 } 827 private bool push(Properties.ElementType t) { 828 try { 829 dg(t); 830 return true; 831 } catch (Exception e) { 832 with (flags.lock(ThrottleFlags.doneOrError_produced)) { 833 if ((oldState & ThrottleFlags.doneOrError_produced) == 0) { 834 exception = e; 835 } 836 release(); 837 process(newState); 838 } 839 return false; 840 } 841 } 842 } 843 private void setError(Exception e) { 844 with (flags.lock(ThrottleFlags.doneOrError_produced, ThrottleFlags.counter)) { 845 if ((oldState & ThrottleFlags.doneOrError_produced) == 0) { 846 exception = e; 847 } 848 release(); 849 process(newState); 850 } 851 } 852 void armTimer() { 853 timerOp = receiver.getScheduler().scheduleAfter(dur).connect(TimerReceiver!(typeof(this))(&this)); 854 timerOp.start(); 855 } 856 void rearmTimer() @trusted { 857 flags.update(ThrottleFlags.timerRearming); 858 timerStopSource.stop(); 859 860 auto localFlags = flags.load!(MemoryOrder.acq); 861 // if old timer happens to trigger anyway (or the source is done) we can stop 862 if ((localFlags & ThrottleFlags.timerArmed) == 0 || (localFlags / ThrottleFlags.counter) > 0) 863 return; 864 865 timerStopSource.reset(); 866 867 flags.update(0,0,ThrottleFlags.timerRearming); 868 timerOp = receiver.getScheduler().scheduleAfter(dur).connect(TimerReceiver!(typeof(this))(&this)); 869 timerOp.start(); 870 } 871 void process(size_t newState) { 872 auto count = newState / ThrottleFlags.counter; 873 bool isDone = count == 2 || (count == 1 && (newState & ThrottleFlags.timerArmed) == 0); 874 875 if (!isDone) { 876 stopSource.stop(); 877 timerStopSource.stop(); 878 return; 879 } 880 881 cb.dispose(); 882 883 if (receiver.getStopToken().isStopRequested) 884 receiver.setDone(); 885 else if ((newState & ThrottleFlags.value_produced) > 0) { 886 static if (emitLogic == ThrottleEmitLogic.last) { 887 if ((newState & ThrottleFlags.timerArmed) > 0) { 888 try { 889 static if (!is(Properties.ElementType == void)) 890 dg(item); 891 else 892 dg(); 893 } catch (Exception e) { 894 receiver.setError(e); 895 return; 896 } 897 } 898 } 899 import concurrency.receiver : setValueOrError; 900 static if (is(Properties.Value == void)) 901 receiver.setValueOrError(); 902 else 903 receiver.setValueOrError(value); 904 } else if ((newState & ThrottleFlags.doneOrError_produced) > 0) { 905 if (exception) 906 receiver.setError(exception); 907 else 908 receiver.setDone(); 909 } 910 } 911 private void stop() @trusted nothrow { 912 stopSource.stop(); 913 timerStopSource.stop(); 914 } 915 void start() @trusted nothrow scope { 916 cb = receiver.getStopToken().onStop(cast(void delegate() nothrow @safe shared)&this.stop); // butt ugly cast, but it won't take the second overload 917 op.start(); 918 } 919 } 920 } 921 return fromStreamOp!(Properties.ElementType, Properties.Value, ThrottleStreamOp!(Stream))(stream, dur); 922 } 923 924 /// slides a window over a stream, emitting all items in the window as an array. The array is reused so you must duplicate if you want to access it beyond the stream. 925 auto slide(Stream)(Stream stream, size_t window, size_t step = 1) if (models!(Stream, isStream)) { 926 import std.traits : ReturnType; 927 alias Properties = StreamProperties!Stream; 928 static assert(!is(Properties.ElementType == void), "Need ElementType to be able to slide, void wont do."); 929 import std.exception : enforce; 930 enforce(window > 0, "window must be greater than 0."); 931 enforce(step <= window, "step can't be bigger than window."); 932 alias DG = CollectDelegate!(Properties.ElementType[]); 933 static struct SlideStreamOp(Receiver) { 934 alias Op = OpType!(Properties.Sender, Receiver); 935 size_t window, step; 936 Properties.ElementType[] arr; 937 DG dg; 938 Op op; 939 @disable this(ref return scope typeof(this) rhs); 940 @disable this(this); 941 this(Stream stream, size_t window, size_t step, DG dg, Receiver receiver) @trusted { 942 this.window = window; 943 this.step = step; 944 this.arr.reserve(window); 945 this.dg = dg; 946 op = stream.collect(cast(Properties.DG)&item).connect(receiver); 947 } 948 void item(Properties.ElementType t) { 949 import std.algorithm : moveAll; 950 if (arr.length == window) { 951 arr[window-1] = t; 952 } else { 953 arr ~= t; 954 if (arr.length < window) 955 return; 956 } 957 dg(arr); 958 if (step != window) { 959 moveAll(arr[step .. $], arr[0..$-step]); 960 if (step > 1) 961 arr.length -= step; 962 } 963 } 964 void start() nothrow @safe { 965 op.start(); 966 } 967 } 968 return fromStreamOp!(Properties.ElementType[], Properties.Value, SlideStreamOp)(stream, window, step); 969 } 970 971 /// toList collects all the stream's values and emits the array as a Sender 972 auto toList(Stream)(Stream stream) if (models!(Stream, isStream)) { 973 alias Properties = StreamProperties!Stream; 974 static assert(is(Properties.Value == void), "sender must produce void for toList to work"); 975 static struct ToListReceiver(State) { 976 State* state; 977 void setValue() @safe { 978 state.receiver.setValue(state.arr); 979 } 980 void setDone() @safe nothrow { 981 state.receiver.setDone(); 982 } 983 void setError(Exception e) nothrow @safe { 984 state.receiver.setError(e); 985 } 986 auto getStopToken() nothrow @safe { 987 return state.receiver.getStopToken(); 988 } 989 auto getScheduler() nothrow @safe { 990 return state.receiver.getScheduler(); 991 } 992 } 993 static struct State(Receiver) { 994 Receiver receiver; 995 Properties.ElementType[] arr; 996 } 997 static struct ToListOp(Receiver) { 998 State!Receiver state; 999 alias Op = OpType!(Properties.Sender, ToListReceiver!(State!Receiver)); 1000 Op op; 1001 @disable this(this); 1002 @disable this(ref return scope typeof(this) rhs); 1003 this(Stream stream, return Receiver receiver) @trusted scope return { 1004 state.receiver = receiver; 1005 op = stream.collect(cast(Properties.DG)&item).connect(ToListReceiver!(State!Receiver)(&state)); 1006 } 1007 void item(Properties.ElementType t) { 1008 state.arr ~= t; 1009 } 1010 void start() nothrow @safe { 1011 op.start(); 1012 } 1013 } 1014 static struct ToListSender { 1015 alias Value = Properties.ElementType[]; 1016 Stream stream; 1017 auto connect(Receiver)(return Receiver receiver) @safe scope return { 1018 // ensure NRVO 1019 auto op = ToListOp!(Receiver)(stream, receiver); 1020 return op; 1021 } 1022 } 1023 return ToListSender(stream); 1024 }