XAP Client - Ensure route exists

This commit is contained in:
zvecr 2022-07-13 03:01:03 +01:00
parent 628bc60292
commit 98fd483611

View File

@ -52,6 +52,10 @@ class XAPEventType(IntEnum):
USER = 0x03
class XAPRouteError(Exception):
pass
class XAPDevice:
def __init__(self, dev):
"""Constructor opens hid device and starts dependent services
@ -96,12 +100,13 @@ class XAPDevice:
event = threading.Event()
self.responses[token] = event
event.wait()
while not hasattr(event, '_ret'):
event.wait(timeout=0.25)
r = ResponsePacket._make(ResponseStruct.unpack(event._ret))
return (r.flags, r.data[:r.length])
def transaction(self, *args):
def _transaction(self, *args):
"""Request/Receive
"""
# convert args to array of bytes
@ -136,12 +141,36 @@ class XAPDevice:
return r.data[:r.length]
@functools.cache
@functools.lru_cache
def capability(self, route):
cap = int.from_bytes(self._transaction(route) or bytes(0), 'little')
return cap
@functools.lru_cache
def subsystem(self):
sub = int.from_bytes(self._transaction(b'\x00\x02') or bytes(0), 'little')
return sub
@functools.lru_cache
def version(self):
ver = int.from_bytes(self.transaction(b'\x00\x00') or bytes(0), 'little')
ver = int.from_bytes(self._transaction(b'\x00\x00') or bytes(0), 'little')
return {'xap': _u32toBCD(ver)}
@functools.cache
def _ensure_route(self, route):
(sub, rt) = route
cap = bytes([sub, 1])
if self.subsystem() & (1 << sub) == 0:
raise XAPRouteError("subsystem not available")
if self.capability(cap) & (1 << rt) == 0:
raise XAPRouteError("route not available")
def transaction(self, route, *args):
self._ensure_route(route)
return self._transaction(route, *args)
@functools.lru_cache
def info(self):
data = self._query_device_info()
data['_id'] = self.transaction(b'\x01\x08')