1 module concurrency.data.queue.waitable;
2 
3 final class WaitableQueue(Q) {
4   import core.sync.semaphore : Semaphore;
5   import core.time : Duration;
6   import mir.algebraic : Nullable;
7 
8   private Q q;
9   private Semaphore sema;
10 
11   this() @trusted {
12     q = new Q();
13     sema = new Semaphore();
14   }
15 
16   bool push(Q.ElementType t) @trusted {
17     bool r = q.push(t);
18     if (r)
19       sema.notify();
20     return r;
21   }
22 
23   Q.ElementType pop() @trusted {
24     auto r = q.pop();
25     if (r !is null)
26       return r;
27 
28     sema.wait();
29     return q.pop();
30   }
31 
32   bool empty() @safe @nogc nothrow {
33     return q.empty();
34   }
35 
36   Q.ElementType pop(Duration max) @trusted {
37     auto r = q.pop();
38     if (r !is null)
39       return r;
40 
41     if (!sema.wait(max))
42       return null;
43 
44     return q.pop();
45   }
46 
47   static if (__traits(compiles, q.producer)) {
48     shared(WaitableQueueProducer!Q) producer() @trusted nothrow @nogc {
49       return shared WaitableQueueProducer!Q(cast(shared)q, cast(shared)sema);
50     }
51   }
52 }
53 
54 struct WaitableQueueProducer(Q) {
55   import core.sync.semaphore : Semaphore;
56 
57   private shared Q q;
58   private shared Semaphore sema;
59 
60   bool push(Q.ElementType t) shared @trusted {
61     bool r = (cast()q).push(t);
62     if (r)
63       (cast()sema).notify();
64     return r;
65   }
66 }