1 module ut.concurrency.waitable;
2 
3 import unit_threaded;
4 import concurrency.data.queue.mpsc;
5 import concurrency.data.queue.waitable;
6 import concurrency : syncWait;
7 
8 struct Node {
9   int payload;
10   shared Node* next;
11 }
12 
13 auto intProducer(Q)(Q q, int num) {
14   import concurrency.sender : just;
15   import concurrency.thread;
16   import concurrency.operations;
17 
18   auto producer = q.producer();
19   return just(producer, num).then((shared WaitableQueueProducer!(MPSCQueue!Node) producer, int num) shared {
20       foreach(i; 0..num)
21         producer.push(new Node(i+1));
22     }).via(ThreadSender());
23 }
24 
25 auto intSummer(Q)(Q q) {
26   import concurrency.operations : withStopToken, via;
27   import concurrency.thread;
28   import concurrency.sender : justFrom, just;
29   import concurrency.stoptoken : StopToken;
30   import core.time : msecs;
31 
32   return just(q).withStopToken((StopToken stopToken, Q q) shared @safe {
33       int sum = 0;
34       while (!stopToken.isStopRequested()) {
35         if (auto node = q.pop(100.msecs)) {
36           sum += node.payload;
37         }
38       }
39       while (true) {
40         if (auto node = q.pop(100.msecs))
41           sum += node.payload;
42         else
43           break;
44       }
45       return sum;
46     }).via(ThreadSender());
47 }
48 
49 @("single")
50 @safe unittest {
51   import concurrency.operations : race, stopWhen;
52   import core.time : msecs;
53 
54   auto q = new WaitableQueue!(MPSCQueue!Node)();
55   q.intSummer.stopWhen(intProducer(q, 50000)).syncWait.value.should == 1250025000;
56   q.empty.should == true;
57 }
58 
59 @("race")
60 @safe unittest {
61   import concurrency.operations : race, stopWhen, whenAll;
62 
63   auto q = new WaitableQueue!(MPSCQueue!Node)();
64   q.intSummer.stopWhen(whenAll(intProducer(q, 10000),
65                                intProducer(q, 10000),
66                                intProducer(q, 10000),
67                                intProducer(q, 10000),
68                                )).syncWait.value.should == 200020000;
69   q.empty.should == true;
70 }