1 module ut.concurrency.scheduler;
2 
3 import concurrency.operations;
4 import concurrency.sender : DelaySender;
5 import concurrency;
6 import unit_threaded;
7 import concurrency.stoptoken;
8 import core.time : msecs;
9 import concurrency.scheduler;
10 
11 @("scheduleAfter")
12 @safe unittest {
13   DelaySender(10.msecs).syncWait;
14 }
15 
16 @("scheduleAfter.cancel")
17 @safe unittest {
18   race(DelaySender(10.msecs), DelaySender(3.msecs)).syncWait;
19 }
20 
21 @("scheduleAfter.stop-before-add")
22 @safe unittest {
23   import concurrency.sender : delay, justFrom;
24   auto source = new shared StopSource();
25   whenAll(justFrom(() shared => source.stop), delay(10.msecs)).syncWait(source);
26 }
27 
28 @("ManualTimeWorker")
29 @safe unittest {
30   import core.atomic : atomicOp;
31 
32   shared int g, h;
33   auto worker = new shared ManualTimeWorker();
34   worker.addTimer((TimerTrigger trigger) shared { g.atomicOp!"+="(1); }, 10.msecs);
35   worker.addTimer((TimerTrigger trigger) shared { h.atomicOp!"+="(1); }, 5.msecs);
36 
37   worker.timeUntilNextEvent().should == 5.msecs;
38   g.should == 0;
39   h.should == 0;
40 
41   worker.advance(4.msecs);
42   worker.timeUntilNextEvent().should == 1.msecs;
43   h.should == 0;
44   g.should == 0;
45 
46   worker.advance(1.msecs);
47   worker.timeUntilNextEvent().should == 5.msecs;
48   h.should == 1;
49   g.should == 0;
50 
51   worker.advance(5.msecs);
52   h.should == 1;
53   g.should == 1;
54   worker.timeUntilNextEvent().should == null;
55 }
56 
57 @("ManualTimeWorker.cancel")
58 @safe unittest {
59   import core.atomic : atomicOp;
60 
61   shared int g;
62   auto worker = new shared ManualTimeWorker();
63   auto timer = worker.addTimer((TimerTrigger trigger) shared { g.atomicOp!"+="(1 + (trigger == TimerTrigger.cancel)); }, 10.msecs);
64   worker.timeUntilNextEvent().should == 10.msecs;
65   g.should == 0;
66 
67   worker.advance(4.msecs);
68   worker.timeUntilNextEvent().should == 6.msecs;
69   g.should == 0;
70 
71   worker.cancelTimer(timer);
72   worker.timeUntilNextEvent().should == null;
73   g.should == 2;
74 }
75 
76 @("ManualTimeWorker.error")
77 @safe unittest {
78   import core.time;
79   import concurrency.operations : withScheduler, whenAll;
80   import concurrency.sender : justFrom;
81 
82   shared int p = 0;
83   import concurrency.scheduler : ManualTimeWorker;
84 
85   auto worker = new shared ManualTimeWorker();
86 
87   auto sender = DelaySender(10.msecs)
88     .withScheduler(worker.getScheduler);
89 
90   auto driver = justFrom(() shared {
91       worker.advance(7.msecs);
92       throw new Exception("halt");
93     });
94 
95   whenAll(sender, driver).syncWait.assumeOk.shouldThrowWithMessage("halt");
96 }
97 
98 @("toSenderObject.Schedule")
99 @safe unittest {
100   import concurrency.sender : toSenderObject;
101   Schedule().toSenderObject.syncWait.assumeOk;
102 }