1 module concurrency.operations.retry;
2 
3 import concurrency;
4 import concurrency.receiver;
5 import concurrency.sender;
6 import concurrency.stoptoken;
7 import concepts;
8 import std.traits;
9 
/// Retry logic that gives up once a fixed number of failures has been seen.
struct Times {
  /// Number of failures at which `failure` starts returning true.
  int max = 5;
  /// Number of failures recorded so far.
  int n = 0;

  /// Records one more failure and reports whether the limit has been reached.
  ///
  /// Params:
  ///   e = the error that occurred; it is not inspected.
  /// Returns: true once the failure count is at or above `max`.
  bool failure(Throwable e) @safe nothrow {
    return ++n >= max;
  }
}
18 
19 
/// Compile-time check that T can be used as retry logic.
///
/// T must have a `failure` member that accepts a Throwable, can be called
/// from nothrow code, and returns a bool.
void checkRetryLogic(T)() {
  T logic = T.init;
  // The call is wrapped in a nothrow lambda so that a throwing `failure`
  // is rejected at compile time.
  alias Ret = typeof((() nothrow { return logic.failure(Throwable.init); })());
  enum mismatch = T.stringof ~ ".failure(Throwable) should return a bool, but it returns a " ~ Ret.stringof;
  static assert(is(Ret == bool), mismatch);
}
26 
/// True when `checkRetryLogic!T` can be instantiated.
template isRetryLogic(T) {
  enum isRetryLogic = is(typeof(checkRetryLogic!T));
}
28 
/// Wraps `sender` in a RetrySender that uses `logic` to decide what to do
/// when an error is reported.
///
/// Params:
///   sender = the sender to wrap
///   logic  = the retry logic stored alongside the sender
/// Returns: a `RetrySender!(Sender, Logic)` holding both arguments.
auto retry(Sender, Logic)(Sender sender, Logic logic) {
  alias Result = RetrySender!(Sender, Logic);
  return Result(sender, logic);
}
32 
/// Receiver placed between the wrapped sender and the downstream receiver.
///
/// Values and the done signal are passed on to the downstream receiver.
/// Errors are first shown to the retry logic: if it reports failure the error
/// is passed on, otherwise the sender is connected and started again.
private struct RetryReceiver(Receiver, Sender, Logic) {
  private {
    // Field order matters: instances are built positionally as
    // (sender, receiver, logic).
    Sender sender;
    Receiver receiver;
    Logic logic;
    alias Value = Sender.Value;
  }

  static if (!is(Value == void)) {
    /// Forwards the produced value to the downstream receiver.
    void setValue(Value value) @safe {
      receiver.setValueOrError(value);
    }
  } else {
    /// Forwards completion (no value) to the downstream receiver.
    void setValue() @safe {
      receiver.setValueOrError();
    }
  }

  /// Forwards the done signal to the downstream receiver.
  void setDone() @safe nothrow {
    receiver.setDone();
  }

  /// Asks the retry logic whether to give up; either forwards the error or
  /// connects and starts the sender again with a copy of this receiver.
  void setError(Throwable e) @safe nothrow {
    if (logic.failure(e)) {
      // The logic says to stop retrying: hand the error downstream.
      receiver.setError(e);
      return;
    }

    try {
      // TODO: we connect on the heap here but we can probably do something smart...
      // Maybe we can store the new Op in the RetryOp struct
      // From what I gathered that is what libunifex does
      sender.connectHeap(this).start();
    } catch (Exception restartFailure) {
      // Connecting or starting the retry itself threw: report that instead.
      receiver.setError(restartFailure);
    }
  }

  mixin ForwardExtensionPoints!receiver;
}
68 
/// Operation state for a retrying sender: holds the operation obtained by
/// connecting the wrapped sender to a RetryReceiver.
private struct RetryOp(Receiver, Sender, Logic) {
  alias Op = OpType!(Sender, RetryReceiver!(Receiver, Sender, Logic));
  Op op;

  // Copying is disabled, both postblit and copy constructor.
  @disable this(this);
  @disable this(ref return scope typeof(this) rhs);

  /// Connects `sender` to `receiver` and stores the resulting operation.
  this(Sender sender, return RetryReceiver!(Receiver, Sender, Logic) receiver) scope @trusted {
    this.op = sender.connect(receiver);
  }

  /// Starts the stored operation.
  void start() scope nothrow @trusted {
    this.op.start();
  }
}
81 
/// Sender that pairs a wrapped sender with retry logic.
///
/// Only instantiable when `Sender` models `isSender` and `Logic` models
/// `isRetryLogic`.
struct RetrySender(Sender, Logic) if (models!(Sender, isSender) && models!(Logic, isRetryLogic)) {
  static assert(models!(typeof(this), isSender));
  alias Value = Sender.Value;
  Sender sender;
  Logic logic;

  /// Connects this sender to `receiver`.
  ///
  /// Returns: a RetryOp built from the wrapped sender and a RetryReceiver
  /// that holds the sender, `receiver` and the retry logic.
  auto connect(Receiver)(return Receiver receiver) @safe return scope {
    alias Wrapped = RetryReceiver!(Receiver, Sender, Logic);
    alias Operation = RetryOp!(Receiver, Sender, Logic);
    // ensure NRVO: build into a named local and return that local
    auto op = Operation(sender, Wrapped(sender, receiver, logic));
    return op;
  }
}