From 76012c6cd44c30cc81b950b6f5f26a5b86291505 Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel Date: Wed, 16 Sep 2020 21:35:09 -0300 Subject: [PATCH] Add annotations for parameters --- broadlink/__init__.py | 153 ++++++++++++++++++++---------------------- broadlink/helpers.py | 38 +++++++++++ 2 files changed, 110 insertions(+), 81 deletions(-) diff --git a/broadlink/__init__.py b/broadlink/__init__.py index a31895e..03e45fc 100644 --- a/broadlink/__init__.py +++ b/broadlink/__init__.py @@ -8,12 +8,13 @@ import struct import threading import time from datetime import datetime +from typing import List, Union, Tuple from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from .exceptions import check_error, exception -from .helpers import get_local_ip +from .helpers import calculate_crc16, get_local_ip def get_devices() -> dict: @@ -83,7 +84,13 @@ def get_devices() -> dict: } -def gendevice(dev_type, host, mac, name=None, is_locked=None): +def gendevice( + dev_type: int, + host: Tuple[str, int], + mac: Union[bytes, str], + name: str = None, + is_locked: bool = None, +): """Generate a device.""" try: dev_class, model, manufacturer = get_devices()[dev_type] @@ -103,10 +110,10 @@ def gendevice(dev_type, host, mac, name=None, is_locked=None): def discover( - timeout=None, - local_ip_address=None, - discover_ip_address='255.255.255.255', - discover_ip_port=80 + timeout: int = None, + local_ip_address: str = None, + discover_ip_address: str = '255.255.255.255', + discover_ip_port: int = 80, ) -> list: """Discover devices connected to the local network.""" local_ip_address = local_ip_address or get_local_ip() @@ -194,14 +201,14 @@ class device: def __init__( self, - host, - mac, - devtype, - timeout=10, - name=None, - model=None, - manufacturer=None, - is_locked=None + host: Tuple[str, int], + mac: Union[bytes, str], + devtype: int, + timeout: int = 10, + name: str = None, + model: str = None, + manufacturer: str = None, + is_locked: bool = None, ) -> None: """Initialize the controller.""" self.host = host @@ -224,18 +231,18 @@ class device: [0x09, 0x76, 0x28, 0x34, 0x3f, 0xe9, 0x9e, 0x23, 0x76, 0x5c, 0x15, 0x13, 0xac, 0xcf, 0x8b, 0x02]) self.update_aes(key) - def update_aes(self, key) -> None: + def update_aes(self, key: bytes) -> None: """Update AES.""" self.aes = Cipher( algorithms.AES(key), modes.CBC(self.iv), backend=default_backend() ) - def encrypt(self, payload) -> bytes: + def encrypt(self, payload: bytes) -> bytes: """Encrypt the payload.""" encryptor = self.aes.encryptor() return encryptor.update(payload) + encryptor.finalize() - def decrypt(self, payload) -> bytes: + def decrypt(self, payload: bytes) -> bytes: """Decrypt the payload.""" decryptor = self.aes.decryptor() return decryptor.update(payload) + decryptor.finalize() @@ -288,7 +295,7 @@ class device: payload = self.decrypt(response[0x38:]) return payload[0x4] | payload[0x5] << 8 - def set_name(self, name) -> None: + def set_name(self, name: str) -> None: """Set device name.""" packet = bytearray(4) packet += name.encode('utf-8') @@ -298,7 +305,7 @@ class device: check_error(response[0x22:0x24]) self.name = name - def set_lock(self, state) -> None: + def set_lock(self, state: bool) -> None: """Lock/unlock the device.""" packet = bytearray(4) packet += self.name.encode('utf-8') @@ -312,7 +319,7 @@ class device: """Return device type.""" return self.type - def send_packet(self, command, payload) -> bytearray: + def send_packet(self, command: int, payload: bytearray) -> bytearray: """Send a packet to the device.""" self.count = (self.count + 1) & 0xffff packet = bytearray(0x38) @@ -392,7 +399,7 @@ class mp1(device): device.__init__(self, *args, **kwargs) self.type = "MP1" - def set_power_mask(self, sid_mask, state) -> None: + def set_power_mask(self, sid_mask: int, state: bool) -> None: """Set the power state of the device.""" packet = bytearray(16) packet[0x00] = 0x0d @@ -410,7 +417,7 @@ class mp1(device): response = self.send_packet(0x6a, packet) check_error(response[0x22:0x24]) - def set_power(self, sid, state) -> None: + def set_power(self, sid: int, state: bool) -> None: """Set the power state of the device.""" sid_mask = 0x01 << (sid - 1) self.set_power_mask(sid_mask, state) @@ -463,7 +470,16 @@ class bg1(device): check_error(response[0x22:0x24]) return self._decode(response) - def set_state(self, pwr=None, pwr1=None, pwr2=None, maxworktime=None, maxworktime1=None, maxworktime2=None, idcbrightness=None) -> dict: + def set_state( + self, + pwr: bool = None, + pwr1: bool = None, + pwr2: bool = None, + maxworktime: int = None, + maxworktime1: int = None, + maxworktime2: int = None, + idcbrightness: int = None, + ) -> dict: """Set the power state of the device.""" data = {} if pwr is not None: @@ -486,7 +502,7 @@ class bg1(device): check_error(response[0x22:0x24]) return self._decode(response) - def _encode(self, flag, js) -> bytearray: + def _encode(self, flag: int, js: str) -> bytearray: """Encode a message.""" # The packet format is: # 0x00-0x01 length @@ -507,7 +523,7 @@ class bg1(device): packet[0x07] = checksum >> 8 return packet - def _decode(self, response) -> dict: + def _decode(self, response: bytes) -> dict: """Decode a message.""" payload = self.decrypt(bytes(response[0x38:])) js_len = struct.unpack_from(' None: + def set_power(self, state: bool) -> None: """Set the power state of the device.""" packet = bytearray(4) packet[0] = state @@ -539,7 +555,7 @@ class sp2(device): device.__init__(self, *args, **kwargs) self.type = "SP2" - def set_power(self, state) -> None: + def set_power(self, state: bool) -> None: """Set the power state of the device.""" packet = bytearray(16) packet[0] = 2 @@ -550,7 +566,7 @@ class sp2(device): response = self.send_packet(0x6a, packet) check_error(response[0x22:0x24]) - def set_nightlight(self, state) -> None: + def set_nightlight(self, state: bool) -> None: """Set the night light state of the device.""" packet = bytearray(16) packet[0] = 2 @@ -656,7 +672,7 @@ class rm(device): payload = self.decrypt(bytes(response[0x38:])) return payload[len(self._request_header) + 4:] - def send_data(self, data) -> None: + def send_data(self, data: bytes) -> None: """Send a code to the device.""" packet = bytearray(self._code_sending_header) packet += bytearray([0x02, 0x00, 0x00, 0x00]) @@ -707,7 +723,7 @@ class rm(device): return True return False - def _check_sensors(self, command) -> bytes: + def _check_sensors(self, command: int) -> bytes: """Return the state of the sensors in raw format.""" packet = bytearray(self._request_header) packet.append(command) @@ -781,44 +797,9 @@ class hysen(device): # New behaviour: raises a ValueError if the device response indicates an error or CRC check fails # The function prepends length (2 bytes) and appends CRC - def calculate_crc16(self, input_data) -> int: - """Calculate CRC-16.""" - from ctypes import c_ushort - crc16_tab = [] - crc16_constant = 0xA001 - - for i in range(0, 256): - crc = c_ushort(i).value - for j in range(0, 8): - if (crc & 0x0001): - crc = c_ushort(crc >> 1).value ^ crc16_constant - else: - crc = c_ushort(crc >> 1).value - crc16_tab.append(hex(crc)) - - try: - is_string = isinstance(input_data, str) - is_bytes = isinstance(input_data, bytes) - - if not is_string and not is_bytes: - raise Exception("Please provide a string or a byte sequence " - "as argument for calculation.") - - crcValue = 0xffff - - for c in input_data: - d = ord(c) if is_string else c - tmp = crcValue ^ d - rotated = c_ushort(crcValue >> 8).value - crcValue = rotated ^ int(crc16_tab[(tmp & 0x00ff)], 0) - - return crcValue - except Exception as e: - print("EXCEPTION(calculate): {}".format(e)) - - def send_request(self, input_payload) -> bytes: + def send_request(self, input_payload: bytes) -> bytes: """Send a request to the device.""" - crc = self.calculate_crc16(bytes(input_payload)) + crc = calculate_crc16(bytes(input_payload)) # first byte is length, +2 for CRC16 request_payload = bytearray([len(input_payload) + 2, 0x00]) @@ -837,7 +818,7 @@ class hysen(device): response_payload_len = response_payload[0] if response_payload_len + 2 > len(response_payload): raise ValueError('hysen_response_error', 'first byte of response is not length') - crc = self.calculate_crc16(bytes(response_payload[2:response_payload_len])) + crc = calculate_crc16(bytes(response_payload[2:response_payload_len])) if (response_payload[response_payload_len] == crc & 0xFF) and ( response_payload[response_payload_len + 1] == (crc >> 8) & 0xFF): return response_payload[2:response_payload_len] @@ -858,7 +839,6 @@ class hysen(device): Timer schedule included. """ - payload = self.send_request(bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x16])) data = {} data['remote_lock'] = payload[3] & 1 @@ -908,7 +888,7 @@ class hysen(device): # E.g. loop_mode = 0 ("12345,67") means Saturday and Sunday follow the "weekend" schedule # loop_mode = 2 ("1234567") means every day (including Saturday and Sunday) follows the "weekday" schedule # The sensor command is currently experimental - def set_mode(self, auto_mode, loop_mode, sensor=0) -> None: + def set_mode(self, auto_mode: int, loop_mode: int, sensor: int = 0) -> None: """Set the mode of the device.""" mode_byte = ((loop_mode + 1) << 4) + auto_mode self.send_request(bytearray([0x01, 0x06, 0x00, 0x02, mode_byte, sensor])) @@ -924,7 +904,18 @@ class hysen(device): # Anti-freezing function (FrE) fre = 0 for anti-freezing function shut down, # 1 for anti-freezing function open. Factory default: 0 # Power on memory (POn) poweron = 0 for power on memory off, 1 for power on memory on. Factory default: 0 - def set_advanced(self, loop_mode, sensor, osv, dif, svh, svl, adj, fre, poweron) -> None: + def set_advanced( + self, + loop_mode: int, + sensor: int, + osv: int, + dif: int, + svh: int, + svl: int, + adj: float, + fre: int, + poweron: int, + ) -> None: """Set advanced options.""" input_payload = bytearray([0x01, 0x10, 0x00, 0x02, 0x00, 0x05, 0x0a, loop_mode, sensor, osv, dif, svh, svl, (int(adj * 2) >> 8 & 0xff), (int(adj * 2) & 0xff), fre, poweron]) @@ -941,19 +932,19 @@ class hysen(device): self.set_mode(auto_mode=0, loop_mode=0) # Set temperature for manual mode (also activates manual mode if currently in automatic) - def set_temp(self, temp) -> None: + def set_temp(self, temp: float) -> None: """Set the target temperature.""" self.send_request(bytearray([0x01, 0x06, 0x00, 0x01, 0x00, int(temp * 2)])) # Set device on(1) or off(0), does not deactivate Wifi connectivity. # Remote lock disables control by buttons on thermostat. - def set_power(self, power=1, remote_lock=0) -> None: + def set_power(self, power: int = 1, remote_lock: int = 0) -> None: """Set the power state of the device.""" self.send_request(bytearray([0x01, 0x06, 0x00, 0x00, remote_lock, power])) # set time on device # n.b. day=1 is Monday, ..., day=7 is Sunday - def set_time(self, hour, minute, second, day) -> None: + def set_time(self, hour: int, minute: int, second: int, day: int) -> None: """Set the time.""" self.send_request(bytearray([0x01, 0x10, 0x00, 0x08, 0x00, 0x02, 0x04, hour, minute, second, day])) @@ -963,7 +954,7 @@ class hysen(device): # {'start_hour':17, 'start_minute':30, 'temp': 22 } # Each one specifies the thermostat temp that will become effective at start_hour:start_minute # weekend is similar but only has 2 (e.g. switch on in morning and off in afternoon) - def set_schedule(self, weekday, weekend) -> None: + def set_schedule(self, weekday: List[dict], weekend: List[dict]) -> None: """Set timer schedule.""" # Begin with some magic values ... input_payload = bytearray([0x01, 0x10, 0x00, 0x0a, 0x00, 0x0c, 0x18]) @@ -1052,7 +1043,7 @@ class dooya(device): device.__init__(self, *args, **kwargs) self.type = "Dooya DT360E" - def _send(self, magic1, magic2) -> int: + def _send(self, magic1: int, magic2: int) -> int: """Send a packet to the device.""" packet = bytearray(16) packet[0] = 0x09 @@ -1082,7 +1073,7 @@ class dooya(device): """Return the position of the curtain.""" return self._send(0x06, 0x5d) - def set_percentage_and_wait(self, new_percentage) -> None: + def set_percentage_and_wait(self, new_percentage: int) -> None: """Set the position of the curtain.""" current = self.get_percentage() if current > new_percentage: @@ -1119,7 +1110,7 @@ class lb1(device): device.__init__(self, *args, **kwargs) self.type = "SmartBulb" - def send_command(self, command, type='set') -> None: + def send_command(self, command: str, type: str = 'set') -> None: """Send a command to the device.""" packet = bytearray(16+(int(len(command)/16) + 1)*16) packet[0x00] = 0x0c + len(command) & 0xff @@ -1144,7 +1135,7 @@ class lb1(device): if responseLength > 0: self.state_dict = json.loads(payload[0x0e:0x0e+responseLength]) - def set_json(self, jsonstr) -> str: + def set_json(self, jsonstr: str) -> str: """Send a command to the device and return state.""" reconvert = json.loads(jsonstr) if 'bulb_sceneidx' in reconvert.keys(): @@ -1153,7 +1144,7 @@ class lb1(device): self.send_command(json.dumps(reconvert)) return json.dumps(self.state_dict) - def set_state(self, state) -> None: + def set_state(self, state: Union[str, int]) -> None: """Set the state of the device.""" cmd = '{"pwr":%d}' % (1 if state == "ON" or state == 1 else 0) self.send_command(cmd) @@ -1166,7 +1157,7 @@ class lb1(device): # Setup a new Broadlink device via AP Mode. Review the README to see how to enter AP Mode. # Only tested with Broadlink RM3 Mini (Blackbean) -def setup(ssid, password, security_mode) -> None: +def setup(ssid: str, password: str, security_mode: int) -> None: """Set up a new Broadlink device via AP mode.""" # Security mode options are (0 - none, 1 = WEP, 2 = WPA1, 3 = WPA2, 4 = WPA1/2) payload = bytearray(0x88) diff --git a/broadlink/helpers.py b/broadlink/helpers.py index 40f06e3..22ac12f 100644 --- a/broadlink/helpers.py +++ b/broadlink/helpers.py @@ -1,4 +1,5 @@ """Helper functions.""" +from ctypes import c_ushort import socket from .exceptions import exception @@ -18,3 +19,40 @@ def get_local_ip() -> str: with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: s.connect(('8.8.8.8', 53)) return s.getsockname()[0] + + +def calculate_crc16(input_data) -> int: + """Calculate CRC-16.""" + crc16_tab = [] + crc16_constant = 0xA001 + + for i in range(0, 256): + crc = c_ushort(i).value + for j in range(0, 8): + if crc & 0x0001: + crc = c_ushort(crc >> 1).value ^ crc16_constant + else: + crc = c_ushort(crc >> 1).value + crc16_tab.append(hex(crc)) + + try: + is_string = isinstance(input_data, str) + is_bytes = isinstance(input_data, bytes) + + if not is_string and not is_bytes: + raise Exception( + "Please provide a string or a byte sequence " + "as argument for calculation." + ) + + crcValue = 0xFFFF + + for c in input_data: + d = ord(c) if is_string else c + tmp = crcValue ^ d + rotated = c_ushort(crcValue >> 8).value + crcValue = rotated ^ int(crc16_tab[(tmp & 0x00FF)], 0) + + return crcValue + except Exception as e: + print("EXCEPTION(calculate): {}".format(e))