feat(itho-wpu): export to influxdb support

Export getdatalog measurements to InfluxDB by giving the
'--export-to-influxdb' argument. Additionally configure the InfluxDB
credentials via environment variables:
- INFLUXDB_HOST (default: localhost)
- INFLUXDB_PORT (default: 8086)
- INFLUXDB_USERNAME (default: root)
- INFLUXDB_PASSWORD (default: root)
- INFLUXDB_DATABASE
This commit is contained in:
Pim van den Berg 2020-11-08 15:05:13 +01:00
parent c867203913
commit c43de957e9
1 changed files with 32 additions and 2 deletions

View File

@ -7,6 +7,8 @@ import pigpio
import queue
import sys
import time
import os
import datetime
from collections import namedtuple
consolelogformatter = logging.Formatter("%(asctime)-15s %(levelname)s: %(message)s")
@ -17,6 +19,29 @@ stdout_log_handler.setFormatter(consolelogformatter)
logger.addHandler(stdout_log_handler)
def export_to_influxdb(action, measurements):
from influxdb import InfluxDBClient
influx_client = InfluxDBClient(
host=os.getenv('INFLUXDB_HOST', 'localhost'),
port=os.getenv('INFLUXDB_PORT', 8086),
username=os.getenv('INFLUXDB_USERNAME', 'root'),
password=os.getenv('INFLUXDB_PASSWORD', 'root'),
database=os.getenv('INFLUXDB_DATABASE')
)
json_body = [
{
"measurement": action,
"time": datetime.datetime.utcnow().replace(microsecond=0).isoformat(),
"fields": measurements,
}
]
try:
influx_client.write_points(json_body)
except Exception as e:
print('Failed to write to influxdb: ', e)
def parse_args():
parser = argparse.ArgumentParser(description='Itho WPU i2c master')
@ -35,6 +60,8 @@ def parse_args():
parser.add_argument('--slave-only', action='store_true', help="Only run I2C slave")
parser.add_argument('--slave-timeout', nargs='?', type=int, default=60,
help="Slave timeout in seconds when --slave-only")
parser.add_argument('--export-to-influxdb', action='store_true',
help="Export results to InfluxDB")
args = parser.parse_args()
return args
@ -115,10 +142,13 @@ class I2CMaster:
self.i.close()
def process_response(action, response):
def process_response(action, response, args):
if action == "getdatalog" and int(response[1], 0) == 0xA4 and int(response[2], 0) == 0x01:
measurements = process_datalog(response)
if args.export_to_influxdb:
export_to_influxdb(action, measurements)
def process_datalog(response):
# 0 = Byte
@ -195,7 +225,7 @@ if __name__ == "__main__":
master.close()
if result is not None:
process_response(args.action, result)
process_response(args.action, result, args)
if not args.master_only:
slave.close()