1 module concurrency.operations.onerror;
2 
3 import concurrency;
4 import concurrency.receiver;
5 import concurrency.sender;
6 import concurrency.stoptoken;
7 import concepts;
8 
/**
 * Wraps `sender` so that `effect` is carried along with it for error handling.
 *
 * The side-effect type is checked at compile time with
 * `isThreadSafeFunction`; instantiation fails if the check does not hold.
 *
 * Params:
 *   sender = the sender to wrap
 *   effect = callable stored in the returned sender and handed to the
 *            receiver it connects with
 *
 * Returns: an `OnErrorSender!(Sender, SideEffect)` holding `sender` and `effect`.
 */
auto onError(Sender, SideEffect)(Sender sender, SideEffect effect) {
  import concurrency.utils : isThreadSafeFunction;
  static assert(isThreadSafeFunction!SideEffect,
                "onError: the side effect must satisfy isThreadSafeFunction");
  return OnErrorSender!(Sender, SideEffect)(sender, effect);
}
15 
/**
 * Receiver adaptor that runs `sideEffect` when an `Exception` arrives through
 * `setError`, then passes the error on to the wrapped `receiver`.
 *
 * Value and done signals are forwarded to `receiver` unchanged.
 */
private struct OnErrorReceiver(Value, SideEffect, Receiver) {
  // Field order matters: instances are built positionally as (receiver, sideEffect).
  Receiver receiver;
  SideEffect sideEffect;

  static if (!is(Value == void)) {
    /// Hands the produced value straight to the wrapped receiver.
    void setValue(Value value) @safe {
      receiver.setValue(value);
    }
  } else {
    /// Signals valueless completion to the wrapped receiver.
    void setValue() @safe {
      receiver.setValue();
    }
  }

  /// Forwards the done signal to the wrapped receiver.
  void setDone() @safe nothrow {
    receiver.setDone();
  }

  /**
   * Invokes the side effect for `Exception`s and forwards the error.
   *
   * A `Throwable` that is not an `Exception` skips the side effect and is
   * forwarded as-is. If the side effect itself throws an `Exception`, that
   * exception is combined with the incoming one via
   * `Throwable.chainTogether` and the result is forwarded instead.
   */
  void setError(Throwable t) @safe nothrow {
    auto failure = cast(Exception) t;
    if (failure is null) {
      // Not an Exception: nothing to run, just pass it through.
      receiver.setError(t);
      return;
    }
    try {
      sideEffect(failure);
    } catch (Exception secondary) {
      // chainTogether is @system, hence the @trusted lambda.
      receiver.setError(() @trusted {
        return Throwable.chainTogether(secondary, failure);
      }());
      return;
    }
    receiver.setError(t);
  }

  // NOTE(review): presumably exposes the wrapped receiver's extension points
  // on this adaptor — confirm against concurrency.receiver.
  mixin ForwardExtensionPoints!receiver;
}
44 
/**
 * Sender that pairs an inner `sender` with an error side effect.
 *
 * Connecting it wraps the given receiver in an `OnErrorReceiver` carrying
 * `effect`, then connects the inner sender to that wrapper.
 */
struct OnErrorSender(Sender, SideEffect) if (models!(Sender, isSender)) {
  static assert (models!(typeof(this), isSender));
  /// Value type is the same as the inner sender's.
  alias Value = Sender.Value;
  Sender sender;
  SideEffect effect;

  /**
   * Connects the inner sender to `receiver` wrapped in an `OnErrorReceiver`.
   *
   * Returns: the operation state produced by the inner sender's `connect`.
   */
  auto connect(Receiver)(return Receiver receiver) @safe scope return {
    alias Wrapped = OnErrorReceiver!(Value, SideEffect, Receiver);
    // Bind to a named local and return it: this shape is kept on purpose to ensure NRVO.
    auto operation = sender.connect(Wrapped(receiver, effect));
    return operation;
  }
}