diff --git a/broadlink/device.py b/broadlink/device.py index e675377..9509278 100644 --- a/broadlink/device.py +++ b/broadlink/device.py @@ -3,13 +3,13 @@ import socket import threading import random import time -from datetime import datetime from typing import Generator, Tuple, Union from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from .exceptions import check_error, exception +from .protocol import Datetime HelloResponse = Tuple[int, Tuple[str, int], str, str, bool] @@ -33,42 +33,13 @@ def scan( port = 0 packet = bytearray(0x30) - - timezone = int(time.timezone / -3600) - if timezone < 0: - packet[0x08] = 0xFF + timezone - 1 - packet[0x09] = 0xFF - packet[0x0A] = 0xFF - packet[0x0B] = 0xFF - else: - packet[0x08] = timezone - packet[0x09] = 0 - packet[0x0A] = 0 - packet[0x0B] = 0 - - year = datetime.now().year - packet[0x0C] = year & 0xFF - packet[0x0D] = year >> 8 - packet[0x0E] = datetime.now().minute - packet[0x0F] = datetime.now().hour - subyear = str(year)[2:] - packet[0x10] = int(subyear) - packet[0x11] = datetime.now().isoweekday() - packet[0x12] = datetime.now().day - packet[0x13] = datetime.now().month - - address = local_ip_address.split(".") - packet[0x18] = int(address[3]) - packet[0x19] = int(address[2]) - packet[0x1A] = int(address[1]) - packet[0x1B] = int(address[0]) - packet[0x1C] = port & 0xFF - packet[0x1D] = port >> 8 + packet[0x08:0x14] = Datetime.pack(Datetime.now()) + packet[0x18:0x1C] = socket.inet_aton(local_ip_address)[::-1] + packet[0x1C:0x1E] = port.to_bytes(2, "little") packet[0x26] = 6 checksum = sum(packet, 0xBEAF) & 0xFFFF - packet[0x20] = checksum & 0xFF - packet[0x21] = checksum >> 8 + packet[0x20:0x22] = checksum.to_bytes(2, "little") start_time = time.time() discovered = [] @@ -81,18 +52,19 @@ def scan( while True: try: - response, host = conn.recvfrom(1024) + resp, host = conn.recvfrom(1024) except socket.timeout: break - devtype = response[0x34] | response[0x35] << 8 - mac = bytes(reversed(response[0x3A:0x40])) + devtype = resp[0x34] | resp[0x35] << 8 + mac = resp[0x3A:0x40][::-1] + if (host, mac, devtype) in discovered: continue discovered.append((host, mac, devtype)) - name = response[0x40:].split(b"\x00")[0].decode("utf-8") - is_locked = bool(response[-1]) + name = resp[0x40:].split(b"\x00")[0].decode() + is_locked = bool(resp[-1]) yield devtype, host, mac, name, is_locked finally: conn.close() @@ -123,7 +95,7 @@ class device: self.is_locked = is_locked self.count = random.randint(0x8000, 0xFFFF) self.iv = bytes.fromhex("562e17996d093d28ddb3ba695a2e6f58") - self.id = bytes(4) + self.id = 0 self.type = "Unknown" self.lock = threading.Lock() @@ -170,30 +142,10 @@ class device: def auth(self) -> bool: """Authenticate to the device.""" payload = bytearray(0x50) - payload[0x04] = 0x31 - payload[0x05] = 0x31 - payload[0x06] = 0x31 - payload[0x07] = 0x31 - payload[0x08] = 0x31 - payload[0x09] = 0x31 - payload[0x0A] = 0x31 - payload[0x0B] = 0x31 - payload[0x0C] = 0x31 - payload[0x0D] = 0x31 - payload[0x0E] = 0x31 - payload[0x0F] = 0x31 - payload[0x10] = 0x31 - payload[0x11] = 0x31 - payload[0x12] = 0x31 + payload[0x04:0x14] = [0x31]*16 payload[0x1E] = 0x01 payload[0x2D] = 0x01 - payload[0x30] = ord("T") - payload[0x31] = ord("e") - payload[0x32] = ord("s") - payload[0x33] = ord("t") - payload[0x34] = ord(" ") - payload[0x35] = ord(" ") - payload[0x36] = ord("1") + payload[0x30:0x37] = "Test 1".encode() response = self.send_packet(0x65, payload) check_error(response[0x22:0x24]) @@ -203,7 +155,7 @@ class device: if len(key) % 16 != 0: return False - self.id = payload[0x03::-1] + self.id = int.from_bytes(payload[:0x4], "little") self.update_aes(key) return True @@ -262,73 +214,47 @@ class device: """Return device type.""" return self.type - def send_packet(self, command: int, payload: bytes) -> bytes: + def send_packet(self, packet_type: int, payload: bytes) -> bytes: """Send a packet to the device.""" self.count = ((self.count + 1) | 0x8000) & 0xFFFF packet = bytearray(0x38) - packet[0x00] = 0x5A - packet[0x01] = 0xA5 - packet[0x02] = 0xAA - packet[0x03] = 0x55 - packet[0x04] = 0x5A - packet[0x05] = 0xA5 - packet[0x06] = 0xAA - packet[0x07] = 0x55 - packet[0x24] = self.devtype & 0xFF - packet[0x25] = self.devtype >> 8 - packet[0x26] = command - packet[0x28] = self.count & 0xFF - packet[0x29] = self.count >> 8 - packet[0x2A] = self.mac[5] - packet[0x2B] = self.mac[4] - packet[0x2C] = self.mac[3] - packet[0x2D] = self.mac[2] - packet[0x2E] = self.mac[1] - packet[0x2F] = self.mac[0] - packet[0x30] = self.id[3] - packet[0x31] = self.id[2] - packet[0x32] = self.id[1] - packet[0x33] = self.id[0] + packet[0x00:0x08] = bytes.fromhex("5aa5aa555aa5aa55") + packet[0x24:0x26] = self.devtype.to_bytes(2, "little") + packet[0x26:0x28] = packet_type.to_bytes(2, "little") + packet[0x28:0x2a] = self.count.to_bytes(2, "little") + packet[0x2a:0x30] = self.mac[::-1] + packet[0x30:0x34] = self.id.to_bytes(4, "little") + + p_checksum = sum(payload, 0xBEAF) & 0xFFFF + packet[0x34:0x36] = p_checksum.to_bytes(2, "little") - # pad the payload for AES encryption padding = (16 - len(payload)) % 16 - if padding: - payload = bytearray(payload) - payload += bytearray(padding) - - checksum = sum(payload, 0xBEAF) & 0xFFFF - packet[0x34] = checksum & 0xFF - packet[0x35] = checksum >> 8 - - payload = self.encrypt(payload) - for i in range(len(payload)): - packet.append(payload[i]) + payload = self.encrypt(payload + bytes(padding)) + packet.extend(payload) checksum = sum(packet, 0xBEAF) & 0xFFFF - packet[0x20] = checksum & 0xFF - packet[0x21] = checksum >> 8 + packet[0x20:0x22] = checksum.to_bytes(2, "little") - with self.lock: - with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as conn: - timeout = self.timeout - start_time = time.time() + with self.lock and socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as conn: + timeout = self.timeout + start_time = time.time() - while True: - time_left = timeout - (time.time() - start_time) - conn.settimeout(min(1, time_left)) - conn.sendto(packet, self.host) + while True: + time_left = timeout - (time.time() - start_time) + conn.settimeout(min(1, time_left)) + conn.sendto(packet, self.host) - try: - resp = conn.recvfrom(2048)[0] - break - except socket.timeout: - if (time.time() - start_time) > timeout: - raise exception(-4000) # Network timeout. + try: + resp = conn.recvfrom(2048)[0] + break + except socket.timeout: + if (time.time() - start_time) > timeout: + raise exception(-4000) # Network timeout. if len(resp) < 0x30: raise exception(-4007) # Length error. - checksum = resp[0x20] | (resp[0x21] << 8) + checksum = int.from_bytes(resp[0x20:0x22], "little") if sum(resp, 0xBEAF) - sum(resp[0x20:0x22]) & 0xFFFF != checksum: raise exception(-4008) # Checksum error. diff --git a/broadlink/protocol.py b/broadlink/protocol.py new file mode 100644 index 0000000..017125e --- /dev/null +++ b/broadlink/protocol.py @@ -0,0 +1,49 @@ +import datetime as dt +import time + + +class Datetime: + """Helps to pack and unpack datetime objects for the Broadlink protocol.""" + + @staticmethod + def pack(datetime: dt.datetime) -> bytes: + """Pack the timestamp to be sent over the Broadlink protocol.""" + data = bytearray(12) + utcoffset = int(datetime.utcoffset().total_seconds() / 3600) + data[:0x04] = utcoffset.to_bytes(4, "little", signed=True) + data[0x04:0x06] = datetime.year.to_bytes(2, "little") + data[0x06] = datetime.minute + data[0x07] = datetime.hour + data[0x08] = int(datetime.strftime('%y')) + data[0x09] = datetime.isoweekday() + data[0x0A] = datetime.day + data[0x0B] = datetime.month + return data + + @staticmethod + def unpack(data: bytes) -> dt.datetime: + """Unpack a timestamp received over the Broadlink protocol.""" + utcoffset = int.from_bytes(data[0x00:0x04], "little", signed=True) + year = int.from_bytes(data[0x04:0x06], "little") + minute = data[0x06] + hour = data[0x07] + subyear = data[0x08] + isoweekday = data[0x09] + day = data[0x0A] + month = data[0x0B] + + tz_info = dt.timezone(dt.timedelta(hours=utcoffset)) + datetime = dt.datetime(year, month, day, hour, minute, 0, 0, tz_info) + + if datetime.isoweekday() != isoweekday: + raise ValueError("isoweekday does not match") + if int(datetime.strftime('%y')) != subyear: + raise ValueError("subyear does not match") + + return datetime + + @staticmethod + def now() -> dt.datetime: + """Return the current date and time with timezone info.""" + tz_info = dt.timezone(dt.timedelta(seconds=-time.timezone)) + return dt.datetime.now(tz_info)