Using change streams

To use a change stream, we need to connect to a replica set. A replica set is a prerequisite for change streams: they internally tail the oplog, so they cannot work without one. Change streams also only output changes that won't be rolled back, which means they follow the majority read concern. In any case, it's good practice to develop and test locally against a replica set, as this is the recommended deployment for production. As an example, we are going to use a signals collection within a database named streams.

We will use the following sample Python code:

from pymongo import MongoClient

class MongoExamples:
    def __init__(self):
        self.client = MongoClient('localhost', 27017)
        db = self.client.streams
        self.signals = db.signals

    # a basic watch on every change in the deployment
    def change_books(self):
        with self.client.watch() as stream:
            for change in stream:
                print(change)

def main():
    MongoExamples().change_books()

if __name__ == '__main__':
    main()

We can open one Terminal and run it using python change_streams.py.

Then, in another Terminal, we connect to our MongoDB replica set using the following code:

> mongo
> use streams
> db.signals.insert({value: 114.3, signal:1})

Going back to our first Terminal window, we can now observe that the output is similar to the following code block:

{'_id': {'_data': '825BB7A25E0000000129295A1004A34408FB07864F8F960BF14453DFB98546645F696400645BB7A25EE10ED33145BCF7A70004'}, 'operationType': 'insert', 'clusterTime': Timestamp(1538761310, 1), 'fullDocument': {'_id': ObjectId('5bb7a25ee10ed33145bcf7a7'), 'value': 114.3, 'signal': 1.0}, 'ns': {'db': 'streams', 'coll': 'signals'}, 'documentKey': {'_id': ObjectId('5bb7a25ee10ed33145bcf7a7')}}

What has happened here is that, by calling watch() on the client object, we have opened a cursor watching our entire MongoDB deployment (our local replica set) for changes. Every data update will be logged and output to the console.

For example, if we go back to the mongo shell, we can issue the following code:

> db.a_random_collection.insert({test: 'bar'})

The Python code output should then be similar to the following code:

{'_id': {'_data': '825BB7A3770000000229295A10044AB37F707D104634B646CC5810A40EF246645F696400645BB7A377E10ED33145BCF7A80004'}, 'operationType': 'insert', 'clusterTime': Timestamp(1538761591, 2), 'fullDocument': {'_id': ObjectId('5bb7a377e10ed33145bcf7a8'), 'test': 'bar'}, 'ns': {'db': 'streams', 'coll': 'a_random_collection'}, 'documentKey': {'_id': ObjectId('5bb7a377e10ed33145bcf7a8')}}

This means that we are getting notifications for each and every data update, across all collections and databases in our deployment.

We can then change the watch() call in our code to the following:

with self.signals.watch() as stream:

This will result in watching only the signals collection, which is the most common use case.
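
The _id field of each change document (the _data string in the preceding outputs) doubles as a resume token. As a minimal sketch, a collection-level watch could record it so that the stream can later be resumed through the resume_after parameter of watch():

from pymongo import MongoClient

client = MongoClient('localhost', 27017)
signals = client.streams.signals

resume_token = None  # load a previously saved token here, if any
with signals.watch(resume_after=resume_token) as stream:
    for change in stream:
        # each change's _id can be persisted and passed back as resume_after
        resume_token = change['_id']
        print(change)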

PyMongo's watch() method can take several parameters, as follows:

watch(pipeline=None, full_document='default', resume_after=None, max_await_time_ms=None, batch_size=None, collation=None, start_at_operation_time=None, session=None)

The most important parameters are as follows:

  • pipeline: This is an optional parameter that we can use to define an aggregation pipeline to be executed on each change event document that watch() returns. Because the change stream itself is built on the aggregation framework, only a subset of its stages can be used here (a sketch using one of them follows this list):
$match
$project
$addFields
$replaceRoot
$redact
  • full_document: This is an optional parameter that we can set to 'updateLookup' to make the change stream return both a delta describing the change and a copy of the entire changed document, as it existed at some point after the change occurred. This matters for partial updates, where the delta alone does not contain the whole document.
  • start_at_operation_time: This is an optional parameter that we can use to only watch for changes that occurred at, or after, the specified timestamp.
  • session: This is an optional parameter that we can use if our driver supports passing a ClientSession object to watch for updates.
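
As a quick illustration of the first two parameters, the following sketch watches the signals collection for inserts and updates only, and asks the server to attach the full document to update events; the $match filter is just an example:

from pymongo import MongoClient

client = MongoClient('localhost', 27017)
signals = client.streams.signals

# only emit insert and update events
pipeline = [{'$match': {'operationType': {'$in': ['insert', 'update']}}}]

# full_document='updateLookup' attaches the current version of the whole
# document to update events, in addition to the delta
with signals.watch(pipeline=pipeline, full_document='updateLookup') as stream:
    for change in stream:
        print(change['operationType'], change.get('fullDocument'))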

Change stream response documents have to be under 16 MB in size. This is a global limit for BSON documents in MongoDB, and change streams have to follow this rule.
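
If our documents are large, one way to stay under this limit is to use the pipeline parameter to trim each event down to the fields we actually need; the following is a sketch using $project, with the field names taken from our earlier signals example:

from pymongo import MongoClient

client = MongoClient('localhost', 27017)
signals = client.streams.signals

# keep only a few fields of each change event; _id (the resume token)
# is kept automatically by an inclusion projection
trim_pipeline = [{'$project': {
    'operationType': 1,
    'documentKey': 1,
    'fullDocument.value': 1,
    'fullDocument.signal': 1,
}}]

with signals.watch(pipeline=trim_pipeline) as stream:
    for change in stream:
        print(change)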
