Creating a Kinesis streaming service

In this section, we will work through some real-world examples that produce streaming data and store it in a Kinesis stream. At the same time, we will look at Kinesis consumers that consume and process the streaming data from Kinesis streams.

Access to AWS Kinesis

The very first step in working with Kinesis streams is getting access to Amazon Web Services (AWS). Perform the following steps to get access to AWS:

  1. Open https://aws.amazon.com and click on Create an AWS Account:
  2. Follow the rest of the instructions as they appear on your screen.

    Note

    For detailed instructions, you can refer to the YouTube video at https://www.youtube.com/watch?v=WviHsoz8yHk.

The sign-up process may involve receiving a phone call and entering a temporary verification PIN.

At the end, you will be provided with a username and password for accessing and managing your AWS account and all its associated services.

Once we complete the sign-up process successfully, we will have access to AWS and all its services, including Kinesis.

AWS provides a free tier for some of the services where we are not charged until we reach a certain limit; for example, running micro instances for up to 750 hours a month is free.

Note

Refer to https://aws.amazon.com/free/ for more information on services available in the free tier.

Kinesis is not part of the free tier, so it is chargeable, but the good part is that we only pay for what we use. We can start using Kinesis streams for as little as $0.015 per shard per hour.

Note

Refer to https://aws.amazon.com/kinesis/pricing/ for more information on Kinesis pricing.
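Since billing is per shard per hour, a quick back-of-the-envelope estimate is worthwhile before leaving a stream running. The following sketch is our own illustrative class, not part of the book's sample code; it applies the quoted $0.015 rate, and PUT payload charges (billed separately) are ignored:

```java
//Back-of-the-envelope Kinesis cost estimate, assuming the quoted
//$0.015 per shard per hour. PUT payload-unit charges are extra and
//are not modeled here.
public class KinesisCostEstimate {

  public static double monthlyCost(int shards, double pricePerShardHour) {
    //shards x price x 24 hours x 30 days
    return shards * pricePerShardHour * 24 * 30;
  }

  public static void main(String[] args) {
    //One shard for a 30-day month: 1 x 0.015 x 720 = $10.80
    System.out.println(monthlyCost(1, 0.015));
  }
}
```

Even an idle stream accrues this charge, which is why cleaning up streams after use, as advised later in this section, matters.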

Assuming that we now have access to AWS, let's move forward and set up our development environment for working with Kinesis streams.

Configuring the development environment

In this section, we will talk about all the essential libraries required for working with Amazon Kinesis and configure our development environment. This will help us develop consumers and producers for our Kinesis streams.

Perform the following steps for configuring your development environment:

  1. Download and install Oracle Java 7 from http://www.oracle.com/technetwork/java/javase/install-linux-self-extracting-138783.html.
  2. Open your Linux console and execute the following command to configure JAVA_HOME as the environment variable:
    export JAVA_HOME=<Path of Java install Dir>
    
  3. Download Eclipse Luna from http://www.eclipse.org/downloads/packages/eclipse-ide-java-ee-developers/lunasr2 and extract it. Let's refer to the directory where we extracted it as ECLIPSE_HOME.
  4. Open Eclipse and follow the instructions provided at http://docs.aws.amazon.com/AWSToolkitEclipse/latest/GettingStartedGuide/tke_setup_install.html for installing the AWS Toolkit for Eclipse.

    You might need to restart your Eclipse instance once the AWS toolkit is installed.

  5. Next, download KCL from AWS's GitHub page (https://github.com/awslabs/amazon-kinesis-client/archive/v1.6.0.zip).
  6. GitHub provides the source files, which we need to compile with Maven (https://maven.apache.org/). If you do not want to do that, you can also download the compiled binaries with all dependencies from https://drive.google.com/file/d/0B5oLQERok6YHdnlGb0dWLWZmMmc/view?usp=sharing.
  7. Open your Eclipse instance, create a Java project, and set its name to RealTimeAnalytics-Kinesis.
  8. In your Eclipse project, create a package called chapter.five. Open the Eclipse project properties window and provide the dependencies of KCL, as shown in the following screenshot:
    [Screenshot: the Eclipse project properties window with KCL dependencies]

    The preceding screenshot shows the Eclipse project and its dependencies that need to be added for compiling and running the code for Kinesis streams.

  9. Next, follow the instructions provided at http://docs.aws.amazon.com/AWSToolkitEclipse/latest/GettingStartedGuide/tke_setup_creds.html and configure your Eclipse environment with the AWS access credentials. This is required so that you can directly connect to AWS from Eclipse itself.

That's it! We are done with the configuration. Our development environment is ready for working with Kinesis streams.

Let's move forward to the next sections where we will create streams, and write producers and consumers for developing a streaming service.

Creating Kinesis streams

In this section, you will learn various ways of creating or hosting Kinesis streams.

Kinesis streams can be created by two different methods: one is using the AWS SDK/toolkit, and the second is logging in to the AWS console and using its user interface to create Kinesis streams.

Perform the following steps for creating Kinesis streams using the AWS user interface:

  1. Log in to your AWS console and go to https://console.aws.amazon.com/kinesis/home. It will take you to the home page for Kinesis streams.
  2. Click on the Create Stream button, as shown on the home page of Kinesis, and you will see the following screen where you have to define the stream configurations:
    [Screenshot: the Kinesis stream configuration page]

    The preceding screenshot shows the Kinesis stream configuration page where we need to specify two configurations:

    • Stream Name: This is the user-defined name of the stream. We will refer to this in our producers and consumers, which we will create in the subsequent sections. Let's set its name as StreamingService.
    • Number of Shards: This is the most important configuration where we specify the number of shards. We need to be extra cautious in defining the number of shards as Amazon will charge based on the number of shards we have configured for a stream. We can also click on the Help me decide how many shards I need link and AWS will provide a GUI where we can provide the size of each record and the maximum reads/writes. AWS will calculate and suggest the appropriate number of shards.
  3. Once all configurations are provided, the final step is to click on the Create button provided at the bottom of the screen, as shown in the previous screenshot.
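The "Help me decide how many shards I need" calculator follows directly from the documented per-shard limits: each shard accepts up to 1 MB/s or 1,000 records/s of writes, and serves up to 2 MB/s of reads. A minimal sketch of that arithmetic (the class and method names here are our own, not AWS's):

```java
//Sketch of the shard-sizing arithmetic, based on the documented
//per-shard limits: 1 MB/s or 1,000 records/s for writes, and
//2 MB/s for reads.
public class ShardCalculator {

  public static int suggestedShards(double writeMBps, double readMBps,
      double recordsPerSec) {
    int forWrites = (int) Math.ceil(writeMBps / 1.0);         //1 MB/s write limit per shard
    int forReads = (int) Math.ceil(readMBps / 2.0);           //2 MB/s read limit per shard
    int forRecords = (int) Math.ceil(recordsPerSec / 1000.0); //1,000 records/s per shard
    //A stream needs at least one shard
    return Math.max(1, Math.max(forWrites, Math.max(forReads, forRecords)));
  }

  public static void main(String[] args) {
    //For example: 3 MB/s in, 4 MB/s out, 1,500 records/s -> 3 shards
    System.out.println(suggestedShards(3, 4, 1500));
  }
}
```

The largest of the three ratios wins, because the stream must satisfy the write bandwidth, read bandwidth, and record-rate limits simultaneously.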

We are done!!! Now sit back and relax; your stream will be created in a few seconds and will then appear on the admin page of Kinesis streams:

[Screenshot: the Kinesis streams admin page]

The preceding screenshot shows the admin page of Kinesis streams where it lists all the active or inactive Kinesis streams. We can also click on the name of a stream and see the detailed analysis/throughput (read/write/latency) for each stream.

Our streams are ready to consume the data provided by the producers and deliver it to the consumers for further analysis. Let's also see the steps required to create and manage streams using the AWS SDK.

Perform the following steps to create Kinesis streams using AWS SDK:

  1. Open your Eclipse project RealTimeAnalytics-Kinesis and create a Java class named ManageKinesisStreams.java within the chapter.five.kinesis.admin package.
  2. Edit ManageKinesisStreams.java and add the following piece of code into the Java class:
    package chapter.five.kinesis.admin;
    
    import com.amazonaws.auth.AWSCredentials;
    import com.amazonaws.auth.profile.ProfileCredentialsProvider;
    import com.amazonaws.services.kinesis.AmazonKinesisClient;
    import com.amazonaws.services.kinesis.model.*;
    
    public class ManageKinesisStreams {
    
      private AmazonKinesisClient kClient;
    
      /**
       * Constructor which initializes the Kinesis client for working with
       * Kinesis streams.
       */
      public ManageKinesisStreams() {
    
        //Initialize the AWSCredentials taken from the profile configured in Eclipse.
        //Replace "kinesisCred" with the profile configured in your Eclipse, or leave it blank to use the default.
        //Ensure that the Access and Secret keys are present in the credentials file.
        //The default location of the credentials file is $USER_HOME/.aws/credentials
        AWSCredentials cred = new ProfileCredentialsProvider("kinesisCred")
            .getCredentials();
        System.out.println("Access Key = "+ cred.getAWSAccessKeyId());
        System.out.println("Secret Key = " + cred.getAWSSecretKey());
        kClient = new AmazonKinesisClient(cred);
    
      }
    
      /**
       * Create a Kinesis Stream with the given name and the shards.
       * @param streamName - Name of the Stream
       * @param shards - Number of Shards for the given Stream
       */
      public void createKinesisStream(String streamName, int shards) {
    
        System.out.println("Creating new Stream = '"+streamName+"', with Shards = "+shards);
        //Check and create stream only if it does not exist.
        if (!checkStreamExists(streamName)) {
          //CreateStreamRequest for creating Kinesis Stream
          CreateStreamRequest createStreamRequest = new CreateStreamRequest();
          createStreamRequest.setStreamName(streamName);
          createStreamRequest.setShardCount(shards);
          kClient.createStream(createStreamRequest);
    
          try {
            //Sleep for 30 seconds so that stream is initialized and created
            Thread.sleep(30000);
    
          } catch (InterruptedException e) {
            //No need to Print Anything
          }
        }
      }
    
      /**
       * Checks for and deletes a given Kinesis stream.
       * @param streamName - Name of the Stream
       */
      public void deleteKinesisStream(String streamName) {
    
        //Check and delete stream only if it exists.
        if (checkStreamExists(streamName)) {
          kClient.deleteStream(streamName);
          System.out.println("Deleted the Kinesis Stream = '" + streamName+"'");
          return;
        }
    
        System.out.println("Stream does not exist = " + streamName);
    
      }
    
      /**
       * Utility method which checks whether a given Kinesis stream exists or not.
       * @param streamName - Name of the Stream
       * @return - true if the stream already exists, else false
       */
      public boolean checkStreamExists(String streamName) {
    
        try {
          //DescribeStreamRequest for Describing and checking the 
          //existence of given Kinesis Streams.
          DescribeStreamRequest desc = new DescribeStreamRequest();
          desc.setStreamName(streamName);
          DescribeStreamResult result = kClient.describeStream(desc);
    
          System.out.println("Kinesis Stream '" +streamName+ "' already exists...");
          System.out.println("Status of '"+ streamName + "' = "
              + result.getStreamDescription().getStreamStatus());
    
        } catch (ResourceNotFoundException exception) {
          System.out.println("Stream '" + streamName + "' does not exist...need to create one");
          return false;
        }
    
        return true;
      }
    }

The preceding piece of code provides the utility methods for creating and deleting streams. Follow the comments provided in the code itself to understand the functionality and usage of the various methods given. We can also add a main() method directly into the preceding code and invoke any of the given methods for creating/deleting Kinesis streams.
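As an illustration, a driver class such as the following could invoke the preceding utilities. The class is our own addition, not part of the book's sample code; the name check mirrors the pattern Kinesis accepts for stream names (letters, digits, underscores, hyphens, and dots; 1 to 128 characters), and the AWS calls are shown commented out so the snippet stays self-contained:

```java
//Hypothetical driver for ManageKinesisStreams: validates the stream
//name locally before any AWS calls are made.
public class StreamAdminDemo {

  //Kinesis stream names may contain letters, digits, underscores,
  //hyphens, and dots, and must be 1-128 characters long.
  public static boolean isValidStreamName(String name) {
    return name != null && name.matches("[a-zA-Z0-9_.-]{1,128}");
  }

  public static void main(String[] args) {
    String streamName = "StreamingService";
    if (!isValidStreamName(streamName)) {
      throw new IllegalArgumentException("Invalid stream name: " + streamName);
    }
    //With a valid name, the utility methods can be invoked, e.g.:
    //  ManageKinesisStreams streams = new ManageKinesisStreams();
    //  streams.createKinesisStream(streamName, 1);  //1 shard keeps cost low
    //  streams.deleteKinesisStream(streamName);     //clean up when finished
    System.out.println("'" + streamName + "' is a valid Kinesis stream name");
  }
}
```

Validating the name locally avoids a round trip to AWS just to receive a validation error from CreateStream.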

Note

Modification of shards (also known as resharding or merging of shards) is an advanced topic. Please refer to https://docs.aws.amazon.com/kinesis/latest/dev/kinesis-using-sdk-java-resharding.html for more information on resharding of streams.

Next, we will create producers that will produce and submit messages to Kinesis streams and we will also create consumers that will consume the messages from Kinesis streams.

Creating Kinesis stream producers

In this section, we will create custom producers using the Kinesis AWS API for producing and submitting messages to the Kinesis streams.

Sample dataset

There are a number of free datasets available over the web, and we can use any of them for our producers. We will use one such dataset provided by the Research & Development Division of the Chicago Police Department about reported incidents of crime (with the exception of murders, where data exists for each victim) that occurred in the City of Chicago from 2001 to the present, minus the most recent seven days. For our use case, we will consider the data for one month—August 1, 2015 to August 31, 2015—which can be filtered, extracted, and downloaded from https://data.cityofchicago.org/Public-Safety/Crimes-2015/vwwp-7yr9. Alternatively, you can also download the filtered dataset from http://tinyurl.com/qgxjbej. Store the Chicago crimes dataset for August 2015 in a local directory and let's refer to the file as <$CRIME_DATA>.

Note

Please refer to http://tinyurl.com/onul54u to understand the metadata of the crimes dataset.

Use case

Our use case will be to create Kinesis producers which will consume the crimes dataset and submit it to Kinesis streams. In the next section, we will create consumers who will consume the data from Kinesis streams and will generate alerts based on some preconfigured criteria.

Note

In real production scenarios, Kinesis producers can directly connect and consume data from live streams or feeds that publish the crime reports and further submit to Kinesis streams.

Let's perform the following steps for creating producers using APIs provided by the AWS SDK:

  1. Open your Eclipse project RealTimeAnalytics-Kinesis and add a Java class named AWSChicagoCrimesProducers.java within the chapter.five.kinesis.producers package.
  2. Edit AWSChicagoCrimesProducers.java and add the following piece of code:
    package chapter.five.kinesis.producers;
    
    import java.io.*;
    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    
    import com.amazonaws.auth.AWSCredentials;
    import com.amazonaws.auth.profile.ProfileCredentialsProvider;
    import com.amazonaws.services.kinesis.AmazonKinesisClient;
    import com.amazonaws.services.kinesis.model.*;
    
    public class AWSChicagoCrimesProducers {

      private AmazonKinesisClient kClient;

      //Location of the file from where we need to read the data
      private String filePath = "ChicagoCrimes-Aug-2015.csv";

      /**
       * Constructor which initializes the Kinesis client for working with
       * Kinesis streams.
       */
      public AWSChicagoCrimesProducers() {
        //Initialize the AWSCredentials taken from the profile configured in Eclipse.
        //Replace "kinesisCred" with the profile configured in your Eclipse, or leave it blank to use the default.
        //Ensure that the Access and Secret keys are present in the credentials file.
        //The default location of the credentials file is $USER_HOME/.aws/credentials
        AWSCredentials cred = new ProfileCredentialsProvider("kinesisCred")
            .getCredentials();
        kClient = new AmazonKinesisClient(cred);
      }
    
      /**
       * Reads each record of the input file and submits it to the Amazon Kinesis stream.
       * @param streamName - Name of the Stream.
       */
      public void readSingleRecordAndSubmit(String streamName) {

        String data = "";
        try (BufferedReader br = new BufferedReader(
            new FileReader(new File(filePath)))) {
          //Skipping first line as it has headers
          br.readLine();
          //Read the complete file - line by line
          while ((data = br.readLine()) != null) {
            //Create Record Request
            PutRecordRequest putRecordRequest = new PutRecordRequest();
            putRecordRequest.setStreamName(streamName);
            putRecordRequest.setData(ByteBuffer.wrap((data.getBytes())));
            //Data is partitioned by the IUCR code, which is the 5th column in the record
            String IUCRcode = (data.split(","))[4];
            putRecordRequest.setPartitionKey(IUCRcode);
            //Finally submit the record with the Kinesis client object
            System.out.println("Submitting Record = " + data);
            kClient.putRecord(putRecordRequest);
            //Sleep for half a second before we read and submit the next record
            Thread.sleep(500);
          }
        } catch (Exception e) {
          //Print exceptions, in case any
          e.printStackTrace();
        }
      }
      
      /**
       * Reads data line by line and submits it to the Kinesis stream in batches.
       * @param streamName - Name of Stream
       * @param batchSize - Batch Size
       */
      public void readAndSubmitBatch(String streamName, int batchSize) {

        String data = "";
        try (BufferedReader br = new BufferedReader(
            new FileReader(new File(filePath)))) {

          //Skipping first line as it has headers
          br.readLine();
          //Counter to keep track of the size of the batch
          int counter = 0;
          //Collection which will contain the batch of records
          ArrayList<PutRecordsRequestEntry> recordRequestEntryList = new ArrayList<PutRecordsRequestEntry>();
          while ((data = br.readLine()) != null) {
            //Read data and create an object of PutRecordsRequestEntry
            PutRecordsRequestEntry entry = new PutRecordsRequestEntry();
            entry.setData(ByteBuffer.wrap((data.getBytes())));
            //Data is partitioned by the IUCR code, which is the 5th column in the record
            String IUCRcode = (data.split(","))[4];
            entry.setPartitionKey(IUCRcode);
            //Add the record to the collection
            recordRequestEntryList.add(entry);
            //Increment the counter
            counter++;

            //Submit the records once the batch size is reached
            if (counter == batchSize) {
              PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
              putRecordsRequest.setRecords(recordRequestEntryList);
              putRecordsRequest.setStreamName(streamName);
              //Finally submit the records with the Kinesis client object
              System.out.println("Submitting Records = " + recordRequestEntryList.size());
              kClient.putRecords(putRecordsRequest);
              //Reset the collection and the counter
              recordRequestEntryList = new ArrayList<PutRecordsRequestEntry>();
              counter = 0;
              //Sleep for half a second before processing another batch
              Thread.sleep(500);
            }
          }
          //Submit any remaining records that did not fill a complete batch
          if (!recordRequestEntryList.isEmpty()) {
            PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
            putRecordsRequest.setRecords(recordRequestEntryList);
            putRecordsRequest.setStreamName(streamName);
            System.out.println("Submitting Records = " + recordRequestEntryList.size());
            kClient.putRecords(putRecordsRequest);
          }
        } catch (Exception e) {
          //Print all exceptions
          e.printStackTrace();
        }
      }
    }

This piece of code defines one constructor and two methods. In the constructor, we create a connection to AWS, and then we can use either of the two methods, readSingleRecordAndSubmit(…) or readAndSubmitBatch(…), for reading and posting data to Kinesis streams. The difference between the two is that the former reads and submits data line by line, while the latter reads the data and then creates and submits batches to the Kinesis streams. Follow the comments within the code to understand each line of AWSChicagoCrimesProducers.
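Both methods use the IUCR code as the partition key, and that key is what decides the shard a record lands on: Kinesis takes the MD5 hash of the partition key and routes the record to the shard whose hash-key range contains the resulting 128-bit value. A rough sketch of that mapping, assuming the shards split the hash-key space into equal ranges (the class is our own illustration, not part of the sample code):

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

//Sketch of how Kinesis maps a partition key to a shard: the MD5 hash
//of the key is read as a 128-bit integer, and the record goes to the
//shard whose hash-key range contains that value. Assuming equal-width
//ranges, the shard index is hash * shards / 2^128.
public class PartitionKeyDemo {

  public static int shardFor(String partitionKey, int shards) throws Exception {
    MessageDigest md5 = MessageDigest.getInstance("MD5");
    BigInteger hash = new BigInteger(1,
        md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8)));
    //Total hash-key space is 2^128
    BigInteger space = BigInteger.ONE.shiftLeft(128);
    return hash.multiply(BigInteger.valueOf(shards)).divide(space).intValue();
  }

  public static void main(String[] args) throws Exception {
    //Records sharing an IUCR code always map to the same shard
    System.out.println(shardFor("0110", 2) == shardFor("0110", 2));
  }
}
```

This is why all records with the same crime code arrive in order on the same shard, which matters when a consumer processes each shard independently.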

Note

Please refer to http://docs.aws.amazon.com/kinesis/latest/dev/developing-producers-with-sdk.html for more information on the AWS API for creating Kinesis producers.

And we are done with our producers. Now, let's move forward and create consumers that will consume the data published by the Kinesis producers and raise some real-time alerts.

Creating Kinesis stream consumers

In this section, we will create custom consumers using the Kinesis AWS API for consuming the messages from Kinesis streams. Our consumers will consume the messages, but at the same time, consumers will also check for specific crime codes that need special attention and will raise alerts.

Let's perform the following steps for creating consumers using APIs provided by the AWS SDK:

  1. Open the Eclipse project RealTimeAnalytics-Kinesis and add a Java class named AWSChicagoCrimesConsumers.java within the chapter.five.kinesis.consumers package. This consumer will use the AWS API to consume and analyze the data from the Kinesis streams, and then generate alerts whenever they encounter specific crime codes appearing in the messages.
  2. The complete code for AWSChicagoCrimesConsumers.java can be downloaded along with the code samples provided with this book, or you can also download it from https://drive.google.com/folderview?id=0B5oLQERok6YHUlNQdjFxWXF6WWc&usp=sharing.

AWS APIs provide a pull model for receiving messages from the streams, so our consumers will poll the Kinesis streams every second and pull the crime data from them.
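Since the complete consumer is provided only as a download, here is a hedged sketch of the per-record alert check such a consumer could apply to every message it pulls from the stream. The class name and the IUCR codes in the alert set are illustrative placeholders, not taken from the book's sample code:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

//Sketch of the per-record alert check a consumer could apply to each
//record pulled from the stream. The IUCR codes below are illustrative
//placeholders, not the ones used in the book's sample code.
public class CrimeAlertCheck {

  //IUCR codes that should trigger an alert (hypothetical examples)
  private static final Set<String> ALERT_CODES =
      new HashSet<String>(Arrays.asList("0110", "0261", "031A"));

  //Each record is one CSV line of the crimes dataset; the IUCR code
  //is the 5th column, as in the producers above.
  public static boolean shouldAlert(String csvRecord) {
    String[] fields = csvRecord.split(",");
    return fields.length > 4 && ALERT_CODES.contains(fields[4]);
  }

  public static void main(String[] args) {
    String record = "10224738,HY411648,09/05/2015,013XX W 18TH ST,0110,...";
    if (shouldAlert(record)) {
      System.out.println("ALERT raised for record: " + record);
    }
  }
}
```

In the real consumer, this check would run on the payload of each record returned by a GetRecords call before the consumer moves on to the next shard iterator.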

Note

Refer to http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html for more information on AWS APIs for developing Kinesis consumers.

Generating and consuming crime alerts

In this section, we will discuss the final set of steps required for running Kinesis producers and Kinesis consumers.

Perform the following steps to execute producers and consumers:

  1. Open the Eclipse project RealTimeAnalytics-Kinesis and add MainRunConsumers.java for running consumers and MainRunProducers.java for running producers into the chapter.five.kinesis package.
  2. Next, add the following piece of code into MainRunConsumers.java:
    package chapter.five.kinesis;

    import chapter.five.kinesis.admin.ManageKinesisStreams;
    import chapter.five.kinesis.consumers.*;

    public class MainRunConsumers {

      //This Stream will be used by the producers/consumers using the AWS SDK
      public static String streamNameAWS = "AWSStreamingService";

      public static void main(String[] args) {

        //Using the native AWS APIs
        AWSChicagoCrimesConsumers consumers = new AWSChicagoCrimesConsumers();
        consumers.consumeAndAlert(streamNameAWS);

        //Enable only when you want to delete the stream
        //ManageKinesisStreams streams = new ManageKinesisStreams();
        //streams.deleteKinesisStream(streamNameAWS);
      }
    }
  3. Add the following piece of code into MainRunProducers.java:
    package chapter.five.kinesis;

    import chapter.five.kinesis.admin.ManageKinesisStreams;
    import chapter.five.kinesis.producers.*;

    public class MainRunProducers {

      //This Stream will be used by the producers/consumers using the AWS SDK
      public static String streamNameAWS = "AWSStreamingService";

      public static void main(String[] args) {
        ManageKinesisStreams streams = new ManageKinesisStreams();
        streams.createKinesisStream(streamNameAWS, 1);

        //Using the native AWS APIs
        AWSChicagoCrimesProducers producers = new AWSChicagoCrimesProducers();

        //Read and submit record by record
        //producers.readSingleRecordAndSubmit(streamNameAWS);
        //Submit the records in batches of 10
        producers.readAndSubmitBatch(streamNameAWS, 10);

        //Enable only when you want to delete the stream
        //streams.deleteKinesisStream(streamNameAWS);
      }
    }
  4. From Eclipse, execute MainRunProducers.java and you will see log messages appearing on the console, which will be similar to the following screenshot:
    [Screenshot: producer console output]
  5. Next, we will execute MainRunConsumers.java from Eclipse. As soon as you run the consumer, you will see the consumer will start receiving the messages and will log the same on the console, which will be similar to the following screenshot:
    [Screenshot: consumer console output]

We can also enhance our consumers to store all these messages in an RDBMS or a NoSQL store, which can be further used for performing deep analytics. There are endless possibilities once you have a scalable architecture for capturing real-time data feeds.

Note

Remember to clean up, that is, delete, the Kinesis streams and DynamoDB tables as soon as you are finished processing the feeds. You can also invoke the ManageKinesisStreams.deleteKinesisStream(…) method to delete the streams.

Kinesis producers and consumers can also be developed using the KPL (http://docs.aws.amazon.com/kinesis/latest/dev/developing-producers-with-kpl.html) and the KCL (http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html). These libraries are high-level APIs that internally leverage the AWS APIs and provide proven design patterns, which not only help expedite overall development but also help in building a robust and stable architecture.

We have skipped the development of these producers and consumers here, but the sample code provided with this book contains producers and consumers using the KPL and KCL. Refer to chapter.five.kinesis.producers.KPLChicagoCrimesProducers.java and chapter.five.kinesis.consumers.KCLChicagoCrimesConsumers.java within the provided samples.

Note

KPL requires the dependencies for SLF4J (http://www.slf4j.org/) and Commons IO (https://commons.apache.org/proper/commons-io/). For KCL, we need Amazon Kinesis Client 1.6.0 (https://github.com/awslabs/amazon-kinesis-client). Apart from the individual websites, these libraries can be downloaded from https://drive.google.com/open?id=0B5oLQERok6YHTWs5dEJUaHVDNU0.

In this section, we have created a Kinesis streaming service that accepts the Chicago crime records as they become available in the given file, while the consumers consume the records and generate alerts for specific crime codes as they appear in the data received from the Kinesis streams.
