1 module concurrency.operations.retry;
2 
3 import concurrency;
4 import concurrency.receiver;
5 import concurrency.sender;
6 import concurrency.stoptoken;
7 import concepts;
8 import std.traits;
9 
10 struct Times {
11   int max = 5;
12   int n = 0;
13   bool failure(Throwable e) @safe nothrow {
14     n++;
15     return n >= max;
16   }
17 }
18 
19 auto retry(Sender, Logic)(Sender sender, Logic logic) {
20   return RetrySender!(Sender, Logic)(sender, logic);
21 }
22 
23 private struct RetryReceiver(Receiver, Sender, Logic) {
24   private {
25     Sender sender;
26     Receiver receiver;
27     Logic logic;
28     alias Value = Sender.Value;
29   }
30   static if (is(Value == void)) {
31     void setValue() @safe {
32       receiver.setValueOrError();
33     }
34   } else {
35     void setValue(Value value) @safe {
36       receiver.setValueOrError(value);
37     }
38   }
39   void setDone() @safe nothrow {
40     receiver.setDone();
41   }
42   void setError(Throwable e) @safe nothrow {
43     if (logic.failure(e))
44       receiver.setError(e);
45     else {
46       try {
47         // TODO: we connect on the heap here but we can probably do something smart...
48         // Maybe we can store the new Op in the RetryOp struct
49         // From what I gathered that is what libunifex does
50         sender.connectHeap(this).start();
51       } catch (Exception e) {
52         receiver.setError(e);
53       }
54     }
55   }
56   mixin ForwardExtensionPoints!receiver;
57 }
58 
59 private struct RetryOp(Receiver, Sender, Logic) {
60   alias Op = OpType!(Sender, RetryReceiver!(Receiver, Sender, Logic));
61   Op op;
62   @disable this(ref return scope typeof(this) rhs);
63   @disable this(this);
64   this(Sender sender, return RetryReceiver!(Receiver, Sender, Logic) receiver) @trusted scope {
65     op = sender.connect(receiver);
66   }
67   void start() @trusted nothrow scope {
68     op.start();
69   }
70 }
71 
72 struct RetrySender(Sender, Logic) if (models!(Sender, isSender)) {
73   static assert(models!(typeof(this), isSender));
74   alias Value = Sender.Value;
75   Sender sender;
76   Logic logic;
77   auto connect(Receiver)(return Receiver receiver) @safe scope return {
78     // ensure NRVO
79     auto op = RetryOp!(Receiver, Sender, Logic)(sender, RetryReceiver!(Receiver, Sender, Logic)(sender, receiver, logic));
80     return op;
81   }
82 }