1 module concurrency.operations.completewitherror;
2 
3 import concurrency;
4 import concurrency.receiver;
5 import concurrency.sender;
6 import concepts;
7 import std.traits;
8 
9 auto completeWithError(Sender)(Sender sender, Throwable t) {
10   return CompleteWithErrorSender!(Sender)(sender, t);
11 }
12 
13 template isA(T) if (is(T == class)) {
14   bool isA(P)(auto ref P p) if (is(P == class)) {
15     return cast(T)p !is null;
16   }
17 }
18 
19 template isNotA(T) if (is(T == class)) {
20   bool isNotA(P)(auto ref P p) {
21     return !p.isA!T;
22   }
23 }
24 
25 private struct CompleteWithErrorReceiver(Receiver) {
26   Receiver receiver;
27   Throwable error;
28   void setValue() nothrow @safe {
29     receiver.setError(error);
30   }
31   void setValue(T)(auto ref T t) nothrow @safe {
32     receiver.setError(error);
33   }
34   void setDone() nothrow @safe {
35     receiver.setError(error);
36   }
37   void setError(Throwable e) nothrow @safe {
38     if (error.isA!Exception && e.isNotA!Exception) {
39       // if e is a throwable and t just a regular exception, forward throwable
40       receiver.setError(e);
41       return;
42     }
43     receiver.setError(error);
44   }
45   mixin ForwardExtensionPoints!receiver;
46 }
47 
48 struct CompleteWithErrorSender(Sender) if (models!(Sender, isSender)) {
49   static assert (models!(typeof(this), isSender));
50   alias Value = Sender.Value;
51   Sender sender;
52   Throwable t;
53   auto connect(Receiver)(return Receiver receiver) @safe scope return {
54     /// ensure NRVO
55     auto op = sender.connect(CompleteWithErrorReceiver!(Receiver)(receiver, t));
56     return op;
57   }
58 }