1 module ut.concurrency.mpsc;
2 
3 import unit_threaded;
4 import concurrency.data.queue.mpsc;
5 import concurrency : syncWait;
6 
7 struct Node {
8   int payload;
9   shared Node* next;
10 }
11 
12 auto intProducer(Q)(Q q, int num) {
13   import concurrency.sender : just;
14   import concurrency.thread;
15   import concurrency.operations;
16 
17   auto producer = q.producer();
18   return just(producer, num).then((shared MPSCQueueProducer!Node producer, int num) shared {
19       foreach(i; 0..num)
20         producer.push(new Node(i+1));
21     }).via(ThreadSender());
22 }
23 
24 auto intSummer(Q)(Q q) {
25   import concurrency.operations : withStopToken, via;
26   import concurrency.thread;
27   import concurrency.sender : justFrom, just;
28   import concurrency.stoptoken : StopToken;
29   import core.time : msecs;
30 
31   return just(q).withStopToken((StopToken stopToken, Q q) shared @safe {
32       int sum = 0;
33       while (!stopToken.isStopRequested()) {
34         if (auto node = q.pop()) {
35           sum += node.payload;
36         }
37       }
38       while (true) {
39         if (auto node = q.pop())
40           sum += node.payload;
41         else
42           break;
43       }
44       return sum;
45     }).via(ThreadSender());
46 }
47 
48 @("single")
49 @safe unittest {
50   import concurrency.operations : race, stopWhen;
51   import core.time : msecs;
52 
53   auto q = new MPSCQueue!Node();
54   q.intSummer.stopWhen(intProducer(q, 50000)).syncWait.value.should == 1250025000;
55   q.empty.should == true;
56 }
57 
58 @("race")
59 @safe unittest {
60   import concurrency.operations : race, stopWhen, whenAll;
61 
62   auto q = new MPSCQueue!Node();
63   q.intSummer.stopWhen(whenAll(intProducer(q, 10000),
64                                intProducer(q, 10000),
65                                intProducer(q, 10000),
66                                intProducer(q, 10000),
67                                )).syncWait.value.should == 200020000;
68   q.empty.should == true;
69 }