1 module concurrency.data.queue.waitable; 2 3 final class WaitableQueue(Q) { 4 import core.sync.semaphore : Semaphore; 5 import core.time : Duration; 6 import mir.algebraic : Nullable; 7 8 private Q q; 9 private Semaphore sema; 10 11 this() @trusted { 12 q = new Q(); 13 sema = new Semaphore(); 14 } 15 16 bool push(Q.ElementType t) @trusted { 17 bool r = q.push(t); 18 if (r) 19 sema.notify(); 20 return r; 21 } 22 23 Q.ElementType pop() @trusted { 24 auto r = q.pop(); 25 if (r !is null) 26 return r; 27 28 sema.wait(); 29 return q.pop(); 30 } 31 32 bool empty() @safe @nogc nothrow { 33 return q.empty(); 34 } 35 36 Q.ElementType pop(Duration max) @trusted { 37 auto r = q.pop(); 38 if (r !is null) 39 return r; 40 41 if (!sema.wait(max)) 42 return null; 43 44 return q.pop(); 45 } 46 47 auto opSlice() @safe nothrow @nogc { 48 return q.opSlice(); 49 } 50 51 static if (__traits(compiles, q.producer)) { 52 shared(WaitableQueueProducer!Q) producer() @trusted nothrow @nogc { 53 return shared WaitableQueueProducer!Q(cast(shared)q, cast(shared)sema); 54 } 55 } 56 } 57 58 struct WaitableQueueProducer(Q) { 59 import core.sync.semaphore : Semaphore; 60 61 private shared Q q; 62 private shared Semaphore sema; 63 64 bool push(Q.ElementType t) shared @trusted { 65 bool r = (cast()q).push(t); 66 if (r) 67 (cast()sema).notify(); 68 return r; 69 } 70 }