1 module concurrency.stream.take;
2 
3 import concurrency.stream.stream;
4 import concurrency.sender : OpType;
5 import concurrency.receiver : ForwardExtensionPoints;
6 import concurrency.stoptoken : StopSource;
7 import concepts;
8 
/// Limits a stream to its first `n` items; the stream may also end earlier
/// when it is cancelled.
///
/// Params:
///   stream = the source stream; must model `isStream`
///   n      = number of items to take; must be greater than zero
/// Returns: the stream produced by `fromStreamOp`, driven by `TakeOp!Stream`.
/// Throws: the exception raised by `enforce` when `n` is 0.
auto take(Stream)(Stream stream, size_t n) if (models!(Stream, isStream)) {
  import std.exception : enforce;
  alias Props = StreamProperties!Stream;
  alias Element = Props.ElementType;
  alias Result = Props.Value;

  // n is unsigned, so "not zero" is the same check as "greater than zero".
  enforce(n != 0, "cannot take 0");
  return fromStreamOp!(Element, Result, TakeOp!Stream)(stream, n);
}
16 
/// Receiver adapter used by `TakeOp`: it wraps the downstream `receiver` and
/// forwards value and error signals to it unchanged. For `void` senders a
/// "done" signal is turned into a value completion when `stopSource` reports
/// that a stop was requested.
struct TakeReceiver(Receiver, Value) {
  // NOTE: field order matters, the struct is constructed positionally.
  Receiver receiver;
  StopSource stopSource;

  static if (is(Value == void)) {
    /// Forwards a value-less completion to the wrapped receiver.
    void setValue() @safe {
      receiver.setValue();
    }
  } else {
    /// Forwards the produced value to the wrapped receiver.
    void setValue(Value e) @safe {
      receiver.setValue(e);
    }
  }

  /// Handles the "done" signal coming from upstream.
  void setDone() nothrow @safe {
    import concurrency.receiver : setValueOrError;
    static if (!is(Value == void)) {
      // A non-void sender has no value to hand over here, so done stays done.
      receiver.setDone();
    } else {
      if (!stopSource.isStopRequested) {
        receiver.setDone();
        return;
      }
      // A stop was requested on our stop source, so report a value completion
      // instead of done. NOTE(review): presumably the stop came from TakeOp
      // after the last item was taken; confirm nothing else stops this source.
      receiver.setValueOrError();
    }
  }

  /// Forwards an error to the wrapped receiver.
  void setError(Throwable t) nothrow @safe {
    receiver.setError(t);
  }

  mixin ForwardExtensionPoints!receiver;
}
39 
/// Operation state behind `take`: it collects `stream` through a counting
/// callback (`item`) and requests a stop on its own `StopSource` once `n`
/// items have been forwarded to the user delegate `dg`.
template TakeOp(Stream) {
  alias Properties = StreamProperties!Stream;
  struct TakeOp(Receiver) {
    import concurrency.operations : withStopSource;
    import std.traits : ReturnType;
    alias SS = ReturnType!(withStopSource!(Properties.Sender));
    alias Op = OpType!(SS, TakeReceiver!(Receiver, Properties.Sender.Value));
    /// Number of items that may still be forwarded to `dg`.
    size_t n;
    /// User delegate that receives each taken item.
    Properties.DG dg;
    /// Stop source that `item` triggers once the quota is used up.
    StopSource stopSource;
    /// The connected inner operation; started by `start`.
    Op op;
    // `op` is connected with a delegate to `this.item`, so the struct must
    // not be copied: a copy would leave that delegate pointing at the original.
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    /// Connects the collected stream to a `TakeReceiver` wrapping `receiver`.
    ///
    /// Params:
    ///   stream   = the source stream
    ///   n        = number of items to forward (`take` rejects 0 before getting here)
    ///   dg       = delegate invoked for each forwarded item
    ///   receiver = downstream receiver that gets the final completion signal
    this(return Stream stream, size_t n, Properties.DG dg, return Receiver receiver) @trusted scope {
      stopSource = new StopSource();
      this.dg = dg;
      this.n = n;
      op = stream.collect(cast(Properties.DG)&item).withStopSource(stopSource).connect(TakeReceiver!(Receiver,Properties.Sender.Value)(receiver, stopSource));
    }
    static if (is(Properties.ElementType == void)) {
      /// Per-item callback for value-less streams.
      private void item() {
        // Drop anything emitted after the quota is exhausted. Without this
        // guard a stream that emits again before it observes the stop request
        // would wrap the unsigned counter and forward more than `n` items.
        if (n == 0)
          return;
        dg();
        // TODO: this implies the stream will only call emit from a single execution context, we might need to enforce that
        n--;
        if (n == 0)
          stopSource.stop();
      }
    } else {
      /// Per-item callback for streams that carry a value.
      private void item(Properties.ElementType t) {
        // Drop anything emitted after the quota is exhausted (see the void
        // overload above): prevents unsigned wraparound of `n`.
        if (n == 0)
          return;
        dg(t);
        n--;
        if (n == 0)
          stopSource.stop();
      }
    }
    /// Starts the connected inner operation.
    void start() nothrow @trusted scope {
      op.start();
    }
  }
}