diff --git a/broadlink/alarm.py b/broadlink/alarm.py index ea1bc54..62dbb30 100644 --- a/broadlink/alarm.py +++ b/broadlink/alarm.py @@ -1,5 +1,3 @@ -import codecs - from .device import device from .exceptions import check_error @@ -25,7 +23,7 @@ class S1C(device): packet[0] = 0x06 # 0x06 - get sensors info, 0x07 - probably add sensors response = self.send_packet(0x6a, packet) check_error(response[0x22:0x24]) - payload = self.decrypt(bytes(response[0x38:])) + payload = self.decrypt(response[0x38:]) if not payload: return None count = payload[0x4] @@ -34,11 +32,11 @@ class S1C(device): sens_res = [] for sens in sensors_a: - status = ord(chr(sens[0])) - _name = str(bytes(sens[4:26]).decode()) - _order = ord(chr(sens[1])) - _type = ord(chr(sens[3])) - _serial = bytes(codecs.encode(sens[26:30], "hex")).decode() + status = sens[0] + _name = sens[4:26].decode() + _order = sens[1] + _type = sens[3] + _serial = sens[26:30].hex() type_str = S1C_SENSORS_TYPES.get(_type, 'Unknown') diff --git a/broadlink/climate.py b/broadlink/climate.py index 353ad59..3d68eb4 100644 --- a/broadlink/climate.py +++ b/broadlink/climate.py @@ -19,9 +19,9 @@ class hysen(device): # New behaviour: raises a ValueError if the device response indicates an error or CRC check fails # The function prepends length (2 bytes) and appends CRC - def send_request(self, input_payload: bytearray) -> bytes: + def send_request(self, input_payload: bytes) -> bytes: """Send a request to the device.""" - crc = calculate_crc16(bytes(input_payload)) + crc = calculate_crc16(input_payload) # first byte is length, +2 for CRC16 request_payload = bytearray([len(input_payload) + 2, 0x00]) @@ -34,13 +34,13 @@ class hysen(device): # send to device response = self.send_packet(0x6a, request_payload) check_error(response[0x22:0x24]) - response_payload = bytearray(self.decrypt(bytes(response[0x38:]))) + response_payload = self.decrypt(response[0x38:]) # experimental check on CRC in response (first 2 bytes are len, and trailing bytes are crc) response_payload_len = response_payload[0] if response_payload_len + 2 > len(response_payload): raise ValueError('hysen_response_error', 'first byte of response is not length') - crc = calculate_crc16(bytes(response_payload[2:response_payload_len])) + crc = calculate_crc16(response_payload[2:response_payload_len]) if (response_payload[response_payload_len] == crc & 0xFF) and ( response_payload[response_payload_len + 1] == (crc >> 8) & 0xFF): return response_payload[2:response_payload_len] diff --git a/broadlink/cover.py b/broadlink/cover.py index 485e6c7..b6b3be1 100644 --- a/broadlink/cover.py +++ b/broadlink/cover.py @@ -23,8 +23,8 @@ class dooya(device): packet[10] = 0x44 response = self.send_packet(0x6a, packet) check_error(response[0x22:0x24]) - payload = self.decrypt(bytes(response[0x38:])) - return ord(payload[4]) + payload = self.decrypt(response[0x38:]) + return payload[4] def open(self) -> int: """Open the curtain.""" diff --git a/broadlink/device.py b/broadlink/device.py index 92916f9..bf29ea3 100644 --- a/broadlink/device.py +++ b/broadlink/device.py @@ -34,14 +34,14 @@ class device: self.manufacturer = manufacturer self.is_locked = is_locked self.count = random.randrange(0xffff) - self.iv = bytearray( + self.iv = bytes( [0x56, 0x2e, 0x17, 0x99, 0x6d, 0x09, 0x3d, 0x28, 0xdd, 0xb3, 0xba, 0x69, 0x5a, 0x2e, 0x6f, 0x58]) - self.id = bytearray([0, 0, 0, 0]) + self.id = bytes(4) self.type = "Unknown" self.lock = threading.Lock() self.aes = None - key = bytearray( + key = bytes( [0x09, 0x76, 0x28, 0x34, 0x3f, 0xe9, 0x9e, 0x23, 0x76, 0x5c, 0x15, 0x13, 0xac, 0xcf, 0x8b, 0x02]) self.update_aes(key) @@ -133,7 +133,7 @@ class device: """Return device type.""" return self.type - def send_packet(self, command: int, payload: bytearray) -> bytearray: + def send_packet(self, command: int, payload: bytes) -> bytes: """Send a packet to the device.""" self.count = (self.count + 1) & 0xffff packet = bytearray(0x38) @@ -162,8 +162,10 @@ class device: packet[0x33] = self.id[0] # pad the payload for AES encryption - if payload: - payload += bytearray((16 - len(payload)) % 16) + padding = (16 - len(payload)) % 16 + if padding: + payload = bytearray(payload) + payload += bytearray(padding) checksum = sum(payload, 0xbeaf) & 0xffff packet[0x34] = checksum & 0xff @@ -187,7 +189,6 @@ class device: cs.sendto(packet, self.host) cs.settimeout(1) resp, _ = cs.recvfrom(2048) - resp = bytearray(resp) break except socket.timeout: if (time.time() - start_time) > self.timeout: diff --git a/broadlink/helpers.py b/broadlink/helpers.py index 22ac12f..4b21478 100644 --- a/broadlink/helpers.py +++ b/broadlink/helpers.py @@ -21,8 +21,8 @@ def get_local_ip() -> str: return s.getsockname()[0] -def calculate_crc16(input_data) -> int: - """Calculate CRC-16.""" +def calculate_crc16(input_data: bytes) -> int: + """Calculate the CRC-16 of a byte string.""" crc16_tab = [] crc16_constant = 0xA001 @@ -35,24 +35,11 @@ def calculate_crc16(input_data) -> int: crc = c_ushort(crc >> 1).value crc16_tab.append(hex(crc)) - try: - is_string = isinstance(input_data, str) - is_bytes = isinstance(input_data, bytes) + crcValue = 0xFFFF - if not is_string and not is_bytes: - raise Exception( - "Please provide a string or a byte sequence " - "as argument for calculation." - ) + for c in input_data: + tmp = crcValue ^ c + rotated = c_ushort(crcValue >> 8).value + crcValue = rotated ^ int(crc16_tab[(tmp & 0x00FF)], 0) - crcValue = 0xFFFF - - for c in input_data: - d = ord(c) if is_string else c - tmp = crcValue ^ d - rotated = c_ushort(crcValue >> 8).value - crcValue = rotated ^ int(crc16_tab[(tmp & 0x00FF)], 0) - - return crcValue - except Exception as e: - print("EXCEPTION(calculate): {}".format(e)) + return crcValue diff --git a/broadlink/light.py b/broadlink/light.py index 27d3ec4..1cac4a5 100644 --- a/broadlink/light.py +++ b/broadlink/light.py @@ -44,7 +44,7 @@ class lb1(device): response = self.send_packet(0x6a, packet) check_error(response[0x36:0x38]) - payload = self.decrypt(bytes(response[0x38:])) + payload = self.decrypt(response[0x38:]) responseLength = int(payload[0x0a]) | (int(payload[0x0b]) << 8) if responseLength > 0: diff --git a/broadlink/remote.py b/broadlink/remote.py index 236f6c4..a08c675 100644 --- a/broadlink/remote.py +++ b/broadlink/remote.py @@ -18,7 +18,7 @@ class rm(device): packet.append(0x04) response = self.send_packet(0x6a, packet) check_error(response[0x22:0x24]) - payload = self.decrypt(bytes(response[0x38:])) + payload = self.decrypt(response[0x38:]) return payload[len(self._request_header) + 4:] def send_data(self, data: bytes) -> None: @@ -56,7 +56,7 @@ class rm(device): packet.append(0x1a) response = self.send_packet(0x6a, packet) check_error(response[0x22:0x24]) - payload = self.decrypt(bytes(response[0x38:])) + payload = self.decrypt(response[0x38:]) if payload[len(self._request_header) + 4] == 1: return True return False @@ -67,7 +67,7 @@ class rm(device): packet.append(0x1b) response = self.send_packet(0x6a, packet) check_error(response[0x22:0x24]) - payload = self.decrypt(bytes(response[0x38:])) + payload = self.decrypt(response[0x38:]) if payload[len(self._request_header) + 4] == 1: return True return False @@ -78,7 +78,7 @@ class rm(device): packet.append(command) response = self.send_packet(0x6a, packet) check_error(response[0x22:0x24]) - payload = self.decrypt(bytes(response[0x38:])) + payload = self.decrypt(response[0x38:]) return bytearray(payload[len(self._request_header) + 4:]) def check_temperature(self) -> int: diff --git a/broadlink/sensor.py b/broadlink/sensor.py index 19db0e8..819518b 100644 --- a/broadlink/sensor.py +++ b/broadlink/sensor.py @@ -31,7 +31,7 @@ class a1(device): packet = bytearray([0x1]) response = self.send_packet(0x6a, packet) check_error(response[0x22:0x24]) - payload = self.decrypt(bytes(response[0x38:])) + payload = self.decrypt(response[0x38:]) data = bytearray(payload[0x4:]) return { 'temperature': data[0x0] + data[0x1] / 10.0, diff --git a/broadlink/switch.py b/broadlink/switch.py index fdc24d1..6b287bd 100644 --- a/broadlink/switch.py +++ b/broadlink/switch.py @@ -50,7 +50,7 @@ class mp1(device): response = self.send_packet(0x6a, packet) check_error(response[0x22:0x24]) - payload = self.decrypt(bytes(response[0x38:])) + payload = self.decrypt(response[0x38:]) return payload[0x0e] def check_power(self) -> dict: @@ -116,7 +116,7 @@ class bg1(device): check_error(response[0x22:0x24]) return self._decode(response) - def _encode(self, flag: int, js: str) -> bytearray: + def _encode(self, flag: int, js: str) -> bytes: """Encode a message.""" # The packet format is: # 0x00-0x01 length @@ -139,7 +139,7 @@ class bg1(device): def _decode(self, response: bytes) -> dict: """Decode a message.""" - payload = self.decrypt(bytes(response[0x38:])) + payload = self.decrypt(response[0x38:]) js_len = struct.unpack_from(' bool: """Return the state of the night light.""" @@ -208,20 +206,13 @@ class sp2(device): packet[0] = 1 response = self.send_packet(0x6a, packet) check_error(response[0x22:0x24]) - payload = self.decrypt(bytes(response[0x38:])) - if isinstance(payload[0x4], int): - return bool(payload[0x4] == 2 or payload[0x4] == 3 or payload[0x4] == 0xFF) - return bool(ord(payload[0x4]) == 2 or ord(payload[0x4]) == 3 or ord(payload[0x4]) == 0xFF) + payload = self.decrypt(response[0x38:]) + return bool(payload[0x4] == 2 or payload[0x4] == 3 or payload[0x4] == 0xFF) def get_energy(self) -> int: """Return the energy state of the device.""" packet = bytearray([8, 0, 254, 1, 5, 1, 0, 0, 0, 45]) response = self.send_packet(0x6a, packet) check_error(response[0x22:0x24]) - payload = self.decrypt(bytes(response[0x38:])) - if isinstance(payload[0x7], int): - energy = int(hex(payload[0x07] * 256 + payload[0x06])[2:]) + int(hex(payload[0x05])[2:]) / 100.0 - else: - energy = int(hex(ord(payload[0x07]) * 256 + ord(payload[0x06]))[2:]) + int( - hex(ord(payload[0x05]))[2:]) / 100.0 - return energy + payload = self.decrypt(response[0x38:]) + return int(hex(payload[0x07] * 256 + payload[0x06])[2:]) + int(hex(payload[0x05])[2:]) / 100.0