I recently had a coding interview built around a Java concurrency task and unfortunately didn't get the job. The worst part is that I gave it my best, yet I'm still not sure where I went wrong. Can anyone give me some ideas on what I could improve in the code below?
The question was pretty vague. I was given 4 generic interfaces which, at a high level, divide a task into small pieces, work on each piece, and combine the partial results into a final result. I was asked to implement the central controller interface. The only requirements were to use concurrency in the partial-result processing and that the "code must be production quality".
My code is below (the interfaces were given). I had put in a lot of comments to explain my assumptions, but I have removed them here.
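The four interfaces themselves are not reproduced in this post. Judging only from how the controller code calls them, they presumably look roughly like the sketch below; the method signatures and parameter names here are inferred assumptions, not the originals.

```java
// Signatures inferred from the calls made in ControllerImpl; the originals may differ.

/** Entry point: turns a whole input of type T into a final result of type U. */
interface Controller<T, U> {
    U process(T arg);
}

/** Splits the input into independent parts of type V. */
interface Preprocessor<T, V> {
    V[] split(T arg);
}

/** Processes a single part into a partial result of type W. */
interface Processor<V, W> {
    W processPart(V part);
}

/** Combines all partial results into the final result of type U. */
interface Postprocessor<U, W> {
    U aggregate(W[] partResults);
}
```

Under these assumed shapes, only `processPart` is a candidate for running concurrently; `split` and `aggregate` each run once per call to `process`.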
// adding V,W in order to use in private fields types
public class ControllerImpl<T, U, V, W> implements Controller<T, U> {

    private static Logger logger = LoggerFactory.getLogger(ControllerImpl.class);
    private static int BATCH_SIZE = 100;

    private Preprocessor<T, V> preprocessor;
    private Processor<V, W> processor;
    private Postprocessor<U, W> postprocessor;

    public ControllerImpl() {
        this.preprocessor = new PreprocessorImpl<>();
        this.processor = new ProcessorImpl<>();
        this.postprocessor = new PostprocessorImpl<>();
    }

    public ControllerImpl(Preprocessor preprocessor, Processor processor, Postprocessor postprocessor) {
        this.preprocessor = preprocessor;
        this.processor = processor;
        this.postprocessor = postprocessor;
    }

    @Override
    public U process(T arg) {
        if (arg == null) return null;
        final V[] parts = preprocessor.split(arg);
        final W[] partResult = (W[]) new Object[parts.length];
        final int poolSize = Runtime.getRuntime().availableProcessors();
        final ExecutorService executor = getExecutor(poolSize);
        int i = 0;
        while (i < parts.length) {
            final List<Callable<W>> tasks = IntStream.range(i, i + BATCH_SIZE)
                    .filter(e -> e < parts.length)
                    .mapToObj(e -> (Callable<W>) () -> partResult[e] = processor.processPart(parts[e]))
                    .collect(Collectors.toList());
            i += tasks.size();
            try {
                logger.info("invoking batch of {} tasks to workers", tasks.size());
                long start = System.currentTimeMillis();
                final List<Future<W>> futures = executor.invokeAll(tasks);
                long end = System.currentTimeMillis();
                logger.info("done batch processing took {} ms", end - start);
                for (Future future : futures) {
                    future.get();
                }
            } catch (InterruptedException e) {
                logger.error("{}", e);// have comments to explain better handling according to real business requirement
            } catch (ExecutionException e) {
                logger.error("error: ", e);
            }
        }
        MoreExecutors.shutdownAndAwaitTermination(executor, 60, TimeUnit.SECONDS);
        return postprocessor.aggregate(partResult);
    }

    private ExecutorService getExecutor(int poolSize) {
        final ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("Processor-%d")
                .setDaemon(true)
                .build();
        return new ThreadPoolExecutor(poolSize, poolSize, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(), threadFactory);
    }
}
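
For comparison, here is a minimal sketch of just the fan-out/fan-in step using only `java.util.concurrent`. It is one possible shape rather than a reference solution: `FanOutSketch` and `processAll` are hypothetical names, a plain `Function` stands in for the `Processor` interface, the executor is assumed to be supplied and shut down by the caller, and failures are propagated to the caller instead of being logged (a policy assumption, since the real requirement is unknown).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical helper, not part of the interview code.
final class FanOutSketch {

    private FanOutSketch() {
    }

    /**
     * Applies worker to every part on the given executor and returns the
     * results in the same order as the input parts.
     */
    static <V, W> List<W> processAll(List<V> parts,
                                     Function<V, W> worker,
                                     ExecutorService executor)
            throws InterruptedException, ExecutionException {
        // One task per part; each task only reads its own part.
        List<Callable<W>> tasks = new ArrayList<>(parts.size());
        for (V part : parts) {
            tasks.add(() -> worker.apply(part));
        }

        // invokeAll blocks until all tasks have completed and returns the
        // futures in the same order as the submitted tasks.
        List<W> results = new ArrayList<>(parts.size());
        for (Future<W> future : executor.invokeAll(tasks)) {
            // get() rethrows a task failure wrapped in ExecutionException.
            results.add(future.get());
        }
        return results;
    }
}
```

With a pool created by `Executors.newFixedThreadPool(4)`, calling `FanOutSketch.processAll(Arrays.asList("a", "bb", "ccc"), String::length, pool)` yields the list `[1, 2, 3]`. Collecting values from the futures avoids writing into a shared array from worker threads and avoids the unchecked `(W[]) new Object[...]` cast.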