The first step in setting up the environment for our big data use case is to establish a Kafka node. Kafka is essentially a FIFO queue, so we will use the simplest single node (broker) setup. Kafka organizes data using topics, producers, consumers, and brokers.
The important Kafka terminologies are as follows:
- A broker is essentially a node.
- A producer is a process that writes data to the message queue.
- A consumer is a process that reads data from the message queue.
- A topic is the specific queue that we write to and read data from.
A Kafka topic is further subdivided into a number of partitions. Partitions let us spread a topic's data across multiple brokers (nodes), both when we write to the topic and when we read from it at the other end of the queue.
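When a message is produced with a key, Kafka's default partitioner hashes that key and takes the result modulo the partition count, so messages sharing a key always land on the same partition. A simplified Ruby sketch of the idea (Kafka's real partitioner uses a murmur2 hash; Ruby's built-in #hash stands in here purely for illustration):

```ruby
# Simplified keyed-partition assignment: the same key always maps
# to the same partition. Not Kafka's actual algorithm (murmur2).
def partition_for(key, num_partitions)
  key.hash.abs % num_partitions
end
```

Messages produced without a key are instead spread across partitions by the client.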
After installing Kafka on our local machine, or on any cloud provider of our choice (excellent tutorials for EC2 are just a search away), we can create a topic using a single command:
$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic xmr-btc
Created topic "xmr-btc".
This will create a new topic called xmr-btc.
Deleting a topic is similar to creating one, using this command:
$ kafka-topics --delete --zookeeper localhost:2181 --topic xmr-btc
We can then get a list of all topics by issuing the following command:
$ kafka-topics --list --zookeeper localhost:2181
xmr-btc
We can then create a command-line producer for our topic, just to test that we can send messages to the queue, like this:
$ kafka-console-producer --broker-list localhost:9092 --topic xmr-btc
Each line of input will be sent as a string-encoded message to our topic, and we can end the process by sending a SIGINT signal (typically Ctrl + C).
Afterwards we can view the messages that are waiting in our queue by spinning up a consumer:
$ kafka-console-consumer --zookeeper localhost:2181 --topic xmr-btc --from-beginning
This consumer will read all messages in our xmr-btc topic, starting from the beginning of history. This is useful for our test purposes, but we will change this configuration in real-world applications.
Now that we have set up our broker, we can use the code at https://github.com/agiamas/mastering-mongodb/tree/master/chapter_9 to start reading (consuming) messages from, and writing (producing) messages to, the queue. For our purposes, we are using the ruby-kafka gem, developed by Zendesk.
For simplicity, we are using a single class to read from a file stored on disk and to write to our Kafka queue.
Our produce method will be used to write messages to Kafka as follows:
def produce
  options = { converters: :numeric, headers: true }
  # Stream the CSV file row by row, publishing each row as a JSON-encoded message
  CSV.foreach('xmr_btc.csv', options) do |row|
    json_line = JSON.generate(row.to_hash)
    @kafka.deliver_message(json_line, topic: 'xmr-btc')
  end
end
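To see what each delivered message looks like, the CSV-to-JSON step can be run in isolation using only the standard library. The column names below are hypothetical stand-ins, since the actual headers of xmr_btc.csv are not shown in this excerpt:

```ruby
require 'csv'
require 'json'

# Hypothetical sample rows standing in for xmr_btc.csv
data = "timestamp,price\n1514764800,0.025\n1514764860,0.026\n"

# :numeric converts the string fields to Integer/Float before serialization
CSV.parse(data, converters: :numeric, headers: true).each do |row|
  puts JSON.generate(row.to_hash)
end
# prints:
# {"timestamp":1514764800,"price":0.025}
# {"timestamp":1514764860,"price":0.026}
```

Each Kafka message is therefore a flat JSON object keyed by the CSV headers.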
Our consume method will read messages from Kafka as follows:
def consume
  consumer = @kafka.consumer(group_id: 'xmr-consumers')
  consumer.subscribe('xmr-btc', start_from_beginning: true)
  # Allow a graceful shutdown on SIGTERM
  trap('TERM') { consumer.stop }
  consumer.each_message(automatically_mark_as_processed: false) do |message|
    puts message.value
    if valid_json?(message.value)
      MongoExchangeClient.new.insert(message.value)
      # Commit the offset only after the message is safely in MongoDB
      consumer.mark_message_as_processed(message)
    end
  end
  consumer.stop
end
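The valid_json? helper called above is not shown in this excerpt; a minimal implementation might look like this:

```ruby
require 'json'

# Returns true if the string parses as JSON, false otherwise
def valid_json?(string)
  JSON.parse(string)
  true
rescue JSON::ParserError
  false
end
```

Guarding the insert this way means malformed messages are printed but never committed to MongoDB.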
The next step is to write these messages to MongoDB, as follows:
- First, we create our collection so that our documents expire after one minute. Enter the following in the mongo shell:
> use exchange_data
> db.xmr_btc.createIndex( { "createdAt": 1 }, { expireAfterSeconds: 60 })
{
    "createdCollectionAutomatically" : true,
    "numIndexesBefore" : 1,
    "numIndexesAfter" : 2,
    "ok" : 1
}
This way, we create a new database called exchange_data with a new collection called xmr_btc whose documents auto-expire one minute after creation. For MongoDB to auto-expire documents, we need to provide a field holding a datetime value that the server can compare against its current time. In our case, this is the createdAt field. Note that the TTL background task runs roughly once every 60 seconds, so documents may persist slightly past their expiration time.
- For our use case, we will use the low-level MongoDB Ruby driver. The code for MongoExchangeClient is as follows:
class MongoExchangeClient
  def initialize
    @collection = Mongo::Client.new([ '127.0.0.1:27017' ],
                                    database: :exchange_data).database[:xmr_btc]
  end

  def insert(document)
    document = JSON.parse(document)
    # The createdAt field drives the TTL index we created in the mongo shell
    document['createdAt'] = Time.now
    @collection.insert_one(document)
  end
end
This client connects to our local database, sets the createdAt field for the TTL document expiration, and saves the message to our collection.
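If we prefer to keep everything in Ruby, the TTL index we created in the mongo shell can also be created through the driver. This is a sketch assuming the mongo gem and a local server; expire_after is the driver's counterpart to the shell's expireAfterSeconds option:

```ruby
require 'mongo'

client = Mongo::Client.new([ '127.0.0.1:27017' ], database: :exchange_data)
# Equivalent to db.xmr_btc.createIndex({ createdAt: 1 }, { expireAfterSeconds: 60 })
client[:xmr_btc].indexes.create_one({ createdAt: 1 }, expire_after: 60)
```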
With this setup, we can write messages to Kafka, read them at the other end of the queue, and write them into our MongoDB collection.