module concurrency.stream.cycle;

import concurrency.stream.stream;
import std.range : ElementType;

struct Cycle(Range) {
  alias T = ElementType!Range;
  alias DG = CollectDelegate!(T);
  Range range;
  void loop(StopToken)(DG emit, StopToken stopToken) @safe {
    for(;!stopToken.isStopRequested;) {
      foreach(item; range) {
        emit(item);
        if (stopToken.isStopRequested)
          return;
      }
    }
  }
}

/// Stream that cycles through a Range until cancelled
auto cycleStream(Range)(Range range) {
  alias T = ElementType!Range;
  return Cycle!Range(range).loopStream!T;
}