module concurrency.operations.completewitherror;

import concurrency;
import concurrency.receiver;
import concurrency.sender;
import concepts;
import std.traits;

/// Wraps `sender` in a `CompleteWithErrorSender` that carries `t`.
///
/// Params:
///   sender = the sender to wrap
///   t      = the throwable the resulting sender reports to its receiver
/// Returns: a `CompleteWithErrorSender!Sender` holding both arguments.
auto completeWithError(Sender)(Sender sender, Throwable t) {
    alias Wrapped = CompleteWithErrorSender!Sender;
    return Wrapped(sender, t);
}

/// `p.isA!T` is true when the class reference `p` can be downcast to `T`,
/// i.e. when `cast(T) p` yields a non-null reference.
template isA(T) if (is(T == class)) {
    bool isA(P)(auto ref P p) if (is(P == class)) {
        auto narrowed = cast(T) p;
        return narrowed !is null;
    }
}

/// Logical negation of `isA!T`: true when `p` cannot be downcast to `T`.
template isNotA(T) if (is(T == class)) {
    bool isNotA(P)(auto ref P p) {
        return !isA!T(p);
    }
}

/// Receiver adapter that turns every completion signal of the upstream
/// sender into a `setError` call on the wrapped receiver.
private struct CompleteWithErrorReceiver(Receiver) {
    /// Downstream receiver that gets the error.
    Receiver receiver;
    /// Throwable reported on completion.
    Throwable error;

    /// Upstream completed without a value: report the stored error.
    void setValue() nothrow @safe {
        receiver.setError(error);
    }

    /// Upstream completed with a value: the value is ignored and the stored
    /// error is reported instead.
    void setValue(T)(auto ref T t) nothrow @safe {
        receiver.setError(error);
    }

    /// Upstream was cancelled: report the stored error.
    void setDone() nothrow @safe {
        receiver.setError(error);
    }

    /// Upstream failed with `e`. The stored error is reported unless it is
    /// an `Exception` while `e` is not one, in which case `e` is forwarded.
    void setError(Throwable e) nothrow @safe {
        // Guard: keep the stored error whenever it is not a plain Exception,
        // or when the incoming throwable is itself just an Exception.
        if (error.isNotA!Exception || e.isA!Exception) {
            receiver.setError(error);
            return;
        }
        // `error` is a regular Exception and `e` is some other Throwable:
        // forward the incoming throwable.
        receiver.setError(e);
    }

    // NOTE(review): presumably forwards the wrapped receiver's extension
    // points; defined outside this file - confirm in concurrency.receiver.
    mixin ForwardExtensionPoints!receiver;
}

/// Sender returned by `completeWithError`. Connecting it connects the wrapped
/// `sender` to a `CompleteWithErrorReceiver` around the supplied receiver.
struct CompleteWithErrorSender(Sender) if (models!(Sender, isSender)) {
    static assert (models!(typeof(this), isSender));

    /// Same value type as the wrapped sender.
    alias Value = Sender.Value;

    /// The wrapped sender.
    Sender sender;
    /// Throwable handed to every receiver adapter created by `connect`.
    Throwable t;

    /// Connects the wrapped sender to `receiver` through a
    /// `CompleteWithErrorReceiver` and returns the resulting operation.
    auto connect(Receiver)(return Receiver receiver) @safe scope return {
        alias Adapter = CompleteWithErrorReceiver!Receiver;
        /// ensure NRVO: bind the result to a named local before returning it
        auto operation = sender.connect(Adapter(receiver, t));
        return operation;
    }
}