From 8bf107ab696d3bd72b26a19b2079a59b68b297fd Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel Date: Wed, 16 Sep 2020 04:41:28 -0300 Subject: [PATCH] Add docstrings and annotations --- broadlink/__init__.py | 350 ++++++++++++++++++++++++---------------- broadlink/exceptions.py | 36 ----- 2 files changed, 212 insertions(+), 174 deletions(-) diff --git a/broadlink/__init__.py b/broadlink/__init__.py index bc3a63d..a31895e 100644 --- a/broadlink/__init__.py +++ b/broadlink/__init__.py @@ -1,5 +1,5 @@ -#!/usr/bin/python - +#!/usr/bin/python3 +"""The python-broadlink library.""" import codecs import json import random @@ -16,10 +16,10 @@ from .exceptions import check_error, exception from .helpers import get_local_ip -def get_devices(): +def get_devices() -> dict: + """Return all supported devices.""" return { 0x0000: (sp1, "SP1", "Broadlink"), - 0x2711: (sp2, "SP2", "Broadlink"), 0x2719: (sp2, "SP2-compatible", "Honeywell"), 0x271a: (sp2, "SP2-compatible", "Honeywell"), @@ -40,7 +40,6 @@ def get_devices(): 0x7d0d: (sp2, "SP mini 3", "Broadlink (OEM)"), 0x9479: (sp2, "SP3S-US", "Broadlink"), 0x947a: (sp2, "SP3S-EU", "Broadlink"), - 0x2712: (rm, "RM pro/pro+", "Broadlink"), 0x272a: (rm, "RM pro", "Broadlink"), 0x2737: (rm, "RM mini 3", "Broadlink"), @@ -58,7 +57,6 @@ def get_devices(): 0x27c2: (rm, "RM mini 3", "Broadlink"), 0x27d1: (rm, "RM mini 3", "Broadlink"), 0x27de: (rm, "RM mini 3", "Broadlink"), - 0x51da: (rm4, "RM4 mini", "Broadlink"), 0x5f36: (rm4, "RM mini 3", "Broadlink"), 0x6026: (rm4, "RM4 pro", "Broadlink"), @@ -69,25 +67,18 @@ def get_devices(): 0x62bc: (rm4, "RM4 mini", "Broadlink"), 0x62be: (rm4, "RM4C mini", "Broadlink"), 0x648d: (rm4, "RM4 mini", "Broadlink"), - 0x2714: (a1, "e-Sensor", "Broadlink"), - 0x4eb5: (mp1, "MP1-1K4S", "Broadlink"), 0x4ef7: (mp1, "MP1-1K4S", "Broadlink (OEM)"), 0x4f65: (mp1, "MP1-1K3S2U", "Broadlink"), - 0x5043: (lb1, "SB800TD", "Broadlink (OEM)"), 0x504e: (lb1, "LB1", "Broadlink"), 0x60c7: (lb1, "LB1", "Broadlink"), 0x60c8: (lb1, "LB1", "Broadlink"), 0x6112: (lb1, "LB1", "Broadlink"), - 0x2722: (S1C, "S2KIT", "Broadlink"), - 0x4ead: (hysen, "HY02B05H", "Hysen"), - 0x4e4d: (dooya, "DT360E-45/20", "Dooya"), - 0x51e3: (bg1, "BG800/BG900", "BG Electrical"), } @@ -116,7 +107,8 @@ def discover( local_ip_address=None, discover_ip_address='255.255.255.255', discover_ip_port=80 -): +) -> list: + """Discover devices connected to the local network.""" local_ip_address = local_ip_address or get_local_ip() address = local_ip_address.split('.') cs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) @@ -143,6 +135,7 @@ def discover( packet[0x09] = 0 packet[0x0a] = 0 packet[0x0b] = 0 + packet[0x0c] = year & 0xff packet[0x0d] = year >> 8 packet[0x0e] = datetime.now().minute @@ -197,6 +190,8 @@ def discover( class device: + """Controls a Broadlink device.""" + def __init__( self, host, @@ -207,7 +202,8 @@ class device: model=None, manufacturer=None, is_locked=None - ): + ) -> None: + """Initialize the controller.""" self.host = host self.mac = mac.encode() if isinstance(mac, str) else mac self.devtype = devtype if devtype is not None else 0x272a @@ -228,19 +224,24 @@ class device: [0x09, 0x76, 0x28, 0x34, 0x3f, 0xe9, 0x9e, 0x23, 0x76, 0x5c, 0x15, 0x13, 0xac, 0xcf, 0x8b, 0x02]) self.update_aes(key) - def update_aes(self, key): - self.aes = Cipher(algorithms.AES(key), modes.CBC(self.iv), - backend=default_backend()) + def update_aes(self, key) -> None: + """Update AES.""" + self.aes = Cipher( + algorithms.AES(key), modes.CBC(self.iv), backend=default_backend() + ) - def encrypt(self, payload): + def encrypt(self, payload) -> bytes: + """Encrypt the payload.""" encryptor = self.aes.encryptor() return encryptor.update(payload) + encryptor.finalize() - def decrypt(self, payload): + def decrypt(self, payload) -> bytes: + """Decrypt the payload.""" decryptor = self.aes.decryptor() return decryptor.update(payload) + decryptor.finalize() - def auth(self): + def auth(self) -> bool: + """Authenticate to the device.""" payload = bytearray(0x50) payload[0x04] = 0x31 payload[0x05] = 0x31 @@ -277,17 +278,18 @@ class device: self.id = payload[0x03::-1] self.update_aes(key) - return True - def get_fwversion(self): + def get_fwversion(self) -> int: + """Get firmware version.""" packet = bytearray([0x68]) response = self.send_packet(0x6a, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) return payload[0x4] | payload[0x5] << 8 - def set_name(self, name): + def set_name(self, name) -> None: + """Set device name.""" packet = bytearray(4) packet += name.encode('utf-8') packet += bytearray(0x50 - len(packet)) @@ -296,7 +298,8 @@ class device: check_error(response[0x22:0x24]) self.name = name - def set_lock(self, state): + def set_lock(self, state) -> None: + """Lock/unlock the device.""" packet = bytearray(4) packet += self.name.encode('utf-8') packet += bytearray(0x50 - len(packet)) @@ -305,10 +308,12 @@ class device: check_error(response[0x22:0x24]) self.is_locked = bool(state) - def get_type(self): + def get_type(self) -> str: + """Return device type.""" return self.type - def send_packet(self, command, payload): + def send_packet(self, command, payload) -> bytearray: + """Send a packet to the device.""" self.count = (self.count + 1) & 0xffff packet = bytearray(0x38) packet[0x00] = 0x5a @@ -380,13 +385,15 @@ class device: class mp1(device): - def __init__(self, *args, **kwargs): + """Controls a Broadlink MP1.""" + + def __init__(self, *args, **kwargs) -> None: + """Initialize the controller.""" device.__init__(self, *args, **kwargs) self.type = "MP1" - def set_power_mask(self, sid_mask, state): - """Sets the power state of the smart power strip.""" - + def set_power_mask(self, sid_mask, state) -> None: + """Set the power state of the device.""" packet = bytearray(16) packet[0x00] = 0x0d packet[0x02] = 0xa5 @@ -403,13 +410,13 @@ class mp1(device): response = self.send_packet(0x6a, packet) check_error(response[0x22:0x24]) - def set_power(self, sid, state): - """Sets the power state of the smart power strip.""" + def set_power(self, sid, state) -> None: + """Set the power state of the device.""" sid_mask = 0x01 << (sid - 1) - return self.set_power_mask(sid_mask, state) + self.set_power_mask(sid_mask, state) - def check_power_raw(self): - """Returns the power state of the smart power strip in raw format.""" + def check_power_raw(self) -> bool: + """Return the power state of the device in raw format.""" packet = bytearray(16) packet[0x00] = 0x0a packet[0x02] = 0xa5 @@ -423,14 +430,10 @@ class mp1(device): response = self.send_packet(0x6a, packet) check_error(response[0x22:0x24]) payload = self.decrypt(bytes(response[0x38:])) - if isinstance(payload[0x4], int): - state = payload[0x0e] - else: - state = ord(payload[0x0e]) - return state + return payload[0x0e] - def check_power(self): - """Returns the power state of the smart power strip.""" + def check_power(self) -> dict: + """Return the power state of the device.""" state = self.check_power_raw() if state is None: return {'s1': None, 's2': None, 's3': None, 's4': None} @@ -443,22 +446,25 @@ class mp1(device): class bg1(device): - def __init__(self, *args, **kwargs): + """Controls a BG Electrical smart outlet.""" + + def __init__(self, *args, **kwargs) -> None: + """Initialize the controller.""" device.__init__(self, *args, **kwargs) self.type = "BG1" - def get_state(self): - """Get state of device. + def get_state(self) -> dict: + """Return the power state of the device. - Returns: - dict: Dictionary of current state - eg. `{"pwr":1,"pwr1":1,"pwr2":0,"maxworktime":60,"maxworktime1":60,"maxworktime2":0,"idcbrightness":50}`""" + Example: `{"pwr":1,"pwr1":1,"pwr2":0,"maxworktime":60,"maxworktime1":60,"maxworktime2":0,"idcbrightness":50}` + """ packet = self._encode(1, b'{}') response = self.send_packet(0x6a, packet) check_error(response[0x22:0x24]) return self._decode(response) - def set_state(self, pwr=None, pwr1=None, pwr2=None, maxworktime=None, maxworktime1=None, maxworktime2=None, idcbrightness=None): + def set_state(self, pwr=None, pwr1=None, pwr2=None, maxworktime=None, maxworktime1=None, maxworktime2=None, idcbrightness=None) -> dict: + """Set the power state of the device.""" data = {} if pwr is not None: data['pwr'] = int(bool(pwr)) @@ -480,15 +486,16 @@ class bg1(device): check_error(response[0x22:0x24]) return self._decode(response) - def _encode(self, flag, js): - # packet format is: - # 0x00-0x01 length - # 0x02-0x05 header - # 0x06-0x07 00 - # 0x08 flag (1 for read or 2 write?) - # 0x09 unknown (0xb) - # 0x0a-0x0d length of json - # 0x0e- json data + def _encode(self, flag, js) -> bytearray: + """Encode a message.""" + # The packet format is: + # 0x00-0x01 length + # 0x02-0x05 header + # 0x06-0x07 00 + # 0x08 flag (1 for read or 2 write?) + # 0x09 unknown (0xb) + # 0x0a-0x0d length of json + # 0x0e- json data packet = bytearray(14) length = 4 + 2 + 2 + 4 + len(js) struct.pack_into('> 8 - return packet - def _decode(self, response): + def _decode(self, response) -> dict: + """Decode a message.""" payload = self.decrypt(bytes(response[0x38:])) js_len = struct.unpack_from(' None: + """Initialize the device.""" device.__init__(self, *args, **kwargs) self.type = "SP1" - def set_power(self, state): + def set_power(self, state) -> None: + """Set the power state of the device.""" packet = bytearray(4) packet[0] = state response = self.send_packet(0x66, packet) @@ -521,12 +532,15 @@ class sp1(device): class sp2(device): - def __init__(self, *args, **kwargs): + """Controls a Broadlink SP2.""" + + def __init__(self, *args, **kwargs) -> None: + """Initialize the controller.""" device.__init__(self, *args, **kwargs) self.type = "SP2" - def set_power(self, state): - """Sets the power state of the smart plug.""" + def set_power(self, state) -> None: + """Set the power state of the device.""" packet = bytearray(16) packet[0] = 2 if self.check_nightlight(): @@ -536,8 +550,8 @@ class sp2(device): response = self.send_packet(0x6a, packet) check_error(response[0x22:0x24]) - def set_nightlight(self, state): - """Sets the night light state of the smart plug""" + def set_nightlight(self, state) -> None: + """Set the night light state of the device.""" packet = bytearray(16) packet[0] = 2 if self.check_power(): @@ -547,8 +561,8 @@ class sp2(device): response = self.send_packet(0x6a, packet) check_error(response[0x22:0x24]) - def check_power(self): - """Returns the power state of the smart plug.""" + def check_power(self) -> bool: + """Return the power state of the device.""" packet = bytearray(16) packet[0] = 1 response = self.send_packet(0x6a, packet) @@ -558,8 +572,8 @@ class sp2(device): return bool(payload[0x4] == 1 or payload[0x4] == 3 or payload[0x4] == 0xFD) return bool(ord(payload[0x4]) == 1 or ord(payload[0x4]) == 3 or ord(payload[0x4]) == 0xFD) - def check_nightlight(self): - """Returns the power state of the smart plug.""" + def check_nightlight(self) -> bool: + """Return the state of the night light.""" packet = bytearray(16) packet[0] = 1 response = self.send_packet(0x6a, packet) @@ -569,7 +583,8 @@ class sp2(device): return bool(payload[0x4] == 2 or payload[0x4] == 3 or payload[0x4] == 0xFF) return bool(ord(payload[0x4]) == 2 or ord(payload[0x4]) == 3 or ord(payload[0x4]) == 0xFF) - def get_energy(self): + def get_energy(self) -> int: + """Return the energy state of the device.""" packet = bytearray([8, 0, 254, 1, 5, 1, 0, 0, 0, 45]) response = self.send_packet(0x6a, packet) check_error(response[0x22:0x24]) @@ -583,6 +598,7 @@ class sp2(device): class a1(device): + """Controls a Broadlink A1.""" _SENSORS_AND_LEVELS = ( ('light', ('dark', 'dim', 'normal', 'bright')), @@ -590,11 +606,13 @@ class a1(device): ('noise', ('quiet', 'normal', 'noisy')), ) - def __init__(self, *args, **kwargs): + def __init__(self, *args, **kwargs) -> None: + """Initialize the controller.""" device.__init__(self, *args, **kwargs) self.type = "A1" - def check_sensors(self): + def check_sensors(self) -> dict: + """Return the state of the sensors.""" data = self.check_sensors_raw() for sensor, levels in self._SENSORS_AND_LEVELS: try: @@ -603,7 +621,8 @@ class a1(device): data[sensor] = 'unknown' return data - def check_sensors_raw(self): + def check_sensors_raw(self) -> dict: + """Return the state of the sensors in raw format.""" packet = bytearray([0x1]) response = self.send_packet(0x6a, packet) check_error(response[0x22:0x24]) @@ -619,13 +638,17 @@ class a1(device): class rm(device): - def __init__(self, *args, **kwargs): + """Controls a Broadlink RM.""" + + def __init__(self, *args, **kwargs) -> None: + """Initialize the controller.""" device.__init__(self, *args, **kwargs) self.type = "RM2" self._request_header = bytes() self._code_sending_header = bytes() - def check_data(self): + def check_data(self) -> bytes: + """Return the last captured code.""" packet = bytearray(self._request_header) packet.append(0x04) response = self.send_packet(0x6a, packet) @@ -633,32 +656,37 @@ class rm(device): payload = self.decrypt(bytes(response[0x38:])) return payload[len(self._request_header) + 4:] - def send_data(self, data): + def send_data(self, data) -> None: + """Send a code to the device.""" packet = bytearray(self._code_sending_header) packet += bytearray([0x02, 0x00, 0x00, 0x00]) packet += data response = self.send_packet(0x6a, packet) check_error(response[0x22:0x24]) - def enter_learning(self): + def enter_learning(self) -> None: + """Enter infrared learning mode.""" packet = bytearray(self._request_header) packet.append(0x03) response = self.send_packet(0x6a, packet) check_error(response[0x22:0x24]) - def sweep_frequency(self): + def sweep_frequency(self) -> None: + """Sweep frequency.""" packet = bytearray(self._request_header) packet.append(0x19) response = self.send_packet(0x6a, packet) check_error(response[0x22:0x24]) - def cancel_sweep_frequency(self): + def cancel_sweep_frequency(self) -> None: + """Cancel sweep frequency.""" packet = bytearray(self._request_header) packet.append(0x1e) response = self.send_packet(0x6a, packet) check_error(response[0x22:0x24]) - def check_frequency(self): + def check_frequency(self) -> bool: + """Return True if the frequency was identified successfully.""" packet = bytearray(self._request_header) packet.append(0x1a) response = self.send_packet(0x6a, packet) @@ -668,7 +696,8 @@ class rm(device): return True return False - def find_rf_packet(self): + def find_rf_packet(self) -> bool: + """Enter radiofrequency learning mode.""" packet = bytearray(self._request_header) packet.append(0x1b) response = self.send_packet(0x6a, packet) @@ -678,7 +707,8 @@ class rm(device): return True return False - def _check_sensors(self, command): + def _check_sensors(self, command) -> bytes: + """Return the state of the sensors in raw format.""" packet = bytearray(self._request_header) packet.append(command) response = self.send_packet(0x6a, packet) @@ -686,31 +716,39 @@ class rm(device): payload = self.decrypt(bytes(response[0x38:])) return bytearray(payload[len(self._request_header) + 4:]) - def check_temperature(self): + def check_temperature(self) -> int: + """Return the temperature.""" data = self._check_sensors(0x1) return data[0x0] + data[0x1] / 10.0 - def check_sensors(self): + def check_sensors(self) -> dict: + """Return the state of the sensors.""" data = self._check_sensors(0x1) return {'temperature': data[0x0] + data[0x1] / 10.0} class rm4(rm): - def __init__(self, *args, **kwargs): + """Controls a Broadlink RM4.""" + + def __init__(self, *args, **kwargs) -> None: + """Initialize the controller.""" device.__init__(self, *args, **kwargs) self.type = "RM4" self._request_header = b'\x04\x00' self._code_sending_header = b'\xda\x00' - def check_temperature(self): + def check_temperature(self) -> int: + """Return the temperature.""" data = self._check_sensors(0x24) return data[0x0] + data[0x1] / 100.0 - def check_humidity(self): + def check_humidity(self) -> int: + """Return the humidity.""" data = self._check_sensors(0x24) return data[0x2] + data[0x3] / 100.0 - def check_sensors(self): + def check_sensors(self) -> dict: + """Return the state of the sensors.""" data = self._check_sensors(0x24) return { 'temperature': data[0x0] + data[0x1] / 100.0, @@ -730,7 +768,10 @@ class rm2(rm): class hysen(device): - def __init__(self, *args, **kwargs): + """Controls a Hysen HVAC.""" + + def __init__(self, *args, **kwargs) -> None: + """Initialize the controller.""" device.__init__(self, *args, **kwargs) self.type = "Hysen heating controller" @@ -740,7 +781,8 @@ class hysen(device): # New behaviour: raises a ValueError if the device response indicates an error or CRC check fails # The function prepends length (2 bytes) and appends CRC - def calculate_crc16(self, input_data): + def calculate_crc16(self, input_data) -> int: + """Calculate CRC-16.""" from ctypes import c_ushort crc16_tab = [] crc16_constant = 0xA001 @@ -774,8 +816,8 @@ class hysen(device): except Exception as e: print("EXCEPTION(calculate): {}".format(e)) - def send_request(self, input_payload): - + def send_request(self, input_payload) -> bytes: + """Send a request to the device.""" crc = self.calculate_crc16(bytes(input_payload)) # first byte is length, +2 for CRC16 @@ -801,18 +843,22 @@ class hysen(device): return response_payload[2:response_payload_len] raise ValueError('hysen_response_error', 'CRC check on response failed') - # Get current room temperature in degrees celsius - def get_temp(self): + def get_temp(self) -> int: + """Return the room temperature in degrees celsius.""" payload = self.send_request(bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x08])) return payload[0x05] / 2.0 - # Get current external temperature in degrees celsius - def get_external_temp(self): + def get_external_temp(self) -> int: + """Return the external temperature in degrees celsius.""" payload = self.send_request(bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x08])) return payload[18] / 2.0 - # Get full status (including timer schedule) - def get_full_status(self): + def get_full_status(self) -> dict: + """Return the state of the device. + + Timer schedule included. + """ + payload = self.send_request(bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x16])) data = {} data['remote_lock'] = payload[3] & 1 @@ -862,7 +908,8 @@ class hysen(device): # E.g. loop_mode = 0 ("12345,67") means Saturday and Sunday follow the "weekend" schedule # loop_mode = 2 ("1234567") means every day (including Saturday and Sunday) follows the "weekday" schedule # The sensor command is currently experimental - def set_mode(self, auto_mode, loop_mode, sensor=0): + def set_mode(self, auto_mode, loop_mode, sensor=0) -> None: + """Set the mode of the device.""" mode_byte = ((loop_mode + 1) << 4) + auto_mode self.send_request(bytearray([0x01, 0x06, 0x00, 0x02, mode_byte, sensor])) @@ -877,31 +924,37 @@ class hysen(device): # Anti-freezing function (FrE) fre = 0 for anti-freezing function shut down, # 1 for anti-freezing function open. Factory default: 0 # Power on memory (POn) poweron = 0 for power on memory off, 1 for power on memory on. Factory default: 0 - def set_advanced(self, loop_mode, sensor, osv, dif, svh, svl, adj, fre, poweron): + def set_advanced(self, loop_mode, sensor, osv, dif, svh, svl, adj, fre, poweron) -> None: + """Set advanced options.""" input_payload = bytearray([0x01, 0x10, 0x00, 0x02, 0x00, 0x05, 0x0a, loop_mode, sensor, osv, dif, svh, svl, (int(adj * 2) >> 8 & 0xff), (int(adj * 2) & 0xff), fre, poweron]) self.send_request(input_payload) # For backwards compatibility only. Prefer calling set_mode directly. # Note this function invokes loop_mode=0 and sensor=0. - def switch_to_auto(self): + def switch_to_auto(self) -> None: + """Switch mode to auto.""" self.set_mode(auto_mode=1, loop_mode=0) - def switch_to_manual(self): + def switch_to_manual(self) -> None: + """Switch mode to manual.""" self.set_mode(auto_mode=0, loop_mode=0) # Set temperature for manual mode (also activates manual mode if currently in automatic) - def set_temp(self, temp): + def set_temp(self, temp) -> None: + """Set the target temperature.""" self.send_request(bytearray([0x01, 0x06, 0x00, 0x01, 0x00, int(temp * 2)])) # Set device on(1) or off(0), does not deactivate Wifi connectivity. # Remote lock disables control by buttons on thermostat. - def set_power(self, power=1, remote_lock=0): + def set_power(self, power=1, remote_lock=0) -> None: + """Set the power state of the device.""" self.send_request(bytearray([0x01, 0x06, 0x00, 0x00, remote_lock, power])) # set time on device # n.b. day=1 is Monday, ..., day=7 is Sunday - def set_time(self, hour, minute, second, day): + def set_time(self, hour, minute, second, day) -> None: + """Set the time.""" self.send_request(bytearray([0x01, 0x10, 0x00, 0x08, 0x00, 0x02, 0x04, hour, minute, second, day])) # Set timer schedule @@ -910,7 +963,8 @@ class hysen(device): # {'start_hour':17, 'start_minute':30, 'temp': 22 } # Each one specifies the thermostat temp that will become effective at start_hour:start_minute # weekend is similar but only has 2 (e.g. switch on in morning and off in afternoon) - def set_schedule(self, weekday, weekend): + def set_schedule(self, weekday, weekend) -> None: + """Set timer schedule.""" # Begin with some magic values ... input_payload = bytearray([0x01, 0x10, 0x00, 0x0a, 0x00, 0x0c, 0x18]) @@ -944,15 +998,15 @@ S1C_SENSORS_TYPES = { class S1C(device): - """ - Its VERY VERY VERY DIRTY IMPLEMENTATION of S1C - """ + """Controls a Broadlink S1C.""" - def __init__(self, *args, **kwargs): + def __init__(self, *args, **kwargs) -> None: + """Initialize the controller.""" device.__init__(self, *args, **kwargs) self.type = 'S1C' - def get_sensors_status(self): + def get_sensors_status(self) -> dict: + """Return the state of the sensors.""" packet = bytearray(16) packet[0] = 0x06 # 0x06 - get sensors info, 0x07 - probably add sensors response = self.send_packet(0x6a, packet) @@ -991,11 +1045,15 @@ class S1C(device): class dooya(device): - def __init__(self, *args, **kwargs): + """Controls a Dooya curtain motor.""" + + def __init__(self, *args, **kwargs) -> None: + """Initialize the controller.""" device.__init__(self, *args, **kwargs) self.type = "Dooya DT360E" - def _send(self, magic1, magic2): + def _send(self, magic1, magic2) -> int: + """Send a packet to the device.""" packet = bytearray(16) packet[0] = 0x09 packet[2] = 0xbb @@ -1008,19 +1066,24 @@ class dooya(device): payload = self.decrypt(bytes(response[0x38:])) return ord(payload[4]) - def open(self): + def open(self) -> int: + """Open the curtain.""" return self._send(0x01, 0x00) - def close(self): + def close(self) -> int: + """Close the curtain.""" return self._send(0x02, 0x00) - def stop(self): + def stop(self) -> int: + """Stop the curtain.""" return self._send(0x03, 0x00) - def get_percentage(self): + def get_percentage(self) -> int: + """Return the position of the curtain.""" return self._send(0x06, 0x5d) - def set_percentage_and_wait(self, new_percentage): + def set_percentage_and_wait(self, new_percentage) -> None: + """Set the position of the curtain.""" current = self.get_percentage() if current > new_percentage: self.close() @@ -1035,22 +1098,29 @@ class dooya(device): current = self.get_percentage() self.stop() -class lb1(device): - state_dict = [] - effect_map_dict = { 'lovely color' : 0, - 'flashlight' : 1, - 'lightning' : 2, - 'color fading' : 3, - 'color breathing' : 4, - 'multicolor breathing' : 5, - 'color jumping' : 6, - 'multicolor jumping' : 7 } - def __init__(self, *args, **kwargs): +class lb1(device): + """Controls a Broadlink LB1.""" + + state_dict = [] + effect_map_dict = { + 'lovely color': 0, + 'flashlight': 1, + 'lightning': 2, + 'color fading': 3, + 'color breathing': 4, + 'multicolor breathing': 5, + 'color jumping': 6, + 'multicolor jumping': 7, + } + + def __init__(self, *args, **kwargs) -> None: + """Initialize the controller.""" device.__init__(self, *args, **kwargs) self.type = "SmartBulb" - def send_command(self, command, type='set'): + def send_command(self, command, type='set') -> None: + """Send a command to the device.""" packet = bytearray(16+(int(len(command)/16) + 1)*16) packet[0x00] = 0x0c + len(command) & 0xff packet[0x02] = 0xa5 @@ -1074,7 +1144,8 @@ class lb1(device): if responseLength > 0: self.state_dict = json.loads(payload[0x0e:0x0e+responseLength]) - def set_json(self, jsonstr): + def set_json(self, jsonstr) -> str: + """Send a command to the device and return state.""" reconvert = json.loads(jsonstr) if 'bulb_sceneidx' in reconvert.keys(): reconvert['bulb_sceneidx'] = self.effect_map_dict.get(reconvert['bulb_sceneidx'], 255) @@ -1082,18 +1153,21 @@ class lb1(device): self.send_command(json.dumps(reconvert)) return json.dumps(self.state_dict) - def set_state(self, state): + def set_state(self, state) -> None: + """Set the state of the device.""" cmd = '{"pwr":%d}' % (1 if state == "ON" or state == 1 else 0) self.send_command(cmd) - def get_state(self): + def get_state(self) -> dict: + """Return the state of the device.""" cmd = "{}" self.send_command(cmd) return self.state_dict # Setup a new Broadlink device via AP Mode. Review the README to see how to enter AP Mode. # Only tested with Broadlink RM3 Mini (Blackbean) -def setup(ssid, password, security_mode): +def setup(ssid, password, security_mode) -> None: + """Set up a new Broadlink device via AP mode.""" # Security mode options are (0 - none, 1 = WEP, 2 = WPA1, 3 = WPA2, 4 = WPA1/2) payload = bytearray(0x88) payload[0x26] = 0x14 # This seems to always be set to 14 diff --git a/broadlink/exceptions.py b/broadlink/exceptions.py index e00f7d8..8e3aa4a 100644 --- a/broadlink/exceptions.py +++ b/broadlink/exceptions.py @@ -31,110 +31,74 @@ class BroadlinkException(Exception): class FirmwareException(BroadlinkException): """Common base class for all firmware exceptions.""" - pass - class AuthenticationError(FirmwareException): """Authentication error.""" - pass - class AuthorizationError(FirmwareException): """Authorization error.""" - pass - class CommandNotSupportedError(FirmwareException): """Command not supported error.""" - pass - class ConnectionClosedError(FirmwareException): """Connection closed error.""" - pass - class DataValidationError(FirmwareException): """Data validation error.""" - pass - class DeviceOfflineError(FirmwareException): """Device offline error.""" - pass - class ReadError(FirmwareException): """Read error.""" - pass - class SendError(FirmwareException): """Send error.""" - pass - class SSIDNotFoundError(FirmwareException): """SSID not found error.""" - pass - class StorageError(FirmwareException): """Storage error.""" - pass - class WriteError(FirmwareException): """Write error.""" - pass - class SDKException(BroadlinkException): """Common base class for all SDK exceptions.""" - pass - class ChecksumError(SDKException): """Received data packet check error.""" - pass - class LengthError(SDKException): """Received data packet length error.""" - pass - class DNSLookupError(SDKException): """Failed to obtain local IP address.""" - pass - class NetworkTimeoutError(SDKException): """Network timeout error.""" - pass - class UnknownError(BroadlinkException): """Unknown error.""" - pass - BROADLINK_EXCEPTIONS = { # Firmware-related errors are generated by the device.