module concurrency.stream.filter;

import concurrency.stream.stream;
import concurrency.sender : OpType;
import concepts;

auto filter(Stream, Fun)(Stream stream, Fun fun) if (models!(Stream, isStream)) {
  alias Properties = StreamProperties!Stream;
  return fromStreamOp!(Properties.ElementType, Properties.Value, FilterStreamOp!(Stream, Fun))(stream, fun);
}

template FilterStreamOp(Stream, Fun) {
  import concurrency.utils : isThreadSafeFunction;
  static assert(isThreadSafeFunction!Fun);
  struct FilterStreamOp(Receiver) {
    alias Properties = StreamProperties!Stream;
    alias DG = Properties.DG;
    alias Op = OpType!(Properties.Sender, Receiver);
    Fun fun;
    DG dg;
    Op op;
    @disable this(ref return scope typeof(this) rhs);
    @disable this(this);
    this(Stream stream, Fun fun, DG dg, Receiver receiver) @trusted {
      this.fun = fun;
      this.dg = dg;
      op = stream.collect(cast(Properties.DG)&item).connect(receiver);
    }
    static if (is(Properties.ElementType == void))
      void item() {
        if (fun())
          dg();
      }
    else
      void item(Properties.ElementType t) {
        if (fun(t))
          dg(t);
      }
    void start() nothrow @safe {
      op.start();
    }
  }
}