Refactor xap client

This commit is contained in:
zvecr 2022-06-21 13:27:53 +01:00
parent 90fc901624
commit c22fedb5b2

View File

@ -4,6 +4,9 @@ import cmd
import json import json
import random import random
import gzip import gzip
import threading
import functools
from enum import IntFlag
from platform import platform from platform import platform
from milc import cli from milc import cli
@ -14,21 +17,179 @@ from qmk.xap.common import get_xap_keycodes
KEYCODE_MAP = get_xap_keycodes('latest') KEYCODE_MAP = get_xap_keycodes('latest')
def _is_xap_usage(x): def _u32toBCD(val): # noqa: N802
return x['usage_page'] == 0xFF51 and x['usage'] == 0x0058 """Create BCD string
"""
return f'{val>>24}.{val>>16 & 0xFF}.{val & 0xFFFF}'
def _is_filtered_device(x): class XAPFlags(IntFlag):
name = "%04x:%04x" % (x['vendor_id'], x['product_id']) SUCCESS = 0x01
return name.lower().startswith(cli.args.device.lower())
def _search(): class XAPDevice:
devices = filter(_is_xap_usage, hid.enumerate()) def __init__(self, dev):
if cli.args.device: """Constructor opens hid device and starts dependent services
devices = filter(_is_filtered_device, devices) """
self.responses = {}
return list(devices) self.dev = hid.Device(path=dev['path'])
self.bg = threading.Thread(target=self._read_loop, daemon=True)
self.bg.start()
def _read_loop(self):
"""Background thread to signal waiting transactions
"""
while 1:
array_alpha = self.dev.read(64, 100)
if array_alpha:
token = str(array_alpha[:2])
event = self.responses.get(token)
if event:
event._ret = array_alpha
event.set()
def _query_device_info(self):
datalen = int.from_bytes(self.transaction(0x01, 0x05) or bytes(0), "little")
if not datalen:
return {}
data = []
offset = 0
while offset < datalen:
chunk = self.transaction(0x01, 0x06, offset)
data += chunk
offset += len(chunk)
str_data = gzip.decompress(bytearray(data[:datalen]))
return json.loads(str_data)
def listen(self):
"""Receive a "broadcast" message
"""
token = b"\xFF\xFF"
event = threading.Event()
self.responses[str(token)] = event
event.wait()
return event._ret
def transaction(self, sub, route, *args):
"""Request/Receive
"""
# token cannot start with zero or be FFFF
token = random.randrange(0x0100, 0xFFFE).to_bytes(2, byteorder='big')
# send with padding
# TODO: this code is total garbage
args_data = []
args_len = 2
if len(args) == 1:
if isinstance(args[0], (bytes, bytearray)):
args_len += len(args[0])
args_data = args[0]
else:
args_len += 2
args_data = args[0].to_bytes(2, byteorder='little')
padding_len = 64 - 3 - args_len
padding = b"\x00" * padding_len
if args_data:
padding = args_data + padding
buffer = token + args_len.to_bytes(1, byteorder='little') + sub.to_bytes(1, byteorder='little') + route.to_bytes(1, byteorder='little') + padding
# prepend 0 on windows because reasons...
if 'windows' in platform().lower():
buffer = b"\x00" + buffer
event = threading.Event()
self.responses[str(token)] = event
self.dev.write(buffer)
event.wait(timeout=1)
self.responses.pop(str(token), None)
if not hasattr(event, '_ret'):
return None
array_alpha = event._ret
if int(array_alpha[2]) != XAPFlags.SUCCESS:
return None
payload_len = int(array_alpha[3])
return array_alpha[4:4 + payload_len]
@functools.cache
def version(self):
ver = int.from_bytes(self.transaction(0x00, 0x00) or bytes(0), 'little')
return {'xap': _u32toBCD(ver)}
@functools.cache
def info(self):
data = self._query_device_info()
data['_id'] = self.transaction(0x01, 0x08)
data['xap'] = self.version()['xap']
return data
def unlock(self):
self.transaction(0x00, 0x04)
class XAPClient:
@staticmethod
def _lazy_imports():
# Lazy load to avoid missing dependency issues
global hid
import hid
@staticmethod
def list(search=None):
"""Find compatible XAP devices
"""
XAPClient._lazy_imports()
def _is_xap_usage(x):
return x['usage_page'] == 0xFF51 and x['usage'] == 0x0058
def _is_filtered_device(x):
name = "%04x:%04x" % (x['vendor_id'], x['product_id'])
return name.lower().startswith(search.lower())
devices = filter(_is_xap_usage, hid.enumerate())
if search:
devices = filter(_is_filtered_device, devices)
return list(devices)
def connect(self, dev):
"""Connect to a given XAP device
"""
XAPClient._lazy_imports()
return XAPDevice(dev)
# def _query_device_secure(device):
# secure = int.from_bytes(_xap_transaction(device, 0x00, 0x03), 'little')
# secure = 'unlocked' if secure == 2 else 'LOCKED'
# return {'secure': secure}
#
# def xap_dummy(device):
# # get layer count
# layers = _xap_transaction(device, 0x04, 0x02)
# layers = int.from_bytes(layers, "little")
# print(f'layers:{layers}')
# # get keycode [layer:0, row:0, col:0]
# # keycode = _xap_transaction(device, 0x04, 0x03, b"\x00\x00\x00")
# # get encoder [layer:0, index:0, clockwise:0]
# keycode = _xap_transaction(device, 0x04, 0x04, b"\x00\x00\x00")
# keycode = int.from_bytes(keycode, "little")
# print(f'keycode:{KEYCODE_MAP.get(keycode, "unknown")}[{keycode}]')
# # set encoder [layer:0, index:0, clockwise:0, keycode:KC_A]
# _xap_transaction(device, 0x05, 0x04, b"\x00\x00\x00\x04\00")
def print_dotted_output(kb_info_json, prefix=''): def print_dotted_output(kb_info_json, prefix=''):
@ -52,154 +213,27 @@ def print_dotted_output(kb_info_json, prefix=''):
for index, item in enumerate(data, start=0): for index, item in enumerate(data, start=0):
cli.echo(' {fg_blue}%s.%s{fg_reset}: %s', new_prefix, index, str(item)) cli.echo(' {fg_blue}%s.%s{fg_reset}: %s', new_prefix, index, str(item))
else: else:
cli.echo(' {fg_blue}%s{fg_reset}: %s', new_prefix, ', '.join(sorted(map(str, data)))) cli.echo(' {fg_blue}%s{fg_reset}: %s', new_prefix, ', '.join(map(str, data)))
else: else:
cli.echo(' {fg_blue}%s{fg_reset}: %s', new_prefix, kb_info_json[key]) cli.echo(' {fg_blue}%s{fg_reset}: %s', new_prefix, kb_info_json[key])
def _xap_transaction(device, sub, route, *args):
# gen token
tok = random.getrandbits(16)
token = tok.to_bytes(2, byteorder='little')
# send with padding
# TODO: this code is total garbage
args_data = []
args_len = 2
if len(args) == 1:
if isinstance(args[0], (bytes, bytearray)):
args_len += len(args[0])
args_data = args[0]
else:
args_len += 2
args_data = args[0].to_bytes(2, byteorder='little')
padding_len = 64 - 3 - args_len
padding = b"\x00" * padding_len
if args_data:
padding = args_data + padding
buffer = token + args_len.to_bytes(1, byteorder='little') + sub.to_bytes(1, byteorder='little') + route.to_bytes(1, byteorder='little') + padding
# prepend 0 on windows because reasons...
if 'windows' in platform().lower():
buffer = b"\x00" + buffer
device.write(buffer)
# get resp
array_alpha = device.read(64, 250)
# validate tok sent == resp
if str(token) != str(array_alpha[:2]):
return None
if int(array_alpha[2]) != 0x01:
return None
payload_len = int(array_alpha[3])
return array_alpha[4:4 + payload_len]
def _query_device(device):
ver_data = _xap_transaction(device, 0x00, 0x00)
if not ver_data:
return {'xap': 'UNKNOWN', 'secure': 'UNKNOWN'}
# to u32 to BCD string
a = (ver_data[3] << 24) + (ver_data[2] << 16) + (ver_data[1] << 8) + (ver_data[0])
ver = f'{a>>24}.{a>>16 & 0xFF}.{a & 0xFFFF}'
secure = int.from_bytes(_xap_transaction(device, 0x00, 0x03), 'little')
secure = 'unlocked' if secure == 2 else 'LOCKED'
return {'xap': ver, 'secure': secure}
def _query_device_id(device):
return _xap_transaction(device, 0x01, 0x08)
def _query_device_info_len(device):
len_data = _xap_transaction(device, 0x01, 0x05)
if not len_data:
return 0
# to u32
return (len_data[3] << 24) + (len_data[2] << 16) + (len_data[1] << 8) + (len_data[0])
def _query_device_info_chunk(device, offset):
return _xap_transaction(device, 0x01, 0x06, offset)
def _query_device_info(device):
datalen = _query_device_info_len(device)
if not datalen:
return {}
data = []
offset = 0
while offset < datalen:
data += _query_device_info_chunk(device, offset)
offset += 32
str_data = gzip.decompress(bytearray(data[:datalen]))
return json.loads(str_data)
def _list_devices(): def _list_devices():
"""Dump out available devices """Dump out available devices
""" """
cli.log.info('Available devices:') cli.log.info('Available devices:')
devices = _search() devices = XAPClient.list()
for dev in devices: for dev in devices:
device = hid.Device(path=dev['path']) device = XAPClient().connect(dev)
data = _query_device(device) data = device.info()
cli.log.info(" %04x:%04x %s %s [API:%s] %s", dev['vendor_id'], dev['product_id'], dev['manufacturer_string'], dev['product_string'], data['xap'], data['secure']) cli.log.info(" %04x:%04x %s %s [API:%s]", dev['vendor_id'], dev['product_id'], dev['manufacturer_string'], dev['product_string'], data['xap'])
if cli.config.general.verbose: if cli.config.general.verbose:
# TODO: better formatting like "lsusb -v"? # TODO: better formatting like "lsusb -v"?
data = _query_device_info(device)
data["_id"] = _query_device_id(device)
print_dotted_output(data) print_dotted_output(data)
# def xap_dummy(device):
# # get layer count
# layers = _xap_transaction(device, 0x04, 0x02)
# layers = int.from_bytes(layers, "little")
# print(f'layers:{layers}')
# # get keycode [layer:0, row:0, col:0]
# # keycode = _xap_transaction(device, 0x04, 0x03, b"\x00\x00\x00")
# # get encoder [layer:0, index:0, clockwise:0]
# keycode = _xap_transaction(device, 0x04, 0x04, b"\x00\x00\x00")
# keycode = int.from_bytes(keycode, "little")
# print(f'keycode:{KEYCODE_MAP.get(keycode, "unknown")}[{keycode}]')
# # set encoder [layer:0, index:0, clockwise:0, keycode:KC_A]
# _xap_transaction(device, 0x05, 0x04, b"\x00\x00\x00\x04\00")
def xap_broadcast_listen(device):
try:
cli.log.info("Listening for XAP broadcasts...")
while 1:
array_alpha = device.read(64, 100)
if str(b"\xFF\xFF") == str(array_alpha[:2]):
if array_alpha[2] == 1:
cli.log.info(" Broadcast: Secure[%02x]", array_alpha[4])
else:
cli.log.info(" Broadcast: type[%02x] data:[%02x]", array_alpha[2], array_alpha[4])
except KeyboardInterrupt:
cli.log.info("Stopping...")
def xap_unlock(device):
_xap_transaction(device, 0x00, 0x04)
class XAPShell(cmd.Cmd): class XAPShell(cmd.Cmd):
intro = 'Welcome to the XAP shell. Type help or ? to list commands.\n' intro = 'Welcome to the XAP shell. Type help or ? to list commands.\n'
prompt = 'Ψ> ' prompt = 'Ψ> '
@ -208,24 +242,33 @@ class XAPShell(cmd.Cmd):
cmd.Cmd.__init__(self) cmd.Cmd.__init__(self)
self.device = device self.device = device
# cache keycodes for this device # cache keycodes for this device
self.keycodes = get_xap_keycodes(_query_device(device)['xap']) self.keycodes = get_xap_keycodes(device.version()['xap'])
def do_about(self, arg): def do_about(self, arg):
"""Prints out the current version of QMK with a build date """Prints out the current version of QMK with a build date
""" """
data = _query_device(self.device) # TODO: request stuff?
print(data) print(self.device.info()['xap'])
def do_unlock(self, arg): def do_unlock(self, arg):
"""Initiate secure unlock """Initiate secure unlock
""" """
xap_unlock(self.device) self.device.unlock()
print("Done") print("Done")
def do_listen(self, arg): def do_listen(self, arg):
"""Log out XAP broadcast messages """Log out XAP broadcast messages
""" """
xap_broadcast_listen(self.device) try:
cli.log.info("Listening for XAP broadcasts...")
while 1:
array_alpha = self.device.listen()
if array_alpha[2] == 1:
cli.log.info(" Broadcast: Secure[%02x]", array_alpha[4])
else:
cli.log.info(" Broadcast: type[%02x] data:[%02x]", array_alpha[2], array_alpha[4])
except KeyboardInterrupt:
cli.log.info("Stopping...")
def do_keycode(self, arg): def do_keycode(self, arg):
"""Prints out the keycode value of a certain layer, row, and column """Prints out the keycode value of a certain layer, row, and column
@ -235,7 +278,7 @@ class XAPShell(cmd.Cmd):
cli.log.error("Invalid args") cli.log.error("Invalid args")
return return
keycode = _xap_transaction(self.device, 0x04, 0x03, data) keycode = self.device.transaction(0x04, 0x03, data)
keycode = int.from_bytes(keycode, "little") keycode = int.from_bytes(keycode, "little")
print(f'keycode:{self.keycodes.get(keycode, "unknown")}[{keycode}]') print(f'keycode:{self.keycodes.get(keycode, "unknown")}[{keycode}]')
@ -247,14 +290,14 @@ class XAPShell(cmd.Cmd):
cli.log.error("Invalid args") cli.log.error("Invalid args")
return return
info = _query_device_info(self.device) info = self.device.info()
rows = info['matrix_size']['rows'] rows = info['matrix_size']['rows']
cols = info['matrix_size']['cols'] cols = info['matrix_size']['cols']
for r in range(rows): for r in range(rows):
for c in range(cols): for c in range(cols):
q = data + r.to_bytes(1, byteorder='little') + c.to_bytes(1, byteorder='little') q = data + r.to_bytes(1, byteorder='little') + c.to_bytes(1, byteorder='little')
keycode = _xap_transaction(self.device, 0x04, 0x03, q) keycode = self.device.transaction(0x04, 0x03, q)
keycode = int.from_bytes(keycode, "little") keycode = int.from_bytes(keycode, "little")
print(f'| {self.keycodes.get(keycode, "unknown").ljust(7)} ', end='', flush=True) print(f'| {self.keycodes.get(keycode, "unknown").ljust(7)} ', end='', flush=True)
print('|') print('|')
@ -267,7 +310,7 @@ class XAPShell(cmd.Cmd):
cli.log.error("Invalid args") cli.log.error("Invalid args")
return return
info = _query_device_info(self.device) info = self.device.info()
# Assumptions on selected layout rather than prompt # Assumptions on selected layout rather than prompt
first_layout = next(iter(info['layouts'])) first_layout = next(iter(info['layouts']))
@ -276,7 +319,7 @@ class XAPShell(cmd.Cmd):
keycodes = [] keycodes = []
for item in layout: for item in layout:
q = data + bytes(item['matrix']) q = data + bytes(item['matrix'])
keycode = _xap_transaction(self.device, 0x04, 0x03, q) keycode = self.device.transaction(0x04, 0x03, q)
keycode = int.from_bytes(keycode, "little") keycode = int.from_bytes(keycode, "little")
keycodes.append(self.keycodes.get(keycode, "???")) keycodes.append(self.keycodes.get(keycode, "???"))
@ -311,22 +354,18 @@ class XAPShell(cmd.Cmd):
def xap(cli): def xap(cli):
"""Acquire debugging information from XAP devices """Acquire debugging information from XAP devices
""" """
# Lazy load to avoid issues
global hid
import hid
if cli.args.list: if cli.args.list:
return _list_devices() return _list_devices()
# Connect to first available device # Connect to first available device
devices = _search() devices = XAPClient.list()
if not devices: if not devices:
cli.log.error("No devices found!") cli.log.error("No devices found!")
return False return False
dev = devices[0] dev = devices[0]
device = hid.Device(path=dev['path']) cli.log.info("Connecting to:%04x:%04x %s %s", dev['vendor_id'], dev['product_id'], dev['manufacturer_string'], dev['product_string'])
cli.log.info("Connected to:%04x:%04x %s %s", dev['vendor_id'], dev['product_id'], dev['manufacturer_string'], dev['product_string']) device = XAPClient().connect(dev)
# shell? # shell?
if cli.args.interactive: if cli.args.interactive: