module concurrency.operations.retry;

import concurrency;
import concurrency.receiver;
import concurrency.sender;
import concurrency.stoptoken;
import concepts;
import std.traits;

/// Retry logic that counts failures.
///
/// Each call to `failure` increments `n`; the call returns `true` once `n`
/// has reached `max`. `RetryReceiver` treats a `true` result as "stop
/// retrying and forward the error".
struct Times {
  /// Number of failures at which `failure` starts returning `true`.
  int max = 5;
  /// Number of failures recorded so far.
  int n = 0;

  /// Records one failure and reports whether the limit has been reached.
  /// The passed `Throwable` is not inspected.
  bool failure(Throwable e) @safe nothrow {
    n++;
    return n >= max;
  }
}

/// Wraps `sender` in a `RetrySender` that consults `logic` whenever the
/// wrapped sender completes with an error.
auto retry(Sender, Logic)(Sender sender, Logic logic) {
  return RetrySender!(Sender, Logic)(sender, logic);
}

/// Receiver placed between the wrapped `Sender` and the downstream
/// `Receiver`. Values and done signals are forwarded; errors are first
/// passed to `logic.failure` to decide between forwarding and retrying.
private struct RetryReceiver(Receiver, Sender, Logic) {
  private {
    Sender sender;
    Receiver receiver;
    Logic logic;
    alias Value = Sender.Value;
  }

  static if (is(Value == void)) {
    /// Forwards completion (no value) to the downstream receiver.
    void setValue() @safe {
      receiver.setValueOrError();
    }
  } else {
    /// Forwards the produced value to the downstream receiver.
    void setValue(Value value) @safe {
      receiver.setValueOrError(value);
    }
  }

  /// Forwards the done signal to the downstream receiver.
  void setDone() @safe nothrow {
    receiver.setDone();
  }

  /// Handles an error from the wrapped sender.
  ///
  /// If `logic.failure(e)` returns `true` the error is forwarded downstream.
  /// Otherwise the sender is connected again, with this receiver, and
  /// started. An `Exception` thrown while doing so is forwarded downstream
  /// instead of the original error.
  void setError(Throwable e) @safe nothrow {
    if (logic.failure(e))
      receiver.setError(e);
    else {
      try {
        // TODO: we connect on the heap here but we can probably do something smart...
        // Maybe we can store the new Op in the RetryOp struct
        // From what I gathered that is what libunifex does
        sender.connectHeap(this).start();
      } catch (Exception retryFailure) {
        // Named differently from the parameter `e` so it is clear that the
        // exception raised by the retry attempt is the one reported here.
        receiver.setError(retryFailure);
      }
    }
  }

  mixin ForwardExtensionPoints!receiver;
}

/// Operation state produced by `RetrySender.connect`. It holds the operation
/// obtained by connecting the wrapped sender to a `RetryReceiver`.
private struct RetryOp(Receiver, Sender, Logic) {
  alias Op = OpType!(Sender, RetryReceiver!(Receiver, Sender, Logic));
  Op op;

  // Copying is disabled (both copy constructor and postblit).
  @disable this(ref return scope typeof(this) rhs);
  @disable this(this);

  /// Connects `sender` to `receiver` and stores the resulting operation.
  this(Sender sender, return RetryReceiver!(Receiver, Sender, Logic) receiver) @trusted scope {
    op = sender.connect(receiver);
  }

  /// Starts the stored operation.
  void start() @trusted nothrow scope {
    op.start();
  }
}

/// Sender returned by `retry`. Produces the same `Value` type as the wrapped
/// `Sender`.
struct RetrySender(Sender, Logic) if (models!(Sender, isSender)) {
  static assert(models!(typeof(this), isSender));
  alias Value = Sender.Value;
  Sender sender;
  Logic logic;

  /// Builds a `RetryOp` that connects the wrapped sender to a
  /// `RetryReceiver` holding `sender`, `receiver` and `logic`.
  auto connect(Receiver)(return Receiver receiver) @safe scope return {
    // ensure NRVO
    auto op = RetryOp!(Receiver, Sender, Logic)(sender, RetryReceiver!(Receiver, Sender, Logic)(sender, receiver, logic));
    return op;
  }
}