1 module concurrency.operations.onerror;
2 
3 import concurrency;
4 import concurrency.receiver;
5 import concurrency.sender;
6 import concurrency.stoptoken;
7 import concepts;
8 
/**
 * Wraps `sender` so that `effect` runs as a side effect when the underlying
 * sender completes with an error.
 *
 * The effect is invoked with the `Exception` that was raised. A `Throwable`
 * that is not an `Exception` is forwarded downstream without invoking the
 * effect. If the effect itself throws an `Exception`, it is chained together
 * with the original one and the chained throwable is forwarded instead.
 * Value and done completions are passed through untouched.
 *
 * Params:
 *   sender = the sender whose error completion should be observed
 *   effect = callable invoked with the `Exception`; must satisfy
 *            `isThreadSafeFunction`
 *
 * Returns: an `OnErrorSender` wrapping `sender` and `effect`.
 */
auto onError(Sender, SideEffect)(Sender sender, SideEffect effect) {
  import concurrency.utils : isThreadSafeFunction;
  // Reject unsuitable callables at compile time with a readable diagnostic.
  static assert(isThreadSafeFunction!SideEffect,
                "onError: the side effect must satisfy isThreadSafeFunction");
  return OnErrorSender!(Sender, SideEffect)(sender, effect);
}
16 
/// Receiver wrapper that forwards every completion signal to `receiver`,
/// running `sideEffect` first when the error signal carries an `Exception`.
private struct OnErrorReceiver(Value, SideEffect, Receiver) {
  Receiver receiver;
  SideEffect sideEffect;

  static if (is(Value == void)) {
    /// Forwards a value-less completion unchanged.
    void setValue() @safe {
      receiver.setValue();
    }
  } else {
    /// Forwards the produced value unchanged.
    void setValue(Value value) @safe {
      receiver.setValue(value);
    }
  }

  /// Forwards the done signal unchanged.
  void setDone() @safe nothrow {
    receiver.setDone();
  }

  /// Runs the side effect for `Exception`s, then forwards the error.
  /// If the side effect throws, the new exception is chained with the
  /// original one and the chained throwable is forwarded instead.
  void setError(Throwable t) @safe nothrow {
    auto original = cast(Exception) t;

    // Not an Exception: skip the side effect and pass it straight through.
    if (original is null) {
      receiver.setError(t);
      return;
    }

    try {
      sideEffect(original);
    } catch (Exception thrownByEffect) {
      auto chained = () @trusted {
        return Throwable.chainTogether(thrownByEffect, original);
      }();
      receiver.setError(chained);
      return;
    }

    receiver.setError(t);
  }

  mixin ForwardExtensionPoints!receiver;
}
45 
/// Sender that connects the wrapped `sender` to an `OnErrorReceiver`, so that
/// `effect` is applied on the error path before the downstream receiver is
/// notified.
struct OnErrorSender(Sender, SideEffect) if (models!(Sender, isSender)) {
  static assert(models!(typeof(this), isSender));

  /// The value type is the one produced by the wrapped sender.
  alias Value = Sender.Value;

  Sender sender;
  SideEffect effect;

  /// Wraps `receiver` in an `OnErrorReceiver` and connects it to the
  /// underlying sender, returning the resulting operation state.
  auto connect(Receiver)(return Receiver receiver) @safe scope return {
    alias Wrapped = OnErrorReceiver!(Value, SideEffect, Receiver);
    // a named local is returned on purpose: ensure NRVO
    auto operation = sender.connect(Wrapped(receiver, effect));
    return operation;
  }
}