module concurrency.operations.onresult;

import concurrency;
import concurrency.receiver;
import concurrency.sender;
import concurrency.stoptoken;
import concepts;

/// Wraps `sender` so that `effect` is invoked with a `Result` describing how
/// the sender completed: with a value, with cancellation, or with an error
/// that is an `Exception`.
///
/// Compilation fails unless `isThreadSafeFunction!SideEffect` holds.
///
/// Params:
///   sender = the sender whose completion is observed
///   effect = callable that receives the `Result` of the completion
/// Returns: an `OnResultSender` holding both `sender` and `effect`.
auto onResult(Sender, SideEffect)(Sender sender, SideEffect effect) {
  import concurrency.utils : isThreadSafeFunction;

  static assert(isThreadSafeFunction!SideEffect);

  alias Wrapper = OnResultSender!(Sender, SideEffect);
  return Wrapper(sender, effect);
}

/// Alternative name for `onResult`.
alias tee = onResult;

/// Receiver that calls `sideEffect` with the completion signal and then
/// passes that signal on to the wrapped `receiver`.
private struct OnResultReceiver(Value, SideEffect, Receiver) {
  Receiver receiver;
  SideEffect sideEffect;

  // The side effect runs before the value is forwarded. Nothing is caught
  // here, so anything the side effect throws leaves setValue.
  static if (is(Value == void)) {
    void setValue() @safe {
      sideEffect(Result!void());
      receiver.setValue();
    }
  } else {
    void setValue(Value value) @safe {
      sideEffect(Result!(Value)(value));
      receiver.setValue(value);
    }
  }

  /// Reports cancellation to the side effect, then forwards `setDone`.
  /// If the side effect throws, the thrown object is forwarded through
  /// `setError` and `setDone` is not called.
  void setDone() @trusted nothrow {
    try {
      sideEffect(Result!(Value)(Cancelled()));
    } catch (Throwable failure) {
      return receiver.setError(failure);
    }
    receiver.setDone();
  }

  /// Forwards an error, showing it to the side effect first when it is an
  /// `Exception`. Throwables that are not Exceptions bypass the side effect.
  void setError(Throwable e) @trusted nothrow {
    auto asException = cast(Exception) e;
    if (asException is null) {
      // Not an Exception: forward untouched.
      return receiver.setError(e);
    }

    try {
      sideEffect(Result!(Value)(asException));
    } catch (Exception secondary) {
      // The side effect failed as well: chain its exception with the
      // original error and forward the combined result.
      auto chained = Throwable.chainTogether(secondary, e);
      return receiver.setError(chained);
    } catch (Throwable failure) {
      // A non-Exception Throwable from the side effect is forwarded
      // in place of the original error.
      return receiver.setError(failure);
    }

    receiver.setError(e);
  }

  mixin ForwardExtensionPoints!receiver;
}

/// Sender returned by `onResult`. Connecting it connects the underlying
/// `sender` to an `OnResultReceiver` wrapping the given receiver.
struct OnResultSender(Sender, SideEffect) if (models!(Sender, isSender)) {
  static assert(models!(typeof(this), isSender));

  alias Value = Sender.Value;

  Sender sender;
  SideEffect effect;

  /// Connects the underlying sender to `receiver` wrapped in an
  /// `OnResultReceiver` that carries `effect`, and returns the resulting
  /// operation state.
  auto connect(Receiver)(return Receiver receiver) @safe scope return {
    alias Wrapped = OnResultReceiver!(Value, SideEffect, Receiver);
    // ensure NRVO: return a single named local
    auto operation = sender.connect(Wrapped(receiver, effect));
    return operation;
  }
}