For a large, multi-threaded video processing application, I need to manage various child processes. I need to monitor their stdout and stderr.
My key concerns:
- I should not have any spin loops. I have a
Thread.sleep(10)
on myinputHandler
(... which is actually a process output handler, but okay), but that's because it has to block until either the process has something new to say, or the process has exited. - Methods not spawning a new thread should not block unless stated otherwise.
- Methods should be thread safe.
- Am I using too much threads?
I also implemented a listener that waits until a process dies, which will be used to schedule cleanup for any missed files and then to restart the process if needed. I did research as to what would be the best way to know when a thread dies, but all I found was "make the thread notify listeners as the last task". This introduces a vulnerability where if a thread dies, nobody is ever notified. And I don't trust all subclasses of ProcessHandler
to never fail with handling output.
ProcessListener
public interface ProcessListener {
public void onProcessEnd(ProcessHandler processEnded);
}
ProcessHandler
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
public abstract class ProcessHandler {
private final Process process;
private final BufferedReader stderr;
private final BufferedReader stdin;
private boolean stopSignalReceived = false;
private final List<ProcessListener> processListeners = new ArrayList<>();
private Thread inputHandler;
public ProcessHandler(Process process) {
this.process = process;
stderr = new BufferedReader(new InputStreamReader(process.getErrorStream()));
stdin = new BufferedReader(new InputStreamReader(process.getInputStream()));
inputHandler = new Thread(new Runnable() {
@Override
public void run() {
do {
handleInput();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// ignooore.
}
} while (!hasStopped());
// after stopping, check 1 last time for missed reads.
handleInput();
}
private void handleInput(){
try {
while (stderr.ready()) {
onStandardErr(stderr.readLine());
}
while (stdin.ready()) {
onStandardOut(stdin.readLine());
}
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
});
inputHandler.start();
final ProcessHandler instance = this;
new Thread(new Runnable(){
@Override
public void run() {
do{
try{
inputHandler.join();
break;
} catch (InterruptedException ignored){
}
} while(true);
synchronized(processListeners){
for(ProcessListener listener : processListeners){
listener.onProcessEnd(instance);
}
}
}
}).start();
}
public void addListener(ProcessListener listenerToAdd){
processListeners.add(listenerToAdd);
}
protected abstract void onStandardOut(String line);
protected abstract void onStandardErr(String line);
public Integer getExitValue() {
try {
return process.exitValue();
} catch (IllegalThreadStateException e) {
return null;
}
}
public boolean hasStopped() {
return getExitValue() != null;
}
public boolean isStopping() {
return stopSignalReceived || hasStopped();
}
/**
* Blocking - Waits for the managed process to end.
* Returns early if the process has not stopped and has not been given a signal to stop either.
* Recommended usage is to call stop first.
*/
public void waitForOutputProcessing() {
if (!isStopping()) {
return;
}
do {
try {
inputHandler.join();
return;
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} while (true);
}
public void stop() {
if (!stopSignalReceived) {
stopSignalReceived = true;
if (!hasStopped()) {
if (process.getClass().getName().equals("java.lang.UNIXProcess")) {
sendKillCommand();
} else {
process.destroy();
}
}
}
}
private void sendKillCommand() {
try {
final Field pidField = process.getClass().getDeclaredField("pid");
pidField.setAccessible(true);
final int pid = pidField.getInt(process);
final ProcessBuilder builder = new ProcessBuilder();
builder.command("kill", "" + pid);
new Thread(new Runnable() {
@Override
public void run() {
Process killProcess = null;
do {
try {
if (hasStopped()) {
return;
}
if (killProcess == null) {
killProcess = builder.start();
}
killProcess.waitFor();
process.waitFor();
return;
} catch (InterruptedException e) {
// Ignore interrupts. Continue retrying to murder
// ffmpeg process.
} catch (IOException e) {
// IO error occurred.
if (!hasStopped()) {
throw new IllegalStateException("Process (Process ID: " + pid + ") failed to be killed.", e);
}
return;
}
} while (true);
}
}).start();
} catch (IllegalAccessException | NoSuchFieldException | SecurityException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
process.destroy();// So we don't know the PID. Guess the only way to
// progress is to just kill FFMPEG.
}
}
}