1
0
mirror of https://github.com/mjg59/python-broadlink.git synced 2024-11-21 22:51:41 +01:00

Make better use of namespaces (#564)

Use namespaces for typing and exceptions.
This commit is contained in:
Felipe Martins Diel 2021-03-31 14:27:05 -03:00 committed by Felipe Martins Diel
parent 2198400ad6
commit 1a8ee21a34
9 changed files with 54 additions and 54 deletions

View File

@ -1,13 +1,13 @@
#!/usr/bin/env python3
"""The python-broadlink library."""
import socket
from typing import Generator, List, Tuple, Union
import typing as t
from . import exceptions as e
from .alarm import S1C
from .climate import hysen
from .cover import dooya
from .device import device, ping, scan
from .exceptions import exception
from .light import lb1, lb27r1
from .remote import rm, rm4, rm4mini, rm4pro, rmmini, rmminib, rmpro
from .sensor import a1
@ -115,8 +115,8 @@ SUPPORTED_TYPES = {
def gendevice(
dev_type: int,
host: Tuple[str, int],
mac: Union[bytes, str],
host: t.Tuple[str, int],
mac: t.Union[bytes, str],
name: str = None,
is_locked: bool = None,
) -> device:
@ -151,7 +151,7 @@ def hello(
try:
return next(xdiscover(timeout, local_ip_address, host, port))
except StopIteration:
raise exception(-4000) # Network timeout.
raise e.exception(-4000) # Network timeout.
def discover(
@ -159,7 +159,7 @@ def discover(
local_ip_address: str = None,
discover_ip_address: str = "255.255.255.255",
discover_ip_port: int = 80,
) -> List[device]:
) -> t.List[device]:
"""Discover devices connected to the local network."""
responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port)
return [gendevice(*resp) for resp in responses]
@ -170,7 +170,7 @@ def xdiscover(
local_ip_address: str = None,
discover_ip_address: str = "255.255.255.255",
discover_ip_port: int = 80,
) -> Generator[device, None, None]:
) -> t.Generator[device, None, None]:
"""Discover devices connected to the local network.
This function returns a generator that yields devices instantly.

View File

@ -1,6 +1,6 @@
"""Support for alarm kits."""
from . import exceptions as e
from .device import device
from .exceptions import check_error
class S1C(device):
@ -19,7 +19,7 @@ class S1C(device):
packet = bytearray(16)
packet[0] = 0x06 # 0x06 - get sensors info, 0x07 - probably add sensors
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
if not payload:
return None

View File

@ -1,8 +1,8 @@
"""Support for climate control."""
from typing import List
from . import exceptions as e
from .device import device
from .exceptions import check_error
from .helpers import calculate_crc16
@ -31,7 +31,7 @@ class hysen(device):
# send to device
response = self.send_packet(0x6A, request_payload)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
response_payload = self.decrypt(response[0x38:])
# experimental check on CRC in response (first 2 bytes are len, and trailing bytes are crc)

View File

@ -1,8 +1,8 @@
"""Support for covers."""
import time
from . import exceptions as e
from .device import device
from .exceptions import check_error
class dooya(device):
@ -20,7 +20,7 @@ class dooya(device):
packet[9] = 0xFA
packet[10] = 0x44
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
return payload[4]

View File

@ -3,15 +3,15 @@ import socket
import threading
import random
import time
from typing import Generator, Tuple, Union
import typing as t
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from .exceptions import check_error, exception
from . import exceptions as e
from .protocol import Datetime
HelloResponse = Tuple[int, Tuple[str, int], str, str, bool]
HelloResponse = t.Tuple[int, t.Tuple[str, int], str, str, bool]
def scan(
@ -19,7 +19,7 @@ def scan(
local_ip_address: str = None,
discover_ip_address: str = "255.255.255.255",
discover_ip_port: int = 80,
) -> Generator[HelloResponse, None, None]:
) -> t.Generator[HelloResponse, None, None]:
"""Broadcast a hello message and yield responses."""
conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
@ -94,8 +94,8 @@ class device:
def __init__(
self,
host: Tuple[str, int],
mac: Union[bytes, str],
host: t.Tuple[str, int],
mac: t.Union[bytes, str],
devtype: int,
timeout: int = 10,
name: str = "",
@ -184,7 +184,7 @@ class device:
payload[0x30:0x36] = "Test 1".encode()
response = self.send_packet(0x65, payload)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
key = payload[0x04:0x14]
@ -209,10 +209,10 @@ class device:
try:
devtype, host, mac, name, is_locked = next(responses)
except StopIteration:
raise exception(-4000) # Network timeout.
raise e.exception(-4000) # Network timeout.
if (devtype, host, mac) != (self.devtype, self.host, self.mac):
raise exception(-2040) # Device information is not intact.
raise e.exception(-2040) # Device information is not intact.
self.name = name
self.is_locked = is_locked
@ -231,7 +231,7 @@ class device:
"""Get firmware version."""
packet = bytearray([0x68])
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
return payload[0x4] | payload[0x5] << 8
@ -242,7 +242,7 @@ class device:
packet += bytearray(0x50 - len(packet))
packet[0x43] = self.is_locked
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
self.name = name
def set_lock(self, state: bool) -> None:
@ -252,7 +252,7 @@ class device:
packet += bytearray(0x50 - len(packet))
packet[0x43] = bool(state)
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
self.is_locked = bool(state)
def get_type(self) -> str:
@ -294,13 +294,13 @@ class device:
break
except socket.timeout:
if (time.time() - start_time) > timeout:
raise exception(-4000) # Network timeout.
raise e.exception(-4000) # Network timeout.
if len(resp) < 0x30:
raise exception(-4007) # Length error.
raise e.exception(-4007) # Length error.
checksum = int.from_bytes(resp[0x20:0x22], "little")
if sum(resp, 0xBEAF) - sum(resp[0x20:0x22]) & 0xFFFF != checksum:
raise exception(-4008) # Checksum error.
raise e.exception(-4008) # Checksum error.
return resp

View File

@ -4,8 +4,8 @@ import json
import struct
import typing as t
from . import exceptions as e
from .device import device
from .exceptions import check_error
class lb1(device):
@ -27,7 +27,7 @@ class lb1(device):
"""
packet = self._encode(1, {})
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
return self._decode(response)
def set_state(
@ -80,7 +80,7 @@ class lb1(device):
packet = self._encode(2, state)
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
return self._decode(response)
def _encode(self, flag: int, obj: t.Any) -> bytes:
@ -124,7 +124,7 @@ class lb27r1(device):
"""
packet = self._encode(1, {})
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
return self._decode(response)
def set_state(
@ -174,7 +174,7 @@ class lb27r1(device):
packet = self._encode(2, state)
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
return self._decode(response)
def _encode(self, flag: int, obj: t.Any) -> bytes:

View File

@ -1,8 +1,8 @@
"""Support for universal remotes."""
import struct
from . import exceptions as e
from .device import device
from .exceptions import check_error
class rmmini(device):
@ -14,7 +14,7 @@ class rmmini(device):
"""Send a packet to the device."""
packet = struct.pack("<I", command) + data
resp = self.send_packet(0x6A, packet)
check_error(resp[0x22:0x24])
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload[0x4:]
@ -79,7 +79,7 @@ class rmminib(rmmini):
"""Send a packet to the device."""
packet = struct.pack("<HI", len(data) + 4, command) + data
resp = self.send_packet(0x6A, packet)
check_error(resp[0x22:0x24])
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
p_len = struct.unpack("<H", payload[:0x2])[0]
return payload[0x6:p_len+2]

View File

@ -1,8 +1,8 @@
"""Support for sensors."""
import struct
from . import exceptions as e
from .device import device
from .exceptions import check_error
class a1(device):
@ -30,7 +30,7 @@ class a1(device):
"""Return the state of the sensors in raw format."""
packet = bytearray([0x1])
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
data = payload[0x4:]

View File

@ -2,8 +2,8 @@
import json
import struct
from . import exceptions as e
from .device import device
from .exceptions import check_error
class mp1(device):
@ -27,7 +27,7 @@ class mp1(device):
packet[0x0E] = sid_mask if state else 0
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
def set_power(self, sid: int, state: bool) -> None:
"""Set the power state of the device."""
@ -47,7 +47,7 @@ class mp1(device):
packet[0x08] = 0x01
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
return payload[0x0E]
@ -76,7 +76,7 @@ class bg1(device):
"""
packet = self._encode(1, b"{}")
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
return self._decode(response)
def set_state(
@ -108,7 +108,7 @@ class bg1(device):
js = json.dumps(data).encode("utf8")
packet = self._encode(2, js)
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
return self._decode(response)
def _encode(self, flag: int, js: str) -> bytes:
@ -152,7 +152,7 @@ class sp1(device):
packet = bytearray(4)
packet[0] = state
response = self.send_packet(0x66, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
class sp2(device):
@ -166,14 +166,14 @@ class sp2(device):
packet[0] = 2
packet[4] = int(bool(state))
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
def check_power(self) -> bool:
"""Return the power state of the device."""
packet = bytearray(16)
packet[0] = 1
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
return bool(payload[0x4])
@ -188,7 +188,7 @@ class sp2s(sp2):
packet = bytearray(16)
packet[0] = 4
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
return int.from_bytes(payload[0x4:0x7], "little") / 1000
@ -207,7 +207,7 @@ class sp3(device):
else:
packet[4] = 1 if state else 0
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
def set_nightlight(self, state: bool) -> None:
"""Set the night light state of the device."""
@ -218,14 +218,14 @@ class sp3(device):
else:
packet[4] = 2 if state else 0
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
def check_power(self) -> bool:
"""Return the power state of the device."""
packet = bytearray(16)
packet[0] = 1
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
return bool(payload[0x4] == 1 or payload[0x4] == 3 or payload[0x4] == 0xFD)
@ -234,7 +234,7 @@ class sp3(device):
packet = bytearray(16)
packet[0] = 1
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
return bool(payload[0x4] == 2 or payload[0x4] == 3 or payload[0x4] == 0xFF)
@ -248,7 +248,7 @@ class sp3s(sp2):
"""Return the power consumption in W."""
packet = bytearray([8, 0, 254, 1, 5, 1, 0, 0, 0, 45])
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
energy = payload[0x7:0x4:-1].hex()
return int(energy) / 100
@ -326,7 +326,7 @@ class sp4(device):
def _decode(self, response: bytes) -> dict:
"""Decode a message."""
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0x08)[0]
state = json.loads(payload[0x0C : 0x0C + js_len])
@ -375,7 +375,7 @@ class sp4b(sp4):
def _decode(self, response: bytes) -> dict:
"""Decode a message."""
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0xA)[0]
state = json.loads(payload[0x0E : 0x0E + js_len])