1 module concurrency.operations.onresult;
2 
3 import concurrency;
4 import concurrency.receiver;
5 import concurrency.sender;
6 import concurrency.stoptoken;
7 import concepts;
8 
/// Wraps `sender` so that `effect` is invoked with a `Result` describing how
/// the sender completed, before that completion is passed on to the
/// downstream receiver.
///
/// The effect runs when the sender completes with a value, with cancellation,
/// or with an error that is an `Exception`. Errors that are not `Exception`s
/// are forwarded downstream without invoking the effect.
///
/// Params:
///   sender = the sender whose completion is observed
///   effect = callable invoked with the completion; its type must satisfy
///            `isThreadSafeFunction`
/// Returns: an `OnResultSender` holding `sender` and `effect`
auto onResult(Sender, SideEffect)(Sender sender, SideEffect effect) {
  import concurrency.utils : isThreadSafeFunction;
  // Give a readable diagnostic instead of a bare "static assert failed".
  static assert(isThreadSafeFunction!SideEffect,
                "onResult: side effect of type `" ~ SideEffect.stringof
                ~ "` does not satisfy isThreadSafeFunction");
  return OnResultSender!(Sender, SideEffect)(sender, effect);
}
15 
/// `tee` is an alternative name for `onResult`; both refer to the same function template.
alias tee = onResult;
17 
/// Receiver wrapper that hands each completion signal to `sideEffect`
/// (packaged as a `Result!Value`) and then passes the signal on to `receiver`.
private struct OnResultReceiver(Value, SideEffect, Receiver) {
  /// Downstream receiver that ultimately gets the completion signal.
  Receiver receiver;
  /// Callable invoked with the `Result` of the completion.
  SideEffect sideEffect;

  static if (is(Value == void)) {
    /// Value completion without a payload: run the effect, then forward.
    /// Anything thrown by the effect propagates to the caller and
    /// `receiver.setValue` is not reached.
    void setValue() @safe {
      sideEffect(Result!void());
      receiver.setValue();
    }
  } else {
    /// Value completion: run the effect with the value, then forward it.
    /// Anything thrown by the effect propagates to the caller and
    /// `receiver.setValue` is not reached.
    void setValue(Value value) @safe {
      sideEffect(Result!(Value)(value));
      receiver.setValue(value);
    }
  }

  /// Cancellation: run the effect with a `Cancelled` result. If the effect
  /// throws, the downstream receiver gets `setError` with what was thrown
  /// instead of `setDone`.
  void setDone() @trusted nothrow {
    try {
      sideEffect(Result!(Value)(Cancelled()));
    } catch (Throwable failure) {
      receiver.setError(failure);
      return;
    }
    receiver.setDone();
  }

  /// Error completion. The effect is only invoked when `e` is an `Exception`;
  /// any other `Throwable` is forwarded untouched.
  void setError(Throwable e) @trusted nothrow {
    auto asException = cast(Exception) e;
    if (asException is null) {
      // Not an Exception: skip the effect entirely.
      receiver.setError(e);
      return;
    }

    try {
      sideEffect(Result!(Value)(asException));
    } catch (Exception secondary) {
      // The effect itself failed: report it chained together with the original error.
      receiver.setError(Throwable.chainTogether(secondary, e));
      return;
    } catch (Throwable fatal) {
      // A non-Exception Throwable from the effect is forwarded on its own.
      receiver.setError(fatal);
      return;
    }

    receiver.setError(e);
  }

  mixin ForwardExtensionPoints!receiver;
}
58 
/// Sender that pairs an underlying `sender` with an `effect`; connecting it
/// interposes an `OnResultReceiver` between the underlying sender and the
/// supplied receiver.
struct OnResultSender(Sender, SideEffect) if (models!(Sender, isSender)) {
  static assert(models!(typeof(this), isSender));
  /// Value type produced on completion; identical to the wrapped sender's.
  alias Value = Sender.Value;
  /// The wrapped sender.
  Sender sender;
  /// Callable passed to the `OnResultReceiver` created in `connect`.
  SideEffect effect;

  /// Connects the wrapped sender to an `OnResultReceiver` built around
  /// `receiver` and `effect`, and returns the resulting operation state.
  auto connect(Receiver)(return Receiver receiver) @safe scope return {
    alias Wrapped = OnResultReceiver!(Sender.Value, SideEffect, Receiver);
    // Bind the operation state to a named local and return that local, so NRVO can apply.
    auto operation = sender.connect(Wrapped(receiver, effect));
    return operation;
  }
}