qmk_firmware/lib/python/xap_client/device.py

175 lines
4.9 KiB
Python
Raw Normal View History

2022-07-17 00:28:54 +00:00
"""XAP Device
2022-07-06 16:10:04 +00:00
"""
2022-07-17 00:28:54 +00:00
import hid
2022-07-06 16:10:04 +00:00
import json
import random
import gzip
import threading
import functools
from struct import Struct, pack, unpack
from collections import namedtuple
from platform import platform
2022-07-17 00:28:54 +00:00
from .types import XAPSecureStatus, XAPFlags, XAPRouteError
2022-07-17 01:58:14 +00:00
from .routes import XAP_VERSION_QUERY
2022-07-17 00:28:54 +00:00
2022-07-06 16:10:04 +00:00
RequestPacket = namedtuple('RequestPacket', 'token length data')
RequestStruct = Struct('<HB61s')
ResponsePacket = namedtuple('ResponsePacket', 'token flags length data')
ResponseStruct = Struct('<HBB60s')
def _gen_token():
2022-07-06 23:57:59 +00:00
"""Generate XAP token - cannot start with 00xx or 'reserved' (FFFE|FFFF)
2022-07-06 16:10:04 +00:00
"""
2022-07-06 23:57:59 +00:00
token = random.randrange(0x0100, 0xFFFD)
2022-07-06 16:10:04 +00:00
# swap endianness
return unpack('<H', pack('>H', token))[0]
def _u32toBCD(val): # noqa: N802
"""Create BCD string
"""
return f'{val>>24}.{val>>16 & 0xFF}.{val & 0xFFFF}'
class XAPDevice:
def __init__(self, dev):
"""Constructor opens hid device and starts dependent services
"""
self.responses = {}
self.dev = hid.Device(path=dev['path'])
self.bg = threading.Thread(target=self._read_loop, daemon=True)
self.bg.start()
def _read_loop(self):
"""Background thread to signal waiting transactions
"""
while 1:
2022-07-08 23:21:34 +00:00
array_alpha = self.dev.read(ResponseStruct.size, 100)
2022-07-06 16:10:04 +00:00
if array_alpha:
token = int.from_bytes(array_alpha[:2], 'little')
event = self.responses.get(token)
if event:
event._ret = array_alpha
event.set()
def _query_device_info(self):
datalen = int.from_bytes(self.transaction(b'\x01\x05') or bytes(0), 'little')
if not datalen:
return {}
data = []
offset = 0
while offset < datalen:
chunk = self.transaction(b'\x01\x06', offset)
data += chunk
offset += len(chunk)
str_data = gzip.decompress(bytearray(data[:datalen]))
return json.loads(str_data)
def listen(self):
"""Receive a 'broadcast' message
"""
token = 0xFFFF
event = threading.Event()
self.responses[token] = event
2022-07-13 02:01:03 +00:00
while not hasattr(event, '_ret'):
event.wait(timeout=0.25)
2022-07-06 16:10:04 +00:00
r = ResponsePacket._make(ResponseStruct.unpack(event._ret))
return (r.flags, r.data[:r.length])
2022-07-13 02:01:03 +00:00
def _transaction(self, *args):
2022-07-06 16:10:04 +00:00
"""Request/Receive
"""
# convert args to array of bytes
data = bytes()
for arg in args:
if isinstance(arg, (bytes, bytearray)):
data += arg
if isinstance(arg, int): # TODO: remove terrible assumption of u16
data += arg.to_bytes(2, byteorder='little')
token = _gen_token()
p = RequestPacket(token, len(data), data)
buffer = RequestStruct.pack(*list(p))
event = threading.Event()
self.responses[token] = event
# prepend 0 on windows because reasons...
if 'windows' in platform().lower():
buffer = b'\x00' + buffer
self.dev.write(buffer)
event.wait(timeout=1)
self.responses.pop(token, None)
if not hasattr(event, '_ret'):
return None
r = ResponsePacket._make(ResponseStruct.unpack(event._ret))
2022-07-13 22:49:55 +00:00
if r.flags & XAPFlags.SUCCESS == 0:
2022-07-06 16:10:04 +00:00
return None
return r.data[:r.length]
2022-07-13 02:01:03 +00:00
@functools.lru_cache
def capability(self, route):
cap = int.from_bytes(self._transaction(route) or bytes(0), 'little')
return cap
@functools.lru_cache
def subsystem(self):
sub = int.from_bytes(self._transaction(b'\x00\x02') or bytes(0), 'little')
return sub
@functools.lru_cache
2022-07-06 16:10:04 +00:00
def version(self):
2022-07-17 01:58:14 +00:00
ver = int.from_bytes(self._transaction(XAP_VERSION_QUERY) or bytes(0), 'little')
2022-07-06 16:10:04 +00:00
return {'xap': _u32toBCD(ver)}
2022-07-13 02:01:03 +00:00
def _ensure_route(self, route):
(sub, rt) = route
cap = bytes([sub, 1])
if self.subsystem() & (1 << sub) == 0:
raise XAPRouteError("subsystem not available")
if self.capability(cap) & (1 << rt) == 0:
raise XAPRouteError("route not available")
def transaction(self, route, *args):
self._ensure_route(route)
return self._transaction(route, *args)
@functools.lru_cache
2022-07-06 16:10:04 +00:00
def info(self):
data = self._query_device_info()
data['_id'] = self.transaction(b'\x01\x08')
data['xap'] = self.version()['xap']
return data
2022-07-07 00:57:41 +00:00
def status(self):
lock = int.from_bytes(self.transaction(b'\x00\x03') or bytes(0), 'little')
data = {}
data['lock'] = XAPSecureStatus(lock).name
return data
2022-07-06 16:10:04 +00:00
def unlock(self):
self.transaction(b'\x00\x04')
2022-07-07 00:57:41 +00:00
def lock(self):
self.transaction(b'\x00\x05')
def reset(self):
status = int.from_bytes(self.transaction(b'\x01\x07') or bytes(0), 'little')
return status == 1