1 module ut.concurrency.thread;
2 
3 import concurrency;
4 import concurrency.sender;
5 import concurrency.thread;
6 import concurrency.operations;
7 import concurrency.receiver;
8 import unit_threaded;
9 import core.atomic : atomicOp;
10 
@("stdTaskPool")
@safe unittest {
  import std.process : thisThreadID;

  // Sleeps for 10 ms and then reports the id of whichever thread executed it.
  static auto reportThread() @trusted {
    import core.thread : Thread;
    import core.time : msecs;
    Thread.sleep(msecs(10));
    return thisThreadID;
  }

  // id of the thread running this unittest, used as the reference point below
  auto callerThread = thisThreadID;

  auto pool = stdTaskPool(2);
  auto inlineTask = justFrom(&reportThread);
  auto pooledTask = inlineTask.on(pool.getScheduler);

  // waited on directly, the task is expected to report the caller's thread
  inlineTask.syncWait.value.shouldEqual(callerThread);
  // once scheduled on the pool it is expected to report a different thread
  pooledTask.syncWait.value.shouldNotEqual(callerThread);

  // two copies awaited together are expected to report two distinct threads
  auto threadIds = whenAll(pooledTask, pooledTask).syncWait.value;
  threadIds[0].shouldNotEqual(threadIds[1]);
}
31 
@("stdTaskPool.scope")
@safe unittest {
  // Body-less @safe sinks whose parameters are not marked `scope`; they are
  // only referenced inside the __traits(compiles) checks and never called.
  void disappearSender(Sender)(Sender s) @safe;
  void disappearScheduler(StdTaskPoolProtoScheduler p) @safe;

  auto pool = stdTaskPool(2);
  auto pooledSender = VoidSender().on(pool.getScheduler);

  // handing the pool's scheduler to such a sink must be rejected at compile time
  static assert(!__traits(compiles, disappearScheduler(pool.getScheduler)));

  // the same must hold for a sender that was scheduled on the scoped pool
  static assert(!__traits(compiles, disappearSender(pooledSender)));
}
47 
@("stdTaskPool.assert")
@system unittest {
  import core.exception : AssertError;
  import std.exception : assertThrown;

  auto pool = stdTaskPool(2);

  // a sender whose continuation asserts on a value that can never match
  auto failing = just(42).then((int i) => assert(i == 99, "i must be 99"));
  // run it via the pool scheduler's schedule() sender
  auto onPool = failing.via(pool.getScheduler.schedule());

  // The AssertError must surface from syncWait.
  // NOTE(review): assertThrown's second argument is the message reported when
  // nothing is thrown; it is not compared against the thrown error's message.
  assertThrown!AssertError(onPool.syncWait, "i must be 99");
}
55 
@("ThreadSender.assert")
@system unittest {
  import core.exception : AssertError;
  import std.exception : assertThrown;

  // a sender whose continuation asserts on a value that can never match
  auto failing = just(42).then((int i) => assert(i == 99, "i must be 99"));
  // run it via a ThreadSender
  auto onThread = failing.via(ThreadSender());

  // The AssertError must surface from syncWait.
  // NOTE(review): assertThrown's second argument is the message reported when
  // nothing is thrown; it is not compared against the thrown error's message.
  assertThrown!AssertError(onThread.syncWait, "i must be 99");
}
62 
@("localThreadWorker.assert")
@system unittest {
  import core.exception : AssertError;
  import std.exception : assertThrown;

  // a sender whose continuation asserts on a value that can never match;
  // no scheduler is attached, it is waited on directly
  auto failing = just(42).then((int i) => assert(i == 99, "i must be 99"));

  // The AssertError must surface from syncWait.
  // NOTE(review): assertThrown's second argument is the message reported when
  // nothing is thrown; it is not compared against the thrown error's message.
  assertThrown!AssertError(failing.syncWait, "i must be 99");
}
69 
@("ThreadSender.whenAll.assert")
@system unittest {
  import core.exception : AssertError;
  import core.time : msecs;
  import std.exception : assertThrown;

  // a 100 ms delay sender, combined below with a sender that fails
  auto sleeper = delay(msecs(100));
  // a continuation asserting on a value that can never match, run via a ThreadSender
  auto failing = just(42).then((int i) => assert(i == 99, "i must be 99")).via(ThreadSender());

  // Waiting on both together must surface the AssertError from syncWait.
  // NOTE(review): assertThrown's second argument is the message reported when
  // nothing is thrown; it is not compared against the thrown error's message.
  assertThrown!AssertError(whenAll(failing, sleeper).syncWait, "i must be 99");
}