From a11df838f8601b44b00f04608c043e258f8e0096 Mon Sep 17 00:00:00 2001 From: Pim van den Berg Date: Mon, 4 Sep 2023 19:46:00 +0200 Subject: [PATCH] feat: add support for writing manual operations Initiating and finishing a manual operation works a bit strange. First of all the Maximum Manual Operation Time is controlled by a setting (--id 4) and defaults to 0 (infinite). And instead of waiting for the timer to expire (which you have to do when its set to 0) you can finish a manual operation by passing "--no-check". Note: For some reason no response is given when a manual operation is written to. That's why execute_action() returns succesfully (with None) when its executing the "setmanual" action and the response queue size is 0. --- README.md | 40 ++++++++++++++++++++++++++++++++++++ itho-wpu.py | 59 +++++++++++++++++++++++++++++++++++++++++++++++++---- itho_i2c.py | 25 +++++++++++++++++++---- 3 files changed, 116 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 2ad6526..54a0257 100644 --- a/README.md +++ b/README.md @@ -138,6 +138,46 @@ See the [pislave](https://github.com/ootjersb/pislave#wiring) project 0. Buitentemp (°C): 10.0 ``` +* Initiate a manual operation + ``` + # ./itho-wpu.py --action setmanual --id 0 --value 29.00 + Current manual operation: + 0. Buitentemp (°C): 10.0 + Manual `0` will be changed to `29.0`? [y/N] y + Updating manual operation 0 to `29.0` + Are you really sure? (Type uppercase yes): YES + ``` + +* Finish a manual operation + ``` + # ./itho-wpu.py --action setmanual --id 0 --value 29.00 --no-check + Current manual operation: + 0. Buitentemp (°C): 29.0 + Manual `0` will be changed to `29.0`? [y/N] y + Updating manual operation 0 to `29.0` + Are you really sure? (Type uppercase yes): YES + ``` + +* Reset all errors: + ``` + # ./itho-wpu.py --action setmanual --id 37 --value 1 + Current manual operation: + 37. Reset Alle Fouten: 0 + Manual `37` will be changed to `1`? [y/N] y + Updating manual operation 37 to `1` + Are you really sure? (Type uppercase yes): YES + ``` + +* Reset timer: + ``` + # ./itho-wpu.py --action setmanual --id 38 --value 1 + Current manual operation: + 38. Reset Timer: 0 + Manual `38` will be changed to `1`? [y/N] y + Updating manual operation 38 to `1` + Are you really sure? (Type uppercase yes): YES + ``` + # Exporting measurements ## InfluxDB diff --git a/itho-wpu.py b/itho-wpu.py index 59bd02f..3bcc1dd 100755 --- a/itho-wpu.py +++ b/itho-wpu.py @@ -26,6 +26,7 @@ actions = { "getsetting": [0xA4, 0x10], "setsetting": [0xA4, 0x10], "getmanual": [0x40, 0x30], + "setmanual": [0x40, 0x30], } @@ -50,6 +51,12 @@ def parse_args(): nargs="?", help="Setting value", ) + parser.add_argument( + "--check", + default=True, + action=argparse.BooleanOptionalAction, + help="Enable/disable manual operation (used with --setmanual)", + ) parser.add_argument( "--loglevel", nargs="?", @@ -89,7 +96,7 @@ class IthoWPU: self.datatype = self.get("getdatatype") self.heatpump_db = db.sqlite("heatpump.sqlite") - def get(self, action, identifier=None, value=None): + def get(self, action, identifier=None, datatype=None, value=None, check=True): if not self.no_cache: response = self.cache.get(action.replace("get", "")) if response is not None: @@ -107,7 +114,7 @@ class IthoWPU: if not self.slave_only: master = I2CMaster(address=0x41, bus=1, queue=self._q) if action: - response = master.execute_action(action, identifier, value) + response = master.execute_action(action, identifier, datatype, value, check) logger.debug(f"Response: {response}") master.close() @@ -426,7 +433,7 @@ def process_setsetting(wpu, args): logger.error("Aborted") return - response = wpu.get("setsetting", args.id, normalized_value) + response = wpu.get("setsetting", args.id, None, normalized_value) if response is None: return process_response("getsetting", response, args, wpu) @@ -454,6 +461,46 @@ def process_manual(response, wpu): ) +def process_setmanual(wpu, args): + logger.info("Current manual operation:") + response = wpu.get("getmanual", int(args.id)) + if response is None: + return + process_response("getmanual", response, args, wpu) + message = response[5:] + datatype = message[3] + + # TODO: check if Max Handbedieningstijd > 0 + + if args.value is None: + value = input("Provide a new value: ") + else: + value = args.value + + # TODO: add support for negative values + if float(value) < 0: + logger.error("Negative values are not supported yet.") + return + + logger.debug(f"New manual operation datatype: {datatype}") + logger.debug(f"New manual operation (input): {value}") + normalized_value = int(value.replace(".", "")) + logger.debug(f"New manual operation (normalized): {normalized_value}") + hex_list_value = [hex(v) for v in list(normalized_value.to_bytes(2, byteorder="big"))] + logger.debug(f"New manual operation (hex): {hex_list_value}") + parsed_value = format_datatype(args.id, hex_list_value, datatype) + logger.debug(f"New manual operation (parsed): {parsed_value}") + + sure = input(f"Manual `{args.id}` will be changed to `{parsed_value}`? [y/N] ") + if sure in ["y", "Y"]: + logger.info(f"Updating manual operation {args.id} to `{parsed_value}`") + else: + logger.error("Aborted") + return + + response = wpu.get("setmanual", args.id, int(datatype, 0), normalized_value, args.check) + + def format_datatype(name, m, dt): """ Transform a list of bytes to a readable number based on the datatype. @@ -534,7 +581,7 @@ def main(): logging.Formatter("%(asctime)-15s %(levelname)s: %(message)s") ) - if args.action in ["getsetting", "setsetting", "getmanual"] and args.id is None: + if args.action in ["getsetting", "setsetting", "getmanual", "setmanual"] and args.id is None: logger.error(f"`--id` is required with `--action {args.action}`") return @@ -548,6 +595,10 @@ def main(): process_setsetting(wpu, args) return + if args.action == "setmanual": + process_setmanual(wpu, args) + return + response = wpu.get(args.action, args.id) if response is not None: process_response(args.action, response, args, wpu) diff --git a/itho_i2c.py b/itho_i2c.py index aa5986d..f826bd3 100644 --- a/itho_i2c.py +++ b/itho_i2c.py @@ -20,6 +20,7 @@ actions = { "getsetting": [0xA4, 0x10], "setsetting": [0xA4, 0x10], "getmanual": [0x40, 0x30], + "setmanual": [0x40, 0x30], } @@ -53,7 +54,7 @@ class I2CMaster: self.i = I2CRaw(address=address, bus=bus) self.queue = queue - def compose_request(self, action, identifier, value): + def compose_request(self, action, identifier, datatype, value, check): if action == "getsetting": request = ( [0x80] @@ -87,6 +88,20 @@ class I2CMaster: + byte_identifier + [0x01] # 1 = manual ) + elif action == "setmanual": + byte_identifier = list(identifier.to_bytes(2, byteorder="big")) + byte_list_value = list(value.to_bytes(2, byteorder="big")) + byte_check = [0x01] if check else [0x00] + request = ( + [0x80] + + actions[action] + + [0x06, 0x07] # write, length + + [0x01] # bank + + byte_identifier + + [datatype] # datatype + + byte_list_value # new + + byte_check + ) else: # 0x80 = source, 0x04 = msg_type, 0x00 = length request = [0x80] + actions[action] + [0x04, 0x00] @@ -102,12 +117,12 @@ class I2CMaster: checksum = 0 return checksum - def execute_action(self, action, identifier, value): - request = self.compose_request(action, identifier, value) + def execute_action(self, action, identifier, datatype, value, check): + request = self.compose_request(action, identifier, datatype, value, check) request_in_hex = [hex(c) for c in request] logger.debug(f"Request: {request_in_hex}") result = None - if action == "setsetting": + if action in ["setsetting", "setmanual"]: sure = input("Are you really sure? (Type uppercase yes): ") if sure != "YES": logger.error("Aborted") @@ -120,6 +135,8 @@ class I2CMaster: if self.queue.qsize() > 0: result = self.queue.get() break + elif action == "setmanual" and self.queue.qsize() == 0: + return None if result is None: logger.error("No valid result in 20 requests")