1 module lock_free.rwqueue; 2 3 import core.atomic; 4 5 /** 6 * A lock-free single-reader, single-writer FIFO queue. 7 */ 8 shared struct RWQueue(T, size_t capacity = roundPow2!(PAGE_SIZE / T.sizeof)) 9 { 10 static assert(capacity > 0, "Cannot have a capacity of 0."); 11 static assert(roundPow2!capacity == capacity, "The capacity must be a power of 2"); 12 13 @property size_t length() shared const 14 { 15 return atomicLoad!(MemoryOrder.acq)(_wpos) - atomicLoad!(MemoryOrder.acq)(_rpos); 16 } 17 18 @property bool empty() shared const 19 { 20 return !length; 21 } 22 23 @property bool full() const 24 { 25 return length == capacity; 26 } 27 28 void push(shared(T) t) 29 in { assert(!full); } 30 body 31 { 32 immutable pos = atomicLoad!(MemoryOrder.acq)(_wpos); 33 _data[pos & mask] = t; 34 atomicStore!(MemoryOrder.rel)(_wpos, pos + 1); 35 } 36 37 shared(T) pop() 38 in { assert(!empty); } 39 body 40 { 41 immutable pos = atomicLoad!(MemoryOrder.acq)(_rpos); 42 auto res = _data[pos & mask]; 43 atomicStore!(MemoryOrder.rel)(_rpos, pos + 1); 44 return res; 45 } 46 47 private: 48 // import std.algorithm; // move 49 50 enum mask = capacity - 1; 51 52 size_t _wpos; 53 size_t _rpos; 54 T[capacity] _data; 55 } 56 57 private: 58 59 enum PAGE_SIZE = 4096; 60 61 template roundPow2(size_t v) 62 { 63 import core.bitop : bsr; 64 enum roundPow2 = v ? cast(size_t)1 << bsr(v) : 0; 65 } 66 67 static assert(roundPow2!0 == 0); 68 static assert(roundPow2!3 == 2); 69 static assert(roundPow2!4 == 4); 70 71 version (unittest) 72 { 73 import core.thread, std.concurrency; 74 enum amount = 10_000; 75 76 void push(T)(ref shared(RWQueue!T) queue) 77 { 78 foreach (i; 0 .. amount) 79 { 80 while (queue.full) 81 Thread.yield(); 82 queue.push(cast(shared T)i); 83 } 84 } 85 86 void pop(T)(ref shared(RWQueue!T) queue) 87 { 88 foreach (i; 0 .. amount) 89 { 90 while (queue.empty) 91 Thread.yield(); 92 assert(queue.pop() == cast(shared T)i); 93 } 94 } 95 } 96 97 unittest 98 { 99 shared(RWQueue!size_t) queue; 100 auto t0 = new Thread({push(queue);}), 101 t1 = new Thread({pop(queue);}); 102 t0.start(); t1.start(); 103 t0.join(); t1.join(); 104 } 105 106 unittest 107 { 108 static struct Data { size_t i; } 109 shared(RWQueue!Data) queue; 110 auto t0 = new Thread({push(queue);}), 111 t1 = new Thread({pop(queue);}); 112 t0.start(); t1.start(); 113 t0.join(); t1.join(); 114 }