Implementing our analytics

We are now ready to run our analytics. In the <AIRFLOW_HOME>/dags directory, we need to create a Python file called mymean_analytic.py. The code for this is as follows:

import datetime
import logging

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators import KairosDBOperator


def my_mean(ds, **kwargs):
    # Pull the result of the get_data task from XCom and average it
    logging.info("kwargs: %s" % kwargs)
    logging.info("ds: %s" % ds)
    ti = kwargs['ti']
    data = ti.xcom_pull(key=None, task_ids=['get_data'])
    return _mean(data)


def _mean(data):
    # Compute the mean of the sampled values for each returned metric
    ret = {}
    for d in data:
        results = d['results']
        for r in results:
            # r['values'] is a list of [timestamp, value] pairs;
            # zip(*...) separates timestamps from values, so m[1] is the mean value
            m = [float(sum(l)) / len(l) for l in zip(*r['values'])]
            ret[r['name']] = m[1]
    print(ret)
    return ret


dag = DAG('mymean',
          description='Simple mean of the temperature from last year',
          default_args={'owner': 'iiot-book'},
          schedule_interval='* * * * 0',
          start_date=datetime.datetime(2018, 6, 21),
          catchup=False)

# Query KairosDB for the last year of temperature measures
kairos_operator = KairosDBOperator(
    task_id='get_data',
    query={
        "metrics": [
            {
                "tags": {},
                "name": "device0.my.measure.temperature",
                "aggregators": [
                    {
                        "name": "scale",
                        "factor": "1.0"
                    }
                ]
            }
        ],
        "plugins": [],
        "cache_time": 0,
        "start_relative": {
            "value": "1",
            "unit": "years"
        }
    },
    dag=dag)

# Compute the mean of the data returned by the get_data task
myanalytic_task = PythonOperator(
    task_id='myanalytic',
    provide_context=True,
    python_callable=my_mean,
    dag=dag)

kairos_operator >> myanalytic_task

The first block imports the Airflow dependencies. The my_mean and _mean functions implement the main functionality: my_mean pulls the query result of the get_data task from XCom, and _mean calculates the mean of the values for each metric. We then have to define our workflow:

kairos_operator >> myanalytic_task
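
To make the data handling in _mean more concrete, the following is a minimal sketch that feeds it a hand-written sample shaped like a KairosDB query response. The metric name, timestamps, and values are invented for illustration, and the structure assumes that the KairosDBOperator pushes the raw query response to XCom:

# Hypothetical sample shaped like the data returned by
# xcom_pull(task_ids=['get_data']): a list of query responses,
# each containing a 'results' list.
sample_data = [
    {
        "results": [
            {
                "name": "device0.my.measure.temperature",
                # [timestamp_ms, value] pairs
                "values": [
                    [1529539200000, 20.0],
                    [1529542800000, 22.0],
                    [1529546400000, 24.0],
                ],
            }
        ]
    }
]

# Reuses the _mean function defined in mymean_analytic.py
print(_mean(sample_data))
# {'device0.my.measure.temperature': 22.0}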

In the first step, we get the data from the last year. During the second step, we calculate the mean for each tag. To test our code:

  1. Enable the scheduler
  2. Click on the run button of the interface

The following screenshot highlights these two steps:

Triggering the execution of the mymean analytics

To check the execution status of the workflow, we can click on DAG: mymean to view the details. The following screenshot shows the expected output:

Airflow workflow DAGs

What's still missing here? We have discovered the capabilities of Apache Airflow to orchestrate our analytics and to develop extensions for our data source. To make Apache Airflow a platform for I-IoT, however, we need a connector to our asset registry and a simple task that can run analytics for each asset we want to monitor. It is very easy to extend our KairosDB plugin to Neo4j using the Neo4j driver for Python; this is described in more detail in the following section. To scale the execution of our analytics across our fleet according to our asset list, we can build our DAG dynamically, as sketched below.
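
As an illustration of that last point, here is a minimal sketch that generates one analytic task per asset from a hard-coded asset list. In a real deployment the list would come from the asset registry (for example, through the Neo4j Python driver), and the asset names, DAG id, and task ids used here are hypothetical:

import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# Hypothetical asset list; in practice this would be read from the
# asset registry (for example, with a Neo4j query).
ASSETS = ['device0', 'device1', 'device2']

dag = DAG('mymean_fleet',
          description='Mean temperature for every monitored asset',
          default_args={'owner': 'iiot-book'},
          schedule_interval='@daily',
          start_date=datetime.datetime(2018, 6, 21),
          catchup=False)


def build_mean_task(asset, dag):
    """Create one PythonOperator that averages the given asset's metric."""
    def _run(**kwargs):
        # Placeholder: query KairosDB for '<asset>.my.measure.temperature'
        # and compute the mean, as in mymean_analytic.py.
        print("computing mean for %s" % asset)

    return PythonOperator(
        task_id='myanalytic_%s' % asset,
        provide_context=True,
        python_callable=_run,
        dag=dag)


# One task per asset, all attached to the same dynamically built DAG
tasks = [build_mean_task(asset, dag) for asset in ASSETS]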
