1 /// taken from https://github.com/ikod/timingwheels
2 /// license: BSL-1.0
3 /+
4  Permission is hereby granted, free of charge, to any person or organization obtaining a copy of the software and accompanying documentation covered by this license (the "Software") to use, reproduce, display, distribute, execute, and transmit the Software, and to prepare derivative works of the Software, and to permit third-parties to whom the Software is furnished to do so, all subject to the following:
5 
6 The copyright notices in the Software and this entire statement, including the above license grant, this restriction and the following disclaimer, must be included in all copies of the Software, in whole or in part, and all derivative works of the Software, unless such copies or derivative works are solely in the form of machine-executable object code generated by a source language processor.
7 
8 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
9  +/
10 module concurrency.timingwheels;
11 
12 import std.datetime;
13 import std.exception;
14 import std.typecons;
15 import std.format;
16 import std.traits;
17 import std.range;
18 import std.algorithm;
19 import std.experimental.logger;
20 
21 import std.experimental.allocator;
22 import std.experimental.allocator.mallocator: Mallocator;
23 
24 import core.thread;
25 import core.memory;
26 
27 import ikod.containers.hashmap;
28 import automem;
29 import mir.algebraic : Nullable, nullable;
30 
31 version(twtesting)
32 {
33     import unit_threaded;
34 }
35 
36 
37 version(twtesting)
38 {
39     private class Timer
40     {
41         static ulong _current_id;
42         private
43         {
44             ulong   _id;
45         }
46         this() @safe @nogc
47         {
48             _id = _current_id;
49             _current_id++;
50         }
51         ~this() @safe @nogc
52         {
53 
54         }
55         ulong id() @safe @nogc
56         {
57             return _id;
58         }
59         override string toString()
60         {
61             return "%d".format(_id);
62         }
63     }
64 }
65 ///
66 /// scheduling error occurs at schedule() when ticks == 0 or timer already scheduled.
67 ///
68 ///
69 class ScheduleTimerError: Exception
70 {
71     ///
72     this(string msg, string file = __FILE__, size_t line = __LINE__) @nogc @safe
73     {
74         super(msg, file, line);
75     }
76 }
77 ///
78 /// Cancel timer error occurs if you try to cancel timer which is not scheduled.
79 ///
80 class CancelTimerError: Exception
81 {
82     ///
83     this(string msg, string file = __FILE__, size_t line = __LINE__) @nogc @safe
84     {
85         super(msg, file, line);
86     }
87 }
88 ///
89 /// Advancing error occurs if number of ticks for advance not in range 0<t<=256
90 ///
91 class AdvanceWheelError: Exception
92 {
93     ///
94     ///
95     ///
96     this(string msg, string file = __FILE__, size_t line = __LINE__) @nogc @safe
97     {
98         super(msg, file, line);
99     }
100 }
101 
102 debug(timingwheels) @safe @nogc nothrow
103 {
104     package
105     void safe_tracef(A...)(string f, scope A args, string file = __FILE__, int line = __LINE__) @safe @nogc nothrow
106     {
107         bool osx,ldc;
108         version(OSX)
109         {
110             osx = true;
111         }
112         version(LDC)
113         {
114             ldc = true;
115         }
116         debug (timingwheels) try
117         {
118             // this can fail on pair ldc2/osx, see https://github.com/ldc-developers/ldc/issues/3240
119             if (!osx || !ldc)
120             {
121                 () @trusted @nogc {tracef("%s:%d " ~ f, file, line, args);}();
122             }
123         }
124         catch(Exception e)
125         {
126         }
127     }    
128 }
129 
130 pragma(inline)
131 private void dl_insertFront(L)(L *le, L** head)
132 {
133     if ( *head == null)
134     {
135         le.next = le.prev = le;
136     }
137     else
138     {
139         auto curr_head = *head;
140         le.prev = curr_head.prev;
141         le.next = curr_head;
142         curr_head.prev.next = le;
143         curr_head.prev = le;
144     }
145     *head = le;
146 }
147 
148 pragma(inline)
149 private void dl_unlink(L)(L *le, L** head)
150 in(*head != null)
151 {
152     if (le.next == le && *head == le)
153     {
154         *head = null;
155         return;
156     }
157     if (le == *head)
158     {
159         *head = le.next;        
160     }
161     le.next.prev = le.prev;
162     le.prev.next = le.next;
163 }
164 pragma(inline)
165 private void dl_walk(L)(L** head)
166 {
167     if (*head == null)
168     {
169         return;
170     }
171     auto le = *head;
172     do
173     {
174         le = le.next;
175     } while (le != *head);
176 }
177 pragma(inline)
178 private void dl_relink(L)(L* le, L** head_from, L** head_to)
179 in(le.prev !is null && le.next !is null)
180 {
181     dl_unlink(le, head_from);
182     dl_insertFront(le, head_to);
183 }
184 
185 @("dl")
186 unittest
187 {
188     globalLogLevel = LogLevel.info;
189     struct LE
190     {
191         int p;
192         LE *next;
193         LE *prev;
194     }
195     LE* head1 = null;
196     LE* head2 = null;
197     auto le1 = new LE(1);
198     auto le2 = new LE(2);
199     dl_insertFront(le1, &head1);
200     assert(head1 != null);
201     dl_unlink(le1, &head1);
202     assert(head1 == null);
203 
204     dl_insertFront(le1, &head1);
205     assert(head1 != null);
206     dl_insertFront(le2, &head1);
207     dl_unlink(le1, &head1);
208     assert(head1 != null);
209     dl_unlink(le2, &head1);
210     assert(head1 == null);
211 
212     dl_insertFront(le1, &head1);
213     assert(head1 != null);
214     dl_insertFront(le2, &head1);
215     dl_unlink(le2, &head1);
216     assert(head1 != null);
217     dl_unlink(le1, &head1);
218     assert(head1 == null);
219 
220     dl_insertFront(le1, &head1);
221     dl_relink(le1, &head1, &head2);
222     assert(head1 == null);
223     assert(head2 != null);
224 }
225 ///
226 /// This structure implements scheme 6.2 thom the
227 /// $(LINK http://www.cs.columbia.edu/~nahum/w6998/papers/sosp87-timing-wheels.pdf)
228 /// and supports several primitives:
229 /// $(UL
230 /// $(LI schedule timer in the future.)
231 /// $(LI cancel timer.)
232 /// $(LI time step (advance) - all timers expired at current time tick are extracted from wheels.)
233 /// )
234 /// Each operation take O(1) time.
235 /// 
236 struct TimingWheels(T)
237 {
238     import core.bitop: bsr;
239 
240     private
241     {
242         alias TimerIdType = ReturnType!(T.id);
243         alias allocator = Mallocator.instance;
244 
245         enum MASK = 0xff;
246         enum LEVELS = 8;
247         enum LEVEL_MAX = LEVELS - 1;
248         enum SLOTS  = 256;
249         enum FreeListMaxLen = 100;
250 
251         struct ListElement(T)
252         {
253             private
254             {
255                 T               timer;
256                 ulong           scheduled_at;
257                 ushort          position;
258                 ListElement!T*  prev, next;
259             }
260         }
261         struct Slot
262         {
263             ListElement!T* head;
264         }
265         struct Level
266         {
267             // now if counter of ticks processed on this level
268             ulong       now;
269             Slot[SLOTS] slots;
270         }
271 
272         Level[LEVELS]   levels;
273         ListElement!T*  freeList;
274         int             freeListLen;
275         HashMap!(TimerIdType, ListElement!T*)
276                         ptrs;
277         long            startedAt;
278     }
279     invariant
280     {
281         assert(freeListLen>=0);
282     }
283     alias Ticks = ulong; // ticks are 64 bit unsigned integers.
284 
285     // hashing ticks to slots
286     // 8 levels, each level 256 slots, with of slot on each level 256 times
287     // translate ticks to level
288     // 0x00_00_00_00_00_00_00_00 <- ticks
289     //   ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓
290     //   □  □  □  □  □  □  □  □ \
291     //   □  □  □  □  □  □  □  □ |
292     //   .  .  .  .  .  .  .  . | 256 slots
293     //   .  .  .  .  .  .  .  . |
294     //   □  □  □  □  □  □  □  □ /
295     //   7  6  5  4  3  2  1  0
296     //                          <- 8 levels
297     // each slot - double linked list of timers
298 
299     // ticks to level = bsr(ticks)/8
300     pragma(inline) private pure int t2l(ulong t) @safe @nogc nothrow
301     {
302         if (t == 0)
303         {
304             return 0;
305         }
306         return bsr(t)/LEVELS;
307     }
308     // ticks to slot  = ticks >> (level*8)
309     pragma(inline) private pure int t2s(ulong t, int l) @safe @nogc nothrow
310     {
311         return (t >> (l<<3)) & MASK;
312     }
313     // level to ticks
314     // l[0] -> 256
315     // l[1] -> 256*256
316     // ...
317     pragma(inline) private pure ulong l2t(int l) @safe @nogc nothrow
318     {
319         return SLOTS<<l;
320     }
321     ~this()
322     {
323         ptrs.clear;
324         for(int l=0;l<=LEVEL_MAX;l++)
325             for(int s=0; s<SLOTS; s++)
326             {
327                 while(levels[l].slots[s].head)
328                 {
329                     auto le = levels[l].slots[s].head;
330                     dl_unlink(le, &levels[l].slots[s].head);
331                     () @trusted {
332                         dispose(allocator, le);
333                     }();
334                 }
335             }
336         while(freeList)
337         {
338             assert(freeListLen>0);
339             auto n = freeList.next;
340             () @trusted {
341                 dispose(allocator, freeList);
342             }();
343             freeListLen--;
344             freeList = n;
345         }
346     }
347 
348     private ListElement!T* getOrCreate()
349     {
350         ListElement!T* result;
351         if (freeList !is null)
352         {
353             result = freeList;
354             freeList = freeList.next;
355             freeListLen--;
356             return result;
357         }
358         result = make!(ListElement!T)(allocator);
359         return result;
360     }
361     private void returnToFreeList(ListElement!T* le)
362     {
363         if ( freeListLen >= FreeListMaxLen )
364         {
365             // this can be safely disposed as we do not leak ListElements outide this module
366             () @trusted {
367                 dispose(allocator, le);
368             }();
369         }
370         else
371         {
372             le.position = 0xffff;
373             le.next = freeList;
374             freeList = le;
375             freeListLen++;
376         }
377     }
378     void init()
379     {
380         startedAt = Clock.currStdTime;
381     }
382     void init(ulong time) {
383         startedAt = time;
384     }
385     /++ 
386      + Return internal view on current time - it is time at the call to $(B init)
387      + plus total number of steps multiplied by $(B tick) duration.
388      + Params:
389      +   tick = tick duration
390      +/
391     auto currStdTime(Duration tick)
392     {
393         return startedAt + levels[0].now * tick.split!"hnsecs".hnsecs;
394     }
395     ///
396     /// Schedule timer to $(B ticks) ticks forward from internal 'now'.
397     ///Params: 
398     /// timer = timer to schedule;
399     /// ticks = ticks in the future to schedule timer. (0 < ticks < ulong.max);
400     ///Returns:
401     ///  void
402     ///Throws: 
403     /// ScheduleTimerError
404     ///   when thicks == 0
405     ///   or when timer already scheduled
406     ///
407     void schedule(T)(T timer, const ulong ticks)
408     {
409         if (ticks == 0)
410         {
411             throw new ScheduleTimerError("ticks can't be 0");
412         }
413         auto timer_id = timer.id();
414         if (ptrs.contains(timer_id))
415         {
416             throw new ScheduleTimerError("Timer already scheduled");
417         }
418         size_t level_index = 0;
419         long t = ticks;
420         long s = 1;     // width of the slot in ticks on level
421         long shift = 0;
422         while(t > s<<8) // while t > slots on level
423         {
424             t -= (SLOTS - (levels[level_index].now & MASK)) * s;
425             level_index++;
426             s = s << 8;
427             shift += 8;
428         }
429         auto level = &levels[level_index];
430         auto mask = s - 1;
431         size_t slot_index = (level.now + (t>>shift) + ((t&mask)>0?1:0)) & MASK;
432         auto slot       = &levels[level_index].slots[slot_index];
433         debug(timingwheels) safe_tracef("use level/slot %d/%d, level now: %d", level_index, slot_index, level.now);
434         auto le = getOrCreate();
435         le.timer = timer;
436         le.position = ((level_index << 8 ) | slot_index) & 0xffff;
437         le.scheduled_at = levels[0].now + ticks;
438         dl_insertFront(le, &slot.head);
439         ptrs[timer_id] = le;
440         debug(timingwheels) safe_tracef("scheduled timer id: %s, ticks: %s, now: %d, scheduled at: %s to level: %s, slot %s",
441             timer_id, ticks, levels[0].now, le.scheduled_at, level_index, slot_index);
442     }
443     /// Cancel timer
444     ///Params: 
445     /// timer = timer to cancel
446     ///Returns: 
447     /// void
448     ///Throws: 
449     /// CancelTimerError
450     ///  if timer not in wheel
451     bool cancel(T)(T timer)
452     {
453         // get list element pointer
454         auto v = ptrs.fetch(timer.id());
455         if ( !v.ok )
456         {
457             return false;
458         }
459         auto le = v.value;
460         immutable level_index = le.position>>8;
461         immutable slot_index  = le.position & 0xff;
462         assert(timer is le.timer);
463         debug(timingwheels) safe_tracef("cancel timer, l:%d, s:%d", level_index, slot_index);
464         dl_unlink(le, &levels[level_index].slots[slot_index].head);
465         returnToFreeList(le);
466         ptrs.remove(timer.id());
467         return true;
468     }
469     /// Number of ticks to rotate wheels until internal wheel 'now'
470     /// catch up with real world realTime.
471     /// Calculation based on time when wheels were stared and total 
472     /// numer of ticks pasded.
473     ///Params: 
474     /// tick = your tick length (Duration)
475     /// realTime = current real world now (Clock.currStdTime)
476     ///Returns: ticks to advance so that we catch up real world current time
477     int ticksToCatchUp(Duration tick, ulong realTime)
478     {
479         auto c = startedAt + tick.split!"hnsecs".hnsecs * levels[0].now;
480         auto v = (realTime - c) / tick.split!"hnsecs".hnsecs;
481         if ( v > 256 )
482         {
483             return 256;
484         }
485         return cast(int)v;
486     }
487     /// Time until next scheduled timer event.
488     /// You provide tick size and current real world time.
489     /// This function find ticks until next event and use time of the start and
490     /// total steps executed to calculate time delta from $(B realNow) to next event.
491     ///Params: 
492     /// tick = your accepted tick duration.
493     /// realNow = real world now, result of Clock.currStdTime
494     ///Returns: time until next event. Can be zero or negative in case you have already expired events.
495     ///
496     Nullable!Duration timeUntilNextEvent(const Duration tick, ulong realNow)
497     {
498         assert(startedAt>0, "Forgot to call init()?");
499         if (totalTimers == 0)
500             return typeof(return).init;
501         immutable n = ticksUntilNextEvent();
502         immutable target = startedAt + (levels[0].now + n) * tick.split!"hnsecs".hnsecs;
503         auto delta =  (target - realNow).hnsecs;
504         debug(timingwheels) safe_tracef("ticksUntilNextEvent=%s, tick=%s, startedAt=%s", n, tick, SysTime(startedAt));
505         return nullable(delta);
506     }
507 
508     ///
509     /// Adnvance wheel and return all timers expired during wheel turn.
510     //
511     /// Params:
512     ///   ticks = how many ticks to advance. Must be in range 0 <= 256
513     /// Returns: list of expired timers
514     ///
515     import std.array : Appender;
516     void advance(this W)(ulong ticks, ref Appender!(T[]) app)
517     {
518         if (ticks > l2t(0))
519         {
520             throw new AdvanceWheelError("You can't advance that much");
521         }
522         if (ticks == 0)
523         {
524             throw new AdvanceWheelError("ticks must be > 0");
525         }
526         debug(timingwheels) safe_tracef("advancing %d ticks", ticks);
527         auto      level  = &levels[0];
528 
529         while(ticks)
530         {
531             ticks--;
532             immutable now           = ++level.now;
533             immutable slot_index    = now & MASK;
534             auto      slot = &level.slots[slot_index];
535             //debug(timingwheels) safe_tracef("level 0, now=%s", now);
536             while(slot.head)
537             {
538                 auto le = slot.head;
539                 auto timer = le.timer;
540                 debug(timingwheels) safe_tracef("return timer: %s, scheduled at %s", timer, le.scheduled_at);
541                 app.put(timer);
542                 dl_unlink(le, &slot.head);
543                 returnToFreeList(le);
544                 ptrs.remove(timer.id());
545             }
546             if (slot_index == 0)
547             {
548                 advance_level(1);
549             }
550         }
551     }
552     auto totalTimers() pure @safe @nogc
553     {
554         return ptrs.length();
555     }
556     auto allTimers() @safe @nogc
557     {
558         struct AllTimers
559         {
560             HashMap!(TimerIdType, T)    _map;
561             auto timers()
562             {
563                 return _map.byValue;
564             }
565             auto length()
566             {
567                 return _map.length();
568             }
569             bool contains(TimerIdType id)
570             {
571                 return _map.contains(id);
572             }
573         }
574         alias AllResult = automem.RefCounted!(AllTimers, Mallocator);
575         AllTimers result;
576         foreach (p; ptrs.byPair)
577         {
578             result._map[p.key] = p.value.timer;
579         }
580         return result;
581     }
582     //
583     // ticks until next event on level 0 or until next wheel rotation
584     // If you have empty ticks it is safe to sleep - you will not miss anything, just wake up
585     // at the time when next timer have to be processed.
586     //Returns: number of safe "sleep" ticks.
587     //
588     private int ticksUntilNextEvent()
589     out(r; r<=256)
590     {
591         int result = 1;
592         auto level = &levels[0];
593         immutable uint now = levels[0].now & MASK;
594         auto slot = (now + 1) & MASK;
595         //assert(level.slots[now].head == null);
596         do
597         {
598             if (level.slots[slot].head != null)
599             {
600                 break;
601             }
602             result++;
603             slot = (slot + 1) & MASK;
604         }
605         while(slot != now);
606 
607         return min(result, 256-now);
608     }
609 
610     private void advance_level(int level_index)
611     in(level_index>0)
612     {
613         debug(timingwheels) safe_tracef("running advance on level %d", level_index);
614         immutable now0 = levels[0].now;
615         auto      level  = &levels[level_index];
616         immutable now    = ++level.now;
617         immutable slot_index = now & MASK;
618         debug(timingwheels) safe_tracef("level %s, now=%s", level_index, now);
619         auto slot = &level.slots[slot_index];
620         debug(timingwheels) safe_tracef("haldle l%s:s%s timers", level_index, slot_index);
621         while(slot.head)
622         {
623             auto listElement = slot.head;
624 
625             immutable delta = listElement.scheduled_at - now0;
626             size_t lower_level_index = 0;
627             long t = delta;
628             size_t s = 1;     // width of the slot in ticks on level
629             size_t shift = 0;
630             while(t > s<<8) // while t > slots on level
631             {
632                 t -= (SLOTS - (levels[lower_level_index].now & MASK)) * s;
633                 lower_level_index++;
634                 s = s << 8;
635                 shift += 8;
636             }
637             auto mask = s - 1;
638             size_t lower_level_slot_index = (levels[lower_level_index].now + (t>>shift) + ((t&mask)>0?1:0)) & MASK;
639             debug(timingwheels) safe_tracef("move timer id: %s, scheduledAt; %d to level %s, slot: %s (delta=%s)",
640                 listElement.timer.id(), listElement.scheduled_at, lower_level_index, lower_level_slot_index, delta);
641             listElement.position = ((lower_level_index<<8) | lower_level_slot_index) & 0xffff;
642             dl_relink(listElement, &slot.head, &levels[lower_level_index].slots[lower_level_slot_index].head);
643         }
644         if (slot_index == 0 && level_index < LEVEL_MAX)
645         {
646             advance_level(level_index+1);
647         }
648     }
649     void reset() {
650         this = TimingWheels!T();
651     }
652 
653 }
654 
655 version(twtesting):
656 
657 @("TimingWheels")
658 unittest
659 {
660     import std.stdio;
661     globalLogLevel = LogLevel.info;
662     TimingWheels!Timer w;
663     w.init();
664     assert(w.t2l(1) == 0);
665     assert(w.t2s(1, 0) == 1);
666     immutable t = 0x00_00_00_11_00_00_00_77;
667     immutable level = w.t2l(t);
668     assert(level==4);
669     immutable slot = w.t2s(t, level);
670     assert(slot == 0x11);
671     auto timer = new Timer();
672     () @nogc @safe {
673         w.schedule(timer, 2);
674         bool thrown;
675         // check that you can't schedule same timer twice
676         try
677         {
678             w.schedule(timer, 5);
679         }
680         catch(ScheduleTimerError e)
681         {
682             thrown = true;
683         }
684         assert(thrown);
685         thrown = false;
686         try
687         {
688             w.advance(1024);
689         }
690         catch(AdvanceWheelError e)
691         {
692             thrown = true;
693         }
694         assert(thrown);
695         thrown = false;
696         w.cancel(timer);
697         w.advance(1);
698     }();
699     w = TimingWheels!Timer();
700     w.init();
701     w.schedule(timer, 1);
702     auto r = w.advance(1);
703     assert(r.timers.count == 1);
704     w.schedule(timer, 256);
705     r = w.advance(255);
706     assert(r.timers.count == 0);
707     r = w.advance(1);
708     assert(r.timers.count == 1);
709     w.schedule(timer, 256*256);
710     int c;
711     for(int i=0;i<256;i++)
712     {
713         r = w.advance(256);
714         c += r.timers.count;
715     }
716     assert(c==1);
717 }
718 @("rt")
719 @Tags("noauto")
720 unittest
721 {
722     globalLogLevel = LogLevel.info;
723     TimingWheels!Timer w;
724     Duration Tick = 5.msecs;
725     w.init();
726     ulong now = Clock.currStdTime;
727     assert(now - w.currStdTime(Tick) < 5*10_000);
728     Thread.sleep(2*Tick);
729     now = Clock.currStdTime;
730     assert((now - w.currStdTime(Tick))/10_000 - (2*Tick).split!"msecs".msecs < 10);
731     auto toCatchUp = w.ticksToCatchUp(Tick, now);
732     toCatchUp.shouldEqual(2);
733     auto t = w.advance(toCatchUp);
734     toCatchUp = w.ticksToCatchUp(Tick, now);
735     toCatchUp.shouldEqual(0);
736 }
737 @("cancel")
738 unittest
739 {
740     globalLogLevel = LogLevel.info;
741     TimingWheels!Timer w;
742     w.init();
743     Timer timer0 = new Timer();
744     Timer timer1 = new Timer();
745     w.schedule(timer0, 256);
746     w.schedule(timer1, 256+128);
747     auto r = w.advance(255);
748     assert(r.timers.count == 0);
749     w.cancel(timer0);
750     r = w.advance(1);
751     assert(r.timers.count == 0);
752     assertThrown!CancelTimerError(w.cancel(timer0));
753     w.cancel(timer1);
754 }
755 @("ticksUntilNextEvent")
756 unittest
757 {
758     globalLogLevel = LogLevel.info;
759     TimingWheels!Timer w;
760     w.init();
761     auto s = w.ticksUntilNextEvent;
762     assert(s==256);
763     auto r = w.advance(s);
764     assert(r.timers.count == 0);
765     Timer t = new Timer();
766     w.schedule(t, 50);
767     s = w.ticksUntilNextEvent;
768     assert(s==50);
769     r = w.advance(s);
770     assert(r.timers.count == 1);
771 }
772 
773 @("load")
774 @Serial
775 unittest
776 {
777     import std.array:array;
778     globalLogLevel = LogLevel.info;
779     enum TIMERS = 100_000;
780     Timer._current_id = 1;
781     auto w = TimingWheels!Timer();
782     w.init();
783     for(int i=1;i<=TIMERS;i++)
784     {
785         auto t = new Timer();
786         w.schedule(t, i);
787     }
788     int counter;
789     for(int i=1;i<=TIMERS;i++)
790     {
791         auto r = w.advance(1);
792         auto timers = r.timers;
793         auto t = timers.array()[0];
794         assert(t.id == i, "expected t.id=%s, got %s".format(t.id, i));
795         assert(timers.count == 1);
796         counter++;
797     }
798     assert(counter == TIMERS, "expected 100 timers, got %d".format(counter));
799 
800     for(int i=1;i<=TIMERS;i++)
801     {
802         auto t = new Timer();
803         w.schedule(t, i);
804     }
805     counter = 0;
806     for(int i=TIMERS+1;i<=2*TIMERS;i++)
807     {
808         auto r = w.advance(1);
809         auto timers = r.timers;
810         auto t = timers.array()[0];
811         assert(t.id == i, "expected t.id=%s, got %s".format(t.id, i));
812         assert(timers.count == 1);
813         counter++;
814     }
815     assert(counter == TIMERS, "expected 100 timers, got %d".format(counter));
816 
817 }
818 // @("cornercase")
819 // @Serial
820 // unittest
821 // {
822 //     Timer._current_id = 1;
823 //     auto w = TimingWheels!Timer();
824 //     globalLogLevel = LogLevel.trace;
825 //     w.advance(254);
826 //     auto t = new Timer();
827 //     w.schedule(t, 511);
828 //     for(int i=0; i<511; i++)
829 //     {
830 //         w.advance(1);
831 //     }
832 // }
833 
834 ///
835 ///
836 ///
837 @("example")
838 @Tags("noauto")
839 @Values(1.msecs,2.msecs,3.msecs,4.msecs,5.msecs,6.msecs,7.msecs,8.msecs, 9.msecs,10.msecs)
840 @Serial
841 unittest
842 {
843     import std;
844     globalLogLevel = LogLevel.info;
845     auto rnd = Random(142);
846     auto Tick = getValue!Duration();
847     /// track execution
848     int  counter;
849     SysTime last;
850 
851     /// this is our Timer
852     class Timer
853     {
854         static ulong __id;
855         private ulong _id;
856         private string _name;
857         this(string name)
858         {
859             _id = __id++;
860             _name = name;
861         }
862         /// must provide id() method
863         ulong id()
864         {
865             return _id;
866         }
867     }
868 
869     enum IOWakeUpInterval = 100; // to simulate random IO wakeups in interval 0 - 100.msecs
870 
871     // each tick span 5 msecs - this is our link with time in reality
872     TimingWheels!Timer w;
873     w.init();
874     auto durationToTicks(Duration d)
875     {
876         // we have to adjust w.now and realtime 'now' before scheduling timer
877         auto real_now = Clock.currStdTime;
878         auto tw_now = w.currStdTime(Tick);
879         auto delay = (real_now - tw_now).hnsecs;
880         return (d + delay)/Tick;
881     }
882     void process_timer(Timer t)
883     {
884         switch(t._name)
885         {
886             case "periodic":
887                 if ( last.stdTime == 0)
888                 {
889                     // initialize tracking
890                     last = Clock.currTime - 50.msecs;
891                 }
892                 auto delta = Clock.currTime - last;
893                 assert(delta - 50.msecs <= max(Tick + Tick/20, 5.msecs), "delta-50.msecs=%s".format(delta-50.msecs));
894                 writefln("@ %s - delta: %sms (should be 50ms)", t._name, (Clock.currTime - last).split!"msecs".msecs);
895                 last = Clock.currTime;
896                 counter++;
897                 w.schedule(t, durationToTicks(50.msecs)); // rearm
898                 break;
899             default:
900                 writefln("@ %s", t._name);
901                 break;
902         }
903     }
904     // emulate some random initial delay
905     auto randomInitialDelay = uniform(0, 500, rnd).msecs;
906     Thread.sleep(randomInitialDelay);
907     //
908     // start one arbitrary timer and one periodic timer
909     //
910     auto some_timer = new Timer("some");
911     auto periodic_timer = new Timer("periodic");
912     w.schedule(some_timer, durationToTicks(32.msecs));
913     w.schedule(periodic_timer, durationToTicks(50.msecs));
914 
915     while(counter < 10)
916     {
917         auto realNow = Clock.currStdTime;
918         auto randomIoInterval = uniform(0, IOWakeUpInterval, rnd).msecs;
919         auto nextTimerEvent = max(w.timeUntilNextEvent(Tick, realNow), 0.msecs);
920         // wait for what should happen earlier
921         auto time_to_sleep = min(randomIoInterval, nextTimerEvent);
922         writefln("* sleep until timer event or random I/O for %s", time_to_sleep);
923         Thread.sleep(time_to_sleep);
924         // make steps if required
925         int ticks = w.ticksToCatchUp(Tick, Clock.currStdTime);
926         if (ticks > 0)
927         {
928             auto wr = w.advance(ticks);
929             foreach(t; wr.timers)
930             {
931                 process_timer(t);
932             }
933         }
934         // emulate some random processing time
935         Thread.sleep(uniform(0, 5, rnd).msecs);
936     }
937 }