ControlDaemon

The ControlDaemon class implements the Runnable interface and is started as a separate thread. The constructor receives the set of parameters read from the properties files and converts it to a set of ParamsAndHandle objects, as follows:

private final Set<ParamsAndHandle> handlers;

public ControlDaemon(Set<Parameters> params) {
    handlers = params.stream()
            .map(s -> new ParamsAndHandle(s, null))
            .collect(Collectors.toSet());
}

Because the processes are not started at this point, the handles are all null. The run() method is used to start the processes, as follows:

@Override
public void run() {
    try {
        for (ParamsAndHandle pah : handlers) {
            log.log(DEBUG, "Starting {0}", pah.params);
            ProcessHandle handle = start(pah.params);
            pah.handle = handle;
        }
        keepProcessesAlive();
        while (handlers.size() > 0) {
            allMyProcesses().join();
        }
    } catch (IOException e) {
        log.log(ERROR, e);
    }
}

The run() method iterates over the set of parameters and uses the start() method (implemented later in this class) to start the processes. The handle of each process is stored in the corresponding ParamsAndHandle object. After that, keepProcessesAlive() is called to arrange for a process to be restarted whenever it stops. If a process cannot be restarted, it is removed from the set.

The allMyProcesses() method (also implemented in this class) returns a CompletableFuture that is completed when all the started processes have stopped. Some of the processes may have been restarted by the time the join() method returns, which is why join() is called in a loop. As long as there is at least one managed process, the thread should keep running.

By waiting on the CompletableFuture inside the while loop, we use minimal CPU while keeping the thread alive as long as there is at least one managed process running, possibly after a few restarts. We have to keep this thread alive, even though it uses no CPU and executes no code most of the time, so that the keepProcessesAlive() method can do its work using CompletableFutures. The method is shown in the following code snippet:

private void keepProcessesAlive() {
    anyOfMyProcesses().thenAccept(ignore -> {
        restartProcesses();
        keepProcessesAlive();
    });
}

The keepProcessesAlive() method calls the anyOfMyProcesses() method, which returns a CompletableFuture that is completed when any of the managed processes exits. It then schedules the lambda passed to thenAccept() to be executed when that CompletableFuture completes. The lambda does two things:

  • Restarts the processes that are stopped (probably only one)
  • Calls the keepProcessesAlive() method

It is important to understand that the second call is not performed while the keepProcessesAlive() method itself is still executing. It is made from the lambda, which runs later, after keepProcessesAlive() has already returned. This is not a recursive call; it is scheduled as a CompletableFuture action. We are not implementing a loop as a recursive call, because we would run out of stack space. Instead, we ask the JVM executors to execute the method again after the stopped processes have been restarted.
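The difference between scheduling and recursion can be seen in a small, self-contained sketch. It is not part of the sample application: the somethingExits() method is a hypothetical stand-in for anyOfMyProcesses(), and the round counter and latch exist only so that the demonstration terminates.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RescheduleSketch {

    // Hypothetical stand-in for anyOfMyProcesses(): a future that is
    // completed a few milliseconds later on another thread.
    static CompletableFuture<Object> somethingExits() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "exited";
        });
    }

    // Same shape as keepProcessesAlive(), except that it stops after a
    // fixed number of rounds.
    static void keepAlive(int remaining, AtomicInteger rounds, CountDownLatch done) {
        if (remaining == 0) {
            done.countDown();
            return;
        }
        somethingExits().thenAccept(ignore -> {
            rounds.incrementAndGet();
            keepAlive(remaining - 1, rounds, done);
        });
        // thenAccept() only registers the lambda, so this method returns
        // here without waiting for the future to complete.
    }

    // Runs the chain and returns how many times the lambda was executed.
    static int runRounds(int n) {
        AtomicInteger rounds = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);
        keepAlive(n, rounds, done);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return rounds.get();
    }

    public static void main(String[] args) {
        System.out.println("callbacks executed: " + runRounds(20));
    }
}
```

Each call to keepAlive() returns as soon as the lambda is registered, so the stack does not grow with the number of rounds. One caveat: if a future is already completed when thenAccept() is called, the lambda is executed immediately in the calling thread, so the argument relies on the future still being pending at registration time.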

The JVM uses the default ForkJoinPool to schedule these tasks, and this pool contains daemon threads. That is the reason the run() method has to wait and keep running: its thread is the only non-daemon thread, and it alone prevents the JVM from exiting.
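The daemon status of the pool threads is easy to check. The following is a minimal sketch, assuming the common pool has not been reconfigured through system properties; the class and method names are made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;

public class DaemonCheck {

    // Runs a small task on the common ForkJoinPool and reports whether
    // the worker thread that executed it is a daemon thread.
    static boolean commonPoolThreadIsDaemon() {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        ForkJoinPool.commonPool().execute(
                () -> result.complete(Thread.currentThread().isDaemon()));
        return result.join();
    }

    public static void main(String[] args) {
        System.out.println("main thread is daemon: "
                + Thread.currentThread().isDaemon());
        System.out.println("pool thread is daemon: "
                + commonPoolThreadIsDaemon());
    }
}
```

Because daemon threads do not keep the JVM alive, an application whose only remaining work is a callback waiting in this pool would simply exit.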

The next method, shown as follows, is restartProcesses():

private void restartProcesses() {
    Set<ParamsAndHandle> failing = new HashSet<>();
    handlers.stream()
            .filter(pah -> !pah.toHandle().isAlive())
            .forEach(pah -> {
                try {
                    pah.handle = start(pah.params);
                } catch (IOException e) {
                    failing.add(pah);
                }
            });
    handlers.removeAll(failing);
}
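The restartProcesses() method gathers the failing entries in a separate set and removes them only after the traversal has finished. The following sketch, which uses plain strings instead of ParamsAndHandle objects, contrasts the two approaches. The class and method names are hypothetical, and the fail-fast behavior shown is what the HashSet of OpenJDK does on a best-effort basis, not a guarantee of the collections API.

```java
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RemoveAfterTraversal {

    // Removes an element while the stream is still traversing the set.
    // Returns true if the traversal was rejected with an exception.
    static boolean removingInsideForEachFails() {
        Set<String> names = new HashSet<>(List.of("a", "b", "c"));
        try {
            names.stream().forEach(n -> {
                if (n.equals("b")) {
                    names.remove(n);
                }
            });
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Collects the elements to drop first and removes them afterwards,
    // which is the pattern used in restartProcesses().
    static Set<String> removeAfterwards() {
        Set<String> names = new HashSet<>(List.of("a", "b", "c"));
        Set<String> failing = new HashSet<>();
        names.stream()
                .filter(n -> n.equals("b"))
                .forEach(failing::add);
        names.removeAll(failing);
        return names;
    }

    public static void main(String[] args) {
        System.out.println("in-loop removal failed: "
                + removingInsideForEachFails());
        System.out.println("remaining: " + removeAfterwards());
    }
}
```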

The restartProcesses() method starts those processes in our set of managed processes that are not alive. If any of the restarts fails, the failing entries are removed from the set. (Take care not to remove them inside the loop, to avoid a ConcurrentModificationException.)

The anyOfMyProcesses() and allMyProcesses() methods use the auxiliary completableFuturesOfTheProcesses() method and are straightforward, as follows:

private CompletableFuture<Object> anyOfMyProcesses() {
    return CompletableFuture.anyOf(
            completableFuturesOfTheProcesses());
}

private CompletableFuture<Void> allMyProcesses() {
    return CompletableFuture.allOf(
            completableFuturesOfTheProcesses());
}
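The behavior of anyOf() and allOf() can be observed without starting any process. In the following sketch, two manually completed futures stand in for the futures returned by onExit(); the class name and the strings are illustrative only.

```java
import java.util.concurrent.CompletableFuture;

public class AnyOfAllOfSketch {

    // Returns three observations:
    // [0] is anyOf done after the first future completes,
    // [1] is allOf done after the first future completes,
    // [2] is allOf done after the second future completes.
    static boolean[] observe() {
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<String> b = new CompletableFuture<>();
        CompletableFuture<Object> any = CompletableFuture.anyOf(a, b);
        CompletableFuture<Void> all = CompletableFuture.allOf(a, b);

        a.complete("a exited");
        boolean anyAfterFirst = any.isDone();
        boolean allAfterFirst = all.isDone();

        b.complete("b exited");
        boolean allAfterSecond = all.isDone();

        return new boolean[] {anyAfterFirst, allAfterFirst, allAfterSecond};
    }

    // With no arguments, anyOf() never completes while allOf() is
    // completed right away.
    static boolean emptyAnyOfIsDone() {
        return CompletableFuture.anyOf().isDone();
    }

    static boolean emptyAllOfIsDone() {
        return CompletableFuture.allOf().isDone();
    }

    public static void main(String[] args) {
        boolean[] r = observe();
        System.out.println("anyOf done after first exit: " + r[0]);
        System.out.println("allOf done after first exit: " + r[1]);
        System.out.println("allOf done after second exit: " + r[2]);
    }
}
```

The empty case matters once the last managed process has been removed from the set: allOf() over an empty array is already completed, while anyOf() over an empty array returns a future that never completes, so its callback is never invoked.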

The completableFuturesOfTheProcesses() method returns an array of CompletableFutures, created by calling the onExit() method of each currently managed process. This is done in a compact and easy-to-read functional programming style, as shown here:

private CompletableFuture[] completableFuturesOfTheProcesses() {
    return handlers.stream()
            .map(ParamsAndHandle::toHandle)
            .map(ProcessHandle::onExit)
            .collect(Collectors.toList())
            .toArray(new CompletableFuture[handlers.size()]);
}

The set is converted to a Stream, mapped to a Stream of ProcessHandle objects (this is why we needed the toHandle() method in the ParamsAndHandle class). Then the handles are mapped to the CompletableFuture stream using the onExit() method and finally, we collect it to a list and convert it to an array.
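As a possible variation, the intermediate list can be skipped by passing an array constructor reference to the stream's toArray() method. The sketch below is an alternative under stated assumptions, not the code of the sample application: it works on a plain list of ProcessHandle objects instead of ParamsAndHandle, and it launches "java -version" as a short-lived child process only because a java launcher is expected to exist under java.home.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class OnExitArraySketch {

    // Assumption: java.home contains bin/java; any short-lived command
    // would serve equally well.
    static ProcessHandle startShortLived() {
        String java = Paths.get(
                System.getProperty("java.home"), "bin", "java").toString();
        try {
            return new ProcessBuilder(java, "-version")
                    .redirectErrorStream(true)
                    .redirectOutput(ProcessBuilder.Redirect.DISCARD)
                    .start()
                    .toHandle();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Maps the handles to their onExit() futures and collects them
    // directly into an array.
    static CompletableFuture<?>[] exitFutures(List<ProcessHandle> handles) {
        return handles.stream()
                .map(ProcessHandle::onExit)
                .toArray(CompletableFuture<?>[]::new);
    }

    // Starts the given number of processes, waits for all of them and
    // returns how many are no longer alive.
    static int startAndAwait(int count) {
        List<ProcessHandle> handles = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            handles.add(startShortLived());
        }
        CompletableFuture.allOf(exitFutures(handles)).join();
        return (int) handles.stream().filter(h -> !h.isAlive()).count();
    }

    public static void main(String[] args) {
        System.out.println("terminated: " + startAndAwait(2));
    }
}
```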

Our last method to complete our sample application is as follows: 

private ProcessHandle start(Parameters params)
        throws IOException {
    return new ProcessBuilder(params.commandLine)
            .start().toHandle();
}

This method starts the process using ProcessBuilder and returns a ProcessHandle so that we can replace the old one in our set and manage the new process.
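The IOException declared by start() is what restartProcesses() relies on to detect a process that cannot be restarted. The following sketch shows both outcomes. The Parameters class here is a hypothetical stand-in in which commandLine is assumed to be a list of strings, the output redirection is added only to keep the demonstration quiet, and the command names are placeholders.

```java
import java.io.IOException;
import java.nio.file.Paths;
import java.util.List;

public class StartSketch {

    // Hypothetical stand-in for the Parameters class of the application;
    // only the commandLine field is modeled.
    static class Parameters {
        final List<String> commandLine;

        Parameters(List<String> commandLine) {
            this.commandLine = commandLine;
        }
    }

    static ProcessHandle start(Parameters params) throws IOException {
        return new ProcessBuilder(params.commandLine)
                .redirectErrorStream(true)
                .redirectOutput(ProcessBuilder.Redirect.DISCARD)
                .start()
                .toHandle();
    }

    // Returns true if the command could be started, false if start()
    // failed with an IOException.
    static boolean canStart(List<String> commandLine) {
        try {
            ProcessHandle handle = start(new Parameters(commandLine));
            handle.onExit().join(); // wait so that no child is left behind
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    // Assumption: java.home contains bin/java.
    static List<String> javaVersionCommand() {
        String java = Paths.get(
                System.getProperty("java.home"), "bin", "java").toString();
        return List.of(java, "-version");
    }

    public static void main(String[] args) {
        System.out.println("java -version started: "
                + canStart(javaVersionCommand()));
        System.out.println("missing program started: "
                + canStart(List.of("no-such-program-xyz-12345")));
    }
}
```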
