/// taken from https://github.com/ikod/timingwheels
/// license: BSL-1.0
/+
Permission is hereby granted, free of charge, to any person or organization obtaining a copy of the software and accompanying documentation covered by this license (the "Software") to use, reproduce, display, distribute, execute, and transmit the Software, and to prepare derivative works of the Software, and to permit third-parties to whom the Software is furnished to do so, all subject to the following:

The copyright notices in the Software and this entire statement, including the above license grant, this restriction and the following disclaimer, must be included in all copies of the Software, in whole or in part, and all derivative works of the Software, unless such copies or derivative works are solely in the form of machine-executable object code generated by a source language processor.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+/
module concurrency.timingwheels;

import std.datetime;
import std.exception;
import std.typecons;
import std.format;
import std.traits;
import std.range;
import std.algorithm;
import std.experimental.logger;

import std.experimental.allocator;
import std.experimental.allocator.mallocator: Mallocator;

import core.thread;
import core.memory;

import ikod.containers.hashmap;
import automem;
import mir.algebraic : Nullable, nullable;

// Everything in this block exists only for the test build (-version=twtesting).
version(twtesting)
{
    import unit_threaded;

    // Minimal timer used by the unit tests: every instance takes the next
    // value of a static counter as its id.
    private class Timer
    {
        static ulong _current_id;
        private
        {
            ulong _id;
        }
        // Take the current counter value as this timer's id, then bump the counter.
        this() @safe @nogc
        {
            _id = _current_id++;
        }
        ~this() @safe @nogc
        {
        }
        // Id assigned in the constructor; TimingWheels requires an id() method.
        ulong id() @safe @nogc
        {
            return _id;
        }
        // The id rendered as a decimal string.
        override string toString()
        {
            return "%d".format(_id);
        }
    }
}
///
/// scheduling error occurs at schedule() when ticks == 0 or timer already scheduled.
///
///
class ScheduleTimerError: Exception
{
    /// Create the error with a message and the location it was raised at.
    this(string msg, string file = __FILE__, size_t line = __LINE__) @nogc @safe
    {
        super(msg, file, line);
    }
}
///
/// Cancel timer error occurs if you try to cancel timer which is not scheduled.
///
class CancelTimerError: Exception
{
    /// Create the error with a message and the location it was raised at.
    this(string msg, string file = __FILE__, size_t line = __LINE__) @nogc @safe
    {
        super(msg, file, line);
    }
}
///
/// Advancing error occurs if number of ticks for advance not in range 0<t<=256
///
class AdvanceWheelError: Exception
{
    ///
    /// Create the error with a message and the location it was raised at.
    ///
    this(string msg, string file = __FILE__, size_t line = __LINE__) @nogc @safe
    {
        super(msg, file, line);
    }
}

debug(timingwheels) @safe @nogc nothrow
{
    // Trace helper callable from @safe @nogc nothrow code.
    // Prefixes the message with "file:line" and swallows any Exception
    // raised while logging.
    package
    void safe_tracef(A...)(string f, scope A args, string file = __FILE__, int line = __LINE__) @safe @nogc nothrow
    {
        version(OSX)
        {
            enum onOsx = true;
        }
        else
        {
            enum onOsx = false;
        }
        version(LDC)
        {
            enum onLdc = true;
        }
        else
        {
            enum onLdc = false;
        }
        debug (timingwheels) try
        {
            // this can fail on pair ldc2/osx, see https://github.com/ldc-developers/ldc/issues/3240
            if (!(onOsx && onLdc))
            {
                () @trusted @nogc {tracef("%s:%d " ~ f, file, line, args);}();
            }
        }
        catch(Exception e)
        {
        }
    }
}

// Insert `le` into the circular doubly linked list `*head`; `le` becomes the new head.
pragma(inline)
private void dl_insertFront(L)(L *le, L** head)
{
    auto first = *head;
    if (first is null)
    {
        // empty list: the element is its own neighbour in both directions
        le.prev = le;
        le.next = le;
    }
    else
    {
        // splice between the current tail (first.prev) and the current head
        auto last = first.prev;
        le.prev = last;
        le.next = first;
        last.next = le;
        first.prev = le;
    }
    *head = le;
}

// Remove `le` from the circular list `*head`. The list must not be empty.
// `le.next` / `le.prev` are left untouched.
pragma(inline)
private void dl_unlink(L)(L *le, L** head)
in(*head != null)
{
    if (*head == le)
    {
        if (le.next == le)
        {
            // `le` was the only element
            *head = null;
            return;
        }
        *head = le.next;
    }
    le.next.prev = le.prev;
    le.prev.next = le.next;
}
// Walk the whole circular list once; the elements are not modified.
pragma(inline)
private void dl_walk(L)(L** head)
{
    if (*head == null)
    {
        return;
    }
    for (auto le = (*head).next; le != *head; le = le.next)
    {
    }
}
// Move `le` from list `*head_from` to the front of list `*head_to`.
pragma(inline)
private void dl_relink(L)(L* le, L** head_from, L** head_to)
in(le.prev !is null && le.next !is null)
{
    dl_unlink(le, head_from);
    dl_insertFront(le, head_to);
}

@("dl")
unittest
{
    globalLogLevel = LogLevel.info;
    struct LE
    {
        int p;
        LE *next;
        LE *prev;
    }
    LE* head1 = null;
    LE* head2 = null;
    auto le1 = new LE(1);
    auto le2 = new LE(2);

    // single element: insert, then unlink
    dl_insertFront(le1, &head1);
    assert(head1 !is null);
    dl_unlink(le1, &head1);
    assert(head1 is null);

    // two elements: unlink the older one first
    dl_insertFront(le1, &head1);
    assert(head1 !is null);
    dl_insertFront(le2, &head1);
    dl_unlink(le1, &head1);
    assert(head1 !is null);
    dl_unlink(le2, &head1);
    assert(head1 is null);

    // two elements: unlink the head first
    dl_insertFront(le1, &head1);
    assert(head1 !is null);
    dl_insertFront(le2, &head1);
    dl_unlink(le2, &head1);
    assert(head1 !is null);
    dl_unlink(le1, &head1);
    assert(head1 is null);

    // relink moves the element between lists
    dl_insertFront(le1, &head1);
    dl_relink(le1, &head1, &head2);
    assert(head1 is null);
    assert(head2 !is null);
}
///
/// This structure implements scheme 6.2 from the
/// $(LINK http://www.cs.columbia.edu/~nahum/w6998/papers/sosp87-timing-wheels.pdf)
/// and supports several primitives:
/// $(UL
/// $(LI schedule timer in the future.)
/// $(LI cancel timer.)
/// $(LI time step (advance) - all timers expired at current time tick are extracted from wheels.)
/// )
/// Each operation take O(1) time.
///
struct TimingWheels(T)
{
    import core.bitop: bsr;
    import std.array : Appender;

    private
    {
        alias TimerIdType = ReturnType!(T.id);
        alias allocator = Mallocator.instance;

        enum MASK = 0xff;
        enum LEVELS = 8;
        enum LEVEL_MAX = LEVELS - 1;
        enum SLOTS = 256;
        enum FreeListMaxLen = 100;

        // Node of a slot's circular doubly linked list.
        struct ListElement(T)
        {
            private
            {
                T timer;
                // absolute level-0 tick at which the timer must fire
                ulong scheduled_at;
                // (level << 8) | slot, so cancel() can find the owning list
                ushort position;
                ListElement!T* prev, next;
            }
        }
        struct Slot
        {
            ListElement!T* head;
        }
        struct Level
        {
            // 'now' is the counter of ticks processed on this level
            ulong now;
            Slot[SLOTS] slots;
        }

        Level[LEVELS] levels;
        // singly linked (via 'next') cache of unused list elements
        ListElement!T* freeList;
        int freeListLen;
        // timer id -> list element, used by cancel() and the duplicate check
        HashMap!(TimerIdType, ListElement!T*)
                        ptrs;
        long startedAt;
    }
    invariant
    {
        assert(freeListLen>=0);
    }
    alias Ticks = ulong; // ticks are 64 bit unsigned integers.

    // hashing ticks to slots
    // 8 levels, each level 256 slots, width of slot on each level grows 256 times
    // translate ticks to level
    // 0x00_00_00_00_00_00_00_00 <- ticks
    //    ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓
    //    □  □  □  □  □  □  □  □ \
    //    □  □  □  □  □  □  □  □ |
    //    .  .  .  .  .  .  .  . | 256 slots
    //    .  .  .  .  .  .  .  . |
    //    □  □  □  □  □  □  □  □ /
    //    7  6  5  4  3  2  1  0
    //                          <- 8 levels
    // each slot - double linked list of timers

    // ticks to level = bsr(ticks)/8 (each level covers 8 bits of the tick value)
    pragma(inline) private pure int t2l(ulong t) @safe @nogc nothrow
    {
        if (t == 0)
        {
            return 0;
        }
        return bsr(t) >> 3;
    }
    // ticks to slot = (ticks >> (level*8)) & 0xff
    pragma(inline) private pure int t2s(ulong t, int l) @safe @nogc nothrow
    {
        return (t >> (l<<3)) & MASK;
    }
    // level to ticks: number of level-0 ticks covered by one full turn of level l
    // l[0] -> 256
    // l[1] -> 256*256
    // ...
    // The last level would need 256^8 == 2^64, which does not fit into ulong,
    // so the result saturates at ulong.max there.
    pragma(inline) private pure ulong l2t(int l) @safe @nogc nothrow
    {
        if (l >= LEVEL_MAX)
        {
            return ulong.max;
        }
        return (cast(ulong) SLOTS) << (l << 3);
    }
    // Find the level and slot where a timer expiring `delta` ticks after the
    // current level-0 'now' has to be stored. Shared by schedule() and by the
    // cascade in advance_level().
    private void locate(ulong delta, out size_t level_index, out size_t slot_index) @safe @nogc nothrow
    {
        ulong t = delta;
        ulong width = 1;    // width of a slot, in ticks, on the current level
        uint shift = 0;     // log2(width)
        // The LEVEL_MAX bound keeps us inside `levels` for huge deltas:
        // on the last level `width << 8` overflows to 0.
        while (level_index < LEVEL_MAX && t > (width << 8)) // while t > ticks covered by this level
        {
            t -= (SLOTS - (levels[level_index].now & MASK)) * width;
            level_index++;
            width <<= 8;
            shift += 8;
        }
        // a partially used slot rounds up to the next one
        immutable roundUp = (t & (width - 1)) > 0 ? 1 : 0;
        slot_index = cast(size_t)((levels[level_index].now + (t >> shift) + roundUp) & MASK);
    }
    // Release every list element still owned by the wheels or by the free list.
    ~this()
    {
        ptrs.clear;
        for(int l=0;l<=LEVEL_MAX;l++)
            for(int s=0; s<SLOTS; s++)
            {
                while(levels[l].slots[s].head)
                {
                    auto le = levels[l].slots[s].head;
                    dl_unlink(le, &levels[l].slots[s].head);
                    () @trusted {
                        dispose(allocator, le);
                    }();
                }
            }
        while(freeList)
        {
            assert(freeListLen>0);
            // read the link before the element is disposed
            auto n = freeList.next;
            () @trusted {
                dispose(allocator, freeList);
            }();
            freeListLen--;
            freeList = n;
        }
    }

    // Take a list element from the free list or allocate a new one.
    // A reused element still carries stale field values; callers overwrite them.
    private ListElement!T* getOrCreate()
    {
        ListElement!T* result;
        if (freeList !is null)
        {
            result = freeList;
            freeList = freeList.next;
            freeListLen--;
            return result;
        }
        result = make!(ListElement!T)(allocator);
        return result;
    }
    // Put an unlinked element on the free list, or dispose it when the list is full.
    private void returnToFreeList(ListElement!T* le)
    {
        if ( freeListLen >= FreeListMaxLen )
        {
            // this can be safely disposed as we do not leak ListElements outside this module
            () @trusted {
                dispose(allocator, le);
            }();
        }
        else
        {
            le.position = 0xffff;
            le.next = freeList;
            freeList = le;
            freeListLen++;
        }
    }
    /// Remember the current wall-clock time (Clock.currStdTime) as the start of the wheels.
    void init()
    {
        startedAt = Clock.currStdTime;
    }
    /// Use `time` (hnsecs, same scale as Clock.currStdTime) as the start of the wheels.
    void init(ulong time) {
        startedAt = time;
    }
    /++
     + Return internal view on current time - it is time at the call to $(B init)
     + plus total number of steps multiplied by $(B tick) duration.
     + Params:
     +   tick = tick duration
     +/
    auto currStdTime(Duration tick)
    {
        return startedAt + levels[0].now * tick.split!"hnsecs".hnsecs;
    }
    ///
    /// Schedule timer to $(B ticks) ticks forward from internal 'now'.
    ///Params:
    /// timer = timer to schedule;
    /// ticks = ticks in the future to schedule timer. (0 < ticks < ulong.max);
    ///Returns:
    ///  void
    ///Throws:
    /// ScheduleTimerError
    ///   when ticks == 0
    ///   or when timer already scheduled
    ///
    void schedule(T)(T timer, const ulong ticks)
    {
        if (ticks == 0)
        {
            throw new ScheduleTimerError("ticks can't be 0");
        }
        auto timer_id = timer.id();
        if (ptrs.contains(timer_id))
        {
            throw new ScheduleTimerError("Timer already scheduled");
        }
        size_t level_index;
        size_t slot_index;
        locate(ticks, level_index, slot_index);
        auto level = &levels[level_index];
        auto slot = &level.slots[slot_index];
        debug(timingwheels) safe_tracef("use level/slot %d/%d, level now: %d", level_index, slot_index, level.now);
        auto le = getOrCreate();
        le.timer = timer;
        le.position = ((level_index << 8 ) | slot_index) & 0xffff;
        le.scheduled_at = levels[0].now + ticks;
        dl_insertFront(le, &slot.head);
        ptrs[timer_id] = le;
        debug(timingwheels) safe_tracef("scheduled timer id: %s, ticks: %s, now: %d, scheduled at: %s to level: %s, slot %s",
            timer_id, ticks, levels[0].now, le.scheduled_at, level_index, slot_index);
    }
    /// Cancel timer
    ///Params:
    /// timer = timer to cancel
    ///Returns:
    ///  true if the timer was scheduled and has been removed,
    ///  false if the timer is not in the wheels (nothing is thrown in that case).
    bool cancel(T)(T timer)
    {
        immutable timer_id = timer.id();
        // get list element pointer
        auto v = ptrs.fetch(timer_id);
        if ( !v.ok )
        {
            return false;
        }
        auto le = v.value;
        immutable level_index = le.position>>8;
        immutable slot_index  = le.position & 0xff;
        assert(timer is le.timer);
        debug(timingwheels) safe_tracef("cancel timer, l:%d, s:%d", level_index, slot_index);
        dl_unlink(le, &levels[level_index].slots[slot_index].head);
        returnToFreeList(le);
        ptrs.remove(timer_id);
        return true;
    }
    /// Number of ticks to rotate wheels until internal wheel 'now'
    /// catch up with real world realTime.
    /// Calculation based on time when wheels were started and total
    /// number of ticks passed.
    ///Params:
    /// tick = your tick length (Duration)
    /// realTime = current real world now (Clock.currStdTime)
    ///Returns: ticks to advance so that we catch up real world current time,
    /// capped at 256 (the most advance() accepts); 0 when the wheels are not
    /// behind realTime or when tick is not a positive duration.
    int ticksToCatchUp(Duration tick, ulong realTime)
    {
        immutable tickLen = tick.total!"hnsecs";
        if (tickLen <= 0)
        {
            // avoid division by zero / meaningless negative tick
            return 0;
        }
        immutable ulong wheelNow = startedAt + tickLen * levels[0].now;
        if (realTime <= wheelNow)
        {
            // the subtraction below is unsigned and would wrap around
            return 0;
        }
        immutable behind = (realTime - wheelNow) / tickLen;
        if ( behind > SLOTS )
        {
            return SLOTS;
        }
        return cast(int)behind;
    }
    /// Time until next scheduled timer event.
    /// You provide tick size and current real world time.
    /// This function find ticks until next event and use time of the start and
    /// total steps executed to calculate time delta from $(B realNow) to next event.
    ///Params:
    /// tick = your accepted tick duration.
    /// realNow = real world now, result of Clock.currStdTime
    ///Returns: time until next event, or a null value when no timers are scheduled.
    /// Can be zero or negative in case you have already expired events.
    ///
    Nullable!Duration timeUntilNextEvent(const Duration tick, ulong realNow)
    {
        assert(startedAt>0, "Forgot to call init()?");
        if (totalTimers == 0)
            return typeof(return).init;
        immutable n = ticksUntilNextEvent();
        immutable target = startedAt + (levels[0].now + n) * tick.split!"hnsecs".hnsecs;
        // unsigned difference reinterpreted as signed: negative when target is in the past
        auto delta = (target - realNow).hnsecs;
        debug(timingwheels) safe_tracef("ticksUntilNextEvent=%s, tick=%s, startedAt=%s", n, tick, SysTime(startedAt));
        return nullable(delta);
    }

    ///
    /// Advance wheel and collect all timers expired during wheel turn.
    ///
    /// Params:
    ///   ticks = how many ticks to advance. Must be in range 0 < ticks <= 256
    ///   app = appender which receives expired timers, in the order they expire
    /// Throws:
    ///   AdvanceWheelError when ticks is out of range
    ///
    void advance(this W)(ulong ticks, ref Appender!(T[]) app)
    {
        if (ticks > l2t(0))
        {
            throw new AdvanceWheelError("You can't advance that much");
        }
        if (ticks == 0)
        {
            throw new AdvanceWheelError("ticks must be > 0");
        }
        debug(timingwheels) safe_tracef("advancing %d ticks", ticks);
        auto level = &levels[0];

        while(ticks)
        {
            ticks--;
            immutable now           = ++level.now;
            immutable slot_index    = now & MASK;
            auto      slot = &level.slots[slot_index];
            //debug(timingwheels) safe_tracef("level 0, now=%s", now);
            while(slot.head)
            {
                auto le = slot.head;
                auto timer = le.timer;
                debug(timingwheels) safe_tracef("return timer: %s, scheduled at %s", timer, le.scheduled_at);
                app.put(timer);
                dl_unlink(le, &slot.head);
                returnToFreeList(le);
                ptrs.remove(timer.id());
            }
            if (slot_index == 0)
            {
                // level 0 completed a full turn: cascade timers down from level 1
                advance_level(1);
            }
        }
    }
    /// Number of timers currently scheduled.
    auto totalTimers() pure @safe @nogc
    {
        return ptrs.length();
    }
    /// Snapshot (id -> timer) of all currently scheduled timers.
    auto allTimers() @safe @nogc
    {
        struct AllTimers
        {
            HashMap!(TimerIdType, T)    _map;
            auto timers()
            {
                return _map.byValue;
            }
            auto length()
            {
                return _map.length();
            }
            bool contains(TimerIdType id)
            {
                return _map.contains(id);
            }
        }
        AllTimers result;
        foreach (p; ptrs.byPair)
        {
            result._map[p.key] = p.value.timer;
        }
        return result;
    }
    //
    // ticks until next event on level 0 or until next wheel rotation
    // If you have empty ticks it is safe to sleep - you will not miss anything, just wake up
    // at the time when next timer have to be processed.
    //Returns: number of safe "sleep" ticks.
    //
    private int ticksUntilNextEvent()
    out(r; r<=256)
    {
        int result = 1;
        auto level = &levels[0];
        immutable uint now = levels[0].now & MASK;
        auto slot = (now + 1) & MASK;
        //assert(level.slots[now].head == null);
        do
        {
            if (level.slots[slot].head != null)
            {
                break;
            }
            result++;
            slot = (slot + 1) & MASK;
        }
        while(slot != now);

        // never sleep past the end of the current level-0 turn: the cascade may bring new timers
        return min(result, 256-now);
    }

    // Advance level `level_index` by one slot and move every timer from that
    // slot down to the level/slot matching its remaining delay.
    private void advance_level(int level_index)
    in(level_index>0)
    {
        debug(timingwheels) safe_tracef("running advance on level %d", level_index);
        immutable now0 = levels[0].now;
        auto      level = &levels[level_index];
        immutable now = ++level.now;
        immutable slot_index = now & MASK;
        debug(timingwheels) safe_tracef("level %s, now=%s", level_index, now);
        auto slot = &level.slots[slot_index];
        debug(timingwheels) safe_tracef("haldle l%s:s%s timers", level_index, slot_index);
        while(slot.head)
        {
            auto listElement = slot.head;

            immutable delta = listElement.scheduled_at - now0;
            size_t lower_level_index;
            size_t lower_level_slot_index;
            locate(delta, lower_level_index, lower_level_slot_index);
            debug(timingwheels) safe_tracef("move timer id: %s, scheduledAt; %d to level %s, slot: %s (delta=%s)",
                listElement.timer.id(), listElement.scheduled_at, lower_level_index, lower_level_slot_index, delta);
            listElement.position = ((lower_level_index<<8) | lower_level_slot_index) & 0xffff;
            dl_relink(listElement, &slot.head, &levels[lower_level_index].slots[lower_level_slot_index].head);
        }
        if (slot_index == 0 && level_index < LEVEL_MAX)
        {
            // this level completed a full turn as well: cascade from the next one
            advance_level(level_index+1);
        }
    }
    /// Drop all timers and return the wheels to the default-constructed state.
    void reset() {
        this = TimingWheels!T();
    }

}

version(twtesting):

@("TimingWheels")
unittest
{
    import std.stdio;
    import std.array : Appender;
    globalLogLevel = LogLevel.info;
    TimingWheels!Timer w;
    w.init();
    assert(w.t2l(1) == 0);
    assert(w.t2s(1, 0) == 1);
    immutable t = 0x00_00_00_11_00_00_00_77;
    immutable level = w.t2l(t);
    assert(level==4);
    immutable slot = w.t2s(t, level);
    assert(slot == 0x11);
    // l2t: ticks covered by a full turn of a level
    assert(w.l2t(0) == 256);
    assert(w.l2t(1) == 256*256);
    auto timer = new Timer();

    // advance the wheels and report how many timers expired
    size_t turn(ulong ticks)
    {
        Appender!(Timer[]) expired;
        w.advance(ticks, expired);
        return expired.data.length;
    }

    // advance() fills a GC-backed Appender, so this part can't be @nogc
    {
        w.schedule(timer, 2);
        bool thrown;
        // check that you can't schedule same timer twice
        try
        {
            w.schedule(timer, 5);
        }
        catch(ScheduleTimerError e)
        {
            thrown = true;
        }
        assert(thrown);
        thrown = false;
        try
        {
            turn(1024);
        }
        catch(AdvanceWheelError e)
        {
            thrown = true;
        }
        assert(thrown);
        thrown = false;
        assert(w.cancel(timer));
        assert(turn(1) == 0);
    }
    w = TimingWheels!Timer();
    w.init();
    w.schedule(timer, 1);
    assert(turn(1) == 1);
    w.schedule(timer, 256);
    assert(turn(255) == 0);
    assert(turn(1) == 1);
    w.schedule(timer, 256*256);
    size_t c;
    for(int i=0;i<256;i++)
    {
        c += turn(256);
    }
    assert(c==1);
}
@("rt")
@Tags("noauto")
unittest
{
    import std.array : Appender;
    globalLogLevel = LogLevel.info;
    TimingWheels!Timer w;
    Duration Tick = 5.msecs;
    w.init();
    ulong now = Clock.currStdTime;
    assert(now - w.currStdTime(Tick) < 5*10_000);
    Thread.sleep(2*Tick);
    now = Clock.currStdTime;
    assert((now - w.currStdTime(Tick))/10_000 - (2*Tick).split!"msecs".msecs < 10);
    auto toCatchUp = w.ticksToCatchUp(Tick, now);
    toCatchUp.shouldEqual(2);
    Appender!(Timer[]) expired;
    w.advance(toCatchUp, expired);
    toCatchUp = w.ticksToCatchUp(Tick, now);
    toCatchUp.shouldEqual(0);
}
@("cancel")
unittest
{
    import std.array : Appender;
    globalLogLevel = LogLevel.info;
    TimingWheels!Timer w;
    w.init();
    Timer timer0 = new Timer();
    Timer timer1 = new Timer();
    w.schedule(timer0, 256);
    w.schedule(timer1, 256+128);
    Appender!(Timer[]) expired;
    w.advance(255, expired);
    assert(expired.data.length == 0);
    assert(w.cancel(timer0));
    w.advance(1, expired);
    assert(expired.data.length == 0);
    // cancelling a timer which is not scheduled reports false instead of throwing
    assert(!w.cancel(timer0));
    assert(w.cancel(timer1));
}
@("ticksUntilNextEvent")
unittest
{
    import std.array : Appender;
    globalLogLevel = LogLevel.info;
    TimingWheels!Timer w;
    w.init();
    // empty wheels: safe to sleep until the end of the level-0 turn
    auto s = w.ticksUntilNextEvent;
    assert(s==256);
    Appender!(Timer[]) expired;
    w.advance(s, expired);
    assert(expired.data.length == 0);
    Timer t = new Timer();
    w.schedule(t, 50);
    s = w.ticksUntilNextEvent;
    assert(s==50);
    w.advance(s, expired);
    assert(expired.data.length == 1);
}

@("load")
@Serial
unittest
{
    import std.array : Appender;
    globalLogLevel = LogLevel.info;
    enum TIMERS = 100_000;
    Timer._current_id = 1;
    auto w = TimingWheels!Timer();
    w.init();
    // timer with id i expires at tick i
    for(int i=1;i<=TIMERS;i++)
    {
        auto t = new Timer();
        w.schedule(t, i);
    }
    int counter;
    for(int i=1;i<=TIMERS;i++)
    {
        Appender!(Timer[]) expired;
        w.advance(1, expired);
        auto timers = expired.data;
        assert(timers.length == 1);
        auto t = timers[0];
        assert(t.id == i, "expected t.id=%s, got %s".format(i, t.id));
        counter++;
    }
    assert(counter == TIMERS, "expected %d timers, got %d".format(TIMERS, counter));

    // second round: ids continue from TIMERS+1, delays are again 1..TIMERS
    for(int i=1;i<=TIMERS;i++)
    {
        auto t = new Timer();
        w.schedule(t, i);
    }
    counter = 0;
    for(int i=TIMERS+1;i<=2*TIMERS;i++)
    {
        Appender!(Timer[]) expired;
        w.advance(1, expired);
        auto timers = expired.data;
        assert(timers.length == 1);
        auto t = timers[0];
        assert(t.id == i, "expected t.id=%s, got %s".format(i, t.id));
        counter++;
    }
    assert(counter == TIMERS, "expected %d timers, got %d".format(TIMERS, counter));

}
// @("cornercase")
// @Serial
// unittest
// {
//     Timer._current_id = 1;
//     auto w = TimingWheels!Timer();
//     globalLogLevel = LogLevel.trace;
//     w.advance(254);
//     auto t = new Timer();
//     w.schedule(t, 511);
//     for(int i=0; i<511; i++)
//     {
//         w.advance(1);
//     }
// }

///
/// Usage example: one-shot and periodic timers driven by an event-loop like
/// cycle with random "I/O" wakeups.
///
@("example")
@Tags("noauto")
@Values(1.msecs,2.msecs,3.msecs,4.msecs,5.msecs,6.msecs,7.msecs,8.msecs, 9.msecs,10.msecs)
@Serial
unittest
{
    import std;
    globalLogLevel = LogLevel.info;
    auto rnd = Random(142);
    auto Tick = getValue!Duration();
    /// track execution
    int  counter;
    SysTime last;

    /// this is our Timer
    class Timer
    {
        static ulong __id;
        private ulong _id;
        private string _name;
        this(string name)
        {
            _id = __id++;
            _name = name;
        }
        /// must provide id() method
        ulong id()
        {
            return _id;
        }
    }

    enum IOWakeUpInterval = 100; // to simulate random IO wakeups in interval 0 - 100.msecs

    // each tick spans `Tick` (one of the @Values above) - this is our link with time in reality
    TimingWheels!Timer w;
    w.init();
    auto durationToTicks(Duration d)
    {
        // we have to adjust w.now and realtime 'now' before scheduling timer
        auto real_now = Clock.currStdTime;
        auto tw_now = w.currStdTime(Tick);
        auto delay = (real_now - tw_now).hnsecs;
        return (d + delay)/Tick;
    }
    void process_timer(Timer t)
    {
        switch(t._name)
        {
            case "periodic":
                if ( last.stdTime == 0)
                {
                    // initialize tracking
                    last = Clock.currTime - 50.msecs;
                }
                auto delta = Clock.currTime - last;
                assert(delta - 50.msecs <= max(Tick + Tick/20, 5.msecs), "delta-50.msecs=%s".format(delta-50.msecs));
                writefln("@ %s - delta: %sms (should be 50ms)", t._name, (Clock.currTime - last).split!"msecs".msecs);
                last = Clock.currTime;
                counter++;
                w.schedule(t, durationToTicks(50.msecs)); // rearm
                break;
            default:
                writefln("@ %s", t._name);
                break;
        }
    }
    // emulate some random initial delay
    auto randomInitialDelay = uniform(0, 500, rnd).msecs;
    Thread.sleep(randomInitialDelay);
    //
    // start one arbitrary timer and one periodic timer
    //
    auto some_timer = new Timer("some");
    auto periodic_timer = new Timer("periodic");
    w.schedule(some_timer, durationToTicks(32.msecs));
    w.schedule(periodic_timer, durationToTicks(50.msecs));

    while(counter < 10)
    {
        auto realNow = Clock.currStdTime;
        auto randomIoInterval = uniform(0, IOWakeUpInterval, rnd).msecs;
        // timeUntilNextEvent yields a null value when nothing is scheduled;
        // in that case only the random I/O wakeup limits the sleep
        auto untilNext = w.timeUntilNextEvent(Tick, realNow);
        auto nextTimerEvent = untilNext.isNull ? randomIoInterval : max(untilNext.get, 0.msecs);
        // wait for what should happen earlier
        auto time_to_sleep = min(randomIoInterval, nextTimerEvent);
        writefln("* sleep until timer event or random I/O for %s", time_to_sleep);
        Thread.sleep(time_to_sleep);
        // make steps if required
        int ticks = w.ticksToCatchUp(Tick, Clock.currStdTime);
        if (ticks > 0)
        {
            Appender!(Timer[]) expired;
            w.advance(ticks, expired);
            foreach(t; expired.data)
            {
                process_timer(t);
            }
        }
        // emulate some random processing time
        Thread.sleep(uniform(0, 5, rnd).msecs);
    }
}