/// taken from https://github.com/ikod/timingwheels
/// license: BSL-1.0
/+
    Permission is hereby granted, free of charge, to any person or organization obtaining a copy of the software and accompanying documentation covered by this license (the "Software") to use, reproduce, display, distribute, execute, and transmit the Software, and to prepare derivative works of the Software, and to permit third-parties to whom the Software is furnished to do so, all subject to the following:

    The copyright notices in the Software and this entire statement, including the above license grant, this restriction and the following disclaimer, must be included in all copies of the Software, in whole or in part, and all derivative works of the Software, unless such copies or derivative works are solely in the form of machine-executable object code generated by a source language processor.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+/
module concurrency.timingwheels;

import std.datetime;
import std.exception;
import std.typecons;
import std.format;
import std.traits;
import std.range;
import std.algorithm;
import std.experimental.logger;

import std.experimental.allocator;
import std.experimental.allocator.mallocator: Mallocator;

import core.thread;
import core.memory;

import ikod.containers.hashmap;
import automem;
import mir.algebraic : Nullable, nullable;

version(twtesting)
{
    import unit_threaded;
}


version(twtesting)
{
    // Minimal timer used only by the unit tests: every instance takes the
    // next value of a static counter as its id.
    private class Timer
    {
        // next id to hand out
        static ulong _current_id;
        private ulong _id;

        this() @safe @nogc
        {
            _id = _current_id++;
        }
        ~this() @safe @nogc
        {
        }
        // id the wheel uses as the key for this timer
        ulong id() @safe @nogc
        {
            return _id;
        }
        override string toString()
        {
            return format("%d", _id);
        }
    }
}
///
/// Scheduling error: thrown by schedule() when ticks == 0 or when the timer is already scheduled.
///
///
class ScheduleTimerError: Exception
{
    /// Creates the error from a message and the source location of the throw site.
    this(string msg, string file = __FILE__, size_t line = __LINE__) @nogc @safe
    {
        super(msg, file, line);
    }
}
///
/// Cancel timer error occurs if you try to cancel timer which is not scheduled.
///
/// NOTE(review): nothing in this module currently throws this error -
/// cancel() returns silently when the timer is unknown.
class CancelTimerError: Exception
{
    /// Creates the error from a message and the source location of the throw site.
    this(string msg, string file = __FILE__, size_t line = __LINE__) @nogc @safe
    {
        super(msg, file, line);
    }
}
///
/// Advancing error occurs if the number of ticks to advance is not in range 0 < t <= 256
///
class AdvanceWheelError: Exception
{
    ///
    /// Creates the error from a message and the source location of the throw site.
    ///
    this(string msg, string file = __FILE__, size_t line = __LINE__) @nogc @safe
    {
        super(msg, file, line);
    }
}

debug(timingwheels) @safe @nogc nothrow
{
    // Trace helper usable from @safe @nogc nothrow code: prefixes the message
    // with the caller's file:line and swallows any exception raised by the logger.
    package
    void safe_tracef(A...)(string f, scope A args, string file = __FILE__, int line = __LINE__) @safe @nogc nothrow
    {
        bool onOsx, withLdc;
        version(OSX)
        {
            onOsx = true;
        }
        version(LDC)
        {
            withLdc = true;
        }
        debug (timingwheels) try
        {
            // this can fail on pair ldc2/osx, see https://github.com/ldc-developers/ldc/issues/3240
            if (!(onOsx && withLdc))
            {
                () @trusted @nogc {tracef("%s:%d " ~ f, file, line, args);}();
            }
        }
        catch(Exception e)
        {
            // deliberately ignored: tracing is best effort and must not throw
        }
    }
}

// Insert `le` into the circular doubly linked list `*head`, just before the
// current head, and make it the new head. An empty list is a null head.
pragma(inline)
private void dl_insertFront(L)(L *le, L** head)
{
    auto first = *head;
    if (first == null)
    {
        // single element list: the element is its own neighbour
        le.prev = le;
        le.next = le;
    }
    else
    {
        auto last = first.prev;
        le.prev = last;
        le.next = first;
        last.next = le;
        first.prev = le;
    }
    *head = le;
}

// Remove `le` from the circular list `*head`. The head becomes null when the
// last element is removed and moves to `le.next` when `le` was the head.
// `le.prev` / `le.next` are left untouched.
pragma(inline)
private void dl_unlink(L)(L *le, L** head)
in(*head != null)
{
    immutable bool wasHead = (le == *head);
    if (wasHead && le.next == le)
    {
        // `le` was the only element
        *head = null;
        return;
    }
    if (wasHead)
    {
        *head = le.next;
    }
    le.next.prev = le.prev;
    le.prev.next = le.next;
}
// Walk the whole circular list once; does nothing with the elements.
pragma(inline)
private void dl_walk(L)(L** head)
{
    auto start = *head;
    if (start == null)
    {
        return;
    }
    auto cursor = start;
    do
    {
        cursor = cursor.next;
    } while (cursor != *head);
}
// Move `le` from list `*head_from` to the front of list `*head_to`.
pragma(inline)
private void dl_relink(L)(L* le, L** head_from, L** head_to)
in(le.prev !is null && le.next !is null)
{
    dl_unlink(le, head_from);
    dl_insertFront(le, head_to);
}

@("dl")
unittest
{
    globalLogLevel = LogLevel.info;
    struct LE
    {
        int p;
        LE *next;
        LE *prev;
    }
    LE* head1 = null;
    LE* head2 = null;
    auto le1 = new LE(1);
    auto le2 = new LE(2);

    // single element: insert, then remove
    dl_insertFront(le1, &head1);
    assert(head1 != null);
    dl_unlink(le1, &head1);
    assert(head1 == null);

    // two elements: remove the older one first
    dl_insertFront(le1, &head1);
    assert(head1 != null);
    dl_insertFront(le2, &head1);
    dl_unlink(le1, &head1);
    assert(head1 != null);
    dl_unlink(le2, &head1);
    assert(head1 == null);

    // two elements: remove the head first
    dl_insertFront(le1, &head1);
    assert(head1 != null);
    dl_insertFront(le2, &head1);
    dl_unlink(le2, &head1);
    assert(head1 != null);
    dl_unlink(le1, &head1);
    assert(head1 == null);

    // relink moves the element between lists
    dl_insertFront(le1, &head1);
    dl_relink(le1, &head1, &head2);
    assert(head1 == null);
    assert(head2 != null);
}
///
/// This structure implements scheme 6.2 from the
/// $(LINK http://www.cs.columbia.edu/~nahum/w6998/papers/sosp87-timing-wheels.pdf)
/// and supports several primitives:
/// $(UL
/// $(LI schedule timer in the future.)
/// $(LI cancel timer.)
/// $(LI time step (advance) - all timers expired at current time tick are extracted from wheels.)
/// )
/// Each operation takes O(1) time.
///
struct TimingWheels(T)
{
    import core.bitop: bsr;
    import std.array : Appender;

    private
    {
        alias TimerIdType = ReturnType!(T.id);
        alias allocator = Mallocator.instance;

        enum MASK = 0xff;
        enum LEVELS = 8;
        enum LEVEL_MAX = LEVELS - 1;
        enum SLOTS = 256;
        // every level consumes one byte of the tick counter
        enum BITS_PER_LEVEL = 8;
        // upper bound on the number of recycled list elements kept around
        enum FreeListMaxLen = 100;

        struct ListElement(T)
        {
            private
            {
                T timer;
                // absolute level-0 tick at which the timer has to fire
                ulong scheduled_at;
                // (level << 8) | slot while scheduled, 0xffff while on the free list
                ushort position;
                ListElement!T* prev, next;
            }
        }
        struct Slot
        {
            ListElement!T* head;
        }
        struct Level
        {
            // now is the counter of ticks processed on this level
            ulong now;
            Slot[SLOTS] slots;
        }

        Level[LEVELS] levels;
        // singly linked (via `next`) stack of recycled list elements
        ListElement!T* freeList;
        int freeListLen;
        // timer id -> list element, used by cancel() and for duplicate detection
        HashMap!(TimerIdType, ListElement!T*)
                        ptrs;
        long startedAt;
    }
    invariant
    {
        assert(freeListLen>=0);
    }
    alias Ticks = ulong; // ticks are 64 bit unsigned integers.

    // hashing ticks to slots
    // 8 levels, each level 256 slots, width of a slot on each level is 256 times
    // the width of a slot on the level below
    // translate ticks to level
    // 0x00_00_00_00_00_00_00_00 <- ticks
    //    ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓
    //    □  □  □  □  □  □  □  □ \
    //    □  □  □  □  □  □  □  □ |
    //    .  .  .  .  .  .  .  . | 256 slots
    //    .  .  .  .  .  .  .  . |
    //    □  □  □  □  □  □  □  □ /
    //    7  6  5  4  3  2  1  0
    //                          <- 8 levels
    // each slot - double linked list of timers

    // ticks to level = bsr(ticks)/8 (index of the highest non-zero byte)
    pragma(inline) private pure int t2l(ulong t) @safe @nogc nothrow
    {
        if (t == 0)
        {
            return 0;
        }
        // divide by the bits per level, not by the number of levels
        // (the two only happen to be equal)
        return bsr(t)/BITS_PER_LEVEL;
    }
    // ticks to slot = (ticks >> (level*8)) & 0xff
    pragma(inline) private pure int t2s(ulong t, int l) @safe @nogc nothrow
    {
        return (t >> (l<<3)) & MASK;
    }
    // level to ticks: number of level-0 ticks covered by one full turn of level l
    // l[0] -> 256
    // l[1] -> 256*256
    // ...
    // The value for level 7 (2^64) does not fit into ulong and wraps to 0.
    pragma(inline) private pure ulong l2t(int l) @safe @nogc nothrow
    {
        return cast(ulong) SLOTS << (l * BITS_PER_LEVEL);
    }
    /// Releases every list element still owned by the wheels or by the free list.
    ~this()
    {
        ptrs.clear;
        for(int l=0;l<=LEVEL_MAX;l++)
            for(int s=0; s<SLOTS; s++)
            {
                while(levels[l].slots[s].head)
                {
                    auto le = levels[l].slots[s].head;
                    dl_unlink(le, &levels[l].slots[s].head);
                    () @trusted {
                        dispose(allocator, le);
                    }();
                }
            }
        while(freeList)
        {
            assert(freeListLen>0);
            auto n = freeList.next;
            () @trusted {
                dispose(allocator, freeList);
            }();
            freeListLen--;
            freeList = n;
        }
    }

    // Take a list element from the free list or allocate a new one.
    private ListElement!T* getOrCreate()
    {
        ListElement!T* result;
        if (freeList !is null)
        {
            result = freeList;
            freeList = freeList.next;
            freeListLen--;
            return result;
        }
        result = make!(ListElement!T)(allocator);
        return result;
    }
    // Recycle an unlinked list element: keep it on the free list while the
    // list is shorter than FreeListMaxLen, dispose it otherwise.
    private void returnToFreeList(ListElement!T* le)
    {
        if ( freeListLen >= FreeListMaxLen )
        {
            // this can be safely disposed as we do not leak ListElements outside this module
            () @trusted {
                dispose(allocator, le);
            }();
        }
        else
        {
            le.position = 0xffff;
            le.next = freeList;
            freeList = le;
            freeListLen++;
        }
    }
    /// Start the wheels at the current wall clock time (Clock.currStdTime).
    void init()
    {
        startedAt = Clock.currStdTime;
    }
    /// Start the wheels at the given time.
    /// Params:
    ///   time = start time; used as the base for currStdTime, ticksToCatchUp
    ///          and timeUntilNextEvent (same units as Clock.currStdTime)
    void init(ulong time) {
        startedAt = time;
    }
    /++
     + Return internal view on current time - it is time at the call to $(B init)
     + plus total number of steps multiplied by $(B tick) duration.
     + Params:
     +   tick = tick duration
     +/
    auto currStdTime(Duration tick)
    {
        return startedAt + levels[0].now * tick.split!"hnsecs".hnsecs;
    }
    ///
    /// Schedule timer to $(B ticks) ticks forward from internal 'now'.
    ///
    /// NOTE(review): list elements are allocated with Mallocator, which the GC
    /// presumably does not scan - callers should keep their own reference to a
    /// scheduled timer. Confirm.
    ///Params:
    /// timer = timer to schedule;
    /// ticks = ticks in the future to schedule timer. (0 < ticks < ulong.max);
    ///Returns:
    ///  void
    ///Throws:
    /// ScheduleTimerError
    ///   when ticks == 0
    ///   or when timer already scheduled
    ///
    void schedule(T)(T timer, const ulong ticks)
    {
        if (ticks == 0)
        {
            throw new ScheduleTimerError("ticks can't be 0");
        }
        auto timer_id = timer.id();
        if (ptrs.contains(timer_id))
        {
            throw new ScheduleTimerError("Timer already scheduled");
        }
        size_t level_index = 0;
        // unsigned arithmetic: ticks above long.max must not turn negative
        ulong t = ticks;
        ulong s = 1;     // width of the slot in ticks on level
        long shift = 0;
        // The level bound keeps huge tick counts on the top level instead of
        // overflowing `s<<8` and indexing past the last level.
        while(level_index < LEVEL_MAX && t > s<<8) // while t > slots on level
        {
            // skip the ticks left until this level wraps around
            t -= (SLOTS - (levels[level_index].now & MASK)) * s;
            level_index++;
            s = s << 8;
            shift += 8;
        }
        auto level = &levels[level_index];
        auto mask = s - 1;
        // round up to the next slot when t is not a multiple of the slot width
        size_t slot_index = (level.now + (t>>shift) + ((t&mask)>0?1:0)) & MASK;
        auto slot = &levels[level_index].slots[slot_index];
        debug(timingwheels) safe_tracef("use level/slot %d/%d, level now: %d", level_index, slot_index, level.now);
        auto le = getOrCreate();
        le.timer = timer;
        le.position = ((level_index << 8 ) | slot_index) & 0xffff;
        le.scheduled_at = levels[0].now + ticks;
        dl_insertFront(le, &slot.head);
        ptrs[timer_id] = le;
        debug(timingwheels) safe_tracef("scheduled timer id: %s, ticks: %s, now: %d, scheduled at: %s to level: %s, slot %s",
            timer_id, ticks, levels[0].now, le.scheduled_at, level_index, slot_index);
    }
    /// Cancel timer.
    ///
    /// Cancelling a timer which is not scheduled (never scheduled, already
    /// expired or already cancelled) is a silent no-op.
    ///Params:
    /// timer = timer to cancel
    ///Returns:
    /// void
    void cancel(T)(T timer)
    {
        auto timer_id = timer.id();
        // get list element pointer
        auto v = ptrs.fetch(timer_id);
        if ( !v.ok )
        {
            return;
        }
        auto le = v.value;
        immutable level_index = le.position>>8;
        immutable slot_index  = le.position & 0xff;
        assert(timer is le.timer);
        debug(timingwheels) safe_tracef("cancel timer, l:%d, s:%d", level_index, slot_index);
        dl_unlink(le, &levels[level_index].slots[slot_index].head);
        returnToFreeList(le);
        ptrs.remove(timer_id);
    }
    /// Number of ticks to rotate wheels until internal wheel 'now'
    /// catches up with real world realTime.
    /// Calculation is based on the time when the wheels were started and the
    /// total number of ticks passed.
    ///Params:
    /// tick = your tick length (Duration), must be positive
    /// realTime = current real world now (Clock.currStdTime)
    ///Returns: ticks to advance so that we catch up real world current time,
    /// capped at 256; 0 when the wheel is not behind realTime.
    int ticksToCatchUp(Duration tick, ulong realTime)
    in(tick > Duration.zero, "tick must be a positive duration")
    {
        immutable tickHnsecs = tick.split!"hnsecs".hnsecs;
        immutable ulong wheelNow = startedAt + tickHnsecs * levels[0].now;
        if (realTime <= wheelNow)
        {
            // the wheel is at or ahead of real time; the unsigned subtraction
            // below would wrap around and report 256 ticks
            return 0;
        }
        immutable v = (realTime - wheelNow) / tickHnsecs;
        if ( v > 256 )
        {
            return 256;
        }
        return cast(int)v;
    }
    /// Time until next scheduled timer event.
    /// You provide tick size and current real world time.
    /// This function finds ticks until next event and uses time of the start and
    /// total steps executed to calculate time delta from $(B realNow) to next event.
    ///Params:
    /// tick = your accepted tick duration.
    /// realNow = real world now, result of Clock.currStdTime
    ///Returns: time until next event, or null when no timers are scheduled.
    /// Can be zero or negative in case you have already expired events.
    ///
    Nullable!Duration timeUntilNextEvent(const Duration tick, ulong realNow)
    {
        assert(startedAt>0, "Forgot to call init()?");
        if (totalTimers == 0)
            return typeof(return).init;
        immutable n = ticksUntilNextEvent();
        immutable target = startedAt + (levels[0].now + n) * tick.split!"hnsecs".hnsecs;
        // the unsigned difference converts to a negative long when target < realNow
        auto delta = (target - realNow).hnsecs;
        debug(timingwheels) safe_tracef("ticksUntilNextEvent=%s, tick=%s, startedAt=%s", n, tick, SysTime(startedAt));
        return nullable(delta);
    }

    ///
    /// Advance wheel and collect all timers expired during wheel turn.
    ///
    /// Params:
    ///   ticks = how many ticks to advance. Must be in range 0 < ticks <= 256
    ///   app = expired timers are appended here
    /// Throws:
    ///   AdvanceWheelError when ticks == 0 or ticks > 256
    ///
    void advance(this W)(ulong ticks, ref Appender!(T[]) app)
    {
        if (ticks > l2t(0))
        {
            throw new AdvanceWheelError("You can't advance that much");
        }
        if (ticks == 0)
        {
            throw new AdvanceWheelError("ticks must be > 0");
        }
        debug(timingwheels) safe_tracef("advancing %d ticks", ticks);
        auto level = &levels[0];

        while(ticks)
        {
            ticks--;
            immutable now           = ++level.now;
            immutable slot_index    = now & MASK;
            auto      slot = &level.slots[slot_index];
            //debug(timingwheels) safe_tracef("level 0, now=%s", now);
            while(slot.head)
            {
                auto le = slot.head;
                auto timer = le.timer;
                debug(timingwheels) safe_tracef("return timer: %s, scheduled at %s", timer, le.scheduled_at);
                app.put(timer);
                dl_unlink(le, &slot.head);
                returnToFreeList(le);
                ptrs.remove(timer.id());
            }
            if (slot_index == 0)
            {
                // level 0 wrapped around: pull timers down from level 1
                advance_level(1);
            }
        }
    }
    /// Number of timers currently scheduled.
    auto totalTimers() pure @safe @nogc
    {
        return ptrs.length();
    }
    /// Snapshot of all scheduled timers, keyed by timer id.
    auto allTimers() @safe @nogc
    {
        struct AllTimers
        {
            HashMap!(TimerIdType, T)    _map;
            auto timers()
            {
                return _map.byValue;
            }
            auto length()
            {
                return _map.length();
            }
            bool contains(TimerIdType id)
            {
                return _map.contains(id);
            }
        }
        AllTimers result;
        foreach (p; ptrs.byPair)
        {
            result._map[p.key] = p.value.timer;
        }
        return result;
    }
    //
    // ticks until next event on level 0 or until next wheel rotation
    // If you have empty ticks it is safe to sleep - you will not miss anything, just wake up
    // at the time when next timer has to be processed.
    //Returns: number of safe "sleep" ticks.
    //
    private int ticksUntilNextEvent()
    out(r; r<=256)
    {
        int result = 1;
        auto level = &levels[0];
        immutable uint now = levels[0].now & MASK;
        auto slot = (now + 1) & MASK;
        //assert(level.slots[now].head == null);
        do
        {
            if (level.slots[slot].head != null)
            {
                break;
            }
            result++;
            slot = (slot + 1) & MASK;
        }
        while(slot != now);

        // never sleep past the next rotation of level 0
        return min(result, 256-now);
    }

    // Advance level `level_index` by one slot and redistribute the timers of
    // the slot it lands on to lower levels, according to the ticks they still
    // have to wait. Recurses into the next level when this level wraps around.
    private void advance_level(int level_index)
    in(level_index>0)
    {
        debug(timingwheels) safe_tracef("running advance on level %d", level_index);
        immutable now0 = levels[0].now;
        auto      level = &levels[level_index];
        immutable now = ++level.now;
        immutable slot_index = now & MASK;
        debug(timingwheels) safe_tracef("level %s, now=%s", level_index, now);
        auto slot = &level.slots[slot_index];
        debug(timingwheels) safe_tracef("haldle l%s:s%s timers", level_index, slot_index);
        while(slot.head)
        {
            auto listElement = slot.head;

            immutable delta = listElement.scheduled_at - now0;
            size_t lower_level_index = 0;
            long t = delta;
            size_t s = 1;     // width of the slot in ticks on level
            size_t shift = 0;
            while(t > s<<8) // while t > slots on level
            {
                t -= (SLOTS - (levels[lower_level_index].now & MASK)) * s;
                lower_level_index++;
                s = s << 8;
                shift += 8;
            }
            auto mask = s - 1;
            size_t lower_level_slot_index = (levels[lower_level_index].now + (t>>shift) + ((t&mask)>0?1:0)) & MASK;
            debug(timingwheels) safe_tracef("move timer id: %s, scheduledAt; %d to level %s, slot: %s (delta=%s)",
                listElement.timer.id(), listElement.scheduled_at, lower_level_index, lower_level_slot_index, delta);
            listElement.position = ((lower_level_index<<8) | lower_level_slot_index) & 0xffff;
            dl_relink(listElement, &slot.head, &levels[lower_level_index].slots[lower_level_slot_index].head);
        }
        if (slot_index == 0 && level_index < LEVEL_MAX)
        {
            advance_level(level_index+1);
        }
    }
}

version(twtesting):

// Test helper: advance the wheel by `ticks` and return the timers that expired.
private T[] advanceAndCollect(T)(ref TimingWheels!T w, ulong ticks)
{
    import std.array : Appender;
    Appender!(T[]) expired;
    w.advance(ticks, expired);
    return expired.data;
}

@("TimingWheels")
unittest
{
    import std.stdio;
    globalLogLevel = LogLevel.info;
    TimingWheels!Timer w;
    w.init();
    assert(w.t2l(1) == 0);
    assert(w.t2s(1, 0) == 1);
    immutable t = 0x00_00_00_11_00_00_00_77;
    immutable level = w.t2l(t);
    assert(level==4);
    immutable slot = w.t2s(t, level);
    assert(slot == 0x11);
    assert(w.l2t(0) == 256);
    assert(w.l2t(1) == 256*256);
    auto timer = new Timer();
    () {
        w.schedule(timer, 2);
        bool thrown;
        // check that you can't schedule same timer twice
        try
        {
            w.schedule(timer, 5);
        }
        catch(ScheduleTimerError e)
        {
            thrown = true;
        }
        assert(thrown);
        thrown = false;
        try
        {
            advanceAndCollect(w, 1024);
        }
        catch(AdvanceWheelError e)
        {
            thrown = true;
        }
        assert(thrown);
        thrown = false;
        w.cancel(timer);
        advanceAndCollect(w, 1);
    }();
    w = TimingWheels!Timer();
    w.init();
    w.schedule(timer, 1);
    auto r = advanceAndCollect(w, 1);
    assert(r.length == 1);
    w.schedule(timer, 256);
    r = advanceAndCollect(w, 255);
    assert(r.length == 0);
    r = advanceAndCollect(w, 1);
    assert(r.length == 1);
    w.schedule(timer, 256*256);
    size_t c;
    for(int i=0;i<256;i++)
    {
        r = advanceAndCollect(w, 256);
        c += r.length;
    }
    assert(c==1);
}
@("ticksToCatchUp")
unittest
{
    globalLogLevel = LogLevel.info;
    TimingWheels!Timer w;
    w.init(1_000_000);
    // 1 msec == 10_000 hnsecs
    assert(w.ticksToCatchUp(1.msecs, 1_000_000) == 0);
    // real time behind the wheel must not wrap around to 256
    assert(w.ticksToCatchUp(1.msecs, 999_000) == 0);
    assert(w.ticksToCatchUp(1.msecs, 1_000_000 + 3 * 10_000) == 3);
    // result is capped at one full turn of level 0
    assert(w.ticksToCatchUp(1.msecs, 1_000_000 + 1000 * 10_000) == 256);
}
@("rt")
@Tags("noauto")
unittest
{
    globalLogLevel = LogLevel.info;
    TimingWheels!Timer w;
    Duration Tick = 5.msecs;
    w.init();
    ulong now = Clock.currStdTime;
    assert(now - w.currStdTime(Tick) < 5*10_000);
    Thread.sleep(2*Tick);
    now = Clock.currStdTime;
    assert((now - w.currStdTime(Tick))/10_000 - (2*Tick).split!"msecs".msecs < 10);
    auto toCatchUp = w.ticksToCatchUp(Tick, now);
    toCatchUp.shouldEqual(2);
    auto t = advanceAndCollect(w, toCatchUp);
    toCatchUp = w.ticksToCatchUp(Tick, now);
    toCatchUp.shouldEqual(0);
}
@("cancel")
unittest
{
    globalLogLevel = LogLevel.info;
    TimingWheels!Timer w;
    w.init();
    Timer timer0 = new Timer();
    Timer timer1 = new Timer();
    w.schedule(timer0, 256);
    w.schedule(timer1, 256+128);
    auto r = advanceAndCollect(w, 255);
    assert(r.length == 0);
    w.cancel(timer0);
    r = advanceAndCollect(w, 1);
    assert(r.length == 0);
    // cancelling a timer which is no longer scheduled is a silent no-op
    assertNotThrown(w.cancel(timer0));
    w.cancel(timer1);
    assert(w.totalTimers == 0);
}
@("ticksUntilNextEvent")
unittest
{
    globalLogLevel = LogLevel.info;
    TimingWheels!Timer w;
    w.init();
    auto s = w.ticksUntilNextEvent;
    assert(s==256);
    auto r = advanceAndCollect(w, s);
    assert(r.length == 0);
    Timer t = new Timer();
    w.schedule(t, 50);
    s = w.ticksUntilNextEvent;
    assert(s==50);
    r = advanceAndCollect(w, s);
    assert(r.length == 1);
}

@("load")
@Serial
unittest
{
    globalLogLevel = LogLevel.info;
    enum TIMERS = 100_000;
    Timer._current_id = 1;
    auto w = TimingWheels!Timer();
    w.init();
    for(int i=1;i<=TIMERS;i++)
    {
        auto t = new Timer();
        w.schedule(t, i);
    }
    int counter;
    for(int i=1;i<=TIMERS;i++)
    {
        auto timers = advanceAndCollect(w, 1);
        assert(timers.length == 1);
        auto t = timers[0];
        assert(t.id == i, "expected t.id=%s, got %s".format(i, t.id));
        counter++;
    }
    assert(counter == TIMERS, "expected %d timers, got %d".format(TIMERS, counter));

    for(int i=1;i<=TIMERS;i++)
    {
        auto t = new Timer();
        w.schedule(t, i);
    }
    counter = 0;
    for(int i=TIMERS+1;i<=2*TIMERS;i++)
    {
        auto timers = advanceAndCollect(w, 1);
        assert(timers.length == 1);
        auto t = timers[0];
        assert(t.id == i, "expected t.id=%s, got %s".format(i, t.id));
        counter++;
    }
    assert(counter == TIMERS, "expected %d timers, got %d".format(TIMERS, counter));

}
// @("cornercase")
// @Serial
// unittest
// {
//     Timer._current_id = 1;
//     auto w = TimingWheels!Timer();
//     globalLogLevel = LogLevel.trace;
//     w.advance(254);
//     auto t = new Timer();
//     w.schedule(t, 511);
//     for(int i=0; i<511; i++)
//     {
//         w.advance(1);
//     }
// }

///
///
///
@("example")
@Tags("noauto")
@Values(1.msecs,2.msecs,3.msecs,4.msecs,5.msecs,6.msecs,7.msecs,8.msecs, 9.msecs,10.msecs)
@Serial
unittest
{
    import std;
    globalLogLevel = LogLevel.info;
    auto rnd = Random(142);
    auto Tick = getValue!Duration();
    /// track execution
    int  counter;
    SysTime last;

    /// this is our Timer
    class Timer
    {
        static ulong __id;
        private ulong _id;
        private string _name;
        this(string name)
        {
            _id = __id++;
            _name = name;
        }
        /// must provide id() method
        ulong id()
        {
            return _id;
        }
    }

    enum IOWakeUpInterval = 100; // to simulate random IO wakeups in interval 0 - 100.msecs

    // each tick spans `Tick` - this is our link with time in reality
    TimingWheels!Timer w;
    w.init();
    auto durationToTicks(Duration d)
    {
        // we have to adjust w.now and realtime 'now' before scheduling timer
        auto real_now = Clock.currStdTime;
        auto tw_now = w.currStdTime(Tick);
        auto delay = (real_now - tw_now).hnsecs;
        return (d + delay)/Tick;
    }
    void process_timer(Timer t)
    {
        switch(t._name)
        {
            case "periodic":
                if ( last.stdTime == 0)
                {
                    // initialize tracking
                    last = Clock.currTime - 50.msecs;
                }
                auto delta = Clock.currTime - last;
                assert(delta - 50.msecs <= max(Tick + Tick/20, 5.msecs), "delta-50.msecs=%s".format(delta-50.msecs));
                writefln("@ %s - delta: %sms (should be 50ms)", t._name, (Clock.currTime - last).split!"msecs".msecs);
                last = Clock.currTime;
                counter++;
                w.schedule(t, durationToTicks(50.msecs)); // rearm
                break;
            default:
                writefln("@ %s", t._name);
                break;
        }
    }
    // emulate some random initial delay
    auto randomInitialDelay = uniform(0, 500, rnd).msecs;
    Thread.sleep(randomInitialDelay);
    //
    // start one arbitrary timer and one periodic timer
    //
    auto some_timer = new Timer("some");
    auto periodic_timer = new Timer("periodic");
    w.schedule(some_timer, durationToTicks(32.msecs));
    w.schedule(periodic_timer, durationToTicks(50.msecs));

    while(counter < 10)
    {
        auto realNow = Clock.currStdTime;
        auto randomIoInterval = uniform(0, IOWakeUpInterval, rnd).msecs;
        // null means there are no timers at all: only the simulated I/O can wake us up
        auto untilNext = w.timeUntilNextEvent(Tick, realNow);
        auto nextTimerEvent = untilNext.isNull ? randomIoInterval : max(untilNext.get, 0.msecs);
        // wait for what should happen earlier
        auto time_to_sleep = min(randomIoInterval, nextTimerEvent);
        writefln("* sleep until timer event or random I/O for %s", time_to_sleep);
        Thread.sleep(time_to_sleep);
        // make steps if required
        int ticks = w.ticksToCatchUp(Tick, Clock.currStdTime);
        if (ticks > 0)
        {
            // collect first: process_timer() re-schedules into the same wheel
            foreach(t; advanceAndCollect(w, ticks))
            {
                process_timer(t);
            }
        }
        // emulate some random processing time
        Thread.sleep(uniform(0, 5, rnd).msecs);
    }
}