2
\$\begingroup\$

Main ObjectPool Class: It uses atomic operations and a concurrent linked queue to achieve the best performance compared to Apache commons pool. I want to make sure there are no concurrency issues even though unit tests passed without any issue.

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class ObjectPool<T> {
    private static final boolean LOGS_ENABLED = false;

    private AtomicInteger totalObjectsCount = new AtomicInteger();
    private AtomicInteger pooledObjectsCount = new AtomicInteger();

    private int initialPoolSize;
    private int maximumPoolSize;

    private Class<T> type;
    private ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<T>();
    private Object waitSync = new Object();

    public ObjectPool(Class<T> type) {
        this(type, 0);
    }

    public ObjectPool(Class<T> type, int initialPoolSize) {
        this(type, initialPoolSize, -1);
    }

    public ObjectPool(Class<T> type, int initialPoolSize, int maximumPoolSize) {
        if(initialPoolSize < 0) {
            throw new IllegalArgumentException("initialPoolSize");
        }

        if(maximumPoolSize < -1 || (maximumPoolSize != -1 && initialPoolSize > maximumPoolSize)) {
            throw new IllegalArgumentException("maximumPoolSize");
        }

        if(type != null) {
             try {
                 type.getConstructor();
             } catch (NoSuchMethodException e) {
                 throw new IllegalArgumentException("type", e);
             }
        } else {
            if(getClass() == ObjectPool.class) {
                throw new IllegalArgumentException("type");
            }
        }

        this.initialPoolSize = initialPoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.type = type;
        if(type != null) {
            initialize();
        }
    }

    public int getInitialPoolSize() {
        return initialPoolSize;
    }

    public int getMaximumPoolSize() {
        return maximumPoolSize;
    }

    public int getPooledObjectsCount() {
        return pooledObjectsCount.get();
    }

    public int getTotalObjectsCount() {
        return totalObjectsCount.get();
    }

    protected void initialize() {
        List<T> itemList = new LinkedList<T>();
        for (int i = 0; i < initialPoolSize; i++) {
            try {
                itemList.add(take());
            } catch (InterruptedException e) {
            }
        }

        for (T item:
                itemList) {
            put(item);
        }
    }

    protected T create() {
        if(type != null) {
            try {
                return type.newInstance();
            } catch (InstantiationException e) {
                throw new UnsupportedOperationException(e);
            } catch (IllegalAccessException e) {
                throw new UnsupportedOperationException(e);
            }
        } else {
            throw new UnsupportedOperationException();
        }
    }

    public T take() throws InterruptedException {
        T object;
        while ((object = queue.poll()) == null) {
            if(maximumPoolSize <= 0 || this.totalObjectsCount.get() < maximumPoolSize) {
                boolean success = false;
                try {
                    int totalObjectsCount = this.totalObjectsCount.incrementAndGet();
                    if (maximumPoolSize <= 0 || totalObjectsCount <= maximumPoolSize) {
                        object = create();
                        if (LOGS_ENABLED) {
                            Log.info("%s Object Created :- Created : %d, Maximum : %d", getClass().getSimpleName(), this.totalObjectsCount.get(), maximumPoolSize);
                        }
                        success = true;
                        return object;
                    }
                } finally {
                    if(!success) {
                        this.totalObjectsCount.decrementAndGet();
                        // Notify other sleeping takers, so that one of them can continue creating new object.
                        synchronized (waitSync) {
                            waitSync.notify();
                        }
                    }
                }
            }
            synchronized (waitSync) {
                if (this.pooledObjectsCount.get() <= 0) {
                    if (LOGS_ENABLED) {
                        Log.info("%s Waiting for Object :- Created : %d, Maximum : %d", getClass().getSimpleName(), this.totalObjectsCount.get(), maximumPoolSize);
                    }
                    waitSync.wait();
                }
            }
        }
        this.pooledObjectsCount.decrementAndGet();
        return object;
    }

    public void put(T e) {
        queue.add(e);
        // Notify consumer when first object arrives
        if(this.pooledObjectsCount.incrementAndGet() <= 1) {
            synchronized (waitSync) {
                waitSync.notifyAll();
            }
        }
    }

    @Override
    public String toString() {
        return String.format("PooledObjectsCount : %d, TotalObjectsCount : %d, MaxPoolSize : %d", getPooledObjectsCount(), getTotalObjectsCount(), getMaximumPoolSize());
    }
}

Unit Test Class:

public class ObjectPoolTest {



    @Test
    public void testConstructors() throws Exception {

        ObjectPool<Object> objectPool = null;

        try {
            objectPool = null;
            objectPool = new ObjectPool<Object>(null, 10, 20);
        } catch (IllegalArgumentException e) {
        }
        Assert.assertNull(objectPool);

        try {
            objectPool = null;
            objectPool = new ObjectPool<Object>(Object.class, -10, 5);
        } catch (IllegalArgumentException e) {
        }
        Assert.assertNull(objectPool);

        try {
            objectPool = null;
            objectPool = new ObjectPool<Object>(Object.class, 10, -20);
        } catch (IllegalArgumentException e) {
        }
        Assert.assertNull(objectPool);

        try {
            objectPool = null;
            objectPool = new ObjectPool<Object>(Object.class, 10, 20);
            Assert.assertTrue(objectPool.getInitialPoolSize() == 10);
            Assert.assertTrue(objectPool.getMaximumPoolSize() == 20);
            Assert.assertTrue(objectPool.getPooledObjectsCount() == 10);
        } catch (IllegalArgumentException e) {
        }
        Assert.assertNotNull(objectPool);
    }

    @Test
    public void testUnboundedPool() {
        // Unbounded Pool test is risky on build server. Just test it on dev machine.
        testSingleProducerAndSingleConsumer(new ObjectPool<Object>(Object.class));
        testSingleProducerAndMultipleConsumer(new ObjectPool<Object>(Object.class));
        testMultipleProducerAndSingleConsumer(new ObjectPool<Object>(Object.class));
        testMultipleProducerAndMultipleConsumer(new ObjectPool<Object>(Object.class));
    }

    @Test
    public void testBoundedPool() {
        testSingleProducerAndSingleConsumer(new ObjectPool<Object>(Object.class, 100000, 200000));
        testSingleProducerAndMultipleConsumer(new ObjectPool<Object>(Object.class, 100000, 200000));
        testMultipleProducerAndSingleConsumer(new ObjectPool<Object>(Object.class, 100000, 200000));
        testMultipleProducerAndMultipleConsumer(new ObjectPool<Object>(Object.class, 100000, 200000));
    }

    private void testSingleProducerAndSingleConsumer(ObjectPool<Object> objectPool) {
        testUsingMultipleThreads(objectPool, false, false);
    }

    private void testSingleProducerAndMultipleConsumer(ObjectPool<Object> objectPool) {
        testUsingMultipleThreads(objectPool, false, true);
    }

    private void testMultipleProducerAndSingleConsumer(ObjectPool<Object> objectPool) {
        testUsingMultipleThreads(objectPool, true, false);
    }

    private void testMultipleProducerAndMultipleConsumer(ObjectPool<Object> objectPool) {
        testUsingMultipleThreads(objectPool, true, true);
    }

    private void testUsingMultipleThreads(ObjectPool<Object> objectPool, boolean multipleProducer, boolean multipleConsumer) {
        List<Thread> threads = new ArrayList<>();

        ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<>();
        long startTime = System.currentTimeMillis();;
        int testDuration = 10 * 1000;
        int threadCount = 50;

        for (int i = 0; i < threadCount; i++) {
            // Create consumer threads.
            if(i == 0 || multipleConsumer) {
                threads.add(new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            while (System.currentTimeMillis() - startTime <= testDuration) {
                                queue.add(objectPool.take());
                            }
                        } catch (InterruptedException e) {

                        }
                    }
                }));
            }

            // Create producer threads
            if(i == 0 || multipleProducer) {
                threads.add(new Thread(new Runnable() {
                    @Override
                    public void run() {
                        while (System.currentTimeMillis() - startTime <= testDuration) {
                            Object object = queue.poll();
                            if (object == null) {
                                continue;
                            }
                            objectPool.put(object);
                        }
                    }
                }));
            }
        }

        threads.forEach(thread -> thread.start());
        try {
            Thread.sleep(2 * testDuration);
        } catch (InterruptedException e) {
        }

        // Add all borrowed items back to object pool.
        Object object;
        while ((object = queue.poll()) != null) {
            objectPool.put(object);
        }

        // Wait more time, so it will allow time to consumer threads to unblock from take() and exit.
        try {
            Thread.sleep(5 * 1000);
        } catch (InterruptedException e) {
        }

        // Check if any thread is still working. In that case test is failed.
        threads.forEach(thread -> {
            if(thread.isAlive()) {
                Arrays.stream(thread.getStackTrace()).forEach(st -> System.out.printf("%s%n", st));
                Assert.assertFalse(objectPool.toString(), thread.isAlive());
            }
        });

        Assert.assertTrue(objectPool.toString(),objectPool.getPooledObjectsCount() == objectPool.getTotalObjectsCount());

        if(objectPool.getMaximumPoolSize() > 0) {
            Assert.assertTrue(objectPool.toString(), objectPool.getTotalObjectsCount() <= objectPool.getMaximumPoolSize());
        }
    }
}
\$\endgroup\$

0

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.