1 module concurrency.stream.tolist;
2 
3 import concurrency.stream.stream;
4 import concurrency.sender : OpType;
5 import concepts;
6 
7 /// toList collects all the stream's values and emits the array as a Sender
8 auto toList(Stream)(Stream stream) if (models!(Stream, isStream)) {
9   alias Properties = StreamProperties!Stream;
10   static assert(is(Properties.Value == void), "sender must produce void for toList to work");
11   return ToListSender!Stream(stream);
12 }
13 
14 struct ToListSender(Stream) {
15   alias Properties = StreamProperties!Stream;
16   alias Value = Properties.ElementType[];
17   Stream stream;
18   auto connect(Receiver)(return Receiver receiver) @safe return scope {
19     // ensure NRVO
20     auto op = ToListOp!(Stream, Receiver)(stream, receiver);
21     return op;
22   }
23 }
24 
25 struct ToListOp(Stream, Receiver) {
26   alias Properties = StreamProperties!Stream;
27   alias State = ToListState!(Receiver, Properties.ElementType);
28   State state;
29   alias Op = OpType!(Properties.Sender, ToListReceiver!(State));
30   Op op;
31   @disable this(this);
32   @disable this(ref return scope typeof(this) rhs);
33   this(Stream stream, return Receiver receiver) @trusted return scope {
34     state.receiver = receiver;
35     op = stream.collect(cast(Properties.DG)&item).connect(ToListReceiver!(State)(&state));
36   }
37   void item(Properties.ElementType t) {
38     state.arr ~= t;
39   }
40   void start() nothrow @safe {
41     op.start();
42   }
43 }
44 
45 struct ToListState(Receiver, ElementType) {
46   Receiver receiver;
47   ElementType[] arr;
48 }
49 
50 struct ToListReceiver(State) {
51   State* state;
52   void setValue() @safe {
53     state.receiver.setValue(state.arr);
54   }
55   void setDone() @safe nothrow {
56     state.receiver.setDone();
57   }
58   void setError(Throwable t) nothrow @safe {
59     state.receiver.setError(t);
60   }
61   auto getStopToken() nothrow @safe {
62     return state.receiver.getStopToken();
63   }
64   auto getScheduler() nothrow @safe {
65     return state.receiver.getScheduler();
66   }
67 }