1 module concurrency.stream.cron; 2 3 import mir.algebraic; 4 import std.datetime.systime : SysTime; 5 6 struct Always { 7 } 8 9 struct Exact { 10 uint value; 11 } 12 13 struct Every { 14 uint step; 15 uint starting; 16 } 17 18 struct Each { 19 uint[] values; 20 } 21 22 alias Spec = Algebraic!(Always, Exact, Every, Each); 23 24 struct CronSpec { 25 import std.datetime.timezone : TimeZone, UTC; 26 Spec hours; 27 Spec minutes; 28 immutable(TimeZone) timezone = UTC(); 29 } 30 31 struct Deferrer { 32 CronSpec schedule; 33 shared bool emitAtStart; 34 this(CronSpec schedule, bool emitAtStart) @safe shared { 35 this.schedule = schedule; 36 } 37 auto opCall() @safe shared { 38 import std.datetime.systime : Clock; 39 import concurrency.sender : delay; 40 if (emitAtStart) { 41 import core.time : msecs; 42 emitAtStart = false; 43 return delay(0.msecs); 44 } 45 auto sleep = timeTillNextTrigger(schedule, Clock.currTime()); 46 return delay(sleep); 47 } 48 } 49 50 auto cronStream(CronSpec schedule, bool emitAtStart) @safe { 51 auto d = shared Deferrer(schedule, emitAtStart); 52 import concurrency.stream.defer; 53 return deferStream(d); 54 } 55 56 auto timeTillNextTrigger(CronSpec schedule, SysTime time) { 57 import core.time; 58 auto now = time.toOtherTZ(schedule.timezone); 59 Duration dur; 60 while (true) { 61 auto m = timeTillNextMinute(schedule.minutes, now); 62 now += m; 63 dur += m; 64 if (!now.hour.matches(schedule.hours)) { 65 auto h = timeTillNextHour(schedule.hours, now); 66 now += h; 67 dur += h; 68 continue; 69 } 70 return dur; 71 } 72 } 73 74 auto matches(uint value, Spec spec) { 75 import std.algorithm; 76 return spec.match!((Always a) => true, 77 (Exact e) => value == e.value, 78 (Every e) => (value - e.starting) % e.step == 0, 79 (Each e) => e.values.any!(v => v == value)); 80 } 81 82 auto timeTillNext(alias unit, uint cycle, alias selector)(Spec spec, SysTime now) { 83 import std.range : iota, empty, front; 84 import std.algorithm : find; 85 return spec.match!((Always a) { 86 return unit(1); 87 }, (Exact e) { 88 if (selector(now) >= e.value) 89 return unit(e.value + cycle - selector(now)); 90 return unit(e.value - selector(now)); 91 }, (Every e) { 92 auto next = iota(e.starting, uint.max, e.step).find!(v => v > selector(now)); 93 return unit(next.front() - selector(now)); 94 }, (Each e) { 95 auto next = e.values.find!(v => v > selector(now)); 96 if (next.empty) 97 return unit(e.values.front + cycle - selector(now)); 98 return unit(next.front() - selector(now)); 99 }); 100 } 101 102 auto timeTillNextMinute(Spec spec, SysTime now) { 103 import core.time; 104 return timeTillNext!(minutes, 60, i => i.minute)(spec, now) - seconds(now.second); 105 } 106 107 auto timeTillNextHour(Spec spec, SysTime now) { 108 import core.time; 109 return timeTillNext!(hours, 24, i => i.hour)(spec, now) - minutes(now.minute) - seconds(now.second); 110 }