Tidy up code/docstrings

This commit is contained in:
zvecr 2022-07-19 00:46:08 +01:00
parent c798395122
commit 819bf2eff8
6 changed files with 151 additions and 81 deletions

View File

@ -42,14 +42,14 @@ def _list_devices():
"""Dump out available devices """Dump out available devices
""" """
cli.log.info('Available devices:') cli.log.info('Available devices:')
devices = XAPClient.list() for dev in XAPClient.devices():
for dev in devices:
device = XAPClient().connect(dev) device = XAPClient().connect(dev)
ver = device.version()
data = device.info() cli.log.info(' %04x:%04x %s %s [API:%s]', dev['vendor_id'], dev['product_id'], dev['manufacturer_string'], dev['product_string'], ver['xap'])
cli.log.info(' %04x:%04x %s %s [API:%s]', dev['vendor_id'], dev['product_id'], dev['manufacturer_string'], dev['product_string'], data['_version']['xap'])
if cli.config.general.verbose: if cli.args.verbose:
data = device.info()
# TODO: better formatting like 'lsusb -v'? # TODO: better formatting like 'lsusb -v'?
print_dotted_output(data) print_dotted_output(data)
@ -188,6 +188,7 @@ class XAPShell(cmd.Cmd):
return False return False
@cli.argument('-v', '--verbose', arg_only=True, action='store_true', help='Turns on verbose output.')
@cli.argument('-d', '--device', help='device to select - uses format <pid>:<vid>.') @cli.argument('-d', '--device', help='device to select - uses format <pid>:<vid>.')
@cli.argument('-l', '--list', arg_only=True, action='store_true', help='List available devices.') @cli.argument('-l', '--list', arg_only=True, action='store_true', help='List available devices.')
@cli.argument('-i', '--interactive', arg_only=True, action='store_true', help='Start interactive shell.') @cli.argument('-i', '--interactive', arg_only=True, action='store_true', help='Start interactive shell.')
@ -200,7 +201,7 @@ def xap(cli):
return _list_devices() return _list_devices()
# Connect to first available device # Connect to first available device
devices = XAPClient.list() devices = XAPClient.devices()
if not devices: if not devices:
cli.log.error('No devices found!') cli.log.error('No devices found!')
return False return False

View File

@ -1,2 +1,5 @@
# Copyright 2022 QMK
# SPDX-License-Identifier: GPL-2.0-or-later
from .types import * # noqa: F403 from .types import * # noqa: F403
from .client import * # noqa: F403 from .client import * # noqa: F403
from .routes import * # noqa: F403

View File

@ -1,12 +1,17 @@
"""XAP Client # Copyright 2022 QMK
""" # SPDX-License-Identifier: GPL-2.0-or-later
import hid import hid
class XAPClient: class XAPClient:
"""XAP device discovery
"""
@staticmethod @staticmethod
def list(search=None): def devices(search: str = None) -> list[dict]:
"""Find compatible XAP devices """Find compatible XAP devices
Args:
search: optional search string to filter results by
""" """
def _is_xap_usage(x): def _is_xap_usage(x):
return x['usage_page'] == 0xFF51 and x['usage'] == 0x0058 return x['usage_page'] == 0xFF51 and x['usage'] == 0x0058
@ -21,9 +26,11 @@ class XAPClient:
return list(devices) return list(devices)
def connect(self, dev): def connect(self, device: dict):
"""Connect to a given XAP device """Connect to a given XAP device
Args:
device: item from a previous `XAPClient.devices()` call
""" """
from .device import XAPDevice from .device import XAPDevice
return XAPDevice(dev) return XAPDevice(device)

View File

@ -1,5 +1,5 @@
"""XAP Device # Copyright 2022 QMK
""" # SPDX-License-Identifier: GPL-2.0-or-later
import hid import hid
import json import json
import time import time
@ -12,10 +12,15 @@ from platform import platform
from .types import XAPSecureStatus, XAPFlags, XAPRequest, XAPResponse from .types import XAPSecureStatus, XAPFlags, XAPRequest, XAPResponse
from .routes import XAPRoutes, XAPRouteError from .routes import XAPRoutes, XAPRouteError
from .util import u32toBCD
def _gen_token(): def _u32_to_bcd(val: bytes) -> str: # noqa: N802
"""Create BCD string
"""
return f'{val>>24}.{val>>16 & 0xFF}.{val & 0xFFFF}'
def _gen_token() -> bytes:
"""Generate XAP token - cannot start with 00xx or 'reserved' (FFFE|FFFF) """Generate XAP token - cannot start with 00xx or 'reserved' (FFFE|FFFF)
""" """
token = random.randrange(0x0100, 0xFFFD) token = random.randrange(0x0100, 0xFFFD)
@ -24,33 +29,32 @@ def _gen_token():
return unpack('<H', pack('>H', token))[0] return unpack('<H', pack('>H', token))[0]
class XAPDevice: class XAPDeviceBase:
def __init__(self, dev): """Raw XAP interactions
"""
def __init__(self, dev: dict, timeout: int = 1.0):
"""Constructor opens hid device and starts dependent services """Constructor opens hid device and starts dependent services
""" """
self.responses = {} self.responses = {}
self.do_read = True self.timeout = timeout
self.running = True
self.dev = hid.Device(path=dev['path']) self.dev = hid.Device(path=dev['path'])
self.bg = threading.Thread(target=self._read_loop, daemon=True) self.bg = threading.Thread(target=self._read_loop, daemon=True)
self.bg.start() self.bg.start()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
self.close()
def close(self): def close(self):
self.do_read = False """Close device and stop dependent services
"""
self.running = False
time.sleep(1) time.sleep(1)
self.dev.close() self.dev.close()
def _read_loop(self): def _read_loop(self):
"""Background thread to signal waiting transactions """Background thread to signal waiting transactions
""" """
while self.do_read: while self.running:
data = self.dev.read(XAPResponse.fmt.size, 100) data = self.dev.read(XAPResponse.fmt.size, 100)
if data: if data:
r = XAPResponse.from_bytes(data) r = XAPResponse.from_bytes(data)
@ -59,35 +63,8 @@ class XAPDevice:
event._ret = data event._ret = data
event.set() event.set()
def _query_device_info(self): def transaction(self, *args) -> bytes | None:
datalen = int.from_bytes(self.transaction(XAPRoutes.QMK_CONFIG_BLOB_LEN) or bytes(0), 'little') """Request/Receive Helper
if not datalen:
return {}
data = []
offset = 0
while offset < datalen:
chunk = self.transaction(XAPRoutes.QMK_CONFIG_BLOB_CHUNK, offset)
data += chunk
offset += len(chunk)
str_data = gzip.decompress(bytearray(data[:datalen]))
return json.loads(str_data)
def listen(self):
"""Receive a 'broadcast' message
"""
token = 0xFFFF
event = threading.Event()
self.responses[token] = event
while not hasattr(event, '_ret'):
event.wait(timeout=0.25)
r = XAPResponse.from_bytes(event._ret)
return (r.flags, r.data[:r.length])
def _transaction(self, *args):
"""Request/Receive
""" """
# convert args to array of bytes # convert args to array of bytes
data = bytes() data = bytes()
@ -109,7 +86,7 @@ class XAPDevice:
buffer = b'\x00' + buffer buffer = b'\x00' + buffer
self.dev.write(buffer) self.dev.write(buffer)
event.wait(timeout=1) event.wait(timeout=self.timeout)
self.responses.pop(token, None) self.responses.pop(token, None)
if not hasattr(event, '_ret'): if not hasattr(event, '_ret'):
return None return None
@ -120,56 +97,124 @@ class XAPDevice:
return r.data[:r.length] return r.data[:r.length]
@functools.lru_cache def listen(self) -> dict:
def capability(self, route): """Receive a single 'broadcast' message
cap = int.from_bytes(self._transaction(route) or bytes(0), 'little') """
return cap token = 0xFFFF
event = threading.Event()
self.responses[token] = event
@functools.lru_cache # emulate a blocking read while allowing `ctrl+c` on windows
def subsystem(self): while not hasattr(event, '_ret'):
sub = int.from_bytes(self._transaction(XAPRoutes.XAP_SUBSYSTEM_QUERY) or bytes(0), 'little') event.wait(timeout=0.25)
return sub
@functools.lru_cache r = XAPResponse.from_bytes(event._ret)
def version(self): return (r.flags, r.data[:r.length])
xap = int.from_bytes(self._transaction(XAPRoutes.XAP_VERSION_QUERY) or bytes(0), 'little')
qmk = int.from_bytes(self._transaction(XAPRoutes.QMK_VERSION_QUERY) or bytes(0), 'little')
return {'xap': u32toBCD(xap), 'qmk': u32toBCD(qmk)}
def _ensure_route(self, route):
class XAPDevice(XAPDeviceBase):
"""XAP device interaction
"""
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
self.close()
def _query_device_info(self) -> dict:
"""Helper to reconstruct info.json from requested chunks
"""
datalen = self.int_transaction(XAPRoutes.QMK_CONFIG_BLOB_LEN)
if not datalen:
return {}
data = []
offset = 0
while offset < datalen:
chunk = self.transaction(XAPRoutes.QMK_CONFIG_BLOB_CHUNK, offset)
data += chunk
offset += len(chunk)
str_data = gzip.decompress(bytearray(data[:datalen]))
return json.loads(str_data)
def _ensure_route(self, route: bytes):
"""Check a route can be accessed
Raises:
XAPRouteError: Access to invalid route attempted
"""
# TODO: Remove assumption that capability is always xx01
(sub, rt) = route (sub, rt) = route
cap = bytes([sub, 1]) cap = bytes([sub, 1])
if self.subsystem() & (1 << sub) == 0: if self.subsystems() & (1 << sub) == 0:
raise XAPRouteError("subsystem not available") raise XAPRouteError("subsystem not available")
if self.capability(cap) & (1 << rt) == 0: if self.capability(cap) & (1 << rt) == 0:
raise XAPRouteError("route not available") raise XAPRouteError("route not available")
def transaction(self, route, *args): def transaction(self, route: bytes, *args):
"""Request/Receive to XAP device
Raises:
XAPRouteError: Access to invalid route attempted
"""
self._ensure_route(route) self._ensure_route(route)
return self._transaction(route, *args) return super().transaction(route, *args)
def int_transaction(self, route: bytes, *args):
"""transaction with int parsing
"""
return int.from_bytes(self.transaction(route, *args) or bytes(0), 'little')
@functools.lru_cache @functools.lru_cache
def info(self): def capability(self, route: bytes):
# use parent transaction as we want to ignore capability checks
return int.from_bytes(super().transaction(route) or bytes(0), 'little')
@functools.lru_cache
def subsystems(self):
# use parent transaction as we want to ignore capability checks
return int.from_bytes(super().transaction(XAPRoutes.XAP_SUBSYSTEM_QUERY) or bytes(0), 'little')
@functools.lru_cache
def version(self) -> dict:
"""Query version data from device
"""
xap = self.int_transaction(XAPRoutes.XAP_VERSION_QUERY)
qmk = self.int_transaction(XAPRoutes.QMK_VERSION_QUERY)
return {'xap': _u32_to_bcd(xap), 'qmk': _u32_to_bcd(qmk)}
@functools.lru_cache
def info(self) -> dict:
"""Query config data from device
"""
data = self._query_device_info() data = self._query_device_info()
data['_id'] = self.transaction(XAPRoutes.QMK_HARDWARE_ID) data['_id'] = self.transaction(XAPRoutes.QMK_HARDWARE_ID)
data['_version'] = self.version() data['_version'] = self.version()
return data return data
def status(self): def status(self) -> dict:
lock = int.from_bytes(self.transaction(XAPRoutes.XAP_SECURE_STATUS) or bytes(0), 'little') """Query current device state
"""
lock = self.int_transaction(XAPRoutes.XAP_SECURE_STATUS)
data = {} data = {}
data['lock'] = XAPSecureStatus(lock).name data['lock'] = XAPSecureStatus(lock).name
return data return data
def unlock(self): def unlock(self):
"""Initiate unlock procedure
"""
self.transaction(XAPRoutes.XAP_SECURE_UNLOCK) self.transaction(XAPRoutes.XAP_SECURE_UNLOCK)
def lock(self): def lock(self):
"""Lock device
"""
self.transaction(XAPRoutes.XAP_SECURE_LOCK) self.transaction(XAPRoutes.XAP_SECURE_LOCK)
def reset(self): def reset(self):
status = int.from_bytes(self.transaction(XAPRoutes.QMK_BOOTLOADER_JUMP) or bytes(0), 'little') """Request device reboot to bootloader - Requires previous unlock
"""
status = self.int_transaction(XAPRoutes.QMK_BOOTLOADER_JUMP)
return status == 1 return status == 1

View File

@ -0,0 +1,18 @@
# XAP python3 bindings
## Example
```python
from xap_client import XAPClient
# List Available Devices
devices = XAPClient.devices()
selected = devices[0]
# Connect then run commands
with XAPClient().connect(selected) as dev:
print(dev.version())
```
## API
TODO

View File

@ -1,4 +0,0 @@
def u32toBCD(val): # noqa: N802
"""Create BCD string
"""
return f'{val>>24}.{val>>16 & 0xFF}.{val & 0xFFFF}'