Setting up Kafka 

The first step in setting up the environment for our big data use case is to establish a Kafka node. Kafka is essentially a FIFO queue, so we will use the simplest single node (broker) setup. Kafka organizes data using topics, producers, consumers, and brokers.

The important Kafka terminologies are as follows:

  • A broker is essentially a node.
  • A producer is a process that writes data to the message queue.
  • A consumer is a process that reads data from the message queue.
  • A topic is the specific queue that we write to and read data from.

A Kafka topic is further subdivided into a number of partitions. Partitions let us split the data of a single topic across multiple brokers (nodes), both when we write to the topic and when we read our data at the other end of the queue.

After installing Kafka on our local machine or on any cloud provider of our choice (there are excellent EC2 tutorials just a search away), we can create a topic with a single command:

$ kafka-topics  --create --zookeeper localhost:2181 --replication-factor 1  --partitions 1 --topic xmr-btc
Created topic "xmr-btc".

This will create a new topic called xmr-btc.

Deleting a topic is similar to creating one; we use the following command:

$ kafka-topics --delete --zookeeper localhost:2181 --topic xmr-btc

We can then get a list of all topics by issuing the following command:

$ kafka-topics --list --zookeeper localhost:2181
xmr-btc

We can then create a command-line producer for our topic, just to test that we can send messages to the queue, like this:

$ kafka-console-producer --broker-list localhost:9092 --topic xmr-btc

Every line we type will be sent as a string-encoded message to our topic, and we can end the process by sending a SIGINT signal (typically Ctrl + C).

Afterwards we can view the messages that are waiting in our queue by spinning up a consumer:

$ kafka-console-consumer --zookeeper localhost:2181 --topic xmr-btc --from-beginning

This consumer will read all messages in our xmr-btc topic, starting from the beginning of history. This is useful for our test purposes, but we will change this configuration in real-world applications.

You will keep seeing zookeeper, in addition to kafka, mentioned in these commands. Apache ZooKeeper ships together with Apache Kafka and is a centralized service that Kafka uses internally for maintaining configuration information, naming, providing distributed synchronization, and providing group services.

Now that we have set up our broker, we can use the code at https://github.com/agiamas/mastering-mongodb/tree/master/chapter_9 to start reading (consuming) messages from and writing (producing) messages to the queue. For our purposes, we are using the ruby-kafka gem, developed by Zendesk.

For simplicity, we are using a single class to read from a file stored on disk and to write to our Kafka queue.
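Both methods rely on a @kafka client object that is created when the class is instantiated. The repository linked above contains the author's actual class; the following is only a minimal sketch of how such a setup might look with ruby-kafka (the class name, broker address, and client_id are assumptions, not taken from the repository):

require 'csv'
require 'json'
require 'kafka'

# Hypothetical wrapper class; reads the CSV file from disk and talks to Kafka.
class CryptoExchangeLoader
  def initialize
    # Connect to the single local broker we started earlier.
    @kafka = Kafka.new(['localhost:9092'], client_id: 'xmr-btc-loader')
  end

  # The produce and consume methods shown below go here.
end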

Our produce method will be used to write messages to Kafka as follows:

def produce
  options = { converters: :numeric, headers: true }
  CSV.foreach('xmr_btc.csv', options) do |row|
    json_line = JSON.generate(row.to_hash)
    @kafka.deliver_message(json_line, topic: 'xmr-btc')
  end
end
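deliver_message sends each message to the broker synchronously, which is fine for this example. For larger volumes, ruby-kafka also provides a buffering producer that batches messages before sending them; a rough sketch (the batch size of 100 is an arbitrary assumption) follows:

producer = @kafka.producer
CSV.foreach('xmr_btc.csv', options) do |row|
  # A partition_key: argument could also be passed here to keep related rows
  # on the same partition.
  producer.produce(JSON.generate(row.to_hash), topic: 'xmr-btc')
  # Flush the buffered messages to the broker every 100 rows.
  producer.deliver_messages if producer.buffer_size >= 100
end
producer.deliver_messages
producer.shutdown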

Our consume method will read messages from Kafka as follows:

def consume
  consumer = @kafka.consumer(group_id: 'xmr-consumers')
  consumer.subscribe('xmr-btc', start_from_beginning: true)
  trap('TERM') { consumer.stop }
  consumer.each_message(automatically_mark_as_processed: false) do |message|
    puts message.value
    if valid_json?(message.value)
      MongoExchangeClient.new.insert(message.value)
      consumer.mark_message_as_processed(message)
    end
  end
  consumer.stop
end

Notice that we are using the consumer group API feature (added in Kafka 0.9) to get multiple consumers to access a single topic by assigning each partition to a single consumer. In the event of a consumer failure, its partitions will be reallocated to the remaining members of the group.
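The valid_json? helper called inside the consumer loop is not shown in this excerpt; a minimal sketch of such a helper, assuming we simply want to skip malformed messages, could be:

def valid_json?(value)
  JSON.parse(value)
  true
rescue JSON::ParserError
  false
end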

The next step is to write these messages to MongoDB, as follows:

  1. First, we create our collection so that our documents expire after one minute. Enter the following in the mongo shell:
> use exchange_data
> db.xmr_btc.createIndex( { "createdAt": 1 }, { expireAfterSeconds: 60 })
{
"createdCollectionAutomatically" : true,
"numIndexesBefore" : 1,
"numIndexesAfter" : 2,
"ok" : 1
}

This way, we create a new database called exchange_data with a new collection called xmr_btc that auto-expires its documents after one minute. For MongoDB to auto-expire documents, we need to provide a field with a datetime value that it can compare against the current server time. In our case, this is the createdAt field.

  2. For our use case, we will use the low-level MongoDB Ruby driver. The code for MongoExchangeClient is as follows:
class MongoExchangeClient
  def initialize
    @collection = Mongo::Client.new([ '127.0.0.1:27017' ], database: :exchange_data).database[:xmr_btc]
  end

  def insert(document)
    document = JSON.parse(document)
    document['createdAt'] = Time.now
    @collection.insert_one(document)
  end
end

This client connects to our local database, sets the createdAt field for the TTL document expiration, and saves the message to our collection.
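If we prefer to create the TTL index from Ruby rather than from the mongo shell, the driver exposes the same operation; the following is only a sketch, assuming the same 60-second expiry:

client = Mongo::Client.new([ '127.0.0.1:27017' ], database: :exchange_data)
# Equivalent of the createIndex() call we ran in the mongo shell earlier.
client[:xmr_btc].indexes.create_one({ createdAt: 1 }, expire_after: 60)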

With this setup, we can write messages to Kafka, read them at the other end of the queue, and write them into our MongoDB collection.
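As a hypothetical end-to-end run, using the wrapper class sketched earlier (the class and method names follow the sketches above, not the repository):

loader = CryptoExchangeLoader.new
loader.produce   # in one terminal: push the CSV rows to the xmr-btc topic
loader.consume   # in a second terminal: drain the topic into MongoDB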
