# Source code for oresat_c3.protocols.edl_command

"""
Anything dealing with packing and unpacking EDL (Engineering Data Link) C3 command packets.

These packets are defined as 1 octet for the code followed by X octets of data.

The EDL code identifies the message; the rest of the message holds the arguments for
requests and the return values for responses.
"""

import struct
from collections import namedtuple
from enum import IntEnum, auto

# Record describing how one EDL command's request and response payloads are
# serialized. All six fields default to None, so a command with no payload in
# either direction can be declared as a bare ``EdlCommand()``.
EdlCommand = namedtuple(
    "EdlCommand",
    "req_fmt res_fmt req_pack_func req_unpack_func res_pack_func res_unpack_func",
    defaults=(None,) * 6,
)
"""
Parameters
----------
req_fmt: str
    The struct.unpack() format for request packet.
res_fmt: str
    The struct.pack() format for response packet.
req_pack_func: Callable[[tuple], bytes]
    Optional callback function to use instead of req_fmt for packing the request packet.
req_unpack_func: Callable[[bytes], tuple]
    Optional callback function to use instead of req_fmt for unpacking the request packet.
res_pack_func: Callable[[tuple], bytes]
    Optional callback function to use instead of res_fmt for packing the response packet.
res_unpack_func: Callable[[bytes], tuple]
    Optional callback function to use instead of res_fmt for unpacking the response packet.
"""


class EdlCommandPacketError(Exception):
    """
    Error with an EdlCommandRequest or EdlCommandResponse packet.

    NOTE(review): nothing in the visible part of this module raises this
    exception; confirm whether it is used by other modules.
    """


class EdlCommandCode(IntEnum):
    """The EDL telecommand codes."""

    # The numeric value of each member is the first octet of the packed
    # request / response. Values after TX_CTRL come from auto(), i.e. they
    # follow declaration order, so new commands must only be appended at the
    # end to keep the existing codes stable.

    TX_CTRL = 0
    """
    Enable / Disable Tx.

    Parameters
    ----------
    enable: bool
        True to enable Tx or False to disable Tx

    Returns
    -------
    bool
        Tx status
    """

    C3_SOFT_RESET = auto()
    """
    Soft reset the C3 (reboot C3 daemon).
    """

    C3_HARD_RESET = auto()
    """
    Hard reset the C3 (reboot system).
    """

    C3_FACTORY_RESET = auto()
    """
    Factory reset the C3 (clear FRAM, reset RTC, and reboot system).
    """

    CO_NODE_ENABLE = auto()
    """
    Enable a CANopen node.

    Parameters
    ----------
    node_id: uint8
        Node id of the CANopen node to enable / disable
    enable: bool
        True to enable or False to disable

    Returns
    -------
    uint8
        node status
    """

    CO_NODE_STATUS = auto()
    """
    Get the status of a CANopen node.

    Parameters
    ----------
    node_id: uint8
        Node id of node to get the status for

    Returns
    -------
    uint8
        node status
    """

    CO_SDO_WRITE = auto()
    """
    Write a value to a node's OD over the CAN bus using a CANopen SDO message.

    Parameters
    ----------
    node_id: uint8
        The id of the CANopen node to write to.
    index: uint16
        The OD index to write to.
    subindex: uint8
        The OD subindex to write to.
    size: uint32
        Size of the data buffer.
    buffer: bytes
        Data buffer.

    Returns
    -------
    uint32
        SDO error code (0 is no error).
    """

    CO_SYNC = auto()
    """
    Send a CANopen SYNC message on the CAN bus.

    Returns
    -------
    bool
        The CANopen SYNC message was sent successfully.
    """

    OPD_SYSENABLE = auto()
    """
    Enable the OPD subsystem.

    Parameters
    ----------
    enable: bool
        True to enable or False to disable.

    Returns
    -------
    bool
        OPD subsystem status.
    """

    OPD_SCAN = auto()
    """
    Scan for all nodes on the OPD.

    Returns
    -------
    uint8
        The number of nodes found.
    """

    OPD_PROBE = auto()
    """
    Probe for a node on the OPD.

    Parameters
    ----------
    node_id: uint8
        The id of the OPD node to probe for.

    Returns
    -------
    bool
        True if the node was found or False if not.
    """

    OPD_ENABLE = auto()
    """
    Enable / disable a node on the OPD.

    Parameters
    ----------
    node_id: uint8
        The id of the OPD node to enable / disable.
    enable: bool
        True to enable or False to disable.

    Returns
    -------
    uint8
        OPD node status. See the OPD page.
    """

    OPD_RESET = auto()
    """
    Reset a node on the OPD.

    Parameters
    ----------
    node_id: uint8
        The id of the OPD node to reset.

    Returns
    -------
    uint8
        OPD node status. See the OPD page.
    """

    OPD_STATUS = auto()
    """
    Get the status of a node on the OPD.

    Parameters
    ----------
    node_id: uint8
        The id of the OPD node to get the status of.

    Returns
    -------
    uint8
        OPD node status. See the OPD page.
    """

    RTC_SET_TIME = auto()
    """
    Set the RTC time

    Parameters
    ----------
    time: uint32
        The Unix time in seconds.

    Returns
    -------
    bool
        The RTC time was set successfully.
    """

    TIME_SYNC = auto()
    """
    C3 will send OreSat's Time Sync TPDO over the CAN bus (all nodes that are powered on and care
    about time will sync to it).

    Returns
    -------
    bool
        Time sync was sent.
    """

    BEACON_PING = auto()
    """
    C3 will respond with a beacon regardless of tx state.
    """

    PING = auto()
    """
    A basic ping to the C3.

    Parameters
    ----------
    value: uint32
        A value to return.

    Returns
    -------
    uint32
        The parameter value.
    """

    RX_TEST = auto()
    """
    Empty command for C3 Rx testing.
    """

    CO_SDO_READ = auto()
    """
    Read a value from a node's OD over the CAN bus using a CANopen SDO message.

    Parameters
    ----------
    node_id: uint8
        The id of the CANopen node to read from.
    index: uint16
        The OD index to read from.
    subindex: uint8
        The OD subindex to read from.

    Returns
    -------
    uint32
        SDO error code (0 is no error).
    uint32
        Size of the data buffer.
    bytes
        Data buffer.
    """
# Fixed-size headers of the two variable-length payloads. Both are explicitly
# little-endian without padding ("<"); the raw data buffer follows the header.
_SDO_WRITE_REQ_HEADER = struct.Struct("<BHBI")  # node_id, index, subindex, size
_SDO_READ_RES_HEADER = struct.Struct("<2I")  # SDO error code, size


def _edl_req_sdo_write_pack_cb(values: tuple) -> bytes:
    """
    Pack the CO_SDO_WRITE request payload.

    Parameters
    ----------
    values: tuple
        The four header fields followed by the data buffer (bytes).

    Returns
    -------
    bytes
        The packed header with the data buffer appended.
    """

    return _SDO_WRITE_REQ_HEADER.pack(*values[:4]) + values[4]


def _edl_req_sdo_write_unpack_cb(raw: bytes) -> tuple:
    """
    Unpack the CO_SDO_WRITE request payload.

    Everything after the fixed-size header is returned as the data buffer; the
    size field is not checked against the buffer length.

    Parameters
    ----------
    raw: bytes
        The request payload (without the leading code octet).

    Returns
    -------
    tuple
        The four header fields followed by the data buffer.
    """

    header_size = _SDO_WRITE_REQ_HEADER.size
    header = _SDO_WRITE_REQ_HEADER.unpack(raw[:header_size])
    return header + (raw[header_size:],)


def _edl_res_sdo_read_pack_cb(values: tuple) -> bytes:
    """
    Pack the CO_SDO_READ response payload.

    Parameters
    ----------
    values: tuple
        The two header fields followed by the data buffer (bytes).

    Returns
    -------
    bytes
        The packed header with the data buffer appended.
    """

    return _SDO_READ_RES_HEADER.pack(*values[:2]) + values[2]


def _edl_res_sdo_read_unpack_cb(raw: bytes) -> tuple:
    """
    Unpack the CO_SDO_READ response payload.

    Everything after the fixed-size header is returned as the data buffer; the
    size field is not checked against the buffer length.

    Parameters
    ----------
    raw: bytes
        The response payload (without the leading code octet).

    Returns
    -------
    tuple
        The two header fields followed by the data buffer.
    """

    header_size = _SDO_READ_RES_HEADER.size
    header = _SDO_READ_RES_HEADER.unpack(raw[:header_size])
    return header + (raw[header_size:],)


# NOTE(review): the format strings below carry no byte-order prefix, so struct
# uses the platform's native byte order, sizes and alignment (e.g. native "BHB"
# includes a pad byte before the uint16), while the SDO callbacks above use
# explicit little-endian "<". Left untouched here because changing a format
# changes the wire format; confirm whether this is intended.
EDL_COMMANDS = {
    EdlCommandCode.TX_CTRL: EdlCommand("?", "?"),
    EdlCommandCode.C3_SOFT_RESET: EdlCommand(),
    EdlCommandCode.C3_HARD_RESET: EdlCommand(),
    EdlCommandCode.C3_FACTORY_RESET: EdlCommand(),
    EdlCommandCode.CO_NODE_ENABLE: EdlCommand("B?", "B"),
    EdlCommandCode.CO_NODE_STATUS: EdlCommand("B", "B"),
    EdlCommandCode.CO_SDO_WRITE: EdlCommand(
        None, "I", _edl_req_sdo_write_pack_cb, _edl_req_sdo_write_unpack_cb
    ),
    EdlCommandCode.CO_SYNC: EdlCommand(None, "?"),
    EdlCommandCode.OPD_SYSENABLE: EdlCommand("?", "?"),
    EdlCommandCode.OPD_SCAN: EdlCommand(None, "B"),
    EdlCommandCode.OPD_PROBE: EdlCommand("B", "?"),
    EdlCommandCode.OPD_ENABLE: EdlCommand("B?", "B"),
    EdlCommandCode.OPD_RESET: EdlCommand("B", "B"),
    EdlCommandCode.OPD_STATUS: EdlCommand("B", "B"),
    EdlCommandCode.RTC_SET_TIME: EdlCommand("I", "?"),
    EdlCommandCode.TIME_SYNC: EdlCommand(None, "?"),
    EdlCommandCode.BEACON_PING: EdlCommand(),
    EdlCommandCode.PING: EdlCommand("I", "I"),
    EdlCommandCode.RX_TEST: EdlCommand(),
    EdlCommandCode.CO_SDO_READ: EdlCommand(
        "BHB",
        None,
        None,
        None,
        _edl_res_sdo_read_pack_cb,
        _edl_res_sdo_read_unpack_cb,
    ),
}
"""All valid EDL commands lookup table"""


class EdlCommandError(Exception):
    """Invalid code or payload given to EdlCommandRequest or EdlCommandResponse."""


def _to_command_code(code) -> EdlCommandCode:
    """Return the EdlCommandCode member for code or raise EdlCommandError."""

    try:
        # Also turns a plain int that equals a member into the member itself,
        # so pack() can always rely on ``.value`` being available.
        return EdlCommandCode(code)
    except (ValueError, TypeError) as err:
        raise EdlCommandError(f"Invalid EDL code {code}") from err


def _pack_payload(code: EdlCommandCode, fmt, pack_func, values) -> bytes:
    """Pack the code octet followed by values, using fmt or else pack_func."""

    raw = code.value.to_bytes(1, "little")
    # The constructors accept None in place of a tuple; pack it as "no values".
    items = () if values is None else values
    if fmt is not None:
        raw += struct.pack(fmt, *items)
    elif pack_func is not None:
        raw += pack_func(items)
    return raw


def _unpack_payload(raw: bytes, fmt, unpack_func) -> tuple:
    """Unpack everything after the code octet, using fmt or else unpack_func."""

    body = raw[1:]
    if fmt is not None:
        return struct.unpack(fmt, body)
    if unpack_func is not None:
        return unpack_func(body)
    return ()


class EdlCommandRequest:
    """
    A request payload for an EDL command for the C3 to process.
    """

    def __init__(self, code: EdlCommandCode, args: tuple):
        """
        Parameters
        ----------
        code: EdlCommandCode
            The EDL code
        args: tuple
            The arguments for the EDL command

        Raises
        ------
        EdlCommandError
            If code is not a valid EDL code or args is neither a tuple nor None.
        """

        code = _to_command_code(code)
        if args is not None and not isinstance(args, tuple):
            raise EdlCommandError("EdlCommandRequest args must be a tuple or None")

        self.code = code
        self.command = EDL_COMMANDS[code]
        self.args = args

    def __eq__(self, other) -> bool:
        if not isinstance(other, EdlCommandRequest):
            return False
        return self.code == other.code and self.args == other.args

    def __str__(self) -> str:
        return f"{self.code} {self.args}"

    def pack(self) -> bytes:
        """
        Pack the EDL C3 command request packet.

        Returns
        -------
        bytes
            The code octet followed by the packed arguments (if any).

        Raises
        ------
        struct.error
            If the arguments do not match the command's request format.
        """

        return _pack_payload(
            self.code, self.command.req_fmt, self.command.req_pack_func, self.args
        )

    @classmethod
    def unpack(cls, raw: bytes):
        """
        Unpack the EDL C3 command request packet.

        Parameters
        ----------
        raw: bytes
            The raw data to unpack.

        Returns
        -------
        EdlCommandRequest
            The request described by raw.

        Raises
        ------
        IndexError
            If raw is empty.
        ValueError
            If the first octet is not a valid EDL code.
        struct.error
            If the payload does not match the command's request format.
        """

        code = EdlCommandCode(raw[0])
        command = EDL_COMMANDS[code]
        args = _unpack_payload(raw, command.req_fmt, command.req_unpack_func)
        return cls(code, args)


class EdlCommandResponse:
    """
    A response payload to an EDL command from the C3.
    """

    def __init__(self, code: EdlCommandCode, values: tuple):
        """
        Parameters
        ----------
        code: EdlCommandCode
            The EDL code
        values: tuple
            The return values for the response.

        Raises
        ------
        EdlCommandError
            If code is not a valid EDL code or values is neither a tuple nor None.
        """

        code = _to_command_code(code)
        if values is not None and not isinstance(values, tuple):
            raise EdlCommandError("EdlCommandResponse values must be a tuple or None")

        self.code = code
        self.command = EDL_COMMANDS[code]
        self.values = values

    def __eq__(self, other) -> bool:
        if not isinstance(other, EdlCommandResponse):
            return False
        return self.code == other.code and self.values == other.values

    def __str__(self) -> str:
        return f"{self.code} {self.values}"

    def pack(self) -> bytes:
        """
        Pack the EDL C3 command response.

        Returns
        -------
        bytes
            The code octet followed by the packed values (if any).

        Raises
        ------
        struct.error
            If the values do not match the command's response format.
        """

        return _pack_payload(
            self.code, self.command.res_fmt, self.command.res_pack_func, self.values
        )

    @classmethod
    def unpack(cls, raw: bytes):
        """
        Unpack the EDL C3 command response.

        Parameters
        ----------
        raw: bytes
            The raw data to unpack.

        Returns
        -------
        EdlCommandResponse
            The response described by raw.

        Raises
        ------
        IndexError
            If raw is empty.
        ValueError
            If the first octet is not a valid EDL code.
        struct.error
            If the payload does not match the command's response format.
        """

        code = EdlCommandCode(raw[0])
        command = EDL_COMMANDS[code]
        values = _unpack_payload(raw, command.res_fmt, command.res_unpack_func)
        return cls(code, values)