mirror of
https://github.com/pommi/python-itho-wpu.git
synced 2024-11-21 13:52:15 +01:00
feat(itho-wpu): basic slave functionality
This uses the pigpio library as I'm not quite sure yet how to read this data directly from /dev/i2c-1, without polling and high CPU usage.
This commit is contained in:
parent
91913c4fe7
commit
15c6103820
44
itho-wpu.py
44
itho-wpu.py
@ -3,7 +3,10 @@
|
|||||||
import argparse
|
import argparse
|
||||||
import i2c_raw
|
import i2c_raw
|
||||||
import logging
|
import logging
|
||||||
|
import pigpio
|
||||||
|
import queue
|
||||||
import sys
|
import sys
|
||||||
|
import threading
|
||||||
|
|
||||||
consolelogformatter = logging.Formatter("%(asctime)-15s %(levelname)s: %(message)s")
|
consolelogformatter = logging.Formatter("%(asctime)-15s %(levelname)s: %(message)s")
|
||||||
logger = logging.getLogger('stdout')
|
logger = logging.getLogger('stdout')
|
||||||
@ -32,6 +35,38 @@ def parse_args():
|
|||||||
return args
|
return args
|
||||||
|
|
||||||
|
|
||||||
|
class I2CSlave():
|
||||||
|
def __init__(self, address):
|
||||||
|
self.address = address
|
||||||
|
self.pi = pigpio.pi()
|
||||||
|
if not self.pi.connected:
|
||||||
|
logger.error("not pi.connected")
|
||||||
|
return
|
||||||
|
|
||||||
|
def run(self, q):
|
||||||
|
try:
|
||||||
|
self.pi.bsc_i2c(self.address)
|
||||||
|
if self.pi.wait_for_event(pigpio.EVENT_BSC, 5):
|
||||||
|
s, b, d = self.pi.bsc_i2c(self.address)
|
||||||
|
result = None
|
||||||
|
if b:
|
||||||
|
logger.debug(f"Received {b} bytes! Status {s}")
|
||||||
|
result = [hex(c) for c in d]
|
||||||
|
q.put(result)
|
||||||
|
else:
|
||||||
|
logger.error(f"Received number of bytes was {b}")
|
||||||
|
else:
|
||||||
|
logger.error("pi.wait_for_event timed out")
|
||||||
|
except(KeyboardInterrupt):
|
||||||
|
self.close()
|
||||||
|
else:
|
||||||
|
self.close()
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
self.pi.bsc_i2c(0)
|
||||||
|
self.pi.stop()
|
||||||
|
|
||||||
|
|
||||||
class I2CMaster:
|
class I2CMaster:
|
||||||
def __init__(self, address, bus):
|
def __init__(self, address, bus):
|
||||||
self.i = i2c_raw.I2CRaw(address=address, bus=bus)
|
self.i = i2c_raw.I2CRaw(address=address, bus=bus)
|
||||||
@ -56,7 +91,16 @@ if __name__ == "__main__":
|
|||||||
if args.loglevel:
|
if args.loglevel:
|
||||||
logger.setLevel(args.loglevel.upper())
|
logger.setLevel(args.loglevel.upper())
|
||||||
|
|
||||||
|
q = queue.Queue()
|
||||||
|
|
||||||
|
slave = I2CSlave(address=0x40)
|
||||||
|
slave_thread = threading.Thread(target=slave.run, args=[q])
|
||||||
|
slave_thread.start()
|
||||||
|
|
||||||
master = I2CMaster(address=0x41, bus=1)
|
master = I2CMaster(address=0x41, bus=1)
|
||||||
if args.action:
|
if args.action:
|
||||||
master.execute_action(args.action)
|
master.execute_action(args.action)
|
||||||
master.close()
|
master.close()
|
||||||
|
|
||||||
|
result = q.get()
|
||||||
|
logger.info(f"Response: {result}")
|
||||||
|
Loading…
Reference in New Issue
Block a user