qmk_firmware/lib/python/xap_client/device.py

224 lines
6.5 KiB
Python
Raw Normal View History

2022-07-18 23:46:08 +00:00
# Copyright 2022 QMK
# SPDX-License-Identifier: GPL-2.0-or-later
2022-07-06 16:10:04 +00:00
import json
2022-07-17 21:02:18 +00:00
import time
2022-07-06 16:10:04 +00:00
import gzip
2022-07-17 21:02:18 +00:00
import random
2022-07-06 16:10:04 +00:00
import threading
import functools
2022-08-30 17:47:55 +00:00
from typing import Optional
2022-07-17 22:04:35 +00:00
from struct import pack, unpack
2022-07-06 16:10:04 +00:00
from platform import platform
2022-07-17 22:04:35 +00:00
from .types import XAPSecureStatus, XAPFlags, XAPRequest, XAPResponse
from .routes import XAPRoutes, XAPRouteError
2022-07-17 00:28:54 +00:00
2022-07-06 16:10:04 +00:00
2022-07-18 23:46:08 +00:00
def _u32_to_bcd(val: bytes) -> str: # noqa: N802
"""Create BCD string
"""
return f'{val>>24}.{val>>16 & 0xFF}.{val & 0xFFFF}'
def _gen_token() -> bytes:
2022-07-06 23:57:59 +00:00
"""Generate XAP token - cannot start with 00xx or 'reserved' (FFFE|FFFF)
2022-07-06 16:10:04 +00:00
"""
2022-07-06 23:57:59 +00:00
token = random.randrange(0x0100, 0xFFFD)
2022-07-06 16:10:04 +00:00
# swap endianness
return unpack('<H', pack('>H', token))[0]
2022-07-18 23:46:08 +00:00
class XAPDeviceBase:
"""Raw XAP interactions
"""
def __init__(self, dev: dict, timeout: int = 1.0):
2022-07-06 16:10:04 +00:00
"""Constructor opens hid device and starts dependent services
"""
self.responses = {}
2022-07-18 23:46:08 +00:00
self.timeout = timeout
self.running = True
2022-07-06 16:10:04 +00:00
2022-09-06 16:33:19 +00:00
# lazy import to avoid compile issues
import hid
2022-07-06 16:10:04 +00:00
self.dev = hid.Device(path=dev['path'])
self.bg = threading.Thread(target=self._read_loop, daemon=True)
self.bg.start()
2022-07-17 21:02:18 +00:00
def close(self):
2022-07-18 23:46:08 +00:00
"""Close device and stop dependent services
"""
self.running = False
2022-07-17 21:02:18 +00:00
time.sleep(1)
self.dev.close()
2022-07-06 16:10:04 +00:00
def _read_loop(self):
"""Background thread to signal waiting transactions
"""
2022-07-18 23:46:08 +00:00
while self.running:
2022-07-17 22:04:35 +00:00
data = self.dev.read(XAPResponse.fmt.size, 100)
if data:
r = XAPResponse.from_bytes(data)
event = self.responses.get(r.token)
2022-07-06 16:10:04 +00:00
if event:
2022-07-17 22:04:35 +00:00
event._ret = data
2022-07-06 16:10:04 +00:00
event.set()
2022-08-30 17:47:55 +00:00
def transaction(self, *args) -> Optional[bytes]:
2022-07-18 23:46:08 +00:00
"""Request/Receive Helper
2022-07-06 16:10:04 +00:00
"""
# convert args to array of bytes
data = bytes()
for arg in args:
if isinstance(arg, (bytes, bytearray)):
data += arg
if isinstance(arg, int): # TODO: remove terrible assumption of u16
data += arg.to_bytes(2, byteorder='little')
token = _gen_token()
2022-07-17 22:04:35 +00:00
buffer = XAPRequest(token, len(data), data).to_bytes()
2022-07-06 16:10:04 +00:00
event = threading.Event()
self.responses[token] = event
# prepend 0 on windows because reasons...
if 'windows' in platform().lower():
buffer = b'\x00' + buffer
self.dev.write(buffer)
2022-07-18 23:46:08 +00:00
event.wait(timeout=self.timeout)
2022-07-06 16:10:04 +00:00
self.responses.pop(token, None)
if not hasattr(event, '_ret'):
return None
2022-07-17 22:04:35 +00:00
r = XAPResponse.from_bytes(event._ret)
2022-07-13 22:49:55 +00:00
if r.flags & XAPFlags.SUCCESS == 0:
2022-07-06 16:10:04 +00:00
return None
return r.data[:r.length]
2022-07-18 23:46:08 +00:00
def listen(self) -> dict:
"""Receive a single 'broadcast' message
"""
token = 0xFFFF
event = threading.Event()
self.responses[token] = event
2022-07-13 02:01:03 +00:00
2022-07-18 23:46:08 +00:00
# emulate a blocking read while allowing `ctrl+c` on windows
while not hasattr(event, '_ret'):
event.wait(timeout=0.25)
2022-07-13 02:01:03 +00:00
2022-07-18 23:46:08 +00:00
r = XAPResponse.from_bytes(event._ret)
return (r.flags, r.data[:r.length])
class XAPDevice(XAPDeviceBase):
"""XAP device interaction
"""
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
self.close()
def _query_device_info(self) -> dict:
"""Helper to reconstruct info.json from requested chunks
"""
datalen = self.int_transaction(XAPRoutes.QMK_CONFIG_BLOB_LEN)
if not datalen:
return {}
data = []
offset = 0
while offset < datalen:
chunk = self.transaction(XAPRoutes.QMK_CONFIG_BLOB_CHUNK, offset)
data += chunk
offset += len(chunk)
str_data = gzip.decompress(bytearray(data[:datalen]))
return json.loads(str_data)
2022-07-06 16:10:04 +00:00
2022-07-18 23:46:08 +00:00
def _ensure_route(self, route: bytes):
"""Check a route can be accessed
Raises:
XAPRouteError: Access to invalid route attempted
"""
# TODO: Remove assumption that capability is always xx01
2022-07-13 02:01:03 +00:00
(sub, rt) = route
cap = bytes([sub, 1])
2022-07-18 23:46:08 +00:00
if self.subsystems() & (1 << sub) == 0:
2022-07-13 02:01:03 +00:00
raise XAPRouteError("subsystem not available")
if self.capability(cap) & (1 << rt) == 0:
raise XAPRouteError("route not available")
2022-07-18 23:46:08 +00:00
def transaction(self, route: bytes, *args):
"""Request/Receive to XAP device
Raises:
XAPRouteError: Access to invalid route attempted
"""
2022-07-13 02:01:03 +00:00
self._ensure_route(route)
2022-07-18 23:46:08 +00:00
return super().transaction(route, *args)
def int_transaction(self, route: bytes, *args):
"""transaction with int parsing
"""
return int.from_bytes(self.transaction(route, *args) or bytes(0), 'little')
@functools.lru_cache
def capability(self, route: bytes):
# use parent transaction as we want to ignore capability checks
return int.from_bytes(super().transaction(route) or bytes(0), 'little')
2022-07-13 02:01:03 +00:00
@functools.lru_cache
2022-07-18 23:46:08 +00:00
def subsystems(self):
# use parent transaction as we want to ignore capability checks
return int.from_bytes(super().transaction(XAPRoutes.XAP_SUBSYSTEM_QUERY) or bytes(0), 'little')
@functools.lru_cache
def version(self) -> dict:
"""Query version data from device
"""
xap = self.int_transaction(XAPRoutes.XAP_VERSION_QUERY)
qmk = self.int_transaction(XAPRoutes.QMK_VERSION_QUERY)
return {'xap': _u32_to_bcd(xap), 'qmk': _u32_to_bcd(qmk)}
@functools.lru_cache
def info(self) -> dict:
"""Query config data from device
"""
2022-07-06 16:10:04 +00:00
data = self._query_device_info()
2022-07-17 21:02:18 +00:00
data['_id'] = self.transaction(XAPRoutes.QMK_HARDWARE_ID)
data['_version'] = self.version()
2022-07-06 16:10:04 +00:00
return data
2022-07-18 23:46:08 +00:00
def status(self) -> dict:
"""Query current device state
"""
lock = self.int_transaction(XAPRoutes.XAP_SECURE_STATUS)
2022-07-07 00:57:41 +00:00
data = {}
data['lock'] = XAPSecureStatus(lock).name
return data
2022-07-06 16:10:04 +00:00
def unlock(self):
2022-07-18 23:46:08 +00:00
"""Initiate unlock procedure
"""
2022-07-17 21:02:18 +00:00
self.transaction(XAPRoutes.XAP_SECURE_UNLOCK)
2022-07-06 16:10:04 +00:00
2022-07-07 00:57:41 +00:00
def lock(self):
2022-07-18 23:46:08 +00:00
"""Lock device
"""
2022-07-17 21:02:18 +00:00
self.transaction(XAPRoutes.XAP_SECURE_LOCK)
2022-07-07 00:57:41 +00:00
def reset(self):
2022-07-18 23:46:08 +00:00
"""Request device reboot to bootloader - Requires previous unlock
"""
status = self.int_transaction(XAPRoutes.QMK_BOOTLOADER_JUMP)
2022-07-07 00:57:41 +00:00
return status == 1