module concurrency.operations.repeat;

import concurrency;
import concurrency.receiver;
import concurrency.sender;
import concurrency.stoptoken;
import concepts;
import std.traits;

/**
 * Wraps `sender` in a `RepeatSender`, which restarts the wrapped sender every
 * time it completes with a value.
 *
 * Only senders whose `Value` is (implicitly convertible to) `void` are
 * accepted; this is enforced at compile time.
 *
 * Params:
 *   sender = the sender to run repeatedly
 * Returns: a `RepeatSender!Sender` holding `sender`.
 */
auto repeat(Sender)(Sender sender) {
  static assert(is(Sender.Value : void), "Can only repeat effectful Senders.");
  return RepeatSender!(Sender)(sender);
}

/**
 * Receiver handed to the wrapped sender on every iteration.
 *
 * A value completion triggers `reset` (the next iteration); done and error
 * completions are passed straight through to the downstream `receiver`.
 */
private struct RepeatReceiver(Receiver) {
  /// Downstream receiver that gets the terminal done/error signal.
  private Receiver receiver;
  /// Callback that starts the next iteration; `RepeatOp.start` supplies it.
  private void delegate() @safe nothrow scope reset;

  /// The wrapped sender finished one round: kick off the next one.
  void setValue() @safe {
    reset();
  }

  /// Forwards cancellation to the downstream receiver, ending the repetition.
  void setDone() @safe nothrow {
    receiver.setDone();
  }

  /// Forwards the error to the downstream receiver, ending the repetition.
  void setError(Throwable e) @safe nothrow {
    receiver.setError(e);
  }

  // Presumably exposes the downstream receiver's extension points (such as
  // its stop token) to the wrapped sender -- defined elsewhere, confirm there.
  mixin ForwardExtensionPoints!receiver;
}

/**
 * Operation state for a repeated sender.
 *
 * Holds the wrapped sender, the downstream receiver and the operation state of
 * the iteration that is currently in flight.
 */
private struct RepeatOp(Receiver, Sender) {
  alias Op = OpType!(Sender, RepeatReceiver!(Receiver));
  Sender sender;
  Receiver receiver;
  Op op;

  // Copying is disabled: `start` hands out a delegate bound to this instance.
  @disable this(ref return scope typeof(this) rhs);
  @disable this(this);

  this(Sender sender, Receiver receiver) @trusted scope {
    this.sender = sender;
    this.receiver = receiver;
  }

  /**
   * Connects the wrapped sender to a fresh `RepeatReceiver` and starts it.
   *
   * The receiver gets `&start` as its reset callback, so every value
   * completion of the wrapped sender re-enters this function.
   *
   * An `Exception` thrown while connecting or starting is delivered to the
   * downstream receiver through `setError`. That keeps this function
   * `nothrow` without requiring `Sender.connect` to be `nothrow` itself.
   *
   * NOTE(review): if the wrapped operation completes synchronously inside
   * `op.start()`, this function is re-entered recursively and `op` is
   * reassigned while the previous operation is still on the stack -- verify
   * that senders used with `repeat` complete asynchronously.
   */
  void start() @trusted nothrow scope {
    try {
      op = sender.connect(RepeatReceiver!(Receiver)(receiver, &start));
      op.start();
    } catch (Exception e) {
      receiver.setError(e);
    }
  }
}

/**
 * Sender that re-runs the wrapped `sender` each time it completes with a
 * value. Created by `repeat`.
 */
struct RepeatSender(Sender) if (models!(Sender, isSender)) {
  static assert(models!(typeof(this), isSender));
  alias Value = Sender.Value;
  Sender sender;

  /// Builds the operation state tying the wrapped sender to `receiver`.
  auto connect(Receiver)(return Receiver receiver) @safe scope return {
    // ensure NRVO
    auto op = RepeatOp!(Receiver, Sender)(sender, receiver);
    return op;
  }
}