We are now ready to run our analytics. In the <KAIROSDB_HOME>/plugins directory, we need to create a Python file called mymean_analytic.py. The code for this is as follows:
import datetime
import logging

from airflow import DAG
from airflow.operators import KairosDBOperator
from airflow.operators.python_operator import PythonOperator
def my_mean(ds, **kwargs):
    logging.info("kwargs: %s" % kwargs)
    logging.info("ds: %s" % ds)
    # pull the KairosDB response pushed to XCom by the get_data task
    ti = kwargs['ti']
    data = ti.xcom_pull(key=None, task_ids=['get_data'])
    return _mean(data)

def _mean(data):
    ret = {}
    for d in data:
        results = d['results']
        for r in results:
            # each entry in r['values'] is a [timestamp, value] pair;
            # m[0] is the mean timestamp, m[1] the mean measured value
            m = [float(sum(l))/len(l) for l in zip(*r['values'])]
            ret[r['name']] = m[1]
    print(ret)
    return ret
dag = DAG('mymean',
          description='Simple mean of the temperature from last year',
          default_args={'owner': 'iiot-book'},
          schedule_interval='* * * * 0',
          start_date=datetime.datetime(2018, 6, 21),
          catchup=False)
kairos_operator = KairosDBOperator(
    task_id='get_data',
    query={
        "metrics": [
            {
                "tags": {},
                "name": "device0.my.measure.temperature",
                "aggregators": [
                    {
                        "name": "scale",
                        "factor": "1.0"
                    }
                ]
            }
        ],
        "plugins": [],
        "cache_time": 0,
        "start_relative": {
            "value": "1",
            "unit": "years"
        }
    },
    dag=dag)
myanalytic_task = PythonOperator(
    task_id='myanalytic',
    provide_context=True,
    python_callable=my_mean,
    dag=dag)

kairos_operator >> myanalytic_task
The first block imports the Airflow dependencies. The my_mean and _mean functions implement the analytic itself: my_mean pulls the query results from XCom and _mean computes the average value for each metric. We then define our workflow:
kairos_operator >> myanalytic_task
In the first step, we get the data from the last year; in the second step, we calculate the mean for each tag. To test our code:
- Enable the scheduler
- Click on the run button of the interface
The following screenshot highlights these two steps:
Triggering the execution of the mymean analytics
To see the status of the workflow's execution, we can click on DAG: mymean to see the details. The following screenshot shows the expected output:
What's still missing? We have seen how Apache Airflow can orchestrate our analytics and how to develop extensions for our data source. To make Apache Airflow a platform for I-IoT, however, we still need a connector to our asset registry and a simple task that runs analytics for each asset we want to monitor. Our KairosDB plugin can easily be extended to Neo4j using the Neo4j driver for Python; this is described in more detail in the following section. To scale the execution of our analytics across our fleet according to our asset list, we can build our DAG dynamically.
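As a hint of what building the DAG dynamically could look like, the sketch below generates one KairosDB query per asset and then, in the commented fragment, one pair of tasks per asset. The asset names and the metric naming scheme (`<asset>.my.measure.temperature`) are assumptions modelled on the metric used above; in a real deployment the asset list would come from the asset registry:

```python
def build_queries(assets):
    """Build one KairosDB query per asset (pure helper, easy to test).

    The metric name pattern '<asset>.my.measure.temperature' is an
    assumption based on the metric queried earlier in this section.
    """
    return {
        asset: {
            "metrics": [{
                "tags": {},
                "name": "%s.my.measure.temperature" % asset,
                "aggregators": [{"name": "scale", "factor": "1.0"}],
            }],
            "plugins": [],
            "cache_time": 0,
            "start_relative": {"value": "1", "unit": "years"},
        }
        for asset in assets
    }

# In the DAG file, we would then loop over the assets (here a
# hard-coded list for illustration) and create one get_data/analytic
# pair per asset:
#
# for asset, query in build_queries(["device0", "device1"]).items():
#     get_data = KairosDBOperator(task_id='get_data_%s' % asset,
#                                 query=query, dag=dag)
#     analytic = PythonOperator(task_id='myanalytic_%s' % asset,
#                               provide_context=True,
#                               python_callable=my_mean, dag=dag)
#     get_data >> analytic
```

Because task ids carry the asset name, each asset gets its own independent branch in the DAG, and adding an asset to the registry is enough to have it monitored.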