1 module concurrency.operations.onerror; 2 3 import concurrency; 4 import concurrency.receiver; 5 import concurrency.sender; 6 import concurrency.stoptoken; 7 import concepts; 8 9 auto onError(Sender, SideEffect)(Sender sender, SideEffect effect) { 10 import concurrency.utils : isThreadSafeFunction; 11 alias T = Exception; 12 static assert(isThreadSafeFunction!SideEffect); 13 return OnErrorSender!(Sender, SideEffect)(sender, effect); 14 } 15 16 private struct OnErrorReceiver(Value, SideEffect, Receiver) { 17 Receiver receiver; 18 SideEffect sideEffect; 19 static if (is(Value == void)) 20 void setValue() @safe { 21 receiver.setValue(); 22 } 23 else 24 void setValue(Value value) @safe { 25 receiver.setValue(value); 26 } 27 void setDone() @safe nothrow { 28 receiver.setDone(); 29 } 30 void setError(Throwable t) @safe nothrow { 31 if (auto e = cast(Exception)t) 32 { 33 try 34 sideEffect(e); 35 catch (Exception e2) { 36 receiver.setError(() @trusted { return Throwable.chainTogether(e2, e); } ()); 37 return; 38 } 39 } 40 receiver.setError(t); 41 } 42 mixin ForwardExtensionPoints!receiver; 43 } 44 45 struct OnErrorSender(Sender, SideEffect) if (models!(Sender, isSender)) { 46 static assert (models!(typeof(this), isSender)); 47 alias Value = Sender.Value; 48 Sender sender; 49 SideEffect effect; 50 auto connect(Receiver)(return Receiver receiver) @safe scope return { 51 // ensure NRVO 52 auto op = sender.connect(OnErrorReceiver!(Sender.Value, SideEffect, Receiver)(receiver, effect)); 53 return op; 54 } 55 }