1
0
mirror of https://github.com/mjg59/python-broadlink.git synced 2024-11-10 18:00:12 +01:00

Add annotations for parameters

This commit is contained in:
Felipe Martins Diel 2020-09-16 21:35:09 -03:00 committed by Matthew Garrett
parent 8bf107ab69
commit 76012c6cd4
2 changed files with 110 additions and 81 deletions

View File

@ -8,12 +8,13 @@ import struct
import threading
import time
from datetime import datetime
from typing import List, Union, Tuple
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from .exceptions import check_error, exception
from .helpers import get_local_ip
from .helpers import calculate_crc16, get_local_ip
def get_devices() -> dict:
@ -83,7 +84,13 @@ def get_devices() -> dict:
}
def gendevice(dev_type, host, mac, name=None, is_locked=None):
def gendevice(
dev_type: int,
host: Tuple[str, int],
mac: Union[bytes, str],
name: str = None,
is_locked: bool = None,
):
"""Generate a device."""
try:
dev_class, model, manufacturer = get_devices()[dev_type]
@ -103,10 +110,10 @@ def gendevice(dev_type, host, mac, name=None, is_locked=None):
def discover(
timeout=None,
local_ip_address=None,
discover_ip_address='255.255.255.255',
discover_ip_port=80
timeout: int = None,
local_ip_address: str = None,
discover_ip_address: str = '255.255.255.255',
discover_ip_port: int = 80,
) -> list:
"""Discover devices connected to the local network."""
local_ip_address = local_ip_address or get_local_ip()
@ -194,14 +201,14 @@ class device:
def __init__(
self,
host,
mac,
devtype,
timeout=10,
name=None,
model=None,
manufacturer=None,
is_locked=None
host: Tuple[str, int],
mac: Union[bytes, str],
devtype: int,
timeout: int = 10,
name: str = None,
model: str = None,
manufacturer: str = None,
is_locked: bool = None,
) -> None:
"""Initialize the controller."""
self.host = host
@ -224,18 +231,18 @@ class device:
[0x09, 0x76, 0x28, 0x34, 0x3f, 0xe9, 0x9e, 0x23, 0x76, 0x5c, 0x15, 0x13, 0xac, 0xcf, 0x8b, 0x02])
self.update_aes(key)
def update_aes(self, key) -> None:
def update_aes(self, key: bytes) -> None:
"""Update AES."""
self.aes = Cipher(
algorithms.AES(key), modes.CBC(self.iv), backend=default_backend()
)
def encrypt(self, payload) -> bytes:
def encrypt(self, payload: bytes) -> bytes:
"""Encrypt the payload."""
encryptor = self.aes.encryptor()
return encryptor.update(payload) + encryptor.finalize()
def decrypt(self, payload) -> bytes:
def decrypt(self, payload: bytes) -> bytes:
"""Decrypt the payload."""
decryptor = self.aes.decryptor()
return decryptor.update(payload) + decryptor.finalize()
@ -288,7 +295,7 @@ class device:
payload = self.decrypt(response[0x38:])
return payload[0x4] | payload[0x5] << 8
def set_name(self, name) -> None:
def set_name(self, name: str) -> None:
"""Set device name."""
packet = bytearray(4)
packet += name.encode('utf-8')
@ -298,7 +305,7 @@ class device:
check_error(response[0x22:0x24])
self.name = name
def set_lock(self, state) -> None:
def set_lock(self, state: bool) -> None:
"""Lock/unlock the device."""
packet = bytearray(4)
packet += self.name.encode('utf-8')
@ -312,7 +319,7 @@ class device:
"""Return device type."""
return self.type
def send_packet(self, command, payload) -> bytearray:
def send_packet(self, command: int, payload: bytearray) -> bytearray:
"""Send a packet to the device."""
self.count = (self.count + 1) & 0xffff
packet = bytearray(0x38)
@ -392,7 +399,7 @@ class mp1(device):
device.__init__(self, *args, **kwargs)
self.type = "MP1"
def set_power_mask(self, sid_mask, state) -> None:
def set_power_mask(self, sid_mask: int, state: bool) -> None:
"""Set the power state of the device."""
packet = bytearray(16)
packet[0x00] = 0x0d
@ -410,7 +417,7 @@ class mp1(device):
response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24])
def set_power(self, sid, state) -> None:
def set_power(self, sid: int, state: bool) -> None:
"""Set the power state of the device."""
sid_mask = 0x01 << (sid - 1)
self.set_power_mask(sid_mask, state)
@ -463,7 +470,16 @@ class bg1(device):
check_error(response[0x22:0x24])
return self._decode(response)
def set_state(self, pwr=None, pwr1=None, pwr2=None, maxworktime=None, maxworktime1=None, maxworktime2=None, idcbrightness=None) -> dict:
def set_state(
self,
pwr: bool = None,
pwr1: bool = None,
pwr2: bool = None,
maxworktime: int = None,
maxworktime1: int = None,
maxworktime2: int = None,
idcbrightness: int = None,
) -> dict:
"""Set the power state of the device."""
data = {}
if pwr is not None:
@ -486,7 +502,7 @@ class bg1(device):
check_error(response[0x22:0x24])
return self._decode(response)
def _encode(self, flag, js) -> bytearray:
def _encode(self, flag: int, js: str) -> bytearray:
"""Encode a message."""
# The packet format is:
# 0x00-0x01 length
@ -507,7 +523,7 @@ class bg1(device):
packet[0x07] = checksum >> 8
return packet
def _decode(self, response) -> dict:
def _decode(self, response: bytes) -> dict:
"""Decode a message."""
payload = self.decrypt(bytes(response[0x38:]))
js_len = struct.unpack_from('<I', payload, 0x0a)[0]
@ -523,7 +539,7 @@ class sp1(device):
device.__init__(self, *args, **kwargs)
self.type = "SP1"
def set_power(self, state) -> None:
def set_power(self, state: bool) -> None:
"""Set the power state of the device."""
packet = bytearray(4)
packet[0] = state
@ -539,7 +555,7 @@ class sp2(device):
device.__init__(self, *args, **kwargs)
self.type = "SP2"
def set_power(self, state) -> None:
def set_power(self, state: bool) -> None:
"""Set the power state of the device."""
packet = bytearray(16)
packet[0] = 2
@ -550,7 +566,7 @@ class sp2(device):
response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24])
def set_nightlight(self, state) -> None:
def set_nightlight(self, state: bool) -> None:
"""Set the night light state of the device."""
packet = bytearray(16)
packet[0] = 2
@ -656,7 +672,7 @@ class rm(device):
payload = self.decrypt(bytes(response[0x38:]))
return payload[len(self._request_header) + 4:]
def send_data(self, data) -> None:
def send_data(self, data: bytes) -> None:
"""Send a code to the device."""
packet = bytearray(self._code_sending_header)
packet += bytearray([0x02, 0x00, 0x00, 0x00])
@ -707,7 +723,7 @@ class rm(device):
return True
return False
def _check_sensors(self, command) -> bytes:
def _check_sensors(self, command: int) -> bytes:
"""Return the state of the sensors in raw format."""
packet = bytearray(self._request_header)
packet.append(command)
@ -781,44 +797,9 @@ class hysen(device):
# New behaviour: raises a ValueError if the device response indicates an error or CRC check fails
# The function prepends length (2 bytes) and appends CRC
def calculate_crc16(self, input_data) -> int:
"""Calculate CRC-16."""
from ctypes import c_ushort
crc16_tab = []
crc16_constant = 0xA001
for i in range(0, 256):
crc = c_ushort(i).value
for j in range(0, 8):
if (crc & 0x0001):
crc = c_ushort(crc >> 1).value ^ crc16_constant
else:
crc = c_ushort(crc >> 1).value
crc16_tab.append(hex(crc))
try:
is_string = isinstance(input_data, str)
is_bytes = isinstance(input_data, bytes)
if not is_string and not is_bytes:
raise Exception("Please provide a string or a byte sequence "
"as argument for calculation.")
crcValue = 0xffff
for c in input_data:
d = ord(c) if is_string else c
tmp = crcValue ^ d
rotated = c_ushort(crcValue >> 8).value
crcValue = rotated ^ int(crc16_tab[(tmp & 0x00ff)], 0)
return crcValue
except Exception as e:
print("EXCEPTION(calculate): {}".format(e))
def send_request(self, input_payload) -> bytes:
def send_request(self, input_payload: bytes) -> bytes:
"""Send a request to the device."""
crc = self.calculate_crc16(bytes(input_payload))
crc = calculate_crc16(bytes(input_payload))
# first byte is length, +2 for CRC16
request_payload = bytearray([len(input_payload) + 2, 0x00])
@ -837,7 +818,7 @@ class hysen(device):
response_payload_len = response_payload[0]
if response_payload_len + 2 > len(response_payload):
raise ValueError('hysen_response_error', 'first byte of response is not length')
crc = self.calculate_crc16(bytes(response_payload[2:response_payload_len]))
crc = calculate_crc16(bytes(response_payload[2:response_payload_len]))
if (response_payload[response_payload_len] == crc & 0xFF) and (
response_payload[response_payload_len + 1] == (crc >> 8) & 0xFF):
return response_payload[2:response_payload_len]
@ -858,7 +839,6 @@ class hysen(device):
Timer schedule included.
"""
payload = self.send_request(bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x16]))
data = {}
data['remote_lock'] = payload[3] & 1
@ -908,7 +888,7 @@ class hysen(device):
# E.g. loop_mode = 0 ("12345,67") means Saturday and Sunday follow the "weekend" schedule
# loop_mode = 2 ("1234567") means every day (including Saturday and Sunday) follows the "weekday" schedule
# The sensor command is currently experimental
def set_mode(self, auto_mode, loop_mode, sensor=0) -> None:
def set_mode(self, auto_mode: int, loop_mode: int, sensor: int = 0) -> None:
"""Set the mode of the device."""
mode_byte = ((loop_mode + 1) << 4) + auto_mode
self.send_request(bytearray([0x01, 0x06, 0x00, 0x02, mode_byte, sensor]))
@ -924,7 +904,18 @@ class hysen(device):
# Anti-freezing function (FrE) fre = 0 for anti-freezing function shut down,
# 1 for anti-freezing function open. Factory default: 0
# Power on memory (POn) poweron = 0 for power on memory off, 1 for power on memory on. Factory default: 0
def set_advanced(self, loop_mode, sensor, osv, dif, svh, svl, adj, fre, poweron) -> None:
def set_advanced(
self,
loop_mode: int,
sensor: int,
osv: int,
dif: int,
svh: int,
svl: int,
adj: float,
fre: int,
poweron: int,
) -> None:
"""Set advanced options."""
input_payload = bytearray([0x01, 0x10, 0x00, 0x02, 0x00, 0x05, 0x0a, loop_mode, sensor, osv, dif, svh, svl,
(int(adj * 2) >> 8 & 0xff), (int(adj * 2) & 0xff), fre, poweron])
@ -941,19 +932,19 @@ class hysen(device):
self.set_mode(auto_mode=0, loop_mode=0)
# Set temperature for manual mode (also activates manual mode if currently in automatic)
def set_temp(self, temp) -> None:
def set_temp(self, temp: float) -> None:
"""Set the target temperature."""
self.send_request(bytearray([0x01, 0x06, 0x00, 0x01, 0x00, int(temp * 2)]))
# Set device on(1) or off(0), does not deactivate Wifi connectivity.
# Remote lock disables control by buttons on thermostat.
def set_power(self, power=1, remote_lock=0) -> None:
def set_power(self, power: int = 1, remote_lock: int = 0) -> None:
"""Set the power state of the device."""
self.send_request(bytearray([0x01, 0x06, 0x00, 0x00, remote_lock, power]))
# set time on device
# n.b. day=1 is Monday, ..., day=7 is Sunday
def set_time(self, hour, minute, second, day) -> None:
def set_time(self, hour: int, minute: int, second: int, day: int) -> None:
"""Set the time."""
self.send_request(bytearray([0x01, 0x10, 0x00, 0x08, 0x00, 0x02, 0x04, hour, minute, second, day]))
@ -963,7 +954,7 @@ class hysen(device):
# {'start_hour':17, 'start_minute':30, 'temp': 22 }
# Each one specifies the thermostat temp that will become effective at start_hour:start_minute
# weekend is similar but only has 2 (e.g. switch on in morning and off in afternoon)
def set_schedule(self, weekday, weekend) -> None:
def set_schedule(self, weekday: List[dict], weekend: List[dict]) -> None:
"""Set timer schedule."""
# Begin with some magic values ...
input_payload = bytearray([0x01, 0x10, 0x00, 0x0a, 0x00, 0x0c, 0x18])
@ -1052,7 +1043,7 @@ class dooya(device):
device.__init__(self, *args, **kwargs)
self.type = "Dooya DT360E"
def _send(self, magic1, magic2) -> int:
def _send(self, magic1: int, magic2: int) -> int:
"""Send a packet to the device."""
packet = bytearray(16)
packet[0] = 0x09
@ -1082,7 +1073,7 @@ class dooya(device):
"""Return the position of the curtain."""
return self._send(0x06, 0x5d)
def set_percentage_and_wait(self, new_percentage) -> None:
def set_percentage_and_wait(self, new_percentage: int) -> None:
"""Set the position of the curtain."""
current = self.get_percentage()
if current > new_percentage:
@ -1119,7 +1110,7 @@ class lb1(device):
device.__init__(self, *args, **kwargs)
self.type = "SmartBulb"
def send_command(self, command, type='set') -> None:
def send_command(self, command: str, type: str = 'set') -> None:
"""Send a command to the device."""
packet = bytearray(16+(int(len(command)/16) + 1)*16)
packet[0x00] = 0x0c + len(command) & 0xff
@ -1144,7 +1135,7 @@ class lb1(device):
if responseLength > 0:
self.state_dict = json.loads(payload[0x0e:0x0e+responseLength])
def set_json(self, jsonstr) -> str:
def set_json(self, jsonstr: str) -> str:
"""Send a command to the device and return state."""
reconvert = json.loads(jsonstr)
if 'bulb_sceneidx' in reconvert.keys():
@ -1153,7 +1144,7 @@ class lb1(device):
self.send_command(json.dumps(reconvert))
return json.dumps(self.state_dict)
def set_state(self, state) -> None:
def set_state(self, state: Union[str, int]) -> None:
"""Set the state of the device."""
cmd = '{"pwr":%d}' % (1 if state == "ON" or state == 1 else 0)
self.send_command(cmd)
@ -1166,7 +1157,7 @@ class lb1(device):
# Setup a new Broadlink device via AP Mode. Review the README to see how to enter AP Mode.
# Only tested with Broadlink RM3 Mini (Blackbean)
def setup(ssid, password, security_mode) -> None:
def setup(ssid: str, password: str, security_mode: int) -> None:
"""Set up a new Broadlink device via AP mode."""
# Security mode options are (0 - none, 1 = WEP, 2 = WPA1, 3 = WPA2, 4 = WPA1/2)
payload = bytearray(0x88)

View File

@ -1,4 +1,5 @@
"""Helper functions."""
from ctypes import c_ushort
import socket
from .exceptions import exception
@ -18,3 +19,40 @@ def get_local_ip() -> str:
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.connect(('8.8.8.8', 53))
return s.getsockname()[0]
def calculate_crc16(input_data) -> int:
"""Calculate CRC-16."""
crc16_tab = []
crc16_constant = 0xA001
for i in range(0, 256):
crc = c_ushort(i).value
for j in range(0, 8):
if crc & 0x0001:
crc = c_ushort(crc >> 1).value ^ crc16_constant
else:
crc = c_ushort(crc >> 1).value
crc16_tab.append(hex(crc))
try:
is_string = isinstance(input_data, str)
is_bytes = isinstance(input_data, bytes)
if not is_string and not is_bytes:
raise Exception(
"Please provide a string or a byte sequence "
"as argument for calculation."
)
crcValue = 0xFFFF
for c in input_data:
d = ord(c) if is_string else c
tmp = crcValue ^ d
rotated = c_ushort(crcValue >> 8).value
crcValue = rotated ^ int(crc16_tab[(tmp & 0x00FF)], 0)
return crcValue
except Exception as e:
print("EXCEPTION(calculate): {}".format(e))