// Unit tests for timer scheduling: DelaySender on the default scheduler and
// ManualTimeWorker, whose clock only moves when advance() is called.
module ut.concurrency.scheduler;

import concurrency.operations;
import concurrency.sender : DelaySender;
import concurrency;
import unit_threaded;
import concurrency.stoptoken;
import core.time : msecs;
import concurrency.scheduler;

// Smoke test: synchronously waits on a 10 ms DelaySender. No result is asserted.
@("scheduleAfter")
@safe unittest {
    DelaySender(10.msecs).syncWait;
}

// Races a 10 ms delay against a 3 ms delay. Per the test name this is meant to
// exercise cancellation of a scheduled timer (presumably the slower one once the
// faster one wins -- not asserted here).
@("scheduleAfter.cancel")
@safe unittest {
    race(DelaySender(10.msecs), DelaySender(3.msecs)).syncWait;
}

// Combines a sender that calls source.stop with a 10 ms delay and waits using
// that same StopSource. No result is asserted.
@("scheduleAfter.stop-before-add")
@safe unittest {
    import concurrency.sender : delay, justFrom;

    auto source = new shared StopSource();
    whenAll(justFrom(() shared => source.stop), delay(10.msecs)).syncWait(source);
}

// Drives two timers by hand: g's callback is registered at 10 ms, h's at 5 ms.
@("ManualTimeWorker")
@safe unittest {
    import core.atomic : atomicOp;

    shared int g, h;
    auto worker = new shared ManualTimeWorker();
    worker.addTimer((TimerTrigger trigger) shared { g.atomicOp!"+="(1); }, 10.msecs);
    worker.addTimer((TimerTrigger trigger) shared { h.atomicOp!"+="(1); }, 5.msecs);

    // t = 0: nothing has run; the nearest timer is the 5 ms one.
    worker.timeUntilNextEvent().should == 5.msecs;
    g.should == 0;
    h.should == 0;

    // t = 4 ms: still 1 ms short of the first timer.
    worker.advance(4.msecs);
    worker.timeUntilNextEvent().should == 1.msecs;
    h.should == 0;
    g.should == 0;

    // t = 5 ms: h has run once; the 10 ms timer is 5 ms away.
    worker.advance(1.msecs);
    worker.timeUntilNextEvent().should == 5.msecs;
    h.should == 1;
    g.should == 0;

    // t = 10 ms: g has run once as well and no timers remain.
    worker.advance(5.msecs);
    h.should == 1;
    g.should == 1;
    worker.timeUntilNextEvent().should == null;
}

// Cancels a pending timer and checks that its callback observed the cancel trigger.
@("ManualTimeWorker.cancel")
@safe unittest {
    import core.atomic : atomicOp;

    shared int g;
    auto worker = new shared ManualTimeWorker();
    // The callback adds 2 when invoked with TimerTrigger.cancel and 1 for any
    // other trigger, so the final value of g reveals which trigger it received.
    auto timer = worker.addTimer((TimerTrigger trigger) shared { g.atomicOp!"+="(1 + (trigger == TimerTrigger.cancel)); }, 10.msecs);
    worker.timeUntilNextEvent().should == 10.msecs;
    g.should == 0;

    worker.advance(4.msecs);
    worker.timeUntilNextEvent().should == 6.msecs;
    g.should == 0;

    // After cancelling, no event is pending and g == 2, i.e. the callback was
    // invoked exactly once, with TimerTrigger.cancel.
    worker.cancelTimer(timer);
    worker.timeUntilNextEvent().should == null;
    g.should == 2;
}

// An exception thrown by one sender in whenAll must surface to the waiter even
// though the other sender is still waiting on a timer that has not elapsed.
@("ManualTimeWorker.error")
@safe unittest {
    import core.time;
    import concurrency.operations : withScheduler, whenAll;
    import concurrency.sender : justFrom;
    import concurrency.scheduler : ManualTimeWorker;

    auto worker = new shared ManualTimeWorker();

    // A 10 ms delay scheduled on the manually driven worker.
    auto sender = DelaySender(10.msecs)
        .withScheduler(worker.getScheduler);

    // Advances the worker by only 7 ms (short of the 10 ms delay) and then throws.
    auto driver = justFrom(() shared {
        worker.advance(7.msecs);
        throw new Exception("halt");
    });

    whenAll(sender, driver).syncWait.assumeOk.shouldThrowWithMessage("halt");
}

// Schedule() wrapped by toSenderObject can be waited on; assumeOk is applied to
// the result.
@("toSenderObject.Schedule")
@safe unittest {
    import concurrency.sender : toSenderObject;

    Schedule().toSenderObject.syncWait.assumeOk;
}