feat(itho-wpu): move itho interaction to an IthoWPU class
To be able to easily execute multiple actions.
This commit is contained in:
		
							parent
							
								
									8b34069fc6
								
							
						
					
					
						commit
						0b4b9a3d79
					
				
					 1 changed files with 33 additions and 20 deletions
				
			
		
							
								
								
									
										53
									
								
								itho-wpu.py
									
										
									
									
									
								
							
							
						
						
									
										53
									
								
								itho-wpu.py
									
										
									
									
									
								
							| 
						 | 
				
			
			@ -162,6 +162,35 @@ class I2CMaster:
 | 
			
		|||
        self.i.close()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class IthoWPU():
 | 
			
		||||
    def __init__(self, master_only, slave_only, slave_timeout):
 | 
			
		||||
        self.master_only = master_only
 | 
			
		||||
        self.slave_only = slave_only
 | 
			
		||||
        self.slave_timeout = slave_timeout
 | 
			
		||||
        self._q = queue.Queue()
 | 
			
		||||
 | 
			
		||||
    def get(self, action):
 | 
			
		||||
        reponse = None
 | 
			
		||||
 | 
			
		||||
        if not self.master_only:
 | 
			
		||||
            slave = I2CSlave(address=0x40, queue=self._q)
 | 
			
		||||
            slave.set_callback()
 | 
			
		||||
            if self.slave_only:
 | 
			
		||||
                time.sleep(self.slave_timeout)
 | 
			
		||||
 | 
			
		||||
        if not self.slave_only:
 | 
			
		||||
            master = I2CMaster(address=0x41, bus=1, queue=self._q)
 | 
			
		||||
            if action:
 | 
			
		||||
                response = master.execute_action(action)
 | 
			
		||||
                logger.debug(f"Response: {response}")
 | 
			
		||||
            master.close()
 | 
			
		||||
 | 
			
		||||
        if not self.master_only:
 | 
			
		||||
            slave.close()
 | 
			
		||||
 | 
			
		||||
        return response
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def is_messageclass_valid(action, response):
 | 
			
		||||
    if int(response[1], 0) != actions[action][0] and int(response[2], 0) != actions[action][1]:
 | 
			
		||||
        logger.error(f"Response MessageClass != {actions[action][0]} {actions[action][1]} "
 | 
			
		||||
| 
						 | 
				
			
			@ -275,23 +304,7 @@ if __name__ == "__main__":
 | 
			
		|||
    if args.loglevel:
 | 
			
		||||
        logger.setLevel(args.loglevel.upper())
 | 
			
		||||
 | 
			
		||||
    q = queue.Queue()
 | 
			
		||||
 | 
			
		||||
    if not args.master_only:
 | 
			
		||||
        slave = I2CSlave(address=0x40, queue=q)
 | 
			
		||||
        slave.set_callback()
 | 
			
		||||
        if args.slave_only:
 | 
			
		||||
            time.sleep(args.slave_timeout)
 | 
			
		||||
 | 
			
		||||
    if not args.slave_only:
 | 
			
		||||
        master = I2CMaster(address=0x41, bus=1, queue=q)
 | 
			
		||||
        if args.action:
 | 
			
		||||
            result = master.execute_action(args.action)
 | 
			
		||||
            logger.debug(f"Response: {result}")
 | 
			
		||||
        master.close()
 | 
			
		||||
 | 
			
		||||
        if result is not None:
 | 
			
		||||
            process_response(args.action, result, args)
 | 
			
		||||
 | 
			
		||||
    if not args.master_only:
 | 
			
		||||
        slave.close()
 | 
			
		||||
    wpu = IthoWPU(args.master_only, args.slave_only, args.slave_timeout)
 | 
			
		||||
    response = wpu.get(args.action)
 | 
			
		||||
    if response is not None:
 | 
			
		||||
        process_response(args.action, response, args)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue