module concurrency.operations.retry;

import concurrency;
import concurrency.receiver;
import concurrency.sender;
import concurrency.stoptoken;
import concepts;
import std.traits;

/// Retry logic that gives up after a fixed number of failures.
///
/// `failure` is called once per error. It returns `true` (stop retrying and
/// forward the error) as soon as the number of observed failures reaches
/// `max`.
struct Times {
  /// Number of failures after which the error is forwarded.
  int max = 5;
  /// Number of failures observed so far.
  int n = 0;

  /// Records one failure.
  /// Returns: `true` when no further retry should be attempted.
  bool failure(Throwable e) @safe nothrow {
    n++;
    return n >= max;
  }
}

/// Checks T is retry logic: `T.failure(Throwable)` must be callable from a
/// `nothrow` context and must return a `bool`.
void checkRetryLogic(T)() {
  T t = T.init;
  alias Ret = typeof((() nothrow => t.failure(Throwable.init))());
  static assert(is(Ret == bool), T.stringof ~ ".failure(Throwable) should return a bool, but it returns a " ~ Ret.stringof);
}

/// `true` when `checkRetryLogic!T` is a valid instantiation.
enum isRetryLogic(T) = is(typeof(checkRetryLogic!T));

/// Wraps `sender` in a `RetrySender` that consults `logic` on every error
/// to decide whether to run the sender again.
auto retry(Sender, Logic)(Sender sender, Logic logic) {
  return RetrySender!(Sender, Logic)(sender, logic);
}

/// Receiver placed between the wrapped sender and the downstream receiver.
/// Values and done signals are passed through; errors are first handed to
/// the retry logic.
private struct RetryReceiver(Receiver, Sender, Logic) {
  private {
    Sender sender;
    Receiver receiver;
    Logic logic;
    alias Value = Sender.Value;
  }

  static if (is(Value == void)) {
    void setValue() @safe {
      receiver.setValueOrError();
    }
  } else {
    void setValue(Value value) @safe {
      receiver.setValueOrError(value);
    }
  }

  void setDone() @safe nothrow {
    receiver.setDone();
  }

  /// Asks the retry logic whether to give up. If so the error is forwarded
  /// downstream; otherwise the sender is connected to a copy of this
  /// receiver and started again.
  void setError(Throwable e) @safe nothrow {
    if (logic.failure(e)) {
      receiver.setError(e);
    } else {
      try {
        // TODO: we connect on the heap here but we can probably do something smart...
        // Maybe we can store the new Op in the RetryOp struct
        // From what I gathered that is what libunifex does
        sender.connectHeap(this).start();
      } catch (Exception ex) {
        // Named `ex` so it does not shadow the `e` parameter: the error
        // forwarded here is the one raised while setting up the retry.
        receiver.setError(ex);
      }
    }
  }

  // NOTE(review): ForwardExtensionPoints is defined outside this file;
  // presumably it forwards the downstream receiver's extension points - confirm.
  mixin ForwardExtensionPoints!receiver;
}

/// Operation state of a `RetrySender`: holds the operation obtained by
/// connecting the wrapped sender to a `RetryReceiver`. Copying is disabled.
private struct RetryOp(Receiver, Sender, Logic) {
  alias Op = OpType!(Sender, RetryReceiver!(Receiver, Sender, Logic));
  Op op;

  @disable this(ref return scope typeof(this) rhs);
  @disable this(this);

  this(Sender sender, return RetryReceiver!(Receiver, Sender, Logic) receiver) @trusted scope {
    op = sender.connect(receiver);
  }

  void start() @trusted nothrow scope {
    op.start();
  }
}

/// Sender that runs `sender` and, on error, lets `logic` decide whether to
/// run it again or forward the error.
struct RetrySender(Sender, Logic) if (models!(Sender, isSender) && models!(Logic, isRetryLogic)) {
  static assert(models!(typeof(this), isSender));
  alias Value = Sender.Value;
  Sender sender;
  Logic logic;

  auto connect(Receiver)(return Receiver receiver) @safe return scope {
    // ensure NRVO
    auto op = RetryOp!(Receiver, Sender, Logic)(sender, RetryReceiver!(Receiver, Sender, Logic)(sender, receiver, logic));
    return op;
  }
}