In this section, we will walk through real-world examples of producers that generate streaming data and store it in a Kinesis stream. At the same time, we will also build Kinesis consumers that consume and process the streaming data from Kinesis streams.
The very first step in working with Kinesis streams is getting access to Amazon Web Services (AWS). Please perform the following steps to get access to AWS:
For detailed instructions, you can refer to the YouTube video at https://www.youtube.com/watch?v=WviHsoz8yHk.
The sign-up process may involve receiving a phone call and entering a temporary verification PIN.
At the end, you will be provided with a username and password for accessing and managing your AWS account and all its associated services.
Once we complete the sign-up process successfully, we will have access to AWS and all its services, including Kinesis.
AWS provides a free tier for some of its services, in which we are not charged until we reach a certain usage limit; for example, launching and using micro instances for up to 750 hours a month is free.
Refer to https://aws.amazon.com/free/ for more information on services available in the free tier.
Kinesis is not part of the free tier, so it is chargeable, but the good part is that we only pay for what we use. We can start using Kinesis streams for as little as $0.015 per shard per hour.
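As a quick back-of-the-envelope check (assuming the $0.015 per shard per hour rate quoted above, and ignoring the separate per-PUT payload charge, so actual bills will differ), the shard-hour cost of keeping a stream running can be sketched as follows:

```java
public class KinesisCostEstimate {

    // Hourly price per shard, as quoted above; verify against current AWS pricing.
    static final double PRICE_PER_SHARD_HOUR = 0.015;

    // Shard-hour cost of running a stream for a given number of days.
    static double shardCost(int shards, int days) {
        return shards * PRICE_PER_SHARD_HOUR * 24 * days;
    }

    public static void main(String[] args) {
        // One shard running for a 31-day month costs roughly $11 in shard-hours.
        System.out.printf("Monthly shard cost: $%.2f%n", shardCost(1, 31));
    }
}
```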
Refer to https://aws.amazon.com/kinesis/pricing/ for more information on Kinesis pricing.
Assuming that we now have access to AWS, let's move forward and set up our development environment for working with Kinesis streams.
In this section, we will talk about all the essential libraries required for working with Amazon Kinesis, and we will also configure our development environment. This will help us in the development of consumers and producers for our Kinesis streams.
Perform the following steps for configuring your development environment:
Define JAVA_HOME as the environment variable: export JAVA_HOME=<Path of Java install Dir>
Define ECLIPSE_HOME as the environment variable and install the AWS Toolkit in Eclipse. You might need to restart your Eclipse instance once the AWS Toolkit is installed.
Create a Java project named RealTimeAnalytics-Kinesis in Eclipse, with a base package named chapter.five.
Open the Eclipse project properties window and add the dependencies of KCL, as shown in the following screenshot:
The preceding screenshot shows the Eclipse project and the dependencies that need to be added for compiling and running the code for Kinesis streams.
That's it! We are done with the configuration. Our development environment is ready for working with Kinesis streams.
Let's move forward to the next sections where we will create streams, and write producers and consumers for developing a streaming service.
In this section, you will learn various ways of creating or hosting Kinesis streams.
Kinesis streams can be created by two different methods: one is using the AWS SDK/toolkit, and the second is logging into the AWS Management Console and using its user interface to create Kinesis streams.
Perform the following steps for creating Kinesis streams using the AWS user interface:
The preceding screenshot shows the Kinesis stream configuration page where we need to specify two configurations:
Provide the name of the stream (for example, StreamingService) and the number of shards. We are done! Now sit back and relax; your stream will be created in a few seconds and can then be seen on the admin page of Kinesis streams:
The preceding screenshot shows the admin page of Kinesis streams, which lists all the active or inactive Kinesis streams. We can also click on the name of a stream to see a detailed analysis of the throughput (read/write/latency) for that stream.
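When choosing the number of shards, it helps to work backwards from the expected throughput. The sketch below assumes the per-shard limits documented for Kinesis streams (1 MB/s and 1,000 records/s for writes, 2 MB/s for reads); treat these figures as assumptions to verify against the current AWS documentation:

```java
public class ShardEstimator {

    // Estimate the number of shards needed, assuming per-shard limits of
    // 1 MB/s write, 1,000 records/s write, and 2 MB/s read.
    static int requiredShards(double writeMBps, int recordsPerSec, double readMBps) {
        int byWriteBytes = (int) Math.ceil(writeMBps / 1.0);
        int byWriteRecords = (int) Math.ceil(recordsPerSec / 1000.0);
        int byRead = (int) Math.ceil(readMBps / 2.0);
        return Math.max(byWriteBytes, Math.max(byWriteRecords, byRead));
    }

    public static void main(String[] args) {
        // For example: 3.5 MB/s ingest, 2,500 records/s, 4 MB/s egress -> 4 shards
        System.out.println(requiredShards(3.5, 2500, 4.0));
    }
}
```

The largest of the three constraints wins; our single-shard StreamingService example stays well below all of them.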
Our streams are ready to accept the data provided by the producers and deliver it to the consumers for further analysis. Let's also see the steps required to create and manage streams using the AWS SDK.
Perform the following steps to create Kinesis streams using AWS SDK:
Open the project RealTimeAnalytics-Kinesis and create a Java class named ManageKinesisStreams.java within the chapter.five.kinesis.admin package. Open ManageKinesisStreams.java and add the following piece of code into the Java class:

package chapter.five.kinesis.admin;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.*;

public class ManageKinesisStreams {

  private AmazonKinesisClient kClient;

  /**
   * Constructor which initializes the Kinesis client for working with
   * Kinesis streams.
   */
  public ManageKinesisStreams() {
    //Initialize the AWSCredentials taken from the profile configured in Eclipse.
    //Replace "kinesisCred" with the profile configured in your Eclipse,
    //or leave blank to use the default.
    //Ensure that the access and secret keys are present in the credentials file.
    //The default location of the credentials file is $USER_HOME/.aws/credentials
    AWSCredentials cred = new ProfileCredentialsProvider("kinesisCred")
        .getCredentials();
    System.out.println("Access Key = " + cred.getAWSAccessKeyId());
    System.out.println("Secret Key = " + cred.getAWSSecretKey());
    kClient = new AmazonKinesisClient(cred);
  }

  /**
   * Create a Kinesis stream with the given name and number of shards.
   * @param streamName - Name of the stream
   * @param shards - Number of shards for the given stream
   */
  public void createKinesisStream(String streamName, int shards) {
    System.out.println("Creating new Stream = '" + streamName + "', with Shards = " + shards);
    //Check and create the stream only if it does not exist.
    if (!checkStreamExists(streamName)) {
      //CreateStreamRequest for creating the Kinesis stream
      CreateStreamRequest createStreamRequest = new CreateStreamRequest();
      createStreamRequest.setStreamName(streamName);
      createStreamRequest.setShardCount(shards);
      kClient.createStream(createStreamRequest);
      try {
        //Sleep for 30 seconds so that the stream is initialized and created
        Thread.sleep(30000);
      } catch (InterruptedException e) {
        //No need to print anything
      }
    }
  }

  /**
   * Checks for and deletes a given Kinesis stream.
   * @param streamName - Name of the stream
   */
  public void deleteKinesisStream(String streamName) {
    //Check and delete the stream only if it exists.
    if (checkStreamExists(streamName)) {
      kClient.deleteStream(streamName);
      System.out.println("Deleted the Kinesis Stream = '" + streamName + "'");
      return;
    }
    System.out.println("Stream does not exist = " + streamName);
  }

  /**
   * Utility method which checks whether a given Kinesis stream exists or not.
   * @param streamName - Name of the stream
   * @return - True in case the stream already exists, else false
   */
  public boolean checkStreamExists(String streamName) {
    try {
      //DescribeStreamRequest for describing and checking the
      //existence of the given Kinesis stream.
      DescribeStreamRequest desc = new DescribeStreamRequest();
      desc.setStreamName(streamName);
      DescribeStreamResult result = kClient.describeStream(desc);
      System.out.println("Kinesis Stream '" + streamName + "' already exists...");
      System.out.println("Status of '" + streamName + "' = "
          + result.getStreamDescription().getStreamStatus());
    } catch (ResourceNotFoundException exception) {
      System.out.println("Stream '" + streamName + "' does not exist...Need to create one");
      return false;
    }
    return true;
  }
}
The preceding piece of code provides the utility methods for creating and deleting streams. Follow the comments provided in the code itself to understand the functionality and usage of the various methods given. We can also add a main()
method directly into the preceding code and invoke any of the given methods for creating/deleting Kinesis streams.
Modification of shards (also known as resharding, which covers splitting and merging shards) is an advanced topic. Please refer to https://docs.aws.amazon.com/kinesis/latest/dev/kinesis-using-sdk-java-resharding.html for more information on resharding of streams.
Next, we will create producers that will produce and submit messages to Kinesis streams and we will also create consumers that will consume the messages from Kinesis streams.
In this section, we will create custom producers using the Kinesis AWS API for producing and submitting messages to the Kinesis streams.
Sample dataset
There are a number of free datasets available on the web, and we can use any of them for our producers. We will use one such dataset provided by the Research & Development Division of the Chicago Police Department about reported incidents of crime (with the exception of murders, where data exists for each victim) that occurred in the City of Chicago from 2001 to the present, minus the most recent seven days. For our use case, we will consider the data for one month (August 1, 2015 to August 31, 2015), which can be filtered, extracted, and downloaded from https://data.cityofchicago.org/Public-Safety/Crimes-2015/vwwp-7yr9. Alternatively, you can also download the filtered dataset from http://tinyurl.com/qgxjbej. Store the Chicago crimes dataset for August 2015 in a local directory; we will refer to the file as <$CRIME_DATA>.
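Since the producers we write later will partition records by the IUCR code, the fifth comma-separated column of each crime record, a quick sketch of that extraction may help. The record below is an illustrative, made-up line following the dataset's column order (ID, Case Number, Date, Block, IUCR, Primary Type, ...), not actual data:

```java
public class CrimeRecordParser {

    // Extract the IUCR code -- the 5th comma-separated column -- which the
    // producers will later use as the Kinesis partition key.
    static String extractIucr(String csvLine) {
        return csvLine.split(",")[4];
    }

    public static void main(String[] args) {
        // Illustrative (made-up) record in the dataset's column order
        String line = "1001,HX100001,08/15/2015 10:30:00 PM,"
                + "001XX W MAIN ST,0486,BATTERY,DOMESTIC BATTERY SIMPLE";
        System.out.println(extractIucr(line));
    }
}
```

Note that a plain split(",") is only safe when no field before the one you need contains an embedded comma; the dataset's quoted trailing Location column does contain commas, but since the IUCR code sits in the fifth column, the extraction above is unaffected.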
Please refer to http://tinyurl.com/onul54u to understand the metadata of the crimes dataset.
Use case
Our use case will be to create Kinesis producers that consume the crimes dataset and submit it to Kinesis streams. In the next section, we will create consumers that consume the data from Kinesis streams and generate alerts based on some preconfigured criteria.
Let's perform the following steps for creating producers using APIs provided by the AWS SDK:
Open the project RealTimeAnalytics-Kinesis and add a Java class named AWSChicagoCrimesProducers.java within the chapter.five.kinesis.producers package. Open AWSChicagoCrimesProducers.java and add the following piece of code:

package chapter.five.kinesis.producers;

import java.io.*;
import java.nio.ByteBuffer;
import java.util.ArrayList;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.*;

public class AWSChicagoCrimesProducers {

  private AmazonKinesisClient kClient;
  //Location of the file from where we need to read the data
  private String filePath = "ChicagoCrimes-Aug-2015.csv";

  /**
   * Constructor which initializes the Kinesis client for working with
   * Kinesis streams.
   */
  public AWSChicagoCrimesProducers() {
    //Initialize the AWSCredentials taken from the profile configured in Eclipse.
    //Replace "kinesisCred" with the profile configured in your Eclipse,
    //or leave blank to use the default.
    //Ensure that the access and secret keys are present in the credentials file.
    //The default location of the credentials file is $USER_HOME/.aws/credentials
    AWSCredentials cred = new ProfileCredentialsProvider("kinesisCred")
        .getCredentials();
    kClient = new AmazonKinesisClient(cred);
  }

  /**
   * Read each record of the input file and submit it to the Kinesis stream.
   * @param streamName - Name of the stream
   */
  public void readSingleRecordAndSubmit(String streamName) {
    String data = "";
    try (BufferedReader br = new BufferedReader(
        new FileReader(new File(filePath)))) {
      //Skip the first line as it has the headers
      br.readLine();
      //Read the complete file - line by line
      while ((data = br.readLine()) != null) {
        //Create the record request
        PutRecordRequest putRecordRequest = new PutRecordRequest();
        putRecordRequest.setStreamName(streamName);
        putRecordRequest.setData(ByteBuffer.wrap(data.getBytes()));
        //Data will be partitioned by the IUCR code, which is the 5th column in the record
        String IUCRcode = (data.split(","))[4];
        putRecordRequest.setPartitionKey(IUCRcode);
        //Finally submit the record with the Kinesis client object
        System.out.println("Submitting Record = " + data);
        kClient.putRecord(putRecordRequest);
        //Sleep for half a second before we read and submit the next record
        Thread.sleep(500);
      }
    } catch (Exception e) {
      //Print exceptions, in case of any
      e.printStackTrace();
    }
  }

  /**
   * Read data line by line but submit it to the Kinesis stream in batches.
   * @param streamName - Name of the stream
   * @param batchSize - Batch size
   */
  public void readAndSubmitBatch(String streamName, int batchSize) {
    String data = "";
    try (BufferedReader br = new BufferedReader(
        new FileReader(new File(filePath)))) {
      //Skip the first line as it has the headers
      br.readLine();
      //Counter to keep track of the size of the batch
      int counter = 0;
      //Collection which will contain the batch of records
      ArrayList<PutRecordsRequestEntry> recordRequestEntryList =
          new ArrayList<PutRecordsRequestEntry>();
      while ((data = br.readLine()) != null) {
        //Read the data and create an object of PutRecordsRequestEntry
        PutRecordsRequestEntry entry = new PutRecordsRequestEntry();
        entry.setData(ByteBuffer.wrap(data.getBytes()));
        //Data will be partitioned by the IUCR code, which is the 5th column in the record
        String IUCRcode = (data.split(","))[4];
        entry.setPartitionKey(IUCRcode);
        //Add the record to the collection and increment the counter
        recordRequestEntryList.add(entry);
        counter++;
        //Submit the records in case the batch size is reached
        if (counter == batchSize) {
          PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
          putRecordsRequest.setRecords(recordRequestEntryList);
          putRecordsRequest.setStreamName(streamName);
          //Finally submit the records with the Kinesis client object
          System.out.println("Submitting Records = " + recordRequestEntryList.size());
          kClient.putRecords(putRecordsRequest);
          //Reset the collection and the counter/batch
          recordRequestEntryList = new ArrayList<PutRecordsRequestEntry>();
          counter = 0;
          //Sleep for half a second before processing another batch
          Thread.sleep(500);
        }
      }
      //Submit the final partial batch, if any records are left over
      if (!recordRequestEntryList.isEmpty()) {
        PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
        putRecordsRequest.setRecords(recordRequestEntryList);
        putRecordsRequest.setStreamName(streamName);
        System.out.println("Submitting Records = " + recordRequestEntryList.size());
        kClient.putRecords(putRecordsRequest);
      }
    } catch (Exception e) {
      //Print all exceptions
      e.printStackTrace();
    }
  }
}
This piece of code defines one constructor and two methods. In the constructor, we create a connection to AWS, and then we can use either of the two methods, readSingleRecordAndSubmit(…) or readAndSubmitBatch(…), for reading and posting data to Kinesis streams. The difference between the two is that the former reads and submits data line by line, while the latter reads the data and then creates and submits batches to the Kinesis streams. We can follow the comments within the code to understand each line of AWSChicagoCrimesProducers.
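The batching logic in readAndSubmitBatch(…) can be isolated from the AWS calls and sketched as plain Java; the "submit" step here is a stand-in for kClient.putRecords(…). Note the final flush, which makes sure a trailing partial batch is not silently dropped:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BatchSubmitSketch {

    // Group records into batches of batchSize, "submitting" (collecting) each
    // full batch and flushing any trailing partial batch at the end.
    static List<List<String>> batch(List<String> records, int batchSize) {
        List<List<String>> submitted = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (String record : records) {
            current.add(record);
            if (current.size() == batchSize) {
                submitted.add(current);       // stand-in for putRecords(...)
                current = new ArrayList<>();  // reset the batch
            }
        }
        if (!current.isEmpty()) {
            submitted.add(current);           // flush the final partial batch
        }
        return submitted;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("r1", "r2", "r3", "r4", "r5");
        // 5 records in batches of 2 -> 3 submissions: [r1,r2], [r3,r4], [r5]
        System.out.println(batch(records, 2));
    }
}
```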
Please refer to http://docs.aws.amazon.com/kinesis/latest/dev/developing-producers-with-sdk.html for more information on the AWS API for creating Kinesis producers.
And we are done with our producers. Now, let's move forward and create consumers that will consume the data published by the Kinesis producers and raise some real-time alerts.
In this section, we will create custom consumers using the Kinesis AWS API for consuming the messages from Kinesis streams. Our consumers will consume the messages and, at the same time, check for specific crime codes that need special attention and raise alerts.
Let's perform the following steps for creating consumers using APIs provided by the AWS SDK:
Open the project RealTimeAnalytics-Kinesis and add a Java class named AWSChicagoCrimesConsumers.java within the chapter.five.kinesis.consumers package. This consumer will use the AWS API to consume and analyze the data from the Kinesis streams, and then generate alerts whenever specific crime codes appear in the messages.
AWSChicagoCrimesConsumers.java can be downloaded along with the code samples provided with this book, or you can also download it from https://drive.google.com/folderview?id=0B5oLQERok6YHUlNQdjFxWXF6WWc&usp=sharing. The AWS APIs provide a pull model for receiving messages from streams, so our consumers will poll the Kinesis streams every second and pull the crime data from them.
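The alerting side of the consumer can also be sketched without the AWS plumbing. Assuming a poll of the stream has already returned a batch of crime records, a pass like the following, with a hypothetical watch list of IUCR codes, flags the ones that need attention:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ConsumerAlertSketch {

    // Hypothetical watch list of IUCR codes that should raise an alert.
    static final Set<String> ALERT_CODES =
        new HashSet<>(Arrays.asList("0110", "0486"));

    // Return the records whose IUCR code (5th column) is on the watch list.
    static List<String> scanForAlerts(List<String> records) {
        List<String> alerts = new ArrayList<>();
        for (String record : records) {
            String iucr = record.split(",")[4];
            if (ALERT_CODES.contains(iucr)) {
                alerts.add(record);
            }
        }
        return alerts;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList(
            "1,HX1,08/01/2015,BLOCK-A,0486,BATTERY",
            "2,HX2,08/01/2015,BLOCK-B,0820,THEFT");
        // Only the 0486 record matches the watch list
        System.out.println(scanForAlerts(batch));
    }
}
```

In the real consumer, this check would run inside the polling loop, once per batch of records pulled from the stream.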
Refer to http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html for more information on AWS APIs for developing Kinesis consumers.
In this section, we will discuss the final set of steps required for running Kinesis producers and Kinesis consumers.
Perform the following steps to execute producers and consumers:
Open the project RealTimeAnalytics-Kinesis and add MainRunConsumers.java for running consumers and MainRunProducers.java for running producers into the chapter.five.kinesis package. Add the following piece of code to MainRunConsumers.java:

package chapter.five.kinesis;

import chapter.five.kinesis.admin.ManageKinesisStreams;
import chapter.five.kinesis.consumers.*;

public class MainRunConsumers {

  //This stream will be used by the producers/consumers using the AWS SDK
  public static String streamNameAWS = "AWSStreamingService";

  public static void main(String[] args) {
    //Using the AWS native APIs
    AWSChicagoCrimesConsumers consumers = new AWSChicagoCrimesConsumers();
    consumers.consumeAndAlert(streamNameAWS);

    //Enable only when you want to delete the stream
    ManageKinesisStreams streams = new ManageKinesisStreams();
    //streams.deleteKinesisStream(streamNameAWS);
  }
}

Next, add the following piece of code to MainRunProducers.java:

package chapter.five.kinesis;

import chapter.five.kinesis.admin.ManageKinesisStreams;
import chapter.five.kinesis.producers.*;

public class MainRunProducers {

  //This stream will be used by the producers/consumers using the AWS SDK
  public static String streamNameAWS = "AWSStreamingService";

  public static void main(String[] args) {
    ManageKinesisStreams streams = new ManageKinesisStreams();
    streams.createKinesisStream(streamNameAWS, 1);

    //Using the AWS native APIs
    AWSChicagoCrimesProducers producers = new AWSChicagoCrimesProducers();
    //Read and submit the records one by one
    //producers.readSingleRecordAndSubmit(streamNameAWS);
    //Submit the records in batches of 10
    producers.readAndSubmitBatch(streamNameAWS, 10);

    //Enable only when you want to delete the stream
    //streams.deleteKinesisStream(streamNameAWS);
  }
}
Run MainRunProducers.java from Eclipse and you will see log messages appearing on the console, similar to the following screenshot:
Next, run MainRunConsumers.java from Eclipse. As soon as you run the consumer, it will start receiving the messages and logging them on the console, similar to the following screenshot:
We can also enhance our consumers to store all these messages in an RDBMS or a NoSQL database, which can be further used for performing deep analytics. There are endless possibilities once we have a scalable architecture for capturing real-time data feeds.
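As a taste of the deep analytics mentioned above, consumed records can be rolled up before being persisted to an RDBMS or NoSQL store. A minimal in-memory sketch, counting consumed crimes per IUCR code:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CrimeAggregator {

    // Count consumed records per IUCR code (5th column) -- the kind of rollup
    // a consumer could persist to an external store for later analysis.
    static Map<String, Integer> countByIucr(List<String> records) {
        Map<String, Integer> counts = new HashMap<>();
        for (String record : records) {
            counts.merge(record.split(",")[4], 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList(
            "1,HX1,08/01/2015,BLOCK-A,0486,BATTERY",
            "2,HX2,08/01/2015,BLOCK-B,0820,THEFT",
            "3,HX3,08/02/2015,BLOCK-C,0486,BATTERY");
        System.out.println(countByIucr(records));
    }
}
```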
Kinesis producers and consumers can also be developed using KPL (http://docs.aws.amazon.com/kinesis/latest/dev/developing-producers-with-kpl.html) and KCL (http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html). These libraries are high-level APIs that internally leverage AWS APIs and provide proven design patterns that not only help in expediting the overall development but also help in the development of a robust and stable architecture.
We have skipped a walkthrough of these producers and consumers, but the sample code provided with this book contains producers and consumers using KPL and KCL. Refer to chapter.five.kinesis.producers.KPLChicagoCrimesProducers.java and chapter.five.kinesis.consumers.KCLChicagoCrimesConsumers.java within the provided samples.
KPL requires the dependencies for SLF4J (http://www.slf4j.org/) and Commons IO (https://commons.apache.org/proper/commons-io/). For KCL, we need Amazon Kinesis Client 1.6.0 (https://github.com/awslabs/amazon-kinesis-client). Apart from the individual websites, these libraries can be downloaded from https://drive.google.com/open?id=0B5oLQERok6YHTWs5dEJUaHVDNU0.
In this section, we have created a Kinesis streaming service whose producers submit the Chicago crime records as they are read from the given file, while the consumers consume the records and generate alerts for specific crime codes as they appear in the data received from the Kinesis streams.