From 0dc0068d63d2e07cf7524854aff16022b544c911 Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel <41558831+felipediel@users.noreply.github.com> Date: Thu, 24 Sep 2020 02:36:12 -0300 Subject: [PATCH] Improve code quality (#428) * Fix lint errors * Remove rm2 class * Rename cs to conn * Add __repr__ to device class * Make get_devices() a dictionary * Clean up alarm kit * Add module doscstrings * Fix MAC address conversion --- broadlink/__init__.py | 158 +++++++++++++++++++++--------------------- broadlink/alarm.py | 56 +++++++-------- broadlink/climate.py | 1 + broadlink/cover.py | 1 + broadlink/device.py | 72 +++++++++++-------- broadlink/light.py | 1 + broadlink/remote.py | 13 +--- broadlink/sensor.py | 1 + broadlink/switch.py | 1 + 9 files changed, 150 insertions(+), 154 deletions(-) diff --git a/broadlink/__init__.py b/broadlink/__init__.py index bc5faee..7324d37 100644 --- a/broadlink/__init__.py +++ b/broadlink/__init__.py @@ -1,7 +1,7 @@ #!/usr/bin/python3 """The python-broadlink library.""" import socket -from typing import Dict, List, Union, Tuple, Type +from typing import Generator, List, Union, Tuple from .alarm import S1C from .climate import hysen @@ -9,87 +9,85 @@ from .cover import dooya from .device import device, scan from .exceptions import exception from .light import lb1 -from .remote import rm, rm2, rm4 +from .remote import rm, rm4 from .sensor import a1 from .switch import bg1, mp1, sp1, sp2, sp4 -def get_devices() -> Dict[int, Tuple[Type[device], str, str]]: - """Return all supported devices.""" - return { - 0x0000: (sp1, "SP1", "Broadlink"), - 0x2711: (sp2, "SP2", "Broadlink"), - 0x2716: (sp2, "NEO PRO", "Ankuoo"), - 0x2717: (sp2, "NEO", "Ankuoo"), - 0x2719: (sp2, "SP2-compatible", "Honeywell"), - 0x271a: (sp2, "SP2-compatible", "Honeywell"), - 0x2720: (sp2, "SP mini", "Broadlink"), - 0x2728: (sp2, "SP2-compatible", "URANT"), - 0x2733: (sp2, "SP3", "Broadlink"), - 0x2736: (sp2, "SP mini+", "Broadlink"), - 0x273e: (sp2, "SP mini", "Broadlink"), - 0x7530: (sp2, "SP2", "Broadlink (OEM)"), - 0x7539: (sp2, "SP2-IL", "Broadlink (OEM)"), - 0x753e: (sp2, "SP mini 3", "Broadlink"), - 0x7540: (sp2, "MP2", "Broadlink"), - 0X7544: (sp2, "SP2-CL", "Broadlink"), - 0x7546: (sp2, "SP2-UK/BR/IN", "Broadlink (OEM)"), - 0x7547: (sp2, "SC1", "Broadlink"), - 0x7918: (sp2, "SP2", "Broadlink (OEM)"), - 0x7919: (sp2, "SP2-compatible", "Honeywell"), - 0x791a: (sp2, "SP2-compatible", "Honeywell"), - 0x7d00: (sp2, "SP3-EU", "Broadlink (OEM)"), - 0x7d0d: (sp2, "SP mini 3", "Broadlink (OEM)"), - 0x9479: (sp2, "SP3S-US", "Broadlink"), - 0x947a: (sp2, "SP3S-EU", "Broadlink"), - 0x7579: (sp4, "SP4L-EU", "Broadlink"), - 0x2712: (rm, "RM pro/pro+", "Broadlink"), - 0x272a: (rm, "RM pro", "Broadlink"), - 0x2737: (rm, "RM mini 3", "Broadlink"), - 0x273d: (rm, "RM pro", "Broadlink"), - 0x277c: (rm, "RM home", "Broadlink"), - 0x2783: (rm, "RM home", "Broadlink"), - 0x2787: (rm, "RM pro", "Broadlink"), - 0x278b: (rm, "RM plus", "Broadlink"), - 0x278f: (rm, "RM mini", "Broadlink"), - 0x2797: (rm, "RM pro+", "Broadlink"), - 0x279d: (rm, "RM pro+", "Broadlink"), - 0x27a1: (rm, "RM plus", "Broadlink"), - 0x27a6: (rm, "RM plus", "Broadlink"), - 0x27a9: (rm, "RM pro+", "Broadlink"), - 0x27c2: (rm, "RM mini 3", "Broadlink"), - 0x27c3: (rm, "RM pro+", "Broadlink"), - 0x27cc: (rm, "RM mini 3", "Broadlink"), - 0x27cd: (rm, "RM mini 3", "Broadlink"), - 0x27d0: (rm, "RM mini 3", "Broadlink"), - 0x27d1: (rm, "RM mini 3", "Broadlink"), - 0x27de: (rm, "RM mini 3", "Broadlink"), - 0x51da: (rm4, "RM4 mini", "Broadlink"), - 0x5f36: (rm4, "RM mini 3", "Broadlink"), - 0x6026: (rm4, "RM4 pro", "Broadlink"), - 0x6070: (rm4, "RM4C mini", "Broadlink"), - 0x610e: (rm4, "RM4 mini", "Broadlink"), - 0x610f: (rm4, "RM4C mini", "Broadlink"), - 0x61a2: (rm4, "RM4 pro", "Broadlink"), - 0x62bc: (rm4, "RM4 mini", "Broadlink"), - 0x62be: (rm4, "RM4C mini", "Broadlink"), - 0x648d: (rm4, "RM4 mini", "Broadlink"), - 0x649b: (rm4, "RM4 pro", "Broadlink"), - 0x2714: (a1, "e-Sensor", "Broadlink"), - 0x4eb5: (mp1, "MP1-1K4S", "Broadlink"), - 0x4ef7: (mp1, "MP1-1K4S", "Broadlink (OEM)"), - 0x4f1b: (mp1, "MP1-1K3S2U", "Broadlink (OEM)"), - 0x4f65: (mp1, "MP1-1K3S2U", "Broadlink"), - 0x5043: (lb1, "SB800TD", "Broadlink (OEM)"), - 0x504e: (lb1, "LB1", "Broadlink"), - 0x60c7: (lb1, "LB1", "Broadlink"), - 0x60c8: (lb1, "LB1", "Broadlink"), - 0x6112: (lb1, "LB1", "Broadlink"), - 0x2722: (S1C, "S2KIT", "Broadlink"), - 0x4ead: (hysen, "HY02B05H", "Hysen"), - 0x4e4d: (dooya, "DT360E-45/20", "Dooya"), - 0x51e3: (bg1, "BG800/BG900", "BG Electrical"), - } +SUPPORTED_TYPES = { + 0x0000: (sp1, "SP1", "Broadlink"), + 0x2711: (sp2, "SP2", "Broadlink"), + 0x2716: (sp2, "NEO PRO", "Ankuoo"), + 0x2717: (sp2, "NEO", "Ankuoo"), + 0x2719: (sp2, "SP2-compatible", "Honeywell"), + 0x271a: (sp2, "SP2-compatible", "Honeywell"), + 0x2720: (sp2, "SP mini", "Broadlink"), + 0x2728: (sp2, "SP2-compatible", "URANT"), + 0x2733: (sp2, "SP3", "Broadlink"), + 0x2736: (sp2, "SP mini+", "Broadlink"), + 0x273e: (sp2, "SP mini", "Broadlink"), + 0x7530: (sp2, "SP2", "Broadlink (OEM)"), + 0x7539: (sp2, "SP2-IL", "Broadlink (OEM)"), + 0x753e: (sp2, "SP mini 3", "Broadlink"), + 0x7540: (sp2, "MP2", "Broadlink"), + 0X7544: (sp2, "SP2-CL", "Broadlink"), + 0x7546: (sp2, "SP2-UK/BR/IN", "Broadlink (OEM)"), + 0x7547: (sp2, "SC1", "Broadlink"), + 0x7918: (sp2, "SP2", "Broadlink (OEM)"), + 0x7919: (sp2, "SP2-compatible", "Honeywell"), + 0x791a: (sp2, "SP2-compatible", "Honeywell"), + 0x7d00: (sp2, "SP3-EU", "Broadlink (OEM)"), + 0x7d0d: (sp2, "SP mini 3", "Broadlink (OEM)"), + 0x9479: (sp2, "SP3S-US", "Broadlink"), + 0x947a: (sp2, "SP3S-EU", "Broadlink"), + 0x7579: (sp4, "SP4L-EU", "Broadlink"), + 0x2712: (rm, "RM pro/pro+", "Broadlink"), + 0x272a: (rm, "RM pro", "Broadlink"), + 0x2737: (rm, "RM mini 3", "Broadlink"), + 0x273d: (rm, "RM pro", "Broadlink"), + 0x277c: (rm, "RM home", "Broadlink"), + 0x2783: (rm, "RM home", "Broadlink"), + 0x2787: (rm, "RM pro", "Broadlink"), + 0x278b: (rm, "RM plus", "Broadlink"), + 0x278f: (rm, "RM mini", "Broadlink"), + 0x2797: (rm, "RM pro+", "Broadlink"), + 0x279d: (rm, "RM pro+", "Broadlink"), + 0x27a1: (rm, "RM plus", "Broadlink"), + 0x27a6: (rm, "RM plus", "Broadlink"), + 0x27a9: (rm, "RM pro+", "Broadlink"), + 0x27c2: (rm, "RM mini 3", "Broadlink"), + 0x27c3: (rm, "RM pro+", "Broadlink"), + 0x27cc: (rm, "RM mini 3", "Broadlink"), + 0x27cd: (rm, "RM mini 3", "Broadlink"), + 0x27d0: (rm, "RM mini 3", "Broadlink"), + 0x27d1: (rm, "RM mini 3", "Broadlink"), + 0x27de: (rm, "RM mini 3", "Broadlink"), + 0x51da: (rm4, "RM4 mini", "Broadlink"), + 0x5f36: (rm4, "RM mini 3", "Broadlink"), + 0x6026: (rm4, "RM4 pro", "Broadlink"), + 0x6070: (rm4, "RM4C mini", "Broadlink"), + 0x610e: (rm4, "RM4 mini", "Broadlink"), + 0x610f: (rm4, "RM4C mini", "Broadlink"), + 0x61a2: (rm4, "RM4 pro", "Broadlink"), + 0x62bc: (rm4, "RM4 mini", "Broadlink"), + 0x62be: (rm4, "RM4C mini", "Broadlink"), + 0x648d: (rm4, "RM4 mini", "Broadlink"), + 0x649b: (rm4, "RM4 pro", "Broadlink"), + 0x2714: (a1, "e-Sensor", "Broadlink"), + 0x4eb5: (mp1, "MP1-1K4S", "Broadlink"), + 0x4ef7: (mp1, "MP1-1K4S", "Broadlink (OEM)"), + 0x4f1b: (mp1, "MP1-1K3S2U", "Broadlink (OEM)"), + 0x4f65: (mp1, "MP1-1K3S2U", "Broadlink"), + 0x5043: (lb1, "SB800TD", "Broadlink (OEM)"), + 0x504e: (lb1, "LB1", "Broadlink"), + 0x60c7: (lb1, "LB1", "Broadlink"), + 0x60c8: (lb1, "LB1", "Broadlink"), + 0x6112: (lb1, "LB1", "Broadlink"), + 0x2722: (S1C, "S2KIT", "Broadlink"), + 0x4ead: (hysen, "HY02B05H", "Hysen"), + 0x4e4d: (dooya, "DT360E-45/20", "Dooya"), + 0x51e3: (bg1, "BG800/BG900", "BG Electrical"), +} def gendevice( @@ -101,7 +99,7 @@ def gendevice( ) -> device: """Generate a device.""" try: - dev_class, model, manufacturer = get_devices()[dev_type] + dev_class, model, manufacturer = SUPPORTED_TYPES[dev_type] except KeyError: return device(host, mac, dev_type, name=name, is_locked=is_locked) @@ -185,7 +183,7 @@ def setup(ssid: str, password: str, security_mode: int) -> None: payload[0x84] = ssid_length # Character length of SSID payload[0x85] = pass_length # Character length of password - payload[0x86] = security_mode # Type of encryption (00 - none, 01 = WEP, 02 = WPA1, 03 = WPA2, 04 = WPA1/2) + payload[0x86] = security_mode # Type of encryption checksum = sum(payload, 0xbeaf) & 0xffff payload[0x20] = checksum & 0xff # Checksum 1 position diff --git a/broadlink/alarm.py b/broadlink/alarm.py index 62dbb30..faded9d 100644 --- a/broadlink/alarm.py +++ b/broadlink/alarm.py @@ -1,17 +1,17 @@ +"""Support for alarm kits.""" from .device import device from .exceptions import check_error -S1C_SENSORS_TYPES = { - 0x31: 'Door Sensor', # 49 as hex - 0x91: 'Key Fob', # 145 as hex, as serial on fob corpse - 0x21: 'Motion Sensor' # 33 as hex -} - - class S1C(device): """Controls a Broadlink S1C.""" + _SENSORS_TYPES = { + 0x31: 'Door Sensor', # 49 as hex + 0x91: 'Key Fob', # 145 as hex, as serial on fob corpse + 0x21: 'Motion Sensor' # 33 as hex + } + def __init__(self, *args, **kwargs) -> None: """Initialize the controller.""" device.__init__(self, *args, **kwargs) @@ -27,30 +27,22 @@ class S1C(device): if not payload: return None count = payload[0x4] - sensors = payload[0x6:] - sensors_a = [bytearray(sensors[i * 83:(i + 1) * 83]) for i in range(len(sensors) // 83)] - - sens_res = [] - for sens in sensors_a: - status = sens[0] - _name = sens[4:26].decode() - _order = sens[1] - _type = sens[3] - _serial = sens[26:30].hex() - - type_str = S1C_SENSORS_TYPES.get(_type, 'Unknown') - - r = { - 'status': status, - 'name': _name.strip('\x00'), - 'type': type_str, - 'order': _order, - 'serial': _serial, - } - if r['serial'] != '00000000': - sens_res.append(r) - result = { + sensor_data = payload[0x6:] + sensors = [ + bytearray(sensor_data[i * 83:(i + 1) * 83]) + for i in range(len(sensor_data) // 83) + ] + return { 'count': count, - 'sensors': sens_res + 'sensors': [ + { + 'status': sensor[0], + 'name': sensor[4:26].decode().strip('\x00'), + 'type': self._SENSORS_TYPES.get(sensor[3], 'Unknown'), + 'order': sensor[1], + 'serial': sensor[26:30].hex(), + } + for sensor in sensors + if any(sensor[26:30]) + ] } - return result diff --git a/broadlink/climate.py b/broadlink/climate.py index 3d68eb4..f0c337e 100644 --- a/broadlink/climate.py +++ b/broadlink/climate.py @@ -1,3 +1,4 @@ +"""Support for climate control.""" from typing import List from .device import device diff --git a/broadlink/cover.py b/broadlink/cover.py index b6b3be1..236e747 100644 --- a/broadlink/cover.py +++ b/broadlink/cover.py @@ -1,3 +1,4 @@ +"""Support for covers.""" import time from .device import device diff --git a/broadlink/device.py b/broadlink/device.py index e1ad59d..0c2021a 100644 --- a/broadlink/device.py +++ b/broadlink/device.py @@ -1,3 +1,4 @@ +"""Support for Broadlink devices.""" import socket import threading import random @@ -20,13 +21,13 @@ def scan( discover_ip_port: int = 80, ) -> Generator[HelloResponse, None, None]: """Broadcast a hello message and yield responses.""" - cs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - cs.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - cs.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + conn.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) if local_ip_address: - cs.bind((local_ip_address, 0)) - port = cs.getsockname()[1] + conn.bind((local_ip_address, 0)) + port = conn.getsockname()[1] else: local_ip_address = "0.0.0.0" port = 0 @@ -71,13 +72,13 @@ def scan( packet[0x20] = checksum & 0xff packet[0x21] = checksum >> 8 - cs.sendto(packet, (discover_ip_address, discover_ip_port)) + conn.sendto(packet, (discover_ip_address, discover_ip_port)) try: while (time.time() - starttime) < timeout: - cs.settimeout(timeout - (time.time() - starttime)) + conn.settimeout(timeout - (time.time() - starttime)) try: - response, host = cs.recvfrom(1024) + response, host = conn.recvfrom(1024) except socket.timeout: break @@ -87,26 +88,26 @@ def scan( is_locked = bool(response[-1]) yield devtype, host, mac, name, is_locked finally: - cs.close() + conn.close() class device: """Controls a Broadlink device.""" def __init__( - self, - host: Tuple[str, int], - mac: Union[bytes, str], - devtype: int, - timeout: int = 10, - name: str = None, - model: str = None, - manufacturer: str = None, - is_locked: bool = None, + self, + host: Tuple[str, int], + mac: Union[bytes, str], + devtype: int, + timeout: int = 10, + name: str = None, + model: str = None, + manufacturer: str = None, + is_locked: bool = None, ) -> None: """Initialize the controller.""" self.host = host - self.mac = mac.encode() if isinstance(mac, str) else mac + self.mac = bytes.fromhex(mac) if isinstance(mac, str) else mac self.devtype = devtype if devtype is not None else 0x272a self.timeout = timeout self.name = name @@ -114,17 +115,28 @@ class device: self.manufacturer = manufacturer self.is_locked = is_locked self.count = random.randrange(0xffff) - self.iv = bytes( - [0x56, 0x2e, 0x17, 0x99, 0x6d, 0x09, 0x3d, 0x28, 0xdd, 0xb3, 0xba, 0x69, 0x5a, 0x2e, 0x6f, 0x58]) + self.iv = bytes.fromhex('562e17996d093d28ddb3ba695a2e6f58') self.id = bytes(4) self.type = "Unknown" self.lock = threading.Lock() self.aes = None - key = bytes( - [0x09, 0x76, 0x28, 0x34, 0x3f, 0xe9, 0x9e, 0x23, 0x76, 0x5c, 0x15, 0x13, 0xac, 0xcf, 0x8b, 0x02]) + key = bytes.fromhex('097628343fe99e23765c1513accf8b02') self.update_aes(key) + def __repr__(self): + return "<%s: %s %s (%s) at %s:%s | %s | %s | %s>" % ( + type(self).__name__, + self.manufacturer, + self.model, + hex(self.devtype), + self.host[0], + self.host[1], + ':'.join(format(x, '02x') for x in self.mac), + self.name, + "Locked" if self.is_locked else "Unlocked", + ) + def update_aes(self, key: bytes) -> None: """Update AES.""" self.aes = Cipher( @@ -284,20 +296,20 @@ class device: start_time = time.time() with self.lock: - cs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - cs.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) while True: try: - cs.sendto(packet, self.host) - cs.settimeout(1) - resp, _ = cs.recvfrom(2048) + conn.sendto(packet, self.host) + conn.settimeout(1) + resp, _ = conn.recvfrom(2048) break except socket.timeout: if (time.time() - start_time) > self.timeout: - cs.close() + conn.close() raise exception(-4000) # Network timeout. - cs.close() + conn.close() if len(resp) < 0x30: raise exception(-4007) # Length error. diff --git a/broadlink/light.py b/broadlink/light.py index 1cac4a5..cffc77d 100644 --- a/broadlink/light.py +++ b/broadlink/light.py @@ -1,3 +1,4 @@ +"""Support for lights.""" import json from typing import Union diff --git a/broadlink/remote.py b/broadlink/remote.py index a08c675..4a427f6 100644 --- a/broadlink/remote.py +++ b/broadlink/remote.py @@ -1,3 +1,4 @@ +"""Support for universal remotes.""" from .device import device from .exceptions import check_error @@ -119,15 +120,3 @@ class rm4(rm): 'temperature': data[0x0] + data[0x1] / 100.0, 'humidity': data[0x2] + data[0x3] / 100.0 } - - -# For legacy compatibility - don't use this -class rm2(rm): - def __init__(self): - device.__init__(self, None, None, None) - - def discover(self): - dev = discover() - self.host = dev.host - self.mac = dev.mac - diff --git a/broadlink/sensor.py b/broadlink/sensor.py index 819518b..63c23b4 100644 --- a/broadlink/sensor.py +++ b/broadlink/sensor.py @@ -1,3 +1,4 @@ +"""Support for sensors.""" from .device import device from .exceptions import check_error diff --git a/broadlink/switch.py b/broadlink/switch.py index 34614f0..b25d069 100644 --- a/broadlink/switch.py +++ b/broadlink/switch.py @@ -1,3 +1,4 @@ +"""Support for switches.""" import json import struct