1
0
mirror of https://github.com/mjg59/python-broadlink.git synced 2024-11-10 18:00:12 +01:00

Clean up device.py (#523)

* Clean up device.send_packet()

* Clean up device.auth()

* Clean up scan()
This commit is contained in:
Felipe Martins Diel 2021-01-25 17:00:46 -03:00 committed by Felipe Martins Diel
parent 82a20dbdda
commit 9af3a3c56c
2 changed files with 91 additions and 116 deletions

View File

@ -3,13 +3,13 @@ import socket
import threading import threading
import random import random
import time import time
from datetime import datetime
from typing import Generator, Tuple, Union from typing import Generator, Tuple, Union
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from .exceptions import check_error, exception from .exceptions import check_error, exception
from .protocol import Datetime
HelloResponse = Tuple[int, Tuple[str, int], str, str, bool] HelloResponse = Tuple[int, Tuple[str, int], str, str, bool]
@ -33,42 +33,13 @@ def scan(
port = 0 port = 0
packet = bytearray(0x30) packet = bytearray(0x30)
packet[0x08:0x14] = Datetime.pack(Datetime.now())
timezone = int(time.timezone / -3600) packet[0x18:0x1C] = socket.inet_aton(local_ip_address)[::-1]
if timezone < 0: packet[0x1C:0x1E] = port.to_bytes(2, "little")
packet[0x08] = 0xFF + timezone - 1
packet[0x09] = 0xFF
packet[0x0A] = 0xFF
packet[0x0B] = 0xFF
else:
packet[0x08] = timezone
packet[0x09] = 0
packet[0x0A] = 0
packet[0x0B] = 0
year = datetime.now().year
packet[0x0C] = year & 0xFF
packet[0x0D] = year >> 8
packet[0x0E] = datetime.now().minute
packet[0x0F] = datetime.now().hour
subyear = str(year)[2:]
packet[0x10] = int(subyear)
packet[0x11] = datetime.now().isoweekday()
packet[0x12] = datetime.now().day
packet[0x13] = datetime.now().month
address = local_ip_address.split(".")
packet[0x18] = int(address[3])
packet[0x19] = int(address[2])
packet[0x1A] = int(address[1])
packet[0x1B] = int(address[0])
packet[0x1C] = port & 0xFF
packet[0x1D] = port >> 8
packet[0x26] = 6 packet[0x26] = 6
checksum = sum(packet, 0xBEAF) & 0xFFFF checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[0x20] = checksum & 0xFF packet[0x20:0x22] = checksum.to_bytes(2, "little")
packet[0x21] = checksum >> 8
start_time = time.time() start_time = time.time()
discovered = [] discovered = []
@ -81,18 +52,19 @@ def scan(
while True: while True:
try: try:
response, host = conn.recvfrom(1024) resp, host = conn.recvfrom(1024)
except socket.timeout: except socket.timeout:
break break
devtype = response[0x34] | response[0x35] << 8 devtype = resp[0x34] | resp[0x35] << 8
mac = bytes(reversed(response[0x3A:0x40])) mac = resp[0x3A:0x40][::-1]
if (host, mac, devtype) in discovered: if (host, mac, devtype) in discovered:
continue continue
discovered.append((host, mac, devtype)) discovered.append((host, mac, devtype))
name = response[0x40:].split(b"\x00")[0].decode("utf-8") name = resp[0x40:].split(b"\x00")[0].decode()
is_locked = bool(response[-1]) is_locked = bool(resp[-1])
yield devtype, host, mac, name, is_locked yield devtype, host, mac, name, is_locked
finally: finally:
conn.close() conn.close()
@ -123,7 +95,7 @@ class device:
self.is_locked = is_locked self.is_locked = is_locked
self.count = random.randint(0x8000, 0xFFFF) self.count = random.randint(0x8000, 0xFFFF)
self.iv = bytes.fromhex("562e17996d093d28ddb3ba695a2e6f58") self.iv = bytes.fromhex("562e17996d093d28ddb3ba695a2e6f58")
self.id = bytes(4) self.id = 0
self.type = "Unknown" self.type = "Unknown"
self.lock = threading.Lock() self.lock = threading.Lock()
@ -170,30 +142,10 @@ class device:
def auth(self) -> bool: def auth(self) -> bool:
"""Authenticate to the device.""" """Authenticate to the device."""
payload = bytearray(0x50) payload = bytearray(0x50)
payload[0x04] = 0x31 payload[0x04:0x14] = [0x31]*16
payload[0x05] = 0x31
payload[0x06] = 0x31
payload[0x07] = 0x31
payload[0x08] = 0x31
payload[0x09] = 0x31
payload[0x0A] = 0x31
payload[0x0B] = 0x31
payload[0x0C] = 0x31
payload[0x0D] = 0x31
payload[0x0E] = 0x31
payload[0x0F] = 0x31
payload[0x10] = 0x31
payload[0x11] = 0x31
payload[0x12] = 0x31
payload[0x1E] = 0x01 payload[0x1E] = 0x01
payload[0x2D] = 0x01 payload[0x2D] = 0x01
payload[0x30] = ord("T") payload[0x30:0x37] = "Test 1".encode()
payload[0x31] = ord("e")
payload[0x32] = ord("s")
payload[0x33] = ord("t")
payload[0x34] = ord(" ")
payload[0x35] = ord(" ")
payload[0x36] = ord("1")
response = self.send_packet(0x65, payload) response = self.send_packet(0x65, payload)
check_error(response[0x22:0x24]) check_error(response[0x22:0x24])
@ -203,7 +155,7 @@ class device:
if len(key) % 16 != 0: if len(key) % 16 != 0:
return False return False
self.id = payload[0x03::-1] self.id = int.from_bytes(payload[:0x4], "little")
self.update_aes(key) self.update_aes(key)
return True return True
@ -262,73 +214,47 @@ class device:
"""Return device type.""" """Return device type."""
return self.type return self.type
def send_packet(self, command: int, payload: bytes) -> bytes: def send_packet(self, packet_type: int, payload: bytes) -> bytes:
"""Send a packet to the device.""" """Send a packet to the device."""
self.count = ((self.count + 1) | 0x8000) & 0xFFFF self.count = ((self.count + 1) | 0x8000) & 0xFFFF
packet = bytearray(0x38) packet = bytearray(0x38)
packet[0x00] = 0x5A packet[0x00:0x08] = bytes.fromhex("5aa5aa555aa5aa55")
packet[0x01] = 0xA5 packet[0x24:0x26] = self.devtype.to_bytes(2, "little")
packet[0x02] = 0xAA packet[0x26:0x28] = packet_type.to_bytes(2, "little")
packet[0x03] = 0x55 packet[0x28:0x2a] = self.count.to_bytes(2, "little")
packet[0x04] = 0x5A packet[0x2a:0x30] = self.mac[::-1]
packet[0x05] = 0xA5 packet[0x30:0x34] = self.id.to_bytes(4, "little")
packet[0x06] = 0xAA
packet[0x07] = 0x55 p_checksum = sum(payload, 0xBEAF) & 0xFFFF
packet[0x24] = self.devtype & 0xFF packet[0x34:0x36] = p_checksum.to_bytes(2, "little")
packet[0x25] = self.devtype >> 8
packet[0x26] = command
packet[0x28] = self.count & 0xFF
packet[0x29] = self.count >> 8
packet[0x2A] = self.mac[5]
packet[0x2B] = self.mac[4]
packet[0x2C] = self.mac[3]
packet[0x2D] = self.mac[2]
packet[0x2E] = self.mac[1]
packet[0x2F] = self.mac[0]
packet[0x30] = self.id[3]
packet[0x31] = self.id[2]
packet[0x32] = self.id[1]
packet[0x33] = self.id[0]
# pad the payload for AES encryption
padding = (16 - len(payload)) % 16 padding = (16 - len(payload)) % 16
if padding: payload = self.encrypt(payload + bytes(padding))
payload = bytearray(payload) packet.extend(payload)
payload += bytearray(padding)
checksum = sum(payload, 0xBEAF) & 0xFFFF
packet[0x34] = checksum & 0xFF
packet[0x35] = checksum >> 8
payload = self.encrypt(payload)
for i in range(len(payload)):
packet.append(payload[i])
checksum = sum(packet, 0xBEAF) & 0xFFFF checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[0x20] = checksum & 0xFF packet[0x20:0x22] = checksum.to_bytes(2, "little")
packet[0x21] = checksum >> 8
with self.lock: with self.lock and socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as conn:
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as conn: timeout = self.timeout
timeout = self.timeout start_time = time.time()
start_time = time.time()
while True: while True:
time_left = timeout - (time.time() - start_time) time_left = timeout - (time.time() - start_time)
conn.settimeout(min(1, time_left)) conn.settimeout(min(1, time_left))
conn.sendto(packet, self.host) conn.sendto(packet, self.host)
try: try:
resp = conn.recvfrom(2048)[0] resp = conn.recvfrom(2048)[0]
break break
except socket.timeout: except socket.timeout:
if (time.time() - start_time) > timeout: if (time.time() - start_time) > timeout:
raise exception(-4000) # Network timeout. raise exception(-4000) # Network timeout.
if len(resp) < 0x30: if len(resp) < 0x30:
raise exception(-4007) # Length error. raise exception(-4007) # Length error.
checksum = resp[0x20] | (resp[0x21] << 8) checksum = int.from_bytes(resp[0x20:0x22], "little")
if sum(resp, 0xBEAF) - sum(resp[0x20:0x22]) & 0xFFFF != checksum: if sum(resp, 0xBEAF) - sum(resp[0x20:0x22]) & 0xFFFF != checksum:
raise exception(-4008) # Checksum error. raise exception(-4008) # Checksum error.

49
broadlink/protocol.py Normal file
View File

@ -0,0 +1,49 @@
import datetime as dt
import time
class Datetime:
"""Helps to pack and unpack datetime objects for the Broadlink protocol."""
@staticmethod
def pack(datetime: dt.datetime) -> bytes:
"""Pack the timestamp to be sent over the Broadlink protocol."""
data = bytearray(12)
utcoffset = int(datetime.utcoffset().total_seconds() / 3600)
data[:0x04] = utcoffset.to_bytes(4, "little", signed=True)
data[0x04:0x06] = datetime.year.to_bytes(2, "little")
data[0x06] = datetime.minute
data[0x07] = datetime.hour
data[0x08] = int(datetime.strftime('%y'))
data[0x09] = datetime.isoweekday()
data[0x0A] = datetime.day
data[0x0B] = datetime.month
return data
@staticmethod
def unpack(data: bytes) -> dt.datetime:
"""Unpack a timestamp received over the Broadlink protocol."""
utcoffset = int.from_bytes(data[0x00:0x04], "little", signed=True)
year = int.from_bytes(data[0x04:0x06], "little")
minute = data[0x06]
hour = data[0x07]
subyear = data[0x08]
isoweekday = data[0x09]
day = data[0x0A]
month = data[0x0B]
tz_info = dt.timezone(dt.timedelta(hours=utcoffset))
datetime = dt.datetime(year, month, day, hour, minute, 0, 0, tz_info)
if datetime.isoweekday() != isoweekday:
raise ValueError("isoweekday does not match")
if int(datetime.strftime('%y')) != subyear:
raise ValueError("subyear does not match")
return datetime
@staticmethod
def now() -> dt.datetime:
"""Return the current date and time with timezone info."""
tz_info = dt.timezone(dt.timedelta(seconds=-time.timezone))
return dt.datetime.now(tz_info)