Main ObjectPool class: it uses atomic operations and a concurrent linked queue to achieve better performance than Apache Commons Pool. I want to make sure there are no concurrency issues, even though the unit tests pass without any problems.
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * A pool of reusable objects that can be shared between threads.
 *
 * <p>Idle objects are kept in a {@link ConcurrentLinkedQueue}; the number of created and idle
 * objects is tracked with {@link AtomicInteger} counters. A monitor is only used to park takers
 * while the pool is exhausted, so the common take/put path does not acquire a lock.
 *
 * <p>A {@code maximumPoolSize} of {@code -1} or {@code 0} means the pool is unbounded.
 *
 * <p>Objects are created by {@link #create()}, which by default instantiates {@code type} through
 * its public no-argument constructor. Subclasses may pass a {@code null} type and override
 * {@link #create()} instead; in that case the constructor does not pre-populate the pool and the
 * subclass decides when to call {@link #initialize()}.
 *
 * @param <T> the type of the pooled objects
 */
public class ObjectPool<T> {
    /** Compile-time switch for the diagnostic messages sent to the project's {@code Log} class. */
    private static final boolean LOGS_ENABLED = false;

    /** Number of objects created so far (idle and borrowed). */
    private final AtomicInteger totalObjectsCount = new AtomicInteger();
    /** Number of idle objects; may briefly lag behind the real queue size. */
    private final AtomicInteger pooledObjectsCount = new AtomicInteger();
    private final int initialPoolSize;
    private final int maximumPoolSize;
    private final Class<T> type;
    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<T>();
    /** Monitor on which takers wait while the pool is exhausted. */
    private final Object waitSync = new Object();

    /**
     * Creates an unbounded pool that starts empty.
     *
     * @param type class of the pooled objects
     */
    public ObjectPool(Class<T> type) {
        this(type, 0);
    }

    /**
     * Creates an unbounded pool pre-populated with {@code initialPoolSize} objects.
     *
     * @param type class of the pooled objects
     * @param initialPoolSize number of objects to create up front; must not be negative
     */
    public ObjectPool(Class<T> type, int initialPoolSize) {
        this(type, initialPoolSize, -1);
    }

    /**
     * Creates a pool.
     *
     * @param type class of the pooled objects; must have a public no-argument constructor. May
     *     be {@code null} only for subclasses, which are then expected to override
     *     {@link #create()}
     * @param initialPoolSize number of objects to create up front; must not be negative
     * @param maximumPoolSize upper bound for the number of created objects; {@code -1} (or
     *     {@code 0}) for an unbounded pool
     * @throws IllegalArgumentException if an argument violates the constraints above; the
     *     message names the offending parameter
     */
    public ObjectPool(Class<T> type, int initialPoolSize, int maximumPoolSize) {
        if (initialPoolSize < 0) {
            throw new IllegalArgumentException("initialPoolSize");
        }
        if (maximumPoolSize < -1 || (maximumPoolSize != -1 && initialPoolSize > maximumPoolSize)) {
            throw new IllegalArgumentException("maximumPoolSize");
        }
        if (type != null) {
            try {
                // Fail fast if the type cannot be instantiated reflectively later on.
                type.getConstructor();
            } catch (NoSuchMethodException e) {
                throw new IllegalArgumentException("type", e);
            }
        } else if (getClass() == ObjectPool.class) {
            // Only subclasses (which can override create()) may omit the type.
            throw new IllegalArgumentException("type");
        }
        this.initialPoolSize = initialPoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.type = type;
        if (type != null) {
            // Without a type the subclass' create() may depend on state that is not set up yet,
            // so pre-population is skipped here.
            initialize();
        }
    }

    /** @return the number of objects the pool is pre-populated with */
    public int getInitialPoolSize() {
        return initialPoolSize;
    }

    /** @return the configured maximum; {@code -1} or {@code 0} for an unbounded pool */
    public int getMaximumPoolSize() {
        return maximumPoolSize;
    }

    /** @return a snapshot of the number of idle objects */
    public int getPooledObjectsCount() {
        return pooledObjectsCount.get();
    }

    /** @return a snapshot of the number of objects created so far */
    public int getTotalObjectsCount() {
        return totalObjectsCount.get();
    }

    /**
     * Pre-populates the pool by taking {@code initialPoolSize} objects and returning them.
     *
     * <p>All objects are taken before any is returned, because an immediate put would let the
     * next take reuse the same instance. If the calling thread is interrupted, pre-population
     * stops early and the interrupt status is restored. Objects taken so far are always returned
     * to the pool, even when {@link #create()} throws.
     */
    protected void initialize() {
        List<T> itemList = new LinkedList<T>();
        try {
            for (int i = 0; i < initialPoolSize; i++) {
                itemList.add(take());
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of silently dropping it.
            Thread.currentThread().interrupt();
        } finally {
            for (T item : itemList) {
                put(item);
            }
        }
    }

    /**
     * Creates a new object for the pool using the public no-argument constructor of the
     * configured type.
     *
     * @return the new object
     * @throws UnsupportedOperationException if no type was configured, if the type cannot be
     *     instantiated, or if its constructor throws a checked exception (which is then the
     *     cause). Unchecked exceptions and errors thrown by the constructor propagate unchanged.
     */
    protected T create() {
        if (type == null) {
            throw new UnsupportedOperationException();
        }
        try {
            // Class.newInstance() is deprecated because it rethrows checked constructor
            // exceptions without declaring them; Constructor.newInstance() wraps them instead.
            return type.getConstructor().newInstance();
        } catch (java.lang.reflect.InvocationTargetException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            throw new UnsupportedOperationException(cause);
        } catch (ReflectiveOperationException e) {
            throw new UnsupportedOperationException(e);
        }
    }

    /**
     * Borrows an object from the pool.
     *
     * <p>An idle object is returned if one is available. Otherwise a new object is created, as
     * long as the maximum pool size has not been reached. If neither is possible, the call blocks
     * until an object is returned through {@link #put(Object)} or a creation slot is released.
     *
     * @return a pooled or newly created object
     * @throws InterruptedException if the thread is interrupted while waiting for an object
     */
    public T take() throws InterruptedException {
        T object;
        while ((object = queue.poll()) == null) {
            if (tryReserveSlot()) {
                return createInReservedSlot();
            }
            synchronized (waitSync) {
                // Both conditions are re-checked under the monitor. put() and a failed creation
                // update their counter first and notify under the same monitor afterwards, so a
                // change that happens between the checks above and this point is never missed.
                if (this.pooledObjectsCount.get() <= 0 && !hasFreeSlot()) {
                    if (LOGS_ENABLED) {
                        Log.info("%s Waiting for Object :- Created : %d, Maximum : %d", getClass().getSimpleName(), this.totalObjectsCount.get(), maximumPoolSize);
                    }
                    // A spurious wakeup is harmless: the enclosing loop polls the queue again.
                    waitSync.wait();
                }
            }
        }
        this.pooledObjectsCount.decrementAndGet();
        return object;
    }

    /**
     * Returns an object to the pool and wakes up waiting takers if the pool was empty.
     *
     * @param e the object to return
     * @throws NullPointerException if {@code e} is {@code null} (rejected by the backing queue)
     */
    public void put(T e) {
        queue.add(e);
        // Notify consumers when the first object arrives. Takers only wait while the counter is
        // not positive, so later arrivals cannot have sleeping takers to wake.
        if (this.pooledObjectsCount.incrementAndGet() <= 1) {
            synchronized (waitSync) {
                waitSync.notifyAll();
            }
        }
    }

    @Override
    public String toString() {
        return String.format("PooledObjectsCount : %d, TotalObjectsCount : %d, MaxPoolSize : %d", getPooledObjectsCount(), getTotalObjectsCount(), getMaximumPoolSize());
    }

    /** Tells whether another object may be created without exceeding the maximum pool size. */
    private boolean hasFreeSlot() {
        return maximumPoolSize <= 0 || totalObjectsCount.get() < maximumPoolSize;
    }

    /**
     * Atomically claims the right to create one more object.
     *
     * <p>A compare-and-set loop is used so the created-object counter never exceeds the maximum,
     * not even transiently.
     *
     * @return {@code true} if a slot was reserved; the caller must then create an object or
     *     release the slot again
     */
    private boolean tryReserveSlot() {
        if (maximumPoolSize <= 0) {
            totalObjectsCount.incrementAndGet();
            return true;
        }
        while (true) {
            int current = totalObjectsCount.get();
            if (current >= maximumPoolSize) {
                return false;
            }
            if (totalObjectsCount.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    /**
     * Creates an object for a slot obtained from {@link #tryReserveSlot()}. If creation fails,
     * the slot is released and waiting takers are woken so one of them can retry.
     */
    private T createInReservedSlot() {
        boolean success = false;
        try {
            T object = create();
            if (LOGS_ENABLED) {
                Log.info("%s Object Created :- Created : %d, Maximum : %d", getClass().getSimpleName(), this.totalObjectsCount.get(), maximumPoolSize);
            }
            success = true;
            return object;
        } finally {
            if (!success) {
                this.totalObjectsCount.decrementAndGet();
                // Notify sleeping takers, so that one of them can continue creating a new object.
                synchronized (waitSync) {
                    waitSync.notifyAll();
                }
            }
        }
    }
}
Unit Test Class:
/**
 * Tests for {@link ObjectPool}: constructor validation plus multi-threaded take/put stress
 * scenarios for bounded and unbounded pools.
 */
public class ObjectPoolTest {
    /** Verifies that invalid constructor arguments are rejected and valid ones are applied. */
    @Test
    public void testConstructors() throws Exception {
        try {
            new ObjectPool<Object>(null, 10, 20);
            Assert.fail("A null type must be rejected");
        } catch (IllegalArgumentException expected) {
            // Expected: the base class cannot create objects without a type.
        }
        try {
            new ObjectPool<Object>(Object.class, -10, 5);
            Assert.fail("A negative initial pool size must be rejected");
        } catch (IllegalArgumentException expected) {
            // Expected.
        }
        try {
            new ObjectPool<Object>(Object.class, 10, -20);
            Assert.fail("A maximum pool size below -1 must be rejected");
        } catch (IllegalArgumentException expected) {
            // Expected.
        }
        ObjectPool<Object> objectPool = new ObjectPool<Object>(Object.class, 10, 20);
        Assert.assertNotNull(objectPool);
        Assert.assertTrue(objectPool.getInitialPoolSize() == 10);
        Assert.assertTrue(objectPool.getMaximumPoolSize() == 20);
        Assert.assertTrue(objectPool.getPooledObjectsCount() == 10);
    }

    /** Runs every producer/consumer combination against an unbounded pool. */
    @Test
    public void testUnboundedPool() {
        // Unbounded Pool test is risky on build server. Just test it on dev machine.
        testSingleProducerAndSingleConsumer(new ObjectPool<Object>(Object.class));
        testSingleProducerAndMultipleConsumer(new ObjectPool<Object>(Object.class));
        testMultipleProducerAndSingleConsumer(new ObjectPool<Object>(Object.class));
        testMultipleProducerAndMultipleConsumer(new ObjectPool<Object>(Object.class));
    }

    /** Runs every producer/consumer combination against a bounded pool. */
    @Test
    public void testBoundedPool() {
        testSingleProducerAndSingleConsumer(new ObjectPool<Object>(Object.class, 100000, 200000));
        testSingleProducerAndMultipleConsumer(new ObjectPool<Object>(Object.class, 100000, 200000));
        testMultipleProducerAndSingleConsumer(new ObjectPool<Object>(Object.class, 100000, 200000));
        testMultipleProducerAndMultipleConsumer(new ObjectPool<Object>(Object.class, 100000, 200000));
    }

    private void testSingleProducerAndSingleConsumer(ObjectPool<Object> objectPool) {
        testUsingMultipleThreads(objectPool, false, false);
    }

    private void testSingleProducerAndMultipleConsumer(ObjectPool<Object> objectPool) {
        testUsingMultipleThreads(objectPool, false, true);
    }

    private void testMultipleProducerAndSingleConsumer(ObjectPool<Object> objectPool) {
        testUsingMultipleThreads(objectPool, true, false);
    }

    private void testMultipleProducerAndMultipleConsumer(ObjectPool<Object> objectPool) {
        testUsingMultipleThreads(objectPool, true, true);
    }

    /**
     * Stress scenario: consumer threads borrow objects from the pool into a shared queue while
     * producer threads move them from that queue back into the pool.
     *
     * <p>After the test duration the workers get a grace period to exit. During that period the
     * test thread keeps returning borrowed objects, because a consumer that was blocked in
     * {@code take()} borrows one more object when it wakes up. Finally the test checks that no
     * worker is stuck and that every created object is back in the pool.
     *
     * @param objectPool pool under test
     * @param multipleProducer {@code true} for 50 producer threads, {@code false} for one
     * @param multipleConsumer {@code true} for 50 consumer threads, {@code false} for one
     */
    private void testUsingMultipleThreads(ObjectPool<Object> objectPool, boolean multipleProducer, boolean multipleConsumer) {
        List<Thread> threads = new ArrayList<>();
        ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<>();
        long startTime = System.currentTimeMillis();
        int testDuration = 10 * 1000;
        int exitGracePeriod = 5 * 1000;
        int threadCount = 50;
        for (int i = 0; i < threadCount; i++) {
            // Create consumer threads.
            if (i == 0 || multipleConsumer) {
                threads.add(new Thread(() -> {
                    try {
                        while (System.currentTimeMillis() - startTime <= testDuration) {
                            queue.add(objectPool.take());
                        }
                    } catch (InterruptedException e) {
                        // Only happens when the test gives up on a stuck consumer; just exit.
                        Thread.currentThread().interrupt();
                    }
                }));
            }
            // Create producer threads.
            if (i == 0 || multipleProducer) {
                threads.add(new Thread(() -> {
                    while (System.currentTimeMillis() - startTime <= testDuration) {
                        Object object = queue.poll();
                        if (object == null) {
                            continue;
                        }
                        objectPool.put(object);
                    }
                }));
            }
        }
        threads.forEach(Thread::start);

        boolean interrupted = false;
        try {
            // Phase 1: leave the workers alone until the test duration has elapsed.
            long runDeadline = startTime + testDuration;
            for (Thread thread : threads) {
                long remaining = runDeadline - System.currentTimeMillis();
                if (remaining > 0) {
                    thread.join(remaining);
                }
            }
            // Phase 2: keep returning borrowed objects until every worker has exited, so that
            // consumers blocked in take() can unblock and observe the end of the test.
            long exitDeadline = System.currentTimeMillis() + exitGracePeriod;
            for (Thread thread : threads) {
                while (thread.isAlive() && System.currentTimeMillis() < exitDeadline) {
                    returnBorrowedObjects(queue, objectPool);
                    thread.join(50);
                }
            }
        } catch (InterruptedException e) {
            interrupted = true;
            Thread.currentThread().interrupt();
        }

        // Check if any thread is still working. In that case the test has failed; interrupt the
        // stragglers so they do not leak into subsequent tests.
        int stuckThreads = 0;
        for (Thread thread : threads) {
            if (thread.isAlive()) {
                stuckThreads++;
                Arrays.stream(thread.getStackTrace()).forEach(st -> System.out.printf("%s%n", st));
                thread.interrupt();
            }
        }
        Assert.assertFalse("Test thread was interrupted while waiting for the workers", interrupted);
        Assert.assertTrue(objectPool.toString(), stuckThreads == 0);

        // All workers are gone, so whatever is left in the queue can be returned for good.
        returnBorrowedObjects(queue, objectPool);
        Assert.assertTrue(objectPool.toString(), objectPool.getPooledObjectsCount() == objectPool.getTotalObjectsCount());
        if (objectPool.getMaximumPoolSize() > 0) {
            Assert.assertTrue(objectPool.toString(), objectPool.getTotalObjectsCount() <= objectPool.getMaximumPoolSize());
        }
    }

    /** Moves every object currently held in {@code borrowed} back into the pool. */
    private static void returnBorrowedObjects(ConcurrentLinkedQueue<Object> borrowed, ObjectPool<Object> objectPool) {
        Object object;
        while ((object = borrowed.poll()) != null) {
            objectPool.put(object);
        }
    }
}