In Chapter 6, Multi-Document ACID Transactions, we worked through two use cases of new transactional capabilities using the code for Ruby and Python. In this chapter, we will dive deeper into the aggregation framework, learning how it can be useful. Additionally, we will look at the aggregation operators that are supported by MongoDB.
Following that, we will learn about time series collections, which were introduced in version 5.0 and greatly expanded in version 6.0. Next, we will learn about MongoDB views, which are similar to traditional database views, and how to build on-demand materialized views.
Finally, we will discuss the most important aggregation pipeline limitations and bring everything together with a sample aggregation use case.
To learn this information, we will use aggregations to process transaction data from the Ethereum blockchain. The complete source code is available at https://github.com/PacktPublishing/Mastering-MongoDB-6.x.
In this chapter, we will cover the following topics:
To follow the examples in this chapter, we will need to install MongoDB and the official PyMongo Python driver v4.1, and import the data files for Chapter 7 from the book’s GitHub repository.
The aggregation framework was introduced by MongoDB in version 2.2 (version 2.1 in the development branch). It serves as an alternative to both the MapReduce framework, which is deprecated as of version 5.0, and querying the database directly.
Using the aggregation framework, we can perform GROUP BY operations in the server. We can also project only the fields that are needed in the result set. Using the $match and $project operators early in the pipeline, we can reduce the amount of data passed through it, resulting in faster data processing.
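The following minimal PyMongo sketch shows this pattern. It filters with $match and trims fields with $project before running a server-side GROUP BY. It assumes the transactions collection used later in this chapter, which has from, value, and timestamp fields. The helper names and the cut-off date are hypothetical.

```python
from datetime import datetime, timezone


def value_by_sender_pipeline(since):
    """Build a pipeline that filters and trims documents before grouping.

    Hypothetical helper: `since` is a timezone-aware datetime cut-off.
    """
    return [
        # Filter first, so fewer documents reach the later stages.
        {"$match": {"timestamp": {"$gte": since}}},
        # Keep only the fields that the $group stage needs.
        {"$project": {"_id": 0, "from": 1, "value": 1}},
        # Server-side equivalent of SQL's GROUP BY "from" with SUM(value).
        {"$group": {"_id": "$from", "total_value": {"$sum": "$value"}}},
    ]


def value_by_sender(collection, since):
    # `collection` is expected to be a pymongo Collection.
    return list(collection.aggregate(value_by_sender_pipeline(since)))


example = value_by_sender_pipeline(datetime(2017, 6, 1, tzinfo=timezone.utc))
```

Calling value_by_sender(db.transactions, datetime(2017, 6, 1, tzinfo=timezone.utc)) would return one document per sender address.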
Self-joins—that is, joining data within the same collection—can also be performed using the aggregation framework, as we will see in our use case.
When comparing the aggregation framework to simply using the queries available via the shell or various other drivers, it is important to remember that there is a use case for both.
For selection and projection queries, it’s almost always better to use simple queries. The simplicity of the built-in commands easily outweighs the complexity of developing, testing, and deploying an aggregation framework operation. Finding documents while projecting only some of the fields (db.books.find({price: 50}, {price: 1, name: 1})), or without a projection (db.books.find({price: 50})), is simple and fast enough that it doesn’t warrant the aggregation framework.
On the other hand, if we want to perform GROUP BY and self-join operations using MongoDB, there might be a case for using the aggregation framework. The most important limitation of the legacy group() command in the MongoDB shell (removed in MongoDB 4.2) is that the result set has to fit in a single document, so it can’t be more than 16 MB in size. In addition, the result of any group() command can’t contain more than 20,000 unique groupings.
Finally, there is always the option of using the database as data storage and performing complex operations in the application. This can sometimes be quick to develop, but it should be avoided, as it will most likely incur memory, networking, and ultimately performance costs down the road.
In the next section, we will describe the available operators before using them in a real use case.
Usually, the use case for aggregation is grouping together attribute values from multiple documents or returning a single result after processing a set of documents.
We can also use the aggregation framework to track how data changes over time in a collection.
One example would be calculating the average value for an attribute across all documents of a collection.
For these calculations, we can use one or more aggregation pipelines, whose stages form a Directed Acyclic Graph (DAG), or a single-purpose aggregation method for simple one-off calculations.
In earlier versions, we could also use MapReduce. As of MongoDB 5.0, however, the MapReduce framework is deprecated and should not be used for new development.
Existing MapReduce code can be rewritten as a multi-stage aggregation pipeline. The $accumulator and $function aggregation operators, introduced in MongoDB 4.4, let us define custom aggregation expressions in JavaScript. This is useful in the specific case where our mapper and reducer contain custom code that cannot be expressed with an aggregation pipeline. Writing custom code opens up the possibility of code injection vulnerabilities, and this should be weighed as a security risk.
MongoDB provides three helper aggregation methods. We can call them directly from the shell or the official drivers when we only need to perform count or distinct operations, without writing an aggregation pipeline.
They are estimatedDocumentCount(), count(), and distinct():
Finally, using estimatedDocumentCount() in sharded clusters might not filter out orphaned documents.
Note
The estimatedDocumentCount() method described here is the one in mongosh, the new mongo shell. Node.js and other language drivers may have methods with identical names, so check their documentation. As a rule of thumb, the older the library, the more likely it is that its method uses a different definition of estimated count from the one that mongosh and compatible drivers use as of MongoDB 5.
Single-purpose aggregation methods can be used as helper methods, but they should generally not replace aggregation pipelines. In the next section, we will learn about the more advanced aggregation operators used in pipelines, which is the approach MongoDB recommends.
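In PyMongo 4.x, the driver used in this chapter, the closest equivalents are estimated_document_count(), count_documents(), and distinct(). The older Collection.count() method was removed in PyMongo 4.0. The following minimal sketch assumes a transactions collection with from and value fields. The helper name is hypothetical.

```python
def summarize_transactions(collection):
    """Call the single-purpose helpers on a pymongo Collection (hypothetical helper)."""
    return {
        # Fast estimate based on collection metadata; takes no filter.
        "estimated_total": collection.estimated_document_count(),
        # Exact count of the documents that match the filter.
        "zero_value": collection.count_documents({"value": 0}),
        # Distinct sender addresses, returned as a Python list.
        "senders": collection.distinct("from"),
    }
```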
In this section, we will learn how to use aggregation operators, which are divided into two categories. Aggregation stage operators define the stages of a pipeline and the data that gets passed from one stage to the next, always in the same document format. Within each stage, we use expression operators to compare and process values.
An aggregation pipeline is composed of different stages. These stages are declared in an array and executed sequentially, with the output of every stage being the input of the next one.
If a pipeline writes its results to a collection, its last stage has to be either an $out operator or a $merge operator.
The $out stage has to be the final stage in an aggregation pipeline, outputting data to an output collection that will be completely erased and overwritten if it already exists. The $out operator cannot output to a sharded collection.
Similarly, the $merge stage has to be the final stage in an aggregation pipeline, but it can insert, merge, replace, or keep existing documents. It can also process the documents using a custom update pipeline. It can replace all the documents in a collection only if the aggregation output matches every existing document in the collection; existing documents with no match in the output are left untouched. Unlike $out, the $merge operator can output to a sharded collection.
The $out, $merge, and $geoNear stages can each appear at most once in an aggregation pipeline.
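The placement rules above can be expressed as a small client-side check. This is a hypothetical helper written for illustration and is not part of PyMongo. The server validates pipelines itself and rejects invalid ones.

```python
def check_output_stages(pipeline):
    """Raise ValueError if $out, $merge, or $geoNear break the placement rules."""
    # Each stage is a one-key dict, such as {"$match": {...}}.
    names = [next(iter(stage)) for stage in pipeline]
    for name in ("$out", "$merge", "$geoNear"):
        if names.count(name) > 1:
            raise ValueError(f"{name} can appear at most once")
    for position, name in enumerate(names):
        # $out and $merge are only valid as the final stage.
        if name in ("$out", "$merge") and position != len(names) - 1:
            raise ValueError(f"{name} must be the last stage")


# Valid: $merge is the single, final stage.
check_output_stages([
    {"$group": {"_id": "$from", "count": {"$sum": 1}}},
    {"$merge": {"into": "sender_counts"}},
])
```

Because only one stage can be last, this check also rejects a pipeline that contains both $out and $merge.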
The following list describes the most important aggregation pipeline stages:
Within every stage, we can define one or more expression operators to apply our intermediate calculations. This section will focus on these expression operators.
The Boolean operators are used to pass a true or false value to the next stage of our aggregation pipeline.
We can choose to pass the originating integer, string, or any other type value along, too.
We can use the $and, $or, and $not operators in the same way that we would use them in any programming language.
Note
Boolean expressions evaluate null, 0, and undefined values as false. All other values, including nonzero numbers and arrays, evaluate as true.
Comparison operators can be used in conjunction with Boolean operators to construct the expressions that we need to evaluate as true/false for the output of our pipeline’s stage.
The most commonly used operators are listed as follows:
All of these previously mentioned operators return a Boolean value of true or false.
The only operator that doesn’t return a Boolean value is $cmp, which returns 0 if the two arguments are equivalent, 1 if the first value is greater than the second, and -1 if the second value is greater than the first.
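The following sketch builds a $project stage that combines Boolean and comparison operators on the txfee and value fields of the transactions collection. The fee threshold and output field names are hypothetical. The cmp_like function mirrors the -1/0/1 contract of $cmp for two numbers only.

```python
def fee_flags_pipeline(fee_threshold):
    """$project stage combining $and, $gt, and $cmp (hypothetical helper)."""
    return [
        {"$project": {
            "txfee": 1,
            # $and is true only if both comparisons return true.
            "paid_and_expensive": {"$and": [
                {"$gt": ["$value", 0]},
                {"$gt": ["$txfee", fee_threshold]},
            ]},
            # $cmp returns -1, 0, or 1 instead of a Boolean.
            "fee_vs_threshold": {"$cmp": ["$txfee", fee_threshold]},
        }},
    ]


def cmp_like(a, b):
    """Plain-Python mirror of $cmp for two numbers."""
    return (a > b) - (a < b)
```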
As in most programming languages, set operators treat arrays as sets, ignoring duplicate entries and the order of elements. The order of results is unspecified, and duplicates are removed from the result set. Set expressions apply only to the top level of the set, not recursively to its elements. For example, if a set contains a nested array, that nested array is not deduplicated and may still contain duplicates.
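The following sketch uses the main set operators in a $project stage. It assumes two hypothetical array fields, tags_a and tags_b. The plain-Python function mirrors $setUnion and sorts its result only to make it deterministic, because MongoDB leaves the order unspecified.

```python
def set_ops_stage():
    """$project stage over two hypothetical array fields."""
    return {"$project": {
        "union": {"$setUnion": ["$tags_a", "$tags_b"]},
        "common": {"$setIntersection": ["$tags_a", "$tags_b"]},
        "only_in_a": {"$setDifference": ["$tags_a", "$tags_b"]},
        "a_is_subset": {"$setIsSubset": ["$tags_a", "$tags_b"]},
    }}


def set_union_like(a, b):
    # Duplicates disappear because both inputs are treated as sets.
    return sorted(set(a) | set(b))
```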
The available set operators are listed as follows:
The available array operators are listed as follows:
The date operators are used to extract date information from date fields when we want to calculate statistics based on the day of the week, month, or year using the pipeline:
Note
We can also use the $add and $subtract arithmetic operators with dates.
Here, $add adds a number of milliseconds to a date, and $subtract with two date arguments returns the difference in milliseconds. When subtracting a number (again, in milliseconds) from a date, the date argument must go first.
$literal: We use this operator to pass a value through the pipeline without parsing it. For example, we might need to pass along a string such as "$sort" without it being interpreted as a field path.
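The following sketch shows the date arithmetic rules from the note above. It assumes a date field named timestamp, as in the transactions collection. The reference date is a hypothetical datetime that PyMongo would send as a BSON date. millis_between computes on the client the value that $subtract returns for two dates.

```python
from datetime import datetime, timedelta, timezone

ONE_DAY_MS = 24 * 60 * 60 * 1000  # 86,400,000 milliseconds


def date_arithmetic_stage(reference):
    """$project stage using $add and $subtract on the timestamp field."""
    return {"$project": {
        # Date + milliseconds gives a date one day later.
        "next_day": {"$add": ["$timestamp", ONE_DAY_MS]},
        # Date - milliseconds: the date argument must come first.
        "previous_day": {"$subtract": ["$timestamp", ONE_DAY_MS]},
        # Date - date gives the difference in milliseconds.
        "age_ms": {"$subtract": [reference, "$timestamp"]},
    }}


def millis_between(later, earlier):
    """Client-side equivalent of $subtract on two dates."""
    return (later - earlier) // timedelta(milliseconds=1)
```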
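As a minimal illustration, this $project stage, written as a PyMongo dictionary, outputs the string "$sort" as-is in a hypothetical label field.

```python
literal_stage = {"$project": {
    # Without $literal, "$sort" would be parsed as a path to a field named sort.
    "label": {"$literal": "$sort"},
}}
```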
Miscellaneous operators are various general-purpose operators, as follows:
The following operators help in working with objects:
Like date operators, string operators are used when we want to transform our data from one stage of the pipeline to the next. Potential use cases include preprocessing text fields to extract relevant information to be used in later stages of a pipeline:
The equivalent methods for code points (a value in Unicode, regardless of the underlying bytes in its representation) are listed as follows:
We can use a text expression operator to get document-level metadata such as textScore and indexKey or, when using MongoDB Atlas Search, searchScore and searchHighlights snippets.
Timestamp expression operators will return values from a timestamp object. Timestamp objects are internally used by MongoDB, and it’s recommended that developers use Date() instead. Timestamp objects resolve to second precision, with an incrementing ordinal (starting from 1) for multiple objects with the same value:
Trigonometry expressions help us to perform trigonometric operations on numbers.
The two systems we can convert between are degrees and radians.
Degrees measure an angle in terms of how far we have turned from our original direction. If we look east and then turn our heads to look west, we have turned by 180 degrees. A full circle has 360 degrees, so if we turn by 360 degrees (or any integer multiple of it, for example, 720 or 1,080 degrees), we will end up looking east again.
A radian measures an angle by the length of the arc it spans on a circle with a radius of 1. A full circle is 2π radians, which makes the 180-degree turn π radians.
π is the irrational number 3.14159..., so 1 radian is 180/π ≈ 57.2958 degrees.
Trigonometric expression operators always take and return angle values in radians.
We can use $degreesToRadians and $radiansToDegrees to convert between the two systems:
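The following sketch assumes a hypothetical angle_degrees field. It converts degrees to radians before applying $sin. The plain-Python lines check the conversions stated above.

```python
import math


def trig_stage():
    """$project stage for a hypothetical angle_degrees field."""
    return {"$project": {
        "angle_radians": {"$degreesToRadians": "$angle_degrees"},
        # $sin expects radians, so the degrees are converted first.
        "sine": {"$sin": {"$degreesToRadians": "$angle_degrees"}},
    }}


# The same conversions in plain Python.
half_turn = math.radians(180)  # π radians
one_radian = math.degrees(1)   # about 57.2958 degrees
print(f"{half_turn:.5f} {one_radian:.4f}")  # prints 3.14159 57.2958
```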
Type expression operators in the aggregation framework allow us to inspect and convert between different types:
$toBool: This converts a value into its Boolean representation, or null if the input value is null. It’s a wrapper around the { $convert: { input: <expression>, to: "bool" } } expression.
$toDate: It’s a wrapper around the { $convert: { input: <expression>, to: "date" } } expression.
$toDecimal: It’s a wrapper around the { $convert: { input: <expression>, to: "decimal" } } expression.
$toDouble: It’s a wrapper around the { $convert: { input: <expression>, to: "double" } } expression.
$toInt: It’s a wrapper around the { $convert: { input: <expression>, to: "int" } } expression.
$toLong: It’s a wrapper around the { $convert: { input: <expression>, to: "long" } } expression.
Note
When converting to a numerical value using any of the relevant $to<> expressions mentioned earlier, a string input has to follow the base 10 notation; for example, “1337” is acceptable, whereas its hex representation, “0x539,” will result in an error.
For $toInt and $toLong, the input string also can’t represent a floating-point or decimal number, such as “13.37”.
When a date is converted with $toLong, $toDouble, or $toDecimal, the output is the number of milliseconds since the Unix epoch (00:00:00 UTC on January 1, 1970) for the ISODate input. $toInt does not accept date input.
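The following sketch computes on the client what $toLong returns for a date, which is milliseconds since the Unix epoch. It also shows the equivalent server-side stage on the transactions' timestamp field. The timestamp in the sample block document later in this chapter appears to be stored in this same epoch-milliseconds form. The helper names are hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def epoch_millis(dt):
    """Client-side mirror of {"$toLong": <date>} for a timezone-aware datetime."""
    return (dt - EPOCH) // timedelta(milliseconds=1)


# Server-side version: convert ISODate timestamps to epoch milliseconds.
to_millis_stage = {"$project": {"timestamp_ms": {"$toLong": "$timestamp"}}}
```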
During each stage of the pipeline, we can apply one or more arithmetic operators to perform intermediate calculations. These operators are shown in the following list:
Accumulators are probably the most widely used operators, because they allow us to sum, average, compute standard deviation statistics, and perform other operations across the members of each group. The following is a list of aggregation accumulators:
These accumulators are available in the $group or $project pipeline stages, except where otherwise noted.
Some operators are available in stages other than the $group stage, sometimes with different semantics. The following accumulators are stateless in other stages and can take one or multiple arguments as input.
These operators are $avg, $max, $min, $stdDevPop, $stdDevSamp, and $sum, and they are covered in the relevant sections of this chapter.
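The following sketch uses field names from the sample block document shown later in this chapter. It contrasts the stateless use of $max inside $project, which compares values within one document, with the accumulator use inside $group, which combines values across documents. The output field names are hypothetical.

```python
def block_stats_pipeline():
    """Pipeline over the blocks collection (hypothetical output names)."""
    return [
        # Stateless: $max with a list compares values within one document.
        {"$project": {
            "gas_used": 1,
            "larger_tx_count": {"$max": ["$number_transactions", "$number_internal_transactions"]},
        }},
        # Accumulators: inside $group, values are combined across documents.
        {"$group": {
            "_id": None,
            "avg_gas": {"$avg": "$gas_used"},
            "max_gas": {"$max": "$gas_used"},
            "max_tx_count": {"$max": "$larger_tx_count"},
            "blocks": {"$sum": 1},
        }},
    ]
```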
Expressions can be used to output different data to the next stage in our pipeline based on Boolean truth tests:
$cond
The $cond expression evaluates an expression in the if...then...else format. Depending on the result of the if expression, it returns the value of either the then branch or the else branch. The input can be either three named parameters or three expressions in an ordered list:
$ifNull
The $ifNull expression evaluates an expression and returns its value if it is not null, or the value of the second expression if it is. Null covers a missing field as well as a field with a null or undefined value:
$switch
Similar to a programming language’s switch statement, $switch evaluates a series of case expressions, executes the expression of the first case that evaluates to true, and then breaks out of the control flow.
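The following sketch applies the three conditional expressions to the transactions collection. The fee thresholds and the "unknown" placeholder are hypothetical values chosen for illustration. The fee_bucket function mirrors the $switch logic in plain Python so that the thresholds can be checked.

```python
def conditional_stage():
    """$project stage using $cond, $ifNull, and $switch (hypothetical values)."""
    return {"$project": {
        # if/then/else form of $cond.
        "is_transfer": {"$cond": {"if": {"$gt": ["$value", 0]}, "then": True, "else": False}},
        # Replace a null or missing "to" address with a placeholder.
        "recipient": {"$ifNull": ["$to", "unknown"]},
        # The first matching branch wins; "default" applies when none match.
        "fee_bucket": {"$switch": {
            "branches": [
                {"case": {"$lt": ["$txfee", 50000]}, "then": "low"},
                {"case": {"$lt": ["$txfee", 500000]}, "then": "medium"},
            ],
            "default": "high",
        }},
    }}


def fee_bucket(txfee):
    """Plain-Python mirror of the $switch above."""
    if txfee < 50000:
        return "low"
    if txfee < 500000:
        return "medium"
    return "high"
```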
We use custom aggregation expression operators to add our own bespoke JavaScript functions to implement custom behavior that’s not covered by built-in operators:
Note
To use these operators, we need to enable server-side scripting. In general, enabling server-side scripting is not recommended by MongoDB due to performance and security concerns.
Implementing custom functions in JavaScript in this context comes with a set of risks around performance and should only be used as a last resort.
We can get the size of the data elements using the following operators.
We can use the $let variable expression operator to define variables within a subexpression of an aggregation stage. The $let operator defines variables for use within the scope of a subexpression and returns the result of that subexpression. It takes two named parameters: vars, which can define any number of variables, and in, the expression to evaluate.
We can define a span of documents from a collection by using an expression in the partitionBy field of the $setWindowFields aggregation stage.
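The following sketch computes the average gas used per transaction for each document in the blocks collection, using the field names from the sample block document later in this chapter. The output field name is hypothetical.

```python
def gas_per_tx_stage():
    """$project stage over the blocks collection using $let."""
    return {"$project": {
        "avg_gas_per_tx": {"$let": {
            # Variables declared in vars are referenced with a $$ prefix in "in".
            "vars": {"n": "$number_transactions"},
            # Guard against division by zero for blocks without transactions.
            "in": {"$cond": [
                {"$gt": ["$$n", 0]},
                {"$divide": ["$gas_used", "$$n"]},
                None,
            ]},
        }},
    }}
```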
Partitioning our collection’s documents in spans will create a “window” into our data, resembling window functions in relational databases:
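The following sketch uses $setWindowFields on the transactions collection. It partitions by sender address, orders each partition by timestamp, and keeps a running total of value. The running_value output name is hypothetical.

```python
def running_value_pipeline():
    """$setWindowFields stage over the transactions collection."""
    return [
        {"$setWindowFields": {
            # Each distinct sender address gets its own window partition.
            "partitionBy": "$from",
            # Order documents inside each partition by time.
            "sortBy": {"timestamp": 1},
            "output": {
                "running_value": {
                    "$sum": "$value",
                    # From the first document in the partition up to the current one.
                    "window": {"documents": ["unbounded", "current"]},
                },
            },
        }},
    ]
```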
Introduced in MongoDB 4.0, type conversion operators allow us to convert a value into a specified type. The generic syntax of the command is as follows:
{
   $convert:
      {
         input: <expression>,
         to: <type expression>,
         onError: <expression>,  // Optional.
         onNull: <expression>    // Optional.
      }
}
In the preceding syntax, input and to (the only mandatory arguments) can be any valid expression. For example, in its simplest form, we could have the following:
{ $convert: { input: "true", to: "bool" } }
This converts a string with the value of true into the Boolean value of true. Note that any string, including "false" and the empty string, converts to true.
The onError parameter can be any valid expression. It specifies the value that MongoDB returns if it encounters an error during conversion, including unsupported type conversions. By default, MongoDB throws an error and stops processing.
Similarly, the onNull parameter can be any valid expression. It specifies the value that MongoDB returns if the input is null or missing. By default, MongoDB returns null.
MongoDB also provides some helper functions for the most common $convert operations. These functions are listed as follows:
These are even simpler to use. We could rewrite the previous example as the following:
{ $toBool: "true" }
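The helper functions have no onError or onNull options, so we still need the full $convert form when we want to control failures. The following sketch assumes a hypothetical string field named raw_amount.

```python
def parse_amount_stage():
    """$project stage for a hypothetical string field raw_amount."""
    return {"$project": {
        "amount": {"$convert": {
            "input": "$raw_amount",
            "to": "long",
            # A non-base-10 string such as "0x539" returns -1 instead of failing.
            "onError": -1,
            # A null or missing raw_amount returns 0 instead of null.
            "onNull": 0,
        }},
    }}
```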
After going through the extensive list of aggregation operators, in the next section, we will learn about a new feature of MongoDB 5: time series collections.
A time series collection is a special type of collection that is used to collect data measurements over a period of time.
For example, time series collection use cases can include storing Internet of Things (IoT) sensor readings, weather readings, and stock price data.
A time series collection needs to be created as such; we cannot convert an existing collection into a time series collection. We can migrate data from a general-purpose collection to a time series one with a custom script or, for performance and stability, with MongoDB’s own Kafka connector.
To create a time series collection, we need to specify the following fields. In this context, a data point might refer to a sensor reading or the stock price at a specific point in time:
Time series collections create and use an internal index that is not listed by the listIndexes command. However, we can manually create additional (namely, secondary) indexes if needed. MongoDB 6.0 has expanded the types of secondary indexes available, including compound indexes, partial indexes with $in, $or, or $geoWithin, and 2dsphere indexes.
Time series use cases usually involve data that we insert once, read many times, and rarely update. For example, we would rarely need to update a weather sensor reading from last week, unless we cross-checked it against other sources and needed to correct it.
Due to their unique nature, update and delete operations on a time series collection are constrained. Updates and deletes need to run with multi: true and justOne: false so that they affect multiple documents, and their query can only match on metaField subdocument fields. The update command can only modify the metaField subdocument, and it cannot be an upsert, as an upsert would create a new document instead of updating an existing one. Transactions cannot write into time series collections.
Time series collections use the zstd compression algorithm instead of snappy, the default for general-purpose collections. This is configurable at collection creation time using the block_compressor=snappy|zlib|zstd parameter.
We can define the storageEngine, indexOptionDefaults, collation, and writeConcern parameters for a time series collection in the same way as a general-purpose collection.
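The following minimal PyMongo sketch creates a time series collection. PyMongo's Database.create_collection() forwards extra keyword arguments to the create command. The collection name weather, the timestamp and metadata field names, and the 30-day expiry are hypothetical choices.

```python
def timeseries_options(granularity="hours"):
    """Options for a hypothetical weather-readings time series collection."""
    return {
        "timeseries": {
            "timeField": "timestamp",    # required: the date of each data point
            "metaField": "metadata",     # optional: rarely changing source information
            "granularity": granularity,  # optional: "seconds", "minutes", or "hours"
        },
        # Optional: remove documents about 30 days after their timeField value.
        "expireAfterSeconds": 30 * 24 * 60 * 60,
    }


def create_weather_collection(db):
    # db is expected to be a pymongo Database.
    return db.create_collection("weather", **timeseries_options())
```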
Note
The maximum document size in a time series collection is 4 MB instead of the global 16 MB.
Time series are optimized for write once, read many with sporadic updates/deletes, and this reflects the limitations in updates/deletes in favor of improved read/write performance.
We can shard a time series collection using timeField or metaField in the shard key. Because timeField is an ever-increasing timestamp, using it on its own will send all writes to the last shard. Therefore, it’s a good practice to combine timeField with one or more fields from metaField.
Time series collections can be really useful in certain use cases. MongoDB 6.0 expands support for secondary and compound indexes, making them even more useful, and improves performance for sorted queries and for fetching the most recent entry from a time series collection. In the next section, we will go through another recent addition to MongoDB from the relational database world: the views functionality.
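The following sketch shows a compound shard key that places a metaField subfield before timeField, using PyMongo's generic admin command interface. The database iot, the collection weather, and the metadata.sensorId field are hypothetical. Check the exact shard key rules for time series collections against your server version before relying on this.

```python
def weather_shard_key():
    # metaField subfield first, timeField last, so writes spread across shards.
    return {"metadata.sensorId": 1, "timestamp": 1}


def shard_weather_collection(client):
    # client is expected to be a pymongo MongoClient connected to a mongos router.
    return client.admin.command("shardCollection", "iot.weather", key=weather_shard_key())
```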
Views and materialized views are essential parts of database applications, and they are supported in nearly all relational database management systems.
The main difference between the two is that a view will return the results by querying the underlying data at the time of the query, whereas a materialized view stores the view data in a distinct dataset, meaning that the result set might be outdated by the time of the query.
We can create a MongoDB view using the shell or our programming language’s driver with the following parameters:
Note
Querying a view will execute the aggregation pipeline every time. Its performance is limited by the performance of the aggregation pipeline stages and all pipeline limitations will be applied.
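The following minimal PyMongo sketch defines a view by running the create command with viewOn and pipeline. The view name top_senders and its pipeline are hypothetical. The view is defined over the transactions collection from the use case later in this chapter.

```python
def create_top_senders_view(db):
    """Create a hypothetical view; its pipeline runs every time the view is queried."""
    pipeline = [
        {"$group": {"_id": "$from", "count": {"$sum": 1}}},
        {"$sort": {"count": -1}},
    ]
    # db is expected to be a pymongo Database.
    return db.command("create", "top_senders", viewOn="transactions", pipeline=pipeline)
```

After creating it, db.top_senders.find() would re-run the grouping against the current transactions data on each query.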
We can create an on-demand materialized view using the aggregation framework and, more specifically, the $merge stage. The $merge operator allows us to keep existing collection documents and merge our output result set into it. We can execute the pipeline periodically to refresh the collection’s data that will be used for querying our materialized view.
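The following sketch refreshes an on-demand materialized view with $merge. The target collection sender_counts and the helper name are hypothetical. Running the function periodically, for example from a scheduler, keeps the collection up to date. Senders that no longer appear in the output are left untouched, as described earlier for $merge.

```python
def refresh_sender_counts(collection):
    """Recompute per-sender transaction counts into sender_counts."""
    pipeline = [
        {"$group": {"_id": "$from", "count": {"$sum": 1}}},
        {"$merge": {
            "into": "sender_counts",
            "on": "_id",
            # Overwrite stale counts and insert senders seen for the first time.
            "whenMatched": "replace",
            "whenNotMatched": "insert",
        }},
    ]
    # $merge writes to sender_counts; the returned cursor has no documents.
    collection.aggregate(pipeline)
    return pipeline
```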
In the next section, we will discuss the most important aggregation pipeline limitations that we need to keep in mind when designing pipelines.
The aggregation pipeline can output results in the following three distinct ways:
Inline results are subject to the BSON maximum document size of 16 MB, so we should use this option only if our final result has a small, bounded size. An example would be outputting the ObjectId values of the top five most ordered items on an e-commerce site.
A counterexample would be outputting the top 1,000 most ordered items, along with their product information, including a description and various other fields of variable size.
Outputting results into a collection is the preferred solution if we want to perform further processing of the data. We can either output into a new collection or replace the contents of an existing collection. The aggregation output is only visible once the aggregation command succeeds; otherwise, it is not visible at all.
Note
When using $out, the output collection cannot be a sharded or capped collection (as of version 3.4). If the aggregation output violates the indexes (including the built-in unique index on each document’s _id) or document validation rules, the aggregation will fail.
Documents within a pipeline stage can exceed the 16 MB limit, as MongoDB handles them internally. Each pipeline stage is limited to 100 MB of memory. Starting from MongoDB 6.0, however, a stage that exceeds this limit writes the excess data to disk by default, at the expense of performance. If we want an error to be returned instead, we must set {allowDiskUseByDefault: false} at the mongod server level. We can also override this behavior for individual operations with the {allowDiskUse: true|false} option. The $graphLookup stage does not support datasets over 100 MB and ignores any allowDiskUse setting.
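In PyMongo, Collection.aggregate() forwards allowDiskUse as an option of the aggregate command. The following sketch groups transactions by hour and lets the caller choose the setting per operation. The helper name is hypothetical.

```python
def transactions_per_hour(collection, allow_disk_use=True):
    """Count transactions per hour of day (hypothetical helper)."""
    pipeline = [
        {"$group": {"_id": {"$hour": "$timestamp"}, "transactions": {"$sum": 1}}},
        {"$sort": {"_id": 1}},
    ]
    # allowDiskUse applies to this operation only, overriding the server default.
    return list(collection.aggregate(pipeline, allowDiskUse=allow_disk_use))
```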
These limitations should be taken into account when designing pipelines and must be tested thoroughly before being put into production workloads. In the next section, we will discuss how we can optimize aggregation pipelines.
When we submit the aggregation pipeline for execution, MongoDB might reorder and group execution stages to improve performance.
We can inspect these optimizations by including the explain option in the aggregation execution command in the shell or our programming language’s driver.
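One way to request an explanation from PyMongo is to run the aggregate command directly with explain set to true. The server then returns information about its execution plan instead of results. This is a sketch; the layout of the returned document varies between server versions and topologies.

```python
def explain_pipeline(db, collection_name, pipeline):
    # db is expected to be a pymongo Database; the reply describes the plan, not the results.
    return db.command("aggregate", collection_name, pipeline=pipeline, explain=True)
```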
Further to the optimizations that MongoDB will perform independently, we should aim to design our aggregation commands with an eye on limiting the number of documents that need to pass from one stage to the next, as early as possible.
This can be done in two ways.
The first way is by using indexes to improve the querying speed in every step of the aggregation pipeline. The rules are as follows:
The second way is to design our pipeline stages to filter out as many documents as possible, as early as possible in the aggregation pipeline.
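The following sketch combines both approaches on the transactions collection. It indexes the block field and puts a $match on that field at the start of the pipeline, so that only the matching documents reach $group. The helper names are hypothetical, and the block range in the test comes from the sample documents in the use case.

```python
def ensure_block_index(collection):
    # Index the field used by the leading $match stage.
    return collection.create_index([("block", 1)])


def senders_in_block_range_pipeline(first_block, last_block):
    return [
        # Filter first: this $match can use the index on block.
        {"$match": {"block": {"$gte": first_block, "$lte": last_block}}},
        # Only documents that passed the filter reach $group.
        {"$group": {"_id": "$from", "count": {"$sum": 1}}},
        {"$sort": {"count": -1}},
    ]
```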
In this rather lengthy section, we will use the aggregation framework to process data from the Ethereum blockchain.
Using our Python code, we have extracted data from Ethereum and loaded it into our MongoDB database. The relationship of the blockchain to our database is shown in the following diagram:
Figure 7.1: A MongoDB database interacting with the Ethereum public blockchain
Our data resides in two collections: blocks and transactions.
A sample block document has the following fields:
The following code shows the output data from a block:
> db.blocks.findOne()
{
    "_id" : ObjectId("595368fbcedea89d3f4fb0ca"),
    "number_transactions" : 28,
    "timestamp" : NumberLong("1498324744877"),
    "gas_used" : 4694483,
    "number_internal_transactions" : 4,
    "block_hash" : "0x89d235c4e2e4e4978440f3cc1966f1ffb343b9b5cfec9e5cebc331fb810bded3",
    "difficulty" : NumberLong("882071747513072"),
    "block_height" : 3923788
}
A sample transaction document has the following fields:
The following code shows the output data from a transaction:
> db.transactions.findOne()
{
    "_id" : ObjectId("59535748cedea89997e8385a"),
    "from" : "0x3c540be890df69eca5f0099bbedd5d667bd693f3",
    "txfee" : 28594,
    "timestamp" : ISODate("2017-06-06T11:23:10Z"),
    "value" : 0,
    "to" : "0x4b9e0d224dabcc96191cace2d367a8d8b75c9c81",
    "txhash" : "0xf205991d937bcb60955733e760356070319d95131a2d9643e3c48f2dfca39e77",
    "block" : 3923794
}
Sample data for our database is available on GitHub at https://github.com/PacktPublishing/Mastering-MongoDB-6.x.
As curious developers who are using this novel blockchain technology, we want to analyze the Ethereum transactions. We are especially keen to do the following:
We will find the top 10 addresses that the transactions originate from. To calculate this metric, we first group the transactions by the value of the from field, adding 1 for each occurrence to a new field called count.
After this, we will sort by the value of the count field in descending (-1) order, and finally, we will limit the output to the first 10 documents that pass through the pipeline. These documents are the top 10 addresses that we are looking for.
The following is some sample Python code:
def top_ten_addresses_from(self):
    pipeline = [
        {"$group": {"_id": "$from", "count": {"$sum": 1}}},
        # Most frequent senders first.
        {"$sort": {"count": -1}},
        {"$limit": 10},
    ]
    result = self.collection.aggregate(pipeline)
    for res in result:
        print(res)
The output of the preceding code is as follows:
{u'count': 38, u'_id': u'miningpoolhub_1'}
{u'count': 31, u'_id': u'Ethermine'}
{u'count': 30, u'_id': u'0x3c540be890df69eca5f0099bbedd5d667bd693f3'}
{u'count': 27, u'_id': u'0xb42b20ddbeabdc2a288be7ff847ff94fb48d2579'}
{u'count': 25, u'_id': u'ethfans.org'}
{u'count': 16, u'_id': u'Bittrex'}
{u'count': 8, u'_id': u'0x009735c1f7d06faaf9db5223c795e2d35080e826'}
{u'count': 8, u'_id': u'Oraclize'}
{u'count': 7, u'_id': u'0x1151314c646ce4e0efd76d1af4760ae66a9fe30f'}
{u'count': 7, u'_id': u'0x4d3ef0e8b49999de8fa4d531f07186cc3abe3d6e'}
Now, we will find the top 10 addresses where the transactions end. As we did with from, the calculation for the to addresses is exactly the same, only grouping using the to field instead of from, as shown in the following code:
def top_ten_addresses_to(self):
    pipeline = [
        {"$group": {"_id": "$to", "count": {"$sum": 1}}},
        # Most frequent recipients first.
        {"$sort": {"count": -1}},
        {"$limit": 10},
    ]
    result = self.collection.aggregate(pipeline)
    for res in result:
        print(res)
The output of the preceding code is as follows:
{u'count': 33, u'_id': u'0x6090a6e47849629b7245dfa1ca21d94cd15878ef'}
{u'count': 30, u'_id': u'0x4b9e0d224dabcc96191cace2d367a8d8b75c9c81'}
{u'count': 25, u'_id': u'0x69ea6b31ef305d6b99bb2d4c9d99456fa108b02a'}
{u'count': 23, u'_id': u'0xe94b04a0fed112f3664e45adb2b8915693dd5ff3'}
{u'count': 22, u'_id': u'0x8d12a197cb00d4747a1fe03395095ce2a5cc6819'}
{u'count': 18, u'_id': u'0x91337a300e0361bddb2e377dd4e88ccb7796663d'}
{u'count': 13, u'_id': u'0x1c3f580daeaac2f540c998c8ae3e4b18440f7c45'}
{u'count': 12, u'_id': u'0xeef274b28bd40b717f5fea9b806d1203daad0807'}
{u'count': 9, u'_id': u'0x96fc4553a00c117c5b0bed950dd625d1c16dc894'}
{u'count': 9, u'_id': u'0xd43d09ec1bc5e57c8f3d0c64020d403b04c7f783'}
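Because the from and to methods differ only in the grouped field, they could share one pipeline builder. The following is a hypothetical refactoring sketch, not the chapter's own code.

```python
def top_addresses_pipeline(field, n=10):
    """Group transactions by field ("from" or "to") and keep the n busiest addresses."""
    return [
        {"$group": {"_id": f"${field}", "count": {"$sum": 1}}},
        {"$sort": {"count": -1}},
        {"$limit": n},
    ]


def top_addresses(collection, field, n=10):
    # collection is expected to be the pymongo transactions Collection.
    return list(collection.aggregate(top_addresses_pipeline(field, n)))
```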
Let’s find the average value per transaction, along with its standard deviation. In this example, we apply the $avg and $stdDevPop operators to the value field to calculate its statistics. Using a simple $group operation, we output a single document with the ID of our choice (here, it is value), averageValues, and stdDevValues, as shown in the following code:
def average_value_per_transaction(self):
    pipeline = [
        {"$group": {"_id": "value", "averageValues": {"$avg": "$value"}, "stdDevValues": {"$stdDevPop": "$value"}}},
    ]
    result = self.collection.aggregate(pipeline)
    for res in result:
        print(res)
The output of the preceding code is as follows:
{u'averageValues': 5.227238976440972, u'_id': u'value', u'stdDevValues': 38.90322689649576}
Let’s find the average fee required per transaction, returning statistics that showcase the deviation. Average fees are similar to average values, replacing $value with $txfee, as shown in the following code:
def average_fee_per_transaction(self):
    pipeline = [
        {"$group": {"_id": "value", "averageFees": {"$avg": "$txfee"}, "stdDevValues": {"$stdDevPop": "$txfee"}}},
    ]
    result = self.collection.aggregate(pipeline)
    for res in result:
        print(res)
The output of the preceding code snippet is as follows:
{u'_id': u'value', u'averageFees': 320842.0729166667, u'stdDevValues': 1798081.7305142984}
Next, we find the time of day when the network is most active, according to the number or value of transactions at specific times.
To find the most active hour for transactions, we use the $hour operator to extract the hour from the timestamp field, where we stored our datetime values as ISODate() objects, as shown in the following code:
def active_hour_of_day_transactions(self):
    pipeline = [
        {"$group": {"_id": {"$hour": "$timestamp"}, "transactions": {"$sum": 1}}},
        # Busiest hour first.
        {"$sort": {"transactions": -1}},
        {"$limit": 1},
    ]
    result = self.collection.aggregate(pipeline)
    for res in result:
        print(res)
The output is as follows:
{u'_id': 11, u'transactions': 34}
The following code calculates the total transaction value for each hour of the day and returns the hour with the highest total:
def active_hour_of_day_values(self):
    pipeline = [
        {"$group": {"_id": {"$hour": "$timestamp"}, "transaction_values": {"$sum": "$value"}}},
        # Sort on the total computed by $group, highest first.
        {"$sort": {"transaction_values": -1}},
        {"$limit": 1},
    ]
    result = self.collection.aggregate(pipeline)
    for res in result:
        print(res)
The output of the preceding code is as follows:
{u'transaction_values': 33.17773841, u'_id': 20}
Let’s find the day of the week when the network is most active, according to the number or the value of transactions. As we did with the hour of the day, we use the $dayOfWeek operator to extract the day of the week from the ISODate() objects, as shown in the following code. Days are numbered from 1 for Sunday to 7 for Saturday, following the US convention:
def active_day_of_week_transactions(self):
    pipeline = [
        {"$group": {"_id": {"$dayOfWeek": "$timestamp"}, "transactions": {"$sum": 1}}},
        # Busiest day first.
        {"$sort": {"transactions": -1}},
        {"$limit": 1},
    ]
    result = self.collection.aggregate(pipeline)
    for res in result:
        print(res)
The output of the preceding code is as follows:
{u'_id': 3, u'transactions': 92}
The following code calculates the total transaction value for each day of the week and returns the day with the highest total:
def active_day_of_week_values(self):
    pipeline = [
        {"$group": {"_id": {"$dayOfWeek": "$timestamp"}, "transaction_values": {"$sum": "$value"}}},
        # Sort on the total computed by $group, highest first.
        {"$sort": {"transaction_values": -1}},
        {"$limit": 1},
    ]
    result = self.collection.aggregate(pipeline)
    for res in result:
        print(res)
The output of the preceding code is as follows:
{u'transaction_values': 547.62439312, u'_id': 2}
The aggregations that we calculated are summarized in the following diagram:
Figure 7.2: Aggregations performed in the transactions collection
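In terms of blocks, we would like to know the average number of transactions per block, the average number of internal transactions per block, the average amount of gas used per block, and the average difficulty per block along with its standard deviation.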
By averaging over the number_transactions field, we can get the average number of transactions per block, as shown in the following code:
def average_number_transactions_total_block(self):
    pipeline = [
        {"$group": {"_id": "average_transactions_per_block", "count": {"$avg": "$number_transactions"}}},
    ]
    result = self.collection.aggregate(pipeline)
    for res in result:
        print(res)
The output of the preceding code is as follows:
{u'count': 39.458333333333336, u'_id': u'average_transactions_per_block'}
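These block statistics use a constant _id, which places every document in a single group, so the accumulator runs over the whole collection. The book uses a descriptive string as the constant, which then shows up in the output. The sketch below mimics that behavior in pure Python. The helper name group_avg() and the sample documents are hypothetical. It also reflects the documented rule that $avg ignores missing and non-numeric values, returning null when nothing numeric remains.

```python
def group_avg(docs, field):
    """Mimic {$group: {_id: <constant>, count: {$avg: "$<field>"}}}:
    one group for all documents; missing or non-numeric values are skipped."""
    values = [
        d[field]
        for d in docs
        # bool is a subclass of int in Python, but BSON booleans are not numbers
        if isinstance(d.get(field), (int, float)) and not isinstance(d.get(field), bool)
    ]
    # $avg yields null when no numeric values are present
    return sum(values) / len(values) if values else None


blocks = [
    {"number_transactions": 30},
    {"number_transactions": 50},
    {"gas_used": 100},  # no number_transactions: ignored by $avg
]
print(group_avg(blocks, "number_transactions"))  # 40.0
```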
With the following code, we can get the average number of internal transactions per block:
def average_number_transactions_internal_block(self):
    pipeline = [
        {"$group": {"_id": "average_transactions_internal_per_block", "count": {"$avg": "$number_internal_transactions"}}},
    ]
    result = self.collection.aggregate(pipeline)
    for res in result:
        print(res)
The output of the preceding code is as follows:
{u'count': 8.0, u'_id': u'average_transactions_internal_per_block'}
The average amount of gas used per block can be acquired as follows:
def average_gas_block(self):
    pipeline = [
        {"$group": {"_id": "average_gas_used_per_block",
                    "count": {"$avg": "$gas_used"}}},
    ]
    result = self.collection.aggregate(pipeline)
    for res in result:
        print(res)
The output is as follows:
{u'count': 2563647.9166666665, u'_id': u'average_gas_used_per_block'}
The average difficulty per block and its population standard deviation can be acquired as follows:
def average_difficulty_block(self):
    pipeline = [
        {"$group": {"_id": "average_difficulty_per_block",
                    "count": {"$avg": "$difficulty"}, "stddev": {"$stdDevPop": "$difficulty"}}},
    ]
    result = self.collection.aggregate(pipeline)
    for res in result:
        print(res)
The output is as follows:
{u'count': 881676386932100.0, u'_id': u'average_difficulty_per_block', u'stddev': 446694674991.6385}
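MongoDB provides two standard deviation accumulators. $stdDevPop treats the grouped documents as the entire population, which fits here because every block in the collection is included. $stdDevSamp treats them as a sample and divides by n - 1. The sketch below shows the difference with Python's statistics module on hypothetical values; pstdev() corresponds to $stdDevPop and stdev() to $stdDevSamp.

```python
import statistics

# Hypothetical difficulty values, chosen so the numbers are easy to check
difficulties = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]

mean = statistics.mean(difficulties)   # what $avg computes
pop = statistics.pstdev(difficulties)  # what $stdDevPop computes (divide by n)
samp = statistics.stdev(difficulties)  # what $stdDevSamp computes (divide by n - 1)
print(mean, pop, samp)  # mean 5.0, population std dev 2.0, sample std dev ~2.14
```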
Our aggregations are described in the following diagram:
Figure 7.3: Aggregations performed in the blocks collection
Now that we have the basic statistics calculated, we want to up our game and find out more information about our transactions. Through our sophisticated machine learning algorithms, we have identified some of the transactions as either a scam or an initial coin offering (ICO), or maybe both.
In these documents, we have marked the attributes in an array called tags, as follows:
{
"_id" : ObjectId("59554977cedea8f696a416dd"),
"to" : "0x4b9e0d224dabcc96191cace2d367a8d8b75c9c81",
"txhash" : "0xf205991d937bcb60955733e760356070319d95131a2d9643e3c48f2dfca39e77",
"from" : "0x3c540be890df69eca5f0099bbedd5d667bd693f3",
"block" : 3923794,
"txfee" : 28594,
"timestamp" : ISODate("2017-06-10T09:59:35Z"),
"tags" : [
"scam",
"ico"
],
"value" : 0
}
Now we want to get the transactions from June 2017, remove the _id field, and produce a separate document for each tag that we have identified. In our example, the transaction above carries two tags, so we would output two documents into our new collection, scam_ico_documents, for separate processing.
The way to do this is via the aggregation framework, as shown in the following code:
import datetime

def scam_or_ico_aggregation(self):
    pipeline = [
        # June 2017 as a half-open range: $lt excludes July 1 at 00:00
        {"$match": {"timestamp": {"$gte": datetime.datetime(2017, 6, 1), "$lt": datetime.datetime(2017, 7, 1)}}},
        {"$project": {
            "to": 1,
            "txhash": 1,
            "from": 1,
            "block": 1,
            "txfee": 1,
            "tags": 1,
            "value": 1,
            "report_period": "June 2017",
            "_id": 0,
        }},
        {"$unwind": "$tags"},
        {"$out": "scam_ico_documents"},
    ]
    result = self.collection.aggregate(pipeline)
    for res in result:
        print(res)
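Here, our aggregation pipeline has four distinct stages: $match selects the transactions with a timestamp in June 2017; $project keeps only the fields we need, drops _id, and adds a literal report_period field; $unwind outputs one document per element of the tags array; and $out writes the results to the scam_ico_documents collection.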
Since the $out stage writes the results to a collection, aggregate() returns no documents and nothing is printed. If we comment out {"$out": "scam_ico_documents"}, we get documents that look like the following:
{u'from': u'miningpoolhub_1', u'tags': u'scam', u'report_period': u'June 2017', u'value': 0.52415349, u'to': u'0xdaf112bcbd38d231b1be4ae92a72a41aa2bb231d', u'txhash': u'0xe11ea11df4190bf06cbdaf19ae88a707766b007b3d9f35270cde37ceccba9a5c', u'txfee': 21.0, u'block': 3923785}
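The $match stage selects a single calendar month. Expressing that month as a half-open interval, $gte the first day and $lt the first day of the next month, avoids picking up documents stamped exactly at midnight on the following month's first day. A minimal sketch with a hypothetical helper, month_match_stage(), assuming naive datetimes that PyMongo will interpret as UTC:

```python
from datetime import datetime


def month_match_stage(year, month, field="timestamp"):
    """Build a $match stage covering one calendar month as the half-open
    interval [first day of the month, first day of the next month)."""
    start = datetime(year, month, 1)
    # December rolls over into January of the following year
    end = datetime(year + 1, 1, 1) if month == 12 else datetime(year, month + 1, 1)
    # $lt rather than $lte: midnight on the 1st of the next month belongs
    # to the next month's report, not to this one
    return {"$match": {field: {"$gte": start, "$lt": end}}}


print(month_match_stage(2017, 6))
```

The stage can then lead a pipeline, for example [month_match_stage(2017, 6), {"$unwind": "$tags"}, ...], so that the same code produces any monthly report.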
The final result in our database will look like this:
{
"_id" : ObjectId("5955533be9ec57bdb074074e"),
"to" : "0x4b9e0d224dabcc96191cace2d367a8d8b75c9c81",
"txhash" : "0xf205991d937bcb60955733e760356070319d95131a2d9643e3c48f2dfca39e77",
"from" : "0x3c540be890df69eca5f0099bbedd5d667bd693f3",
"block" : 3923794,
"txfee" : 28594,
"tags" : "scam",
"value" : 0,
"report_period" : "June 2017"
}
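The $project and $unwind stages are what turn one tagged transaction into several report documents. The pure-Python sketch below mimics them for a single document; project_and_unwind() and the sample document are hypothetical. It follows $unwind's default behavior: a missing, null, or empty array produces no output, and a non-array value is treated as a one-element array. The projected documents have no _id, so the server generates a fresh ObjectId for each one when $out writes them, which is why the final documents carry new _id values.

```python
def project_and_unwind(doc, fields, report_period):
    """Mimic $project (keep `fields`, drop _id, add a literal report_period)
    followed by $unwind on the tags field."""
    projected = {k: doc[k] for k in fields if k in doc}
    projected["report_period"] = report_period
    tags = doc.get("tags")
    if tags is None:
        return []  # missing or null tags: $unwind emits nothing
    if not isinstance(tags, list):
        tags = [tags]  # a non-array operand is treated as a single-element array
    # One output document per array element; an empty list yields none
    return [dict(projected, tags=tag) for tag in tags]


tx = {"_id": 1, "to": "0x4b9e", "from": "0x3c54", "tags": ["scam", "ico"], "value": 0}
for out in project_and_unwind(tx, ["to", "from", "tags", "value"], "June 2017"):
    print(out)
```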
Now that the documents are clearly separated in the scam_ico_documents collection, further analysis is easy. For example, we may want to append more information about some of these scammers. Luckily, our data scientists have come up with some additional information that we have extracted into a new scam_details collection, which looks like this:
{
"_id" : ObjectId("5955510e14ae9238fe76d7f0"),
"scam_address" : "0x3c540be890df69eca5f0099bbedd5d667bd693f3",
"email_address" : "[email protected]"
}
Now, we can create a new aggregation pipeline job to join scam_ico_documents with the scam_details collection and output these extended results into a new collection, called scam_ico_documents_extended, as follows:
from pymongo import MongoClient

def scam_add_information(self):
    client = MongoClient()
    db = client.mongo_book
    scam_collection = db.scam_ico_documents
    pipeline = [
        # Join on from == scam_address; matches are stored in scam_details_data
        {"$lookup": {"from": "scam_details", "localField": "from", "foreignField": "scam_address", "as": "scam_details_data"}},
        # Keep only the transactions that matched at least one scam_details document
        {"$match": {"scam_details_data": {"$ne": []}}},
        {"$out": "scam_ico_documents_extended"},
    ]
    result = scam_collection.aggregate(pipeline)
    for res in result:
        print(res)
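Here, we are using a three-step aggregation pipeline: $lookup joins each document with the scam_details documents whose scam_address equals its from address and stores the matches in an array; $match keeps only the documents for which at least one match was found; and $out writes the results to the scam_ico_documents_extended collection.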
These documents will now look like the following:
> db.scam_ico_documents_extended.findOne()
{
"_id" : ObjectId("5955533be9ec57bdb074074e"),
"to" : "0x4b9e0d224dabcc96191cace2d367a8d8b75c9c81",
"txhash" : "0xf205991d937bcb60955733e760356070319d95131a2d9643e3c48f2dfca39e77",
"from" : "0x3c540be890df69eca5f0099bbedd5d667bd693f3",
"block" : 3923794,
"txfee" : 28594,
"tags" : "scam",
"value" : 0,
"report_period" : "June 2017",
"scam_details_data" : [
{
"_id" : ObjectId("5955510e14ae9238fe76d7f0"),
"scam_address" : "0x3c540be890df69eca5f0099bbedd5d667bd693f3",
"email_address" : "[email protected]"
}]}
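$lookup behaves like a left outer join: every input document receives the array field, even when it is empty, and the following $match is what discards transactions without scam details. The pure-Python sketch below mimics that pair of stages with equality matching on the join keys. The helper name lookup_and_filter() and the sample data are hypothetical.

```python
def lookup_and_filter(docs, details, local_field, foreign_field, as_field):
    """Mimic {$lookup: {...}} followed by {$match: {<as_field>: {$ne: []}}}."""
    # Index the foreign collection by its join key (equality match)
    by_key = {}
    for d in details:
        by_key.setdefault(d.get(foreign_field), []).append(d)
    joined = []
    for doc in docs:
        matches = by_key.get(doc.get(local_field), [])
        # $lookup always adds the array, possibly empty (left outer join)...
        extended = dict(doc, **{as_field: matches})
        # ...so the $match stage is what drops documents without any match
        if matches:
            joined.append(extended)
    return joined


txs = [{"from": "0xabc", "tags": "scam"}, {"from": "0xdef", "tags": "ico"}]
details = [{"scam_address": "0xabc", "email_address": "hypothetical"}]
print(lookup_and_filter(txs, details, "from", "scam_address", "scam_details_data"))
```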
Using the aggregation framework, we have isolated the suspicious transactions and enriched them with the available scam details, ready for further processing.
The previous steps can be summed up in the following diagram:
Figure 7.4: The overall data flow of our use case
In this chapter, we dived deep into the aggregation framework. We discussed why and when we should use aggregation as opposed to simply using MapReduce or querying the database. We went through a wide range of aggregation options and features.
We discussed the aggregation stages and the various operators, such as Boolean operators, comparison operators, set operators, array operators, date operators, string operators, expression arithmetic operators, aggregation accumulators, conditional expressions, and variables, along with the literal operators and the data type parsing operators.
Using the Ethereum use case, we went through aggregation with working code and learned how to approach an engineering problem to solve it.
Finally, we learned about the current limitations of the aggregation framework and how to avoid running into them.
In the next chapter, we will move on to the topic of indexing and learn how to design and implement performant indexes for our read and write workloads.