diff --git a/broadlink/__init__.py b/broadlink/__init__.py index d3bdd90..c1c3feb 100644 --- a/broadlink/__init__.py +++ b/broadlink/__init__.py @@ -118,8 +118,8 @@ def gendevice( dev_type: int, host: t.Tuple[str, int], mac: t.Union[bytes, str], - name: str = None, - is_locked: bool = None, + name: str = "", + is_locked: bool = False, ) -> Device: """Generate a device.""" try: diff --git a/broadlink/alarm.py b/broadlink/alarm.py index dbd14b5..a9b5e87 100644 --- a/broadlink/alarm.py +++ b/broadlink/alarm.py @@ -9,20 +9,18 @@ class S1C(Device): TYPE = "S1C" _SENSORS_TYPES = { - 0x31: "Door Sensor", # 49 as hex - 0x91: "Key Fob", # 145 as hex, as serial on fob corpse - 0x21: "Motion Sensor", # 33 as hex + 0x31: "Door Sensor", + 0x91: "Key Fob", + 0x21: "Motion Sensor", } def get_sensors_status(self) -> dict: """Return the state of the sensors.""" packet = bytearray(16) - packet[0] = 0x06 # 0x06 - get sensors info, 0x07 - probably add sensors + packet[0] = 0x06 response = self.send_packet(0x6A, packet) e.check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) - if not payload: - return None count = payload[0x4] sensor_data = payload[0x6:] sensors = [ diff --git a/broadlink/climate.py b/broadlink/climate.py index 186ad58..98770cd 100644 --- a/broadlink/climate.py +++ b/broadlink/climate.py @@ -1,5 +1,5 @@ """Support for climate control.""" -from typing import List +import typing as t from . import exceptions as e from .device import Device @@ -11,50 +11,38 @@ class hysen(Device): TYPE = "Hysen heating controller" - # Send a request - # input_payload should be a bytearray, usually 6 bytes, e.g. bytearray([0x01,0x06,0x00,0x02,0x10,0x00]) - # Returns decrypted payload - # New behaviour: raises a ValueError if the device response indicates an error or CRC check fails - # The function prepends length (2 bytes) and appends CRC - - def send_request(self, input_payload: bytes) -> bytes: + def send_request(self, request: t.Sequence[int]) -> bytes: """Send a request to the device.""" - crc = CRC16.calculate(input_payload) + packet = bytearray() + packet.extend((len(request) + 2).to_bytes(2, "little")) + packet.extend(request) + packet.extend(CRC16.calculate(request).to_bytes(2, "little")) - # first byte is length, +2 for CRC16 - request_payload = bytearray([len(input_payload) + 2, 0x00]) - request_payload.extend(input_payload) - - # append CRC - request_payload.append(crc & 0xFF) - request_payload.append((crc >> 8) & 0xFF) - - # send to device - response = self.send_packet(0x6A, request_payload) + response = self.send_packet(0x6A, packet) e.check_error(response[0x22:0x24]) - response_payload = self.decrypt(response[0x38:]) + payload = self.decrypt(response[0x38:]) - # experimental check on CRC in response (first 2 bytes are len, and trailing bytes are crc) - response_payload_len = response_payload[0] - if response_payload_len + 2 > len(response_payload): + p_len = int.from_bytes(payload[:0x02], "little") + if p_len + 2 > len(payload): raise ValueError( "hysen_response_error", "first byte of response is not length" ) - crc = CRC16.calculate(response_payload[2:response_payload_len]) - if (response_payload[response_payload_len] == crc & 0xFF) and ( - response_payload[response_payload_len + 1] == (crc >> 8) & 0xFF - ): - return response_payload[2:response_payload_len] - raise ValueError("hysen_response_error", "CRC check on response failed") - def get_temp(self) -> int: + nom_crc = int.from_bytes(payload[p_len : p_len + 2], "little") + real_crc = CRC16.calculate(payload[0x02:p_len]) + if nom_crc != real_crc: + raise ValueError("hysen_response_error", "CRC check on response failed") + + return payload[0x02:p_len] + + def get_temp(self) -> float: """Return the room temperature in degrees celsius.""" - payload = self.send_request(bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x08])) + payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x08]) return payload[0x05] / 2.0 - def get_external_temp(self) -> int: + def get_external_temp(self) -> float: """Return the external temperature in degrees celsius.""" - payload = self.send_request(bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x08])) + payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x08]) return payload[18] / 2.0 def get_full_status(self) -> dict: @@ -62,28 +50,28 @@ class hysen(Device): Timer schedule included. """ - payload = self.send_request(bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x16])) + payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x16]) data = {} data["remote_lock"] = payload[3] & 1 data["power"] = payload[4] & 1 data["active"] = (payload[4] >> 4) & 1 data["temp_manual"] = (payload[4] >> 6) & 1 - data["room_temp"] = (payload[5] & 255) / 2.0 - data["thermostat_temp"] = (payload[6] & 255) / 2.0 - data["auto_mode"] = payload[7] & 15 - data["loop_mode"] = (payload[7] >> 4) & 15 + data["room_temp"] = payload[5] / 2.0 + data["thermostat_temp"] = payload[6] / 2.0 + data["auto_mode"] = payload[7] & 0xF + data["loop_mode"] = payload[7] >> 4 data["sensor"] = payload[8] data["osv"] = payload[9] data["dif"] = payload[10] data["svh"] = payload[11] data["svl"] = payload[12] - data["room_temp_adj"] = ((payload[13] << 8) + payload[14]) / 2.0 - if data["room_temp_adj"] > 32767: - data["room_temp_adj"] = 32767 - data["room_temp_adj"] + data["room_temp_adj"] = ( + int.from_bytes(payload[13:15], "big", signed=True) / 10.0 + ) data["fre"] = payload[15] data["poweron"] = payload[16] data["unknown"] = payload[17] - data["external_temp"] = (payload[18] & 255) / 2.0 + data["external_temp"] = payload[18] / 2.0 data["hour"] = payload[19] data["min"] = payload[20] data["sec"] = payload[21] @@ -124,7 +112,7 @@ class hysen(Device): def set_mode(self, auto_mode: int, loop_mode: int, sensor: int = 0) -> None: """Set the mode of the device.""" mode_byte = ((loop_mode + 1) << 4) + auto_mode - self.send_request(bytearray([0x01, 0x06, 0x00, 0x02, mode_byte, sensor])) + self.send_request([0x01, 0x06, 0x00, 0x02, mode_byte, sensor]) # Advanced settings # Sensor mode (SEN) sensor = 0 for internal sensor, 1 for external sensor, @@ -133,7 +121,7 @@ class hysen(Device): # Deadzone for floor temprature (dIF) dif = 1..9. Factory default: 2C # Upper temperature limit for internal sensor (SVH) svh = 5..99. Factory default: 35C # Lower temperature limit for internal sensor (SVL) svl = 5..99. Factory default: 5C - # Actual temperature calibration (AdJ) adj = -0.5. Prescision 0.1C + # Actual temperature calibration (AdJ) adj = -0.5. Precision 0.1C # Anti-freezing function (FrE) fre = 0 for anti-freezing function shut down, # 1 for anti-freezing function open. Factory default: 0 # Power on memory (POn) poweron = 0 for power on memory off, 1 for power on memory on. Factory default: 0 @@ -150,7 +138,7 @@ class hysen(Device): poweron: int, ) -> None: """Set advanced options.""" - input_payload = bytearray( + self.send_request( [ 0x01, 0x10, @@ -165,13 +153,12 @@ class hysen(Device): dif, svh, svl, - (int(adj * 2) >> 8 & 0xFF), - (int(adj * 2) & 0xFF), + int(adj * 10) >> 8 & 0xFF, + int(adj * 10) & 0xFF, fre, poweron, ] ) - self.send_request(input_payload) # For backwards compatibility only. Prefer calling set_mode directly. # Note this function invokes loop_mode=0 and sensor=0. @@ -186,22 +173,20 @@ class hysen(Device): # Set temperature for manual mode (also activates manual mode if currently in automatic) def set_temp(self, temp: float) -> None: """Set the target temperature.""" - self.send_request(bytearray([0x01, 0x06, 0x00, 0x01, 0x00, int(temp * 2)])) + self.send_request([0x01, 0x06, 0x00, 0x01, 0x00, int(temp * 2)]) # Set device on(1) or off(0), does not deactivate Wifi connectivity. # Remote lock disables control by buttons on thermostat. def set_power(self, power: int = 1, remote_lock: int = 0) -> None: """Set the power state of the device.""" - self.send_request(bytearray([0x01, 0x06, 0x00, 0x00, remote_lock, power])) + self.send_request([0x01, 0x06, 0x00, 0x00, remote_lock, power]) # set time on device # n.b. day=1 is Monday, ..., day=7 is Sunday def set_time(self, hour: int, minute: int, second: int, day: int) -> None: """Set the time.""" self.send_request( - bytearray( - [0x01, 0x10, 0x00, 0x08, 0x00, 0x02, 0x04, hour, minute, second, day] - ) + [0x01, 0x10, 0x00, 0x08, 0x00, 0x02, 0x04, hour, minute, second, day] ) # Set timer schedule @@ -210,28 +195,26 @@ class hysen(Device): # {'start_hour':17, 'start_minute':30, 'temp': 22 } # Each one specifies the thermostat temp that will become effective at start_hour:start_minute # weekend is similar but only has 2 (e.g. switch on in morning and off in afternoon) - def set_schedule(self, weekday: List[dict], weekend: List[dict]) -> None: + def set_schedule(self, weekday: t.List[dict], weekend: t.List[dict]) -> None: """Set timer schedule.""" - # Begin with some magic values ... - input_payload = bytearray([0x01, 0x10, 0x00, 0x0A, 0x00, 0x0C, 0x18]) + request = [0x01, 0x10, 0x00, 0x0A, 0x00, 0x0C, 0x18] - # Now simply append times/temps # weekday times for i in range(0, 6): - input_payload.append(weekday[i]["start_hour"]) - input_payload.append(weekday[i]["start_minute"]) + request.append(weekday[i]["start_hour"]) + request.append(weekday[i]["start_minute"]) # weekend times for i in range(0, 2): - input_payload.append(weekend[i]["start_hour"]) - input_payload.append(weekend[i]["start_minute"]) + request.append(weekend[i]["start_hour"]) + request.append(weekend[i]["start_minute"]) # weekday temperatures for i in range(0, 6): - input_payload.append(int(weekday[i]["temp"] * 2)) + request.append(int(weekday[i]["temp"] * 2)) # weekend temperatures for i in range(0, 2): - input_payload.append(int(weekend[i]["temp"] * 2)) + request.append(int(weekend[i]["temp"] * 2)) - self.send_request(input_payload) + self.send_request(request) diff --git a/broadlink/device.py b/broadlink/device.py index cf889d9..f65f54b 100644 --- a/broadlink/device.py +++ b/broadlink/device.py @@ -141,20 +141,12 @@ class Device: def __str__(self) -> str: """Return a readable representation of the device.""" - model = [] - if self.manufacturer: - model.append(self.manufacturer) - if self.model: - model.append(self.model) - model.append(hex(self.devtype)) - model = " ".join(model) - - info = [] - info.append(model) - info.append(f"{self.host[0]}:{self.host[1]}") - info.append(":".join(format(x, "02x") for x in self.mac).upper()) - info = " / ".join(info) - return "%s (%s)" % (self.name or "Unknown", info) + return "%s (%s / %s:%s / %s)" % ( + self.name or "Unknown", + " ".join(filter(None, [self.manufacturer, self.model, hex(self.devtype)])), + *self.host, + ":".join(format(x, "02X") for x in self.mac), + ) def update_aes(self, key: bytes) -> None: """Update AES.""" @@ -177,22 +169,18 @@ class Device: self.id = 0 self.update_aes(bytes.fromhex(self.__INIT_KEY)) - payload = bytearray(0x50) - payload[0x04:0x14] = [0x31]*16 - payload[0x1E] = 0x01 - payload[0x2D] = 0x01 - payload[0x30:0x36] = "Test 1".encode() + packet = bytearray(0x50) + packet[0x04:0x14] = [0x31] * 16 + packet[0x1E] = 0x01 + packet[0x2D] = 0x01 + packet[0x30:0x36] = "Test 1".encode() - response = self.send_packet(0x65, payload) + response = self.send_packet(0x65, packet) e.check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) - key = payload[0x04:0x14] - if len(key) % 16 != 0: - return False - self.id = int.from_bytes(payload[:0x4], "little") - self.update_aes(key) + self.update_aes(payload[0x04:0x14]) return True def hello(self, local_ip_address=None) -> bool: @@ -284,8 +272,8 @@ class Device: packet[0x00:0x08] = bytes.fromhex("5aa5aa555aa5aa55") packet[0x24:0x26] = self.devtype.to_bytes(2, "little") packet[0x26:0x28] = packet_type.to_bytes(2, "little") - packet[0x28:0x2a] = self.count.to_bytes(2, "little") - packet[0x2a:0x30] = self.mac[::-1] + packet[0x28:0x2A] = self.count.to_bytes(2, "little") + packet[0x2A:0x30] = self.mac[::-1] packet[0x30:0x34] = self.id.to_bytes(4, "little") p_checksum = sum(payload, 0xBEAF) & 0xFFFF diff --git a/broadlink/helpers.py b/broadlink/helpers.py index 948fdd0..6ee5499 100644 --- a/broadlink/helpers.py +++ b/broadlink/helpers.py @@ -8,10 +8,10 @@ class CRC16: CRC tables are cached for performance. """ - _cache = {} + _cache: t.Dict[int, t.List[int]] = {} @classmethod - def get_table(cls, polynomial: int): + def get_table(cls, polynomial: int) -> t.List[int]: """Return the CRC-16 table for a polynomial.""" try: crc_table = cls._cache[polynomial] diff --git a/broadlink/light.py b/broadlink/light.py index 907add2..00bce4d 100644 --- a/broadlink/light.py +++ b/broadlink/light.py @@ -2,7 +2,6 @@ import enum import json import struct -import typing as t from . import exceptions as e from .device import Device @@ -16,6 +15,7 @@ class lb1(Device): @enum.unique class ColorMode(enum.IntEnum): """Enumerates color modes.""" + RGB = 0 WHITE = 1 SCENE = 2 @@ -83,21 +83,21 @@ class lb1(Device): e.check_error(response[0x22:0x24]) return self._decode(response) - def _encode(self, flag: int, obj: t.Any) -> bytes: + def _encode(self, flag: int, state: dict) -> bytes: """Encode a JSON packet.""" # flag: 1 for reading, 2 for writing. packet = bytearray(14) - js = json.dumps(obj, separators=[',', ':']).encode() - p_len = 12 + len(js) + data = json.dumps(state, separators=(",", ":")).encode() + p_len = 12 + len(data) struct.pack_into( - " t.Any: + def _decode(self, response: bytes) -> dict: """Decode a JSON packet.""" payload = self.decrypt(response[0x38:]) js_len = struct.unpack_from(" bytes: + def _encode(self, flag: int, state: dict) -> bytes: """Encode a JSON packet.""" # flag: 1 for reading, 2 for writing. packet = bytearray(12) - js = json.dumps(obj, separators=[',', ':']).encode() - struct.pack_into( - " t.Any: + def _decode(self, response: bytes) -> dict: """Decode a JSON packet.""" payload = self.decrypt(response[0x38:]) - js_len = struct.unpack_from(" bytes: + def _send(self, command: int, data: bytes = b"") -> bytes: """Send a packet to the device.""" packet = struct.pack(" bytes: + def _send(self, command: int, data: bytes = b"") -> bytes: """Send a packet to the device.""" packet = struct.pack(" dict: @@ -96,7 +96,7 @@ class rm4mini(rmminib): temp = struct.unpack(" float: diff --git a/broadlink/switch.py b/broadlink/switch.py index 1db8cf3..7e0f16e 100644 --- a/broadlink/switch.py +++ b/broadlink/switch.py @@ -11,7 +11,7 @@ class mp1(Device): TYPE = "MP1" - def set_power_mask(self, sid_mask: int, state: bool) -> None: + def set_power_mask(self, sid_mask: int, pwr: bool) -> None: """Set the power state of the device.""" packet = bytearray(16) packet[0x00] = 0x0D @@ -19,20 +19,20 @@ class mp1(Device): packet[0x03] = 0xA5 packet[0x04] = 0x5A packet[0x05] = 0x5A - packet[0x06] = 0xB2 + ((sid_mask << 1) if state else sid_mask) + packet[0x06] = 0xB2 + ((sid_mask << 1) if pwr else sid_mask) packet[0x07] = 0xC0 packet[0x08] = 0x02 packet[0x0A] = 0x03 packet[0x0D] = sid_mask - packet[0x0E] = sid_mask if state else 0 + packet[0x0E] = sid_mask if pwr else 0 response = self.send_packet(0x6A, packet) e.check_error(response[0x22:0x24]) - def set_power(self, sid: int, state: bool) -> None: + def set_power(self, sid: int, pwr: bool) -> None: """Set the power state of the device.""" sid_mask = 0x01 << (sid - 1) - self.set_power_mask(sid_mask, state) + self.set_power_mask(sid_mask, pwr) def check_power_raw(self) -> int: """Return the power state of the device in raw format.""" @@ -53,15 +53,13 @@ class mp1(Device): def check_power(self) -> dict: """Return the power state of the device.""" - state = self.check_power_raw() - if state is None: - return {"s1": None, "s2": None, "s3": None, "s4": None} - data = {} - data["s1"] = bool(state & 0x01) - data["s2"] = bool(state & 0x02) - data["s3"] = bool(state & 0x04) - data["s4"] = bool(state & 0x08) - return data + data = self.check_power_raw() + return { + "s1": bool(data & 1), + "s2": bool(data & 2), + "s3": bool(data & 4), + "s4": bool(data & 8), + } class bg1(Device): @@ -74,7 +72,7 @@ class bg1(Device): Example: `{"pwr":1,"pwr1":1,"pwr2":0,"maxworktime":60,"maxworktime1":60,"maxworktime2":0,"idcbrightness":50}` """ - packet = self._encode(1, b"{}") + packet = self._encode(1, {}) response = self.send_packet(0x6A, packet) e.check_error(response[0x22:0x24]) return self._decode(response) @@ -90,48 +88,38 @@ class bg1(Device): idcbrightness: int = None, ) -> dict: """Set the power state of the device.""" - data = {} + state = {} if pwr is not None: - data["pwr"] = int(bool(pwr)) + state["pwr"] = int(bool(pwr)) if pwr1 is not None: - data["pwr1"] = int(bool(pwr1)) + state["pwr1"] = int(bool(pwr1)) if pwr2 is not None: - data["pwr2"] = int(bool(pwr2)) + state["pwr2"] = int(bool(pwr2)) if maxworktime is not None: - data["maxworktime"] = maxworktime + state["maxworktime"] = maxworktime if maxworktime1 is not None: - data["maxworktime1"] = maxworktime1 + state["maxworktime1"] = maxworktime1 if maxworktime2 is not None: - data["maxworktime2"] = maxworktime2 + state["maxworktime2"] = maxworktime2 if idcbrightness is not None: - data["idcbrightness"] = idcbrightness - js = json.dumps(data).encode("utf8") - packet = self._encode(2, js) + state["idcbrightness"] = idcbrightness + + packet = self._encode(2, state) response = self.send_packet(0x6A, packet) e.check_error(response[0x22:0x24]) return self._decode(response) - def _encode(self, flag: int, js: str) -> bytes: + def _encode(self, flag: int, state: dict) -> bytes: """Encode a message.""" - # The packet format is: - # 0x00-0x01 length - # 0x02-0x05 header - # 0x06-0x07 00 - # 0x08 flag (1 for read or 2 write?) - # 0x09 unknown (0xb) - # 0x0a-0x0d length of json - # 0x0e- json data packet = bytearray(14) - length = 4 + 2 + 2 + 4 + len(js) + data = json.dumps(state).encode() + length = 12 + len(data) struct.pack_into( - "> 8 + packet.extend(data) + checksum = sum(packet[0x2:], 0xBEAF) & 0xFFFF + packet[0x06:0x08] = checksum.to_bytes(2, "little") return packet def _decode(self, response: bytes) -> dict: @@ -147,10 +135,10 @@ class sp1(Device): TYPE = "SP1" - def set_power(self, state: bool) -> None: + def set_power(self, pwr: bool) -> None: """Set the power state of the device.""" packet = bytearray(4) - packet[0] = state + packet[0] = bool(pwr) response = self.send_packet(0x66, packet) e.check_error(response[0x22:0x24]) @@ -160,11 +148,11 @@ class sp2(Device): TYPE = "SP2" - def set_power(self, state: bool) -> None: + def set_power(self, pwr: bool) -> None: """Set the power state of the device.""" packet = bytearray(16) packet[0] = 2 - packet[4] = int(bool(state)) + packet[4] = bool(pwr) response = self.send_packet(0x6A, packet) e.check_error(response[0x22:0x24]) @@ -198,25 +186,19 @@ class sp3(Device): TYPE = "SP3" - def set_power(self, state: bool) -> None: + def set_power(self, pwr: bool) -> None: """Set the power state of the device.""" packet = bytearray(16) packet[0] = 2 - if self.check_nightlight(): - packet[4] = 3 if state else 2 - else: - packet[4] = 1 if state else 0 + packet[4] = self.check_nightlight() << 1 | bool(pwr) response = self.send_packet(0x6A, packet) e.check_error(response[0x22:0x24]) - def set_nightlight(self, state: bool) -> None: + def set_nightlight(self, ntlight: bool) -> None: """Set the night light state of the device.""" packet = bytearray(16) packet[0] = 2 - if self.check_power(): - packet[4] = 3 if state else 1 - else: - packet[4] = 2 if state else 0 + packet[4] = bool(ntlight) << 1 | self.check_power() response = self.send_packet(0x6A, packet) e.check_error(response[0x22:0x24]) @@ -227,7 +209,7 @@ class sp3(Device): response = self.send_packet(0x6A, packet) e.check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) - return bool(payload[0x4] == 1 or payload[0x4] == 3 or payload[0x4] == 0xFD) + return bool(payload[0x4] & 1) def check_nightlight(self) -> bool: """Return the state of the night light.""" @@ -236,7 +218,7 @@ class sp3(Device): response = self.send_packet(0x6A, packet) e.check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) - return bool(payload[0x4] == 2 or payload[0x4] == 3 or payload[0x4] == 0xFF) + return bool(payload[0x4] & 2) class sp3s(sp2): @@ -259,13 +241,13 @@ class sp4(Device): TYPE = "SP4" - def set_power(self, state: bool) -> None: + def set_power(self, pwr: bool) -> None: """Set the power state of the device.""" - self.set_state(pwr=state) + self.set_state(pwr=pwr) - def set_nightlight(self, state: bool) -> None: + def set_nightlight(self, ntlight: bool) -> None: """Set the night light state of the device.""" - self.set_state(ntlight=state) + self.set_state(ntlight=ntlight) def set_state( self, @@ -277,21 +259,21 @@ class sp4(Device): childlock: bool = None, ) -> dict: """Set state of device.""" - data = {} + state = {} if pwr is not None: - data["pwr"] = int(bool(pwr)) + state["pwr"] = int(bool(pwr)) if ntlight is not None: - data["ntlight"] = int(bool(ntlight)) + state["ntlight"] = int(bool(ntlight)) if indicator is not None: - data["indicator"] = int(bool(indicator)) + state["indicator"] = int(bool(indicator)) if ntlbrightness is not None: - data["ntlbrightness"] = ntlbrightness + state["ntlbrightness"] = ntlbrightness if maxworktime is not None: - data["maxworktime"] = maxworktime + state["maxworktime"] = maxworktime if childlock is not None: - data["childlock"] = int(bool(childlock)) + state["childlock"] = int(bool(childlock)) - packet = self._encode(2, data) + packet = self._encode(2, state) response = self.send_packet(0x6A, packet) return self._decode(response) @@ -313,15 +295,14 @@ class sp4(Device): def _encode(self, flag: int, state: dict) -> bytes: """Encode a message.""" - payload = json.dumps(state, separators=(",", ":")).encode() packet = bytearray(12) + data = json.dumps(state, separators=(",", ":")).encode() struct.pack_into( - "> 8 + packet[0x04:0x06] = checksum.to_bytes(2, "little") return packet def _decode(self, response: bytes) -> dict: @@ -352,9 +333,9 @@ class sp4b(sp4): def _encode(self, flag: int, state: dict) -> bytes: """Encode a message.""" - payload = json.dumps(state, separators=(",", ":")).encode() packet = bytearray(14) - length = 4 + 2 + 2 + 4 + len(payload) + data = json.dumps(state, separators=(",", ":")).encode() + length = 12 + len(data) struct.pack_into( "> 8 + packet.extend(data) + checksum = sum(packet[0x02:], 0xBEAF) & 0xFFFF + packet[0x06:0x08] = checksum.to_bytes(2, "little") return packet def _decode(self, response: bytes) -> dict: