1 module concurrency.stream.tolist;
2
3 import concurrency.stream.stream;
4 import concurrency.sender : OpType;
5 import concepts;
6
/**
 * Collects every value of `stream` into an array and exposes that array
 * through a Sender.
 *
 * Params:
 *   stream = a value whose type models `isStream`
 *
 * Returns: a `ToListSender!Stream` holding `stream`
 */
auto toList(Stream)(Stream stream)
if (models!(Stream, isStream))
{
    alias Props = StreamProperties!Stream;

    // Compile-time guard: the stream's `Value` type has to be `void`.
    static assert(is(Props.Value == void),
                  "sender must produce void for toList to work");

    return ToListSender!Stream(stream);
}
13
/// Sender returned by `toList`. Its `Value` is an array of the stream's
/// element type.
struct ToListSender(Stream)
{
    alias Properties = StreamProperties!Stream;
    alias Value = Properties.ElementType[];

    /// The stream whose elements are going to be collected.
    Stream stream;

    /// Builds the `ToListOp` operation state that ties `stream` to `receiver`.
    auto connect(Receiver)(return Receiver receiver) @safe scope return
    {
        // Return a named local so the result is constructed in place (NRVO).
        auto operation = ToListOp!(Stream, Receiver)(stream, receiver);
        return operation;
    }
}
24
/// Operation state for `ToListSender`: owns the accumulated array (inside
/// `state`) and the operation obtained from connecting the stream's collect
/// sender.
struct ToListOp(Stream, Receiver)
{
    alias Properties = StreamProperties!Stream;
    alias State = ToListState!(Receiver, Properties.ElementType);
    alias Op = OpType!(Properties.Sender, ToListReceiver!(State));

    /// Final receiver plus the array being built up.
    State state;
    /// Operation produced by connecting the collect sender.
    Op op;

    // The constructor hands out `&item` and `&state`, both of which point
    // into this object; copying is disabled in both postblit and
    // copy-constructor form.
    @disable this(this);
    @disable this(ref return scope typeof(this) rhs);

    /// Stores `receiver`, then connects `stream.collect` (fed by `item`) to a
    /// `ToListReceiver` that refers back to `state`.
    this(Stream stream, return Receiver receiver) @trusted scope return
    {
        state.receiver = receiver;

        // Delegate bound to this object, cast to the stream's delegate type.
        auto onItem = cast(Properties.DG) &item;
        op = stream.collect(onItem).connect(ToListReceiver!(State)(&state));
    }

    /// Appends one stream element to the accumulated array.
    void item(Properties.ElementType t)
    {
        state.arr ~= t;
    }

    /// Starts the underlying operation.
    void start() nothrow @safe
    {
        op.start();
    }
}
44
/// Shared state of a toList operation: the downstream receiver together with
/// the elements gathered so far.
struct ToListState(Receiver, ElementType)
{
    /// Receiver that the collected array is eventually delivered to.
    Receiver receiver;

    /// Elements appended so far.
    ElementType[] arr;
}
49
/// Receiver connected to the stream's collect sender. Every call is forwarded
/// to `state.receiver`; on completion the accumulated array is passed along.
struct ToListReceiver(State)
{
    /// Points at the state holding the downstream receiver and the array.
    State* state;

    // --- completion signals -------------------------------------------------

    /// Completes the downstream receiver with the accumulated array.
    void setValue() @safe
    {
        state.receiver.setValue(state.arr);
    }

    /// Forwards the done signal.
    void setDone() @safe nothrow
    {
        state.receiver.setDone();
    }

    /// Forwards the error `t`.
    void setError(Throwable t) nothrow @safe
    {
        state.receiver.setError(t);
    }

    // --- queries forwarded to the downstream receiver -------------------------

    /// Returns the downstream receiver's stop token.
    auto getStopToken() nothrow @safe
    {
        return state.receiver.getStopToken();
    }

    /// Returns the downstream receiver's scheduler.
    auto getScheduler() nothrow @safe
    {
        return state.receiver.getScheduler();
    }
}