1 module concurrency.data.queue.mpsc;
2 
3 struct MPSCQueueProducer(Node) {
4   private MPSCQueue!(Node) q;
5   void push(Node* node) shared @trusted {
6     (cast()q).push(node);
7   }
8 }
9 
10 final class MPSCQueue(Node) {
11   alias ElementType = Node*;
12   private Node* head, tail;
13   private Node stub;
14 
15   this() @safe nothrow @nogc {
16     head = tail = &stub;
17     stub.next = null;
18   }
19 
20   shared(MPSCQueueProducer!Node) producer() @trusted nothrow @nogc {
21     return shared MPSCQueueProducer!Node(cast(shared)this);
22   }
23 
24   /// returns true if first to push
25   bool push(Node* n) @safe nothrow @nogc {
26     import core.atomic : atomicExchange;
27     n.next = null;
28     Node* prev = atomicExchange(&head, n);
29     prev.next = n;
30     return prev is &stub;
31   }
32 
33   bool empty() @safe nothrow @nogc {
34     import core.atomic : atomicLoad;
35     return head.atomicLoad is &stub;
36   }
37 
38   /// returns node or null if none
39   Node* pop() @safe nothrow @nogc {
40     import core.atomic : atomicLoad, MemoryOrder;
41     while(true) {
42       Node* end = this.tail;
43       Node* next = tail.next.atomicLoad!(MemoryOrder.raw);
44       if (end is &stub) { // still pointing to stub
45         if (null is next) // no new nodes
46           return null;
47         this.tail = next; // at least one node was added and stub.next points to the first one
48         end = next;
49         next = next.next.atomicLoad!(MemoryOrder.raw);
50       }
51       if (next) { // if there is at least another node
52         this.tail = next;
53         return end;
54       }
55       Node* start = this.head.atomicLoad!(MemoryOrder.acq);
56       if (end is start) {
57         push(&stub);
58         next = end.next.atomicLoad!(MemoryOrder.raw);
59         if (next) {
60           this.tail = next;
61           return end;
62         }
63       }
64     }
65   }
66 }