1
0
mirror of https://github.com/mjg59/python-broadlink.git synced 2024-09-21 04:20:36 +02:00

Improve code quality (#428)

* Fix lint errors

* Remove rm2 class

* Rename cs to conn

* Add __repr__ to device class

* Make get_devices() a dictionary

* Clean up alarm kit

* Add module doscstrings

* Fix MAC address conversion
This commit is contained in:
Felipe Martins Diel 2020-09-24 02:36:12 -03:00 committed by GitHub
parent 28fa72f962
commit 0dc0068d63
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 150 additions and 154 deletions

View File

@ -1,7 +1,7 @@
#!/usr/bin/python3
"""The python-broadlink library."""
import socket
from typing import Dict, List, Union, Tuple, Type
from typing import Generator, List, Union, Tuple
from .alarm import S1C
from .climate import hysen
@ -9,14 +9,12 @@ from .cover import dooya
from .device import device, scan
from .exceptions import exception
from .light import lb1
from .remote import rm, rm2, rm4
from .remote import rm, rm4
from .sensor import a1
from .switch import bg1, mp1, sp1, sp2, sp4
def get_devices() -> Dict[int, Tuple[Type[device], str, str]]:
"""Return all supported devices."""
return {
SUPPORTED_TYPES = {
0x0000: (sp1, "SP1", "Broadlink"),
0x2711: (sp2, "SP2", "Broadlink"),
0x2716: (sp2, "NEO PRO", "Ankuoo"),
@ -101,7 +99,7 @@ def gendevice(
) -> device:
"""Generate a device."""
try:
dev_class, model, manufacturer = get_devices()[dev_type]
dev_class, model, manufacturer = SUPPORTED_TYPES[dev_type]
except KeyError:
return device(host, mac, dev_type, name=name, is_locked=is_locked)
@ -185,7 +183,7 @@ def setup(ssid: str, password: str, security_mode: int) -> None:
payload[0x84] = ssid_length # Character length of SSID
payload[0x85] = pass_length # Character length of password
payload[0x86] = security_mode # Type of encryption (00 - none, 01 = WEP, 02 = WPA1, 03 = WPA2, 04 = WPA1/2)
payload[0x86] = security_mode # Type of encryption
checksum = sum(payload, 0xbeaf) & 0xffff
payload[0x20] = checksum & 0xff # Checksum 1 position

View File

@ -1,17 +1,17 @@
"""Support for alarm kits."""
from .device import device
from .exceptions import check_error
S1C_SENSORS_TYPES = {
class S1C(device):
"""Controls a Broadlink S1C."""
_SENSORS_TYPES = {
0x31: 'Door Sensor', # 49 as hex
0x91: 'Key Fob', # 145 as hex, as serial on fob corpse
0x21: 'Motion Sensor' # 33 as hex
}
class S1C(device):
"""Controls a Broadlink S1C."""
def __init__(self, *args, **kwargs) -> None:
"""Initialize the controller."""
device.__init__(self, *args, **kwargs)
@ -27,30 +27,22 @@ class S1C(device):
if not payload:
return None
count = payload[0x4]
sensors = payload[0x6:]
sensors_a = [bytearray(sensors[i * 83:(i + 1) * 83]) for i in range(len(sensors) // 83)]
sens_res = []
for sens in sensors_a:
status = sens[0]
_name = sens[4:26].decode()
_order = sens[1]
_type = sens[3]
_serial = sens[26:30].hex()
type_str = S1C_SENSORS_TYPES.get(_type, 'Unknown')
r = {
'status': status,
'name': _name.strip('\x00'),
'type': type_str,
'order': _order,
'serial': _serial,
}
if r['serial'] != '00000000':
sens_res.append(r)
result = {
sensor_data = payload[0x6:]
sensors = [
bytearray(sensor_data[i * 83:(i + 1) * 83])
for i in range(len(sensor_data) // 83)
]
return {
'count': count,
'sensors': sens_res
'sensors': [
{
'status': sensor[0],
'name': sensor[4:26].decode().strip('\x00'),
'type': self._SENSORS_TYPES.get(sensor[3], 'Unknown'),
'order': sensor[1],
'serial': sensor[26:30].hex(),
}
for sensor in sensors
if any(sensor[26:30])
]
}
return result

View File

@ -1,3 +1,4 @@
"""Support for climate control."""
from typing import List
from .device import device

View File

@ -1,3 +1,4 @@
"""Support for covers."""
import time
from .device import device

View File

@ -1,3 +1,4 @@
"""Support for Broadlink devices."""
import socket
import threading
import random
@ -20,13 +21,13 @@ def scan(
discover_ip_port: int = 80,
) -> Generator[HelloResponse, None, None]:
"""Broadcast a hello message and yield responses."""
cs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
cs.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
cs.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
conn.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
if local_ip_address:
cs.bind((local_ip_address, 0))
port = cs.getsockname()[1]
conn.bind((local_ip_address, 0))
port = conn.getsockname()[1]
else:
local_ip_address = "0.0.0.0"
port = 0
@ -71,13 +72,13 @@ def scan(
packet[0x20] = checksum & 0xff
packet[0x21] = checksum >> 8
cs.sendto(packet, (discover_ip_address, discover_ip_port))
conn.sendto(packet, (discover_ip_address, discover_ip_port))
try:
while (time.time() - starttime) < timeout:
cs.settimeout(timeout - (time.time() - starttime))
conn.settimeout(timeout - (time.time() - starttime))
try:
response, host = cs.recvfrom(1024)
response, host = conn.recvfrom(1024)
except socket.timeout:
break
@ -87,7 +88,7 @@ def scan(
is_locked = bool(response[-1])
yield devtype, host, mac, name, is_locked
finally:
cs.close()
conn.close()
class device:
@ -106,7 +107,7 @@ class device:
) -> None:
"""Initialize the controller."""
self.host = host
self.mac = mac.encode() if isinstance(mac, str) else mac
self.mac = bytes.fromhex(mac) if isinstance(mac, str) else mac
self.devtype = devtype if devtype is not None else 0x272a
self.timeout = timeout
self.name = name
@ -114,17 +115,28 @@ class device:
self.manufacturer = manufacturer
self.is_locked = is_locked
self.count = random.randrange(0xffff)
self.iv = bytes(
[0x56, 0x2e, 0x17, 0x99, 0x6d, 0x09, 0x3d, 0x28, 0xdd, 0xb3, 0xba, 0x69, 0x5a, 0x2e, 0x6f, 0x58])
self.iv = bytes.fromhex('562e17996d093d28ddb3ba695a2e6f58')
self.id = bytes(4)
self.type = "Unknown"
self.lock = threading.Lock()
self.aes = None
key = bytes(
[0x09, 0x76, 0x28, 0x34, 0x3f, 0xe9, 0x9e, 0x23, 0x76, 0x5c, 0x15, 0x13, 0xac, 0xcf, 0x8b, 0x02])
key = bytes.fromhex('097628343fe99e23765c1513accf8b02')
self.update_aes(key)
def __repr__(self):
return "<%s: %s %s (%s) at %s:%s | %s | %s | %s>" % (
type(self).__name__,
self.manufacturer,
self.model,
hex(self.devtype),
self.host[0],
self.host[1],
':'.join(format(x, '02x') for x in self.mac),
self.name,
"Locked" if self.is_locked else "Unlocked",
)
def update_aes(self, key: bytes) -> None:
"""Update AES."""
self.aes = Cipher(
@ -284,20 +296,20 @@ class device:
start_time = time.time()
with self.lock:
cs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
cs.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
while True:
try:
cs.sendto(packet, self.host)
cs.settimeout(1)
resp, _ = cs.recvfrom(2048)
conn.sendto(packet, self.host)
conn.settimeout(1)
resp, _ = conn.recvfrom(2048)
break
except socket.timeout:
if (time.time() - start_time) > self.timeout:
cs.close()
conn.close()
raise exception(-4000) # Network timeout.
cs.close()
conn.close()
if len(resp) < 0x30:
raise exception(-4007) # Length error.

View File

@ -1,3 +1,4 @@
"""Support for lights."""
import json
from typing import Union

View File

@ -1,3 +1,4 @@
"""Support for universal remotes."""
from .device import device
from .exceptions import check_error
@ -119,15 +120,3 @@ class rm4(rm):
'temperature': data[0x0] + data[0x1] / 100.0,
'humidity': data[0x2] + data[0x3] / 100.0
}
# For legacy compatibility - don't use this
class rm2(rm):
def __init__(self):
device.__init__(self, None, None, None)
def discover(self):
dev = discover()
self.host = dev.host
self.mac = dev.mac

View File

@ -1,3 +1,4 @@
"""Support for sensors."""
from .device import device
from .exceptions import check_error

View File

@ -1,3 +1,4 @@
"""Support for switches."""
import json
import struct