1 /// taken from https://github.com/ikod/timingwheels
2 /// license: BSL-1.0
3 /+
4  Permission is hereby granted, free of charge, to any person or organization obtaining a copy of the software and accompanying documentation covered by this license (the "Software") to use, reproduce, display, distribute, execute, and transmit the Software, and to prepare derivative works of the Software, and to permit third-parties to whom the Software is furnished to do so, all subject to the following:
5 
6 The copyright notices in the Software and this entire statement, including the above license grant, this restriction and the following disclaimer, must be included in all copies of the Software, in whole or in part, and all derivative works of the Software, unless such copies or derivative works are solely in the form of machine-executable object code generated by a source language processor.
7 
8 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
9  +/
10 module concurrency.timingwheels;
11 
12 import std.datetime;
13 import std.exception;
14 import std.typecons;
15 import std.format;
16 import std.traits;
17 import std.range;
18 import std.algorithm;
19 import std.experimental.logger;
20 
21 import std.experimental.allocator;
22 import std.experimental.allocator.mallocator: Mallocator;
23 
24 import core.thread;
25 import core.memory;
26 
27 import ikod.containers.hashmap;
28 import automem;
29 import mir.algebraic : Nullable, nullable;
30 
31 version(twtesting)
32 {
33     import unit_threaded;
34 }
35 
36 
version(twtesting)
{
    /// Minimal timer used only by the unit tests: every instance receives the
    /// next value of a static counter as its id.
    private class Timer
    {
        /// Id handed to the next constructed timer (tests may reset it).
        static ulong _current_id;

        private ulong _id;

        /// Take the current counter value as id, then bump the counter.
        this() @safe @nogc
        {
            _id = _current_id++;
        }

        ~this() @safe @nogc
        {
        }

        /// Id assigned at construction; this is what the wheels key timers on.
        ulong id() @safe @nogc
        {
            return _id;
        }

        /// Decimal rendering of the id.
        override string toString()
        {
            return format("%d", _id);
        }
    }
}
///
/// Thrown by `schedule()` when it is asked to schedule a timer with
/// `ticks == 0`, or a timer whose id is already registered in the wheels.
///
class ScheduleTimerError : Exception
{
    /// Forwards message and source location to `Exception`.
    this(string msg, string file = __FILE__, size_t line = __LINE__) @safe @nogc
    {
        super(msg, file, line);
    }
}
///
/// Error type reserved for a failed timer cancellation (cancelling a timer
/// that is not scheduled).
/// NOTE(review): `cancel()` in this module returns silently for an unknown
/// timer and does not throw this type; it appears to be kept for API
/// compatibility - confirm before removing.
///
class CancelTimerError : Exception
{
    /// Forwards message and source location to `Exception`.
    this(string msg, string file = __FILE__, size_t line = __LINE__) @safe @nogc
    {
        super(msg, file, line);
    }
}
///
/// Thrown by `advance()` when the requested number of ticks is outside the
/// accepted range `0 < ticks <= 256`.
///
class AdvanceWheelError : Exception
{
    /// Forwards message and source location to `Exception`.
    this(string msg, string file = __FILE__, size_t line = __LINE__) @safe @nogc
    {
        super(msg, file, line);
    }
}
101 
debug(timingwheels) @safe @nogc nothrow
{
    /// Best-effort trace helper, compiled only with `-debug=timingwheels`.
    /// Prefixes the message with `file:line` and forwards it to `tracef`;
    /// any `Exception` raised while logging is deliberately swallowed so that
    /// tracing never disturbs the caller.
    package
    void safe_tracef(A...)(string f, scope A args, string file = __FILE__, int line = __LINE__) @safe @nogc nothrow
    {
        version(OSX)
            immutable bool onOsx = true;
        else
            immutable bool onOsx = false;

        version(LDC)
            immutable bool onLdc = true;
        else
            immutable bool onLdc = false;

        debug (timingwheels) try
        {
            // tracing can fail on the ldc2/osx pair, see https://github.com/ldc-developers/ldc/issues/3240,
            // so it is skipped only when both are detected
            if (!(onOsx && onLdc))
            {
                () @trusted @nogc {tracef("%s:%d " ~ f, file, line, args);}();
            }
        }
        catch(Exception e)
        {
            // intentionally ignored: logging is best-effort
        }
    }
}
129 
// Insert `le` into the circular doubly-linked list rooted at `*head` and make
// it the new head. For an empty list the element is linked to itself.
pragma(inline)
private void dl_insertFront(L)(L *le, L** head)
{
    auto first = *head;
    if (first is null)
    {
        // single-element ring
        le.prev = le;
        le.next = le;
    }
    else
    {
        // splice between the current tail (first.prev) and the current head
        auto last = first.prev;
        le.prev = last;
        le.next = first;
        last.next = le;
        first.prev = le;
    }
    *head = le;
}
147 
// Remove `le` from the circular doubly-linked list rooted at `*head`.
// The list must not be empty. If `le` was the only element the list becomes
// empty (`*head` is set to null); if `le` was the head, the head moves to the
// following element. The links stored in `le` itself are left untouched.
pragma(inline)
private void dl_unlink(L)(L *le, L** head)
in(*head != null)
{
    immutable bool isHead = (*head == le);
    if (isHead && le.next == le)
    {
        // last remaining element
        *head = null;
        return;
    }
    if (isHead)
    {
        *head = le.next;
    }
    auto before = le.prev;
    auto after  = le.next;
    after.prev  = before;
    before.next = after;
}
// Follow the `next` links of the circular list rooted at `*head` once around,
// until the head is reached again. Nothing is read from or written to the
// visited elements; an empty list returns immediately.
pragma(inline)
private void dl_walk(L)(L** head)
{
    auto start = *head;
    if (start is null)
    {
        return;
    }
    for (auto cur = start.next; cur != start; cur = cur.next)
    {
    }
}
// Move `le` from the list rooted at `*head_from` to the front of the list
// rooted at `*head_to`. `le` must currently be linked (both links non-null).
pragma(inline)
private void dl_relink(L)(L* le, L** head_from, L** head_to)
in(!(le.prev is null || le.next is null))
{
    // detach from the source list first, then push onto the destination
    le.dl_unlink(head_from);
    le.dl_insertFront(head_to);
}
184 
@("dl")
unittest
{
    // Exercises the circular-list helpers: insert, unlink in both orders, relink.
    globalLogLevel = LogLevel.info;
    struct LE
    {
        int p;
        LE *next;
        LE *prev;
    }
    LE* listA = null;
    LE* listB = null;
    auto first  = new LE(1);
    auto second = new LE(2);

    // one element: insert then remove leaves the list empty
    dl_insertFront(first, &listA);
    assert(listA !is null);
    dl_unlink(first, &listA);
    assert(listA is null);

    // two elements: remove them in insertion order, then in reverse order
    foreach (removalOrder; [[first, second], [second, first]])
    {
        dl_insertFront(first, &listA);
        assert(listA !is null);
        dl_insertFront(second, &listA);
        dl_unlink(removalOrder[0], &listA);
        assert(listA !is null);
        dl_unlink(removalOrder[1], &listA);
        assert(listA is null);
    }

    // relink moves the element from one list to the other
    dl_insertFront(first, &listA);
    dl_relink(first, &listA, &listB);
    assert(listA is null);
    assert(listB !is null);
}
///
/// This structure implements scheme 6.2 from the
/// $(LINK http://www.cs.columbia.edu/~nahum/w6998/papers/sosp87-timing-wheels.pdf)
/// and supports several primitives:
/// $(UL
/// $(LI schedule timer in the future.)
/// $(LI cancel timer.)
/// $(LI time step (advance) - all timers expired at current time tick are extracted from wheels.)
/// )
/// The scheme is designed so that each operation takes O(1) time (see the paper).
///
/// `T` is the timer type; it must provide an `id()` method whose result is
/// used as the key under which the timer is tracked.
///
struct TimingWheels(T)
{
    import core.bitop: bsr;
    import std.array : Appender;

    private
    {
        alias TimerIdType = ReturnType!(T.id);
        alias allocator = Mallocator.instance;

        enum MASK = 0xff;           // selects a slot index (low 8 bits of a counter)
        enum LEVELS = 8;
        enum LEVEL_MAX = LEVELS - 1;
        enum SLOTS  = 256;
        enum FreeListMaxLen = 100;  // upper bound on recycled list elements kept in freeList

        struct ListElement(T)
        {
            private
            {
                T               timer;
                // absolute level-0 tick at which the timer has to fire
                ulong           scheduled_at;
                // (level << 8) | slot while scheduled, 0xffff while on the free list
                ushort          position;
                ListElement!T*  prev, next;
            }
        }
        struct Slot
        {
            // head of the circular doubly-linked list of timers in this slot (null when empty)
            ListElement!T* head;
        }
        struct Level
        {
            // counter of ticks processed on this level
            ulong       now;
            Slot[SLOTS] slots;
        }

        Level[LEVELS]   levels;
        // singly-linked (via `next`) list of recycled elements
        ListElement!T*  freeList;
        int             freeListLen;
        // timer id -> list element currently holding that timer
        HashMap!(TimerIdType, ListElement!T*)
                        ptrs;
        // value passed to / taken by init(); used as the origin for all time conversions
        long            startedAt;
    }
    invariant
    {
        assert(freeListLen>=0);
    }
    alias Ticks = ulong; // ticks are 64 bit unsigned integers.

    // hashing ticks to slots
    // 8 levels, each level 256 slots, width of a slot on each level is 256 times
    // the width of a slot on the previous level
    // translate ticks to level
    // 0x00_00_00_00_00_00_00_00 <- ticks
    //   ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓
    //   □  □  □  □  □  □  □  □ \
    //   □  □  □  □  □  □  □  □ |
    //   .  .  .  .  .  .  .  . | 256 slots
    //   .  .  .  .  .  .  .  . |
    //   □  □  □  □  □  □  □  □ /
    //   7  6  5  4  3  2  1  0
    //                          <- 8 levels
    // each slot - double linked list of timers

    // ticks to level = bsr(ticks)/8 (index of the highest non-zero byte)
    pragma(inline) private pure int t2l(ulong t) @safe @nogc nothrow
    {
        if (t == 0)
        {
            return 0;
        }
        // 8 bits per level
        return bsr(t) >> 3;
    }
    // ticks to slot  = (ticks >> (level*8)) & 0xff
    pragma(inline) private pure int t2s(ulong t, int l) @safe @nogc nothrow
    {
        return (t >> (l<<3)) & MASK;
    }
    // level to ticks: number of level-0 ticks covered by all slots of level `l`
    // l[0] -> 256
    // l[1] -> 256*256
    // ...
    // For l == LEVEL_MAX the value (2^64) does not fit into ulong and wraps to 0.
    pragma(inline) private pure ulong l2t(int l) @safe @nogc nothrow
    {
        return cast(ulong)SLOTS << (l << 3);
    }

    // Find the level and slot for an event `t` level-0 ticks in the future.
    //
    // Starting at level 0, as long as `t` does not fit into the 256 slots of
    // the current level, the ticks remaining until that level wraps around are
    // subtracted and the search moves one level up. On the chosen level the
    // slot is the level's `now` plus `t` expressed in slots of that level,
    // rounded up. Unsigned 64-bit arithmetic is used throughout and the search
    // stops at the top level, so arbitrarily large `t` never indexes past
    // `levels` and `s << 8` is never evaluated where it would overflow.
    private void findPosition(ulong t, out size_t level_index, out size_t slot_index) @safe @nogc nothrow
    {
        ulong s = 1;     // width of the slot in ticks on level
        uint  shift = 0; // log2(s)
        level_index = 0;
        while(level_index < LEVEL_MAX && t > (s << 8)) // while t > slots on level
        {
            t -= (SLOTS - (levels[level_index].now & MASK)) * s;
            level_index++;
            s <<= 8;
            shift += 8;
        }
        immutable ulong mask = s - 1;
        slot_index = cast(size_t)((levels[level_index].now + (t >> shift) + ((t & mask) > 0 ? 1 : 0)) & MASK);
    }

    // Releases every list element still linked into a slot as well as the
    // elements parked on the free list.
    ~this()
    {
        ptrs.clear;
        for(int l=0;l<=LEVEL_MAX;l++)
            for(int s=0; s<SLOTS; s++)
            {
                while(levels[l].slots[s].head)
                {
                    auto le = levels[l].slots[s].head;
                    dl_unlink(le, &levels[l].slots[s].head);
                    () @trusted {
                        dispose(allocator, le);
                    }();
                }
            }
        while(freeList)
        {
            assert(freeListLen>0);
            auto n = freeList.next;
            () @trusted {
                dispose(allocator, freeList);
            }();
            freeListLen--;
            freeList = n;
        }
    }

    // Take a list element from the free list, or allocate a new one when the
    // free list is empty.
    private ListElement!T* getOrCreate()
    {
        ListElement!T* result;
        if (freeList !is null)
        {
            result = freeList;
            freeList = freeList.next;
            freeListLen--;
            return result;
        }
        result = make!(ListElement!T)(allocator);
        return result;
    }
    // Park an unlinked list element on the free list, or dispose it when the
    // free list already holds FreeListMaxLen elements.
    private void returnToFreeList(ListElement!T* le)
    {
        if ( freeListLen >= FreeListMaxLen )
        {
            // this can be safely disposed as we do not leak ListElements outside this module
            () @trusted {
                dispose(allocator, le);
            }();
        }
        else
        {
            le.position = 0xffff;
            le.next = freeList;
            freeList = le;
            freeListLen++;
        }
    }
    /// Record the current wall clock time (`Clock.currStdTime`) as the start time of the wheels.
    void init()
    {
        startedAt = Clock.currStdTime;
    }
    /// Record `time` (in the units of `Clock.currStdTime`, i.e. hnsecs) as the start time of the wheels.
    void init(ulong time) {
        startedAt = time;
    }
    /++ 
     + Return internal view on current time - it is time at the call to $(B init)
     + plus total number of steps multiplied by $(B tick) duration.
     + Params:
     +   tick = tick duration
     +/
    auto currStdTime(Duration tick)
    {
        return startedAt + levels[0].now * tick.split!"hnsecs".hnsecs;
    }
    ///
    /// Schedule timer to $(B ticks) ticks forward from internal 'now'.
    ///Params: 
    /// timer = timer to schedule;
    /// ticks = ticks in the future to schedule timer. (0 < ticks < ulong.max);
    ///Returns:
    ///  void
    ///Throws: 
    /// ScheduleTimerError
    ///   when ticks == 0
    ///   or when timer already scheduled
    ///
    void schedule(T)(T timer, const ulong ticks)
    {
        if (ticks == 0)
        {
            throw new ScheduleTimerError("ticks can't be 0");
        }
        auto timer_id = timer.id();
        if (ptrs.contains(timer_id))
        {
            throw new ScheduleTimerError("Timer already scheduled");
        }
        size_t level_index;
        size_t slot_index;
        findPosition(ticks, level_index, slot_index);
        auto level = &levels[level_index];
        auto slot  = &level.slots[slot_index];
        debug(timingwheels) safe_tracef("use level/slot %d/%d, level now: %d", level_index, slot_index, level.now);
        auto le = getOrCreate();
        le.timer = timer;
        // remember where the element lives so that cancel() can find its slot
        le.position = ((level_index << 8 ) | slot_index) & 0xffff;
        le.scheduled_at = levels[0].now + ticks;
        dl_insertFront(le, &slot.head);
        ptrs[timer_id] = le;
        debug(timingwheels) safe_tracef("scheduled timer id: %s, ticks: %s, now: %d, scheduled at: %s to level: %s, slot %s",
            timer_id, ticks, levels[0].now, le.scheduled_at, level_index, slot_index);
    }
    /// Cancel timer.
    /// Cancelling a timer that is not in the wheels is a no-op: the function
    /// returns silently and nothing is thrown.
    ///Params: 
    /// timer = timer to cancel
    ///Returns: 
    /// void
    void cancel(T)(T timer)
    {
        // get list element pointer
        auto v = ptrs.fetch(timer.id());
        if ( !v.ok )
        {
            return;
        }
        auto le = v.value;
        immutable level_index = le.position>>8;
        immutable slot_index  = le.position & 0xff;
        assert(timer is le.timer);
        debug(timingwheels) safe_tracef("cancel timer, l:%d, s:%d", level_index, slot_index);
        dl_unlink(le, &levels[level_index].slots[slot_index].head);
        returnToFreeList(le);
        ptrs.remove(timer.id());
    }
    /// Number of ticks to rotate wheels until internal wheel 'now'
    /// catches up with real world realTime.
    /// Calculation is based on the time when the wheels were started and the
    /// total number of ticks passed.
    ///Params: 
    /// tick = your tick length (Duration), must be positive
    /// realTime = current real world now (Clock.currStdTime)
    ///Returns: ticks to advance so that we catch up real world current time,
    /// capped at 256; 0 when the wheel's time is not behind `realTime`.
    int ticksToCatchUp(Duration tick, ulong realTime)
    {
        immutable long tickLen = tick.split!"hnsecs".hnsecs;
        assert(tickLen > 0, "tick duration must be positive");
        immutable ulong wheelNow = startedAt + tickLen * levels[0].now;
        // The subtraction below is unsigned: without this guard a realTime that
        // is behind the wheel would wrap around and be reported as 256 ticks.
        if (realTime <= wheelNow)
        {
            return 0;
        }
        immutable v = (realTime - wheelNow) / tickLen;
        if ( v > 256 )
        {
            return 256;
        }
        return cast(int)v;
    }
    /// Time until next scheduled timer event.
    /// You provide tick size and current real world time.
    /// This function finds ticks until next event and uses time of the start and
    /// total steps executed to calculate time delta from $(B realNow) to next event.
    ///Params: 
    /// tick = your accepted tick duration.
    /// realNow = real world now, result of Clock.currStdTime
    ///Returns: time until next event, or a null value when no timers are scheduled.
    /// Can be zero or negative in case you have already expired events.
    ///
    Nullable!Duration timeUntilNextEvent(const Duration tick, ulong realNow)
    {
        assert(startedAt>0, "Forgot to call init()?");
        if (totalTimers == 0)
            return typeof(return).init;
        immutable n = ticksUntilNextEvent();
        immutable target = startedAt + (levels[0].now + n) * tick.split!"hnsecs".hnsecs;
        // unsigned difference reinterpreted as signed hnsecs: a target in the past yields a negative Duration
        auto delta =  (target - realNow).hnsecs;
        debug(timingwheels) safe_tracef("ticksUntilNextEvent=%s, tick=%s, startedAt=%s", n, tick, SysTime(startedAt));
        return nullable(delta);
    }

    ///
    /// Advance wheel and collect all timers expired during wheel turn.
    ///
    /// Params:
    ///   ticks = how many ticks to advance. Must be in range 0 < ticks <= 256
    ///   app = appender that receives every expired timer, in the order they are found
    /// Throws:
    ///   AdvanceWheelError when ticks == 0 or ticks > 256
    ///
    void advance(this W)(ulong ticks, ref Appender!(T[]) app)
    {
        if (ticks > l2t(0))
        {
            throw new AdvanceWheelError("You can't advance that much");
        }
        if (ticks == 0)
        {
            throw new AdvanceWheelError("ticks must be > 0");
        }
        debug(timingwheels) safe_tracef("advancing %d ticks", ticks);
        auto      level  = &levels[0];

        while(ticks)
        {
            ticks--;
            immutable now           = ++level.now;
            immutable slot_index    = now & MASK;
            auto      slot = &level.slots[slot_index];
            //debug(timingwheels) safe_tracef("level 0, now=%s", now);
            while(slot.head)
            {
                auto le = slot.head;
                // copy the timer out before the element is recycled or disposed
                auto timer = le.timer;
                debug(timingwheels) safe_tracef("return timer: %s, scheduled at %s", timer, le.scheduled_at);
                app.put(timer);
                dl_unlink(le, &slot.head);
                returnToFreeList(le);
                ptrs.remove(timer.id());
            }
            if (slot_index == 0)
            {
                // level 0 wrapped around: pull timers down from the next level
                advance_level(1);
            }
        }
    }
    /// Number of timers currently scheduled.
    auto totalTimers() pure @safe @nogc
    {
        return ptrs.length();
    }
    /// Snapshot of all currently scheduled timers, copied into a separate
    /// id -> timer map. The result exposes `timers()`, `length()` and `contains(id)`.
    auto allTimers() @safe @nogc
    {
        struct AllTimers
        {
            HashMap!(TimerIdType, T)    _map;
            auto timers()
            {
                return _map.byValue;
            }
            auto length()
            {
                return _map.length();
            }
            bool contains(TimerIdType id)
            {
                return _map.contains(id);
            }
        }
        alias AllResult = automem.RefCounted!(AllTimers, Mallocator);
        AllTimers result;
        foreach (p; ptrs.byPair)
        {
            result._map[p.key] = p.value.timer;
        }
        return result;
    }
    //
    // ticks until next event on level 0 or until next wheel rotation
    // If you have empty ticks it is safe to sleep - you will not miss anything, just wake up
    // at the time when next timer has to be processed.
    //Returns: number of safe "sleep" ticks.
    //
    private int ticksUntilNextEvent()
    out(r; r<=256)
    {
        int result = 1;
        auto level = &levels[0];
        immutable uint now = levels[0].now & MASK;
        auto slot = (now + 1) & MASK;
        //assert(level.slots[now].head == null);
        do
        {
            if (level.slots[slot].head != null)
            {
                break;
            }
            result++;
            slot = (slot + 1) & MASK;
        }
        while(slot != now);

        // never sleep past the point where level 0 wraps and higher levels must be cascaded
        return min(result, 256-now);
    }

    // Advance level `level_index` by one slot and move every timer found in
    // the slot that became current down to the level/slot matching the time
    // it still has to wait. When this level wraps around as well, the next
    // level up is advanced recursively.
    private void advance_level(int level_index)
    in(level_index>0)
    {
        debug(timingwheels) safe_tracef("running advance on level %d", level_index);
        immutable now0 = levels[0].now;
        auto      level  = &levels[level_index];
        immutable now    = ++level.now;
        immutable slot_index = now & MASK;
        debug(timingwheels) safe_tracef("level %s, now=%s", level_index, now);
        auto slot = &level.slots[slot_index];
        debug(timingwheels) safe_tracef("haldle l%s:s%s timers", level_index, slot_index);
        while(slot.head)
        {
            auto listElement = slot.head;

            // ticks still left until the timer has to fire
            immutable delta = listElement.scheduled_at - now0;
            size_t lower_level_index;
            size_t lower_level_slot_index;
            findPosition(delta, lower_level_index, lower_level_slot_index);
            debug(timingwheels) safe_tracef("move timer id: %s, scheduledAt; %d to level %s, slot: %s (delta=%s)",
                listElement.timer.id(), listElement.scheduled_at, lower_level_index, lower_level_slot_index, delta);
            listElement.position = ((lower_level_index<<8) | lower_level_slot_index) & 0xffff;
            dl_relink(listElement, &slot.head, &levels[lower_level_index].slots[lower_level_slot_index].head);
        }
        if (slot_index == 0 && level_index < LEVEL_MAX)
        {
            advance_level(level_index+1);
        }
    }
}
649 
650 version(twtesting):
651 
@("TimingWheels")
unittest
{
    // Basic scheduling/advancing checks. `advance` appends expired timers to
    // the supplied Appender, so expirations are counted through it.
    import std.array : appender;
    globalLogLevel = LogLevel.info;
    TimingWheels!Timer w;
    w.init();
    assert(w.t2l(1) == 0);
    assert(w.t2s(1, 0) == 1);
    immutable t = 0x00_00_00_11_00_00_00_77;
    immutable level = w.t2l(t);
    assert(level==4);
    immutable slot = w.t2s(t, level);
    assert(slot == 0x11);
    auto timer = new Timer();
    auto expired = appender!(Timer[])();

    // advance the wheels and report how many timers expired during this call
    size_t advanceAndCount(ulong ticks)
    {
        expired.clear();
        w.advance(ticks, expired);
        return expired.data.length;
    }

    w.schedule(timer, 2);
    // check that you can't schedule same timer twice
    assertThrown!ScheduleTimerError(w.schedule(timer, 5));
    // check that you can't advance more than 256 ticks at once
    assertThrown!AdvanceWheelError(w.advance(1024, expired));
    w.cancel(timer);
    assert(advanceAndCount(1) == 0);

    w = TimingWheels!Timer();
    w.init();
    w.schedule(timer, 1);
    assert(advanceAndCount(1) == 1);
    w.schedule(timer, 256);
    assert(advanceAndCount(255) == 0);
    assert(advanceAndCount(1) == 1);
    w.schedule(timer, 256*256);
    size_t c;
    for(int i=0;i<256;i++)
    {
        c += advanceAndCount(256);
    }
    assert(c==1);
}
@("rt")
@Tags("noauto")
unittest
{
    // Real-time check: after sleeping two ticks the wheels must report two
    // ticks to catch up, and none after advancing by that amount.
    import std.array : appender;
    globalLogLevel = LogLevel.info;
    TimingWheels!Timer w;
    Duration Tick = 5.msecs;
    w.init();
    ulong now = Clock.currStdTime;
    assert(now - w.currStdTime(Tick) < 5*10_000);
    Thread.sleep(2*Tick);
    now = Clock.currStdTime;
    assert((now - w.currStdTime(Tick))/10_000 - (2*Tick).split!"msecs".msecs < 10);
    auto toCatchUp = w.ticksToCatchUp(Tick, now);
    toCatchUp.shouldEqual(2);
    // advance() reports expired timers through an Appender; none are scheduled here
    auto expired = appender!(Timer[])();
    w.advance(toCatchUp, expired);
    toCatchUp = w.ticksToCatchUp(Tick, now);
    toCatchUp.shouldEqual(0);
}
@("cancel")
unittest
{
    // A cancelled timer must not fire, and cancelling a timer that is no
    // longer scheduled is a silent no-op.
    import std.array : appender;
    globalLogLevel = LogLevel.info;
    TimingWheels!Timer w;
    w.init();
    Timer timer0 = new Timer();
    Timer timer1 = new Timer();
    auto expired = appender!(Timer[])();
    w.schedule(timer0, 256);
    w.schedule(timer1, 256+128);
    w.advance(255, expired);
    assert(expired.data.length == 0);
    w.cancel(timer0);
    assert(w.totalTimers == 1);
    w.advance(1, expired);
    assert(expired.data.length == 0);
    // timer0 is already gone: cancel() returns without throwing
    assertNotThrown!Exception(w.cancel(timer0));
    assert(w.totalTimers == 1);
    w.cancel(timer1);
    assert(w.totalTimers == 0);
}
@("ticksUntilNextEvent")
unittest
{
    // With no timers the wheels may sleep a full rotation (256 ticks); with a
    // timer 50 ticks ahead they may sleep exactly 50 ticks.
    import std.array : appender;
    globalLogLevel = LogLevel.info;
    TimingWheels!Timer w;
    w.init();
    auto expired = appender!(Timer[])();
    auto s = w.ticksUntilNextEvent;
    assert(s==256);
    w.advance(s, expired);
    assert(expired.data.length == 0);
    Timer t = new Timer();
    w.schedule(t, 50);
    s = w.ticksUntilNextEvent;
    assert(s==50);
    w.advance(s, expired);
    assert(expired.data.length == 1);
}
767 
@("load")
@Serial
unittest
{
    // Schedule TIMERS timers, the i-th one i ticks ahead, and verify that each
    // single-tick advance returns exactly the timer expected for that tick.
    // The whole cycle is run twice on the same wheels.
    import std.array : appender;
    globalLogLevel = LogLevel.info;
    enum TIMERS = 100_000;
    Timer._current_id = 1;
    auto w = TimingWheels!Timer();
    w.init();
    auto expired = appender!(Timer[])();

    // ids are handed out sequentially, so the batch starting at `firstId`
    // must expire in id order, one timer per tick
    void runBatch(int firstId)
    {
        for(int i=1;i<=TIMERS;i++)
        {
            auto t = new Timer();
            w.schedule(t, i);
        }
        int counter;
        for(int i=firstId;i<firstId+TIMERS;i++)
        {
            expired.clear();
            w.advance(1, expired);
            // check the count first so that indexing below cannot go out of range
            assert(expired.data.length == 1, "expected 1 timer, got %d".format(expired.data.length));
            auto t = expired.data[0];
            assert(t.id == i, "expected t.id=%s, got %s".format(i, t.id));
            counter++;
        }
        assert(counter == TIMERS, "expected %d timers, got %d".format(TIMERS, counter));
    }

    runBatch(1);
    runBatch(TIMERS+1);
}
813 // @("cornercase")
814 // @Serial
815 // unittest
816 // {
817 //     Timer._current_id = 1;
818 //     auto w = TimingWheels!Timer();
819 //     globalLogLevel = LogLevel.trace;
820 //     w.advance(254);
821 //     auto t = new Timer();
822 //     w.schedule(t, 511);
823 //     for(int i=0; i<511; i++)
824 //     {
825 //         w.advance(1);
826 //     }
827 // }
828 
829 ///
830 ///
831 ///
@("example")
@Tags("noauto")
@Values(1.msecs,2.msecs,3.msecs,4.msecs,5.msecs,6.msecs,7.msecs,8.msecs, 9.msecs,10.msecs)
@Serial
unittest
{
    // Usage example: an event loop that mixes random "I/O" wakeups with a
    // one-shot timer and a periodic 50ms timer driven by the wheels.
    import std;
    globalLogLevel = LogLevel.info;
    auto rnd = Random(142);
    auto Tick = getValue!Duration();
    /// track execution
    int  counter;
    SysTime last;

    /// this is our Timer
    class Timer
    {
        static ulong __id;
        private ulong _id;
        private string _name;
        this(string name)
        {
            _id = __id++;
            _name = name;
        }
        /// must provide id() method
        ulong id()
        {
            return _id;
        }
    }

    enum IOWakeUpInterval = 100; // to simulate random IO wakeups in interval 0 - 100.msecs

    // each tick spans `Tick` - this is our link with time in reality
    TimingWheels!Timer w;
    w.init();
    // advance() appends expired timers here
    auto expired = appender!(Timer[])();
    auto durationToTicks(Duration d)
    {
        // we have to adjust w.now and realtime 'now' before scheduling timer
        auto real_now = Clock.currStdTime;
        auto tw_now = w.currStdTime(Tick);
        auto delay = (real_now - tw_now).hnsecs;
        return (d + delay)/Tick;
    }
    void process_timer(Timer t)
    {
        switch(t._name)
        {
            case "periodic":
                if ( last.stdTime == 0)
                {
                    // initialize tracking
                    last = Clock.currTime - 50.msecs;
                }
                auto delta = Clock.currTime - last;
                assert(delta - 50.msecs <= max(Tick + Tick/20, 5.msecs), "delta-50.msecs=%s".format(delta-50.msecs));
                writefln("@ %s - delta: %sms (should be 50ms)", t._name, (Clock.currTime - last).split!"msecs".msecs);
                last = Clock.currTime;
                counter++;
                w.schedule(t, durationToTicks(50.msecs)); // rearm
                break;
            default:
                writefln("@ %s", t._name);
                break;
        }
    }
    // emulate some random initial delay
    auto randomInitialDelay = uniform(0, 500, rnd).msecs;
    Thread.sleep(randomInitialDelay);
    //
    // start one arbitrary timer and one periodic timer
    //
    auto some_timer = new Timer("some");
    auto periodic_timer = new Timer("periodic");
    w.schedule(some_timer, durationToTicks(32.msecs));
    w.schedule(periodic_timer, durationToTicks(50.msecs));

    while(counter < 10)
    {
        auto realNow = Clock.currStdTime;
        auto randomIoInterval = uniform(0, IOWakeUpInterval, rnd).msecs;
        // timeUntilNextEvent yields a null value when nothing is scheduled;
        // in that case only the I/O interval bounds the sleep
        auto untilNext = w.timeUntilNextEvent(Tick, realNow);
        auto nextTimerEvent = untilNext.isNull ? randomIoInterval : max(untilNext.get, 0.msecs);
        // wait for what should happen earlier
        auto time_to_sleep = min(randomIoInterval, nextTimerEvent);
        writefln("* sleep until timer event or random I/O for %s", time_to_sleep);
        Thread.sleep(time_to_sleep);
        // make steps if required
        int ticks = w.ticksToCatchUp(Tick, Clock.currStdTime);
        if (ticks > 0)
        {
            expired.clear();
            w.advance(ticks, expired);
            foreach(t; expired.data)
            {
                process_timer(t);
            }
        }
        // emulate some random processing time
        Thread.sleep(uniform(0, 5, rnd).msecs);
    }
}