209. Barrier

A barrier is a Java synchronizer that allows a group of threads (known as parties) to reach a common barrier point. Basically, the threads in the group wait for each other to meet at the barrier. It is like a bunch of friends who decide on a meeting point, and when all of them get to this point, they go further together. They won't leave the meeting point until all of them have arrived or until they feel they've been waiting too long.

This synchronizer works well for problems that rely on a task that can be divided into subtasks. Each subtask runs in a different thread and waits for the rest of the threads. When all the threads complete, they combine their results in a single result.

The following diagram shows an example of a barrier flow with three threads:

In API terms, a barrier is implemented using java.util.concurrent.CyclicBarrier.

A CyclicBarrier can be constructed via two constructors:

  • One of them takes only the number of parties (this is an integer)
  • The other one takes the number of parties and an action that should take place after all parties are at the barrier (this is a Runnable)

This action runs once per barrier trip, in the last thread to arrive, after all the parties have arrived but before any of them is released.
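As a minimal sketch of the divide-and-combine idea, the following example sums the numbers 1 to 30 in three slices and uses the barrier action to combine the partial sums. The class name BarrierActionSketch and the method sumInThreeParts() are hypothetical names chosen for illustration; only the CyclicBarrier calls come from the JDK:

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class, not part of the server example
class BarrierActionSketch {

    // Sums 1..30 in three slices; the barrier action combines the partial sums
    static int sumInThreeParts() {
        int[] partial = new int[3];
        AtomicInteger total = new AtomicInteger();

        // Runs once, after the last party arrives and before any party is released
        Runnable combine = () -> total.set(partial[0] + partial[1] + partial[2]);
        CyclicBarrier barrier = new CyclicBarrier(3, combine);

        Thread[] workers = new Thread[3];
        for (int i = 0; i < 3; i++) {
            final int slot = i;
            workers[i] = new Thread(() -> {
                int sum = 0;
                for (int n = slot * 10 + 1; n <= (slot + 1) * 10; n++) {
                    sum += n;
                }
                partial[slot] = sum; // written before await(), so the action sees it
                try {
                    barrier.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
            });
            workers[i].start();
        }

        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return total.get();
    }

    public static void main(String[] args) {
        System.out.println("Total: " + sumInThreeParts());
    }
}
```

If no combining step is needed, the one-argument constructor, new CyclicBarrier(3), is enough.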

When a thread is ready to wait at the barrier, it simply calls the await() method. This method can wait indefinitely or until the specified timeout. If the specified timeout elapses, this thread is released with a TimeoutException; if the thread is interrupted while waiting, it is released with an InterruptedException. In both cases, the barrier is considered broken, and all the other threads waiting at the barrier are released with a BrokenBarrierException. We can find out how many parties are required to trip this barrier via the getParties() method and how many are currently waiting at the barrier via the getNumberWaiting() method.
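The timeout behavior can be sketched with a single thread that waits alone at a two-party barrier, so the timed await() cannot succeed. The class name BarrierTimeoutSketch, the method awaitAlone(), and the 100 ms timeout are assumptions made for this illustration:

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration class, not part of the server example
class BarrierTimeoutSketch {

    // Waits alone at a two-party barrier, so the timed await() times out
    static String awaitAlone() {
        CyclicBarrier barrier = new CyclicBarrier(2);
        String outcome;

        try {
            barrier.await(100, TimeUnit.MILLISECONDS);
            outcome = "tripped";
        } catch (TimeoutException e) {
            outcome = "timeout"; // this thread gave up waiting
        } catch (BrokenBarrierException e) {
            outcome = "broken"; // another party broke the barrier
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            outcome = "interrupted";
        }

        // After the timeout the barrier is broken and nobody is waiting
        return outcome + ", parties=" + barrier.getParties()
            + ", waiting=" + barrier.getNumberWaiting()
            + ", broken=" + barrier.isBroken();
    }

    public static void main(String[] args) {
        System.out.println(awaitAlone());
    }
}
```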

The await() method returns an integer that represents the arrival index of the current thread, where the index getParties() - 1 indicates the first thread to arrive and 0 indicates the last.
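To see the arrival indexes in practice, the following sketch collects the value returned by await() in each thread. The class name ArrivalIndexSketch and the method arrivalIndexes() are hypothetical; which thread gets which index depends on scheduling, so the indexes are returned sorted:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CyclicBarrier;

// Hypothetical illustration class, not part of the server example
class ArrivalIndexSketch {

    // Returns the sorted arrival indexes handed out by a single barrier trip
    static List<Integer> arrivalIndexes(int parties) {
        CyclicBarrier barrier = new CyclicBarrier(parties);
        Set<Integer> indexes = new ConcurrentSkipListSet<>();

        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    // parties - 1 for the first arrival, 0 for the last one
                    indexes.add(barrier.await());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
            });
            threads[i].start();
        }

        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return new ArrayList<>(indexes);
    }

    public static void main(String[] args) {
        System.out.println(arrivalIndexes(3));
    }
}
```

A common use of the index is to let exactly one thread, for example the one that receives 0, perform some follow-up work after the barrier trips.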

Let's assume that we want to start a server. The server is considered started after its internal services have started. Services can be prepared for start concurrently (this is time-consuming), but they run interdependently; therefore, once they are ready to start, they must be started all at once.

So, each service can be prepared to start in a separate thread. Once it is ready to start, the thread will wait at the barrier for the rest of the services. When all of them are ready to start, they cross the barrier and start running. Let's consider three services, so CyclicBarrier can be defined as follows:

Runnable barrierAction
    = () -> logger.info("Services are ready to start ...");

CyclicBarrier barrier = new CyclicBarrier(3, barrierAction);

And, let's prepare the services via three threads:

import java.util.concurrent.CyclicBarrier;
import java.util.logging.Logger;

public class ServerInstance implements Runnable {

    private static final Logger logger
        = Logger.getLogger(ServerInstance.class.getName());

    // Runs once, after all three services have reached the barrier
    private final Runnable barrierAction
        = () -> logger.info("Services are ready to start ...");

    private final CyclicBarrier barrier
        = new CyclicBarrier(3, barrierAction);

    @Override
    public void run() {
        logger.info("The server is getting ready to start ");
        logger.info("Starting services ... ");

        long starting = System.currentTimeMillis();

        // One thread per service; all of them share the same barrier
        Thread service1 = new Thread(
            new ServerService(barrier, "HTTP Listeners"));
        Thread service2 = new Thread(
            new ServerService(barrier, "JMX"));
        Thread service3 = new Thread(
            new ServerService(barrier, "Connectors"));

        service1.start();
        service2.start();
        service3.start();

        try {
            // Wait until every service has crossed the barrier
            service1.join();
            service2.join();
            service3.join();

            logger.info(() -> "Server has successfully started in "
                + (System.currentTimeMillis() - starting) / 1000
                + " seconds");
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            logger.severe(() -> "Exception: " + ex);
        }
    }
}

The ServerService is responsible for preparing each service to start and blocking it at the barrier via await():

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.logging.Logger;

public class ServerService implements Runnable {

    private static final Logger logger =
        Logger.getLogger(ServerService.class.getName());

    private final String serviceName;
    private final CyclicBarrier barrier;
    private final Random rnd = new Random();

    public ServerService(CyclicBarrier barrier, String serviceName) {
        this.barrier = barrier;
        this.serviceName = serviceName;
    }

    @Override
    public void run() {

        // Simulate a preparation time between 0 and 9 seconds
        int startingIn = rnd.nextInt(10) * 1000;

        try {
            logger.info(() -> "Preparing service '"
                + serviceName + "' ...");

            Thread.sleep(startingIn);
            logger.info(() -> "Service '" + serviceName
                + "' was prepared in " + startingIn / 1000
                + " seconds (waiting for remaining services)");

            // Block here until all three services are prepared
            barrier.await();

            logger.info(() -> "The service '" + serviceName
                + "' is running ...");
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            logger.severe(() -> "Exception: " + ex);
        } catch (BrokenBarrierException ex) {
            logger.severe(() -> "Exception ... barrier is broken! " + ex);
        }
    }
}

Now, let's run it:

Thread server = new Thread(new ServerInstance());
server.start();

Here is a possible output (notice how the threads have been released to cross the barrier):

[10:38:34] [INFO] The server is getting ready to start

[10:38:34] [INFO] Starting services ...
[10:38:34] [INFO] Preparing service 'Connectors' ...
[10:38:34] [INFO] Preparing service 'JMX' ...
[10:38:34] [INFO] Preparing service 'HTTP Listeners' ...

[10:38:35] [INFO] Service 'HTTP Listeners' was prepared in 1 seconds
(waiting for remaining services)
[10:38:36] [INFO] Service 'JMX' was prepared in 2 seconds
(waiting for remaining services)
[10:38:38] [INFO] Service 'Connectors' was prepared in 4 seconds
(waiting for remaining services)

[10:38:38] [INFO] Services are ready to start ...

[10:38:38] [INFO] The service 'Connectors' is running ...
[10:38:38] [INFO] The service 'HTTP Listeners' is running ...
[10:38:38] [INFO] The service 'JMX' is running ...

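[10:38:38] [INFO] Server has successfully started in 4 seconds

A CyclicBarrier is cyclic because it can be reused after the waiting threads are released: once the barrier trips, it automatically returns to its initial state. To reset it explicitly, call the reset() method. Do this only after all threads waiting at the barrier have been released; otherwise, the threads that are still waiting will be released with a BrokenBarrierException.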

When a barrier is in a broken state, the isBroken() method returns true.
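The following sketch illustrates both points under simple assumptions: a two-party barrier that is tripped over several rounds without any explicit reset, and a reset() call issued while one thread is still waiting. The class name BarrierReuseSketch and its methods are hypothetical names used only for this illustration:

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class, not part of the server example
class BarrierReuseSketch {

    // Trips the same two-party barrier in several consecutive rounds
    static int tripsAfterRounds(int rounds) {
        AtomicInteger trips = new AtomicInteger();
        CyclicBarrier barrier = new CyclicBarrier(2, trips::incrementAndGet);

        Runnable worker = () -> {
            try {
                for (int i = 0; i < rounds; i++) {
                    barrier.await(); // no reset() needed between rounds
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (BrokenBarrierException e) {
                throw new IllegalStateException(e);
            }
        };

        Thread first = new Thread(worker);
        Thread second = new Thread(worker);
        first.start();
        second.start();
        join(first);
        join(second);
        return trips.get();
    }

    // Calls reset() while one thread is still waiting at the barrier
    static boolean resetBreaksWaiter() {
        CyclicBarrier barrier = new CyclicBarrier(2);
        AtomicBoolean sawBroken = new AtomicBoolean();

        Thread waiter = new Thread(() -> {
            try {
                barrier.await();
            } catch (BrokenBarrierException e) {
                sawBroken.set(true); // released by reset()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();

        // Wait until the waiter has actually arrived at the barrier
        while (barrier.getNumberWaiting() == 0) {
            Thread.yield();
        }

        barrier.reset();
        join(waiter);

        // The waiter saw a broken barrier; after reset() the barrier is usable again
        return sawBroken.get() && !barrier.isBroken();
    }

    private static void join(Thread thread) {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("Trips: " + tripsAfterRounds(3));
        System.out.println("Waiter broken by reset: " + resetBreaksWaiter());
    }
}
```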