1 module concurrency.stream.sample;
2 
3 import concurrency.stream.stream;
4 import concurrency.sender : OpType;
5 import concepts;
6 
7 /// Forwards the latest value from the base stream every time the trigger stream produces a value. If the base stream hasn't produces a (new) value the trigger is ignored
8 auto sample(StreamBase, StreamTrigger)(StreamBase base, StreamTrigger trigger) if (models!(StreamBase, isStream) && models!(StreamTrigger, isStream)) {
9   alias PropertiesBase = StreamProperties!StreamBase;
10   alias PropertiesTrigger = StreamProperties!StreamTrigger;
11   static assert(!is(PropertiesBase.ElementType == void), "No point in sampling a stream that procudes no values. Might as well use trigger directly");
12   alias DG = PropertiesBase.DG;
13   return fromStreamOp!(PropertiesBase.ElementType, PropertiesBase.Value, SampleStreamOp!(StreamBase, StreamTrigger))(base, trigger);
14 }
15 
16 template SampleStreamOp(StreamBase, StreamTrigger) {
17   import concurrency.operations.raceall;
18   import concurrency.bitfield : SharedBitField;
19   enum Flags : size_t {
20     locked = 0x1,
21     valid = 0x2
22   }
23   alias PropertiesBase = StreamProperties!StreamBase;
24   alias PropertiesTrigger = StreamProperties!StreamTrigger;
25   alias DG = PropertiesBase.DG;
26   struct SampleStreamOp(Receiver) {
27     import std.traits : ReturnType;
28     alias RaceAllSender = ReturnType!(raceAll!(PropertiesBase.Sender, PropertiesTrigger.Sender));
29     alias Op = OpType!(RaceAllSender, Receiver);
30     DG dg;
31     Op op;
32     PropertiesBase.ElementType element;
33     shared SharedBitField!Flags state;
34     shared size_t sampleState;
35     @disable this(ref return scope inout typeof(this) rhs);
36     @disable this(this);
37     this(StreamBase base, StreamTrigger trigger, DG dg, return Receiver receiver) @trusted scope {
38       this.dg = dg;
39       op = raceAll(base.collect(cast(PropertiesBase.DG)&item),
40                    trigger.collect(cast(PropertiesTrigger.DG)&this.trigger)).connect(receiver);
41     }
42     void item(PropertiesBase.ElementType t) {
43       import core.atomic : atomicOp;
44       with(state.lock(Flags.valid)) {
45         element = t;
46       }
47     }
48     void trigger() {
49       import core.atomic : atomicOp;
50       with(state.lock()) {
51         if (was(Flags.valid)) {
52           auto localElement = element;
53           release(Flags.valid);
54           dg(localElement);
55         }
56       }
57     }
58     void start() {
59       op.start();
60     }
61   }
62 }