Kafka Streams as a Rule Engine

Kafka Streams is a client library for building mission-critical, real-time applications, where the input and/or the output data is stored in Apache Kafka clusters. We can use Kafka Streams to build a minimalistic rule engine, implemented as a simple Java project with Maven. We assume the following standard Maven project structure:

<project home>
+ src/main/java/<java package>/RuleEngineDemo.java
+ pom.xml

To build our project we have to accomplish the following steps:

  1. We have to define the pom.xml file in the root directory of the project and add the following dependencies:
...
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
  <version>1.1.0</version>
</dependency>

<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>log4j-over-slf4j</artifactId>
  <version>1.7.2</version>
  <exclusions>
    <exclusion>
      <artifactId>slf4j-api</artifactId>
      <groupId>org.slf4j</groupId>
    </exclusion>
  </exclusions>
</dependency>

<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.8.0</version>
</dependency>

<dependency>
  <groupId>com.101tec</groupId>
  <artifactId>zkclient</artifactId>
  <version>0.4</version>
  <exclusions>
    <exclusion>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
    </exclusion>
  </exclusions>
</dependency>
...
  2. We can write our consumer. In the src/main/java/<java package> directory of the project, we can write a simple Java class, RuleEngineDemo.java, with the following code:
import java.util.*;
import java.util.concurrent.*;

import org.apache.commons.collections4.map.HashedMap;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.*;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.kstream.internals.*;

public class RuleEngineDemo {
    // window size within which the filtering is applied
    private static final int WINDOW_SIZE = 5;

    /// HERE KEY AND VALUE EXTRACTORS

    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-measures");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // the MQTT payloads are plain CSV strings, so we use the String serde
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("mqtt");

        /// HERE THE MAPPING OF TAGS
        /// HERE THE KERNEL OF THE APP

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        final CountDownLatch latch = new CountDownLatch(1);
        // attach a shutdown handler to catch Ctrl+C
        Runtime.getRuntime().addShutdownHook(new Thread("streams-temperature-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });
        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

We assume that a standard MQTT payload has the following format:

measure_name,value,quality

Alternatively, we can add a timestamp indicating when the signal was acquired:

measure_name,timestamp,value,quality
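
For example, the two formats can carry payloads such as the following (the timestamp shown is an illustrative epoch value in milliseconds):

device0.my.measure.temperature,26,GOOD
device0.my.measure.temperature,1533312000000,26,GOOD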
  3. We can write two functions to extract the measure name and the numeric value from the MQTT payload:
public static String getKey(String key, String value) {
    String[] values = value.split(",");
    if (values.length > 1) {
        // the measure name is always the first field
        return values[0];
    } else {
        return value;
    }
}

public static Double getValue(String value) {
    String[] values = value.split(",");
    if (values.length > 3) {
        // measure_name,timestamp,value,quality: the value is the third field
        return Double.parseDouble(values[2]);
    } else if (values.length > 1) {
        // measure_name,value[,quality]: the value is the second field
        return Double.parseDouble(values[1]);
    } else {
        return Double.parseDouble(value);
    }
}
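
As a quick sanity check, a throwaway class such as the following (hypothetical, not part of the book's code; it assumes it lives in the same package as RuleEngineDemo) shows what the extractors return for each payload shape:

// Hypothetical helper, assuming it sits in the same package as RuleEngineDemo
public class ExtractorSanityCheck {
    public static void main(String[] args) {
        // measure_name,value,quality: the key is the measure name
        System.out.println(RuleEngineDemo.getKey(null, "device0.my.measure.temperature,26,GOOD"));
        // prints: device0.my.measure.temperature

        // measure_name,value,quality: the value is the second field
        System.out.println(RuleEngineDemo.getValue("device0.my.measure.temperature,26,GOOD"));
        // prints: 26.0

        // measure_name,timestamp,value,quality: the value is the third field
        System.out.println(RuleEngineDemo.getValue("device0.my.measure.temperature,1533312000000,26,GOOD"));
        // prints: 26.0
    }
}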
  4. Now we are ready to write our kernel code:

KStream<Windowed<String>, String> max = source
    .selectKey(new KeyValueMapper<String, String, String>() {
        @Override
        public String apply(String key, String value) {
            return getKey(key, value);
        }
    })
    .groupByKey()
    .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(WINDOW_SIZE)))
    .reduce(new Reducer<String>() {
        @Override
        public String apply(String value1, String value2) {
            // keep the record with the higher numeric value
            double v1 = getValue(value1);
            double v2 = getValue(value2);
            if (v1 >= v2)
                return value1;
            else
                return value2;
        }
    })
    .toStream()
    .filter(new Predicate<Windowed<String>, String>() {
        @Override
        public boolean test(Windowed<String> key, String value) {
            // translate the raw tag to a standard measure, then check its threshold
            String measure = tagMapping.get(key.key());
            if (measure != null) {
                Double threshold = excursion.get(measure);
                if (threshold != null) {
                    return getValue(value) > threshold;
                }
            }
            return false;
        }
    });

WindowedSerializer<String> windowedSerializer =
    new WindowedSerializer<>(Serdes.String().serializer());
WindowedDeserializer<String> windowedDeserializer =
    new WindowedDeserializer<>(Serdes.String().deserializer(), WINDOW_SIZE);
Serde<Windowed<String>> windowedSerde =
    Serdes.serdeFrom(windowedSerializer, windowedDeserializer);

// write the filtered maxima to the output topic
max.to("excursion", Produced.with(windowedSerde, Serdes.String()));

Using the selectKey function, we extract the key, and we group by key using groupByKey. Then we keep only the maximum value within each five-second window, using windowedBy together with reduce. Finally, we apply our rule with filter, keeping only values that exceed a given threshold. Please notice that our simple rule engine uses two maps: one to translate the raw measure name into a standard measure, and one to look up the right threshold for that measure.
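
For local debugging, we could also dump the filtered stream to standard output before starting the topology. This is an optional addition, not part of the book's code; the Printed helper comes from the org.apache.kafka.streams.kstream package:

// print every excursion record to stdout while developing
max.print(Printed.<Windowed<String>, String>toSysOut().withLabel("excursion"));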

The code is available at https://github.com/PacktPublishing/Hands-On-Industrial-Internet-of-Things. The RuleEngineDemo Java code can be found in Chapter08/kairos-kafka/src/main/java/org/apache/kafka/streams/iiotbook/mqtt/temperature.

Here is an example of how to build the two maps:

//transform the real measure to a standard measure
Map<String,String> tagMapping = new HashedMap<>();
tagMapping.put("device0.my.measure.temperature", "temperature");

//rules definition
Map<String,Double> excursion = new HashedMap<>();
excursion.put("temperature", 25.0);

Normally, this information is stored in the asset registry. In an I-IoT environment, we acquire trillions of measures every day. Each measure has a hard-to-read encoded name, so we normally translate it to a human-readable name before applying the algorithm.
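
In practice, rather than hard-coding the maps, we could load them from an external source at startup. The following is a minimal sketch, assuming the registry content is exported to two properties files (the RegistryLoader class and file names are hypothetical, not part of the book's code):

import java.io.FileInputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class RegistryLoader {
    // load the tag-to-measure translation, e.g. from tag-mapping.properties
    public static Map<String, String> loadTagMapping(String path) throws Exception {
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream(path)) {
            props.load(in);
        }
        Map<String, String> mapping = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            mapping.put(name, props.getProperty(name));
        }
        return mapping;
    }

    // load the per-measure thresholds, e.g. from thresholds.properties
    public static Map<String, Double> loadThresholds(String path) throws Exception {
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream(path)) {
            props.load(in);
        }
        Map<String, Double> thresholds = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            thresholds.put(name, Double.parseDouble(props.getProperty(name)));
        }
        return thresholds;
    }
}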

For example, the following two measures shown on the left can be translated to the names on the right:

device0.my.measure.temperature  -> temperature of asset MY
device1.X523.MODEL7.TEMP -> temperature of asset X523

We should apply the same rule to each. Real rules are usually more complex than a simple threshold, but we can extend our code easily. To test our example, we have to start Mosquitto, Kafka, and our Rule Engine. We then have to subscribe to the excursion topic:

$ docker run -it iiot-book/kafka-mqtt:latest /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server <ip>:9092 --topic excursion --from-beginning

Finally, we can publish our measures:

$ mqtt-cli localhost topic/device1 device0.my.measure.temperature,11,GOOD
$ mqtt-cli localhost topic/device1 device0.my.measure.temperature,16,GOOD
$ mqtt-cli localhost topic/device1 device0.my.measure.temperature,26,GOOD
$ mqtt-cli localhost topic/device1 device0.my.measure.temperature,27,GOOD

When the temperature reaches 26 degrees, we receive a message on the excursion topic to notify us.

To complete our example, we should also consider the quality of the signal, ignoring a measure whose quality is BAD or UNKNOWN.
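
A minimal sketch of such a check (our own addition, not code from the book) is an extra filter applied to source before selectKey, dropping any record whose last comma-separated field is BAD or UNKNOWN:

// drop records with BAD or UNKNOWN quality before the windowing step
KStream<String, String> goodQuality = source.filter(new Predicate<String, String>() {
    @Override
    public boolean test(String key, String value) {
        // the quality flag is the last comma-separated field of the payload
        String[] fields = value.split(",");
        String quality = fields[fields.length - 1];
        return !"BAD".equals(quality) && !"UNKNOWN".equals(quality);
    }
});

The rest of the topology would then be built from goodQuality instead of source.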
