/// taken from https://github.com/ikod/timingwheels
/// license: BSL-1.0
/+
    Permission is hereby granted, free of charge, to any person or organization obtaining a copy of the software and accompanying documentation covered by this license (the "Software") to use, reproduce, display, distribute, execute, and transmit the Software, and to prepare derivative works of the Software, and to permit third-parties to whom the Software is furnished to do so, all subject to the following:

    The copyright notices in the Software and this entire statement, including the above license grant, this restriction and the following disclaimer, must be included in all copies of the Software, in whole or in part, and all derivative works of the Software, unless such copies or derivative works are solely in the form of machine-executable object code generated by a source language processor.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+/
module concurrency.timingwheels;

import std.datetime;
import std.exception;
import std.typecons;
import std.format;
import std.traits;
import std.range;
import std.algorithm;
import std.experimental.logger;

import std.experimental.allocator;
import std.experimental.allocator.mallocator: Mallocator;

import core.thread;
import core.memory;

import ikod.containers.hashmap;
import automem;
import mir.algebraic : Nullable, nullable;

version(twtesting)
{
    import unit_threaded;
}


version(twtesting)
{
    // Minimal timer used by the unit tests: every instance receives the next
    // value of a static counter as its id.
    private class Timer
    {
        static ulong _current_id;
        private ulong _id;

        this() @safe @nogc
        {
            // take the current counter value, then bump the counter
            _id = _current_id++;
        }
        ~this() @safe @nogc
        {
        }
        // Identifier of this timer (the wheel requires an id() method).
        ulong id() @safe @nogc
        {
            return _id;
        }
        // Decimal representation of the id.
        override string toString()
        {
            return format("%d", _id);
        }
    }
}
///
/// Scheduling error: raised by schedule() when a timer cannot be scheduled
/// (for example when ticks == 0 or when the timer is already scheduled).
///
class ScheduleTimerError: Exception
{
    /// Build the error from a message and the source location of the failing call.
    this(string msg, string file = __FILE__, size_t line = __LINE__) @nogc @safe
    {
        super(msg, file, line);
    }
}
///
/// Error describing an attempt to cancel a timer which is not scheduled.
///
class CancelTimerError: Exception
{
    /// Build the error from a message and the source location of the failing call.
    this(string msg, string file = __FILE__, size_t line = __LINE__) @nogc @safe
    {
        super(msg, file, line);
    }
}
///
/// Advancing error occurs if the number of ticks to advance is not in range 0<t<=256
///
class AdvanceWheelError: Exception
{
    /// Build the error from a message and the source location of the failing call.
    this(string msg, string file = __FILE__, size_t line = __LINE__) @nogc @safe
    {
        super(msg, file, line);
    }
}

debug(timingwheels) @safe @nogc nothrow
{
    // Trace helper that prefixes the message with file:line and never lets a
    // logging failure reach the caller.
    package
    void safe_tracef(A...)(string f, scope A args, string file = __FILE__, int line = __LINE__) @safe @nogc nothrow
    {
        bool onOsx, onLdc;
        version(OSX)
        {
            onOsx = true;
        }
        version(LDC)
        {
            onLdc = true;
        }
        debug (timingwheels) try
        {
            // tracing can fail on the ldc2/osx pair, see https://github.com/ldc-developers/ldc/issues/3240
            if (!(onOsx && onLdc))
            {
                () @trusted @nogc {tracef("%s:%d " ~ f, file, line, args);}();
            }
        }
        catch(Exception e)
        {
            // deliberately ignored: tracing is best effort
        }
    }
}

// Insert `le` in front of the circular doubly linked list rooted at `*head`
// and make it the new head.
pragma(inline)
private void dl_insertFront(L)(L *le, L** head)
{
    auto old_head = *head;
    if (old_head == null)
    {
        // empty list: the element links to itself in both directions
        le.prev = le;
        le.next = le;
    }
    else
    {
        // splice between the current tail (old_head.prev) and the current head
        auto tail = old_head.prev;
        le.prev = tail;
        le.next = old_head;
        tail.next = le;
        old_head.prev = le;
    }
    *head = le;
}

// Remove `le` from the circular list rooted at `*head`. The head becomes null
// when the last element is removed, or moves to the next element when the
// head itself is removed.
pragma(inline)
private void dl_unlink(L)(L *le, L** head)
in(*head != null)
{
    if (le == *head)
    {
        if (le.next == le)
        {
            // `le` was the only element
            *head = null;
            return;
        }
        *head = le.next;
    }
    le.next.prev = le.prev;
    le.prev.next = le.next;
}
// Walk once around the circular list rooted at `*head` (no per-element action).
pragma(inline)
private void dl_walk(L)(L** head)
{
    if (*head == null)
    {
        return;
    }
    for (auto cursor = (*head).next; cursor != *head; cursor = cursor.next)
    {
    }
}
// Move `le` from the list rooted at `*head_from` to the front of the list
// rooted at `*head_to`.
pragma(inline)
private void dl_relink(L)(L* le, L** head_from, L** head_to)
in(le.prev !is null && le.next !is null)
{
    dl_unlink(le, head_from);
    dl_insertFront(le, head_to);
}

@("dl")
unittest
{
    globalLogLevel = LogLevel.info;
    struct LE
    {
        int p;
        LE *next;
        LE *prev;
    }
    LE* listA = null;
    LE* listB = null;
    auto first = new LE(1);
    auto second = new LE(2);

    // single element: insert, then remove
    dl_insertFront(first, &listA);
    assert(listA != null);
    dl_unlink(first, &listA);
    assert(listA == null);

    // two elements: remove the older one first
    dl_insertFront(first, &listA);
    assert(listA != null);
    dl_insertFront(second, &listA);
    dl_unlink(first, &listA);
    assert(listA != null);
    dl_unlink(second, &listA);
    assert(listA == null);

    // two elements: remove the head first
    dl_insertFront(first, &listA);
    assert(listA != null);
    dl_insertFront(second, &listA);
    dl_unlink(second, &listA);
    assert(listA != null);
    dl_unlink(first, &listA);
    assert(listA == null);

    // relink moves the element between lists
    dl_insertFront(first, &listA);
    dl_relink(first, &listA, &listB);
    assert(listA == null);
    assert(listB != null);
}
///
/// This structure implements scheme 6.2 from the
/// $(LINK http://www.cs.columbia.edu/~nahum/w6998/papers/sosp87-timing-wheels.pdf)
/// and supports several primitives:
/// $(UL
/// $(LI schedule timer in the future.)
/// $(LI cancel timer.)
/// $(LI time step (advance) - all timers expired at current time tick are extracted from wheels.)
/// )
/// Each operation takes O(1) time.
///
struct TimingWheels(T)
{
    import core.bitop: bsr;

    private
    {
        alias TimerIdType = ReturnType!(T.id);
        alias allocator = Mallocator.instance;

        enum MASK = 0xff;
        enum LEVELS = 8;
        enum LEVEL_MAX = LEVELS - 1;
        enum LEVEL_BITS = 8;        // bits of the tick counter consumed by each level
        enum SLOTS = 256;
        enum FreeListMaxLen = 100;

        struct ListElement(T)
        {
            private
            {
                T timer;
                ulong scheduled_at;     // absolute level-0 tick at which the timer expires
                ushort position;        // (level << 8) | slot, 0xffff while on the free list
                ListElement!T* prev, next;
            }
        }
        struct Slot
        {
            ListElement!T* head;
        }
        struct Level
        {
            // now is the counter of ticks processed on this level
            ulong now;
            Slot[SLOTS] slots;
        }

        Level[LEVELS] levels;
        ListElement!T* freeList;    // singly linked (via next) cache of unused elements
        int freeListLen;
        HashMap!(TimerIdType, ListElement!T*)
                ptrs;               // timer id -> list element of every scheduled timer
        long startedAt;
    }
    invariant
    {
        assert(freeListLen>=0);
    }
    alias Ticks = ulong; // ticks are 64 bit unsigned integers.

    // hashing ticks to slots
    // 8 levels, each level 256 slots, width of a slot on each level grows 256 times
    // translate ticks to level
    // 0x00_00_00_00_00_00_00_00 <- ticks
    //    ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓
    //    □  □  □  □  □  □  □  □ \
    //    □  □  □  □  □  □  □  □ |
    //    .  .  .  .  .  .  .  . | 256 slots
    //    .  .  .  .  .  .  .  . |
    //    □  □  □  □  □  □  □  □ /
    //    7  6  5  4  3  2  1  0
    //                          <- 8 levels
    // each slot - double linked list of timers

    // ticks to level = bsr(ticks)/8
    pragma(inline) private pure int t2l(ulong t) @safe @nogc nothrow
    {
        if (t == 0)
        {
            return 0;
        }
        // index of the highest set bit divided by the number of bits per level
        return bsr(t)/LEVEL_BITS;
    }
    // ticks to slot = (ticks >> (level*8)) & 0xff
    pragma(inline) private pure int t2s(ulong t, int l) @safe @nogc nothrow
    {
        return (t >> (l*LEVEL_BITS)) & MASK;
    }
    // level to ticks: number of level-0 ticks covered by one full turn of level l
    // l[0] -> 256
    // l[1] -> 256*256
    // ...
    // The span of the top level (2^64) does not fit into ulong, hence l < LEVEL_MAX.
    pragma(inline) private pure ulong l2t(int l) @safe @nogc nothrow
    in(l >= 0 && l < LEVEL_MAX)
    {
        return cast(ulong)SLOTS << (l*LEVEL_BITS);
    }
    // Release every list element still linked in the wheels or parked on the free list.
    ~this()
    {
        ptrs.clear;
        for(int l=0;l<=LEVEL_MAX;l++)
            for(int s=0; s<SLOTS; s++)
            {
                while(levels[l].slots[s].head)
                {
                    auto le = levels[l].slots[s].head;
                    dl_unlink(le, &levels[l].slots[s].head);
                    () @trusted {
                        GC.removeRange(le);
                        dispose(allocator, le);
                    }();
                }
            }
        while(freeList)
        {
            assert(freeListLen>0);
            auto n = freeList.next;
            () @trusted {
                GC.removeRange(freeList);
                dispose(allocator, freeList);
            }();
            freeListLen--;
            freeList = n;
        }
    }

    // Take a list element from the free list or allocate a fresh one.
    private ListElement!T* getOrCreate()
    {
        ListElement!T* result;
        if (freeList !is null)
        {
            result = freeList;
            freeList = freeList.next;
            freeListLen--;
            return result;
        }
        result = make!(ListElement!T)(allocator);
        // the element lives in malloc'ed memory but stores a timer, so the GC is told to scan it
        () @trusted {
            GC.addRange(result, (*result).sizeof);
        }();
        return result;
    }
    // Park an unused list element on the free list, or dispose it when the list is full.
    private void returnToFreeList(ListElement!T* le)
    {
        if ( freeListLen >= FreeListMaxLen )
        {
            // this can be safely disposed as we do not leak ListElements outside this module
            () @trusted {
                GC.removeRange(le);
                dispose(allocator, le);
            }();
        }
        else
        {
            le.position = 0xffff;
            le.next = freeList;
            freeList = le;
            freeListLen++;
        }
    }
    /// Remember the current wall clock time as the moment the wheels were started.
    void init()
    {
        startedAt = Clock.currStdTime;
    }
    /// Start the wheels at the explicitly given std time (hnsecs).
    void init(ulong time) {
        startedAt = time;
    }
    /++
     + Return internal view on current time - it is time at the call to $(B init)
     + plus total number of steps multiplied by $(B tick) duration.
     + Params:
     +   tick = tick duration
     +/
    auto currStdTime(Duration tick)
    {
        return startedAt + levels[0].now * tick.split!"hnsecs".hnsecs;
    }
    ///
    /// Schedule timer to $(B ticks) ticks forward from internal 'now'.
    ///Params:
    /// timer = timer to schedule;
    /// ticks = ticks in the future to schedule timer. (0 < ticks);
    ///Returns:
    ///  void
    ///Throws:
    /// ScheduleTimerError
    ///   when ticks == 0
    ///   or when timer already scheduled
    ///
    void schedule(T)(T timer, const ulong ticks)
    {
        if (ticks == 0)
        {
            throw new ScheduleTimerError("ticks can't be 0");
        }
        auto timer_id = timer.id();
        if (ptrs.contains(timer_id))
        {
            throw new ScheduleTimerError("Timer already scheduled");
        }
        // Unsigned arithmetic keeps very large tick counts from turning negative,
        // and the LEVEL_MAX bound keeps level_index inside levels[] when the slot
        // width can no longer be widened without overflowing 64 bits.
        size_t level_index = 0;
        ulong t = ticks;
        ulong s = 1;        // width of the slot in ticks on level
        uint  shift = 0;
        while(level_index < LEVEL_MAX && t > s<<LEVEL_BITS) // while t > slots on level
        {
            // consume the ticks left until this level wraps around, then go one level up
            t -= (SLOTS - (levels[level_index].now & MASK)) * s;
            level_index++;
            s = s << LEVEL_BITS;
            shift += LEVEL_BITS;
        }
        auto level = &levels[level_index];
        auto mask = s - 1;
        // round up to the next slot when t is not a multiple of the slot width
        size_t slot_index = (level.now + (t>>shift) + ((t&mask)>0?1:0)) & MASK;
        auto slot = &levels[level_index].slots[slot_index];
        debug(timingwheels) safe_tracef("use level/slot %d/%d, level now: %d", level_index, slot_index, level.now);
        auto le = getOrCreate();
        le.timer = timer;
        le.position = ((level_index << 8 ) | slot_index) & 0xffff;
        le.scheduled_at = levels[0].now + ticks;
        dl_insertFront(le, &slot.head);
        ptrs[timer_id] = le;
        debug(timingwheels) safe_tracef("scheduled timer id: %s, ticks: %s, now: %d, scheduled at: %s to level: %s, slot %s",
            timer_id, ticks, levels[0].now, le.scheduled_at, level_index, slot_index);
    }
    /// Cancel timer.
    /// Cancelling a timer which is not scheduled (never scheduled, already
    /// expired or already cancelled) is a no-op.
    ///Params:
    /// timer = timer to cancel
    ///Returns:
    /// void
    void cancel(T)(T timer)
    {
        // get list element pointer
        auto v = ptrs.fetch(timer.id());
        if ( !v.ok )
        {
            // unknown timer: nothing to do
            return;
        }
        auto le = v.value;
        immutable level_index = le.position>>8;
        immutable slot_index  = le.position & 0xff;
        assert(timer is le.timer);
        debug(timingwheels) safe_tracef("cancel timer, l:%d, s:%d", level_index, slot_index);
        dl_unlink(le, &levels[level_index].slots[slot_index].head);
        returnToFreeList(le);
        ptrs.remove(timer.id());
    }
    /// Number of ticks to rotate wheels until internal wheel 'now'
    /// catches up with real world realTime.
    /// Calculation is based on the time when wheels were started and the total
    /// number of ticks passed.
    ///Params:
    /// tick = your tick length (Duration)
    /// realTime = current real world now (Clock.currStdTime)
    ///Returns: ticks to advance so that we catch up real world current time,
    ///         capped at 256; 0 when the wheel is not behind realTime.
    int ticksToCatchUp(Duration tick, ulong realTime)
    {
        immutable tickLength = tick.split!"hnsecs".hnsecs;
        immutable wheelTime = startedAt + tickLength * levels[0].now;
        if ( realTime <= wheelTime )
        {
            // the subtraction below is unsigned: without this guard a wheel that is
            // ahead of realTime would wrap around and report 256 ticks
            return 0;
        }
        immutable v = (realTime - wheelTime) / tickLength;
        if ( v > 256 )
        {
            return 256;
        }
        return cast(int)v;
    }
    /// Time until next scheduled timer event.
    /// You provide tick size and current real world time.
    /// This function finds ticks until next event and uses time of the start and
    /// total steps executed to calculate time delta from $(B realNow) to next event.
    ///Params:
    /// tick = your accepted tick duration.
    /// realNow = real world now, result of Clock.currStdTime
    ///Returns: time until next event (null when no timers are scheduled).
    ///         Can be zero or negative in case you have already expired events.
    ///
    Nullable!Duration timeUntilNextEvent(const Duration tick, ulong realNow)
    {
        assert(startedAt>0, "Forgot to call init()?");
        if (totalTimers == 0)
            return typeof(return).init;
        immutable n = ticksUntilNextEvent();
        immutable target = startedAt + (levels[0].now + n) * tick.split!"hnsecs".hnsecs;
        // unsigned difference reinterpreted as signed hnsecs: negative when target is in the past
        auto delta = (target - realNow).hnsecs;
        debug(timingwheels) safe_tracef("ticksUntilNextEvent=%s, tick=%s, startedAt=%s", n, tick, SysTime(startedAt));
        return nullable(delta);
    }

    ///
    /// Advance wheel and return all timers expired during wheel turn.
    //
    /// Params:
    ///   ticks = how many ticks to advance. Must be in range 0 < ticks <= 256
    /// Returns: list of expired timers
    /// Throws:
    ///   AdvanceWheelError when ticks is 0 or greater than 256
    ///
    auto advance(this W)(ulong ticks)
    {
        struct ExpiredTimers
        {
            HashMap!(TimerIdType, T)    _map;
            auto timers()
            {
                return _map.byValue;
            }
            auto length()
            {
                return _map.length();
            }
            bool contains(TimerIdType id)
            {
                return _map.contains(id);
            }
            void remove(TimerIdType id)
            {
                _map.remove(id);
            }
        }
        alias AdvanceResult = automem.RefCounted!(ExpiredTimers, Mallocator);
        if (ticks > l2t(0))
        {
            throw new AdvanceWheelError("You can't advance that much");
        }
        if (ticks == 0)
        {
            throw new AdvanceWheelError("ticks must be > 0");
        }
        debug(timingwheels) safe_tracef("advancing %d ticks", ticks);
        auto      result = AdvanceResult(ExpiredTimers());
        auto      level  = &levels[0];

        while(ticks)
        {
            ticks--;
            immutable now           = ++level.now;
            immutable slot_index    = now & MASK;
            auto      slot          = &level.slots[slot_index];
            //debug(timingwheels) safe_tracef("level 0, now=%s", now);
            while(slot.head)
            {
                auto le = slot.head;
                auto timer = le.timer;
                auto timer_id = timer.id();
                assert(!result._map.contains(timer_id), "Something wrong: we try to return same timer twice");
                debug(timingwheels) safe_tracef("return timer: %s, scheduled at %s", timer, le.scheduled_at);
                result._map[timer_id] = timer;
                dl_unlink(le, &slot.head);
                returnToFreeList(le);
                ptrs.remove(timer_id);
            }
            if (slot_index == 0)
            {
                // level 0 wrapped around: cascade timers down from the upper levels
                advance_level(1);
            }
        }
        return result;
    }
    /// Number of timers currently scheduled.
    auto totalTimers() pure @safe @nogc
    {
        return ptrs.length();
    }
    /// Snapshot (id -> timer) of all timers currently scheduled.
    auto allTimers() @safe @nogc
    {
        struct AllTimers
        {
            HashMap!(TimerIdType, T)    _map;
            auto timers()
            {
                return _map.byValue;
            }
            auto length()
            {
                return _map.length();
            }
            bool contains(TimerIdType id)
            {
                return _map.contains(id);
            }
        }
        AllTimers result;
        foreach (p; ptrs.byPair)
        {
            result._map[p.key] = p.value.timer;
        }
        return result;
    }
    //
    // ticks until next event on level 0 or until next wheel rotation
    // If you have empty ticks it is safe to sleep - you will not miss anything, just wake up
    // at the time when next timer has to be processed.
    //Returns: number of safe "sleep" ticks.
    //
    private int ticksUntilNextEvent()
    out(r; r<=256)
    {
        int result = 1;
        auto level = &levels[0];
        immutable uint now = levels[0].now & MASK;
        auto slot = (now + 1) & MASK;
        //assert(level.slots[now].head == null);
        do
        {
            if (level.slots[slot].head != null)
            {
                break;
            }
            result++;
            slot = (slot + 1) & MASK;
        }
        while(slot != now);

        // never sleep past the level-0 wrap: upper levels must be cascaded there
        return min(result, 256-now);
    }

    // Advance level `level_index` by one slot and move every timer found in that
    // slot down to the level/slot matching its remaining delay.
    private void advance_level(int level_index)
    in(level_index>0)
    {
        debug(timingwheels) safe_tracef("running advance on level %d", level_index);
        immutable now0 = levels[0].now;
        auto      level = &levels[level_index];
        immutable now = ++level.now;
        immutable slot_index = now & MASK;
        debug(timingwheels) safe_tracef("level %s, now=%s", level_index, now);
        auto slot = &level.slots[slot_index];
        debug(timingwheels) safe_tracef("haldle l%s:s%s timers", level_index, slot_index);
        while(slot.head)
        {
            auto listElement = slot.head;

            immutable delta = listElement.scheduled_at - now0;
            size_t lower_level_index = 0;
            long t = delta;
            size_t s = 1;     // width of the slot in ticks on level
            size_t shift = 0;
            while(lower_level_index < LEVEL_MAX && t > s<<8) // while t > slots on level
            {
                t -= (SLOTS - (levels[lower_level_index].now & MASK)) * s;
                lower_level_index++;
                s = s << 8;
                shift += 8;
            }
            auto mask = s - 1;
            size_t lower_level_slot_index = (levels[lower_level_index].now + (t>>shift) + ((t&mask)>0?1:0)) & MASK;
            debug(timingwheels) safe_tracef("move timer id: %s, scheduledAt; %d to level %s, slot: %s (delta=%s)",
                listElement.timer.id(), listElement.scheduled_at, lower_level_index, lower_level_slot_index, delta);
            listElement.position = ((lower_level_index<<8) | lower_level_slot_index) & 0xffff;
            dl_relink(listElement, &slot.head, &levels[lower_level_index].slots[lower_level_slot_index].head);
        }
        if (slot_index == 0 && level_index < LEVEL_MAX)
        {
            // this level wrapped around too: continue the cascade one level up
            advance_level(level_index+1);
        }
    }
}

version(twtesting):

@("TimingWheels")
unittest
{
    import std.stdio;
    globalLogLevel = LogLevel.info;
    TimingWheels!Timer w;
    w.init();
    assert(w.t2l(1) == 0);
    assert(w.t2s(1, 0) == 1);
    immutable t = 0x00_00_00_11_00_00_00_77;
    immutable level = w.t2l(t);
    assert(level==4);
    immutable slot = w.t2s(t, level);
    assert(slot == 0x11);
    // level span: 256 ticks on level 0, 256*256 on level 1
    assert(w.l2t(0) == 256);
    assert(w.l2t(1) == 256*256);
    auto timer = new Timer();
    () @nogc @safe {
        w.schedule(timer, 2);
        bool thrown;
        // check that you can't schedule same timer twice
        try
        {
            w.schedule(timer, 5);
        }
        catch(ScheduleTimerError e)
        {
            thrown = true;
        }
        assert(thrown);
        thrown = false;
        try
        {
            w.advance(1024);
        }
        catch(AdvanceWheelError e)
        {
            thrown = true;
        }
        assert(thrown);
        thrown = false;
        w.cancel(timer);
        w.advance(1);
    }();
    w = TimingWheels!Timer();
    w.init();
    w.schedule(timer, 1);
    auto r = w.advance(1);
    assert(r.timers.count == 1);
    w.schedule(timer, 256);
    r = w.advance(255);
    assert(r.timers.count == 0);
    r = w.advance(1);
    assert(r.timers.count == 1);
    w.schedule(timer, 256*256);
    int c;
    for(int i=0;i<256;i++)
    {
        r = w.advance(256);
        c += r.timers.count;
    }
    assert(c==1);
}
@("rt")
@Tags("noauto")
unittest
{
    globalLogLevel = LogLevel.info;
    TimingWheels!Timer w;
    Duration Tick = 5.msecs;
    w.init();
    ulong now = Clock.currStdTime;
    assert(now - w.currStdTime(Tick) < 5*10_000);
    Thread.sleep(2*Tick);
    now = Clock.currStdTime;
    assert((now - w.currStdTime(Tick))/10_000 - (2*Tick).split!"msecs".msecs < 10);
    auto toCatchUp = w.ticksToCatchUp(Tick, now);
    toCatchUp.shouldEqual(2);
    auto t = w.advance(toCatchUp);
    toCatchUp = w.ticksToCatchUp(Tick, now);
    toCatchUp.shouldEqual(0);
}
@("cancel")
unittest
{
    globalLogLevel = LogLevel.info;
    TimingWheels!Timer w;
    w.init();
    Timer timer0 = new Timer();
    Timer timer1 = new Timer();
    w.schedule(timer0, 256);
    w.schedule(timer1, 256+128);
    auto r = w.advance(255);
    assert(r.timers.count == 0);
    w.cancel(timer0);
    r = w.advance(1);
    assert(r.timers.count == 0);
    // cancelling a timer which is no longer scheduled is a silent no-op
    assertNotThrown(w.cancel(timer0));
    w.cancel(timer1);
}
@("ticksUntilNextEvent")
unittest
{
    globalLogLevel = LogLevel.info;
    TimingWheels!Timer w;
    w.init();
    auto s = w.ticksUntilNextEvent;
    assert(s==256);
    auto r = w.advance(s);
    assert(r.timers.count == 0);
    Timer t = new Timer();
    w.schedule(t, 50);
    s = w.ticksUntilNextEvent;
    assert(s==50);
    r = w.advance(s);
    assert(r.timers.count == 1);
}

@("load")
@Serial
unittest
{
    import std.array:array;
    globalLogLevel = LogLevel.info;
    enum TIMERS = 100_000;
    Timer._current_id = 1;
    auto w = TimingWheels!Timer();
    w.init();
    for(int i=1;i<=TIMERS;i++)
    {
        auto t = new Timer();
        w.schedule(t, i);
    }
    int counter;
    for(int i=1;i<=TIMERS;i++)
    {
        auto r = w.advance(1);
        auto timers = r.timers;
        auto t = timers.array()[0];
        assert(t.id == i, "expected t.id=%s, got %s".format(i, t.id));
        assert(timers.count == 1);
        counter++;
    }
    assert(counter == TIMERS, "expected %d timers, got %d".format(TIMERS, counter));

    for(int i=1;i<=TIMERS;i++)
    {
        auto t = new Timer();
        w.schedule(t, i);
    }
    counter = 0;
    for(int i=TIMERS+1;i<=2*TIMERS;i++)
    {
        auto r = w.advance(1);
        auto timers = r.timers;
        auto t = timers.array()[0];
        assert(t.id == i, "expected t.id=%s, got %s".format(i, t.id));
        assert(timers.count == 1);
        counter++;
    }
    assert(counter == TIMERS, "expected %d timers, got %d".format(TIMERS, counter));

}
// @("cornercase")
// @Serial
// unittest
// {
//     Timer._current_id = 1;
//     auto w = TimingWheels!Timer();
//     globalLogLevel = LogLevel.trace;
//     w.advance(254);
//     auto t = new Timer();
//     w.schedule(t, 511);
//     for(int i=0; i<511; i++)
//     {
//         w.advance(1);
//     }
// }

///
///
///
@("example")
@Tags("noauto")
@Values(1.msecs,2.msecs,3.msecs,4.msecs,5.msecs,6.msecs,7.msecs,8.msecs, 9.msecs,10.msecs)
@Serial
unittest
{
    import std;
    globalLogLevel = LogLevel.info;
    auto rnd = Random(142);
    auto Tick = getValue!Duration();
    /// track execution
    int  counter;
    SysTime last;

    /// this is our Timer
    class Timer
    {
        static ulong __id;
        private ulong _id;
        private string _name;
        this(string name)
        {
            _id = __id++;
            _name = name;
        }
        /// must provide id() method
        ulong id()
        {
            return _id;
        }
    }

    enum IOWakeUpInterval = 100; // to simulate random IO wakeups in interval 0 - 100.msecs

    // each tick spans `Tick` - this is our link with time in reality
    TimingWheels!Timer w;
    w.init();
    auto durationToTicks(Duration d)
    {
        // we have to adjust w.now and realtime 'now' before scheduling timer
        auto real_now = Clock.currStdTime;
        auto tw_now = w.currStdTime(Tick);
        auto delay = (real_now - tw_now).hnsecs;
        return (d + delay)/Tick;
    }
    void process_timer(Timer t)
    {
        switch(t._name)
        {
            case "periodic":
                if ( last.stdTime == 0)
                {
                    // initialize tracking
                    last = Clock.currTime - 50.msecs;
                }
                auto delta = Clock.currTime - last;
                assert(delta - 50.msecs <= max(Tick + Tick/20, 5.msecs), "delta-50.msecs=%s".format(delta-50.msecs));
                writefln("@ %s - delta: %sms (should be 50ms)", t._name, (Clock.currTime - last).split!"msecs".msecs);
                last = Clock.currTime;
                counter++;
                w.schedule(t, durationToTicks(50.msecs)); // rearm
                break;
            default:
                writefln("@ %s", t._name);
                break;
        }
    }
    // emulate some random initial delay
    auto randomInitialDelay = uniform(0, 500, rnd).msecs;
    Thread.sleep(randomInitialDelay);
    //
    // start one arbitrary timer and one periodic timer
    //
    auto some_timer = new Timer("some");
    auto periodic_timer = new Timer("periodic");
    w.schedule(some_timer, durationToTicks(32.msecs));
    w.schedule(periodic_timer, durationToTicks(50.msecs));

    while(counter < 10)
    {
        auto realNow = Clock.currStdTime;
        auto randomIoInterval = uniform(0, IOWakeUpInterval, rnd).msecs;
        auto nextTimerEvent = max(w.timeUntilNextEvent(Tick, realNow), 0.msecs);
        // wait for what should happen earlier
        auto time_to_sleep = min(randomIoInterval, nextTimerEvent);
        writefln("* sleep until timer event or random I/O for %s", time_to_sleep);
        Thread.sleep(time_to_sleep);
        // make steps if required
        int ticks = w.ticksToCatchUp(Tick, Clock.currStdTime);
        if (ticks > 0)
        {
            auto wr = w.advance(ticks);
            foreach(t; wr.timers)
            {
                process_timer(t);
            }
        }
        // emulate some random processing time
        Thread.sleep(uniform(0, 5, rnd).msecs);
    }
}