1 module ut.concurrency.mpsc; 2 3 import unit_threaded; 4 import concurrency.data.queue.mpsc; 5 import concurrency : syncWait; 6 7 struct Node { 8 int payload; 9 Node* next; 10 } 11 12 auto intProducer(Q)(Q q, int num) { 13 import concurrency.sender : just; 14 import concurrency.thread; 15 import concurrency.operations; 16 17 auto producer = q.producer(); 18 return just(producer, num).then((shared MPSCQueueProducer!Node producer, int num) shared { 19 foreach(i; 0..num) 20 producer.push(new Node(i+1)); 21 }).via(ThreadSender()); 22 } 23 24 auto intSummer(Q)(Q q) { 25 import concurrency.operations : withStopToken, via; 26 import concurrency.thread; 27 import concurrency.sender : justFrom, just; 28 import concurrency.stoptoken : StopToken; 29 import core.time : msecs; 30 31 return just(q).withStopToken((StopToken stopToken, Q q) shared @safe { 32 int sum = 0; 33 while (!stopToken.isStopRequested()) { 34 if (auto node = q.pop()) { 35 sum += node.payload; 36 } 37 } 38 while (true) { 39 if (auto node = q.pop()) 40 sum += node.payload; 41 else 42 break; 43 } 44 return sum; 45 }).via(ThreadSender()); 46 } 47 48 @("single") 49 @safe unittest { 50 import concurrency.operations : race, stopWhen; 51 import core.time : msecs; 52 53 auto q = new MPSCQueue!Node(); 54 q.intSummer.stopWhen(intProducer(q, 50000)).syncWait.value.should == 1250025000; 55 q.empty.should == true; 56 } 57 58 @("race") 59 @safe unittest { 60 import concurrency.operations : race, stopWhen, whenAll; 61 62 auto q = new MPSCQueue!Node(); 63 q.intSummer.stopWhen(whenAll(intProducer(q, 10000), 64 intProducer(q, 10000), 65 intProducer(q, 10000), 66 intProducer(q, 10000), 67 )).syncWait.value.should == 200020000; 68 q.empty.should == true; 69 }