1 module concurrency.data.queue.mpsc;
2 
/// Shared handle through which any producer thread may enqueue nodes.
/// Holds a `shared` reference to the queue so it can be freely passed
/// across threads.
struct MPSCQueueProducer(Node) {
  private MPSCQueue!(Node) q;

  /// Enqueue `node` from any thread.
  /// @trusted: the underlying MPSCQueue.push is written to be safe under
  /// concurrent producers, so stripping `shared` here is sound.
  void push(Node* node) shared @trusted {
    auto queue = cast() q; // drop `shared` to reach the unqualified push
    queue.push(node);
  }
}
9 
/// Intrusive multi-producer single-consumer queue (Vyukov-style).
/// `Node` must expose a `next` pointer field. Producers call `push` from any
/// thread; `pop`, `empty` and `opSlice` must only be called from the single
/// consumer thread.
final class MPSCQueue(Node) {
  import std.traits : hasUnsharedAliasing;
  alias ElementType = Node*;
  // `head` is the producer end (most recently pushed node);
  // `tail` is the consumer end (next node to pop).
  private Node* head, tail;
  // Sentinel node: the queue is empty exactly when head points at the stub.
  private Node stub;

  static if (hasUnsharedAliasing!Node) {
    // TODO: next version add deprecated("Node has unshared aliasing")
  }

  // NOTE(review): the previous static-if carried two byte-identical
  // constructor bodies; collapsed into one.
  this() @safe nothrow @nogc {
    head = tail = &stub;
    stub.next = null;
  }

  /// Returns a `shared` producer handle for use by other threads.
  shared(MPSCQueueProducer!Node) producer() @trusted nothrow @nogc {
    return shared MPSCQueueProducer!Node(cast(shared)this);
  }

  /// returns true if first to push
  bool push(Node* n) @safe nothrow @nogc {
    import core.atomic : atomicExchange;
    n.next = null;
    // Claim the head slot atomically, then link the previous head to us.
    // Between the exchange and the store of prev.next the chain is briefly
    // broken; pop tolerates this by treating a null next as "no more nodes".
    Node* prev = atomicExchange(&head, n);
    prev.next = toShared(n);
    return prev is &stub;
  }

  /// Consumer-side emptiness check: empty iff head still points at the stub.
  bool empty() @safe nothrow @nogc {
    import core.atomic : atomicLoad;
    return head.atomicLoad is &stub;
  }

  /// returns node or null if none
  Node* pop() @safe nothrow @nogc {
    import core.atomic : atomicLoad, MemoryOrder;
    while(true) {
      Node* end = this.tail;
      shared Node* next = tail.next.atomicLoad!(MemoryOrder.raw).toShared();
      if (end is &stub) { // still pointing to stub
        if (null is next) // no new nodes
          return null;
        this.tail = next.toUnshared(); // at least one node was added and stub.next points to the first one
        end = next.toUnshared();
        next = next.next.atomicLoad!(MemoryOrder.raw);
      }
      if (next) { // if there is at least another node
        this.tail = toUnshared(next);
        return end;
      }
      // `end` looks like the last node; confirm by checking the producer end.
      Node* start = this.head.atomicLoad!(MemoryOrder.acq);
      if (end is start) {
        // Re-insert the stub so the queue never drains completely, then
        // retry the link from `end`; a concurrent producer may still be
        // mid-push, in which case the outer loop spins until it links up.
        push(&stub);
        next = end.next.atomicLoad!(MemoryOrder.raw).toShared();
        if (next) {
          this.tail = toUnshared(next);
          return end;
        }
      }
    }
  }

  /// Consumer-side range over the current nodes, skipping the stub.
  auto opSlice() @safe nothrow @nogc {
    import core.atomic : atomicLoad, MemoryOrder;
    auto current = atomicLoad(tail);
    if (current is &stub)
      current = current.next.atomicLoad!(MemoryOrder.raw).toUnshared();
    return Iterator!(Node)(current);
  }
}
84 
/// Forward range over an intrusive list of nodes linked via `next`,
/// as produced by `MPSCQueue.opSlice`. Consumer-thread use only.
struct Iterator(Node) {
  Node* head;

  this(Node* head) @safe nothrow @nogc {
    this.head = head;
  }

  /// True once iteration has passed the last node.
  bool empty() @safe nothrow @nogc {
    return head is null;
  }

  /// Current node; only valid while !empty.
  Node* front() @safe nothrow @nogc {
    return head;
  }

  /// Advance by atomically reading the current node's `next` link.
  void popFront() @safe nothrow @nogc {
    import core.atomic : atomicLoad, MemoryOrder;
    head = head.next.atomicLoad!(MemoryOrder.raw).toUnshared();
  }

  /// Returns a copy positioned at the same node.
  // Fix: was missing the @safe nothrow @nogc attributes every other
  // member carries; added for consistency (backward compatible).
  Iterator!(Node) safe() @safe nothrow @nogc {
    return Iterator!(Node)(head);
  }
}
104 
105 
/// Reinterprets a pointer as `shared`.
/// @trusted: qualifier-only cast, no memory is read or written.
private shared(Node*) toShared(Node)(Node* node) @trusted nothrow @nogc {
  shared(Node*) sharedPtr = cast(shared) node;
  return sharedPtr;
}
109 
/// Strips a `shared` qualifier from the pointee type if present;
/// otherwise returns the pointer unchanged.
/// @trusted: qualifier-only cast, no memory is read or written.
private auto toUnshared(P)(P* ptr) @trusted nothrow @nogc {
  static if (is(P : shared(U), U)) {
    // Pointee is shared(U): drop the qualifier.
    return cast(U*) ptr;
  } else {
    // Already unqualified: pass straight through.
    return ptr;
  }
}