I'm trying to implement the following requirements while keeping the most-used code path, getting a resource from the pool, lock-free.
The same resource should be used n (`maxUsageCount`) times, then destroyed, and a new one created. If there is an error with a resource, the client should mark it as stale. A stale resource should be destroyed and not handed out to clients; a new resource should be created instead. A resource must be closed after `maxUsageCount` uses, otherwise it is a leak.
The resource itself is thread-safe.
/**
 * Hands out a single shared resource up to {@code maxUsageCount} times and then
 * replaces it with a freshly created one.
 *
 * <p>Each "generation" is a queue holding {@code maxUsageCount} references to the
 * same resource instance. Taking a reference is a lock-free {@code poll()}; the
 * lock is only taken when the current generation is exhausted or has been marked
 * stale and a replacement has to be installed.
 *
 * <p>Callers are expected to call {@code destroy()} once for every reference
 * obtained from {@link #get()}; references still queued when a resource is found
 * stale are destroyed by the pool itself.
 *
 * @param <T> resource type handed out by this pool
 */
public class ReusableResource<T extends ResourceProxy> {

    /** Creates the resource instances that the pool hands out. */
    public static interface Factory<T extends ResourceProxy> {
        /** @return a newly created resource */
        T create();
    }

    private final int maxUsageCount;
    private final AtomicReference<ConcurrentLinkedQueue<T>> resources;
    private final Object lock;
    private final ReusableResource.Factory<T> factory;

    /**
     * @param maxUsageCount number of times one resource instance is handed out
     *                      before it is replaced; must be positive
     * @param factory       creates replacement resources; must not be null
     * @throws IllegalArgumentException if {@code maxUsageCount} is not positive
     *                                  or {@code factory} is null
     */
    public ReusableResource(int maxUsageCount,
            ReusableResource.Factory<T> factory) {
        // Zero would make nCopies() produce an empty queue: get() would return
        // null and the freshly created resource would be dropped unclosed.
        if (maxUsageCount <= 0)
            throw new IllegalArgumentException("maxUsageCount " + maxUsageCount);
        if (factory == null)
            throw new IllegalArgumentException("factory " + factory);
        this.maxUsageCount = maxUsageCount;
        resources = new AtomicReference<ConcurrentLinkedQueue<T>>(
                new ConcurrentLinkedQueue<T>());
        lock = new Object();
        this.factory = factory;
    }

    /**
     * Returns a reference to the current resource, creating a new resource when
     * the current one is used up or stale.
     *
     * @return a resource reference; a resource created by this call is returned
     *         without a stale check
     */
    public T get() {
        final ConcurrentLinkedQueue<T> seen = resources.get(); // load once
        T res = seen.poll();
        if (res != null && !res.isStaled()) {
            return res; // lock-free fast path
        }

        synchronized (lock) {
            if (res != null) {
                // Stale: give back our own reference and every reference still
                // queued so the usage count can reach zero and the resource is
                // actually closed.
                res.destroy();
                destroyRemaining(seen);
            }

            // Re-read the queue under the lock. Another thread may have installed
            // a new generation while we waited; polling the queue we loaded
            // before the lock would miss it, and installing yet another queue
            // would orphan that generation's remaining references (never closed).
            final ConcurrentLinkedQueue<T> current = resources.get();
            T candidate;
            while ((candidate = current.poll()) != null) {
                if (!candidate.isStaled()) {
                    return candidate;
                }
                candidate.destroy();
            }

            // Current generation is empty (exhausted or drained): start a new one.
            return installNewGeneration();
        }
    }

    /** Destroys every reference still waiting in the given queue. */
    private void destroyRemaining(ConcurrentLinkedQueue<T> queue) {
        T leftover;
        while ((leftover = queue.poll()) != null) {
            leftover.destroy();
        }
    }

    /**
     * Creates a resource, publishes a queue holding its remaining references and
     * returns the first reference. Must be called with {@code lock} held.
     */
    private T installNewGeneration() {
        final ConcurrentLinkedQueue<T> fresh = new ConcurrentLinkedQueue<T>(
                Collections.nCopies(maxUsageCount, factory.create()));
        // check out the one we are returning
        final T first = fresh.poll();
        // replace the queue object, never refill an existing one
        resources.set(fresh);
        return first;
    }
}
ResourceProxy.java
/**
 * Base class for a shared resource that is closed once it has been released a
 * fixed number of times.
 *
 * <p>The instance starts with a counter of {@code maxUsageCount}; every
 * {@link #destroy()} decrements it and the call that brings it to exactly zero
 * invokes {@link #closeResource()}. Later calls drive the counter negative and
 * do not close again.
 */
public abstract class ResourceProxy implements Disposable {
    private final AtomicInteger usageCount;
    private volatile boolean staled;

    /**
     * @param maxUsageCount number of {@link #destroy()} calls after which the
     *                      underlying resource is closed; must be positive
     * @throws IllegalArgumentException if {@code maxUsageCount} is not positive
     */
    public ResourceProxy(int maxUsageCount) {
        // A count of zero could never be closed: the first destroy() would move
        // the counter to -1 and skip the "== 0" close condition for good.
        if (maxUsageCount <= 0) {
            throw new IllegalArgumentException("maxUsageCount " + maxUsageCount);
        }
        this.usageCount = new AtomicInteger(maxUsageCount);
        this.staled = false;
    }

    /**
     * Releases one usage; closes the underlying resource when the last usage is
     * released.
     */
    @Override
    public void destroy() {
        if (usageCount.decrementAndGet() == 0) {
            closeResource();
        }
    }

    /** Marks this resource as stale; the flag is never reset. */
    @Override
    public void setStaled() {
        this.staled = true;
    }

    /** @return {@code true} once {@link #setStaled()} has been called */
    @Override
    public boolean isStaled() {
        return staled;
    }

    /** Closes the underlying resource; invoked from {@link #destroy()}. */
    abstract protected void closeResource();
}
Example usage:
public class QueueConnectionProxy extends ResourceProxy implements
QueueConnection {
public static class Factory implements
ReusableResource.Factory<QueueConnectionProxy> {
private final int maxUsageCount;
public Factory(int maxUsageCount) {
if (maxUsageCount < 0) throw new IllegalArgumentException();
this.maxUsageCount = maxUsageCount;
}
@Override
public QueueConnectionProxy create() {
return new QueueConnectionProxy(maxUsageCount);
}
}
private final QueueConnection delegate;
public QueueConnectionProxy(int maxUsageCount) {
super(maxUsageCount);
delegate = createDelegate();
}
private QueueConnection createDelegate() {
// create QueueConnection
return createdQueueConnection
}
@Override
protected void closeResource() {
try {
if (delegate != null) delegate.close();
} catch (JMSException e) {
e.printStackTrace();
// TODO: log JMSException
}
}
@Override
public void close() throws JMSException {
destroy();
}
// other delegate methods
//
}
To create an instance of `ReusableResource`:
// The pool and the factory are both configured with the same usage count (10).
final QueueConnectionProxy.Factory connectionFactory =
        new QueueConnectionProxy.Factory(10);
ReusableResource<QueueConnectionProxy> reuse =
        new ReusableResource<QueueConnectionProxy>(10, connectionFactory);
Client code:
QueueConnection qc = null;
try {
    qc = reuse.get();
    // ... use qc ...
} finally {
    // qc is still null when reuse.get() threw; closing it unguarded would raise
    // a NullPointerException that hides the original failure.
    if (qc != null) {
        try { qc.close(); }
        catch (JMSException e) { e.printStackTrace(); }
    }
}