For our proposed architecture, we need to decouple acquisition from processing, which improves scalability and keeps the layers independent. To achieve this goal, we can use a queue. We could use either Java Message Service (JMS) or Advanced Message Queuing Protocol (AMQP), but in this case we are going to use Apache Kafka. Kafka is supported by most common analytics platforms, offers very high performance and scalability, and also has a good analytics framework.
In Kafka, each topic is divided into a set of logs called partitions. Producers append to the tail of these logs and consumers read from them. Apache Kafka scales topic consumption by distributing partitions among the members of a consumer group. A consumer group is a set of consumers that share a common group identifier. The following diagram shows a topic with three partitions and two consumer groups of two members each:
Each partition in the topic is assigned to exactly one member in the group.
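The assignment rule above can be illustrated with a minimal sketch. It assumes a plain round-robin strategy and hypothetical member names (a-1, a-2, b-1, b-2); Kafka's real assignors are more elaborate, so this only shows the invariant that, within one group, every partition has exactly one owner:

```python
from collections import defaultdict


def assign_partitions(partitions, members):
    """Spread partitions over the members of ONE consumer group (round-robin).

    Simplified illustration only; it is not Kafka's actual assignor.
    """
    assignment = defaultdict(list)
    for index, partition in enumerate(sorted(partitions)):
        owner = members[index % len(members)]
        assignment[owner].append(partition)
    return dict(assignment)


partitions = [0, 1, 2]  # one topic with three partitions
group_a = assign_partitions(partitions, ["a-1", "a-2"])  # hypothetical members
group_b = assign_partitions(partitions, ["b-1", "b-2"])  # hypothetical members

print(group_a)  # {'a-1': [0, 2], 'a-2': [1]}
print(group_b)  # {'b-1': [0, 2], 'b-2': [1]}
```

Each group receives all three partitions independently of the other group, which is why two groups can each read the full topic.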
We have to install Apache Kafka and an MQTT plugin that subscribes to Mosquitto's topics. The MQTT plugin is available at https://github.com/evokly/kafka-connect-mqtt, or we can use an already-configured Docker container at https://github.com/PacktPublishing/Hands-On-Industrial-Internet-of-Things. Enter the following commands in the console:
$ git clone https://github.com/PacktPublishing/Hands-On-Industrial-Internet-of-Things
$ cd Hands-On-Industrial-Internet-of-Things/Chapter08/kafka-mqtt-connector
$ docker build . -t iiot-book/kafka-mqtt
Docker will build the image using the following Dockerfile:
FROM wurstmeister/kafka
EXPOSE 9092
EXPOSE 2181
# INSTALL GETTEXT
RUN apk update \
 && apk add gettext
# Install Connector (jar plugin)
ADD kafka-connect-mqtt-1.1-SNAPSHOT.jar $KAFKA_HOME/libs/
ADD org.eclipse.paho.client.mqttv3-1.0.2.jar $KAFKA_HOME/libs/
# MQTT
ADD mqtt.properties /tmp/mqtt.properties
ADD start-all.sh start-all.sh
VOLUME ["/kafka"]
# Run Kafka and the MQTT connector in foreground on boot
CMD ["/start-all.sh"]
The Docker image starts from an existing standard Kafka installation (wurstmeister/kafka) and copies the connector JAR files and mqtt.properties into it.
These examples lose data when the container is stopped. To keep data, we need to mount an external volume.
We can now test our chain. Get your IP address using ifconfig (or ipconfig for Windows), then launch the following command (please replace the <ip> marker with your IP address, such as 192.168.0.1):
$ docker run -p 9092:9092 -p 2181:2181 -e MQTT_URI=tcp://<ip>:1883 -e KAFKA_ADVERTISED_HOST_NAME=<ip> iiot-book/kafka-mqtt:latest
We now need to subscribe to the Kafka topic named mqtt. We can use the console consumer from the official Apache Kafka distribution (https://kafka.apache.org/downloads) and run the following command:
$ <your kafka home>/bin/kafka-console-consumer.sh --bootstrap-server <ip>:9092 --topic mqtt --from-beginning
Alternatively, we can run the same Docker image using the interactive mode:
$ docker run -it iiot-book/kafka-mqtt:latest /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server <ip>:9092 --topic mqtt --from-beginning
To check the status of our containers, we can run the following command:
$ docker ps
To test our chain, we can publish our first message to the MQTT data broker:
$ mqtt-cli localhost topic/device0 "device0.my.measure.temperature, 28,GOOD"
The MQTT connector publishes the message to the Kafka topic called mqtt and encodes the payload in Base64. The expected output should look as follows:
{"schema":{"type":"bytes","optional":false},"payload":"ZGV2aWNlMC5teS5tZWFzdXJlLnRlbXBlcmF0dXJlLCAyOCxHT09E"}
It also sets the Kafka message key to the name of the MQTT topic:
{"schema":{"type":"string","optional":false},"payload":"topic/device0"}
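To read these records back on the consumer side, the JSON envelope has to be parsed and the payload decoded from Base64. The following is a minimal sketch using only the Python standard library. The function names are hypothetical, and the comma-separated layout (tag, value, quality) is an assumption inferred from the sample message, not a format defined by the connector:

```python
import base64
import json

# Value and key records exactly as shown by the console consumer
value_record = (
    '{"schema":{"type":"bytes","optional":false},'
    '"payload":"ZGV2aWNlMC5teS5tZWFzdXJlLnRlbXBlcmF0dXJlLCAyOCxHT09E"}'
)
key_record = '{"schema":{"type":"string","optional":false},"payload":"topic/device0"}'


def decode_value(record):
    """Parse the JSON envelope and Base64-decode its bytes payload."""
    envelope = json.loads(record)
    return base64.b64decode(envelope["payload"]).decode("utf-8")


def parse_measure(text):
    """Split 'tag, value,quality' into fields (assumed layout)."""
    tag, value, quality = (part.strip() for part in text.split(","))
    return {"tag": tag, "value": float(value), "quality": quality}


mqtt_topic = json.loads(key_record)["payload"]  # string keys are not Base64-encoded
raw_text = decode_value(value_record)
measure = parse_measure(raw_text)

print(mqtt_topic)
print(raw_text)
print(measure)
```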
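To provide good scalability, Apache Kafka uses the key for partitioning: messages with the same key always go to the same partition. Because the key is the MQTT topic name, every device or group of devices can push messages on a different MQTT topic, and the resulting messages are spread across the partitions.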
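Key-based partitioning can be sketched as a hash of the key taken modulo the number of partitions. The example below is an illustration under stated assumptions: it uses CRC32 as a stand-in hash (Kafka's default Java partitioner hashes the key bytes with murmur2), and the device topics and the partition count of three are hypothetical:

```python
import zlib


def partition_for(key, num_partitions):
    """Map a message key to a partition index.

    CRC32 is a stand-in here; Kafka's default partitioner uses murmur2,
    so the actual partition numbers will differ from this sketch.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


NUM_PARTITIONS = 3  # assumed partition count for the mqtt topic
device_topics = ["topic/device%d" % i for i in range(4)]  # hypothetical MQTT topics

placement = {key: partition_for(key, NUM_PARTITIONS) for key in device_topics}
for key, partition in placement.items():
    print(key, "->", partition)
```

Because the mapping is deterministic, all messages from one MQTT topic land in the same partition and keep their relative order, while different topics can be consumed in parallel.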