1 module concurrency.stream.cron;
2 
3 import mir.algebraic;
4 import std.datetime.systime : SysTime;
5 
/// Field specifier that accepts every value: `matches` returns `true` for it
/// unconditionally and `timeTillNext` advances by exactly one unit.
struct Always {}
8 
/// Field specifier that accepts exactly one value.
struct Exact
{
  /// The single value of the field (hour or minute) that satisfies this
  /// specifier.
  uint value;
}
12 
/// Field specifier describing an arithmetic progression of accepted values.
struct Every
{
  /// `step` is the distance between consecutive accepted values and
  /// `starting` is the first value of the progression. `step` is used as a
  /// divisor by `matches`, so it must be non-zero.
  uint step, starting;
}
17 
/// Field specifier that accepts an explicit list of values.
struct Each
{
  /// The values of the field (hour or minute) that satisfy this specifier.
  uint[] values;
}
21 
/// Sum type holding one of the four field specifiers; it is inspected with
/// `match!` in `matches` and `timeTillNext`.
alias Spec = Algebraic!(
  Always,
  Exact,
  Every,
  Each
);
23 
/// A schedule made of an hour specifier and a minute specifier, evaluated in
/// a configurable time zone.
struct CronSpec
{
  import std.datetime.timezone : TimeZone, UTC;

  /// Specifiers for the hour of the day and the minute of the hour, in that
  /// order.
  Spec hours, minutes;

  /// Time zone the current time is converted to before the specifiers are
  /// evaluated; defaults to UTC.
  immutable(TimeZone) timezone = UTC();
}
30 
/// Callable handed to `deferStream` by `cronStream`. Every call returns the
/// result of `delay(...)` for the time remaining until the next trigger of
/// `schedule`.
struct Deferrer {
  /// Schedule used to compute the delay until the next trigger.
  CronSpec schedule;
  /// When set, the first call to `opCall` returns a zero delay instead of
  /// waiting for the schedule; the flag is cleared by that call.
  shared bool emitAtStart;

  /// Params:
  ///   schedule    = the cron schedule to follow
  ///   emitAtStart = whether the first call should fire immediately
  this(CronSpec schedule, bool emitAtStart) @safe shared {
    this.schedule = schedule;
    // Previously the argument was dropped, leaving the field at its default
    // of `false`, so the immediate first emission could never happen.
    this.emitAtStart = emitAtStart;
  }

  /// Returns: `delay(0.msecs)` on the first call when `emitAtStart` was
  /// requested, otherwise `delay` of the duration computed by
  /// `timeTillNextTrigger` for the current clock time.
  auto opCall() @safe shared {
    import std.datetime.systime : Clock;
    import concurrency.sender : delay;
    if (emitAtStart) {
      import core.time : msecs;
      // One-shot: only the very first call fires immediately.
      emitAtStart = false;
      return delay(0.msecs);
    }
    auto sleep = timeTillNextTrigger(schedule, Clock.currTime());
    return delay(sleep);
  }
}
49 
/// Creates a stream from `deferStream` driven by a `Deferrer` for the given
/// schedule.
///
/// Params:
///   schedule    = the cron schedule to follow
///   emitAtStart = forwarded to the `Deferrer` constructor
/// Returns: whatever `deferStream` returns for the shared `Deferrer`.
auto cronStream(CronSpec schedule, bool emitAtStart) @safe {
  import concurrency.stream.defer;

  auto deferrer = shared Deferrer(schedule, emitAtStart);
  return deferStream(deferrer);
}
55 
/// Computes how long to wait, starting at `time`, until the schedule next
/// triggers. The result is always strictly positive: a trigger that falls
/// exactly on `time` is not reported.
///
/// Params:
///   schedule = hour/minute specifiers plus the time zone to evaluate them in
///   time     = the instant to start searching from
/// Returns: the `Duration` from `time` to the next matching hour and minute.
auto timeTillNextTrigger(CronSpec schedule, SysTime time) {
  import core.time;
  auto now = time.toOtherTZ(schedule.timezone);
  Duration dur;
  while (true) {
    // Advance to the next minute accepted by the minute specifier.
    auto m = timeTillNextMinute(schedule.minutes, now);
    now += m;
    dur += m;
    if (now.hour.matches(schedule.hours))
      return dur;

    // Wrong hour: jump to the top (HH:00:00) of the next candidate hour.
    auto h = timeTillNextHour(schedule.hours, now);
    now += h;
    dur += h;
    // Minute 0 of that hour may itself be a trigger. Without this check the
    // next iteration would step past it, which skipped HH:00 triggers and,
    // for e.g. hours=Exact(5)/minutes=Exact(0), never terminated.
    if (now.hour.matches(schedule.hours) && now.minute.matches(schedule.minutes))
      return dur;
  }
}
73 
/// Tests whether `value` satisfies the given specifier.
///
/// Params:
///   value = the field value (hour or minute) to test
///   spec  = the specifier to test against
/// Returns: `true` for `Always`; equality for `Exact`; membership in the
/// progression `starting, starting + step, ...` for `Every` (`step` must be
/// non-zero); membership in the list for `Each`.
auto matches(uint value, Spec spec) {
  import std.algorithm;
  return spec.match!((Always a) => true,
                     (Exact e) => value == e.value,
                     // Guard against unsigned wrap-around: values below
                     // `starting` are never part of the progression.
                     (Every e) => value >= e.starting
                                  && (value - e.starting) % e.step == 0,
                     (Each e) => e.values.any!(v => v == value));
}
81 
/// Computes the distance, in whole `unit`s, from the current value of a time
/// field to the next value accepted by `spec`. The result is always at least
/// one unit.
///
/// Params:
///   unit     = callable turning a count into a duration (e.g. `minutes`)
///   cycle    = number of distinct values of the field (60 for minutes,
///              24 for hours); used to wrap around to the next period
///   selector = callable extracting the field's current value from a `SysTime`
///   spec     = the specifier to satisfy
///   now      = the instant to measure from
/// Returns: `unit(n)` where `n` is the number of units to the next accepted
/// value.
auto timeTillNext(alias unit, uint cycle, alias selector)(Spec spec, SysTime now) {
  import std.range : iota, empty, front;
  import std.algorithm : filter, find, minElement;
  return spec.match!((Always a) {
      return unit(1);
    }, (Exact e) {
      // Already at or past the value: the next occurrence is in the next cycle.
      if (selector(now) >= e.value)
        return unit(e.value + cycle - selector(now));
      return unit(e.value - selector(now));
    }, (Every e) {
      // The progression is not wrapped at `cycle`; a result beyond the
      // current period simply lands in the following one.
      auto next = iota(e.starting, uint.max, e.step).find!(v => v > selector(now));
      return unit(next.front() - selector(now));
    }, (Each e) {
      assert(!e.values.empty, "Each specifier requires at least one value");
      // Pick the smallest listed value after the current one, so the list
      // does not need to be sorted (matching `matches`, which ignores order).
      auto later = e.values.filter!(v => v > selector(now));
      if (later.empty)
        return unit(e.values.minElement + cycle - selector(now));
      return unit(later.minElement - selector(now));
    });
}
101 
/// Duration from `now` to the start (second 0) of the next minute accepted by
/// `spec`.
auto timeTillNextMinute(Spec spec, SysTime now) {
  import core.time : minutes, seconds;

  auto wholeMinutes = timeTillNext!(minutes, 60, t => t.minute)(spec, now);
  // Drop the seconds already elapsed in the current minute so the result
  // lands on a minute boundary.
  auto elapsed = seconds(now.second);
  return wholeMinutes - elapsed;
}
106 
/// Duration from `now` to the start (minute 0, second 0) of the next hour
/// accepted by `spec`.
auto timeTillNextHour(Spec spec, SysTime now) {
  import core.time : hours, minutes, seconds;

  auto wholeHours = timeTillNext!(hours, 24, t => t.hour)(spec, now);
  // Drop the part of the current hour that has already elapsed so the result
  // lands on an hour boundary.
  auto elapsed = minutes(now.minute) + seconds(now.second);
  return wholeHours - elapsed;
}