module concurrency.operations.onerror;

import concurrency;
import concurrency.receiver;
import concurrency.sender;
import concurrency.stoptoken;
import concepts;

/// Runs a side-effect whenever the underlying sender errors.
///
/// Params:
///   sender = the sender whose error signal is observed
///   effect = callable invoked with the `Exception` delivered to `setError`;
///            it must satisfy `isThreadSafeFunction` (checked at compile time)
/// Returns: an `OnErrorSender` wrapping `sender` and `effect`
auto onError(Sender, SideEffect)(Sender sender, SideEffect effect) {
	import concurrency.utils : isThreadSafeFunction;
	static assert(isThreadSafeFunction!SideEffect);
	return OnErrorSender!(Sender, SideEffect)(sender, effect);
}

/// Receiver adapter that forwards every signal to the wrapped `receiver`,
/// invoking `sideEffect` first when the signal is an error carrying an `Exception`.
private struct OnErrorReceiver(Value, SideEffect, Receiver) {
	/// Downstream receiver that gets every signal.
	Receiver receiver;
	/// Callable run on `Exception` errors before they are forwarded.
	SideEffect sideEffect;

	static if (is(Value == void)) {
		/// Forwards value-less completion unchanged.
		void setValue() @safe {
			receiver.setValue();
		}
	} else {
		/// Forwards the produced value unchanged.
		void setValue(Value value) @safe {
			receiver.setValue(value);
		}
	}

	/// Forwards the done signal unchanged.
	void setDone() @safe nothrow {
		receiver.setDone();
	}

	/// Runs the side effect for `Exception`s, then forwards the error.
	///
	/// A throwable that is not an `Exception` is forwarded without running
	/// the side effect. If the side effect itself throws an `Exception`, that
	/// exception is chained together with the original one and the chained
	/// result is forwarded instead.
	void setError(Throwable t) @safe nothrow {
		if (auto e = cast(Exception) t) {
			try
				sideEffect(e);
			catch (Exception e2) {
				// chainTogether is not @safe, hence the @trusted lambda
				receiver.setError(() @trusted {
					return Throwable.chainTogether(e2, e);
				}());
				return;
			}
		}
		receiver.setError(t);
	}

	mixin ForwardExtensionPoints!receiver;
}

/// Sender returned by `onError`: connects the wrapped `sender` to an
/// `OnErrorReceiver` that carries `effect`.
struct OnErrorSender(Sender, SideEffect) if (models!(Sender, isSender)) {
	static assert(models!(typeof(this), isSender));
	/// Value type is that of the wrapped sender.
	alias Value = Sender.Value;
	Sender sender;
	SideEffect effect;

	/// Wraps `receiver` in an `OnErrorReceiver` and connects it to the
	/// underlying sender, returning the resulting operation state.
	auto connect(Receiver)(return Receiver receiver) @safe scope return {
		// ensure NRVO
		auto op = sender.connect(OnErrorReceiver!(Sender.Value, SideEffect, Receiver)(receiver, effect));
		return op;
	}
}