1 module concurrency.operations.repeat; 2 3 import concurrency; 4 import concurrency.receiver; 5 import concurrency.sender; 6 import concurrency.stoptoken; 7 import concepts; 8 import std.traits; 9 10 auto repeat(Sender)(Sender sender) { 11 static assert(is(Sender.Value : void), "Can only repeat effectful Senders."); 12 return RepeatSender!(Sender)(sender); 13 } 14 15 private struct RepeatReceiver(Receiver) { 16 private Receiver receiver; 17 private void delegate() @safe scope reset; 18 void setValue() @safe { 19 reset(); 20 } 21 void setDone() @safe nothrow { 22 receiver.setDone(); 23 } 24 void setError(Throwable e) @safe nothrow { 25 receiver.setError(e); 26 } 27 mixin ForwardExtensionPoints!receiver; 28 } 29 30 private struct RepeatOp(Receiver, Sender) { 31 alias Op = OpType!(Sender, RepeatReceiver!(Receiver)); 32 Sender sender; 33 Receiver receiver; 34 Op op; 35 @disable this(ref return scope typeof(this) rhs); 36 @disable this(this); 37 this(Sender sender, Receiver receiver) @trusted scope { 38 this.sender = sender; 39 this.receiver = receiver; 40 } 41 void start() @trusted nothrow scope { 42 try { 43 reset(); 44 } catch (Exception e) { 45 receiver.setError(e); 46 } 47 } 48 private void reset() @trusted scope { 49 op = sender.connect(RepeatReceiver!(Receiver)(receiver, &reset)); 50 op.start(); 51 } 52 } 53 54 struct RepeatSender(Sender) if (models!(Sender, isSender)) { 55 static assert(models!(typeof(this), isSender)); 56 alias Value = Sender.Value; 57 Sender sender; 58 auto connect(Receiver)(return Receiver receiver) @safe return scope { 59 // ensure NRVO 60 auto op = RepeatOp!(Receiver, Sender)(sender, receiver); 61 return op; 62 } 63 }