1 module ut.concurrency.operations; 2 3 import concurrency; 4 import concurrency.sender; 5 import concurrency.thread; 6 import concurrency.operations; 7 import concurrency.receiver; 8 import concurrency.stoptoken; 9 import concurrency.nursery; 10 import unit_threaded; 11 import core.time; 12 import core.thread; 13 import std.typecons; 14 15 /// Used to test that Senders keep the operational state alive until one receiver's terminal is called 16 struct OutOfBandValueSender(T) { 17 alias Value = T; 18 T value; 19 struct Op(Receiver) { 20 Receiver receiver; 21 T value; 22 void run() { 23 receiver.setValue(value); 24 } 25 void start() @trusted scope { 26 auto value = new Thread(&this.run).start(); 27 } 28 } 29 auto connect(Receiver)(return Receiver receiver) @safe scope return { 30 // ensure NRVO 31 auto op = Op!(Receiver)(receiver, value); 32 return op; 33 } 34 } 35 36 @("ignoreErrors.syncWait.value") 37 @safe unittest { 38 bool delegate() @safe shared dg = () shared { throw new Exception("Exceptions are rethrown"); }; 39 ThreadSender() 40 .then(dg) 41 .ignoreError() 42 .syncWait.isCancelled.should == true; 43 } 44 45 @("oob") 46 unittest { 47 auto oob = OutOfBandValueSender!int(43); 48 oob.syncWait.value.should == 43; 49 } 50 51 @("race") 52 unittest { 53 race(ValueSender!int(4), ValueSender!int(5)).syncWait.value.should == 4; 54 auto fastThread = ThreadSender().then(() shared => 1); 55 auto slowThread = ThreadSender().then(() shared @trusted { Thread.sleep(50.msecs); return 2; }); 56 race(fastThread, slowThread).syncWait.value.should == 1; 57 race(slowThread, fastThread).syncWait.value.should == 1; 58 } 59 60 @("race.multiple") 61 unittest { 62 race(ValueSender!int(4), ValueSender!int(5), ValueSender!int(6)).syncWait.value.should == 4; 63 } 64 65 @("race.oob") 66 unittest { 67 auto oob = OutOfBandValueSender!int(43); 68 auto value = ValueSender!int(11); 69 race(oob, value).syncWait.value.should == 11; 70 } 71 72 @("race.exception.single") 73 unittest { 74 race(ThrowingSender(), ValueSender!int(5)).syncWait.value.should == 5; 75 race(ThrowingSender(), ThrowingSender()).syncWait.assumeOk.shouldThrow(); 76 } 77 78 @("race.exception.double") 79 unittest { 80 auto slow = ThreadSender().then(() shared @trusted { Thread.sleep(50.msecs); throw new Exception("Slow"); }); 81 auto fast = ThreadSender().then(() shared { throw new Exception("Fast"); }); 82 race(slow, fast).syncWait.assumeOk.shouldThrowWithMessage("Fast"); 83 } 84 85 @("race.cancel-other") 86 unittest { 87 auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { 88 while (!token.isStopRequested) { Thread.yield(); } 89 }); 90 race(waiting, ValueSender!int(88)).syncWait.value.get.should == 88; 91 } 92 93 @("race.cancel") 94 unittest { 95 auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { 96 while (!token.isStopRequested) { Thread.yield(); } 97 }); 98 auto nursery = new shared Nursery(); 99 nursery.run(race(waiting, waiting)); 100 nursery.run(ThreadSender().then(() @trusted shared { Thread.sleep(50.msecs); nursery.stop(); })); 101 nursery.syncWait.isCancelled.should == true; 102 } 103 104 @("via") 105 unittest { 106 import std.typecons : tuple; 107 ValueSender!int(3).via(ValueSender!int(6)).syncWait.value.should == tuple(6,3); 108 ValueSender!int(5).via(VoidSender()).syncWait.value.should == 5; 109 VoidSender().via(ValueSender!int(4)).syncWait.value.should == 4; 110 } 111 112 @("then.value") 113 @safe unittest { 114 ValueSender!int(3).then((int i) shared => i*3).syncWait.value.shouldEqual(9); 115 } 116 117 @("then.oob") 118 @safe unittest { 119 OutOfBandValueSender!int(46).then((int i) shared => i*3).syncWait.value.shouldEqual(138); 120 } 121 122 @("finally") 123 unittest { 124 ValueSender!int(1).finally_(() => 4).syncWait.value.should == 4; 125 ValueSender!int(2).finally_(3).syncWait.value.should == 3; 126 ThrowingSender().finally_(3).syncWait.value.should == 3; 127 ThrowingSender().finally_(() => 4).syncWait.value.should == 4; 128 ThrowingSender().finally_(3).syncWait.value.should == 3; 129 DoneSender().finally_(() => 4).syncWait.isCancelled.should == true; 130 DoneSender().finally_(3).syncWait.isCancelled.should == true; 131 } 132 133 @("whenAll") 134 unittest { 135 whenAll(ValueSender!int(1), ValueSender!int(2)).syncWait.value.should == tuple(1,2); 136 whenAll(ValueSender!int(1), ValueSender!int(2), ValueSender!int(3)).syncWait.value.should == tuple(1,2,3); 137 whenAll(VoidSender(), ValueSender!int(2)).syncWait.value.should == 2; 138 whenAll(ValueSender!int(1), VoidSender()).syncWait.value.should == 1; 139 whenAll(VoidSender(), VoidSender()).syncWait.isOk.should == true; 140 whenAll(ValueSender!int(1), ThrowingSender()).syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender"); 141 whenAll(ThrowingSender(), ValueSender!int(1)).syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender"); 142 whenAll(ValueSender!int(1), DoneSender()).syncWait.isCancelled.should == true; 143 whenAll(DoneSender(), ValueSender!int(1)).syncWait.isCancelled.should == true; 144 whenAll(DoneSender(), ThrowingSender()).syncWait.isCancelled.should == true; 145 whenAll(ThrowingSender(), DoneSender()).syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender"); 146 147 } 148 149 @("whenAll.cancel") 150 unittest { 151 auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { 152 while (!token.isStopRequested) { Thread.yield(); } 153 }); 154 whenAll(waiting, DoneSender()).syncWait.isCancelled.should == true; 155 whenAll(ThrowingSender(), waiting).syncWait.assumeOk.shouldThrow; 156 whenAll(waiting, ThrowingSender()).syncWait.assumeOk.shouldThrow; 157 auto waitingInt = ThreadSender().withStopToken((StopToken token) @trusted { 158 while (!token.isStopRequested) { Thread.yield(); } 159 return 42; 160 }); 161 whenAll(waitingInt, DoneSender()).syncWait.isCancelled.should == true; 162 whenAll(ThrowingSender(), waitingInt).syncWait.assumeOk.shouldThrow; 163 whenAll(waitingInt, ThrowingSender()).syncWait.assumeOk.shouldThrow; 164 } 165 166 @("whenAll.stop") 167 unittest { 168 auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { 169 while (!token.isStopRequested) { Thread.yield(); } 170 }); 171 auto source = new StopSource(); 172 auto stopper = just(source).then((StopSource source) shared => source.stop()); 173 whenAll(waiting, stopper).withStopSource(source).syncWait.isCancelled.should == true; 174 } 175 176 @("retry") 177 unittest { 178 ValueSender!int(5).retry(Times(5)).syncWait.value.should == 5; 179 int t = 3; 180 int n = 0; 181 struct Sender { 182 alias Value = void; 183 static struct Op(Receiver) { 184 Receiver receiver; 185 bool fail; 186 void start() @safe nothrow { 187 if (fail) 188 receiver.setError(new Exception("Fail fail fail")); 189 else 190 receiver.setValue(); 191 } 192 } 193 auto connect(Receiver)(return Receiver receiver) @safe scope return { 194 // ensure NRVO 195 auto op = Op!(Receiver)(receiver, n++ < t); 196 return op; 197 } 198 } 199 Sender().retry(Times(5)).syncWait.isOk.should == true; 200 n.should == 4; 201 n = 0; 202 203 Sender().retry(Times(2)).syncWait.assumeOk.shouldThrowWithMessage("Fail fail fail"); 204 n.should == 2; 205 shared int p = 0; 206 ThreadSender().then(()shared { import core.atomic; p.atomicOp!("+=")(1); throw new Exception("Failed"); }).retry(Times(5)).syncWait.assumeOk.shouldThrowWithMessage("Failed"); 207 p.should == 5; 208 } 209 210 @("whenAll.oob") 211 unittest { 212 auto oob = OutOfBandValueSender!int(43); 213 auto value = ValueSender!int(11); 214 whenAll(oob, value).syncWait.value.should == tuple(43, 11); 215 } 216 217 @("withStopToken.oob") 218 unittest { 219 auto oob = OutOfBandValueSender!int(44); 220 oob.withStopToken((StopToken stopToken, int t) => t).syncWait.value.should == 44; 221 } 222 223 @("withStopSource.oob") 224 unittest { 225 auto oob = OutOfBandValueSender!int(45); 226 oob.withStopSource(new StopSource()).syncWait.value.should == 45; 227 } 228 229 @("value.withstoptoken.via.thread") 230 @safe unittest { 231 ValueSender!int(4).withStopToken((StopToken s, int i) { throw new Exception("Badness");}).via(ThreadSender()).syncWait.assumeOk.shouldThrowWithMessage("Badness"); 232 } 233 234 @("completewithcancellation") 235 @safe unittest { 236 ValueSender!void().completeWithCancellation.syncWait.isCancelled.should == true; 237 } 238 239 @("raceAll") 240 @safe unittest { 241 auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { 242 while (!token.isStopRequested) { Thread.yield(); } 243 }); 244 raceAll(waiting, DoneSender()).syncWait.isOk.should == true; 245 raceAll(waiting, just(42)).syncWait.value.should == 42; 246 raceAll(waiting, ThrowingSender()).syncWait.isOk.should == true; 247 } 248 249 @("on.ManualTimeWorker") 250 @safe unittest { 251 import concurrency.scheduler : ManualTimeWorker; 252 253 auto worker = new shared ManualTimeWorker(); 254 auto driver = just(worker).then((shared ManualTimeWorker worker) shared { 255 worker.timeUntilNextEvent().should == 10.msecs; 256 worker.advance(5.msecs); 257 worker.timeUntilNextEvent().should == 5.msecs; 258 worker.advance(5.msecs); 259 worker.timeUntilNextEvent().should == null; 260 }); 261 auto timer = DelaySender(10.msecs).withScheduler(worker.getScheduler); 262 263 whenAll(timer, driver).syncWait().isOk.should == true; 264 } 265 266 @("on.ManualTimeWorker.cancel") 267 @safe unittest { 268 import concurrency.scheduler : ManualTimeWorker; 269 270 auto worker = new shared ManualTimeWorker(); 271 auto source = new StopSource(); 272 auto driver = just(source).then((StopSource source) shared { 273 worker.timeUntilNextEvent().should == 10.msecs; 274 source.stop(); 275 worker.timeUntilNextEvent().should == null; 276 }); 277 auto timer = DelaySender(10.msecs).withScheduler(worker.getScheduler); 278 279 whenAll(timer, driver).syncWait(source).isCancelled.should == true; 280 } 281 282 @("then.stack.no-leak") 283 @safe unittest { 284 struct S { 285 void fun(int i) shared { 286 } 287 } 288 shared S s; 289 // its perfectly ok to point to a function on the stack 290 auto sender = just(42).then(&s.fun); 291 292 sender.syncWait(); 293 294 void disappearSender(Sender)(Sender s) @safe; 295 // but the sender can't leak now 296 static assert(!__traits(compiles, disappearSender(sender))); 297 }