1 module ut.concurrency.thread;
2 
3 import concurrency;
4 import concurrency.sender;
5 import concurrency.thread;
6 import concurrency.operations;
7 import concurrency.receiver;
8 import unit_threaded;
9 import core.atomic : atomicOp;
10 
@("stdTaskPool")
@safe unittest {
  import std.process : thisThreadID;

  // Sleeps for a short while and then reports the id of whichever thread ran it.
  // The sleep keeps one execution busy long enough that two concurrent
  // executions are expected to be picked up by different threads.
  static auto sleepThenReportThread() @trusted {
    import core.thread : Thread;
    import core.time : msecs;

    Thread.sleep(10.msecs);
    return thisThreadID;
  }

  auto taskPool = stdTaskPool(2);

  // The same work, once un-scheduled and once moved onto the pool's scheduler.
  auto direct = justFrom(&sleepThenReportThread);
  auto pooled = direct.on(taskPool.getScheduler);

  auto callerId = thisThreadID;

  // Without a scheduler the work runs on the thread that waits for it...
  direct.syncWait.value.shouldEqual(callerId);
  // ...while the scheduled variant must run somewhere else.
  pooled.syncWait.value.shouldNotEqual(callerId);

  // Two scheduled executions started together must report distinct thread ids.
  auto ids = whenAll(pooled, pooled).syncWait.value;
  ids[0].shouldNotEqual(ids[1]);
}
31 
@("stdTaskPool.scope")
@safe unittest {
  // Body-less sinks taking their argument by value (non-scope). They are never
  // called at runtime; they only exist so the static asserts below can check
  // whether passing a value to them compiles.
  void escapeScheduler(StdTaskPoolProtoScheduler p) @safe;
  void escapeSender(Sender)(Sender s) @safe;

  auto taskPool = stdTaskPool(2);

  auto pooledSender = VoidSender().on(taskPool.getScheduler);

  // the scheduler obtained from the pool must not be allowed to escape
  static assert(!__traits(compiles, escapeScheduler(taskPool.getScheduler)));

  // nor may a sender that was scheduled on the scoped pool
  static assert(!__traits(compiles, escapeSender(pooledSender)));
}
47 
@("stdTaskPool.assert")
@system unittest {
  import core.exception : AssertError;
  import std.exception : assertThrown;

  auto taskPool = stdTaskPool(2);

  // A continuation whose assert is guaranteed to fail (42 != 99).
  auto failing = just(42).then((int i) => assert(i == 99, "i must be 99"));
  // Run that continuation via the pool's scheduler.
  auto onPool = failing.via(taskPool.getScheduler.schedule());

  // The AssertError must surface from syncWait on the waiting thread.
  // Note: the string is assertThrown's own failure message, not a matcher
  // for the thrown error's text.
  assertThrown!AssertError(onPool.syncWait, "i must be 99");
}
55 
@("ThreadSender.assert")
@system unittest {
  import core.exception : AssertError;
  import std.exception : assertThrown;

  // A continuation whose assert is guaranteed to fail (42 != 99).
  auto failing = just(42).then((int i) => assert(i == 99, "i must be 99"));
  // Run that continuation via a ThreadSender.
  auto onThread = failing.via(ThreadSender());

  // The AssertError must surface from syncWait on the waiting thread.
  // Note: the string is assertThrown's own failure message, not a matcher
  // for the thrown error's text.
  assertThrown!AssertError(onThread.syncWait, "i must be 99");
}
62 
@("localThreadWorker.assert")
@system unittest {
  import core.exception : AssertError;
  import std.exception : assertThrown;

  // A continuation whose assert is guaranteed to fail (42 != 99); no explicit
  // scheduler or thread hop is involved here.
  auto failing = just(42).then((int i) => assert(i == 99, "i must be 99"));

  // The AssertError must come out of syncWait.
  // Note: the string is assertThrown's own failure message, not a matcher
  // for the thrown error's text.
  assertThrown!AssertError(failing.syncWait, "i must be 99");
}
69 
@("ThreadSender.whenAll.assert")
@system unittest {
  import core.exception : AssertError;
  import core.time : msecs;
  import std.exception : assertThrown;

  // One branch fails its assert (42 != 99) after going via a ThreadSender...
  auto failing = just(42)
    .then((int i) => assert(i == 99, "i must be 99"))
    .via(ThreadSender());
  // ...the other branch is a plain 100ms delay.
  auto sluggish = delay(100.msecs);

  auto both = whenAll(failing, sluggish);

  // The AssertError from the failing branch must surface through whenAll.
  // Note: the string is assertThrown's own failure message, not a matcher
  // for the thrown error's text.
  assertThrown!AssertError(both.syncWait, "i must be 99");
}
79 
@("dynamicLoadRaw.getThreadLocalExecutor")
@safe unittest {
  import concurrency.utils;

  // Looking up concurrency_getLocalThreadExecutor must yield something non-null.
  auto loaded = dynamicLoadRaw!concurrency_getLocalThreadExecutor;
  loaded.should.not == null;
}
85 
@("nested.intervalStream")
@safe unittest {
  import concurrency.stream : intervalStream, take;
  import core.time : msecs;

  // Builds a sender that takes 90 ticks from a 1ms interval stream and
  // collects them with a no-op callback.
  static auto ninetyTicks() {
    return intervalStream(1.msecs).take(90).collect(() shared {});
  }

  // The same stream, but awaited synchronously from inside work that is run
  // via a ThreadSender, i.e. a syncWait nested within another sender.
  auto nested = justFrom(() => ninetyTicks().syncWait()).via(ThreadSender());

  auto pause = delay(10.msecs);

  // Mix direct streams, nested waits and plain delays in a single whenAll;
  // the combined operation is required to finish successfully.
  auto everything = whenAll(
    ninetyTicks(), nested,
    ninetyTicks(), nested,
    ninetyTicks(), nested,
    pause, pause, pause);

  everything.syncWait().assumeOk;
}