mirror of
https://github.com/mjg59/python-broadlink.git
synced 2024-11-22 07:00:12 +01:00
Improve code quality (#585)
* Improve docstrings * Fix line-too-long * Disable unidiomatic-typecheck * Move smart plugs to the top * Use constants from const.py * Run black
This commit is contained in:
parent
d48d1347a3
commit
6a54803a36
@ -1,4 +1,4 @@
|
|||||||
"""Support for climate control."""
|
"""Support for HVAC units."""
|
||||||
import typing as t
|
import typing as t
|
||||||
|
|
||||||
from . import exceptions as e
|
from . import exceptions as e
|
||||||
@ -106,8 +106,8 @@ class hysen(Device):
|
|||||||
# Manual mode will activate last used temperature.
|
# Manual mode will activate last used temperature.
|
||||||
# In typical usage call set_temp to activate manual control and set temp.
|
# In typical usage call set_temp to activate manual control and set temp.
|
||||||
# loop_mode refers to index in [ "12345,67", "123456,7", "1234567" ]
|
# loop_mode refers to index in [ "12345,67", "123456,7", "1234567" ]
|
||||||
# E.g. loop_mode = 0 ("12345,67") means Saturday and Sunday follow the "weekend" schedule
|
# E.g. loop_mode = 0 ("12345,67") means Saturday and Sunday (weekend schedule)
|
||||||
# loop_mode = 2 ("1234567") means every day (including Saturday and Sunday) follows the "weekday" schedule
|
# loop_mode = 2 ("1234567") means every day, including Saturday and Sunday (weekday schedule)
|
||||||
# The sensor command is currently experimental
|
# The sensor command is currently experimental
|
||||||
def set_mode(self, auto_mode: int, loop_mode: int, sensor: int = 0) -> None:
|
def set_mode(self, auto_mode: int, loop_mode: int, sensor: int = 0) -> None:
|
||||||
"""Set the mode of the device."""
|
"""Set the mode of the device."""
|
||||||
@ -124,7 +124,7 @@ class hysen(Device):
|
|||||||
# Actual temperature calibration (AdJ) adj = -0.5. Precision 0.1C
|
# Actual temperature calibration (AdJ) adj = -0.5. Precision 0.1C
|
||||||
# Anti-freezing function (FrE) fre = 0 for anti-freezing function shut down,
|
# Anti-freezing function (FrE) fre = 0 for anti-freezing function shut down,
|
||||||
# 1 for anti-freezing function open. Factory default: 0
|
# 1 for anti-freezing function open. Factory default: 0
|
||||||
# Power on memory (POn) poweron = 0 for power on memory off, 1 for power on memory on. Factory default: 0
|
# Power on memory (POn) poweron = 0 for off, 1 for on. Default: 0
|
||||||
def set_advanced(
|
def set_advanced(
|
||||||
self,
|
self,
|
||||||
loop_mode: int,
|
loop_mode: int,
|
||||||
|
@ -9,7 +9,12 @@ from cryptography.hazmat.backends import default_backend
|
|||||||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||||||
|
|
||||||
from . import exceptions as e
|
from . import exceptions as e
|
||||||
from .const import DEFAULT_BCAST_ADDR, DEFAULT_PORT, DEFAULT_RETRY_INTVL, DEFAULT_TIMEOUT
|
from .const import (
|
||||||
|
DEFAULT_BCAST_ADDR,
|
||||||
|
DEFAULT_PORT,
|
||||||
|
DEFAULT_RETRY_INTVL,
|
||||||
|
DEFAULT_TIMEOUT,
|
||||||
|
)
|
||||||
from .protocol import Datetime
|
from .protocol import Datetime
|
||||||
|
|
||||||
HelloResponse = t.Tuple[int, t.Tuple[str, int], str, str, bool]
|
HelloResponse = t.Tuple[int, t.Tuple[str, int], str, str, bool]
|
||||||
@ -48,7 +53,7 @@ def scan(
|
|||||||
try:
|
try:
|
||||||
while (time.time() - start_time) < timeout:
|
while (time.time() - start_time) < timeout:
|
||||||
time_left = timeout - (time.time() - start_time)
|
time_left = timeout - (time.time() - start_time)
|
||||||
conn.settimeout(min(1, time_left))
|
conn.settimeout(min(DEFAULT_RETRY_INTVL, time_left))
|
||||||
conn.sendto(packet, (discover_ip_address, discover_ip_port))
|
conn.sendto(packet, (discover_ip_address, discover_ip_port))
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
|
@ -27,6 +27,7 @@ class BroadlinkException(Exception):
|
|||||||
|
|
||||||
def __eq__(self, other):
|
def __eq__(self, other):
|
||||||
"""Return self==value."""
|
"""Return self==value."""
|
||||||
|
# pylint: disable=unidiomatic-typecheck
|
||||||
return type(self) == type(other) and self.args == other.args
|
return type(self) == type(other) and self.args == other.args
|
||||||
|
|
||||||
def __hash__(self):
|
def __hash__(self):
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
"""The networking part of the python-broadlink library."""
|
||||||
import datetime as dt
|
import datetime as dt
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
@ -6,130 +6,6 @@ from . import exceptions as e
|
|||||||
from .device import Device
|
from .device import Device
|
||||||
|
|
||||||
|
|
||||||
class mp1(Device):
|
|
||||||
"""Controls a Broadlink MP1."""
|
|
||||||
|
|
||||||
TYPE = "MP1"
|
|
||||||
|
|
||||||
def set_power_mask(self, sid_mask: int, pwr: bool) -> None:
|
|
||||||
"""Set the power state of the device."""
|
|
||||||
packet = bytearray(16)
|
|
||||||
packet[0x00] = 0x0D
|
|
||||||
packet[0x02] = 0xA5
|
|
||||||
packet[0x03] = 0xA5
|
|
||||||
packet[0x04] = 0x5A
|
|
||||||
packet[0x05] = 0x5A
|
|
||||||
packet[0x06] = 0xB2 + ((sid_mask << 1) if pwr else sid_mask)
|
|
||||||
packet[0x07] = 0xC0
|
|
||||||
packet[0x08] = 0x02
|
|
||||||
packet[0x0A] = 0x03
|
|
||||||
packet[0x0D] = sid_mask
|
|
||||||
packet[0x0E] = sid_mask if pwr else 0
|
|
||||||
|
|
||||||
response = self.send_packet(0x6A, packet)
|
|
||||||
e.check_error(response[0x22:0x24])
|
|
||||||
|
|
||||||
def set_power(self, sid: int, pwr: bool) -> None:
|
|
||||||
"""Set the power state of the device."""
|
|
||||||
sid_mask = 0x01 << (sid - 1)
|
|
||||||
self.set_power_mask(sid_mask, pwr)
|
|
||||||
|
|
||||||
def check_power_raw(self) -> int:
|
|
||||||
"""Return the power state of the device in raw format."""
|
|
||||||
packet = bytearray(16)
|
|
||||||
packet[0x00] = 0x0A
|
|
||||||
packet[0x02] = 0xA5
|
|
||||||
packet[0x03] = 0xA5
|
|
||||||
packet[0x04] = 0x5A
|
|
||||||
packet[0x05] = 0x5A
|
|
||||||
packet[0x06] = 0xAE
|
|
||||||
packet[0x07] = 0xC0
|
|
||||||
packet[0x08] = 0x01
|
|
||||||
|
|
||||||
response = self.send_packet(0x6A, packet)
|
|
||||||
e.check_error(response[0x22:0x24])
|
|
||||||
payload = self.decrypt(response[0x38:])
|
|
||||||
return payload[0x0E]
|
|
||||||
|
|
||||||
def check_power(self) -> dict:
|
|
||||||
"""Return the power state of the device."""
|
|
||||||
data = self.check_power_raw()
|
|
||||||
return {
|
|
||||||
"s1": bool(data & 1),
|
|
||||||
"s2": bool(data & 2),
|
|
||||||
"s3": bool(data & 4),
|
|
||||||
"s4": bool(data & 8),
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
class bg1(Device):
|
|
||||||
"""Controls a BG Electrical smart outlet."""
|
|
||||||
|
|
||||||
TYPE = "BG1"
|
|
||||||
|
|
||||||
def get_state(self) -> dict:
|
|
||||||
"""Return the power state of the device.
|
|
||||||
|
|
||||||
Example: `{"pwr":1,"pwr1":1,"pwr2":0,"maxworktime":60,"maxworktime1":60,"maxworktime2":0,"idcbrightness":50}`
|
|
||||||
"""
|
|
||||||
packet = self._encode(1, {})
|
|
||||||
response = self.send_packet(0x6A, packet)
|
|
||||||
e.check_error(response[0x22:0x24])
|
|
||||||
return self._decode(response)
|
|
||||||
|
|
||||||
def set_state(
|
|
||||||
self,
|
|
||||||
pwr: bool = None,
|
|
||||||
pwr1: bool = None,
|
|
||||||
pwr2: bool = None,
|
|
||||||
maxworktime: int = None,
|
|
||||||
maxworktime1: int = None,
|
|
||||||
maxworktime2: int = None,
|
|
||||||
idcbrightness: int = None,
|
|
||||||
) -> dict:
|
|
||||||
"""Set the power state of the device."""
|
|
||||||
state = {}
|
|
||||||
if pwr is not None:
|
|
||||||
state["pwr"] = int(bool(pwr))
|
|
||||||
if pwr1 is not None:
|
|
||||||
state["pwr1"] = int(bool(pwr1))
|
|
||||||
if pwr2 is not None:
|
|
||||||
state["pwr2"] = int(bool(pwr2))
|
|
||||||
if maxworktime is not None:
|
|
||||||
state["maxworktime"] = maxworktime
|
|
||||||
if maxworktime1 is not None:
|
|
||||||
state["maxworktime1"] = maxworktime1
|
|
||||||
if maxworktime2 is not None:
|
|
||||||
state["maxworktime2"] = maxworktime2
|
|
||||||
if idcbrightness is not None:
|
|
||||||
state["idcbrightness"] = idcbrightness
|
|
||||||
|
|
||||||
packet = self._encode(2, state)
|
|
||||||
response = self.send_packet(0x6A, packet)
|
|
||||||
e.check_error(response[0x22:0x24])
|
|
||||||
return self._decode(response)
|
|
||||||
|
|
||||||
def _encode(self, flag: int, state: dict) -> bytes:
|
|
||||||
"""Encode a message."""
|
|
||||||
packet = bytearray(14)
|
|
||||||
data = json.dumps(state).encode()
|
|
||||||
length = 12 + len(data)
|
|
||||||
struct.pack_into(
|
|
||||||
"<HHHHBBI", packet, 0, length, 0xA5A5, 0x5A5A, 0x0000, flag, 0x0B, len(data)
|
|
||||||
)
|
|
||||||
packet.extend(data)
|
|
||||||
checksum = sum(packet[0x2:], 0xBEAF) & 0xFFFF
|
|
||||||
packet[0x06:0x08] = checksum.to_bytes(2, "little")
|
|
||||||
return packet
|
|
||||||
|
|
||||||
def _decode(self, response: bytes) -> dict:
|
|
||||||
"""Decode a message."""
|
|
||||||
payload = self.decrypt(response[0x38:])
|
|
||||||
js_len = struct.unpack_from("<I", payload, 0x0A)[0]
|
|
||||||
state = json.loads(payload[0x0E : 0x0E + js_len])
|
|
||||||
return state
|
|
||||||
|
|
||||||
|
|
||||||
class sp1(Device):
|
class sp1(Device):
|
||||||
"""Controls a Broadlink SP1."""
|
"""Controls a Broadlink SP1."""
|
||||||
|
|
||||||
@ -360,3 +236,127 @@ class sp4b(sp4):
|
|||||||
js_len = struct.unpack_from("<I", payload, 0xA)[0]
|
js_len = struct.unpack_from("<I", payload, 0xA)[0]
|
||||||
state = json.loads(payload[0x0E : 0x0E + js_len])
|
state = json.loads(payload[0x0E : 0x0E + js_len])
|
||||||
return state
|
return state
|
||||||
|
|
||||||
|
|
||||||
|
class bg1(Device):
|
||||||
|
"""Controls a BG Electrical smart outlet."""
|
||||||
|
|
||||||
|
TYPE = "BG1"
|
||||||
|
|
||||||
|
def get_state(self) -> dict:
|
||||||
|
"""Return the power state of the device.
|
||||||
|
|
||||||
|
Example: `{"pwr":1,"pwr1":1,"pwr2":0,"maxworktime":60,"maxworktime1":60,"maxworktime2":0,"idcbrightness":50}`
|
||||||
|
"""
|
||||||
|
packet = self._encode(1, {})
|
||||||
|
response = self.send_packet(0x6A, packet)
|
||||||
|
e.check_error(response[0x22:0x24])
|
||||||
|
return self._decode(response)
|
||||||
|
|
||||||
|
def set_state(
|
||||||
|
self,
|
||||||
|
pwr: bool = None,
|
||||||
|
pwr1: bool = None,
|
||||||
|
pwr2: bool = None,
|
||||||
|
maxworktime: int = None,
|
||||||
|
maxworktime1: int = None,
|
||||||
|
maxworktime2: int = None,
|
||||||
|
idcbrightness: int = None,
|
||||||
|
) -> dict:
|
||||||
|
"""Set the power state of the device."""
|
||||||
|
state = {}
|
||||||
|
if pwr is not None:
|
||||||
|
state["pwr"] = int(bool(pwr))
|
||||||
|
if pwr1 is not None:
|
||||||
|
state["pwr1"] = int(bool(pwr1))
|
||||||
|
if pwr2 is not None:
|
||||||
|
state["pwr2"] = int(bool(pwr2))
|
||||||
|
if maxworktime is not None:
|
||||||
|
state["maxworktime"] = maxworktime
|
||||||
|
if maxworktime1 is not None:
|
||||||
|
state["maxworktime1"] = maxworktime1
|
||||||
|
if maxworktime2 is not None:
|
||||||
|
state["maxworktime2"] = maxworktime2
|
||||||
|
if idcbrightness is not None:
|
||||||
|
state["idcbrightness"] = idcbrightness
|
||||||
|
|
||||||
|
packet = self._encode(2, state)
|
||||||
|
response = self.send_packet(0x6A, packet)
|
||||||
|
e.check_error(response[0x22:0x24])
|
||||||
|
return self._decode(response)
|
||||||
|
|
||||||
|
def _encode(self, flag: int, state: dict) -> bytes:
|
||||||
|
"""Encode a message."""
|
||||||
|
packet = bytearray(14)
|
||||||
|
data = json.dumps(state).encode()
|
||||||
|
length = 12 + len(data)
|
||||||
|
struct.pack_into(
|
||||||
|
"<HHHHBBI", packet, 0, length, 0xA5A5, 0x5A5A, 0x0000, flag, 0x0B, len(data)
|
||||||
|
)
|
||||||
|
packet.extend(data)
|
||||||
|
checksum = sum(packet[0x2:], 0xBEAF) & 0xFFFF
|
||||||
|
packet[0x06:0x08] = checksum.to_bytes(2, "little")
|
||||||
|
return packet
|
||||||
|
|
||||||
|
def _decode(self, response: bytes) -> dict:
|
||||||
|
"""Decode a message."""
|
||||||
|
payload = self.decrypt(response[0x38:])
|
||||||
|
js_len = struct.unpack_from("<I", payload, 0x0A)[0]
|
||||||
|
state = json.loads(payload[0x0E : 0x0E + js_len])
|
||||||
|
return state
|
||||||
|
|
||||||
|
|
||||||
|
class mp1(Device):
|
||||||
|
"""Controls a Broadlink MP1."""
|
||||||
|
|
||||||
|
TYPE = "MP1"
|
||||||
|
|
||||||
|
def set_power_mask(self, sid_mask: int, pwr: bool) -> None:
|
||||||
|
"""Set the power state of the device."""
|
||||||
|
packet = bytearray(16)
|
||||||
|
packet[0x00] = 0x0D
|
||||||
|
packet[0x02] = 0xA5
|
||||||
|
packet[0x03] = 0xA5
|
||||||
|
packet[0x04] = 0x5A
|
||||||
|
packet[0x05] = 0x5A
|
||||||
|
packet[0x06] = 0xB2 + ((sid_mask << 1) if pwr else sid_mask)
|
||||||
|
packet[0x07] = 0xC0
|
||||||
|
packet[0x08] = 0x02
|
||||||
|
packet[0x0A] = 0x03
|
||||||
|
packet[0x0D] = sid_mask
|
||||||
|
packet[0x0E] = sid_mask if pwr else 0
|
||||||
|
|
||||||
|
response = self.send_packet(0x6A, packet)
|
||||||
|
e.check_error(response[0x22:0x24])
|
||||||
|
|
||||||
|
def set_power(self, sid: int, pwr: bool) -> None:
|
||||||
|
"""Set the power state of the device."""
|
||||||
|
sid_mask = 0x01 << (sid - 1)
|
||||||
|
self.set_power_mask(sid_mask, pwr)
|
||||||
|
|
||||||
|
def check_power_raw(self) -> int:
|
||||||
|
"""Return the power state of the device in raw format."""
|
||||||
|
packet = bytearray(16)
|
||||||
|
packet[0x00] = 0x0A
|
||||||
|
packet[0x02] = 0xA5
|
||||||
|
packet[0x03] = 0xA5
|
||||||
|
packet[0x04] = 0x5A
|
||||||
|
packet[0x05] = 0x5A
|
||||||
|
packet[0x06] = 0xAE
|
||||||
|
packet[0x07] = 0xC0
|
||||||
|
packet[0x08] = 0x01
|
||||||
|
|
||||||
|
response = self.send_packet(0x6A, packet)
|
||||||
|
e.check_error(response[0x22:0x24])
|
||||||
|
payload = self.decrypt(response[0x38:])
|
||||||
|
return payload[0x0E]
|
||||||
|
|
||||||
|
def check_power(self) -> dict:
|
||||||
|
"""Return the power state of the device."""
|
||||||
|
data = self.check_power_raw()
|
||||||
|
return {
|
||||||
|
"s1": bool(data & 1),
|
||||||
|
"s2": bool(data & 2),
|
||||||
|
"s3": bool(data & 4),
|
||||||
|
"s4": bool(data & 8),
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user