1 module concurrency.operations.repeat;
2 
3 import concurrency;
4 import concurrency.receiver;
5 import concurrency.sender;
6 import concurrency.stoptoken;
7 import concepts;
8 import std.traits;
9 
/// Wraps `sender` in a `RepeatSender`, so that each successful completion of
/// the wrapped sender starts it again.
///
/// Only senders whose `Value` is `void` are accepted; anything else is
/// rejected at compile time.
///
/// Params:
///   sender = the effectful sender to run repeatedly
/// Returns: a `RepeatSender!Sender` holding `sender`
auto repeat(Sender)(Sender sender) {
  enum bool producesNoValue = is(Sender.Value : void);
  static assert(producesNoValue, "Can only repeat effectful Senders.");

  alias Repeated = RepeatSender!(Sender);
  return Repeated(sender);
}
14 
/// Receiver connected to the repeated sender on every iteration.
///
/// A value completion triggers `reset`; done and error completions are handed
/// to the wrapped downstream `receiver` untouched.
private struct RepeatReceiver(Receiver) {
  private {
    // Keep this field order: RepeatOp builds the struct positionally as
    // (receiver, &reset).
    Receiver receiver;
    void delegate() @safe scope reset;
  }

  /// Value completion: does not reach the downstream receiver, it only
  /// invokes the `reset` callback. Not `nothrow`: whatever `reset` throws
  /// propagates to the caller.
  void setValue() @safe {
    reset();
  }

  /// Done completion: forwarded to the downstream receiver.
  void setDone() @safe nothrow {
    receiver.setDone();
  }

  /// Error completion: forwarded, with the same throwable, to the downstream
  /// receiver.
  void setError(Throwable e) @safe nothrow {
    receiver.setError(e);
  }

  // ForwardExtensionPoints is defined outside this file; presumably it
  // forwards receiver queries to `receiver` (TODO confirm).
  mixin ForwardExtensionPoints!receiver;
}
29 
/// Operation state for `RepeatSender`.
///
/// Holds the sender to repeat, the downstream receiver and the operation
/// state of the current iteration. Every call to `reset` connects `sender`
/// to a fresh `RepeatReceiver` and starts the resulting operation; that
/// receiver calls `reset` again when the iteration completes with a value.
private struct RepeatOp(Receiver, Sender) {
  /// Operation state type produced by connecting `Sender` to a `RepeatReceiver`.
  alias Op = OpType!(Sender, RepeatReceiver!(Receiver));
  Sender sender;
  Receiver receiver;
  /// Operation state of the iteration started most recently by `reset`.
  Op op;
  // Copying is disabled (both copy constructor and postblit). `reset` hands
  // out `&reset`, a delegate bound to this instance, so a copy would leave
  // that delegate pointing at the original.
  @disable this(ref return scope typeof(this) rhs);
  @disable this(this);
  /// Stores the sender and downstream receiver; nothing is connected or
  /// started until `start` is called.
  this(Sender sender, Receiver receiver) @trusted scope {
    this.sender = sender;
    this.receiver = receiver;
  }
  /// Starts the first iteration. An `Exception` thrown while connecting or
  /// starting is delivered to the downstream receiver via `setError`, which
  /// keeps this function `nothrow`.
  void start() @trusted nothrow scope {
    try {
      reset();
    } catch (Exception e) {
      receiver.setError(e);
    }
  }
  /// Connects `sender` to a new `RepeatReceiver` and starts the resulting
  /// operation, replacing whatever was stored in `op`.
  ///
  /// NOTE(review): when this runs from `RepeatReceiver.setValue`, `op` is
  /// overwritten while the previous operation's completion call may still be
  /// on the stack -- confirm every inner operation tolerates that.
  /// NOTE(review): if the inner sender completes with a value synchronously
  /// inside `op.start()`, this recurses (start -> setValue -> reset -> start)
  /// one level deeper per iteration, with no bound visible in this file.
  private void reset() @trusted scope {
    op = sender.connect(RepeatReceiver!(Receiver)(receiver, &reset));
    op.start();
  }
}
53 
/// Sender that re-runs the wrapped `sender` each time it completes with a
/// value. Normally created through `repeat`.
struct RepeatSender(Sender) if (models!(Sender, isSender)) {
  static assert(models!(typeof(this), isSender));

  /// Same value type as the wrapped sender.
  alias Value = Sender.Value;

  /// The sender that gets connected and started on every iteration.
  Sender sender;

  /// Builds the operation state that ties the wrapped sender to `receiver`.
  ///
  /// Params:
  ///   receiver = downstream receiver that gets the done/error completion
  /// Returns: a `RepeatOp` that the caller must `start`
  auto connect(Receiver)(return Receiver receiver) @safe return scope {
    alias Operation = RepeatOp!(Receiver, Sender);
    // RepeatOp disables copying, so the result is bound to a named local and
    // that local is returned, keeping the return eligible for NRVO.
    auto operation = Operation(sender, receiver);
    return operation;
  }
}