module concurrency.stream.tolist;

import concurrency.stream.stream;
import concurrency.sender : OpType;
import concepts;

/// Gathers every element of `stream` into an array and hands that array out
/// through a Sender.
///
/// Params:
///   stream = a type modelling `isStream` whose `StreamProperties.Value` is `void`
/// Returns: a `ToListSender!Stream` wrapping `stream`
auto toList(Stream)(Stream stream) if (models!(Stream, isStream)) {
  static assert(is(StreamProperties!Stream.Value == void), "sender must produce void for toList to work");
  return ToListSender!Stream(stream);
}

/// Sender produced by `toList`. Connecting it to a receiver builds the
/// `ToListOp` that accumulates the stream's elements.
struct ToListSender(Stream) {
  alias Properties = StreamProperties!Stream;
  /// Array-of-elements type that is passed to the receiver's `setValue`.
  alias Value = Properties.ElementType[];
  Stream stream;

  /// Builds the operation tying `stream` to `receiver`.
  auto connect(Receiver)(return Receiver receiver) @safe return scope {
    alias Operation = ToListOp!(Stream, Receiver);
    // keep a single named local and return it directly, to ensure NRVO
    auto operation = Operation(stream, receiver);
    return operation;
  }
}

/// Operation state for `toList`: owns the accumulated array (inside `state`)
/// and the inner operation obtained from the stream's `collect` sender.
///
/// The constructor hands out `&state` and a delegate to `item`, and both
/// copy operations are disabled.
/// NOTE(review): presumably copying is disabled so those captured addresses
/// stay valid - confirm against the library's conventions.
struct ToListOp(Stream, Receiver) {
  alias Properties = StreamProperties!Stream;
  alias State = ToListState!(Receiver, Properties.ElementType);
  alias Op = OpType!(Properties.Sender, ToListReceiver!(State));

  State state;
  Op op;

  @disable this(this);
  @disable this(ref return scope typeof(this) rhs);

  /// Stores `receiver`, then connects the stream's `collect` sender (fed by
  /// `item`) to a `ToListReceiver` pointing at this operation's `state`.
  this(Stream stream, return Receiver receiver) @trusted return scope {
    state.receiver = receiver;
    // the member delegate is cast to the delegate type the stream expects
    auto onItem = cast(Properties.DG) &item;
    op = stream.collect(onItem).connect(ToListReceiver!(State)(&state));
  }

  /// Per-element callback: appends `t` to the accumulated array.
  void item(Properties.ElementType t) {
    state.arr ~= t;
  }

  /// Starts the inner operation.
  void start() @safe nothrow {
    op.start();
  }
}

/// Data shared between `ToListOp` and `ToListReceiver`: the downstream
/// receiver and the elements collected so far.
struct ToListState(Receiver, ElementType) {
  Receiver receiver;
  ElementType[] arr;
}

/// Receiver connected to the stream's `collect` sender. Holds a pointer to the
/// shared state and forwards every signal to the receiver stored there.
struct ToListReceiver(State) {
  State* state;

  /// Passes the accumulated array to the downstream receiver.
  void setValue() @safe {
    state.receiver.setValue(state.arr);
  }

  /// Forwards the done signal downstream.
  void setDone() nothrow @safe {
    state.receiver.setDone();
  }

  /// Forwards `t` to the downstream receiver's `setError`.
  void setError(Throwable t) @safe nothrow {
    state.receiver.setError(t);
  }

  /// Returns the downstream receiver's stop token.
  auto getStopToken() @safe nothrow {
    return state.receiver.getStopToken();
  }

  /// Returns the downstream receiver's scheduler.
  auto getScheduler() @safe nothrow {
    return state.receiver.getScheduler();
  }
}