I am trying to add a shutdown method to a queue implementation. I took the code from the BlockingQueue source in Java and want to verify the shutdown method. Is the following code thread-safe, and is it an acceptable implementation?
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * A bounded, linked FIFO queue with separate producer and consumer locks and a
 * {@link #shutDown()} operation that permanently closes the queue.
 *
 * <p>Insertions are serialized by {@code putLock}; {@code getLock} and
 * {@code queueNotEmpty} are reserved for the consuming side. No consuming
 * method is defined in this class, so only the producer path and shutdown are
 * exercised here.
 *
 * <p>Once {@link #shutDown()} has been called, every pending or future
 * {@link #add(Object)} fails with {@link IllegalStateException}.
 *
 * @param <E> the type of elements held in the queue
 */
public class BlockingQueue<E> {

    /** Singly linked node encapsulating one stored element. */
    static class Node<E> {
        /* volatile so that readers on another thread always see the latest value */
        volatile E element;
        Node<E> next;

        Node(E value) {
            element = value;
        }
    }

    /** Maximum number of elements the queue may hold. */
    private final int capacity;

    /** Current number of elements; shared between the producer and consumer sides. */
    private final AtomicInteger count = new AtomicInteger(0);

    /** 0 while the queue is open, 1 once {@link #shutDown()} has run. */
    private final AtomicInteger shutdown = new AtomicInteger(0);

    /* Fair lock guarding insertions at the tail. */
    private final ReentrantLock putLock = new ReentrantLock(true);

    /* Lock reserved for removals at the head. */
    private final ReentrantLock getLock = new ReentrantLock();

    private final Condition queueNotEmpty = getLock.newCondition();
    private final Condition queueNotFull = putLock.newCondition();

    /*
     * head always references a sentinel node whose successor is the first real
     * element; tail references the last node. They must be two distinct
     * references: sharing a single AtomicReference would make every tail
     * update silently move head as well.
     */
    private final AtomicReference<Node<E>> head;
    private final AtomicReference<Node<E>> tail;

    /** Creates a queue with a capacity of {@link Integer#MAX_VALUE}. */
    public BlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a queue with the given fixed capacity.
     *
     * @param capacity maximum number of elements, must be positive
     * @throws IllegalArgumentException if {@code capacity} is not greater than zero
     */
    public BlockingQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        this.capacity = capacity;
        // Start with a sentinel so the first add() has a non-null predecessor to link from.
        Node<E> sentinel = new Node<E>(null);
        head = new AtomicReference<Node<E>>(sentinel);
        tail = new AtomicReference<Node<E>>(sentinel);
    }

    /**
     * Appends the element at the tail of the queue, waiting for space while the
     * queue is full.
     *
     * @param e the element to add, must not be {@code null}
     * @throws NullPointerException  if {@code e} is {@code null}
     * @throws InterruptedException  if the calling thread is interrupted while
     *                               acquiring the lock or waiting for space
     * @throws IllegalStateException if the queue has been shut down, either
     *                               before the call or while it was waiting
     * @throws Exception             declared for backward compatibility with
     *                               existing callers
     */
    public void add(E e) throws InterruptedException, Exception {
        if (e == null) {
            throw new NullPointerException();
        }
        final ReentrantLock lock = this.putLock;
        final AtomicInteger count = this.count;
        final int c;
        lock.lockInterruptibly();
        try {
            try {
                // Re-check the shutdown flag on every wake-up: shutDown() uses
                // signalAll() to release producers blocked on a full queue, and
                // they would otherwise go straight back to waiting.
                while (shutdown.get() == 0 && count.get() == capacity) {
                    queueNotFull.await();
                }
            } catch (InterruptedException exception) {
                // Hand the wake-up on to another producer so a signal consumed
                // by this interrupted thread is not lost.
                queueNotFull.signal();
                throw exception;
            }
            if (shutdown.get() == 1) {
                throw new IllegalStateException("queue has been shut down");
            }
            // Link the new node behind the current tail.
            Node<E> elem = new Node<E>(e);
            Node<E> prevTail = tail.getAndSet(elem);
            prevTail.next = elem;
            // c is the size AFTER this insertion.
            c = count.incrementAndGet();
            if (c < capacity) {
                // Still room left: let the next waiting producer proceed.
                queueNotFull.signal();
            }
        } finally {
            lock.unlock();
        }
        // The queue just went from empty to non-empty.
        if (c == 1) {
            signalNotEmpty();
        }
    }

    /**
     * Signals one thread waiting on {@code queueNotEmpty}. Called from
     * {@link #add(Object)} after {@code putLock} has been released, so the two
     * locks are never requested in the opposite order to {@link #shutDown()}.
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.getLock;
        takeLock.lock();
        try {
            queueNotEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Permanently closes the queue: marks it as shut down, drops the references
     * to the stored nodes and wakes every thread waiting on either condition.
     * Calling it more than once has no additional effect.
     */
    public void shutDown() {
        // Acquire both locks (putLock first) and release them in reverse order.
        // The nested try/finally guarantees each acquired lock is released.
        putLock.lock();
        try {
            getLock.lock();
            try {
                // Publish the flag first so every woken thread observes it.
                shutdown.set(1);
                head.set(null);
                tail.set(null);
                // Wake all producers and consumers so they can notice the shutdown.
                queueNotFull.signalAll();
                queueNotEmpty.signalAll();
            } finally {
                getLock.unlock();
            }
        } finally {
            putLock.unlock();
        }
    }
}
Is there a better way to implement this method?