module concurrency.stream.transform; import concurrency.stream.stream; import concurrency.sender : OpType; import concurrency.receiver : ForwardExtensionPoints; import concurrency.stoptoken : StopSource; import std.traits : ReturnType; import concurrency.utils : isThreadSafeFunction; import concepts; auto transform(Stream, Fun)(Stream stream, Fun fun) if (models!(Stream, isStream)) { static assert(isThreadSafeFunction!Fun); alias Properties = StreamProperties!Stream; return fromStreamOp!(ReturnType!Fun, Properties.Value, TransformStreamOp!(Stream, Fun))(stream, fun); } template TransformStreamOp(Stream, Fun) { static assert(isThreadSafeFunction!Fun); alias Properties = StreamProperties!Stream; alias InnerElementType = ReturnType!Fun; alias DG = CollectDelegate!(InnerElementType); struct TransformStreamOp(Receiver) { alias Op = OpType!(Properties.Sender, Receiver); Fun fun; DG dg; Op op; @disable this(ref return scope typeof(this) rhs); @disable this(this); this(Stream stream, Fun fun, DG dg, Receiver receiver) @trusted { this.fun = fun; this.dg = dg; op = stream.collect(cast(Properties.DG)&item).connect(receiver); } static if (is(Properties.ElementType == void)) void item() { static if (is(InnerElementType == void)) { fun(); dg(); } else dg(fun()); } else void item(Properties.ElementType t) { static if (is(InnerElementType == void)) { fun(t); dg(); } else dg(fun(t)); } void start() nothrow @safe { op.start(); } } }