1 module concurrency.operations.repeat;
2 
3 import concurrency;
4 import concurrency.receiver;
5 import concurrency.sender;
6 import concurrency.stoptoken;
7 import concepts;
8 import std.traits;
9 
10 auto repeat(Sender)(Sender sender) {
11   static assert(is(Sender.Value : void), "Can only repeat effectful Senders.");
12   return RepeatSender!(Sender)(sender);
13 }
14 
15 private struct RepeatReceiver(Receiver) {
16   private Receiver receiver;
17   private void delegate() @safe nothrow scope reset;
18   void setValue() @safe {
19     reset();
20   }
21   void setDone() @safe nothrow {
22     receiver.setDone();
23   }
24   void setError(Throwable e) @safe nothrow {
25     receiver.setError(e);
26   }
27   mixin ForwardExtensionPoints!receiver;
28 }
29 
30 private struct RepeatOp(Receiver, Sender) {
31   alias Op = OpType!(Sender, RepeatReceiver!(Receiver));
32   Sender sender;
33   Receiver receiver;
34   Op op;
35   @disable this(ref return scope typeof(this) rhs);
36   @disable this(this);
37   this(Sender sender, Receiver receiver) @trusted scope {
38     this.sender = sender;
39     this.receiver = receiver;
40   }
41   void start() @trusted nothrow scope {
42     op = sender.connect(RepeatReceiver!(Receiver)(receiver, &start));
43     op.start();
44   }
45 }
46 
47 struct RepeatSender(Sender) if (models!(Sender, isSender)) {
48   static assert(models!(typeof(this), isSender));
49   alias Value = Sender.Value;
50   Sender sender;
51   auto connect(Receiver)(return Receiver receiver) @safe scope return {
52     // ensure NRVO
53     auto op = RepeatOp!(Receiver, Sender)(sender, receiver);
54     return op;
55   }
56 }