1 module concurrency.data.queue.mpsc;
2
3 struct MPSCQueueProducer(Node) {
4 private MPSCQueue!(Node) q;
5 void push(Node* node) shared @trusted {
6 (cast()q).push(node);
7 }
8 }
9
10 final class MPSCQueue(Node) {
11 alias ElementType = Node*;
12 private Node* head, tail;
13 private Node stub;
14
15 this() @safe nothrow @nogc {
16 head = tail = &stub;
17 stub.next = null;
18 }
19
20 shared(MPSCQueueProducer!Node) producer() @trusted nothrow @nogc {
21 return shared MPSCQueueProducer!Node(cast(shared)this);
22 }
23
24 /// returns true if first to push
25 bool push(Node* n) @safe nothrow @nogc {
26 import core.atomic : atomicExchange;
27 n.next = null;
28 Node* prev = atomicExchange(&head, n);
29 prev.next = n;
30 return prev is &stub;
31 }
32
33 bool empty() @safe nothrow @nogc {
34 import core.atomic : atomicLoad;
35 return head.atomicLoad is &stub;
36 }
37
38 /// returns node or null if none
39 Node* pop() @safe nothrow @nogc {
40 import core.atomic : atomicLoad, MemoryOrder;
41 while(true) {
42 Node* end = this.tail;
43 Node* next = tail.next.atomicLoad!(MemoryOrder.raw);
44 if (end is &stub) { // still pointing to stub
45 if (null is next) // no new nodes
46 return null;
47 this.tail = next; // at least one node was added and stub.next points to the first one
48 end = next;
49 next = next.next.atomicLoad!(MemoryOrder.raw);
50 }
51 if (next) { // if there is at least another node
52 this.tail = next;
53 return end;
54 }
55 Node* start = this.head.atomicLoad!(MemoryOrder.acq);
56 if (end is start) {
57 push(&stub);
58 next = end.next.atomicLoad!(MemoryOrder.raw);
59 if (next) {
60 this.tail = next;
61 return end;
62 }
63 }
64 }
65 }
66 }