I recently had a coding interview with a Java concurrency task and unfortunately didn't get the job. The worst part is that I gave it my best, and now I'm not even sure where I went wrong. Can anyone give me some ideas about what I could improve in the code below?

The question is pretty vague. I was given four generic interfaces that, at a high level, divide a task into small pieces, process each piece, and combine the partial results into a final result. I was asked to implement the central controller. The only requirements were to use concurrency when processing the parts and that the "code must be production quality".

My code is below (the interfaces were given). I had put in a lot of comments explaining my assumptions; they are removed here.

// V and W are added as extra type parameters so the private fields can be typed
public class ControllerImpl<T, U, V, W> implements Controller<T, U> {

    private static Logger logger = LoggerFactory.getLogger(ControllerImpl.class);

    private static int BATCH_SIZE = 100;

    private Preprocessor<T, V> preprocessor;
    private Processor<V, W> processor;
    private Postprocessor<U, W> postprocessor;

    public ControllerImpl() {
        this.preprocessor = new PreprocessorImpl<>();
        this.processor = new ProcessorImpl<>();
        this.postprocessor = new PostprocessorImpl<>();
    }

    public ControllerImpl(Preprocessor preprocessor, Processor processor, Postprocessor postprocessor) {
        this.preprocessor = preprocessor;
        this.processor = processor;
        this.postprocessor = postprocessor;
    }

    @Override
    public U process(T arg) {
        if (arg == null) return null;

        final V[] parts = preprocessor.split(arg);
        final W[] partResult = (W[]) new Object[parts.length];

        final int poolSize = Runtime.getRuntime().availableProcessors();  
        final ExecutorService executor = getExecutor(poolSize);

        int i = 0;
        while (i < parts.length) {
            final List<Callable<W>> tasks = IntStream.range(i, i + BATCH_SIZE)
                .filter(e -> e < parts.length) 
                .mapToObj(e -> (Callable<W>) () -> partResult[e] = processor.processPart(parts[e])) 
                .collect(Collectors.toList());
            i += tasks.size();

            try {
                logger.info("invoking batch of {} tasks to workers", tasks.size());
                long start = System.currentTimeMillis();
                final List<Future<W>> futures = executor.invokeAll(tasks); 
                long end = System.currentTimeMillis();
                logger.info("done batch processing took {} ms", end - start);
                for (Future future : futures) {
                    future.get();
                }
            } catch (InterruptedException e) {
                logger.error("{}", e);// have comments to explain better handling according to real business requirement 
            } catch (ExecutionException e) {
                logger.error("error: ", e);
            }
        }

        MoreExecutors.shutdownAndAwaitTermination(executor, 60, TimeUnit.SECONDS);

        return postprocessor.aggregate(partResult);
    }

    private ExecutorService getExecutor(int poolSize) {
        final ThreadFactory threadFactory = new ThreadFactoryBuilder()
            .setNameFormat("Processor-%d")
            .setDaemon(true)
            .build();
        return new ThreadPoolExecutor(poolSize, poolSize, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(), threadFactory);
    }
}

Hints

I would have given the generic types descriptive names.

I might have separated logging from processing with the help of a listener pattern.
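A minimal sketch of the listener idea, assuming a hypothetical ProcessingListener interface and a stand-in BatchRunner (neither is part of the original task). The processing code only fires events; how they are logged is up to whichever listener is registered. The work runs sequentially here to keep the focus on the listener.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;

class ListenerDemo {
    public static void main(String[] args) {
        BatchRunner runner = new BatchRunner();
        runner.addListener(new LoggingListener());
        List<Integer> results = runner.run(Arrays.asList(1, 2, 3), x -> x * 10);
        System.out.println(results); // [10, 20, 30]
    }
}

// Hypothetical listener interface: the processing code reports events, it does not decide how they are logged.
interface ProcessingListener {
    void batchStarted(int taskCount);
    void batchFinished(int taskCount, long elapsedMillis);
}

// One possible listener that turns events into log lines (System.out keeps the sketch dependency-free).
class LoggingListener implements ProcessingListener {
    @Override
    public void batchStarted(int taskCount) {
        System.out.println("invoking batch of " + taskCount + " tasks");
    }

    @Override
    public void batchFinished(int taskCount, long elapsedMillis) {
        System.out.println("batch of " + taskCount + " tasks took " + elapsedMillis + " ms");
    }
}

// Hypothetical stand-in for the processing code: no logging calls, only listener notifications.
class BatchRunner {
    private final List<ProcessingListener> listeners = new CopyOnWriteArrayList<>();

    void addListener(ProcessingListener listener) {
        listeners.add(listener);
    }

    <V, W> List<W> run(List<V> parts, Function<V, W> work) {
        listeners.forEach(l -> l.batchStarted(parts.size()));
        long start = System.nanoTime();
        List<W> results = new ArrayList<>();
        for (V part : parts) {
            results.add(work.apply(part));
        }
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
        listeners.forEach(l -> l.batchFinished(parts.size(), elapsedMillis));
        return results;
    }
}
```

Swapping LoggingListener for a metrics or test listener then requires no change to the processing code.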

I would expect the input parameter never to be null, so the check that returns null is unnecessary.
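A short sketch of failing fast instead of returning null. Here describe is a hypothetical stand-in for process(T arg); Objects.requireNonNull is the standard JDK helper.

```java
import java.util.Objects;

class NullCheckDemo {
    public static void main(String[] args) {
        System.out.println(describe("abc")); // length 3
        try {
            describe(null);
        } catch (NullPointerException e) {
            System.out.println(e.getMessage()); // arg must not be null
        }
    }

    // Hypothetical stand-in for process(T arg): reject null up front instead of returning null.
    static String describe(String arg) {
        Objects.requireNonNull(arg, "arg must not be null");
        return "length " + arg.length();
    }
}
```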

I would have extracted some methods so that parts of the code have meaningful names.

Code

At a very high level, I would have expected the code to do exactly what you described: split the input into parts, process these parts in parallel, collect the results, and aggregate them. This should have been expressed directly in the process method.

public class Controller<INPUT, RESULT, PARTIALINPUT, PARTIALRESULT> {

    ...

    public RESULT process(INPUT input) throws InterruptedException, ExecutionException {

        PARTIALINPUT[] partialInputArray = preprocessor.split(input);

        List<Future<PARTIALRESULT>> partialResultFutures = processAsynchronously(partialInputArray);
        List<PARTIALRESULT> partialResults = collectPartialResults(partialResultFutures);

        RESULT result = postprocessor.aggregate(partialResults);

        return result;
    }

    private List<PARTIALRESULT> collectPartialResults(List<Future<PARTIALRESULT>> partialResultFutures)
            throws InterruptedException, ExecutionException {

        List<PARTIALRESULT> partialResults = new ArrayList<>();

        for (Future<PARTIALRESULT> future : partialResultFutures) {
            partialResults.add(future.get());
        }

        return partialResults;
    }

    private List<Future<PARTIALRESULT>> processAsynchronously(PARTIALINPUT[] partialInputArray) {

        List<Future<PARTIALRESULT>> partialResultFutures = new ArrayList<>();

        for (PARTIALINPUT partialInput : partialInputArray) {
            Future<PARTIALRESULT> future = processAsynchronously(partialInput);
            partialResultFutures.add(future);
        }

        return partialResultFutures;
    }

    ...

}
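The elided parts can be filled in roughly as follows. This is a self-contained sketch under assumptions: the interface signatures are guessed from how they are called (with aggregate taking a List, since the snippet passes one), the executor is injected, and FanOutController and the single-part overload are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FanOutDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            FanOutController<String, Integer, String, Integer> controller = new FanOutController<>(
                    input -> input.split(" "),                                     // split into words
                    String::length,                                                // process: word length
                    lengths -> lengths.stream().mapToInt(Integer::intValue).sum(), // aggregate: total
                    executor);
            System.out.println(controller.process("split process aggregate")); // 21
        } finally {
            executor.shutdown();
        }
    }
}

// Assumed shapes of the given interfaces (hypothetical; the real signatures are not shown).
interface Preprocessor<T, V> { V[] split(T input); }
interface Processor<V, W> { W processPart(V part); }
interface Postprocessor<U, W> { U aggregate(List<W> partialResults); }

class FanOutController<INPUT, RESULT, PARTIALINPUT, PARTIALRESULT> {
    private final Preprocessor<INPUT, PARTIALINPUT> preprocessor;
    private final Processor<PARTIALINPUT, PARTIALRESULT> processor;
    private final Postprocessor<RESULT, PARTIALRESULT> postprocessor;
    private final ExecutorService executor; // owned by the caller, not shut down here

    FanOutController(Preprocessor<INPUT, PARTIALINPUT> preprocessor,
                     Processor<PARTIALINPUT, PARTIALRESULT> processor,
                     Postprocessor<RESULT, PARTIALRESULT> postprocessor,
                     ExecutorService executor) {
        this.preprocessor = preprocessor;
        this.processor = processor;
        this.postprocessor = postprocessor;
        this.executor = executor;
    }

    public RESULT process(INPUT input) throws InterruptedException, ExecutionException {
        PARTIALINPUT[] partialInputArray = preprocessor.split(input);
        List<Future<PARTIALRESULT>> partialResultFutures = processAsynchronously(partialInputArray);
        List<PARTIALRESULT> partialResults = collectPartialResults(partialResultFutures);
        return postprocessor.aggregate(partialResults);
    }

    private List<PARTIALRESULT> collectPartialResults(List<Future<PARTIALRESULT>> partialResultFutures)
            throws InterruptedException, ExecutionException {
        List<PARTIALRESULT> partialResults = new ArrayList<>();
        for (Future<PARTIALRESULT> future : partialResultFutures) {
            partialResults.add(future.get()); // read in submission order, so results stay in input order
        }
        return partialResults;
    }

    private List<Future<PARTIALRESULT>> processAsynchronously(PARTIALINPUT[] partialInputArray) {
        List<Future<PARTIALRESULT>> partialResultFutures = new ArrayList<>();
        for (PARTIALINPUT partialInput : partialInputArray) {
            partialResultFutures.add(processAsynchronously(partialInput));
        }
        return partialResultFutures;
    }

    // Assumed single-part overload (elided in the answer): hand one part to the executor.
    private Future<PARTIALRESULT> processAsynchronously(PARTIALINPUT partialInput) {
        return executor.submit(() -> processor.processPart(partialInput));
    }
}
```

Because the executor is passed in, creating and shutting it down is the caller's job rather than something every process call repeats.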

Some other remarks:

You are mixing Java 8 style with classic Java style. It would be better to stick to one style: either Java 8 streams or the classic "procedural" style.

If you decide on the Java 8 style, the whole thing could have been written something like this (also not perfect):

public U process(T arg)  {

    List<W> tmp = preProcess(arg)
                .parallel()
                .map(processor::processPart)
                .collect(Collectors.toList());

    return postProcess(tmp); 
}

private Stream<V> preProcess(T arg) {
    return Arrays.stream(preprocessor.split(arg));
}

private U postProcess(List<W> ws) {
    @SuppressWarnings("unchecked")
    W[] wArray = (W[])new Object[ws.size()];
    return postprocessor.aggregate(ws.toArray(wArray));
}

That also uses Java's concurrency framework to process the parts: by default, parallel streams run on the common ForkJoinPool (ForkJoinPool.commonPool()).
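A runnable sketch of the same stream pipeline, with a hypothetical processPart that records which threads did the work. collect(Collectors.toList()) keeps the encounter order even though the parts are processed in parallel, so the list can be handed straight to the aggregation step.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

class ParallelStreamDemo {
    // Names of the threads that processed parts; a concurrent set because several threads add to it.
    static final Set<String> THREADS = ConcurrentHashMap.newKeySet();

    public static void main(String[] args) {
        List<Integer> squares = processInParallel(new Integer[] {1, 2, 3, 4, 5, 6, 7, 8});
        System.out.println(squares); // [1, 4, 9, 16, 25, 36, 49, 64]
        // Typically the calling thread plus some common-pool worker threads; varies from run to run.
        System.out.println(THREADS);
    }

    // Same shape as the answer's process(): split result -> parallel map -> list for aggregation.
    static List<Integer> processInParallel(Integer[] parts) {
        return Arrays.stream(parts)
                .parallel()
                .map(ParallelStreamDemo::processPart)
                .collect(Collectors.toList()); // keeps encounter (input) order
    }

    // Hypothetical stand-in for processor.processPart.
    static Integer processPart(Integer part) {
        THREADS.add(Thread.currentThread().getName());
        return part * part;
    }
}
```

Since the common pool is shared by the whole JVM, a slow or blocking processPart can hold up other parallel streams; that is one reason production code may prefer an explicit executor.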

The block that creates the task list is very hard to understand; if you want to use something like that, it should be extracted into a separate method.
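One way to extract it, as a sketch: a hypothetical createTasks helper that computes the end of the batch with Math.min instead of filtering indices, and lets each Callable return its result rather than writing into a shared array.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class TaskListDemo {
    public static void main(String[] args) throws Exception {
        String[] parts = {"a", "bb", "ccc", "dddd", "eeeee"};
        List<Callable<Integer>> batch = createTasks(parts, 3, 2, String::length);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            for (Future<Integer> f : executor.invokeAll(batch)) {
                System.out.println(f.get()); // 4, then 5
            }
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Hypothetical helper: builds the tasks for parts[from .. from + batchSize), clamped to the array end.
     * Each task returns its result instead of writing into a shared array.
     */
    static <V, W> List<Callable<W>> createTasks(V[] parts, int from, int batchSize, Function<V, W> processPart) {
        int to = Math.min(from + batchSize, parts.length);
        return IntStream.range(from, to)
                .mapToObj(i -> (Callable<W>) () -> processPart.apply(parts[i]))
                .collect(Collectors.toList());
    }
}
```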

Note what invokeAll actually does: it does not merely schedule the batch, it returns only after every task in it has completed (Future.isDone() is true for each returned future). So the per-batch timing log measures the whole batch, and the get() calls that follow never block; they only rethrow a task's failure as an ExecutionException.

The real blocking problem is the batching itself. Each batch has to wait for its slowest task before the next batch is even submitted, so worker threads sit idle although parts are still waiting to be processed. Submitting all parts up front and handling the results as they complete avoids this barrier.
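A sketch that avoids both a batch barrier and waiting on futures in submission order, assuming a hypothetical processAll helper: all parts are submitted up front to an ExecutorCompletionService, whose take() hands back whichever task finishes next. Each result carries its index so the final list keeps the input order. A production version would also cancel the outstanding futures when one part fails.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

class CompletionOrderDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Integer> results = processAll(new Integer[] {3, 1, 2}, n -> n * 100, executor);
            System.out.println(results); // [300, 100, 200]
        } finally {
            executor.shutdown();
        }
    }

    static <V, W> List<W> processAll(V[] parts, Function<V, W> processPart, ExecutorService executor)
            throws InterruptedException, ExecutionException {
        CompletionService<IndexedResult<W>> completion = new ExecutorCompletionService<>(executor);
        // Submit every part at once: no batch has to finish before other parts can start.
        for (int i = 0; i < parts.length; i++) {
            final int index = i;
            completion.submit(() -> new IndexedResult<>(index, processPart.apply(parts[index])));
        }
        List<W> results = new ArrayList<>(Collections.<W>nCopies(parts.length, null));
        for (int received = 0; received < parts.length; received++) {
            // take() returns whichever task finishes next, not the next one in submission order.
            IndexedResult<W> r = completion.take().get();
            results.set(r.index, r.value);
        }
        return results;
    }

    // Pairs a partial result with the position of its part, so the input order can be restored.
    static final class IndexedResult<R> {
        final int index;
        final R value;

        IndexedResult(int index, R value) {
            this.index = index;
            this.value = value;
        }
    }
}
```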

It's not clear why you are using this "batch" approach. If you want to limit the number of queued tasks, you can limit the queue size instead.
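If bounding the amount of queued work is the goal, here is a sketch with a bounded ArrayBlockingQueue and ThreadPoolExecutor.CallerRunsPolicy; newBoundedExecutor and the sizes are illustrative. When the queue is full, the submitting thread runs the task itself, which slows down submission instead of rejecting work or letting the queue grow without bound.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = newBoundedExecutor(2, 4);
        AtomicInteger processed = new AtomicInteger();
        for (int i = 0; i < 100; i++) {
            executor.execute(processed::incrementAndGet); // at most 4 tasks ever wait in the queue
        }
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println(processed.get()); // 100
    }

    /**
     * Hypothetical factory: a fixed-size pool whose work queue holds at most queueCapacity tasks.
     * CallerRunsPolicy makes the submitting thread run a task itself when the queue is full.
     */
    static ThreadPoolExecutor newBoundedExecutor(int poolSize, int queueCapacity) {
        return new ThreadPoolExecutor(
                poolSize, poolSize,
                60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }
}
```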

Handling InterruptedException the way you do (logging it and moving on to the next batch, which also loses the thread's interrupted status) is considered bad practice (see Brian Goetz's article "Dealing with InterruptedException").
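A sketch of the usual alternative, assuming (as in the original code) that process cannot declare checked exceptions: restore the interrupt flag and fail with an unchecked exception instead of logging and continuing. awaitResult is a hypothetical helper, and the choice of IllegalStateException is arbitrary.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InterruptHandlingDemo {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> slow = executor.submit(() -> {
            Thread.sleep(10_000);
            return "too late";
        });
        Thread.currentThread().interrupt(); // simulate someone interrupting the caller
        try {
            awaitResult(slow);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // interrupted while waiting for a part
            System.out.println(Thread.currentThread().isInterrupted()); // true: the flag was restored
        } finally {
            executor.shutdownNow(); // interrupts the sleeping task so the pool can exit
        }
    }

    /**
     * Hypothetical helper: waits for one partial result without swallowing interruption.
     */
    static <W> W awaitResult(Future<W> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag so callers can still see it
            future.cancel(true);                // stop work nobody will wait for
            throw new IllegalStateException("interrupted while waiting for a part", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("processing a part failed", e.getCause());
        }
    }
}
```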

Last but not least, a design issue: your controller must not create its own instances of the provided interfaces; it does not even know which implementations to create. They should be passed in through the constructor instead.
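A sketch of constructor injection with fully typed parameters (the original second constructor uses raw types). The interface shapes are assumptions, with aggregate taking a List to avoid generic arrays; InjectedController is a hypothetical name, and process is kept sequential to focus on construction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

class InjectionDemo {
    public static void main(String[] args) {
        // The caller (or a DI container) decides which implementations to use.
        InjectedController<String, Integer, String, Integer> controller = new InjectedController<>(
                input -> input.split(" "),
                String::length,
                lengths -> lengths.stream().mapToInt(Integer::intValue).max().orElse(0));
        System.out.println(controller.process("which word is longest")); // 7
    }
}

// Assumed shapes of the given interfaces (hypothetical; the real signatures are not shown).
interface Preprocessor<T, V> { V[] split(T input); }
interface Processor<V, W> { W processPart(V part); }
interface Postprocessor<U, W> { U aggregate(List<W> partialResults); }

final class InjectedController<T, U, V, W> {
    private final Preprocessor<T, V> preprocessor;
    private final Processor<V, W> processor;
    private final Postprocessor<U, W> postprocessor;

    InjectedController(Preprocessor<T, V> preprocessor,
                       Processor<V, W> processor,
                       Postprocessor<U, W> postprocessor) {
        // Fail fast at construction time instead of with a NullPointerException inside process().
        this.preprocessor = Objects.requireNonNull(preprocessor, "preprocessor");
        this.processor = Objects.requireNonNull(processor, "processor");
        this.postprocessor = Objects.requireNonNull(postprocessor, "postprocessor");
    }

    // Sequential on purpose: this sketch is only about how the collaborators get in.
    U process(T input) {
        List<W> partialResults = new ArrayList<>();
        for (V part : preprocessor.split(input)) {
            partialResults.add(processor.processPart(part));
        }
        return postprocessor.aggregate(partialResults);
    }
}
```

With this shape, tests can pass in fakes for all three collaborators without any ...Impl class being on the classpath.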

The two biggest issues are that your code blocks processing even where it is not necessary, and the way it handles InterruptedException.
