1 module ut.concurrency.waitable; 2 3 import unit_threaded; 4 import concurrency.data.queue.mpsc; 5 import concurrency.data.queue.waitable; 6 import concurrency : syncWait; 7 8 struct Node { 9 int payload; 10 shared Node* next; 11 } 12 13 auto intProducer(Q)(Q q, int num) { 14 import concurrency.sender : just; 15 import concurrency.thread; 16 import concurrency.operations; 17 18 auto producer = q.producer(); 19 return just(producer, num).then((shared WaitableQueueProducer!(MPSCQueue!Node) producer, int num) shared { 20 foreach(i; 0..num) 21 producer.push(new Node(i+1)); 22 }).via(ThreadSender()); 23 } 24 25 auto intSummer(Q)(Q q) { 26 import concurrency.operations : withStopToken, via; 27 import concurrency.thread; 28 import concurrency.sender : justFrom, just; 29 import concurrency.stoptoken : StopToken; 30 import core.time : msecs; 31 32 return just(q).withStopToken((StopToken stopToken, Q q) shared @safe { 33 int sum = 0; 34 while (!stopToken.isStopRequested()) { 35 if (auto node = q.pop(100.msecs)) { 36 sum += node.payload; 37 } 38 } 39 while (true) { 40 if (auto node = q.pop(100.msecs)) 41 sum += node.payload; 42 else 43 break; 44 } 45 return sum; 46 }).via(ThreadSender()); 47 } 48 49 @("single") 50 @safe unittest { 51 import concurrency.operations : race, stopWhen; 52 import core.time : msecs; 53 54 auto q = new WaitableQueue!(MPSCQueue!Node)(); 55 q.intSummer.stopWhen(intProducer(q, 50000)).syncWait.value.should == 1250025000; 56 q.empty.should == true; 57 } 58 59 @("race") 60 @safe unittest { 61 import concurrency.operations : race, stopWhen, whenAll; 62 63 auto q = new WaitableQueue!(MPSCQueue!Node)(); 64 q.intSummer.stopWhen(whenAll(intProducer(q, 10000), 65 intProducer(q, 10000), 66 intProducer(q, 10000), 67 intProducer(q, 10000), 68 )).syncWait.value.should == 200020000; 69 q.empty.should == true; 70 }