module concurrency.stream.sample;

import concurrency.stream.stream;
import concurrency.sender : OpType;
import concepts;

/// Forwards the latest value from the base stream every time the trigger
/// stream produces a value. If the base stream hasn't produced a (new) value
/// since the last forwarded one, the trigger is ignored.
///
/// Params:
///   base    = stream whose most recent element is forwarded; its element
///             type must not be `void` (enforced by the static assert below)
///   trigger = stream whose elements decide when the stored element is forwarded
///
/// Returns: the stream created by `fromStreamOp`, driven by `SampleStreamOp`.
auto sample(StreamBase, StreamTrigger)(StreamBase base, StreamTrigger trigger)
    if (models!(StreamBase, isStream) && models!(StreamTrigger, isStream)) {
  alias PropertiesBase = StreamProperties!StreamBase;
  static assert(!is(PropertiesBase.ElementType == void), "No point in sampling a stream that procudes no values. Might as well use trigger directly");
  return fromStreamOp!(PropertiesBase.ElementType, PropertiesBase.Value, SampleStreamOp!(StreamBase, StreamTrigger))(base, trigger);
}

/// Operation state behind `sample`: collects both streams through `raceAll`
/// and hands the stored base element to the downstream delegate when the
/// trigger stream produces a value.
template SampleStreamOp(StreamBase, StreamTrigger) {
  import concurrency.operations.raceall;
  import concurrency.bitfield : SharedBitField;

  /// Bits kept in `state`.
  enum Flags : size_t {
    locked = 0x1, // presumably the lock bit SharedBitField manages itself — confirm in concurrency.bitfield
    valid = 0x2   // requested by `item` when it stores an element; tested by `trigger`
  }

  alias PropertiesBase = StreamProperties!StreamBase;
  alias PropertiesTrigger = StreamProperties!StreamTrigger;
  alias DG = PropertiesBase.DG;

  struct SampleStreamOp(Receiver) {
    import std.traits : ReturnType;

    alias RaceAllSender = ReturnType!(raceAll!(PropertiesBase.Sender, PropertiesTrigger.Sender));
    alias Op = OpType!(RaceAllSender, Receiver);

    /// Downstream delegate that receives the sampled elements.
    DG dg;
    /// Connected operation that runs both collectors.
    Op op;
    /// Most recent element handed to `item`.
    PropertiesBase.ElementType element;
    /// Guards `element` and carries the `valid` flag.
    shared SharedBitField!Flags state;
    // NOTE(review): not referenced anywhere in this file; kept so the struct
    // layout is unchanged. Confirm whether it can be removed.
    shared size_t sampleState;

    // Copying and postblit are disabled: the constructor hands out delegates
    // that are bound to this instance.
    @disable this(ref return scope inout typeof(this) rhs);
    @disable this(this);

    /// Stores the downstream delegate and connects a `raceAll` of the two
    /// collectors to `receiver`.
    this(StreamBase base, StreamTrigger trigger, DG dg, return Receiver receiver) @trusted scope {
      this.dg = dg;
      // The `trigger` parameter shadows the `trigger` method, hence `&this.trigger`.
      op = raceAll(base.collect(cast(PropertiesBase.DG)&item),
                   trigger.collect(cast(PropertiesTrigger.DG)&this.trigger)).connect(receiver);
    }

    /// Called for every element of the base stream: stores it while holding
    /// the lock, requesting the `valid` flag as part of the lock call.
    void item(PropertiesBase.ElementType t) {
      with(state.lock(Flags.valid)) {
        element = t;
      }
    }

    /// Called for every element of the trigger stream: if `valid` was set
    /// when the lock was taken, forwards the stored element to `dg`.
    void trigger() {
      with(state.lock()) {
        if (was(Flags.valid)) {
          // Copy the element before releasing so `dg` gets a local snapshot.
          auto localElement = element;
          // NOTE(review): presumably drops the lock and clears `valid`, so
          // `dg` runs without the lock held — confirm against the guard type
          // in concurrency.bitfield. The order of these statements matters.
          release(Flags.valid);
          dg(localElement);
        }
      }
    }

    /// Starts the connected operation.
    void start() {
      op.start();
    }
  }
}