From 08c020e5977842c09bf8f7b72a3eceefbaaac952 Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel Date: Thu, 17 Sep 2020 00:41:32 -0300 Subject: [PATCH] Call Mr. Krueger --- broadlink/__init__.py | 975 +----------------------------------------- broadlink/alarm.py | 58 +++ broadlink/climate.py | 203 +++++++++ broadlink/cover.py | 59 +++ broadlink/device.py | 205 +++++++++ broadlink/light.py | 71 +++ broadlink/remote.py | 133 ++++++ broadlink/sensor.py | 42 ++ broadlink/switch.py | 227 ++++++++++ 9 files changed, 1006 insertions(+), 967 deletions(-) create mode 100644 broadlink/alarm.py create mode 100644 broadlink/climate.py create mode 100644 broadlink/cover.py create mode 100644 broadlink/device.py create mode 100644 broadlink/light.py create mode 100644 broadlink/remote.py create mode 100644 broadlink/sensor.py create mode 100644 broadlink/switch.py diff --git a/broadlink/__init__.py b/broadlink/__init__.py index 03e45fc..c3798dc 100644 --- a/broadlink/__init__.py +++ b/broadlink/__init__.py @@ -1,19 +1,19 @@ #!/usr/bin/python3 """The python-broadlink library.""" -import codecs -import json -import random import socket -import struct -import threading import time from datetime import datetime from typing import List, Union, Tuple -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes - +from .alarm import S1C +from .cover import dooya +from .climate import hysen +from .device import device from .exceptions import check_error, exception +from .light import lb1 +from .remote import rm, rm2, rm4 +from .sensor import a1 +from .switch import bg1, mp1, sp1, sp2 from .helpers import calculate_crc16, get_local_ip @@ -196,965 +196,6 @@ def discover( return devices -class device: - """Controls a Broadlink device.""" - - def __init__( - self, - host: Tuple[str, int], - mac: Union[bytes, str], - devtype: int, - timeout: int = 10, - name: str = None, - model: str = None, - manufacturer: str = None, - is_locked: bool = None, - ) -> None: - """Initialize the controller.""" - self.host = host - self.mac = mac.encode() if isinstance(mac, str) else mac - self.devtype = devtype if devtype is not None else 0x272a - self.timeout = timeout - self.name = name - self.model = model - self.manufacturer = manufacturer - self.is_locked = is_locked - self.count = random.randrange(0xffff) - self.iv = bytearray( - [0x56, 0x2e, 0x17, 0x99, 0x6d, 0x09, 0x3d, 0x28, 0xdd, 0xb3, 0xba, 0x69, 0x5a, 0x2e, 0x6f, 0x58]) - self.id = bytearray([0, 0, 0, 0]) - self.type = "Unknown" - self.lock = threading.Lock() - - self.aes = None - key = bytearray( - [0x09, 0x76, 0x28, 0x34, 0x3f, 0xe9, 0x9e, 0x23, 0x76, 0x5c, 0x15, 0x13, 0xac, 0xcf, 0x8b, 0x02]) - self.update_aes(key) - - def update_aes(self, key: bytes) -> None: - """Update AES.""" - self.aes = Cipher( - algorithms.AES(key), modes.CBC(self.iv), backend=default_backend() - ) - - def encrypt(self, payload: bytes) -> bytes: - """Encrypt the payload.""" - encryptor = self.aes.encryptor() - return encryptor.update(payload) + encryptor.finalize() - - def decrypt(self, payload: bytes) -> bytes: - """Decrypt the payload.""" - decryptor = self.aes.decryptor() - return decryptor.update(payload) + decryptor.finalize() - - def auth(self) -> bool: - """Authenticate to the device.""" - payload = bytearray(0x50) - payload[0x04] = 0x31 - payload[0x05] = 0x31 - payload[0x06] = 0x31 - payload[0x07] = 0x31 - payload[0x08] = 0x31 - payload[0x09] = 0x31 - payload[0x0a] = 0x31 - payload[0x0b] = 0x31 - payload[0x0c] = 0x31 - payload[0x0d] = 0x31 - payload[0x0e] = 0x31 - payload[0x0f] = 0x31 - payload[0x10] = 0x31 - payload[0x11] = 0x31 - payload[0x12] = 0x31 - payload[0x1e] = 0x01 - payload[0x2d] = 0x01 - payload[0x30] = ord('T') - payload[0x31] = ord('e') - payload[0x32] = ord('s') - payload[0x33] = ord('t') - payload[0x34] = ord(' ') - payload[0x35] = ord(' ') - payload[0x36] = ord('1') - - response = self.send_packet(0x65, payload) - check_error(response[0x22:0x24]) - payload = self.decrypt(response[0x38:]) - - key = payload[0x04:0x14] - if len(key) % 16 != 0: - return False - - self.id = payload[0x03::-1] - self.update_aes(key) - return True - - def get_fwversion(self) -> int: - """Get firmware version.""" - packet = bytearray([0x68]) - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(response[0x38:]) - return payload[0x4] | payload[0x5] << 8 - - def set_name(self, name: str) -> None: - """Set device name.""" - packet = bytearray(4) - packet += name.encode('utf-8') - packet += bytearray(0x50 - len(packet)) - packet[0x43] = bool(self.is_locked) - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - self.name = name - - def set_lock(self, state: bool) -> None: - """Lock/unlock the device.""" - packet = bytearray(4) - packet += self.name.encode('utf-8') - packet += bytearray(0x50 - len(packet)) - packet[0x43] = bool(state) - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - self.is_locked = bool(state) - - def get_type(self) -> str: - """Return device type.""" - return self.type - - def send_packet(self, command: int, payload: bytearray) -> bytearray: - """Send a packet to the device.""" - self.count = (self.count + 1) & 0xffff - packet = bytearray(0x38) - packet[0x00] = 0x5a - packet[0x01] = 0xa5 - packet[0x02] = 0xaa - packet[0x03] = 0x55 - packet[0x04] = 0x5a - packet[0x05] = 0xa5 - packet[0x06] = 0xaa - packet[0x07] = 0x55 - packet[0x24] = self.devtype & 0xff - packet[0x25] = self.devtype >> 8 - packet[0x26] = command - packet[0x28] = self.count & 0xff - packet[0x29] = self.count >> 8 - packet[0x2a] = self.mac[5] - packet[0x2b] = self.mac[4] - packet[0x2c] = self.mac[3] - packet[0x2d] = self.mac[2] - packet[0x2e] = self.mac[1] - packet[0x2f] = self.mac[0] - packet[0x30] = self.id[3] - packet[0x31] = self.id[2] - packet[0x32] = self.id[1] - packet[0x33] = self.id[0] - - # pad the payload for AES encryption - if payload: - payload += bytearray((16 - len(payload)) % 16) - - checksum = sum(payload, 0xbeaf) & 0xffff - packet[0x34] = checksum & 0xff - packet[0x35] = checksum >> 8 - - payload = self.encrypt(payload) - for i in range(len(payload)): - packet.append(payload[i]) - - checksum = sum(packet, 0xbeaf) & 0xffff - packet[0x20] = checksum & 0xff - packet[0x21] = checksum >> 8 - - start_time = time.time() - with self.lock: - cs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - cs.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - - while True: - try: - cs.sendto(packet, self.host) - cs.settimeout(1) - resp, _ = cs.recvfrom(2048) - resp = bytearray(resp) - break - except socket.timeout: - if (time.time() - start_time) > self.timeout: - cs.close() - raise exception(-4000) # Network timeout. - cs.close() - - if len(resp) < 0x30: - raise exception(-4007) # Length error. - - checksum = resp[0x20] | (resp[0x21] << 8) - if sum(resp, 0xbeaf) - sum(resp[0x20:0x22]) & 0xffff != checksum: - raise exception(-4008) # Checksum error. - - return resp - - -class mp1(device): - """Controls a Broadlink MP1.""" - - def __init__(self, *args, **kwargs) -> None: - """Initialize the controller.""" - device.__init__(self, *args, **kwargs) - self.type = "MP1" - - def set_power_mask(self, sid_mask: int, state: bool) -> None: - """Set the power state of the device.""" - packet = bytearray(16) - packet[0x00] = 0x0d - packet[0x02] = 0xa5 - packet[0x03] = 0xa5 - packet[0x04] = 0x5a - packet[0x05] = 0x5a - packet[0x06] = 0xb2 + ((sid_mask << 1) if state else sid_mask) - packet[0x07] = 0xc0 - packet[0x08] = 0x02 - packet[0x0a] = 0x03 - packet[0x0d] = sid_mask - packet[0x0e] = sid_mask if state else 0 - - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - - def set_power(self, sid: int, state: bool) -> None: - """Set the power state of the device.""" - sid_mask = 0x01 << (sid - 1) - self.set_power_mask(sid_mask, state) - - def check_power_raw(self) -> bool: - """Return the power state of the device in raw format.""" - packet = bytearray(16) - packet[0x00] = 0x0a - packet[0x02] = 0xa5 - packet[0x03] = 0xa5 - packet[0x04] = 0x5a - packet[0x05] = 0x5a - packet[0x06] = 0xae - packet[0x07] = 0xc0 - packet[0x08] = 0x01 - - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(bytes(response[0x38:])) - return payload[0x0e] - - def check_power(self) -> dict: - """Return the power state of the device.""" - state = self.check_power_raw() - if state is None: - return {'s1': None, 's2': None, 's3': None, 's4': None} - data = {} - data['s1'] = bool(state & 0x01) - data['s2'] = bool(state & 0x02) - data['s3'] = bool(state & 0x04) - data['s4'] = bool(state & 0x08) - return data - - -class bg1(device): - """Controls a BG Electrical smart outlet.""" - - def __init__(self, *args, **kwargs) -> None: - """Initialize the controller.""" - device.__init__(self, *args, **kwargs) - self.type = "BG1" - - def get_state(self) -> dict: - """Return the power state of the device. - - Example: `{"pwr":1,"pwr1":1,"pwr2":0,"maxworktime":60,"maxworktime1":60,"maxworktime2":0,"idcbrightness":50}` - """ - packet = self._encode(1, b'{}') - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - return self._decode(response) - - def set_state( - self, - pwr: bool = None, - pwr1: bool = None, - pwr2: bool = None, - maxworktime: int = None, - maxworktime1: int = None, - maxworktime2: int = None, - idcbrightness: int = None, - ) -> dict: - """Set the power state of the device.""" - data = {} - if pwr is not None: - data['pwr'] = int(bool(pwr)) - if pwr1 is not None: - data['pwr1'] = int(bool(pwr1)) - if pwr2 is not None: - data['pwr2'] = int(bool(pwr2)) - if maxworktime is not None: - data['maxworktime'] = maxworktime - if maxworktime1 is not None: - data['maxworktime1'] = maxworktime1 - if maxworktime2 is not None: - data['maxworktime2'] = maxworktime2 - if idcbrightness is not None: - data['idcbrightness'] = idcbrightness - js = json.dumps(data).encode('utf8') - packet = self._encode(2, js) - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - return self._decode(response) - - def _encode(self, flag: int, js: str) -> bytearray: - """Encode a message.""" - # The packet format is: - # 0x00-0x01 length - # 0x02-0x05 header - # 0x06-0x07 00 - # 0x08 flag (1 for read or 2 write?) - # 0x09 unknown (0xb) - # 0x0a-0x0d length of json - # 0x0e- json data - packet = bytearray(14) - length = 4 + 2 + 2 + 4 + len(js) - struct.pack_into('> 8 - return packet - - def _decode(self, response: bytes) -> dict: - """Decode a message.""" - payload = self.decrypt(bytes(response[0x38:])) - js_len = struct.unpack_from(' None: - """Initialize the device.""" - device.__init__(self, *args, **kwargs) - self.type = "SP1" - - def set_power(self, state: bool) -> None: - """Set the power state of the device.""" - packet = bytearray(4) - packet[0] = state - response = self.send_packet(0x66, packet) - check_error(response[0x22:0x24]) - - -class sp2(device): - """Controls a Broadlink SP2.""" - - def __init__(self, *args, **kwargs) -> None: - """Initialize the controller.""" - device.__init__(self, *args, **kwargs) - self.type = "SP2" - - def set_power(self, state: bool) -> None: - """Set the power state of the device.""" - packet = bytearray(16) - packet[0] = 2 - if self.check_nightlight(): - packet[4] = 3 if state else 2 - else: - packet[4] = 1 if state else 0 - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - - def set_nightlight(self, state: bool) -> None: - """Set the night light state of the device.""" - packet = bytearray(16) - packet[0] = 2 - if self.check_power(): - packet[4] = 3 if state else 1 - else: - packet[4] = 2 if state else 0 - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - - def check_power(self) -> bool: - """Return the power state of the device.""" - packet = bytearray(16) - packet[0] = 1 - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(bytes(response[0x38:])) - if isinstance(payload[0x4], int): - return bool(payload[0x4] == 1 or payload[0x4] == 3 or payload[0x4] == 0xFD) - return bool(ord(payload[0x4]) == 1 or ord(payload[0x4]) == 3 or ord(payload[0x4]) == 0xFD) - - def check_nightlight(self) -> bool: - """Return the state of the night light.""" - packet = bytearray(16) - packet[0] = 1 - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(bytes(response[0x38:])) - if isinstance(payload[0x4], int): - return bool(payload[0x4] == 2 or payload[0x4] == 3 or payload[0x4] == 0xFF) - return bool(ord(payload[0x4]) == 2 or ord(payload[0x4]) == 3 or ord(payload[0x4]) == 0xFF) - - def get_energy(self) -> int: - """Return the energy state of the device.""" - packet = bytearray([8, 0, 254, 1, 5, 1, 0, 0, 0, 45]) - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(bytes(response[0x38:])) - if isinstance(payload[0x7], int): - energy = int(hex(payload[0x07] * 256 + payload[0x06])[2:]) + int(hex(payload[0x05])[2:]) / 100.0 - else: - energy = int(hex(ord(payload[0x07]) * 256 + ord(payload[0x06]))[2:]) + int( - hex(ord(payload[0x05]))[2:]) / 100.0 - return energy - - -class a1(device): - """Controls a Broadlink A1.""" - - _SENSORS_AND_LEVELS = ( - ('light', ('dark', 'dim', 'normal', 'bright')), - ('air_quality', ('excellent', 'good', 'normal', 'bad')), - ('noise', ('quiet', 'normal', 'noisy')), - ) - - def __init__(self, *args, **kwargs) -> None: - """Initialize the controller.""" - device.__init__(self, *args, **kwargs) - self.type = "A1" - - def check_sensors(self) -> dict: - """Return the state of the sensors.""" - data = self.check_sensors_raw() - for sensor, levels in self._SENSORS_AND_LEVELS: - try: - data[sensor] = levels[data[sensor]] - except IndexError: - data[sensor] = 'unknown' - return data - - def check_sensors_raw(self) -> dict: - """Return the state of the sensors in raw format.""" - packet = bytearray([0x1]) - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(bytes(response[0x38:])) - data = bytearray(payload[0x4:]) - return { - 'temperature': data[0x0] + data[0x1] / 10.0, - 'humidity': data[0x2] + data[0x3] / 10.0, - 'light': data[0x4], - 'air_quality': data[0x6], - 'noise': data[0x8], - } - - -class rm(device): - """Controls a Broadlink RM.""" - - def __init__(self, *args, **kwargs) -> None: - """Initialize the controller.""" - device.__init__(self, *args, **kwargs) - self.type = "RM2" - self._request_header = bytes() - self._code_sending_header = bytes() - - def check_data(self) -> bytes: - """Return the last captured code.""" - packet = bytearray(self._request_header) - packet.append(0x04) - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(bytes(response[0x38:])) - return payload[len(self._request_header) + 4:] - - def send_data(self, data: bytes) -> None: - """Send a code to the device.""" - packet = bytearray(self._code_sending_header) - packet += bytearray([0x02, 0x00, 0x00, 0x00]) - packet += data - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - - def enter_learning(self) -> None: - """Enter infrared learning mode.""" - packet = bytearray(self._request_header) - packet.append(0x03) - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - - def sweep_frequency(self) -> None: - """Sweep frequency.""" - packet = bytearray(self._request_header) - packet.append(0x19) - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - - def cancel_sweep_frequency(self) -> None: - """Cancel sweep frequency.""" - packet = bytearray(self._request_header) - packet.append(0x1e) - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - - def check_frequency(self) -> bool: - """Return True if the frequency was identified successfully.""" - packet = bytearray(self._request_header) - packet.append(0x1a) - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(bytes(response[0x38:])) - if payload[len(self._request_header) + 4] == 1: - return True - return False - - def find_rf_packet(self) -> bool: - """Enter radiofrequency learning mode.""" - packet = bytearray(self._request_header) - packet.append(0x1b) - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(bytes(response[0x38:])) - if payload[len(self._request_header) + 4] == 1: - return True - return False - - def _check_sensors(self, command: int) -> bytes: - """Return the state of the sensors in raw format.""" - packet = bytearray(self._request_header) - packet.append(command) - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(bytes(response[0x38:])) - return bytearray(payload[len(self._request_header) + 4:]) - - def check_temperature(self) -> int: - """Return the temperature.""" - data = self._check_sensors(0x1) - return data[0x0] + data[0x1] / 10.0 - - def check_sensors(self) -> dict: - """Return the state of the sensors.""" - data = self._check_sensors(0x1) - return {'temperature': data[0x0] + data[0x1] / 10.0} - - -class rm4(rm): - """Controls a Broadlink RM4.""" - - def __init__(self, *args, **kwargs) -> None: - """Initialize the controller.""" - device.__init__(self, *args, **kwargs) - self.type = "RM4" - self._request_header = b'\x04\x00' - self._code_sending_header = b'\xda\x00' - - def check_temperature(self) -> int: - """Return the temperature.""" - data = self._check_sensors(0x24) - return data[0x0] + data[0x1] / 100.0 - - def check_humidity(self) -> int: - """Return the humidity.""" - data = self._check_sensors(0x24) - return data[0x2] + data[0x3] / 100.0 - - def check_sensors(self) -> dict: - """Return the state of the sensors.""" - data = self._check_sensors(0x24) - return { - 'temperature': data[0x0] + data[0x1] / 100.0, - 'humidity': data[0x2] + data[0x3] / 100.0 - } - - -# For legacy compatibility - don't use this -class rm2(rm): - def __init__(self): - device.__init__(self, None, None, None) - - def discover(self): - dev = discover() - self.host = dev.host - self.mac = dev.mac - - -class hysen(device): - """Controls a Hysen HVAC.""" - - def __init__(self, *args, **kwargs) -> None: - """Initialize the controller.""" - device.__init__(self, *args, **kwargs) - self.type = "Hysen heating controller" - - # Send a request - # input_payload should be a bytearray, usually 6 bytes, e.g. bytearray([0x01,0x06,0x00,0x02,0x10,0x00]) - # Returns decrypted payload - # New behaviour: raises a ValueError if the device response indicates an error or CRC check fails - # The function prepends length (2 bytes) and appends CRC - - def send_request(self, input_payload: bytes) -> bytes: - """Send a request to the device.""" - crc = calculate_crc16(bytes(input_payload)) - - # first byte is length, +2 for CRC16 - request_payload = bytearray([len(input_payload) + 2, 0x00]) - request_payload.extend(input_payload) - - # append CRC - request_payload.append(crc & 0xFF) - request_payload.append((crc >> 8) & 0xFF) - - # send to device - response = self.send_packet(0x6a, request_payload) - check_error(response[0x22:0x24]) - response_payload = bytearray(self.decrypt(bytes(response[0x38:]))) - - # experimental check on CRC in response (first 2 bytes are len, and trailing bytes are crc) - response_payload_len = response_payload[0] - if response_payload_len + 2 > len(response_payload): - raise ValueError('hysen_response_error', 'first byte of response is not length') - crc = calculate_crc16(bytes(response_payload[2:response_payload_len])) - if (response_payload[response_payload_len] == crc & 0xFF) and ( - response_payload[response_payload_len + 1] == (crc >> 8) & 0xFF): - return response_payload[2:response_payload_len] - raise ValueError('hysen_response_error', 'CRC check on response failed') - - def get_temp(self) -> int: - """Return the room temperature in degrees celsius.""" - payload = self.send_request(bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x08])) - return payload[0x05] / 2.0 - - def get_external_temp(self) -> int: - """Return the external temperature in degrees celsius.""" - payload = self.send_request(bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x08])) - return payload[18] / 2.0 - - def get_full_status(self) -> dict: - """Return the state of the device. - - Timer schedule included. - """ - payload = self.send_request(bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x16])) - data = {} - data['remote_lock'] = payload[3] & 1 - data['power'] = payload[4] & 1 - data['active'] = (payload[4] >> 4) & 1 - data['temp_manual'] = (payload[4] >> 6) & 1 - data['room_temp'] = (payload[5] & 255) / 2.0 - data['thermostat_temp'] = (payload[6] & 255) / 2.0 - data['auto_mode'] = payload[7] & 15 - data['loop_mode'] = (payload[7] >> 4) & 15 - data['sensor'] = payload[8] - data['osv'] = payload[9] - data['dif'] = payload[10] - data['svh'] = payload[11] - data['svl'] = payload[12] - data['room_temp_adj'] = ((payload[13] << 8) + payload[14]) / 2.0 - if data['room_temp_adj'] > 32767: - data['room_temp_adj'] = 32767 - data['room_temp_adj'] - data['fre'] = payload[15] - data['poweron'] = payload[16] - data['unknown'] = payload[17] - data['external_temp'] = (payload[18] & 255) / 2.0 - data['hour'] = payload[19] - data['min'] = payload[20] - data['sec'] = payload[21] - data['dayofweek'] = payload[22] - - weekday = [] - for i in range(0, 6): - weekday.append( - {'start_hour': payload[2 * i + 23], 'start_minute': payload[2 * i + 24], 'temp': payload[i + 39] / 2.0}) - - data['weekday'] = weekday - weekend = [] - for i in range(6, 8): - weekend.append( - {'start_hour': payload[2 * i + 23], 'start_minute': payload[2 * i + 24], 'temp': payload[i + 39] / 2.0}) - - data['weekend'] = weekend - return data - - # Change controller mode - # auto_mode = 1 for auto (scheduled/timed) mode, 0 for manual mode. - # Manual mode will activate last used temperature. - # In typical usage call set_temp to activate manual control and set temp. - # loop_mode refers to index in [ "12345,67", "123456,7", "1234567" ] - # E.g. loop_mode = 0 ("12345,67") means Saturday and Sunday follow the "weekend" schedule - # loop_mode = 2 ("1234567") means every day (including Saturday and Sunday) follows the "weekday" schedule - # The sensor command is currently experimental - def set_mode(self, auto_mode: int, loop_mode: int, sensor: int = 0) -> None: - """Set the mode of the device.""" - mode_byte = ((loop_mode + 1) << 4) + auto_mode - self.send_request(bytearray([0x01, 0x06, 0x00, 0x02, mode_byte, sensor])) - - # Advanced settings - # Sensor mode (SEN) sensor = 0 for internal sensor, 1 for external sensor, - # 2 for internal control temperature, external limit temperature. Factory default: 0. - # Set temperature range for external sensor (OSV) osv = 5..99. Factory default: 42C - # Deadzone for floor temprature (dIF) dif = 1..9. Factory default: 2C - # Upper temperature limit for internal sensor (SVH) svh = 5..99. Factory default: 35C - # Lower temperature limit for internal sensor (SVL) svl = 5..99. Factory default: 5C - # Actual temperature calibration (AdJ) adj = -0.5. Prescision 0.1C - # Anti-freezing function (FrE) fre = 0 for anti-freezing function shut down, - # 1 for anti-freezing function open. Factory default: 0 - # Power on memory (POn) poweron = 0 for power on memory off, 1 for power on memory on. Factory default: 0 - def set_advanced( - self, - loop_mode: int, - sensor: int, - osv: int, - dif: int, - svh: int, - svl: int, - adj: float, - fre: int, - poweron: int, - ) -> None: - """Set advanced options.""" - input_payload = bytearray([0x01, 0x10, 0x00, 0x02, 0x00, 0x05, 0x0a, loop_mode, sensor, osv, dif, svh, svl, - (int(adj * 2) >> 8 & 0xff), (int(adj * 2) & 0xff), fre, poweron]) - self.send_request(input_payload) - - # For backwards compatibility only. Prefer calling set_mode directly. - # Note this function invokes loop_mode=0 and sensor=0. - def switch_to_auto(self) -> None: - """Switch mode to auto.""" - self.set_mode(auto_mode=1, loop_mode=0) - - def switch_to_manual(self) -> None: - """Switch mode to manual.""" - self.set_mode(auto_mode=0, loop_mode=0) - - # Set temperature for manual mode (also activates manual mode if currently in automatic) - def set_temp(self, temp: float) -> None: - """Set the target temperature.""" - self.send_request(bytearray([0x01, 0x06, 0x00, 0x01, 0x00, int(temp * 2)])) - - # Set device on(1) or off(0), does not deactivate Wifi connectivity. - # Remote lock disables control by buttons on thermostat. - def set_power(self, power: int = 1, remote_lock: int = 0) -> None: - """Set the power state of the device.""" - self.send_request(bytearray([0x01, 0x06, 0x00, 0x00, remote_lock, power])) - - # set time on device - # n.b. day=1 is Monday, ..., day=7 is Sunday - def set_time(self, hour: int, minute: int, second: int, day: int) -> None: - """Set the time.""" - self.send_request(bytearray([0x01, 0x10, 0x00, 0x08, 0x00, 0x02, 0x04, hour, minute, second, day])) - - # Set timer schedule - # Format is the same as you get from get_full_status. - # weekday is a list (ordered) of 6 dicts like: - # {'start_hour':17, 'start_minute':30, 'temp': 22 } - # Each one specifies the thermostat temp that will become effective at start_hour:start_minute - # weekend is similar but only has 2 (e.g. switch on in morning and off in afternoon) - def set_schedule(self, weekday: List[dict], weekend: List[dict]) -> None: - """Set timer schedule.""" - # Begin with some magic values ... - input_payload = bytearray([0x01, 0x10, 0x00, 0x0a, 0x00, 0x0c, 0x18]) - - # Now simply append times/temps - # weekday times - for i in range(0, 6): - input_payload.append(weekday[i]['start_hour']) - input_payload.append(weekday[i]['start_minute']) - - # weekend times - for i in range(0, 2): - input_payload.append(weekend[i]['start_hour']) - input_payload.append(weekend[i]['start_minute']) - - # weekday temperatures - for i in range(0, 6): - input_payload.append(int(weekday[i]['temp'] * 2)) - - # weekend temperatures - for i in range(0, 2): - input_payload.append(int(weekend[i]['temp'] * 2)) - - self.send_request(input_payload) - - -S1C_SENSORS_TYPES = { - 0x31: 'Door Sensor', # 49 as hex - 0x91: 'Key Fob', # 145 as hex, as serial on fob corpse - 0x21: 'Motion Sensor' # 33 as hex -} - - -class S1C(device): - """Controls a Broadlink S1C.""" - - def __init__(self, *args, **kwargs) -> None: - """Initialize the controller.""" - device.__init__(self, *args, **kwargs) - self.type = 'S1C' - - def get_sensors_status(self) -> dict: - """Return the state of the sensors.""" - packet = bytearray(16) - packet[0] = 0x06 # 0x06 - get sensors info, 0x07 - probably add sensors - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(bytes(response[0x38:])) - if not payload: - return None - count = payload[0x4] - sensors = payload[0x6:] - sensors_a = [bytearray(sensors[i * 83:(i + 1) * 83]) for i in range(len(sensors) // 83)] - - sens_res = [] - for sens in sensors_a: - status = ord(chr(sens[0])) - _name = str(bytes(sens[4:26]).decode()) - _order = ord(chr(sens[1])) - _type = ord(chr(sens[3])) - _serial = bytes(codecs.encode(sens[26:30], "hex")).decode() - - type_str = S1C_SENSORS_TYPES.get(_type, 'Unknown') - - r = { - 'status': status, - 'name': _name.strip('\x00'), - 'type': type_str, - 'order': _order, - 'serial': _serial, - } - if r['serial'] != '00000000': - sens_res.append(r) - result = { - 'count': count, - 'sensors': sens_res - } - return result - - -class dooya(device): - """Controls a Dooya curtain motor.""" - - def __init__(self, *args, **kwargs) -> None: - """Initialize the controller.""" - device.__init__(self, *args, **kwargs) - self.type = "Dooya DT360E" - - def _send(self, magic1: int, magic2: int) -> int: - """Send a packet to the device.""" - packet = bytearray(16) - packet[0] = 0x09 - packet[2] = 0xbb - packet[3] = magic1 - packet[4] = magic2 - packet[9] = 0xfa - packet[10] = 0x44 - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(bytes(response[0x38:])) - return ord(payload[4]) - - def open(self) -> int: - """Open the curtain.""" - return self._send(0x01, 0x00) - - def close(self) -> int: - """Close the curtain.""" - return self._send(0x02, 0x00) - - def stop(self) -> int: - """Stop the curtain.""" - return self._send(0x03, 0x00) - - def get_percentage(self) -> int: - """Return the position of the curtain.""" - return self._send(0x06, 0x5d) - - def set_percentage_and_wait(self, new_percentage: int) -> None: - """Set the position of the curtain.""" - current = self.get_percentage() - if current > new_percentage: - self.close() - while current is not None and current > new_percentage: - time.sleep(0.2) - current = self.get_percentage() - - elif current < new_percentage: - self.open() - while current is not None and current < new_percentage: - time.sleep(0.2) - current = self.get_percentage() - self.stop() - - -class lb1(device): - """Controls a Broadlink LB1.""" - - state_dict = [] - effect_map_dict = { - 'lovely color': 0, - 'flashlight': 1, - 'lightning': 2, - 'color fading': 3, - 'color breathing': 4, - 'multicolor breathing': 5, - 'color jumping': 6, - 'multicolor jumping': 7, - } - - def __init__(self, *args, **kwargs) -> None: - """Initialize the controller.""" - device.__init__(self, *args, **kwargs) - self.type = "SmartBulb" - - def send_command(self, command: str, type: str = 'set') -> None: - """Send a command to the device.""" - packet = bytearray(16+(int(len(command)/16) + 1)*16) - packet[0x00] = 0x0c + len(command) & 0xff - packet[0x02] = 0xa5 - packet[0x03] = 0xa5 - packet[0x04] = 0x5a - packet[0x05] = 0x5a - packet[0x08] = 0x02 if type == "set" else 0x01 # 0x01 => query, # 0x02 => set - packet[0x09] = 0x0b - packet[0x0a] = len(command) - packet[0x0e:] = map(ord, command) - - checksum = sum(packet, 0xbeaf) & 0xffff - packet[0x06] = checksum & 0xff # Checksum 1 position - packet[0x07] = checksum >> 8 # Checksum 2 position - - response = self.send_packet(0x6a, packet) - check_error(response[0x36:0x38]) - payload = self.decrypt(bytes(response[0x38:])) - - responseLength = int(payload[0x0a]) | (int(payload[0x0b]) << 8) - if responseLength > 0: - self.state_dict = json.loads(payload[0x0e:0x0e+responseLength]) - - def set_json(self, jsonstr: str) -> str: - """Send a command to the device and return state.""" - reconvert = json.loads(jsonstr) - if 'bulb_sceneidx' in reconvert.keys(): - reconvert['bulb_sceneidx'] = self.effect_map_dict.get(reconvert['bulb_sceneidx'], 255) - - self.send_command(json.dumps(reconvert)) - return json.dumps(self.state_dict) - - def set_state(self, state: Union[str, int]) -> None: - """Set the state of the device.""" - cmd = '{"pwr":%d}' % (1 if state == "ON" or state == 1 else 0) - self.send_command(cmd) - - def get_state(self) -> dict: - """Return the state of the device.""" - cmd = "{}" - self.send_command(cmd) - return self.state_dict - # Setup a new Broadlink device via AP Mode. Review the README to see how to enter AP Mode. # Only tested with Broadlink RM3 Mini (Blackbean) def setup(ssid: str, password: str, security_mode: int) -> None: diff --git a/broadlink/alarm.py b/broadlink/alarm.py new file mode 100644 index 0000000..ea1bc54 --- /dev/null +++ b/broadlink/alarm.py @@ -0,0 +1,58 @@ +import codecs + +from .device import device +from .exceptions import check_error + + +S1C_SENSORS_TYPES = { + 0x31: 'Door Sensor', # 49 as hex + 0x91: 'Key Fob', # 145 as hex, as serial on fob corpse + 0x21: 'Motion Sensor' # 33 as hex +} + + +class S1C(device): + """Controls a Broadlink S1C.""" + + def __init__(self, *args, **kwargs) -> None: + """Initialize the controller.""" + device.__init__(self, *args, **kwargs) + self.type = 'S1C' + + def get_sensors_status(self) -> dict: + """Return the state of the sensors.""" + packet = bytearray(16) + packet[0] = 0x06 # 0x06 - get sensors info, 0x07 - probably add sensors + response = self.send_packet(0x6a, packet) + check_error(response[0x22:0x24]) + payload = self.decrypt(bytes(response[0x38:])) + if not payload: + return None + count = payload[0x4] + sensors = payload[0x6:] + sensors_a = [bytearray(sensors[i * 83:(i + 1) * 83]) for i in range(len(sensors) // 83)] + + sens_res = [] + for sens in sensors_a: + status = ord(chr(sens[0])) + _name = str(bytes(sens[4:26]).decode()) + _order = ord(chr(sens[1])) + _type = ord(chr(sens[3])) + _serial = bytes(codecs.encode(sens[26:30], "hex")).decode() + + type_str = S1C_SENSORS_TYPES.get(_type, 'Unknown') + + r = { + 'status': status, + 'name': _name.strip('\x00'), + 'type': type_str, + 'order': _order, + 'serial': _serial, + } + if r['serial'] != '00000000': + sens_res.append(r) + result = { + 'count': count, + 'sensors': sens_res + } + return result diff --git a/broadlink/climate.py b/broadlink/climate.py new file mode 100644 index 0000000..427063f --- /dev/null +++ b/broadlink/climate.py @@ -0,0 +1,203 @@ +from typing import List + +from .device import device +from .exceptions import check_error +from .helpers import calculate_crc16 + + +class hysen(device): + """Controls a Hysen HVAC.""" + + def __init__(self, *args, **kwargs) -> None: + """Initialize the controller.""" + device.__init__(self, *args, **kwargs) + self.type = "Hysen heating controller" + + # Send a request + # input_payload should be a bytearray, usually 6 bytes, e.g. bytearray([0x01,0x06,0x00,0x02,0x10,0x00]) + # Returns decrypted payload + # New behaviour: raises a ValueError if the device response indicates an error or CRC check fails + # The function prepends length (2 bytes) and appends CRC + + def send_request(self, input_payload: bytes) -> bytes: + """Send a request to the device.""" + crc = calculate_crc16(bytes(input_payload)) + + # first byte is length, +2 for CRC16 + request_payload = bytearray([len(input_payload) + 2, 0x00]) + request_payload.extend(input_payload) + + # append CRC + request_payload.append(crc & 0xFF) + request_payload.append((crc >> 8) & 0xFF) + + # send to device + response = self.send_packet(0x6a, request_payload) + check_error(response[0x22:0x24]) + response_payload = bytearray(self.decrypt(bytes(response[0x38:]))) + + # experimental check on CRC in response (first 2 bytes are len, and trailing bytes are crc) + response_payload_len = response_payload[0] + if response_payload_len + 2 > len(response_payload): + raise ValueError('hysen_response_error', 'first byte of response is not length') + crc = calculate_crc16(bytes(response_payload[2:response_payload_len])) + if (response_payload[response_payload_len] == crc & 0xFF) and ( + response_payload[response_payload_len + 1] == (crc >> 8) & 0xFF): + return response_payload[2:response_payload_len] + raise ValueError('hysen_response_error', 'CRC check on response failed') + + def get_temp(self) -> int: + """Return the room temperature in degrees celsius.""" + payload = self.send_request(bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x08])) + return payload[0x05] / 2.0 + + def get_external_temp(self) -> int: + """Return the external temperature in degrees celsius.""" + payload = self.send_request(bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x08])) + return payload[18] / 2.0 + + def get_full_status(self) -> dict: + """Return the state of the device. + + Timer schedule included. + """ + payload = self.send_request(bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x16])) + data = {} + data['remote_lock'] = payload[3] & 1 + data['power'] = payload[4] & 1 + data['active'] = (payload[4] >> 4) & 1 + data['temp_manual'] = (payload[4] >> 6) & 1 + data['room_temp'] = (payload[5] & 255) / 2.0 + data['thermostat_temp'] = (payload[6] & 255) / 2.0 + data['auto_mode'] = payload[7] & 15 + data['loop_mode'] = (payload[7] >> 4) & 15 + data['sensor'] = payload[8] + data['osv'] = payload[9] + data['dif'] = payload[10] + data['svh'] = payload[11] + data['svl'] = payload[12] + data['room_temp_adj'] = ((payload[13] << 8) + payload[14]) / 2.0 + if data['room_temp_adj'] > 32767: + data['room_temp_adj'] = 32767 - data['room_temp_adj'] + data['fre'] = payload[15] + data['poweron'] = payload[16] + data['unknown'] = payload[17] + data['external_temp'] = (payload[18] & 255) / 2.0 + data['hour'] = payload[19] + data['min'] = payload[20] + data['sec'] = payload[21] + data['dayofweek'] = payload[22] + + weekday = [] + for i in range(0, 6): + weekday.append( + {'start_hour': payload[2 * i + 23], 'start_minute': payload[2 * i + 24], 'temp': payload[i + 39] / 2.0}) + + data['weekday'] = weekday + weekend = [] + for i in range(6, 8): + weekend.append( + {'start_hour': payload[2 * i + 23], 'start_minute': payload[2 * i + 24], 'temp': payload[i + 39] / 2.0}) + + data['weekend'] = weekend + return data + + # Change controller mode + # auto_mode = 1 for auto (scheduled/timed) mode, 0 for manual mode. + # Manual mode will activate last used temperature. + # In typical usage call set_temp to activate manual control and set temp. + # loop_mode refers to index in [ "12345,67", "123456,7", "1234567" ] + # E.g. loop_mode = 0 ("12345,67") means Saturday and Sunday follow the "weekend" schedule + # loop_mode = 2 ("1234567") means every day (including Saturday and Sunday) follows the "weekday" schedule + # The sensor command is currently experimental + def set_mode(self, auto_mode: int, loop_mode: int, sensor: int = 0) -> None: + """Set the mode of the device.""" + mode_byte = ((loop_mode + 1) << 4) + auto_mode + self.send_request(bytearray([0x01, 0x06, 0x00, 0x02, mode_byte, sensor])) + + # Advanced settings + # Sensor mode (SEN) sensor = 0 for internal sensor, 1 for external sensor, + # 2 for internal control temperature, external limit temperature. Factory default: 0. + # Set temperature range for external sensor (OSV) osv = 5..99. Factory default: 42C + # Deadzone for floor temprature (dIF) dif = 1..9. Factory default: 2C + # Upper temperature limit for internal sensor (SVH) svh = 5..99. Factory default: 35C + # Lower temperature limit for internal sensor (SVL) svl = 5..99. Factory default: 5C + # Actual temperature calibration (AdJ) adj = -0.5. Prescision 0.1C + # Anti-freezing function (FrE) fre = 0 for anti-freezing function shut down, + # 1 for anti-freezing function open. Factory default: 0 + # Power on memory (POn) poweron = 0 for power on memory off, 1 for power on memory on. Factory default: 0 + def set_advanced( + self, + loop_mode: int, + sensor: int, + osv: int, + dif: int, + svh: int, + svl: int, + adj: float, + fre: int, + poweron: int, + ) -> None: + """Set advanced options.""" + input_payload = bytearray([0x01, 0x10, 0x00, 0x02, 0x00, 0x05, 0x0a, loop_mode, sensor, osv, dif, svh, svl, + (int(adj * 2) >> 8 & 0xff), (int(adj * 2) & 0xff), fre, poweron]) + self.send_request(input_payload) + + # For backwards compatibility only. Prefer calling set_mode directly. + # Note this function invokes loop_mode=0 and sensor=0. + def switch_to_auto(self) -> None: + """Switch mode to auto.""" + self.set_mode(auto_mode=1, loop_mode=0) + + def switch_to_manual(self) -> None: + """Switch mode to manual.""" + self.set_mode(auto_mode=0, loop_mode=0) + + # Set temperature for manual mode (also activates manual mode if currently in automatic) + def set_temp(self, temp: float) -> None: + """Set the target temperature.""" + self.send_request(bytearray([0x01, 0x06, 0x00, 0x01, 0x00, int(temp * 2)])) + + # Set device on(1) or off(0), does not deactivate Wifi connectivity. + # Remote lock disables control by buttons on thermostat. + def set_power(self, power: int = 1, remote_lock: int = 0) -> None: + """Set the power state of the device.""" + self.send_request(bytearray([0x01, 0x06, 0x00, 0x00, remote_lock, power])) + + # set time on device + # n.b. day=1 is Monday, ..., day=7 is Sunday + def set_time(self, hour: int, minute: int, second: int, day: int) -> None: + """Set the time.""" + self.send_request(bytearray([0x01, 0x10, 0x00, 0x08, 0x00, 0x02, 0x04, hour, minute, second, day])) + + # Set timer schedule + # Format is the same as you get from get_full_status. + # weekday is a list (ordered) of 6 dicts like: + # {'start_hour':17, 'start_minute':30, 'temp': 22 } + # Each one specifies the thermostat temp that will become effective at start_hour:start_minute + # weekend is similar but only has 2 (e.g. switch on in morning and off in afternoon) + def set_schedule(self, weekday: List[dict], weekend: List[dict]) -> None: + """Set timer schedule.""" + # Begin with some magic values ... + input_payload = bytearray([0x01, 0x10, 0x00, 0x0a, 0x00, 0x0c, 0x18]) + + # Now simply append times/temps + # weekday times + for i in range(0, 6): + input_payload.append(weekday[i]['start_hour']) + input_payload.append(weekday[i]['start_minute']) + + # weekend times + for i in range(0, 2): + input_payload.append(weekend[i]['start_hour']) + input_payload.append(weekend[i]['start_minute']) + + # weekday temperatures + for i in range(0, 6): + input_payload.append(int(weekday[i]['temp'] * 2)) + + # weekend temperatures + for i in range(0, 2): + input_payload.append(int(weekend[i]['temp'] * 2)) + + self.send_request(input_payload) diff --git a/broadlink/cover.py b/broadlink/cover.py new file mode 100644 index 0000000..485e6c7 --- /dev/null +++ b/broadlink/cover.py @@ -0,0 +1,59 @@ +import time + +from .device import device +from .exceptions import check_error + + +class dooya(device): + """Controls a Dooya curtain motor.""" + + def __init__(self, *args, **kwargs) -> None: + """Initialize the controller.""" + device.__init__(self, *args, **kwargs) + self.type = "Dooya DT360E" + + def _send(self, magic1: int, magic2: int) -> int: + """Send a packet to the device.""" + packet = bytearray(16) + packet[0] = 0x09 + packet[2] = 0xbb + packet[3] = magic1 + packet[4] = magic2 + packet[9] = 0xfa + packet[10] = 0x44 + response = self.send_packet(0x6a, packet) + check_error(response[0x22:0x24]) + payload = self.decrypt(bytes(response[0x38:])) + return ord(payload[4]) + + def open(self) -> int: + """Open the curtain.""" + return self._send(0x01, 0x00) + + def close(self) -> int: + """Close the curtain.""" + return self._send(0x02, 0x00) + + def stop(self) -> int: + """Stop the curtain.""" + return self._send(0x03, 0x00) + + def get_percentage(self) -> int: + """Return the position of the curtain.""" + return self._send(0x06, 0x5d) + + def set_percentage_and_wait(self, new_percentage: int) -> None: + """Set the position of the curtain.""" + current = self.get_percentage() + if current > new_percentage: + self.close() + while current is not None and current > new_percentage: + time.sleep(0.2) + current = self.get_percentage() + + elif current < new_percentage: + self.open() + while current is not None and current < new_percentage: + time.sleep(0.2) + current = self.get_percentage() + self.stop() diff --git a/broadlink/device.py b/broadlink/device.py new file mode 100644 index 0000000..92916f9 --- /dev/null +++ b/broadlink/device.py @@ -0,0 +1,205 @@ +import socket +import threading +import random +import time +from typing import Tuple, Union + +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + +from .exceptions import check_error, exception + + +class device: + """Controls a Broadlink device.""" + + def __init__( + self, + host: Tuple[str, int], + mac: Union[bytes, str], + devtype: int, + timeout: int = 10, + name: str = None, + model: str = None, + manufacturer: str = None, + is_locked: bool = None, + ) -> None: + """Initialize the controller.""" + self.host = host + self.mac = mac.encode() if isinstance(mac, str) else mac + self.devtype = devtype if devtype is not None else 0x272a + self.timeout = timeout + self.name = name + self.model = model + self.manufacturer = manufacturer + self.is_locked = is_locked + self.count = random.randrange(0xffff) + self.iv = bytearray( + [0x56, 0x2e, 0x17, 0x99, 0x6d, 0x09, 0x3d, 0x28, 0xdd, 0xb3, 0xba, 0x69, 0x5a, 0x2e, 0x6f, 0x58]) + self.id = bytearray([0, 0, 0, 0]) + self.type = "Unknown" + self.lock = threading.Lock() + + self.aes = None + key = bytearray( + [0x09, 0x76, 0x28, 0x34, 0x3f, 0xe9, 0x9e, 0x23, 0x76, 0x5c, 0x15, 0x13, 0xac, 0xcf, 0x8b, 0x02]) + self.update_aes(key) + + def update_aes(self, key: bytes) -> None: + """Update AES.""" + self.aes = Cipher( + algorithms.AES(key), modes.CBC(self.iv), backend=default_backend() + ) + + def encrypt(self, payload: bytes) -> bytes: + """Encrypt the payload.""" + encryptor = self.aes.encryptor() + return encryptor.update(payload) + encryptor.finalize() + + def decrypt(self, payload: bytes) -> bytes: + """Decrypt the payload.""" + decryptor = self.aes.decryptor() + return decryptor.update(payload) + decryptor.finalize() + + def auth(self) -> bool: + """Authenticate to the device.""" + payload = bytearray(0x50) + payload[0x04] = 0x31 + payload[0x05] = 0x31 + payload[0x06] = 0x31 + payload[0x07] = 0x31 + payload[0x08] = 0x31 + payload[0x09] = 0x31 + payload[0x0a] = 0x31 + payload[0x0b] = 0x31 + payload[0x0c] = 0x31 + payload[0x0d] = 0x31 + payload[0x0e] = 0x31 + payload[0x0f] = 0x31 + payload[0x10] = 0x31 + payload[0x11] = 0x31 + payload[0x12] = 0x31 + payload[0x1e] = 0x01 + payload[0x2d] = 0x01 + payload[0x30] = ord('T') + payload[0x31] = ord('e') + payload[0x32] = ord('s') + payload[0x33] = ord('t') + payload[0x34] = ord(' ') + payload[0x35] = ord(' ') + payload[0x36] = ord('1') + + response = self.send_packet(0x65, payload) + check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + + key = payload[0x04:0x14] + if len(key) % 16 != 0: + return False + + self.id = payload[0x03::-1] + self.update_aes(key) + return True + + def get_fwversion(self) -> int: + """Get firmware version.""" + packet = bytearray([0x68]) + response = self.send_packet(0x6a, packet) + check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + return payload[0x4] | payload[0x5] << 8 + + def set_name(self, name: str) -> None: + """Set device name.""" + packet = bytearray(4) + packet += name.encode('utf-8') + packet += bytearray(0x50 - len(packet)) + packet[0x43] = bool(self.is_locked) + response = self.send_packet(0x6a, packet) + check_error(response[0x22:0x24]) + self.name = name + + def set_lock(self, state: bool) -> None: + """Lock/unlock the device.""" + packet = bytearray(4) + packet += self.name.encode('utf-8') + packet += bytearray(0x50 - len(packet)) + packet[0x43] = bool(state) + response = self.send_packet(0x6a, packet) + check_error(response[0x22:0x24]) + self.is_locked = bool(state) + + def get_type(self) -> str: + """Return device type.""" + return self.type + + def send_packet(self, command: int, payload: bytearray) -> bytearray: + """Send a packet to the device.""" + self.count = (self.count + 1) & 0xffff + packet = bytearray(0x38) + packet[0x00] = 0x5a + packet[0x01] = 0xa5 + packet[0x02] = 0xaa + packet[0x03] = 0x55 + packet[0x04] = 0x5a + packet[0x05] = 0xa5 + packet[0x06] = 0xaa + packet[0x07] = 0x55 + packet[0x24] = self.devtype & 0xff + packet[0x25] = self.devtype >> 8 + packet[0x26] = command + packet[0x28] = self.count & 0xff + packet[0x29] = self.count >> 8 + packet[0x2a] = self.mac[5] + packet[0x2b] = self.mac[4] + packet[0x2c] = self.mac[3] + packet[0x2d] = self.mac[2] + packet[0x2e] = self.mac[1] + packet[0x2f] = self.mac[0] + packet[0x30] = self.id[3] + packet[0x31] = self.id[2] + packet[0x32] = self.id[1] + packet[0x33] = self.id[0] + + # pad the payload for AES encryption + if payload: + payload += bytearray((16 - len(payload)) % 16) + + checksum = sum(payload, 0xbeaf) & 0xffff + packet[0x34] = checksum & 0xff + packet[0x35] = checksum >> 8 + + payload = self.encrypt(payload) + for i in range(len(payload)): + packet.append(payload[i]) + + checksum = sum(packet, 0xbeaf) & 0xffff + packet[0x20] = checksum & 0xff + packet[0x21] = checksum >> 8 + + start_time = time.time() + with self.lock: + cs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + cs.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + + while True: + try: + cs.sendto(packet, self.host) + cs.settimeout(1) + resp, _ = cs.recvfrom(2048) + resp = bytearray(resp) + break + except socket.timeout: + if (time.time() - start_time) > self.timeout: + cs.close() + raise exception(-4000) # Network timeout. + cs.close() + + if len(resp) < 0x30: + raise exception(-4007) # Length error. + + checksum = resp[0x20] | (resp[0x21] << 8) + if sum(resp, 0xbeaf) - sum(resp[0x20:0x22]) & 0xffff != checksum: + raise exception(-4008) # Checksum error. + + return resp diff --git a/broadlink/light.py b/broadlink/light.py new file mode 100644 index 0000000..27d3ec4 --- /dev/null +++ b/broadlink/light.py @@ -0,0 +1,71 @@ +import json +from typing import Union + +from .device import device +from .exceptions import check_error + + +class lb1(device): + """Controls a Broadlink LB1.""" + + state_dict = [] + effect_map_dict = { + 'lovely color': 0, + 'flashlight': 1, + 'lightning': 2, + 'color fading': 3, + 'color breathing': 4, + 'multicolor breathing': 5, + 'color jumping': 6, + 'multicolor jumping': 7, + } + + def __init__(self, *args, **kwargs) -> None: + """Initialize the controller.""" + device.__init__(self, *args, **kwargs) + self.type = "SmartBulb" + + def send_command(self, command: str, type: str = 'set') -> None: + """Send a command to the device.""" + packet = bytearray(16+(int(len(command)/16) + 1)*16) + packet[0x00] = 0x0c + len(command) & 0xff + packet[0x02] = 0xa5 + packet[0x03] = 0xa5 + packet[0x04] = 0x5a + packet[0x05] = 0x5a + packet[0x08] = 0x02 if type == "set" else 0x01 # 0x01 => query, # 0x02 => set + packet[0x09] = 0x0b + packet[0x0a] = len(command) + packet[0x0e:] = map(ord, command) + + checksum = sum(packet, 0xbeaf) & 0xffff + packet[0x06] = checksum & 0xff # Checksum 1 position + packet[0x07] = checksum >> 8 # Checksum 2 position + + response = self.send_packet(0x6a, packet) + check_error(response[0x36:0x38]) + payload = self.decrypt(bytes(response[0x38:])) + + responseLength = int(payload[0x0a]) | (int(payload[0x0b]) << 8) + if responseLength > 0: + self.state_dict = json.loads(payload[0x0e:0x0e+responseLength]) + + def set_json(self, jsonstr: str) -> str: + """Send a command to the device and return state.""" + reconvert = json.loads(jsonstr) + if 'bulb_sceneidx' in reconvert.keys(): + reconvert['bulb_sceneidx'] = self.effect_map_dict.get(reconvert['bulb_sceneidx'], 255) + + self.send_command(json.dumps(reconvert)) + return json.dumps(self.state_dict) + + def set_state(self, state: Union[str, int]) -> None: + """Set the state of the device.""" + cmd = '{"pwr":%d}' % (1 if state == "ON" or state == 1 else 0) + self.send_command(cmd) + + def get_state(self) -> dict: + """Return the state of the device.""" + cmd = "{}" + self.send_command(cmd) + return self.state_dict diff --git a/broadlink/remote.py b/broadlink/remote.py new file mode 100644 index 0000000..236f6c4 --- /dev/null +++ b/broadlink/remote.py @@ -0,0 +1,133 @@ +from .device import device +from .exceptions import check_error + + +class rm(device): + """Controls a Broadlink RM.""" + + def __init__(self, *args, **kwargs) -> None: + """Initialize the controller.""" + device.__init__(self, *args, **kwargs) + self.type = "RM2" + self._request_header = bytes() + self._code_sending_header = bytes() + + def check_data(self) -> bytes: + """Return the last captured code.""" + packet = bytearray(self._request_header) + packet.append(0x04) + response = self.send_packet(0x6a, packet) + check_error(response[0x22:0x24]) + payload = self.decrypt(bytes(response[0x38:])) + return payload[len(self._request_header) + 4:] + + def send_data(self, data: bytes) -> None: + """Send a code to the device.""" + packet = bytearray(self._code_sending_header) + packet += bytearray([0x02, 0x00, 0x00, 0x00]) + packet += data + response = self.send_packet(0x6a, packet) + check_error(response[0x22:0x24]) + + def enter_learning(self) -> None: + """Enter infrared learning mode.""" + packet = bytearray(self._request_header) + packet.append(0x03) + response = self.send_packet(0x6a, packet) + check_error(response[0x22:0x24]) + + def sweep_frequency(self) -> None: + """Sweep frequency.""" + packet = bytearray(self._request_header) + packet.append(0x19) + response = self.send_packet(0x6a, packet) + check_error(response[0x22:0x24]) + + def cancel_sweep_frequency(self) -> None: + """Cancel sweep frequency.""" + packet = bytearray(self._request_header) + packet.append(0x1e) + response = self.send_packet(0x6a, packet) + check_error(response[0x22:0x24]) + + def check_frequency(self) -> bool: + """Return True if the frequency was identified successfully.""" + packet = bytearray(self._request_header) + packet.append(0x1a) + response = self.send_packet(0x6a, packet) + check_error(response[0x22:0x24]) + payload = self.decrypt(bytes(response[0x38:])) + if payload[len(self._request_header) + 4] == 1: + return True + return False + + def find_rf_packet(self) -> bool: + """Enter radiofrequency learning mode.""" + packet = bytearray(self._request_header) + packet.append(0x1b) + response = self.send_packet(0x6a, packet) + check_error(response[0x22:0x24]) + payload = self.decrypt(bytes(response[0x38:])) + if payload[len(self._request_header) + 4] == 1: + return True + return False + + def _check_sensors(self, command: int) -> bytes: + """Return the state of the sensors in raw format.""" + packet = bytearray(self._request_header) + packet.append(command) + response = self.send_packet(0x6a, packet) + check_error(response[0x22:0x24]) + payload = self.decrypt(bytes(response[0x38:])) + return bytearray(payload[len(self._request_header) + 4:]) + + def check_temperature(self) -> int: + """Return the temperature.""" + data = self._check_sensors(0x1) + return data[0x0] + data[0x1] / 10.0 + + def check_sensors(self) -> dict: + """Return the state of the sensors.""" + data = self._check_sensors(0x1) + return {'temperature': data[0x0] + data[0x1] / 10.0} + + +class rm4(rm): + """Controls a Broadlink RM4.""" + + def __init__(self, *args, **kwargs) -> None: + """Initialize the controller.""" + device.__init__(self, *args, **kwargs) + self.type = "RM4" + self._request_header = b'\x04\x00' + self._code_sending_header = b'\xda\x00' + + def check_temperature(self) -> int: + """Return the temperature.""" + data = self._check_sensors(0x24) + return data[0x0] + data[0x1] / 100.0 + + def check_humidity(self) -> int: + """Return the humidity.""" + data = self._check_sensors(0x24) + return data[0x2] + data[0x3] / 100.0 + + def check_sensors(self) -> dict: + """Return the state of the sensors.""" + data = self._check_sensors(0x24) + return { + 'temperature': data[0x0] + data[0x1] / 100.0, + 'humidity': data[0x2] + data[0x3] / 100.0 + } + + +# For legacy compatibility - don't use this +class rm2(rm): + def __init__(self): + device.__init__(self, None, None, None) + + def discover(self): + dev = discover() + self.host = dev.host + self.mac = dev.mac + diff --git a/broadlink/sensor.py b/broadlink/sensor.py new file mode 100644 index 0000000..19db0e8 --- /dev/null +++ b/broadlink/sensor.py @@ -0,0 +1,42 @@ +from .device import device +from .exceptions import check_error + + +class a1(device): + """Controls a Broadlink A1.""" + + _SENSORS_AND_LEVELS = ( + ('light', ('dark', 'dim', 'normal', 'bright')), + ('air_quality', ('excellent', 'good', 'normal', 'bad')), + ('noise', ('quiet', 'normal', 'noisy')), + ) + + def __init__(self, *args, **kwargs) -> None: + """Initialize the controller.""" + device.__init__(self, *args, **kwargs) + self.type = "A1" + + def check_sensors(self) -> dict: + """Return the state of the sensors.""" + data = self.check_sensors_raw() + for sensor, levels in self._SENSORS_AND_LEVELS: + try: + data[sensor] = levels[data[sensor]] + except IndexError: + data[sensor] = 'unknown' + return data + + def check_sensors_raw(self) -> dict: + """Return the state of the sensors in raw format.""" + packet = bytearray([0x1]) + response = self.send_packet(0x6a, packet) + check_error(response[0x22:0x24]) + payload = self.decrypt(bytes(response[0x38:])) + data = bytearray(payload[0x4:]) + return { + 'temperature': data[0x0] + data[0x1] / 10.0, + 'humidity': data[0x2] + data[0x3] / 10.0, + 'light': data[0x4], + 'air_quality': data[0x6], + 'noise': data[0x8], + } diff --git a/broadlink/switch.py b/broadlink/switch.py new file mode 100644 index 0000000..fdc24d1 --- /dev/null +++ b/broadlink/switch.py @@ -0,0 +1,227 @@ +import json +import struct + +from .device import device +from .exceptions import check_error + + +class mp1(device): + """Controls a Broadlink MP1.""" + + def __init__(self, *args, **kwargs) -> None: + """Initialize the controller.""" + device.__init__(self, *args, **kwargs) + self.type = "MP1" + + def set_power_mask(self, sid_mask: int, state: bool) -> None: + """Set the power state of the device.""" + packet = bytearray(16) + packet[0x00] = 0x0d + packet[0x02] = 0xa5 + packet[0x03] = 0xa5 + packet[0x04] = 0x5a + packet[0x05] = 0x5a + packet[0x06] = 0xb2 + ((sid_mask << 1) if state else sid_mask) + packet[0x07] = 0xc0 + packet[0x08] = 0x02 + packet[0x0a] = 0x03 + packet[0x0d] = sid_mask + packet[0x0e] = sid_mask if state else 0 + + response = self.send_packet(0x6a, packet) + check_error(response[0x22:0x24]) + + def set_power(self, sid: int, state: bool) -> None: + """Set the power state of the device.""" + sid_mask = 0x01 << (sid - 1) + self.set_power_mask(sid_mask, state) + + def check_power_raw(self) -> bool: + """Return the power state of the device in raw format.""" + packet = bytearray(16) + packet[0x00] = 0x0a + packet[0x02] = 0xa5 + packet[0x03] = 0xa5 + packet[0x04] = 0x5a + packet[0x05] = 0x5a + packet[0x06] = 0xae + packet[0x07] = 0xc0 + packet[0x08] = 0x01 + + response = self.send_packet(0x6a, packet) + check_error(response[0x22:0x24]) + payload = self.decrypt(bytes(response[0x38:])) + return payload[0x0e] + + def check_power(self) -> dict: + """Return the power state of the device.""" + state = self.check_power_raw() + if state is None: + return {'s1': None, 's2': None, 's3': None, 's4': None} + data = {} + data['s1'] = bool(state & 0x01) + data['s2'] = bool(state & 0x02) + data['s3'] = bool(state & 0x04) + data['s4'] = bool(state & 0x08) + return data + + +class bg1(device): + """Controls a BG Electrical smart outlet.""" + + def __init__(self, *args, **kwargs) -> None: + """Initialize the controller.""" + device.__init__(self, *args, **kwargs) + self.type = "BG1" + + def get_state(self) -> dict: + """Return the power state of the device. + + Example: `{"pwr":1,"pwr1":1,"pwr2":0,"maxworktime":60,"maxworktime1":60,"maxworktime2":0,"idcbrightness":50}` + """ + packet = self._encode(1, b'{}') + response = self.send_packet(0x6a, packet) + check_error(response[0x22:0x24]) + return self._decode(response) + + def set_state( + self, + pwr: bool = None, + pwr1: bool = None, + pwr2: bool = None, + maxworktime: int = None, + maxworktime1: int = None, + maxworktime2: int = None, + idcbrightness: int = None, + ) -> dict: + """Set the power state of the device.""" + data = {} + if pwr is not None: + data['pwr'] = int(bool(pwr)) + if pwr1 is not None: + data['pwr1'] = int(bool(pwr1)) + if pwr2 is not None: + data['pwr2'] = int(bool(pwr2)) + if maxworktime is not None: + data['maxworktime'] = maxworktime + if maxworktime1 is not None: + data['maxworktime1'] = maxworktime1 + if maxworktime2 is not None: + data['maxworktime2'] = maxworktime2 + if idcbrightness is not None: + data['idcbrightness'] = idcbrightness + js = json.dumps(data).encode('utf8') + packet = self._encode(2, js) + response = self.send_packet(0x6a, packet) + check_error(response[0x22:0x24]) + return self._decode(response) + + def _encode(self, flag: int, js: str) -> bytearray: + """Encode a message.""" + # The packet format is: + # 0x00-0x01 length + # 0x02-0x05 header + # 0x06-0x07 00 + # 0x08 flag (1 for read or 2 write?) + # 0x09 unknown (0xb) + # 0x0a-0x0d length of json + # 0x0e- json data + packet = bytearray(14) + length = 4 + 2 + 2 + 4 + len(js) + struct.pack_into('> 8 + return packet + + def _decode(self, response: bytes) -> dict: + """Decode a message.""" + payload = self.decrypt(bytes(response[0x38:])) + js_len = struct.unpack_from(' None: + """Initialize the device.""" + device.__init__(self, *args, **kwargs) + self.type = "SP1" + + def set_power(self, state: bool) -> None: + """Set the power state of the device.""" + packet = bytearray(4) + packet[0] = state + response = self.send_packet(0x66, packet) + check_error(response[0x22:0x24]) + + +class sp2(device): + """Controls a Broadlink SP2.""" + + def __init__(self, *args, **kwargs) -> None: + """Initialize the controller.""" + device.__init__(self, *args, **kwargs) + self.type = "SP2" + + def set_power(self, state: bool) -> None: + """Set the power state of the device.""" + packet = bytearray(16) + packet[0] = 2 + if self.check_nightlight(): + packet[4] = 3 if state else 2 + else: + packet[4] = 1 if state else 0 + response = self.send_packet(0x6a, packet) + check_error(response[0x22:0x24]) + + def set_nightlight(self, state: bool) -> None: + """Set the night light state of the device.""" + packet = bytearray(16) + packet[0] = 2 + if self.check_power(): + packet[4] = 3 if state else 1 + else: + packet[4] = 2 if state else 0 + response = self.send_packet(0x6a, packet) + check_error(response[0x22:0x24]) + + def check_power(self) -> bool: + """Return the power state of the device.""" + packet = bytearray(16) + packet[0] = 1 + response = self.send_packet(0x6a, packet) + check_error(response[0x22:0x24]) + payload = self.decrypt(bytes(response[0x38:])) + if isinstance(payload[0x4], int): + return bool(payload[0x4] == 1 or payload[0x4] == 3 or payload[0x4] == 0xFD) + return bool(ord(payload[0x4]) == 1 or ord(payload[0x4]) == 3 or ord(payload[0x4]) == 0xFD) + + def check_nightlight(self) -> bool: + """Return the state of the night light.""" + packet = bytearray(16) + packet[0] = 1 + response = self.send_packet(0x6a, packet) + check_error(response[0x22:0x24]) + payload = self.decrypt(bytes(response[0x38:])) + if isinstance(payload[0x4], int): + return bool(payload[0x4] == 2 or payload[0x4] == 3 or payload[0x4] == 0xFF) + return bool(ord(payload[0x4]) == 2 or ord(payload[0x4]) == 3 or ord(payload[0x4]) == 0xFF) + + def get_energy(self) -> int: + """Return the energy state of the device.""" + packet = bytearray([8, 0, 254, 1, 5, 1, 0, 0, 0, 45]) + response = self.send_packet(0x6a, packet) + check_error(response[0x22:0x24]) + payload = self.decrypt(bytes(response[0x38:])) + if isinstance(payload[0x7], int): + energy = int(hex(payload[0x07] * 256 + payload[0x06])[2:]) + int(hex(payload[0x05])[2:]) / 100.0 + else: + energy = int(hex(ord(payload[0x07]) * 256 + ord(payload[0x06]))[2:]) + int( + hex(ord(payload[0x05]))[2:]) / 100.0 + return energy