module concurrency.stream.take;

import concurrency.stream.stream;
import concurrency.sender : OpType;
import concurrency.receiver : ForwardExtensionPoints;
import concurrency.stoptoken : StopSource;
import concepts;

/// Takes the first `n` values from a stream, or fewer if it is cancelled first.
///
/// Params:
///   stream = the source stream; must model `isStream`
///   n      = number of values to take; must be greater than zero
/// Returns: whatever `fromStreamOp` builds around `TakeOp!Stream`.
/// Throws: `Exception` (via `enforce`) when `n` is 0.
auto take(Stream)(Stream stream, size_t n) if (models!(Stream, isStream)) {
  alias Properties = StreamProperties!Stream;
  import std.exception : enforce;
  enforce(n > 0, "cannot take 0");
  return fromStreamOp!(Properties.ElementType, Properties.Value, TakeOp!Stream)(stream, n);
}

/// Receiver that sits between the collecting sender and the downstream
/// `receiver`. It shares a `StopSource` with `TakeOp` so that it can tell
/// whether a "done" signal arrived after a stop had been requested.
struct TakeReceiver(Receiver, Value) {
  Receiver receiver;
  StopSource stopSource;

  /// Forwards the value (if any) unchanged to the wrapped receiver.
  static if (is(Value == void))
    void setValue() @safe { receiver.setValue(); }
  else
    void setValue(Value e) @safe { receiver.setValue(e); }

  /// For a `void` Value: if a stop was requested on `stopSource`, completes
  /// downstream through `setValueOrError` instead of `setDone`. In every
  /// other case `setDone` is forwarded as-is.
  void setDone() nothrow @safe {
    import concurrency.receiver : setValueOrError;
    static if (is(Value == void)) {
      if (stopSource.isStopRequested)
        receiver.setValueOrError();
      else
        receiver.setDone();
    } else
      receiver.setDone();
  }

  /// Forwards the error unchanged to the wrapped receiver.
  void setError(Throwable t) nothrow @safe {
    receiver.setError(t);
  }

  mixin ForwardExtensionPoints!receiver;
}

/// Operation state for `take`: counts the items handed to `dg` and requests a
/// stop on its own `StopSource` once `n` items have been delivered.
template TakeOp(Stream) {
  alias Properties = StreamProperties!Stream;
  struct TakeOp(Receiver) {
    import concurrency.operations : withStopSource;
    import std.traits : ReturnType;
    alias SS = ReturnType!(withStopSource!(Properties.Sender));
    alias Op = OpType!(SS, TakeReceiver!(Receiver, Properties.Sender.Value));

    size_t n;          /// items still to be delivered; 0 means the limit was reached
    Properties.DG dg;  /// delegate that receives each taken item
    StopSource stopSource;
    Op op;

    // `op` is built around a delegate to `item`, which refers to this
    // instance, so copying is disabled.
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);

    /// Connects `stream.collect(&item)`, wrapped with this operation's
    /// `StopSource`, to a `TakeReceiver` around `receiver`.
    this(return Stream stream, size_t n, Properties.DG dg, return Receiver receiver) @trusted scope {
      stopSource = new StopSource();
      this.dg = dg;
      this.n = n;
      op = stream.collect(cast(Properties.DG)&item)
                 .withStopSource(stopSource)
                 .connect(TakeReceiver!(Receiver, Properties.Sender.Value)(receiver, stopSource));
    }

    static if (is(Properties.ElementType == void)) {
      private void item() {
        // The limit was already reached: drop anything the stream still
        // delivers. Without this guard the extra item would reach `dg` and
        // `n` (a size_t) would wrap around, disabling the limit entirely.
        if (n == 0)
          return;
        dg();
        countDown();
      }
    } else {
      private void item(Properties.ElementType t) {
        // See the void overload: ignore items delivered past the limit.
        if (n == 0)
          return;
        dg(t);
        countDown();
      }
    }

    /// Records one delivered item and requests a stop when none remain.
    /// TODO: this implies the stream will only call emit from a single execution context, we might need to enforce that
    private void countDown() {
      n--;
      if (n == 0)
        stopSource.stop();
    }

    /// Starts the connected operation.
    void start() nothrow @trusted scope {
      op.start();
    }
  }
}