1 module concurrency.data.queue.waitable; 2 3 final class WaitableQueue(Q) { 4 import core.sync.semaphore : Semaphore; 5 import core.time : Duration; 6 import mir.algebraic : Nullable; 7 8 private Q q; 9 private Semaphore sema; 10 11 this() @trusted { 12 q = new Q(); 13 sema = new Semaphore(); 14 } 15 16 bool push(Q.ElementType t) @trusted { 17 bool r = q.push(t); 18 if (r) 19 sema.notify(); 20 return r; 21 } 22 23 Q.ElementType pop() @trusted { 24 auto r = q.pop(); 25 if (r !is null) 26 return r; 27 28 sema.wait(); 29 return q.pop(); 30 } 31 32 bool empty() @safe @nogc nothrow { 33 return q.empty(); 34 } 35 36 Q.ElementType pop(Duration max) @trusted { 37 auto r = q.pop(); 38 if (r !is null) 39 return r; 40 41 if (!sema.wait(max)) 42 return null; 43 44 return q.pop(); 45 } 46 47 static if (__traits(compiles, q.producer)) { 48 shared(WaitableQueueProducer!Q) producer() @trusted nothrow @nogc { 49 return shared WaitableQueueProducer!Q(cast(shared)q, cast(shared)sema); 50 } 51 } 52 } 53 54 struct WaitableQueueProducer(Q) { 55 import core.sync.semaphore : Semaphore; 56 57 private shared Q q; 58 private shared Semaphore sema; 59 60 bool push(Q.ElementType t) shared @trusted { 61 bool r = (cast()q).push(t); 62 if (r) 63 (cast()sema).notify(); 64 return r; 65 } 66 }