1 module concurrency.data.queue.mpsc; 2 3 struct MPSCQueueProducer(Node) { 4 private MPSCQueue!(Node) q; 5 void push(Node* node) shared @trusted { 6 (cast()q).push(node); 7 } 8 } 9 10 final class MPSCQueue(Node) { 11 alias ElementType = Node*; 12 private Node* head, tail; 13 private Node stub; 14 15 this() @safe nothrow @nogc { 16 head = tail = &stub; 17 stub.next = null; 18 } 19 20 shared(MPSCQueueProducer!Node) producer() @trusted nothrow @nogc { 21 return shared MPSCQueueProducer!Node(cast(shared)this); 22 } 23 24 /// returns true if first to push 25 bool push(Node* n) @safe nothrow @nogc { 26 import core.atomic : atomicExchange; 27 n.next = null; 28 Node* prev = atomicExchange(&head, n); 29 prev.next = n; 30 return prev is &stub; 31 } 32 33 bool empty() @safe nothrow @nogc { 34 import core.atomic : atomicLoad; 35 return head.atomicLoad is &stub; 36 } 37 38 /// returns node or null if none 39 Node* pop() @safe nothrow @nogc { 40 import core.atomic : atomicLoad, MemoryOrder; 41 while(true) { 42 Node* end = this.tail; 43 Node* next = tail.next.atomicLoad!(MemoryOrder.raw); 44 if (end is &stub) { // still pointing to stub 45 if (null is next) // no new nodes 46 return null; 47 this.tail = next; // at least one node was added and stub.next points to the first one 48 end = next; 49 next = next.next.atomicLoad!(MemoryOrder.raw); 50 } 51 if (next) { // if there is at least another node 52 this.tail = next; 53 return end; 54 } 55 Node* start = this.head.atomicLoad!(MemoryOrder.acq); 56 if (end is start) { 57 push(&stub); 58 next = end.next.atomicLoad!(MemoryOrder.raw); 59 if (next) { 60 this.tail = next; 61 return end; 62 } 63 } 64 } 65 } 66 }