qmk_firmware/lib/python/xap_client/device.py

184 lines
5.3 KiB
Python
Raw Normal View History

2022-07-17 00:28:54 +00:00
"""XAP Device
2022-07-06 16:10:04 +00:00
"""
2022-07-17 00:28:54 +00:00
import hid
2022-07-06 16:10:04 +00:00
import json
2022-07-17 21:02:18 +00:00
import time
2022-07-06 16:10:04 +00:00
import gzip
2022-07-17 21:02:18 +00:00
import random
2022-07-06 16:10:04 +00:00
import threading
import functools
from struct import Struct, pack, unpack
from collections import namedtuple
from platform import platform
2022-07-17 00:28:54 +00:00
from .types import XAPSecureStatus, XAPFlags, XAPRouteError
2022-07-17 21:02:18 +00:00
from .routes import XAPRoutes
from .util import u32toBCD
2022-07-17 00:28:54 +00:00
2022-07-06 16:10:04 +00:00
RequestPacket = namedtuple('RequestPacket', 'token length data')
RequestStruct = Struct('<HB61s')
ResponsePacket = namedtuple('ResponsePacket', 'token flags length data')
ResponseStruct = Struct('<HBB60s')
def _gen_token():
2022-07-06 23:57:59 +00:00
"""Generate XAP token - cannot start with 00xx or 'reserved' (FFFE|FFFF)
2022-07-06 16:10:04 +00:00
"""
2022-07-06 23:57:59 +00:00
token = random.randrange(0x0100, 0xFFFD)
2022-07-06 16:10:04 +00:00
# swap endianness
return unpack('<H', pack('>H', token))[0]
class XAPDevice:
def __init__(self, dev):
"""Constructor opens hid device and starts dependent services
"""
self.responses = {}
2022-07-17 21:02:18 +00:00
self.do_read = True
2022-07-06 16:10:04 +00:00
self.dev = hid.Device(path=dev['path'])
self.bg = threading.Thread(target=self._read_loop, daemon=True)
self.bg.start()
2022-07-17 21:02:18 +00:00
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
self.close()
def close(self):
self.do_read = False
time.sleep(1)
self.dev.close()
2022-07-06 16:10:04 +00:00
def _read_loop(self):
"""Background thread to signal waiting transactions
"""
2022-07-17 21:02:18 +00:00
while self.do_read:
2022-07-08 23:21:34 +00:00
array_alpha = self.dev.read(ResponseStruct.size, 100)
2022-07-06 16:10:04 +00:00
if array_alpha:
token = int.from_bytes(array_alpha[:2], 'little')
event = self.responses.get(token)
if event:
event._ret = array_alpha
event.set()
def _query_device_info(self):
2022-07-17 21:02:18 +00:00
datalen = int.from_bytes(self.transaction(XAPRoutes.QMK_CONFIG_BLOB_LEN) or bytes(0), 'little')
2022-07-06 16:10:04 +00:00
if not datalen:
return {}
data = []
offset = 0
while offset < datalen:
2022-07-17 21:02:18 +00:00
chunk = self.transaction(XAPRoutes.QMK_CONFIG_BLOB_CHUNK, offset)
2022-07-06 16:10:04 +00:00
data += chunk
offset += len(chunk)
str_data = gzip.decompress(bytearray(data[:datalen]))
return json.loads(str_data)
def listen(self):
"""Receive a 'broadcast' message
"""
token = 0xFFFF
event = threading.Event()
self.responses[token] = event
2022-07-13 02:01:03 +00:00
while not hasattr(event, '_ret'):
event.wait(timeout=0.25)
2022-07-06 16:10:04 +00:00
r = ResponsePacket._make(ResponseStruct.unpack(event._ret))
return (r.flags, r.data[:r.length])
2022-07-13 02:01:03 +00:00
def _transaction(self, *args):
2022-07-06 16:10:04 +00:00
"""Request/Receive
"""
# convert args to array of bytes
data = bytes()
for arg in args:
if isinstance(arg, (bytes, bytearray)):
data += arg
if isinstance(arg, int): # TODO: remove terrible assumption of u16
data += arg.to_bytes(2, byteorder='little')
token = _gen_token()
p = RequestPacket(token, len(data), data)
buffer = RequestStruct.pack(*list(p))
event = threading.Event()
self.responses[token] = event
# prepend 0 on windows because reasons...
if 'windows' in platform().lower():
buffer = b'\x00' + buffer
self.dev.write(buffer)
event.wait(timeout=1)
self.responses.pop(token, None)
if not hasattr(event, '_ret'):
return None
r = ResponsePacket._make(ResponseStruct.unpack(event._ret))
2022-07-13 22:49:55 +00:00
if r.flags & XAPFlags.SUCCESS == 0:
2022-07-06 16:10:04 +00:00
return None
return r.data[:r.length]
2022-07-13 02:01:03 +00:00
@functools.lru_cache
def capability(self, route):
cap = int.from_bytes(self._transaction(route) or bytes(0), 'little')
return cap
@functools.lru_cache
def subsystem(self):
2022-07-17 21:02:18 +00:00
sub = int.from_bytes(self._transaction(XAPRoutes.XAP_SUBSYSTEM_QUERY) or bytes(0), 'little')
2022-07-13 02:01:03 +00:00
return sub
@functools.lru_cache
2022-07-06 16:10:04 +00:00
def version(self):
2022-07-17 21:02:18 +00:00
xap = int.from_bytes(self._transaction(XAPRoutes.XAP_VERSION_QUERY) or bytes(0), 'little')
qmk = int.from_bytes(self._transaction(XAPRoutes.QMK_VERSION_QUERY) or bytes(0), 'little')
return {'xap': u32toBCD(xap), 'qmk': u32toBCD(qmk)}
2022-07-06 16:10:04 +00:00
2022-07-13 02:01:03 +00:00
def _ensure_route(self, route):
(sub, rt) = route
cap = bytes([sub, 1])
if self.subsystem() & (1 << sub) == 0:
raise XAPRouteError("subsystem not available")
if self.capability(cap) & (1 << rt) == 0:
raise XAPRouteError("route not available")
def transaction(self, route, *args):
self._ensure_route(route)
return self._transaction(route, *args)
@functools.lru_cache
2022-07-06 16:10:04 +00:00
def info(self):
data = self._query_device_info()
2022-07-17 21:02:18 +00:00
data['_id'] = self.transaction(XAPRoutes.QMK_HARDWARE_ID)
data['_version'] = self.version()
2022-07-06 16:10:04 +00:00
return data
2022-07-07 00:57:41 +00:00
def status(self):
2022-07-17 21:02:18 +00:00
lock = int.from_bytes(self.transaction(XAPRoutes.XAP_SECURE_STATUS) or bytes(0), 'little')
2022-07-07 00:57:41 +00:00
data = {}
data['lock'] = XAPSecureStatus(lock).name
return data
2022-07-06 16:10:04 +00:00
def unlock(self):
2022-07-17 21:02:18 +00:00
self.transaction(XAPRoutes.XAP_SECURE_UNLOCK)
2022-07-06 16:10:04 +00:00
2022-07-07 00:57:41 +00:00
def lock(self):
2022-07-17 21:02:18 +00:00
self.transaction(XAPRoutes.XAP_SECURE_LOCK)
2022-07-07 00:57:41 +00:00
def reset(self):
2022-07-17 21:02:18 +00:00
status = int.from_bytes(self.transaction(XAPRoutes.QMK_BOOTLOADER_JUMP) or bytes(0), 'little')
2022-07-07 00:57:41 +00:00
return status == 1