// Unit tests exercising senders that are run inline, on a std task pool,
// and on a dedicated thread.
module ut.concurrency.thread;

import concurrency;
import concurrency.sender;
import concurrency.thread;
import concurrency.operations;
import concurrency.receiver;
import unit_threaded;
import core.atomic : atomicOp;

// Expects a sender awaited directly to report the caller's thread id, the
// same sender scheduled on a two-worker pool to report a different id, and
// two pooled copies awaited together to report ids that differ from each
// other.
@("stdTaskPool")
@safe unittest {
  import std.process : thisThreadID;

  // Sleeps briefly, then returns the id of whichever thread executed it.
  static auto reportThreadAfterNap() @trusted {
    import core.thread : Thread;
    import core.time : msecs;

    Thread.sleep(msecs(10));
    return thisThreadID;
  }

  // The pool is declared before anything that borrows its scheduler.
  auto pool = stdTaskPool(2);

  auto inlineTask = justFrom(&reportThreadAfterNap);
  auto pooledTask = inlineTask.on(pool.getScheduler);

  inlineTask.syncWait.value.should == thisThreadID;
  pooledTask.syncWait.value.shouldNotEqual(thisThreadID);

  auto threadIds = whenAll(pooledTask, pooledTask).syncWait.value;
  threadIds[0].shouldNotEqual(threadIds[1]);
}

// Compile-time checks only: neither the pool's scheduler nor a sender
// scheduled on it may be handed to a function taking it as a plain
// (non-scope) parameter.
@("stdTaskPool.scope")
@safe unittest {
  // Body-less probes; they exist solely to be named inside __traits(compiles).
  void takeScheduler(StdTaskPoolProtoScheduler escaping) @safe;
  void takeSender(S)(S escaping) @safe;

  auto pool = stdTaskPool(2);

  auto pooledSender = VoidSender().on(pool.getScheduler);

  // the scheduler must not be able to escape
  static assert(!__traits(compiles, takeScheduler(pool.getScheduler)));

  // a sender scheduled on the scoped pool must not be able to escape either
  static assert(!__traits(compiles, takeSender(pooledSender)));
}

// An assert failing inside `then` while running on the task pool must reach
// the caller of syncWait as an AssertError.
@("stdTaskPool.assert")
@system unittest {
  import core.exception : AssertError;
  import std.exception : assertThrown;

  auto pool = stdTaskPool(2);

  // The second argument is the message assertThrown reports when nothing is
  // thrown; it is not compared against the thrown error's text.
  assertThrown!AssertError(
    just(42)
      .then((int i) => assert(i == 99, "i must be 99"))
      .via(pool.getScheduler.schedule())
      .syncWait,
    "i must be 99");
}

// Same expectation as the pool variant, with the chain routed through a
// ThreadSender instead.
@("ThreadSender.assert")
@system unittest {
  import core.exception : AssertError;
  import std.exception : assertThrown;

  // The trailing string is assertThrown's own failure message.
  assertThrown!AssertError(
    just(42)
      .then((int i) => assert(i == 99, "i must be 99"))
      .via(ThreadSender())
      .syncWait,
    "i must be 99");
}

// No scheduler is attached here: the chain is awaited directly and the
// AssertError is still expected to surface from syncWait.
@("localThreadWorker.assert")
@system unittest {
  import core.exception : AssertError;
  import std.exception : assertThrown;

  // The trailing string is assertThrown's own failure message.
  assertThrown!AssertError(
    just(42)
      .then((int i) => assert(i == 99, "i must be 99"))
      .syncWait,
    "i must be 99");
}

// A failing ThreadSender-backed chain combined with a 100 ms delay through
// whenAll: awaiting the combination is expected to throw the AssertError.
@("ThreadSender.whenAll.assert")
@system unittest {
  import core.exception : AssertError;
  import core.time : msecs;
  import std.exception : assertThrown;

  auto failing = just(42)
    .then((int i) => assert(i == 99, "i must be 99"))
    .via(ThreadSender());
  auto lingering = delay(msecs(100));

  // The trailing string is assertThrown's own failure message.
  assertThrown!AssertError(
    whenAll(failing, lingering).syncWait,
    "i must be 99");
}