From 1a8ee21a34fe62c5970757c00769d5b471e59a55 Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel <41558831+felipediel@users.noreply.github.com> Date: Wed, 31 Mar 2021 14:27:05 -0300 Subject: [PATCH] Make better use of namespaces (#564) Use namespaces for typing and exceptions. --- broadlink/__init__.py | 14 +++++++------- broadlink/alarm.py | 4 ++-- broadlink/climate.py | 4 ++-- broadlink/cover.py | 4 ++-- broadlink/device.py | 30 +++++++++++++++--------------- broadlink/light.py | 10 +++++----- broadlink/remote.py | 6 +++--- broadlink/sensor.py | 4 ++-- broadlink/switch.py | 32 ++++++++++++++++---------------- 9 files changed, 54 insertions(+), 54 deletions(-) diff --git a/broadlink/__init__.py b/broadlink/__init__.py index 26392e9..cae7b11 100644 --- a/broadlink/__init__.py +++ b/broadlink/__init__.py @@ -1,13 +1,13 @@ #!/usr/bin/env python3 """The python-broadlink library.""" import socket -from typing import Generator, List, Tuple, Union +import typing as t +from . import exceptions as e from .alarm import S1C from .climate import hysen from .cover import dooya from .device import device, ping, scan -from .exceptions import exception from .light import lb1, lb27r1 from .remote import rm, rm4, rm4mini, rm4pro, rmmini, rmminib, rmpro from .sensor import a1 @@ -115,8 +115,8 @@ SUPPORTED_TYPES = { def gendevice( dev_type: int, - host: Tuple[str, int], - mac: Union[bytes, str], + host: t.Tuple[str, int], + mac: t.Union[bytes, str], name: str = None, is_locked: bool = None, ) -> device: @@ -151,7 +151,7 @@ def hello( try: return next(xdiscover(timeout, local_ip_address, host, port)) except StopIteration: - raise exception(-4000) # Network timeout. + raise e.exception(-4000) # Network timeout. def discover( @@ -159,7 +159,7 @@ def discover( local_ip_address: str = None, discover_ip_address: str = "255.255.255.255", discover_ip_port: int = 80, -) -> List[device]: +) -> t.List[device]: """Discover devices connected to the local network.""" responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port) return [gendevice(*resp) for resp in responses] @@ -170,7 +170,7 @@ def xdiscover( local_ip_address: str = None, discover_ip_address: str = "255.255.255.255", discover_ip_port: int = 80, -) -> Generator[device, None, None]: +) -> t.Generator[device, None, None]: """Discover devices connected to the local network. This function returns a generator that yields devices instantly. diff --git a/broadlink/alarm.py b/broadlink/alarm.py index f49b2cf..8030101 100644 --- a/broadlink/alarm.py +++ b/broadlink/alarm.py @@ -1,6 +1,6 @@ """Support for alarm kits.""" +from . import exceptions as e from .device import device -from .exceptions import check_error class S1C(device): @@ -19,7 +19,7 @@ class S1C(device): packet = bytearray(16) packet[0] = 0x06 # 0x06 - get sensors info, 0x07 - probably add sensors response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) if not payload: return None diff --git a/broadlink/climate.py b/broadlink/climate.py index b510db9..9f64949 100644 --- a/broadlink/climate.py +++ b/broadlink/climate.py @@ -1,8 +1,8 @@ """Support for climate control.""" from typing import List +from . import exceptions as e from .device import device -from .exceptions import check_error from .helpers import calculate_crc16 @@ -31,7 +31,7 @@ class hysen(device): # send to device response = self.send_packet(0x6A, request_payload) - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) response_payload = self.decrypt(response[0x38:]) # experimental check on CRC in response (first 2 bytes are len, and trailing bytes are crc) diff --git a/broadlink/cover.py b/broadlink/cover.py index b2dc84a..2d0cf67 100644 --- a/broadlink/cover.py +++ b/broadlink/cover.py @@ -1,8 +1,8 @@ """Support for covers.""" import time +from . import exceptions as e from .device import device -from .exceptions import check_error class dooya(device): @@ -20,7 +20,7 @@ class dooya(device): packet[9] = 0xFA packet[10] = 0x44 response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) return payload[4] diff --git a/broadlink/device.py b/broadlink/device.py index d67ac8a..6cdec0e 100644 --- a/broadlink/device.py +++ b/broadlink/device.py @@ -3,15 +3,15 @@ import socket import threading import random import time -from typing import Generator, Tuple, Union +import typing as t from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes -from .exceptions import check_error, exception +from . import exceptions as e from .protocol import Datetime -HelloResponse = Tuple[int, Tuple[str, int], str, str, bool] +HelloResponse = t.Tuple[int, t.Tuple[str, int], str, str, bool] def scan( @@ -19,7 +19,7 @@ def scan( local_ip_address: str = None, discover_ip_address: str = "255.255.255.255", discover_ip_port: int = 80, -) -> Generator[HelloResponse, None, None]: +) -> t.Generator[HelloResponse, None, None]: """Broadcast a hello message and yield responses.""" conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) @@ -94,8 +94,8 @@ class device: def __init__( self, - host: Tuple[str, int], - mac: Union[bytes, str], + host: t.Tuple[str, int], + mac: t.Union[bytes, str], devtype: int, timeout: int = 10, name: str = "", @@ -184,7 +184,7 @@ class device: payload[0x30:0x36] = "Test 1".encode() response = self.send_packet(0x65, payload) - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) key = payload[0x04:0x14] @@ -209,10 +209,10 @@ class device: try: devtype, host, mac, name, is_locked = next(responses) except StopIteration: - raise exception(-4000) # Network timeout. + raise e.exception(-4000) # Network timeout. if (devtype, host, mac) != (self.devtype, self.host, self.mac): - raise exception(-2040) # Device information is not intact. + raise e.exception(-2040) # Device information is not intact. self.name = name self.is_locked = is_locked @@ -231,7 +231,7 @@ class device: """Get firmware version.""" packet = bytearray([0x68]) response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) return payload[0x4] | payload[0x5] << 8 @@ -242,7 +242,7 @@ class device: packet += bytearray(0x50 - len(packet)) packet[0x43] = self.is_locked response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) self.name = name def set_lock(self, state: bool) -> None: @@ -252,7 +252,7 @@ class device: packet += bytearray(0x50 - len(packet)) packet[0x43] = bool(state) response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) self.is_locked = bool(state) def get_type(self) -> str: @@ -294,13 +294,13 @@ class device: break except socket.timeout: if (time.time() - start_time) > timeout: - raise exception(-4000) # Network timeout. + raise e.exception(-4000) # Network timeout. if len(resp) < 0x30: - raise exception(-4007) # Length error. + raise e.exception(-4007) # Length error. checksum = int.from_bytes(resp[0x20:0x22], "little") if sum(resp, 0xBEAF) - sum(resp[0x20:0x22]) & 0xFFFF != checksum: - raise exception(-4008) # Checksum error. + raise e.exception(-4008) # Checksum error. return resp diff --git a/broadlink/light.py b/broadlink/light.py index afa60a4..68f0233 100644 --- a/broadlink/light.py +++ b/broadlink/light.py @@ -4,8 +4,8 @@ import json import struct import typing as t +from . import exceptions as e from .device import device -from .exceptions import check_error class lb1(device): @@ -27,7 +27,7 @@ class lb1(device): """ packet = self._encode(1, {}) response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) return self._decode(response) def set_state( @@ -80,7 +80,7 @@ class lb1(device): packet = self._encode(2, state) response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) return self._decode(response) def _encode(self, flag: int, obj: t.Any) -> bytes: @@ -124,7 +124,7 @@ class lb27r1(device): """ packet = self._encode(1, {}) response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) return self._decode(response) def set_state( @@ -174,7 +174,7 @@ class lb27r1(device): packet = self._encode(2, state) response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) return self._decode(response) def _encode(self, flag: int, obj: t.Any) -> bytes: diff --git a/broadlink/remote.py b/broadlink/remote.py index 1227dae..931727e 100644 --- a/broadlink/remote.py +++ b/broadlink/remote.py @@ -1,8 +1,8 @@ """Support for universal remotes.""" import struct +from . import exceptions as e from .device import device -from .exceptions import check_error class rmmini(device): @@ -14,7 +14,7 @@ class rmmini(device): """Send a packet to the device.""" packet = struct.pack(" None: """Set the power state of the device.""" @@ -47,7 +47,7 @@ class mp1(device): packet[0x08] = 0x01 response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) return payload[0x0E] @@ -76,7 +76,7 @@ class bg1(device): """ packet = self._encode(1, b"{}") response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) return self._decode(response) def set_state( @@ -108,7 +108,7 @@ class bg1(device): js = json.dumps(data).encode("utf8") packet = self._encode(2, js) response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) return self._decode(response) def _encode(self, flag: int, js: str) -> bytes: @@ -152,7 +152,7 @@ class sp1(device): packet = bytearray(4) packet[0] = state response = self.send_packet(0x66, packet) - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) class sp2(device): @@ -166,14 +166,14 @@ class sp2(device): packet[0] = 2 packet[4] = int(bool(state)) response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) def check_power(self) -> bool: """Return the power state of the device.""" packet = bytearray(16) packet[0] = 1 response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) return bool(payload[0x4]) @@ -188,7 +188,7 @@ class sp2s(sp2): packet = bytearray(16) packet[0] = 4 response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) return int.from_bytes(payload[0x4:0x7], "little") / 1000 @@ -207,7 +207,7 @@ class sp3(device): else: packet[4] = 1 if state else 0 response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) def set_nightlight(self, state: bool) -> None: """Set the night light state of the device.""" @@ -218,14 +218,14 @@ class sp3(device): else: packet[4] = 2 if state else 0 response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) def check_power(self) -> bool: """Return the power state of the device.""" packet = bytearray(16) packet[0] = 1 response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) return bool(payload[0x4] == 1 or payload[0x4] == 3 or payload[0x4] == 0xFD) @@ -234,7 +234,7 @@ class sp3(device): packet = bytearray(16) packet[0] = 1 response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) return bool(payload[0x4] == 2 or payload[0x4] == 3 or payload[0x4] == 0xFF) @@ -248,7 +248,7 @@ class sp3s(sp2): """Return the power consumption in W.""" packet = bytearray([8, 0, 254, 1, 5, 1, 0, 0, 0, 45]) response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) energy = payload[0x7:0x4:-1].hex() return int(energy) / 100 @@ -326,7 +326,7 @@ class sp4(device): def _decode(self, response: bytes) -> dict: """Decode a message.""" - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) js_len = struct.unpack_from(" dict: """Decode a message.""" - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) js_len = struct.unpack_from("