Prep client gen for header parsing

This commit is contained in:
zvecr 2022-07-17 23:04:35 +01:00
parent 5c21830da2
commit 7bce3d7b25
5 changed files with 75 additions and 23 deletions

View File

@ -1,4 +1,8 @@
class XAPRouteError(Exception):
pass
class XAPRoutes():
{%- for id, route in xap.routes | dictsort %}
{%- if route.routes %}

View File

@ -1,8 +1,34 @@
from collections import namedtuple
from enum import IntFlag, IntEnum
from struct import Struct
class XAPRouteError(Exception):
pass
class XAPRequest(namedtuple('XAPRequest', 'token length data')):
fmt = Struct('<HB61s')
def __new__(cls, *args):
return super().__new__(cls, *args)
@staticmethod
def from_bytes(data):
return XAPRequest._make(XAPRequest.fmt.unpack(data))
def to_bytes(self):
return self.fmt.pack(*list(self))
class XAPResponse(namedtuple('XAPResponse', 'token flags length data')):
fmt = Struct('<HBB60s')
def __new__(cls, *args):
return super().__new__(cls, *args)
@staticmethod
def from_bytes(data):
return XAPResponse._make(XAPResponse.fmt.unpack(data))
def to_bytes(self):
return self.fmt.pack(*list(self))
class XAPSecureStatus(IntEnum):

View File

@ -7,20 +7,13 @@ import gzip
import random
import threading
import functools
from struct import Struct, pack, unpack
from collections import namedtuple
from struct import pack, unpack
from platform import platform
from .types import XAPSecureStatus, XAPFlags, XAPRouteError
from .routes import XAPRoutes
from .types import XAPSecureStatus, XAPFlags, XAPRequest, XAPResponse
from .routes import XAPRoutes, XAPRouteError
from .util import u32toBCD
RequestPacket = namedtuple('RequestPacket', 'token length data')
RequestStruct = Struct('<HB61s')
ResponsePacket = namedtuple('ResponsePacket', 'token flags length data')
ResponseStruct = Struct('<HBB60s')
def _gen_token():
"""Generate XAP token - cannot start with 00xx or 'reserved' (FFFE|FFFF)
@ -58,12 +51,12 @@ class XAPDevice:
"""Background thread to signal waiting transactions
"""
while self.do_read:
array_alpha = self.dev.read(ResponseStruct.size, 100)
if array_alpha:
token = int.from_bytes(array_alpha[:2], 'little')
event = self.responses.get(token)
data = self.dev.read(XAPResponse.fmt.size, 100)
if data:
r = XAPResponse.from_bytes(data)
event = self.responses.get(r.token)
if event:
event._ret = array_alpha
event._ret = data
event.set()
def _query_device_info(self):
@ -90,7 +83,7 @@ class XAPDevice:
while not hasattr(event, '_ret'):
event.wait(timeout=0.25)
r = ResponsePacket._make(ResponseStruct.unpack(event._ret))
r = XAPResponse.from_bytes(event._ret)
return (r.flags, r.data[:r.length])
def _transaction(self, *args):
@ -106,8 +99,7 @@ class XAPDevice:
token = _gen_token()
p = RequestPacket(token, len(data), data)
buffer = RequestStruct.pack(*list(p))
buffer = XAPRequest(token, len(data), data).to_bytes()
event = threading.Event()
self.responses[token] = event
@ -122,7 +114,7 @@ class XAPDevice:
if not hasattr(event, '_ret'):
return None
r = ResponsePacket._make(ResponseStruct.unpack(event._ret))
r = XAPResponse.from_bytes(event._ret)
if r.flags & XAPFlags.SUCCESS == 0:
return None

View File

@ -27,6 +27,10 @@
################################################################################
class XAPRouteError(Exception):
pass
class XAPRoutes():
# XAP
XAP_VERSION_QUERY = b'\x00\x00'

View File

@ -26,11 +26,37 @@
#
################################################################################
from collections import namedtuple
from enum import IntFlag, IntEnum
from struct import Struct
class XAPRouteError(Exception):
pass
class XAPRequest(namedtuple('XAPRequest', 'token length data')):
fmt = Struct('<HB61s')
def __new__(cls, *args):
return super().__new__(cls, *args)
@staticmethod
def from_bytes(data):
return XAPRequest._make(XAPRequest.fmt.unpack(data))
def to_bytes(self):
return self.fmt.pack(*list(self))
class XAPResponse(namedtuple('XAPResponse', 'token flags length data')):
fmt = Struct('<HBB60s')
def __new__(cls, *args):
return super().__new__(cls, *args)
@staticmethod
def from_bytes(data):
return XAPResponse._make(XAPResponse.fmt.unpack(data))
def to_bytes(self):
return self.fmt.pack(*list(self))
class XAPSecureStatus(IntEnum):