1 module concurrency.data.queue.mpsc; 2 3 struct MPSCQueueProducer(Node) { 4 private MPSCQueue!(Node) q; 5 void push(Node* node) shared @trusted { 6 (cast()q).push(node); 7 } 8 } 9 10 final class MPSCQueue(Node) { 11 import std.traits : hasUnsharedAliasing; 12 alias ElementType = Node*; 13 private Node* head, tail; 14 private Node stub; 15 16 static if (hasUnsharedAliasing!Node) { 17 // TODO: next version add deprecated("Node has unshared aliasing") 18 this() @safe nothrow @nogc { 19 head = tail = &stub; 20 stub.next = null; 21 } 22 } else { 23 this() @safe nothrow @nogc { 24 head = tail = &stub; 25 stub.next = null; 26 } 27 } 28 29 shared(MPSCQueueProducer!Node) producer() @trusted nothrow @nogc { 30 return shared MPSCQueueProducer!Node(cast(shared)this); 31 } 32 33 /// returns true if first to push 34 bool push(Node* n) @safe nothrow @nogc { 35 import core.atomic : atomicExchange; 36 n.next = null; 37 Node* prev = atomicExchange(&head, n); 38 prev.next = toShared(n); 39 return prev is &stub; 40 } 41 42 bool empty() @safe nothrow @nogc { 43 import core.atomic : atomicLoad; 44 return head.atomicLoad is &stub; 45 } 46 47 /// returns node or null if none 48 Node* pop() @safe nothrow @nogc { 49 import core.atomic : atomicLoad, MemoryOrder; 50 while(true) { 51 Node* end = this.tail; 52 shared Node* next = tail.next.atomicLoad!(MemoryOrder.raw).toShared(); 53 if (end is &stub) { // still pointing to stub 54 if (null is next) // no new nodes 55 return null; 56 this.tail = next.toUnshared(); // at least one node was added and stub.next points to the first one 57 end = next.toUnshared(); 58 next = next.next.atomicLoad!(MemoryOrder.raw); 59 } 60 if (next) { // if there is at least another node 61 this.tail = toUnshared(next); 62 return end; 63 } 64 Node* start = this.head.atomicLoad!(MemoryOrder.acq); 65 if (end is start) { 66 push(&stub); 67 next = end.next.atomicLoad!(MemoryOrder.raw).toShared(); 68 if (next) { 69 this.tail = toUnshared(next); 70 return end; 71 } 72 } 73 } 74 } 75 76 auto opSlice() @safe nothrow @nogc { 77 import core.atomic : atomicLoad, MemoryOrder; 78 auto current = atomicLoad(tail); 79 if (current is &stub) 80 current = current.next.atomicLoad!(MemoryOrder.raw).toUnshared(); 81 return Iterator!(Node)(current); 82 } 83 } 84 85 struct Iterator(Node) { 86 Node* head; 87 this(Node* head) @safe nothrow @nogc { 88 this.head = head; 89 } 90 bool empty() @safe nothrow @nogc { 91 return head is null; 92 } 93 Node* front() @safe nothrow @nogc { 94 return head; 95 } 96 void popFront() @safe nothrow @nogc { 97 import core.atomic : atomicLoad, MemoryOrder; 98 head = head.next.atomicLoad!(MemoryOrder.raw).toUnshared(); 99 } 100 Iterator!(Node) safe() { 101 return Iterator!(Node)(head); 102 } 103 } 104 105 106 private shared(Node*) toShared(Node)(Node* node) @trusted nothrow @nogc { 107 return cast(shared)node; 108 } 109 110 private auto toUnshared(Node)(Node* node) @trusted nothrow @nogc { 111 static if (is(Node : shared(T), T)) 112 return cast(T*)node; 113 else 114 return node; 115 }