module concurrency.operations.forwardon;

// whenever the underlying Sender completes the forwardon forwards the completion on a specific scheduler

import concurrency;
import concurrency.receiver;
import concurrency.sender;
import concurrency.stoptoken;
import concepts;

auto forwardOn(Sender, Scheduler)(Sender sender, Scheduler scheduler) {
  return ForwardOnSender!(Sender, Scheduler)(sender, scheduler);
}

private struct ForwardOnReceiver(Receiver, Value, Scheduler) {
  import concurrency.operations : via;
  Receiver receiver;
  Scheduler scheduler;
  static if (is(Value == void)) {
    void setValue() @safe {
      VoidSender().via(scheduler.schedule()).connectHeap(receiver).start();
    }
  } else {
    void setValue(Value value) @safe {
      just(value).via(scheduler.schedule()).connectHeap(receiver).start();
    }
  }
  void setDone() @safe nothrow {
    DoneSender().via(scheduler.schedule()).connectHeap(receiver).start();
  }
  void setError(Throwable e) @safe nothrow {
    ErrorSender(e).via(scheduler.schedule()).connectHeap(receiver).start();
  }
  mixin ForwardExtensionPoints!receiver;
}

struct ForwardOnSender(Sender, Scheduler) if (models!(Sender, isSender)) {
  import std.traits : ReturnType;
  static assert(models!(typeof(this), isSender));
  alias Value = Sender.Value;
  Sender sender;
  Scheduler scheduler;
  auto connect(Receiver)(return Receiver receiver) @safe scope return {
    alias R = ForwardOnReceiver!(Receiver, Sender.Value, Scheduler);
    // ensure NRVO
    auto op = sender.connect(R(receiver, scheduler));
    return op;
  }
}