mirror of
https://github.com/pommi/python-itho-wpu.git
synced 2024-11-14 12:42:15 +01:00
feat(itho-wpu): basic slave functionality
This uses the pigpio library as I'm not quite sure yet how to read this data directly from /dev/i2c-1, without polling and high CPU usage.
This commit is contained in:
parent
ff1fabce2c
commit
bec3415c4e
44
itho-wpu.py
44
itho-wpu.py
@ -3,7 +3,10 @@
|
||||
import argparse
|
||||
import i2c_raw
|
||||
import logging
|
||||
import pigpio
|
||||
import queue
|
||||
import sys
|
||||
import threading
|
||||
|
||||
consolelogformatter = logging.Formatter("%(asctime)-15s %(levelname)s: %(message)s")
|
||||
logger = logging.getLogger('stdout')
|
||||
@ -32,6 +35,38 @@ def parse_args():
|
||||
return args
|
||||
|
||||
|
||||
class I2CSlave():
|
||||
def __init__(self, address):
|
||||
self.address = address
|
||||
self.pi = pigpio.pi()
|
||||
if not self.pi.connected:
|
||||
logger.error("not pi.connected")
|
||||
return
|
||||
|
||||
def run(self, q):
|
||||
try:
|
||||
self.pi.bsc_i2c(self.address)
|
||||
if self.pi.wait_for_event(pigpio.EVENT_BSC, 5):
|
||||
s, b, d = self.pi.bsc_i2c(self.address)
|
||||
result = None
|
||||
if b:
|
||||
logger.debug(f"Received {b} bytes! Status {s}")
|
||||
result = [hex(c) for c in d]
|
||||
q.put(result)
|
||||
else:
|
||||
logger.error(f"Received number of bytes was {b}")
|
||||
else:
|
||||
logger.error("pi.wait_for_event timed out")
|
||||
except(KeyboardInterrupt):
|
||||
self.close()
|
||||
else:
|
||||
self.close()
|
||||
|
||||
def close(self):
|
||||
self.pi.bsc_i2c(0)
|
||||
self.pi.stop()
|
||||
|
||||
|
||||
class I2CMaster:
|
||||
def __init__(self, address, bus):
|
||||
self.i = i2c_raw.I2CRaw(address=address, bus=bus)
|
||||
@ -56,7 +91,16 @@ if __name__ == "__main__":
|
||||
if args.loglevel:
|
||||
logger.setLevel(args.loglevel.upper())
|
||||
|
||||
q = queue.Queue()
|
||||
|
||||
slave = I2CSlave(address=0x40)
|
||||
slave_thread = threading.Thread(target=slave.run, args=[q])
|
||||
slave_thread.start()
|
||||
|
||||
master = I2CMaster(address=0x41, bus=1)
|
||||
if args.action:
|
||||
master.execute_action(args.action)
|
||||
master.close()
|
||||
|
||||
result = q.get()
|
||||
logger.info(f"Response: {result}")
|
||||
|
Loading…
Reference in New Issue
Block a user