Aggregation use case

In this rather lengthy section, we will use the aggregation framework to process data from the Ethereum blockchain.

Using our Python code, we have extracted data from Ethereum and loaded it into our MongoDB database. The relation of the blockchain to our database is shown in the following diagram:

Our data resides in two collections: blocks and transactions.

A sample block document has the following fields:

  • Number of transactions
  • Number of contracted internal transactions
  • Block hash
  • Parent block hash
  • Mining difficulty
  • Gas used
  • Block height

The following code shows the output data from a block:

> db.blocks.findOne()
{
"_id" : ObjectId("595368fbcedea89d3f4fb0ca"),
"number_transactions" : 28,
"timestamp" : NumberLong("1498324744877"),
"gas_used" : 4694483,
"number_internal_transactions" : 4,
"block_hash" : "0x89d235c4e2e4e4978440f3cc1966f1ffb343b9b5cfec9e5cebc331fb810bded3",
"difficulty" : NumberLong("882071747513072"),
"block_height" : 3923788
}

A sample transaction document has the following fields:

  • Transaction hash
  • The block height it belongs to
  • From hash address
  • To hash address
  • Transaction value
  • Transaction fee

The following code shows the output data from a transaction:

> db.transactions.findOne()
{
"_id" : ObjectId("59535748cedea89997e8385a"),
"from" : "0x3c540be890df69eca5f0099bbedd5d667bd693f3",
"txfee" : 28594,
"timestamp" : ISODate("2017-06-06T11:23:10Z"),
"value" : 0,
"to" : "0x4b9e0d224dabcc96191cace2d367a8d8b75c9c81",
"txhash" : "0xf205991d937bcb60955733e760356070319d95131a2d9643e3c48f2dfca39e77",
"block" : 3923794
}

Sample data for our database is available on GitHub at: https://github.com/PacktPublishing/Mastering-MongoDB-4.x-Second-Edition.

As curious developers who are using this novel blockchain technology, we want to analyze Ethereum transactions. We are especially keen to do the following:

  • Find the top ten addresses that transactions originate from
  • Find the top ten addresses that transactions end in
  • Find the average value per transaction, with statistics concerning the deviation
  • Find the average fee required per transaction, with statistics concerning the deviation
  • Find the time of day that the network is more active according to the number or value of transactions
  • Find the day of week that the network is more active according to the number or value of transactions

We find the top ten addresses that transactions originate from. To calculate this metric, we first count the number of occurrences with a 1 count for each one, group them by the value of the from field, and output them into a new field called count.

After this, we sort by the value of the count field in descending (-1) order, and finally, we limit the output to the first ten documents that pass through the pipeline. These documents are the top ten addresses that we are looking for.

The following is some sample Python code:

   def top_ten_addresses_from(self):
pipeline = [
{"$group": {"_id": "$from", "count": {"$sum": 1}}},
{"$sort": SON([("count", -1)])},
{"$limit": 10},
]
result = self.collection.aggregate(pipeline)
for res in result:
print(res)

The output of the preceding code is as follows:

{u'count': 38, u'_id': u'miningpoolhub_1'}
{u'count': 31, u'_id': u'Ethermine'}
{u'count': 30, u'_id': u'0x3c540be890df69eca5f0099bbedd5d667bd693f3'}
{u'count': 27, u'_id': u'0xb42b20ddbeabdc2a288be7ff847ff94fb48d2579'}
{u'count': 25, u'_id': u'ethfans.org'}
{u'count': 16, u'_id': u'Bittrex'}
{u'count': 8, u'_id': u'0x009735c1f7d06faaf9db5223c795e2d35080e826'}
{u'count': 8, u'_id': u'Oraclize'}
{u'count': 7, u'_id': u'0x1151314c646ce4e0efd76d1af4760ae66a9fe30f'}
{u'count': 7, u'_id': u'0x4d3ef0e8b49999de8fa4d531f07186cc3abe3d6e'}

Now we find the top ten addresses that transactions end in. As we did with from, the calculation for the to addresses is exactly the same, only grouping using the to field instead of from, as shown in the following code:

   def top_ten_addresses_to(self):
pipeline = [
{"$group": {"_id": "$to", "count": {"$sum": 1}}},
{"$sort": SON([("count", -1)])},
{"$limit": 10},
]
result = self.collection.aggregate(pipeline)
for res in result:
print(res)

The output of the preceding code is as follows:

{u'count': 33, u'_id': u'0x6090a6e47849629b7245dfa1ca21d94cd15878ef'}
{u'count': 30, u'_id': u'0x4b9e0d224dabcc96191cace2d367a8d8b75c9c81'}
{u'count': 25, u'_id': u'0x69ea6b31ef305d6b99bb2d4c9d99456fa108b02a'}
{u'count': 23, u'_id': u'0xe94b04a0fed112f3664e45adb2b8915693dd5ff3'}
{u'count': 22, u'_id': u'0x8d12a197cb00d4747a1fe03395095ce2a5cc6819'}
{u'count': 18, u'_id': u'0x91337a300e0361bddb2e377dd4e88ccb7796663d'}
{u'count': 13, u'_id': u'0x1c3f580daeaac2f540c998c8ae3e4b18440f7c45'}
{u'count': 12, u'_id': u'0xeef274b28bd40b717f5fea9b806d1203daad0807'}
{u'count': 9, u'_id': u'0x96fc4553a00c117c5b0bed950dd625d1c16dc894'}
{u'count': 9, u'_id': u'0xd43d09ec1bc5e57c8f3d0c64020d403b04c7f783'}

Let's find the average value per transaction, with statistics concerning the standard deviation. In this example, we are using the $avg and $stdDevPop operators of the values of the value field to calculate the statistics for this field. Using a simple $group operation, we output a single document with the ID of our choice (here, value) and the averageValues, as shown in the following code:

   def average_value_per_transaction(self):
pipeline = [
{"$group": {"_id": "value", "averageValues": {"$avg": "$value"}, "stdDevValues": {"$stdDevPop": "$value"}}},
]
result = self.collection.aggregate(pipeline)
for res in result:
print(res)

The output of the preceding code is as follows:

{u'averageValues': 5.227238976440972, u'_id': u'value', u'stdDevValues': 38.90322689649576}

Let's find the average fee required per transaction, returning statistics concerning the deviation. Average fees are similar to average values, replacing $value with $txfee, as shown in the following code:

   def average_fee_per_transaction(self):
pipeline = [
{"$group": {"_id": "value", "averageFees": {"$avg": "$txfee"}, "stdDevValues": {"$stdDevPop": "$txfee"}}},
]
result = self.collection.aggregate(pipeline)
for res in result:
print(res)

The output of the preceding code snippet is as follows:

{u'_id': u'value', u'averageFees': 320842.0729166667, u'stdDevValues': 1798081.7305142984}

We find the time of day that the network is more active according to the number or value of transactions at specific times.

To find out the most active hour for transactions, we use the $hour operator to extract the hour field from the ISODate() field in which we stored our datetime values and called timestamp, as shown in the following code:

   def active_hour_of_day_transactions(self):
pipeline = [
{"$group": {"_id": {"$hour": "$timestamp"}, "transactions": {"$sum": 1}}},
{"$sort": SON([("transactions", -1)])},
{"$limit": 1},
]
result = self.collection.aggregate(pipeline)
for res in result:
print(res)

The output is as follows:

{u'_id': 11, u'transactions': 34}

The following code will calculate the sum of transaction values for the most active hour of the day:

  def active_hour_of_day_values(self):
pipeline = [
{"$group": {"_id": {"$hour": "$timestamp"}, "transaction_values": {"$sum": "$value"}}},
{"$sort": SON([("transactions", -1)])},
{"$limit": 1},
]
result = self.collection.aggregate(pipeline)
for res in result:
print(res)

The output of the preceding code is as follows:

{u'transaction_values': 33.17773841, u'_id': 20}

Let's find the day of the week that the network is more active according to the number of transactions or value of transactions. As we did with the hour of the day, we use the $dayOfWeek operator to extract the day of the week from ISODate() objects, as shown in the following code. Days are numbered ranging from one for Sunday to seven for Saturday, following the US convention:

   def active_day_of_week_transactions(self):
pipeline = [
{"$group": {"_id": {"$dayOfWeek": "$timestamp"}, "transactions": {"$sum": 1}}},
{"$sort": SON([("transactions", -1)])},
{"$limit": 1},
]
result = self.collection.aggregate(pipeline)
for res in result:
print(res)

The output of the preceding code is as follows:

{u'_id': 3, u'transactions': 92}

The following code will calculate the sum of transaction values for the most active day of the week:

  def active_day_of_week_values(self):
pipeline = [
{"$group": {"_id": {"$dayOfWeek": "$timestamp"}, "transaction_values": {"$sum": "$value"}}},
{"$sort": SON([("transactions", -1)])},
{"$limit": 1},
]
       result = self.collection.aggregate(pipeline)
for res in result:
print(res)

The output of the preceding code is as follows:


{u'transaction_values': 547.62439312, u'_id': 2}

The aggregations that we calculated can be described in the following diagram:

In terms of blocks, we would like to know the following:

  • Average number of transactions per block, for both the total overall transactions and the total contracted internal transactions.
  • Average gas used per block.
  • Average gas used per transaction to a block. Is there a window of opportunity to submit my smart contract in a block?
  • Average difficulty per block and how it deviates.
  • Average number of transactions per block, both in total and also in contracted internal transactions.

Averaging over the number_transactions field, we can get the number of transactions per block, as shown in the following code:

   def average_number_transactions_total_block(self):
pipeline = [
{"$group": {"_id": "average_transactions_per_block", "count": {"$avg": "$number_transactions"}}},
]
result = self.collection.aggregate(pipeline)
for res in result:
print(res)

The output of the preceding code is as follows:

 {u'count': 39.458333333333336, u'_id': u'average_transactions_per_block'}

Whereas with the following code, we can get the average number of internal transactions per block:

  def average_number_transactions_internal_block(self):
pipeline = [
{"$group": {"_id": "average_transactions_internal_per_block", "count": {"$avg": "$number_internal_transactions"}}},
]
result = self.collection.aggregate(pipeline)
for res in result:
print(res)

The output of the preceding code is as follows:

{u'count': 8.0, u'_id': u'average_transactions_internal_per_block'}

 The average gas used per block can be acquired as follows:

def average_gas_block(self):
pipeline = [
{"$group": {"_id": "average_gas_used_per_block",
"count": {"$avg": "$gas_used"}}},
]
result = self.collection.aggregate(pipeline)
for res in result:
print(res)

The output is as follows:

{u'count': 2563647.9166666665, u'_id': u'average_gas_used_per_block'}

The average difficulty per block and how it deviates can be acquired as follows:

  def average_difficulty_block(self):
pipeline = [
{"$group": {"_id": "average_difficulty_per_block",
"count": {"$avg": "$difficulty"}, "stddev": {"$stdDevPop": "$difficulty"}}},
]
result = self.collection.aggregate(pipeline)
for res in result:
print(res)

The output is as follows:

{u'count': 881676386932100.0, u'_id': u'average_difficulty_per_block', u'stddev': 446694674991.6385}

Our aggregations are described in the following schema:

Now that we have the basic statistics calculated, we want to up our game and find out more information about our transactions. Through our sophisticated machine learning algorithms, we have identified some of the transactions as either scams or initial coin offering (ICO), or maybe both.

In these documents, we have marked these attributes in an array called tags, as follows:

{
"_id" : ObjectId("59554977cedea8f696a416dd"),
"to" : "0x4b9e0d224dabcc96191cace2d367a8d8b75c9c81",
"txhash" : "0xf205991d937bcb60955733e760356070319d95131a2d9643e3c48f2dfca39e77",
"from" : "0x3c540be890df69eca5f0099bbedd5d667bd693f3",
"block" : 3923794,
"txfee" : 28594,
"timestamp" : ISODate("2017-06-10T09:59:35Z"),
"tags" : [
"scam",
"ico"
],
"value" : 0
}

Now we want to get the transactions from June, 2017, remove the _id field, and produce different documents according to the tags that we have identified. So in our example, we would output two documents in our new collection, scam_ico_documents, for separate processing.

The way to do this via the aggregation framework is shown in the following code:

def scam_or_ico_aggregation(self):
pipeline = [
{"$match": {"timestamp": {"$gte": datetime.datetime(2017,6,1), "$lte": datetime.datetime(2017,7,1)}}},
{"$project": {
"to": 1,
"txhash": 1,
"from": 1,
"block": 1,
"txfee": 1,
"tags": 1,
"value": 1,
"report_period": "June 2017",
"_id": 0,
}

},
{"$unwind": "$tags"},
{"$out": "scam_ico_documents"}
]
result = self.collection.aggregate(pipeline)
for res in result:
print(res)

Here, we have the following four distinct steps in our aggregation framework pipeline:

  1. Using $match, we only extract documents that have a timestamp  field value of June 01, 2017.
  2. Using $project, we add a new  report_period field with a value of June 2017 and remove the _id field by setting its value to 0. We keep the rest of the fields intact by using the value 1, as shown in the preceding code.
  3. Using $unwind, we output one new document per tag in our $tags array.
  4. Finally, using $out, we output all of our documents to a new scam_ico_documents collection.

Since we used the $out operator, we will get no results in the command line. If we comment out {"$out": "scam_ico_documents"}, we get documents that look like the following:

{u'from': u'miningpoolhub_1', u'tags': u'scam', u'report_period': u'June 2017', u'value': 0.52415349, u'to': u'0xdaf112bcbd38d231b1be4ae92a72a41aa2bb231d', u'txhash': u'0xe11ea11df4190bf06cbdaf19ae88a707766b007b3d9f35270cde37ceccba9a5c', u'txfee': 21.0, u'block': 3923785}

The final result in our database will look like this:

{
"_id" : ObjectId("5955533be9ec57bdb074074e"),
"to" : "0x4b9e0d224dabcc96191cace2d367a8d8b75c9c81",
"txhash" : "0xf205991d937bcb60955733e760356070319d95131a2d9643e3c48f2dfca39e77",
"from" : "0x3c540be890df69eca5f0099bbedd5d667bd693f3",
"block" : 3923794,
"txfee" : 28594,
"tags" : "scam",
"value" : 0,
"report_period" : "June 2017"
}

Now that we have documents that are clearly separated in the scam_ico_documents collection, we can perform further analysis pretty easily. An example of this analysis would be to append more information on some of these scammers. Luckily, our data scientists have come up with some additional information that we have extracted into a new collection scam_details, which looks like this:

{
"_id" : ObjectId("5955510e14ae9238fe76d7f0"),
"scam_address" : "0x3c540be890df69eca5f0099bbedd5d667bd693f3",
Email_address": [email protected]"
}

We can now create a new aggregation pipeline job to join our scam_ico_documents with the scam_details collection and output these extended results in a new collection, called scam_ico_documents_extended, like this:

def scam_add_information(self):
client = MongoClient()
db = client.mongo_book
scam_collection = db.scam_ico_documents
pipeline = [
{"$lookup": {"from": "scam_details", "localField": "from", "foreignField": "scam_address", "as": "scam_details"}},
{"$match": {"scam_details": { "$ne": [] }}},
{"$out": "scam_ico_documents_extended"}
]
result = scam_collection.aggregate(pipeline)
for res in result:
print(res)

Here, we are using the following three-step aggregation pipeline:

  1. Use the $lookup command to join data from the scam_details collection and the scam_address field with data from our local collection (scam_ico_documents) based on the value from the local collection attribute from being equal to the value in the scam_details collection's scam_address field. If these are equal, then the pipeline adds a new field to the document called scam_details.
  2. Next, we only match the documents that have a scam_details field—the ones that matched with the lookup aggregation framework step.
  3. Finally, we output these documents in a new collection called scam_ico_documents_extended.

These documents will now look like this:

> db.scam_ico_documents_extended.findOne()
{
"_id" : ObjectId("5955533be9ec57bdb074074e"),
"to" : "0x4b9e0d224dabcc96191cace2d367a8d8b75c9c81",
"txhash" : "0xf205991d937bcb60955733e760356070319d95131a2d9643e3c48f2dfca39e77",
"from" : "0x3c540be890df69eca5f0099bbedd5d667bd693f3",
"block" : 3923794,
"txfee" : 28594,
"tags" : "scam",
"value" : 0,
"report_period" : "June 2017",
"scam_details_data" : [
{
"_id" : ObjectId("5955510e14ae9238fe76d7f0"),
"scam_address" : "0x3c540be890df69eca5f0099bbedd5d667bd693f3",
email_address": [email protected]"
}]}

Using the aggregation framework, we have identified our data and can process it rapidly and efficiently.

The previous steps can be summed up in the following diagram:

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset