From ff1fabce2cc986dbdd91076bb1da17409318ec5b Mon Sep 17 00:00:00 2001 From: Pim van den Berg Date: Thu, 8 Oct 2020 13:39:10 +0200 Subject: [PATCH] feat(itho-wpu): initial i2c master functionality Writing directly to the /dev/i2c-1 device. Support for 4 actions: - getregelaar - getserial - getdatatype - getdatalog --- i2c_raw.py | 29 +++++++++++++++++++++++++ itho-wpu.py | 62 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+) create mode 100644 i2c_raw.py create mode 100755 itho-wpu.py diff --git a/i2c_raw.py b/i2c_raw.py new file mode 100644 index 0000000..4389f90 --- /dev/null +++ b/i2c_raw.py @@ -0,0 +1,29 @@ +import io +import fcntl +import struct + +I2C_SLAVE = 0x0703 + + +class I2CRaw: + def __init__(self, address, bus): + self.fr = io.open(f"/dev/i2c-{bus}", "rb", buffering=0) + self.fw = io.open(f"/dev/i2c-{bus}", "wb", buffering=0) + fcntl.ioctl(self.fr, I2C_SLAVE, address) + fcntl.ioctl(self.fw, I2C_SLAVE, address) + + def write_i2c_block_data(self, data): + if type(data) is not list: + return -1 + data = bytearray(data) + self.fw.write(data) + return 0 + + def read_i2c_block_data(self, n_bytes): + data_raw = self.fr.read(n_bytes) + unpack_format = 'B'*n_bytes + return list(struct.unpack(unpack_format, data_raw)) + + def close(self): + self.fr.close() + self.fw.close() diff --git a/itho-wpu.py b/itho-wpu.py new file mode 100755 index 0000000..9541100 --- /dev/null +++ b/itho-wpu.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python3 + +import argparse +import i2c_raw +import logging +import sys + +consolelogformatter = logging.Formatter("%(asctime)-15s %(levelname)s: %(message)s") +logger = logging.getLogger('stdout') +logger.setLevel(logging.INFO) +stdout_log_handler = logging.StreamHandler(sys.stdout) +stdout_log_handler.setFormatter(consolelogformatter) +logger.addHandler(stdout_log_handler) + + +def parse_args(): + parser = argparse.ArgumentParser(description='Itho WPU i2c master') + + actions = [ + "getregelaar", + "getserial", + "getdatatype", + "getdatalog", + ] + parser.add_argument('--action', nargs='?', required=True, + choices=actions, help="Execute an action") + parser.add_argument('--loglevel', nargs='?', + choices=["debug", "info", "warning", "error", "critical"], + help="Loglevel") + + args = parser.parse_args() + return args + + +class I2CMaster: + def __init__(self, address, bus): + self.i = i2c_raw.I2CRaw(address=address, bus=bus) + + def execute_action(self, action): + actions = { + "getregelaar": [0x80, 0x90, 0xE0, 0x04, 0x00, 0x8A], + "getserial": [0x80, 0x90, 0xE1, 0x04, 0x00, 0x89], + "getdatatype": [0x80, 0xA4, 0x00, 0x04, 0x00, 0x56], + "getdatalog": [0x80, 0xA4, 0x01, 0x04, 0x00, 0x55], + } + logger.info(f"Executing action: {action}") + self.i.write_i2c_block_data(actions[action]) + + def close(self): + self.i.close() + + +if __name__ == "__main__": + args = parse_args() + + if args.loglevel: + logger.setLevel(args.loglevel.upper()) + + master = I2CMaster(address=0x41, bus=1) + if args.action: + master.execute_action(args.action) + master.close()