package queue;
import java.util.concurrent.atomic.AtomicReference;
Lock-free queue.
@param T
@author
public class DCASQueue<T> {
private AtomicReference<Node> head;
private AtomicReference<Node> tail;
public DCASQueue() {
Node sentinel = new Node(null);
this.head = new AtomicReference<Node>(sentinel);
this.tail = new AtomicReference<Node>(sentinel);
}
Append item to end of queue.
@param item
public void enq(T item) {
if (item == null) throw new NullPointerException();
Node node = new Node(item);
while (true) {
Node last = tail.get();
Node next = last.next.get();
if (last == tail.get()) {
if (next == null) {
AtomicReference[] target = {last.next, tail};
Object[] expected = {next, last};
Object[] update = {node, node};
if (multiCompareAndSet(
(AtomicReference<T>[]) target,
(T[]) expected, (T[]) update)) {
return;
}
}
}
}
}
Remove and return head of queue.
@return
@throws queue.EmptyException
public T deq() throws EmptyException {
while (true) {
Node first = head.get();
Node last = tail.get();
Node next = first.next.get();
if (first == head.get()) {
if (first == last) {
if (next == null) {
throw new EmptyException();
}
tail.compareAndSet(last, next);
} else {
T value = next.value;
if (head.compareAndSet(first, next))
return value;
}
}
}
}
public class Node {
public T value;
public AtomicReference<Node> next;
public Node(T value) {
this.value = value;
this.next = new AtomicReference<Node>(null);
}
}
private static synchronized
<T> boolean multiCompareAndSet(AtomicReference<T>[] target,
T[] expect, T[] update) {
for (int i = 0; i < target.length; i++) {
if (!target[i].compareAndSet(expect[i], update[i])) return false;
}
return true;
}
}