module ut.concurrency.operations;

import concurrency;
import concurrency.sender;
import concurrency.thread;
import concurrency.operations;
import concurrency.receiver;
import concurrency.stoptoken;
import concurrency.nursery;
import unit_threaded;
import core.time;
import core.thread;
import std.typecons;

/// Used to test that Senders keep the operational state alive until one receiver's terminal is called.
///
/// The value is delivered from a freshly spawned thread, i.e. out-of-band with
/// respect to the caller of `start`.
struct OutOfBandValueSender(T) {
  alias Value = T;
  T value;
  /// Operational state: holds the receiver and the value to hand over.
  struct Op(Receiver) {
    Receiver receiver;
    T value;
    /// Thread entry point; completes the receiver with the stored value.
    void run() {
      receiver.setValue(value);
    }
    /// Spawns a thread that executes `run`. The thread handle is deliberately
    /// not kept (the previous unused local shadowed the `value` field).
    void start() @trusted scope {
      new Thread(&this.run).start();
    }
  }
  /// Builds the operational state for `receiver`, copying the sender's value into it.
  auto connect(Receiver)(return Receiver receiver) @safe scope return {
    // ensure NRVO
    auto op = Op!(Receiver)(receiver, value);
    return op;
  }
}

/// A throwing continuation followed by `ignoreError` is expected to report cancellation.
@("ignoreErrors.syncWait.value")
@safe unittest {
  bool delegate() @safe shared dg = () shared { throw new Exception("Exceptions are rethrown"); };
  ThreadSender()
    .then(dg)
    .ignoreError()
    .syncWait.isCancelled.should == true;
}

/// `syncWait` on the out-of-band sender yields the value set from the other thread.
@("oob")
unittest {
  auto oob = OutOfBandValueSender!int(43);
  oob.syncWait.value.should == 43;
}

/// `race` yields the first value; with threads, the one that does not sleep is expected to win
/// regardless of argument order.
@("race")
unittest {
  race(ValueSender!int(4), ValueSender!int(5)).syncWait.value.should == 4;
  auto fastThread = ThreadSender().then(() shared => 1);
  auto slowThread = ThreadSender().then(() shared @trusted { Thread.sleep(50.msecs); return 2; });
  race(fastThread, slowThread).syncWait.value.should == 1;
  race(slowThread, fastThread).syncWait.value.should == 1;
}

/// `race` accepts more than two senders.
@("race.multiple")
unittest {
  race(ValueSender!int(4), ValueSender!int(5), ValueSender!int(6)).syncWait.value.should == 4;
}

/// A value from one sender is produced even if another throws; if all throw, the result throws.
@("race.exception.single")
unittest {
  race(ThrowingSender(), ValueSender!int(5)).syncWait.value.should == 5;
  race(ThrowingSender(), ThrowingSender()).syncWait.assumeOk.shouldThrow();
}

@("race.exception.double")
unittest {
  auto slow = ThreadSender().then(() shared @trusted { Thread.sleep(50.msecs); throw new Exception("Slow"); });
  auto fast = ThreadSender().then(() shared { throw new Exception("Fast"); });
  // the exception of the sender that does not sleep is the one expected to surface
  race(slow, fast).syncWait.assumeOk.shouldThrowWithMessage("Fast");
}

@("race.cancel-other")
unittest {
  // `waiting` only returns once a stop is requested on its token
  auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
    });
  race(waiting, ValueSender!int(88)).syncWait.value.get.should == 88;
}

@("race.cancel")
unittest {
  auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
    });
  auto nursery = new shared Nursery();
  nursery.run(race(waiting, waiting));
  // stop the nursery from another thread after a short delay
  nursery.run(ThreadSender().then(() @trusted shared { Thread.sleep(50.msecs); nursery.stop(); }));
  nursery.syncWait.isCancelled.should == true;
}

@("race.array.just")
@safe unittest {
  race([just(4), just(5)]).syncWait.value.should == 4;
}

@("race.array.void")
@safe unittest {
  race([VoidSender(), VoidSender()]).syncWait.assumeOk;
}

@("via")
unittest {
  import std.typecons : tuple;
  ValueSender!int(3).via(ValueSender!int(6)).syncWait.value.should == tuple(6,3);
  ValueSender!int(5).via(VoidSender()).syncWait.value.should == 5;
  VoidSender().via(ValueSender!int(4)).syncWait.value.should == 4;
}

@("then.value.delegate")
@safe unittest {
  ValueSender!int(3).then((int i) shared => i*3).syncWait.value.shouldEqual(9);
}

@("then.value.function")
@safe unittest {
  ValueSender!int(3).then((int i) => i*3).syncWait.value.shouldEqual(9);
}

@("then.oob")
@safe unittest {
  OutOfBandValueSender!int(46).then((int i) shared => i*3).syncWait.value.shouldEqual(138);
}

@("then.tuple")
@safe unittest {
  just(1,2,3).then((Tuple!(int,int,int) t) shared => t[0]).syncWait.value.shouldEqual(1);
}

@("then.tuple.expand")
@safe unittest {
  just(1,2,3).then((int a,int b,int c) shared => a+b).syncWait.value.shouldEqual(3);
}

@("finally")
unittest {
  // value and error completions are replaced by the `finally_` result...
  ValueSender!int(1).finally_(() => 4).syncWait.value.should == 4;
  ValueSender!int(2).finally_(3).syncWait.value.should == 3;
  ThrowingSender().finally_(3).syncWait.value.should == 3;
  ThrowingSender().finally_(() => 4).syncWait.value.should == 4;
  // ...while a done completion stays cancelled
  DoneSender().finally_(() => 4).syncWait.isCancelled.should == true;
  DoneSender().finally_(3).syncWait.isCancelled.should == true;
}

@("whenAll.basic")
unittest {
  whenAll(ValueSender!int(1), ValueSender!int(2)).syncWait.value.should == tuple(1,2);
  whenAll(ValueSender!int(1), ValueSender!int(2), ValueSender!int(3)).syncWait.value.should == tuple(1,2,3);
  whenAll(VoidSender(), ValueSender!int(2)).syncWait.value.should == 2;
  whenAll(ValueSender!int(1), VoidSender()).syncWait.value.should == 1;
  whenAll(VoidSender(), VoidSender()).syncWait.assumeOk;
  whenAll(ValueSender!int(1), ThrowingSender()).syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender");
  whenAll(ThrowingSender(), ValueSender!int(1)).syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender");
  whenAll(ValueSender!int(1), DoneSender()).syncWait.isCancelled.should == true;
  whenAll(DoneSender(), ValueSender!int(1)).syncWait.isCancelled.should == true;
  whenAll(DoneSender(), ThrowingSender()).syncWait.isCancelled.should == true;
  whenAll(ThrowingSender(), DoneSender()).syncWait.assumeOk.shouldThrowWithMessage("ThrowingSender");
}

@("whenAll.cancel")
unittest {
  auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
    });
  whenAll(waiting, DoneSender()).syncWait.isCancelled.should == true;
  whenAll(ThrowingSender(), waiting).syncWait.assumeOk.shouldThrow;
  whenAll(waiting, ThrowingSender()).syncWait.assumeOk.shouldThrow;
  // same scenarios with a waiting sender that produces a value
  auto waitingInt = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
      return 42;
    });
  whenAll(waitingInt, DoneSender()).syncWait.isCancelled.should == true;
  whenAll(ThrowingSender(), waitingInt).syncWait.assumeOk.shouldThrow;
  whenAll(waitingInt, ThrowingSender()).syncWait.assumeOk.shouldThrow;
}

@("whenAll.stop")
unittest {
  auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
    });
  auto source = new StopSource();
  // `stopper` triggers the very source the whole `whenAll` is attached to
  auto stopper = just(source).then((StopSource source) shared => source.stop());
  whenAll(waiting, stopper).withStopSource(source).syncWait.isCancelled.should == true;
}

@("whenAll.array.just")
unittest {
  whenAll([just(4), just(5)]).syncWait.value.should == [4,5];
}

@("whenAll.array.void")
unittest {
  whenAll([VoidSender(), VoidSender()]).syncWait.assumeOk;
}

@("retry")
unittest {
  ValueSender!int(5).retry(Times(5)).syncWait.value.should == 5;
  int t = 3;
  int n = 0;
  // Sender whose first `t` connects produce a failing operation; `n` counts connects.
  struct Sender {
    alias Value = void;
    static struct Op(Receiver) {
      Receiver receiver;
      bool fail;
      void start() @safe nothrow {
        if (fail)
          receiver.setError(new Exception("Fail fail fail"));
        else
          receiver.setValue();
      }
    }
    auto connect(Receiver)(return Receiver receiver) @safe scope return {
      // ensure NRVO
      auto op = Op!(Receiver)(receiver, n++ < t);
      return op;
    }
  }
  // three failures followed by one success: four connects in total
  Sender().retry(Times(5)).syncWait.assumeOk;
  n.should == 4;
  n = 0;

  // only two attempts allowed, both fail
  Sender().retry(Times(2)).syncWait.assumeOk.shouldThrowWithMessage("Fail fail fail");
  n.should == 2;
  shared int p = 0;
  ThreadSender().then(()shared { import core.atomic; p.atomicOp!("+=")(1); throw new Exception("Failed"); }).retry(Times(5)).syncWait.assumeOk.shouldThrowWithMessage("Failed");
  p.should == 5;
}

@("whenAll.oob")
unittest {
  auto oob = OutOfBandValueSender!int(43);
  auto value = ValueSender!int(11);
  whenAll(oob, value).syncWait.value.should == tuple(43, 11);
}

@("withStopToken.oob")
unittest {
  auto oob = OutOfBandValueSender!int(44);
  oob.withStopToken((StopToken stopToken, int t) => t).syncWait.value.should == 44;
}

@("withStopSource.oob")
unittest {
  auto oob = OutOfBandValueSender!int(45);
  oob.withStopSource(new StopSource()).syncWait.value.should == 45;
}

// NOTE(review): the test is named after withStopSource but exercises withStopToken - confirm the intended name.
@("withStopSource.tuple")
unittest {
  just(14, 53).withStopToken((StopToken s, Tuple!(int, int) t) => t[0]*t[1]).syncWait.value.should == 742;
}

@("value.withstoptoken.via.thread")
@safe unittest {
  ValueSender!int(4).withStopToken((StopToken s, int i) { throw new Exception("Badness");}).via(ThreadSender()).syncWait.assumeOk.shouldThrowWithMessage("Badness");
}

@("completewithcancellation")
@safe unittest {
  ValueSender!void().completeWithCancellation.syncWait.isCancelled.should == true;
}

@("raceAll")
@safe unittest {
  auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
    });
  raceAll(waiting, DoneSender()).syncWait.isCancelled.should == true;
  raceAll(waiting, just(42)).syncWait.value.should == 42;
  raceAll(waiting, ThrowingSender()).syncWait.isError.should == true;
}

@("on.ManualTimeWorker")
@safe unittest {
  import concurrency.scheduler : ManualTimeWorker;

  auto worker = new shared ManualTimeWorker();
  // the driver advances the manual clock in two steps of 5 msecs
  auto driver = just(worker).then((shared ManualTimeWorker worker) shared {
      worker.timeUntilNextEvent().should == 10.msecs;
      worker.advance(5.msecs);
      worker.timeUntilNextEvent().should == 5.msecs;
      worker.advance(5.msecs);
      worker.timeUntilNextEvent().should == null;
    });
  auto timer = DelaySender(10.msecs).withScheduler(worker.getScheduler);

  whenAll(timer, driver).syncWait().assumeOk;
}

@("on.ManualTimeWorker.cancel")
@safe unittest {
  import concurrency.scheduler : ManualTimeWorker;

  auto worker = new shared ManualTimeWorker();
  auto source = new StopSource();
  // the driver stops the source instead of advancing time
  auto driver = just(source).then((StopSource source) shared {
      worker.timeUntilNextEvent().should == 10.msecs;
      source.stop();
      worker.timeUntilNextEvent().should == null;
    });
  auto timer = DelaySender(10.msecs).withScheduler(worker.getScheduler);

  whenAll(timer, driver).syncWait(source).isCancelled.should == true;
}

@("then.stack.no-leak")
@safe unittest {
  struct S {
    void fun(int i) shared {
    }
  }
  shared S s;
  // it's perfectly ok to point to a function on the stack
  auto sender = just(42).then(&s.fun);

  sender.syncWait();

  void disappearSender(Sender)(Sender s) @safe;
  // but the sender can't leak now
  static assert(!__traits(compiles, disappearSender(sender)));
}

@("forwardOn")
@safe unittest {
  auto pool = stdTaskPool(2);

  VoidSender().forwardOn(pool.getScheduler).syncWait.assumeOk;
  ErrorSender(new Exception("bad news")).forwardOn(pool.getScheduler).syncWait.isError.should == true;
  DoneSender().forwardOn(pool.getScheduler).syncWait.isCancelled.should == true;
  just(42).forwardOn(pool.getScheduler).syncWait.value.should == 42;
}

@("toSingleton")
@safe unittest {
  import std.typecons : tuple;
  import concurrency.scheduler : ManualTimeWorker;
  import core.atomic : atomicOp;

  shared int g;

  auto worker = new shared ManualTimeWorker();

  auto single = delay(2.msecs).then(() shared => g.atomicOp!"+="(1)).toSingleton(worker.getScheduler);

  auto driver = justFrom(() shared => worker.advance(2.msecs));

  // both uses of `single` within one whenAll observe the same increment
  whenAll(single, single, driver).syncWait.value.should == tuple(1,1);
  whenAll(single, single, driver).syncWait.value.should == tuple(2,2);
}

@("stopOn")
@safe unittest {
  auto sourceInner = new shared StopSource();
  auto sourceOuter = new shared StopSource();

  shared bool b;
  whenAll(delay(5.msecs).then(() shared => b = true).stopOn(StopToken(sourceInner)),
          just(() => sourceOuter.stop())
          ).syncWait(sourceOuter).assumeOk;
  b.should == true;

  // NOTE(review): `d` is never written below (the continuation assigns `b`), so the
  // final check holds trivially; also `just(() => ...)` passes the lambda as a value and
  // does not appear to invoke it - presumably `justFrom` and `d = true` were intended.
  // Left unchanged because fixing it alters the expected outcome; confirm before changing.
  shared bool d;
  whenAll(delay(5.msecs).then(() shared => b = true).stopOn(StopToken(sourceInner)),
          just(() => sourceInner.stop())
          ).syncWait(sourceOuter).assumeOk;
  d.should == false;
}

@("withChild")
@safe unittest {
  import core.atomic;

  // Two auto-reset events used to hand control between the child and the parent.
  class State {
    import core.sync.event : Event;
    bool parentAfterChild;
    Event childEvent, parentEvent;
    this() shared @trusted {
      (cast()childEvent).initialize(false, false);
      (cast()parentEvent).initialize(false, false);
    }
    void signalChild() shared @trusted {
      (cast()childEvent).set();
    }
    void waitChild() shared @trusted {
      (cast()childEvent).wait();
    }
    void signalParent() shared @trusted {
      (cast()parentEvent).set();
    }
    void waitParent() shared @trusted {
      (cast()parentEvent).wait();
    }
  }
  auto state = new shared State();
  auto source = new shared StopSource;

  // the child waits for the stop request, wakes the parent, then waits for the parent
  auto child = just(state).withStopToken((StopToken token, shared State state) @trusted {
      // yield while waiting, consistent with the other wait loops in this file
      while (!token.isStopRequested) { Thread.yield(); }
      state.signalParent();
      state.waitChild();
    }).via(ThreadSender());

  // the parent records whether its own token was still un-stopped when the child woke it
  auto parent = just(state).withStopToken((StopToken token, shared State state){
      state.waitParent();
      state.parentAfterChild.atomicStore(token.isStopRequested == false);
      state.signalChild();
    }).via(ThreadSender());

  // NOTE(review): the result of `isCancelled` is discarded here - confirm whether an
  // assertion on it was intended.
  whenAll(parent.withChild(child).withStopSource(source), just(source).then((shared StopSource s) => s.stop())).syncWait.isCancelled;

  state.parentAfterChild.atomicLoad.should == true;
}

@("onTermination.value")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  just(42).onTermination(() @safe shared => g.atomicOp!"+="(1)).syncWait.assumeOk;
  g.should == 1;
}

@("onTermination.done")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  DoneSender().onTermination(() @safe shared => g.atomicOp!"+="(1)).syncWait.isCancelled.should == true;
  g.should == 1;
}

@("onTermination.error")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  ThrowingSender().onTermination(() @safe shared => g.atomicOp!"+="(1)).syncWait.isError.should == true;
  g.should == 1;
}

@("onError.value")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  just(42).onError((Exception e) @safe shared => g.atomicOp!"+="(1)).syncWait.assumeOk;
  g.should == 0;
}

@("onError.done")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  DoneSender().onError((Exception e) @safe shared => g.atomicOp!"+="(1)).syncWait.isCancelled.should == true;
  g.should == 0;
}

@("onError.error")
@safe unittest {
  import core.atomic : atomicOp;
  shared int g = 0;
  ThrowingSender().onError((Exception e) @safe shared => g.atomicOp!"+="(1)).syncWait.isError.should == true;
  g.should == 1;
}

@("onError.throw")
@safe unittest {
  auto err = ThrowingSender().onError((Exception e) @safe shared { throw new Exception("in onError"); }).syncWait.error;
  // the exception thrown by the handler comes first, with the original one chained behind it
  err.msg.should == "in onError";
  err.next.msg.should == "ThrowingSender";
}

@("stopWhen.source.value")
@safe unittest {
  auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
      return 43;
    });
  auto trigger = delay(100.msecs);
  waiting.stopWhen(trigger).syncWait().value.should == 43;
}

@("stopWhen.source.error")
@safe unittest {
  auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
      throw new Exception("Upside down");
    });
  auto trigger = delay(100.msecs);
  waiting.stopWhen(trigger).syncWait().assumeOk.shouldThrowWithMessage("Upside down");
}

@("stopWhen.source.cancelled")
@safe unittest {
  auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
    }).completeWithCancellation;
  auto trigger = delay(100.msecs);
  waiting.stopWhen(trigger).syncWait().isCancelled.should == true;
}

@("stopWhen.trigger.error")
@safe unittest {
  auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
      throw new Exception("This occurres later, so the other one gets propagated");
    });
  auto trigger = ThrowingSender();
  waiting.stopWhen(trigger).syncWait().assumeOk.shouldThrowWithMessage("ThrowingSender");
}

@("stopWhen.trigger.cancelled.value")
@safe unittest {
  auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
      while (!token.isStopRequested) { Thread.yield(); }
      return 42;
    });
  auto trigger = delay(100.msecs).completeWithCancellation;
  waiting.stopWhen(trigger).syncWait().isCancelled.should == true;
}