Apache Kafka as a data dispatcher

In the proposed architecture, we need to decouple data acquisition from data processing so that each layer can scale and evolve independently. To achieve this, we can place a message queue between them. We could use a Java Message Service (JMS) broker or an Advanced Message Queuing Protocol (AMQP) broker, but here we are going to use Apache Kafka. Kafka is supported by most common analytics platforms, offers very high throughput and scalability, and comes with its own stream-processing framework, Kafka Streams.

In Kafka, each topic is divided into a set of logs called partitions. Producers append records to the tail of these logs, and consumers read from them. Kafka scales topic consumption by distributing a topic's partitions among the members of a consumer group. A consumer group is a set of consumers that share a common group identifier. The following diagram shows a topic with three partitions and two consumer groups, each with two members:

Kafka architecture—each topic is organized in partitions

Within each consumer group, each partition of the topic is assigned to exactly one member. Different groups consume the same partitions independently of each other.
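The assignment rule can be sketched in a few lines of Python. This is a simplified round-robin strategy for illustration only, and `assign_round_robin` is a hypothetical helper. Kafka clients ship several assignment strategies (range, round-robin, sticky), and the one in use depends on the consumer configuration.

```python
from typing import Dict, List

def assign_round_robin(partitions: List[int], members: List[str]) -> Dict[str, List[int]]:
    """Assign every partition to exactly one group member, cycling through the members."""
    if not members:
        raise ValueError("a consumer group needs at least one member")
    ordered = sorted(members)
    assignment: Dict[str, List[int]] = {m: [] for m in ordered}
    for i, partition in enumerate(sorted(partitions)):
        assignment[ordered[i % len(ordered)]].append(partition)
    return assignment

# One topic with three partitions, consumed by two groups of two members each.
groups = {"group-a": ["a1", "a2"], "group-b": ["b1", "b2"]}
for group, members in groups.items():
    print(group, assign_round_robin([0, 1, 2], members))
# group-a {'a1': [0, 2], 'a2': [1]}
# group-b {'b1': [0, 2], 'b2': [1]}
```

Each group receives every partition, but no partition is shared inside a group. If a group has more members than the topic has partitions, the extra members stay idle. This is why the number of partitions caps the parallelism of a consumer group.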

Apache Kafka guarantees message order only within a partition. If we need a total order across all the messages of a topic, we can either use RabbitMQ or reduce the number of partitions to one.
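To see why ordering holds only within a partition, consider this toy in-memory model. Plain Python lists stand in for partition logs, and `produce` and `consume_in_batches` are hypothetical helpers, not Kafka APIs. A consumer that polls two partitions in batches receives each partition's records in order, but the interleaving across partitions differs from the send order.

```python
from typing import List, Tuple

# A record is (send sequence number, payload); a partition is an append-only list of records.
Record = Tuple[int, str]
Partition = List[Record]

def produce(partitions: List[Partition], partition: int, record: Record) -> int:
    """Append a record to the tail of one partition and return its offset."""
    partitions[partition].append(record)
    return len(partitions[partition]) - 1

def consume_in_batches(partitions: List[Partition], batch: int) -> List[Record]:
    """Poll the partitions in turn, taking up to `batch` records from each per round."""
    if batch < 1:
        raise ValueError("batch must be at least 1")
    offsets = [0] * len(partitions)  # next offset to read in each partition
    received: List[Record] = []
    while any(off < len(p) for off, p in zip(offsets, partitions)):
        for i, p in enumerate(partitions):
            chunk = p[offsets[i]:offsets[i] + batch]
            received.extend(chunk)
            offsets[i] += len(chunk)
    return received

# Send six records, alternating between two partitions.
topic: List[Partition] = [[], []]
for seq in range(6):
    produce(topic, seq % 2, (seq, f"msg-{seq}"))

print([seq for seq, _ in consume_in_batches(topic, batch=2)])  # [0, 2, 1, 3, 4, 5]
```

With a single partition, the same consumer would receive the records exactly in send order. This is the trade-off behind reducing a topic to one partition.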

We have to install Apache Kafka and an MQTT connector plugin that subscribes to Mosquitto's topics. The MQTT plugin is available at https://github.com/evokly/kafka-connect-mqtt. Alternatively, we can build an already-configured Docker image from https://github.com/PacktPublishing/Hands-On-Industrial-Internet-of-Things. Enter the following commands in the console:

$ git clone https://github.com/PacktPublishing/Hands-On-Industrial-Internet-of-Things
$ cd Hands-On-Industrial-Internet-of-Things/Chapter08/kafka-mqtt-connector
$ docker build . -t iiot-book/kafka-mqtt

Docker builds the image using the following Dockerfile:

FROM wurstmeister/kafka

# Kafka broker (9092) and ZooKeeper (2181) ports
EXPOSE 9092
EXPOSE 2181

# Install gettext (provides envsubst)
RUN apk update \
    && apk add gettext

# Install the MQTT connector and the Paho MQTT client (JAR plugins)
COPY kafka-connect-mqtt-1.1-SNAPSHOT.jar $KAFKA_HOME/libs/
COPY org.eclipse.paho.client.mqttv3-1.0.2.jar $KAFKA_HOME/libs/

# MQTT connector configuration and start-up script
COPY mqtt.properties /tmp/mqtt.properties
COPY start-all.sh /start-all.sh

VOLUME ["/kafka"]

# Run the start-up script in the foreground on boot
CMD ["/start-all.sh"]

This image extends a standard Kafka image (wurstmeister/kafka) and copies the connector JAR plugins, mqtt.properties, and the start-up script into it.

These examples do not persist data: a new container starts with an empty Kafka log. To keep the data across containers, we need to mount an external volume, for example with the -v option of docker run.

We can now test our chain. Find your IP address with ifconfig (ipconfig on Windows), then run the following command, replacing the <ip> marker with your IP address (for example, 192.168.0.1):

$ docker run -p 9092:9092 -p 2181:2181 -e MQTT_URI=tcp://<ip>:1883 -e KAFKA_ADVERTISED_HOST_NAME=<ip> iiot-book/kafka-mqtt:latest

We now need to subscribe to the Kafka topic called mqtt. We can download the Apache Kafka distribution (https://kafka.apache.org/downloads) and use its console consumer:

$ <your kafka home>/bin/kafka-console-consumer.sh --bootstrap-server <ip>:9092 --topic mqtt --from-beginning

Alternatively, we can run the console consumer bundled in the same Docker image, in interactive mode:

$ docker run -it iiot-book/kafka-mqtt:latest /opt/kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server <ip>:9092 --topic mqtt --from-beginning

To check the status of our containers, we can run the following command:

$ docker ps

To test our chain, we can publish our first message to the MQTT data broker:

$ mqtt-cli localhost topic/device0 "device0.my.measure.temperature, 27,GOOD"

The expected output should look as follows:

Testing the MQTT Kafka chain: the top window shows the expected output, while the central window shows the signal sent

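The MQTT Kafka connector publishes each message to the Kafka topic called mqtt. Because the payload schema type is bytes, the payload is encoded in Base64. For example, the following record carries a temperature reading of 28: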

{"schema":{"type":"bytes","optional":false},"payload":"ZGV2aWNlMC5teS5tZWFzdXJlLnRlbXBlcmF0dXJlLCAyOCxHT09E"}
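The record above can be decoded with the Python standard library. The {"schema", "payload"} envelope is the format that Kafka Connect's JSON converter produces when schemas are enabled, and a value of type bytes is written as a Base64 string. The helper names and the `Measurement` type are hypothetical. The "name, value, quality" layout is inferred from the test message published earlier and may differ in other setups.

```python
import base64
import json
from typing import NamedTuple

class Measurement(NamedTuple):
    name: str
    value: float
    quality: str

def decode_connect_json(message: str) -> bytes:
    """Extract the raw bytes from a Kafka Connect JSON envelope with a "bytes" schema."""
    envelope = json.loads(message)
    if envelope["schema"]["type"] != "bytes":
        raise ValueError("expected a bytes schema")
    return base64.b64decode(envelope["payload"])

def parse_measurement(raw: bytes) -> Measurement:
    """Parse the assumed "<name>, <value>,<quality>" text of the test message."""
    name, value, quality = (field.strip() for field in raw.decode("utf-8").split(","))
    return Measurement(name, float(value), quality)

message = '{"schema":{"type":"bytes","optional":false},"payload":"ZGV2aWNlMC5teS5tZWFzdXJlLnRlbXBlcmF0dXJlLCAyOCxHT09E"}'
raw = decode_connect_json(message)
print(raw)  # b'device0.my.measure.temperature, 28,GOOD'
print(parse_measurement(raw))
```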

The connector also sets the Kafka message key to the name of the MQTT topic:

{"schema":{"type":"string","optional":false},"payload":"topic/device0"}

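Apache Kafka uses the key to choose a partition. As long as the number of partitions does not change, records with the same key always go to the same partition, so their relative order is preserved. Records with different keys can be spread across partitions. Because the key here is the MQTT topic, each device or group of devices that publishes on its own MQTT topic gets its own ordered stream, while the overall load can still be distributed across partitions for scalability.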
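Key-based partitioning can be sketched as follows. This is a Python port, to the best of our understanding, of the 32-bit murmur2 hash that the Java producer's default partitioner applies to records with a non-null key: it clears the sign bit of the hash, then takes it modulo the number of partitions. The function names are hypothetical, and the resulting partition numbers have not been checked against a live cluster. The sketch also assumes that the connector leaves partition selection to the producer. For simplicity, it hashes the bare MQTT topic name, whereas the real producer hashes the serialized key bytes.

```python
MASK32 = 0xFFFFFFFF

def murmur2(data: bytes) -> int:
    """32-bit murmur2, modelled on Kafka's Java client; returns the unsigned bit pattern."""
    length = len(data)
    m, r = 0x5BD1E995, 24
    h = (0x9747B28C ^ length) & MASK32
    # Mix the input four bytes at a time (little-endian words).
    for i in range(0, length - length % 4, 4):
        k = data[i] | data[i + 1] << 8 | data[i + 2] << 16 | data[i + 3] << 24
        k = (k * m) & MASK32
        k ^= k >> r
        k = (k * m) & MASK32
        h = ((h * m) & MASK32) ^ k
    # Mix in the 1-3 trailing bytes, if any.
    tail, extra = length & ~3, length % 4
    if extra >= 3:
        h ^= data[tail + 2] << 16
    if extra >= 2:
        h ^= data[tail + 1] << 8
    if extra >= 1:
        h ^= data[tail]
        h = (h * m) & MASK32
    # Final avalanche.
    h ^= h >> 13
    h = (h * m) & MASK32
    h ^= h >> 15
    return h

def partition_for(key: bytes, num_partitions: int) -> int:
    """Map a key to a partition: clear the sign bit, then take the remainder."""
    return (murmur2(key) & 0x7FFFFFFF) % num_partitions

# Hash the bare MQTT topic names (a simplification: the real producer hashes the serialized key).
for device in ("device0", "device1", "device2"):
    key = f"topic/{device}".encode("utf-8")
    print(key.decode("utf-8"), "-> partition", partition_for(key, 3))
```

Because the mapping depends on the partition count, adding partitions to an existing topic can move a key to a different partition. This breaks per-key ordering across the change.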
