I could not find a single working example of using an ExecutorService with a BlockingQueue, so I have come up with a working sample. Thoughts?

import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class Producer implements Runnable {

    protected BlockingQueue queue = null;

    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }

    public void run() {
        try {

            for (int i = 0; i < 10000; i++) {
                queue.put(i);
                // System.out.println("Producer ID "+Thread.currentThread().getId()+" is adding task : "+i);
            }
            // poison pill
            // System.out.println("Poison number added! "+Thread.currentThread().getId());
            queue.put(-1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Consumer implements Runnable {

    protected BlockingQueue queue = null;
    ConcurrentHashMap<Integer, Integer> hashmap;

    public Consumer(BlockingQueue queue,
            ConcurrentHashMap<Integer, Integer> hashmap) {
        this.queue = queue;
        this.hashmap = hashmap;
    }

    public void run() {
        try {

            while (ExecutorTest.isRunning) {
                Integer i = (Integer) queue.take();
                System.out.println("Consumer " + Thread.currentThread().getId()
                        + ": taking Task : " + i);
                if (i == -1) {
                    queue.put(i);
                    ExecutorTest.isRunning = false;
                    // System.out.println("Setting isRunning to false :  "+Thread.currentThread().getId());
                    break;
                }
                hashmap.put(i, i);
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class ExecutorTest {

    static BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
    static ConcurrentHashMap<Integer, Integer> hashmap = new ConcurrentHashMap<Integer, Integer>();
    static volatile boolean isRunning = true;

    public static void main(String[] args) {
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue, hashmap);

        new Thread(producer).start();
        ExecutorService executorService = Executors.newFixedThreadPool(4);

        executorService.execute(consumer);
        executorService.execute(consumer);
        executorService.execute(consumer);
        executorService.execute(consumer);

        // wait for the threads to finish
        while (isRunning) {
        };

        executorService.shutdown();
        System.out.println("SHUT DOWN THREAD POOL");

        try {
            executorService.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("HASHMAP SIZE : " + hashmap.size());
    }

    public static void printHashMap() {
        for (Map.Entry<Integer, Integer> map : hashmap.entrySet()) {
            System.out.println("Entry " + map.getKey() + ":" + map.getValue());
        }
    }

}
up vote 2 down vote accepted

Producer

protected BlockingQueue queue = null;

Why not apply generics? BlockingQueue<Integer> avoids awkward casting later on.

Why is this protected? Fields are typically private, and nothing in your example motivates widening the visibility of the field itself.
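To illustrate both remarks, here is a minimal sketch of a typed, private, final queue field. The class name TypedQueueHolder and its produce/consume methods are hypothetical and only serve the example; they are not taken from your code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical class, for illustration only.
class TypedQueueHolder {

    // private: no subclass needs direct access; final: assigned exactly once.
    private final BlockingQueue<Integer> queue;

    TypedQueueHolder(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    void produce(int value) throws InterruptedException {
        queue.put(value); // autoboxed to Integer, checked at compile time
    }

    Integer consume() throws InterruptedException {
        return queue.take(); // no (Integer) cast needed with a typed queue
    }

    public static void main(String[] args) throws InterruptedException {
        TypedQueueHolder holder = new TypedQueueHolder(new LinkedBlockingQueue<>());
        holder.produce(42);
        int value = holder.consume();
        System.out.println(value);
    }
}
```

With the raw BlockingQueue, putting a String into the queue would compile and only fail at the cast in the consumer; with BlockingQueue<Integer> the compiler rejects it.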

} catch (InterruptedException e) {
    e.printStackTrace();
}

This is not the proper way to handle an InterruptedException. The ExecutorService uses interruption for forced shutdown (shutdownNow()), but since you swallow the interruption, the ExecutorService will not be responsive to a forced shutdown. In general you should always propagate the interruption to the caller; where you cannot rethrow, as in run(), restore the interrupt flag with Thread.currentThread().interrupt().
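As a rough sketch of what that could look like: the task below blocks on take() and, when interrupted, restores the interrupt flag and returns, so shutdownNow() can stop it. The names InterruptibleTaker and InterruptDemo are made up for this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical task, for illustration only.
class InterruptibleTaker implements Runnable {

    private final BlockingQueue<Integer> queue;

    InterruptibleTaker(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                queue.take(); // blocks; throws InterruptedException when interrupted
            }
        } catch (InterruptedException e) {
            // run() cannot rethrow a checked exception, so restore the flag
            // for whoever owns this thread and let the task end.
            Thread.currentThread().interrupt();
        }
    }
}

// Hypothetical driver, for illustration only.
class InterruptDemo {

    // Returns true if the pool terminated after a forced shutdown.
    static boolean stopsAfterShutdownNow() throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.execute(new InterruptibleTaker(queue));
        pool.shutdownNow(); // interrupts the worker blocked in take()
        return pool.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("terminated: " + stopsAfterShutdownNow());
    }
}
```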

Consumer

The remarks for Producer apply to Consumer as well, but also:

while (ExecutorTest.isRunning) {

Alternatively you could simply use interruption to stop the task: while (!Thread.currentThread().isInterrupted()). What's worse is that isRunning is actually a field on ExecutorTest. This means the Consumer class depends on the ExecutorTest class to work properly.
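One possible way to drop that dependency, sketched under the assumption that -1 stays the poison pill as in your code: each consumer stops when it sees the pill (after putting it back for its peers) or when it is interrupted, so no shared flag is needed. PillConsumer and PillConsumerDemo are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical consumer, for illustration only.
class PillConsumer implements Runnable {

    static final Integer POISON_PILL = -1; // same sentinel value as in the question

    private final BlockingQueue<Integer> queue;
    private final Map<Integer, Integer> results;

    PillConsumer(BlockingQueue<Integer> queue, Map<Integer, Integer> results) {
        this.queue = queue;
        this.results = results;
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Integer i = queue.take();
                if (POISON_PILL.equals(i)) {
                    queue.put(i); // leave the pill for the other consumers
                    return;
                }
                results.put(i, i);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and stop
        }
    }
}

// Hypothetical driver, for illustration only.
class PillConsumerDemo {

    // Feeds `items` values followed by one pill; returns how many were stored.
    static int runOnce(int items, int consumers) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        Map<Integer, Integer> results = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int c = 0; c < consumers; c++) {
            pool.execute(new PillConsumer(queue, results));
        }
        for (int i = 0; i < items; i++) {
            queue.put(i);
        }
        queue.put(PillConsumer.POISON_PILL);
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return results.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("stored: " + runOnce(10000, 4));
    }
}
```

Because the queue is FIFO and the pill is added last, every item has been taken before any consumer sees the pill, and the pool only terminates once each consumer has stored its last item.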

ExecutorTest

ExecutorService executorService = Executors.newFixedThreadPool(4);

executorService.execute(consumer);
executorService.execute(consumer);
executorService.execute(consumer);
executorService.execute(consumer);

Perhaps use a small for loop, and make the literal 4 a variable or a constant.

// wait for the threads to finish
while (isRunning) {
};

This is unnecessary; you already call executorService.awaitTermination(10, TimeUnit.SECONDS). A busy wait like this is also a poor way to wait for another thread, as it wastes CPU cycles.
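Putting the last two remarks together, a sketch of the submission and waiting logic might look like the following. The class AwaitDemo, the constant WORKER_COUNT and the trivial counting task are assumptions made for the example; your consumers would take the place of the task.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical driver, for illustration only.
class AwaitDemo {

    private static final int WORKER_COUNT = 4; // replaces the repeated literal 4

    // Submits WORKER_COUNT trivial tasks and returns how many of them ran.
    static int runWorkers() throws InterruptedException {
        AtomicInteger finished = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(WORKER_COUNT);

        for (int i = 0; i < WORKER_COUNT; i++) {
            pool.execute(finished::incrementAndGet); // stand-in for a consumer
        }

        // shutdown() lets already submitted tasks finish; awaitTermination()
        // blocks without spinning until they are done or the timeout expires.
        pool.shutdown();
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
            pool.shutdownNow(); // timed out: interrupt whatever is still running
        }
        return finished.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("tasks finished: " + runWorkers());
    }
}
```

Checking the return value of awaitTermination() tells you whether the tasks really finished or the timeout was hit, which the busy wait on a flag cannot distinguish.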
