diff --git a/broadlink/climate.py b/broadlink/climate.py index 98770cd..32c6ffe 100644 --- a/broadlink/climate.py +++ b/broadlink/climate.py @@ -1,4 +1,4 @@ -"""Support for climate control.""" +"""Support for HVAC units.""" import typing as t from . import exceptions as e @@ -106,8 +106,8 @@ class hysen(Device): # Manual mode will activate last used temperature. # In typical usage call set_temp to activate manual control and set temp. # loop_mode refers to index in [ "12345,67", "123456,7", "1234567" ] - # E.g. loop_mode = 0 ("12345,67") means Saturday and Sunday follow the "weekend" schedule - # loop_mode = 2 ("1234567") means every day (including Saturday and Sunday) follows the "weekday" schedule + # E.g. loop_mode = 0 ("12345,67") means Saturday and Sunday (weekend schedule) + # loop_mode = 2 ("1234567") means every day, including Saturday and Sunday (weekday schedule) # The sensor command is currently experimental def set_mode(self, auto_mode: int, loop_mode: int, sensor: int = 0) -> None: """Set the mode of the device.""" @@ -124,7 +124,7 @@ class hysen(Device): # Actual temperature calibration (AdJ) adj = -0.5. Precision 0.1C # Anti-freezing function (FrE) fre = 0 for anti-freezing function shut down, # 1 for anti-freezing function open. Factory default: 0 - # Power on memory (POn) poweron = 0 for power on memory off, 1 for power on memory on. Factory default: 0 + # Power on memory (POn) poweron = 0 for off, 1 for on. Default: 0 def set_advanced( self, loop_mode: int, diff --git a/broadlink/device.py b/broadlink/device.py index c1f8fa8..74e916f 100644 --- a/broadlink/device.py +++ b/broadlink/device.py @@ -9,7 +9,12 @@ from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from . import exceptions as e -from .const import DEFAULT_BCAST_ADDR, DEFAULT_PORT, DEFAULT_RETRY_INTVL, DEFAULT_TIMEOUT +from .const import ( + DEFAULT_BCAST_ADDR, + DEFAULT_PORT, + DEFAULT_RETRY_INTVL, + DEFAULT_TIMEOUT, +) from .protocol import Datetime HelloResponse = t.Tuple[int, t.Tuple[str, int], str, str, bool] @@ -48,7 +53,7 @@ def scan( try: while (time.time() - start_time) < timeout: time_left = timeout - (time.time() - start_time) - conn.settimeout(min(1, time_left)) + conn.settimeout(min(DEFAULT_RETRY_INTVL, time_left)) conn.sendto(packet, (discover_ip_address, discover_ip_port)) while True: diff --git a/broadlink/exceptions.py b/broadlink/exceptions.py index 19fdf40..2343ad6 100644 --- a/broadlink/exceptions.py +++ b/broadlink/exceptions.py @@ -27,6 +27,7 @@ class BroadlinkException(Exception): def __eq__(self, other): """Return self==value.""" + # pylint: disable=unidiomatic-typecheck return type(self) == type(other) and self.args == other.args def __hash__(self): diff --git a/broadlink/protocol.py b/broadlink/protocol.py index effee38..b5b09d7 100644 --- a/broadlink/protocol.py +++ b/broadlink/protocol.py @@ -1,3 +1,4 @@ +"""The networking part of the python-broadlink library.""" import datetime as dt import time diff --git a/broadlink/switch.py b/broadlink/switch.py index 7e0f16e..1079cde 100644 --- a/broadlink/switch.py +++ b/broadlink/switch.py @@ -6,130 +6,6 @@ from . import exceptions as e from .device import Device -class mp1(Device): - """Controls a Broadlink MP1.""" - - TYPE = "MP1" - - def set_power_mask(self, sid_mask: int, pwr: bool) -> None: - """Set the power state of the device.""" - packet = bytearray(16) - packet[0x00] = 0x0D - packet[0x02] = 0xA5 - packet[0x03] = 0xA5 - packet[0x04] = 0x5A - packet[0x05] = 0x5A - packet[0x06] = 0xB2 + ((sid_mask << 1) if pwr else sid_mask) - packet[0x07] = 0xC0 - packet[0x08] = 0x02 - packet[0x0A] = 0x03 - packet[0x0D] = sid_mask - packet[0x0E] = sid_mask if pwr else 0 - - response = self.send_packet(0x6A, packet) - e.check_error(response[0x22:0x24]) - - def set_power(self, sid: int, pwr: bool) -> None: - """Set the power state of the device.""" - sid_mask = 0x01 << (sid - 1) - self.set_power_mask(sid_mask, pwr) - - def check_power_raw(self) -> int: - """Return the power state of the device in raw format.""" - packet = bytearray(16) - packet[0x00] = 0x0A - packet[0x02] = 0xA5 - packet[0x03] = 0xA5 - packet[0x04] = 0x5A - packet[0x05] = 0x5A - packet[0x06] = 0xAE - packet[0x07] = 0xC0 - packet[0x08] = 0x01 - - response = self.send_packet(0x6A, packet) - e.check_error(response[0x22:0x24]) - payload = self.decrypt(response[0x38:]) - return payload[0x0E] - - def check_power(self) -> dict: - """Return the power state of the device.""" - data = self.check_power_raw() - return { - "s1": bool(data & 1), - "s2": bool(data & 2), - "s3": bool(data & 4), - "s4": bool(data & 8), - } - - -class bg1(Device): - """Controls a BG Electrical smart outlet.""" - - TYPE = "BG1" - - def get_state(self) -> dict: - """Return the power state of the device. - - Example: `{"pwr":1,"pwr1":1,"pwr2":0,"maxworktime":60,"maxworktime1":60,"maxworktime2":0,"idcbrightness":50}` - """ - packet = self._encode(1, {}) - response = self.send_packet(0x6A, packet) - e.check_error(response[0x22:0x24]) - return self._decode(response) - - def set_state( - self, - pwr: bool = None, - pwr1: bool = None, - pwr2: bool = None, - maxworktime: int = None, - maxworktime1: int = None, - maxworktime2: int = None, - idcbrightness: int = None, - ) -> dict: - """Set the power state of the device.""" - state = {} - if pwr is not None: - state["pwr"] = int(bool(pwr)) - if pwr1 is not None: - state["pwr1"] = int(bool(pwr1)) - if pwr2 is not None: - state["pwr2"] = int(bool(pwr2)) - if maxworktime is not None: - state["maxworktime"] = maxworktime - if maxworktime1 is not None: - state["maxworktime1"] = maxworktime1 - if maxworktime2 is not None: - state["maxworktime2"] = maxworktime2 - if idcbrightness is not None: - state["idcbrightness"] = idcbrightness - - packet = self._encode(2, state) - response = self.send_packet(0x6A, packet) - e.check_error(response[0x22:0x24]) - return self._decode(response) - - def _encode(self, flag: int, state: dict) -> bytes: - """Encode a message.""" - packet = bytearray(14) - data = json.dumps(state).encode() - length = 12 + len(data) - struct.pack_into( - " dict: - """Decode a message.""" - payload = self.decrypt(response[0x38:]) - js_len = struct.unpack_from(" dict: + """Return the power state of the device. + + Example: `{"pwr":1,"pwr1":1,"pwr2":0,"maxworktime":60,"maxworktime1":60,"maxworktime2":0,"idcbrightness":50}` + """ + packet = self._encode(1, {}) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def set_state( + self, + pwr: bool = None, + pwr1: bool = None, + pwr2: bool = None, + maxworktime: int = None, + maxworktime1: int = None, + maxworktime2: int = None, + idcbrightness: int = None, + ) -> dict: + """Set the power state of the device.""" + state = {} + if pwr is not None: + state["pwr"] = int(bool(pwr)) + if pwr1 is not None: + state["pwr1"] = int(bool(pwr1)) + if pwr2 is not None: + state["pwr2"] = int(bool(pwr2)) + if maxworktime is not None: + state["maxworktime"] = maxworktime + if maxworktime1 is not None: + state["maxworktime1"] = maxworktime1 + if maxworktime2 is not None: + state["maxworktime2"] = maxworktime2 + if idcbrightness is not None: + state["idcbrightness"] = idcbrightness + + packet = self._encode(2, state) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def _encode(self, flag: int, state: dict) -> bytes: + """Encode a message.""" + packet = bytearray(14) + data = json.dumps(state).encode() + length = 12 + len(data) + struct.pack_into( + " dict: + """Decode a message.""" + payload = self.decrypt(response[0x38:]) + js_len = struct.unpack_from(" None: + """Set the power state of the device.""" + packet = bytearray(16) + packet[0x00] = 0x0D + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x06] = 0xB2 + ((sid_mask << 1) if pwr else sid_mask) + packet[0x07] = 0xC0 + packet[0x08] = 0x02 + packet[0x0A] = 0x03 + packet[0x0D] = sid_mask + packet[0x0E] = sid_mask if pwr else 0 + + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + + def set_power(self, sid: int, pwr: bool) -> None: + """Set the power state of the device.""" + sid_mask = 0x01 << (sid - 1) + self.set_power_mask(sid_mask, pwr) + + def check_power_raw(self) -> int: + """Return the power state of the device in raw format.""" + packet = bytearray(16) + packet[0x00] = 0x0A + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x06] = 0xAE + packet[0x07] = 0xC0 + packet[0x08] = 0x01 + + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + return payload[0x0E] + + def check_power(self) -> dict: + """Return the power state of the device.""" + data = self.check_power_raw() + return { + "s1": bool(data & 1), + "s2": bool(data & 2), + "s3": bool(data & 4), + "s4": bool(data & 8), + }