1 module lock_free.rwqueue;
2 
3 import core.atomic;
4 
5 /**
6  * A lock-free single-reader, single-writer FIFO queue.
7  */
shared struct RWQueue(T, size_t capacity = roundPow2!(PAGE_SIZE / T.sizeof))
{
    static assert(capacity > 0, "Cannot have a capacity of 0.");
    static assert(roundPow2!capacity == capacity, "The capacity must be a power of 2");

    /// Number of elements currently held: the distance between the write
    /// position and the read position.
    @property size_t length() shared const
    {
        // The write position is sampled first, then the read position,
        // each with acquire ordering.
        immutable written = atomicLoad!(MemoryOrder.acq)(_wpos);
        immutable consumed = atomicLoad!(MemoryOrder.acq)(_rpos);
        return written - consumed;
    }

    /// True when `length` is zero.
    @property bool empty() shared const
    {
        return length == 0;
    }

    /// True when `length` equals `capacity`.
    @property bool full() const
    {
        return capacity == length;
    }

    /**
     * Appends `t` at the write position.
     *
     * The queue must not be full (checked by the `in` contract).
     */
    void push(shared(T) t)
    in
    {
        assert(!full);
    }
    body
    {
        immutable writeIndex = atomicLoad!(MemoryOrder.acq)(_wpos);
        // Fill the slot first; the release store below then makes the
        // new write position visible.
        _data[writeIndex & mask] = t;
        atomicStore!(MemoryOrder.rel)(_wpos, writeIndex + 1);
    }

    /**
     * Removes and returns the element at the read position.
     *
     * The queue must not be empty (checked by the `in` contract).
     */
    shared(T) pop()
    in
    {
        assert(!empty);
    }
    body
    {
        immutable readIndex = atomicLoad!(MemoryOrder.acq)(_rpos);
        // Copy the element out before the release store advances the
        // read position.
        auto item = _data[readIndex & mask];
        atomicStore!(MemoryOrder.rel)(_rpos, readIndex + 1);
        return item;
    }

private:
    //    import std.algorithm; // move

    // capacity is a power of two, so `index & mask` wraps an ever-growing
    // position into the range 0 .. capacity.
    enum size_t mask = capacity - 1;

    size_t _wpos;       // count of elements pushed so far
    size_t _rpos;       // count of elements popped so far
    T[capacity] _data;  // ring storage, indexed by position & mask
}
56 
57 private:
58 
// Byte budget used for RWQueue's default capacity (PAGE_SIZE / T.sizeof,
// rounded down to a power of two). Presumably the common 4 KiB page size;
// it is a fixed constant, not queried from the OS.
enum int PAGE_SIZE = 4096;
60 
/**
 * Compile-time helper: the largest power of two that is less than or
 * equal to `v`, or 0 when `v` is 0.
 */
template roundPow2(size_t v)
{
    static if (v == 0)
    {
        enum size_t roundPow2 = 0;
    }
    else
    {
        import core.bitop : bsr;

        // bsr gives the index of the highest set bit; shifting 1 by that
        // index keeps only that bit.
        enum size_t roundPow2 = cast(size_t) 1 << bsr(v);
    }
}
66 
// Compile-time checks of roundPow2: zero maps to zero, 3 rounds down to 2,
// and an exact power of two is returned unchanged.
static assert(roundPow2!(0) == 0);
static assert(roundPow2!(3) == 2);
static assert(roundPow2!(4) == 4);
70 
version (unittest)
{
    import core.thread, std.concurrency;

    // Number of elements each helper below pushes or pops.
    enum amount = 10_000;

    // Producer helper: pushes the values 0 .. amount in order, yielding
    // the thread while the queue is full.
    void push(T)(ref shared(RWQueue!T) queue)
    {
        foreach (i; 0 .. amount)
        {
            while (queue.full)
                Thread.yield();
            queue.push(cast(shared T)i);
        }
    }

    // Consumer helper: pops `amount` elements, yielding the thread while
    // the queue is empty, and checks they arrive in the order 0 .. amount.
    void pop(T)(ref shared(RWQueue!T) queue)
    {
        foreach (i; 0 .. amount)
        {
            while (queue.empty)
                Thread.yield();
            // Pop outside the assert: an assert expression may be skipped
            // entirely by the compiler, and the dequeue is a side effect
            // the test depends on (without it the producer would spin on
            // a full queue forever).
            auto value = queue.pop();
            assert(value == cast(shared T)i);
        }
    }
}
96 
unittest
{
    // One thread pushes into a queue of size_t while a second thread
    // pops from it and checks the order.
    shared(RWQueue!size_t) queue;

    auto producer = new Thread({ push(queue); });
    auto consumer = new Thread({ pop(queue); });

    producer.start();
    consumer.start();

    producer.join();
    consumer.join();
}
105 
unittest
{
    // Same producer/consumer exercise with a struct element type.
    static struct Data
    {
        size_t i;
    }

    shared(RWQueue!Data) queue;

    auto producer = new Thread({ push(queue); });
    auto consumer = new Thread({ pop(queue); });

    producer.start();
    consumer.start();

    producer.join();
    consumer.join();
}