module concurrency.stream.scan;

import concurrency.stream.stream;
import concurrency.sender : OpType;
import concepts;
import concurrency.utils : isThreadSafeFunction;

/// Applies an accumulator to each value from the source
auto scan(Stream, Fun, Seed)(Stream stream, scope Fun scanFn, Seed seed) if (models!(Stream, isStream)) {
  static assert(isThreadSafeFunction!Fun);
  alias Properties = StreamProperties!Stream;
  return fromStreamOp!(Seed, Properties.Value, ScanStreamOp!(Stream, Fun, Seed))(stream, scanFn, seed);
 }

template ScanStreamOp(Stream, Fun, Seed) {
  static assert(isThreadSafeFunction!Fun);
  alias Properties = StreamProperties!Stream;
  alias DG = CollectDelegate!(Seed);
  struct ScanStreamOp(Receiver) {
    alias Op = OpType!(Properties.Sender, Receiver);
    Fun scanFn;
    Seed acc;
    DG dg;
    Op op;
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    this(Stream stream, Fun scanFn, Seed seed, DG dg, Receiver receiver) @trusted {
      this.scanFn = scanFn;
      this.acc = seed;
      this.dg = dg;
      op = stream.collect(cast(Properties.DG)&item).connect(receiver);
    }
    static if (is(Properties.ElementType == void))
      void item() {
        acc = scanFn(acc);
        dg(acc);
      }
    else
      void item(Properties.ElementType t) {
        acc = scanFn(acc, t);
        dg(acc);
      }
    void start() nothrow @safe {
      op.start();
    }
  }
}