The football player microservice – Vert.x + RxJava

We have implemented our football player microservice in order to make it asynchronous and non-I/O-blocking. But we can obtain something more if we use Vert.x and RxJava together.

RxJava is a great implementation of Rx for the Java programming language, and gives you some features that combine the power of Vert.x Future with the benefits of Rx operators.

In order to use RxJava, you must set the following dependency in your Maven pom.xml:

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rx-java2</artifactId>
</dependency>

Let's start with an easy example.

We can create three new classes that implement the same methods described earlier, but in a reactive manner:

  •  FootballPlayerReactiveVerticle
  •  ActionHelperReactive
  •  FootballPlayerReactiveDAO

In the FootballPlayerVerticle class, we have created the method needed to build the HTTP server that handles requests to our APIs:

private Future<Void> createHttpServer(JsonObject config, Router router) {
Future<Void> future = Future.future();
vertx.createHttpServer().requestHandler(router::accept).listen(config.getInteger("HTTP_PORT",
8080),res -> future.handle(res.mapEmpty()));
return future;
}

We modify the FootballPlayerReactiveVerticle class in order to return an Rx Completable class, which is a stream that indicates its completion:

private Completable createHttpServerReactive(JsonObject config, Router router) {
return vertx.createHttpServer().requestHandler(router::accept)
.rxListen(config.getInteger("HTTP_PORT", 8080)).toCompletable();
}

Another important element is the way we connect to the database.

In our DAO, we have created the following method to do this:

public Future<SQLConnection> connect(JDBCClient jdbc) {
Future<SQLConnection> future = Future.future();
jdbc.getConnection(ar -> future.handle(ar.map(c -> c.setOptions(
new SQLOptions().setAutoGeneratedKeys(true))))
);
return future;
}

In the FootballPlayerReactiveDAO class, we can change the method to return an Rx Single instead of a Future. A Single is like an object of Observable, but instead of emitting a series of values, it emits one value or an error notification. The following is the revisited method:

public Single<SQLConnection> connectReactive(JDBCClient jdbc) {
return jdbc.rxGetConnection().map(c -> c.setOptions(new
SQLOptions().setAutoGeneratedKeys(true)));
}

All interactions will be done in a reactive manner:  for example, to read the file that contains the instructions for creating database tables and populate them, we can use  rxReadFile from the FileSystem class:

public Single<SQLConnection> createTableIfNeeded(FileSystem fileSystem, SQLConnection connection) {
return fileSystem.rxReadFile("schema.sql").map(Buffer::toString)
.flatMapCompletable(connection::rxExecute).toSingleDefault(connection);
}

public Single<SQLConnection> createSomeDataIfNone(FileSystem fileSystem, SQLConnection
connection) {
return connection.rxQuery("SELECT * FROM football_player").flatMap(rs -> {
if (rs.getResults().isEmpty()) {
return fileSystem.rxReadFile("data.sql")
.map(Buffer::toString)
.flatMapCompletable(connection::rxExecute)
.toSingleDefault(connection);
} else {
return Single.just(connection);
}
});
}

All the classes used in the reactive version of our implementation belong to the io.vertx.reactivex package.

Also, the method needed to perform CRUD operations can be revisited in a reactive manner. For example, the update method can be rewritten in this way:

public Single<FootballPlayer> update(SQLConnection connection, String id, FootballPlayer 
footballPlayer) {
String sql = "UPDATE football_player SET name = ?, surname = ?, age = ?, team = ?, position =
?, price = ? WHERE id = ?";
return connection.rxUpdateWithParams(sql, new JsonArray().add(
footballPlayer.getName()).add(footballPlayer.getSurname())
.add(footballPlayer.getAge()).add(footballPlayer.getTeam())
.add(footballPlayer.getPosition()).add(footballPlayer.
getPrice().intValue()).add(Integer.valueOf(id)))
.map(res -> new FootballPlayer(res.getKeys().getInteger(0),
footballPlayer.getName(), footballPlayer.getSurname(),
footballPlayer.getAge(), footballPlayer.getTeam(),
footballPlayer.getPosition(), footballPlayer.getPrice()))
.doFinally(() -> {
connection.close();
});
}

The main difference from the version implemented in FootballPlayerDAO is the use of rxUpdateWithParams, which will execute the SQL UPDATE operation and return a Single object instead of a Future, closing the JDBC connection at the end of the method.

The final pipeline to build the HTTP server, using the configurations defined in the Config files, which must be able to handle our APIs, is as follows:

retriever.rxGetConfig().doOnSuccess(config -> jdbc = JDBCClient.createShared(vertx,
config, "My-Reading-List"))
.flatMap(config -> dao.connect(jdbc)
.flatMap(connection -> this.createTableIfNeeded(connection)
.flatMap(this::createSomeDataIfNone)
.doAfterTerminate(connection::close))
.map(x -> config))
.flatMapCompletable(config -> createHttpServer(config, router))
.subscribe(CompletableHelper.toObserver(fut));

I used the flatMap method to concatenate the operations, and the doOnSuccess method to receive the item from the observed stream and implement logic related to them.

The key part of the pipeline is the subscribe method: if it's not invoked, nothing will happen since streams are lazy.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset