Compare commits
8 Commits
fa44b54d88
...
730853e5fa
Author | SHA1 | Date |
---|---|---|
Felipe Martins Diel | 730853e5fa | |
Felipe Martins Diel | cee6a1da59 | |
Felipe Martins Diel | 0a9acab2b8 | |
Felipe Martins Diel | eb56e7a46f | |
Felipe Martins Diel | ff4628de1b | |
Felipe Martins Diel | c4979562c8 | |
Felipe Martins Diel | 1e11558613 | |
Felipe Martins Diel | 66744707f5 |
|
@ -1,12 +1,12 @@
|
|||
#!/usr/bin/env python3
|
||||
"""The python-broadlink library."""
|
||||
import socket
|
||||
import typing as t
|
||||
from typing import Generator, List, Optional, Tuple, Union
|
||||
|
||||
from . import exceptions as e
|
||||
from .const import DEFAULT_BCAST_ADDR, DEFAULT_PORT, DEFAULT_TIMEOUT
|
||||
from .alarm import S1C
|
||||
from .climate import hysen
|
||||
from .climate import hvac, hysen
|
||||
from .cover import dooya, dooya2, wser
|
||||
from .device import Device, ping, scan
|
||||
from .hub import s3
|
||||
|
@ -33,6 +33,7 @@ SUPPORTED_TYPES = {
|
|||
0x7544: ("SP2-CL", "Broadlink"),
|
||||
0x7546: ("SP2-UK/BR/IN", "Broadlink (OEM)"),
|
||||
0x7547: ("SC1", "Broadlink"),
|
||||
0x7549: ("SP mini 3", "Broadlink (OEM)"),
|
||||
0x7918: ("SP2", "Broadlink (OEM)"),
|
||||
0x7919: ("SP2-compatible", "Honeywell"),
|
||||
0x791A: ("SP2-compatible", "Honeywell"),
|
||||
|
@ -54,6 +55,7 @@ SUPPORTED_TYPES = {
|
|||
},
|
||||
sp4: {
|
||||
0x7568: ("SP4L-CN", "Broadlink"),
|
||||
0x756B: ("SP4M-JP", "Broadlink"),
|
||||
0x756C: ("SP4M", "Broadlink"),
|
||||
0x756F: ("MCB1", "Broadlink"),
|
||||
0x7579: ("SP4L-EU", "Broadlink"),
|
||||
|
@ -61,12 +63,15 @@ SUPPORTED_TYPES = {
|
|||
0x7583: ("SP mini 3", "Broadlink"),
|
||||
0x7587: ("SP4L-UK", "Broadlink"),
|
||||
0x7D11: ("SP mini 3", "Broadlink"),
|
||||
0xA4F9: ("WS4", "Broadlink (OEM)"),
|
||||
0xA569: ("SP4L-UK", "Broadlink"),
|
||||
0xA56A: ("MCB1", "Broadlink"),
|
||||
0xA56B: ("SCB1E", "Broadlink"),
|
||||
0xA56C: ("SP4L-EU", "Broadlink"),
|
||||
0xA576: ("SP4L-AU", "Broadlink"),
|
||||
0xA589: ("SP4L-UK", "Broadlink"),
|
||||
0xA5D3: ("SP4L-EU", "Broadlink"),
|
||||
0xA6F4: ("SP4D-US", "Broadlink"),
|
||||
},
|
||||
sp4b: {
|
||||
0x5115: ("SCB1E", "Broadlink"),
|
||||
|
@ -82,6 +87,7 @@ SUPPORTED_TYPES = {
|
|||
rmmini: {
|
||||
0x2737: ("RM mini 3", "Broadlink"),
|
||||
0x278F: ("RM mini", "Broadlink"),
|
||||
0x27B7: ("RM mini 3", "Broadlink"),
|
||||
0x27C2: ("RM mini 3", "Broadlink"),
|
||||
0x27C7: ("RM mini 3", "Broadlink"),
|
||||
0x27CC: ("RM mini 3", "Broadlink"),
|
||||
|
@ -158,6 +164,7 @@ SUPPORTED_TYPES = {
|
|||
lb1: {
|
||||
0x5043: ("SB800TD", "Broadlink (OEM)"),
|
||||
0x504E: ("LB1", "Broadlink"),
|
||||
0x606D: ("SLA22RGB9W81/SLA27RGB9W81", "Luceco"),
|
||||
0x606E: ("SB500TD", "Broadlink (OEM)"),
|
||||
0x60C7: ("LB1", "Broadlink"),
|
||||
0x60C8: ("LB1", "Broadlink"),
|
||||
|
@ -165,10 +172,12 @@ SUPPORTED_TYPES = {
|
|||
0x644B: ("LB1", "Broadlink"),
|
||||
0x644C: ("LB27 R1", "Broadlink"),
|
||||
0x644E: ("LB26 R1", "Broadlink"),
|
||||
0x6488: ("LB27 C1", "Broadlink"),
|
||||
},
|
||||
lb2: {
|
||||
0xA4F4: ("LB27 R1", "Broadlink"),
|
||||
0xA5F7: ("LB27 R1", "Broadlink"),
|
||||
0xA6EF: ("EFCF60WSMT", "Luceco"),
|
||||
},
|
||||
S1C: {
|
||||
0x2722: ("S2KIT", "Broadlink"),
|
||||
|
@ -177,6 +186,9 @@ SUPPORTED_TYPES = {
|
|||
0xA59C: ("S3", "Broadlink"),
|
||||
0xA64D: ("S3", "Broadlink"),
|
||||
},
|
||||
hvac: {
|
||||
0x4E2A: ("HVAC", "Licensed manufacturer"),
|
||||
},
|
||||
hysen: {
|
||||
0x4EAD: ("HY02/HY03", "Hysen"),
|
||||
},
|
||||
|
@ -200,8 +212,8 @@ SUPPORTED_TYPES = {
|
|||
|
||||
def gendevice(
|
||||
dev_type: int,
|
||||
host: t.Tuple[str, int],
|
||||
mac: t.Union[bytes, str],
|
||||
host: Tuple[str, int],
|
||||
mac: Union[bytes, str],
|
||||
name: str = "",
|
||||
is_locked: bool = False,
|
||||
) -> Device:
|
||||
|
@ -253,26 +265,30 @@ def hello(
|
|||
|
||||
def discover(
|
||||
timeout: int = DEFAULT_TIMEOUT,
|
||||
local_ip_address: str = None,
|
||||
local_ip_address: Optional[str] = None,
|
||||
discover_ip_address: str = DEFAULT_BCAST_ADDR,
|
||||
discover_ip_port: int = DEFAULT_PORT,
|
||||
) -> t.List[Device]:
|
||||
) -> List[Device]:
|
||||
"""Discover devices connected to the local network."""
|
||||
responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port)
|
||||
responses = scan(
|
||||
timeout, local_ip_address, discover_ip_address, discover_ip_port
|
||||
)
|
||||
return [gendevice(*resp) for resp in responses]
|
||||
|
||||
|
||||
def xdiscover(
|
||||
timeout: int = DEFAULT_TIMEOUT,
|
||||
local_ip_address: str = None,
|
||||
local_ip_address: Optional[str] = None,
|
||||
discover_ip_address: str = DEFAULT_BCAST_ADDR,
|
||||
discover_ip_port: int = DEFAULT_PORT,
|
||||
) -> t.Generator[Device, None, None]:
|
||||
) -> Generator[Device, None, None]:
|
||||
"""Discover devices connected to the local network.
|
||||
|
||||
This function returns a generator that yields devices instantly.
|
||||
"""
|
||||
responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port)
|
||||
responses = scan(
|
||||
timeout, local_ip_address, discover_ip_address, discover_ip_port
|
||||
)
|
||||
for resp in responses:
|
||||
yield gendevice(*resp)
|
||||
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
"""Support for HVAC units."""
|
||||
import typing as t
|
||||
"""Support for climate control."""
|
||||
import enum
|
||||
import struct
|
||||
from typing import List, Sequence
|
||||
|
||||
from . import exceptions as e
|
||||
from .device import Device
|
||||
|
@ -19,7 +21,7 @@ class hysen(Device):
|
|||
|
||||
TYPE = "HYS"
|
||||
|
||||
def send_request(self, request: t.Sequence[int]) -> bytes:
|
||||
def send_request(self, request: Sequence[int]) -> bytes:
|
||||
"""Send a request to the device."""
|
||||
packet = bytearray()
|
||||
packet.extend((len(request) + 2).to_bytes(2, "little"))
|
||||
|
@ -31,15 +33,15 @@ class hysen(Device):
|
|||
payload = self.decrypt(response[0x38:])
|
||||
|
||||
p_len = int.from_bytes(payload[:0x02], "little")
|
||||
if p_len + 2 > len(payload):
|
||||
raise ValueError(
|
||||
"hysen_response_error", "first byte of response is not length"
|
||||
)
|
||||
|
||||
nom_crc = int.from_bytes(payload[p_len : p_len + 2], "little")
|
||||
nom_crc = int.from_bytes(payload[p_len:p_len+2], "little")
|
||||
real_crc = CRC16.calculate(payload[0x02:p_len])
|
||||
|
||||
if nom_crc != real_crc:
|
||||
raise ValueError("hysen_response_error", "CRC check on response failed")
|
||||
raise e.DataValidationError(
|
||||
-4008,
|
||||
"Received data packet check error",
|
||||
f"Expected a checksum of {nom_crc} and received {real_crc}",
|
||||
)
|
||||
|
||||
return payload[0x02:p_len]
|
||||
|
||||
|
@ -74,7 +76,7 @@ class hysen(Device):
|
|||
data["heating_cooling"] = (payload[4] >> 7) & 1
|
||||
data["room_temp"] = self._decode_temp(payload, 5)
|
||||
data["thermostat_temp"] = payload[6] / 2.0
|
||||
data["auto_mode"] = payload[7] & 0xF
|
||||
data["auto_mode"] = payload[7] & 0x0F
|
||||
data["loop_mode"] = payload[7] >> 4
|
||||
data["sensor"] = payload[8]
|
||||
data["osv"] = payload[9]
|
||||
|
@ -125,7 +127,9 @@ class hysen(Device):
|
|||
# E.g. loop_mode = 0 ("12345,67") means Saturday and Sunday (weekend schedule)
|
||||
# loop_mode = 2 ("1234567") means every day, including Saturday and Sunday (weekday schedule)
|
||||
# The sensor command is currently experimental
|
||||
def set_mode(self, auto_mode: int, loop_mode: int, sensor: int = 0) -> None:
|
||||
def set_mode(
|
||||
self, auto_mode: int, loop_mode: int, sensor: int = 0
|
||||
) -> None:
|
||||
"""Set the mode of the device."""
|
||||
mode_byte = ((loop_mode + 1) << 4) + auto_mode
|
||||
self.send_request([0x01, 0x06, 0x00, 0x02, mode_byte, sensor])
|
||||
|
@ -206,7 +210,19 @@ class hysen(Device):
|
|||
def set_time(self, hour: int, minute: int, second: int, day: int) -> None:
|
||||
"""Set the time."""
|
||||
self.send_request(
|
||||
[0x01, 0x10, 0x00, 0x08, 0x00, 0x02, 0x04, hour, minute, second, day]
|
||||
[
|
||||
0x01,
|
||||
0x10,
|
||||
0x00,
|
||||
0x08,
|
||||
0x00,
|
||||
0x02,
|
||||
0x04,
|
||||
hour,
|
||||
minute,
|
||||
second,
|
||||
day
|
||||
]
|
||||
)
|
||||
|
||||
# Set timer schedule
|
||||
|
@ -215,7 +231,7 @@ class hysen(Device):
|
|||
# {'start_hour':17, 'start_minute':30, 'temp': 22 }
|
||||
# Each one specifies the thermostat temp that will become effective at start_hour:start_minute
|
||||
# weekend is similar but only has 2 (e.g. switch on in morning and off in afternoon)
|
||||
def set_schedule(self, weekday: t.List[dict], weekend: t.List[dict]) -> None:
|
||||
def set_schedule(self, weekday: List[dict], weekend: List[dict]) -> None:
|
||||
"""Set timer schedule."""
|
||||
request = [0x01, 0x10, 0x00, 0x0A, 0x00, 0x0C, 0x18]
|
||||
|
||||
|
@ -238,3 +254,221 @@ class hysen(Device):
|
|||
request.append(int(weekend[i]["temp"] * 2))
|
||||
|
||||
self.send_request(request)
|
||||
|
||||
|
||||
class hvac(Device):
|
||||
"""Controls a HVAC.
|
||||
|
||||
Supported models:
|
||||
- Tornado SMART X SQ series
|
||||
- Aux ASW-H12U3/JIR1DI-US
|
||||
- Aux ASW-H36U2/LFR1DI-US
|
||||
"""
|
||||
|
||||
TYPE = "HVAC"
|
||||
|
||||
@enum.unique
|
||||
class Mode(enum.IntEnum):
|
||||
"""Enumerates modes."""
|
||||
|
||||
AUTO = 0
|
||||
COOL = 1
|
||||
DRY = 2
|
||||
HEAT = 3
|
||||
FAN = 4
|
||||
|
||||
@enum.unique
|
||||
class Speed(enum.IntEnum):
|
||||
"""Enumerates fan speed."""
|
||||
|
||||
HIGH = 1
|
||||
MID = 2
|
||||
LOW = 3
|
||||
AUTO = 5
|
||||
|
||||
@enum.unique
|
||||
class Preset(enum.IntEnum):
|
||||
"""Enumerates presets."""
|
||||
|
||||
NORMAL = 0
|
||||
TURBO = 1
|
||||
MUTE = 2
|
||||
|
||||
@enum.unique
|
||||
class SwHoriz(enum.IntEnum):
|
||||
"""Enumerates horizontal swing."""
|
||||
|
||||
ON = 0
|
||||
OFF = 7
|
||||
|
||||
@enum.unique
|
||||
class SwVert(enum.IntEnum):
|
||||
"""Enumerates vertical swing."""
|
||||
|
||||
ON = 0
|
||||
POS1 = 1
|
||||
POS2 = 2
|
||||
POS3 = 3
|
||||
POS4 = 4
|
||||
POS5 = 5
|
||||
OFF = 7
|
||||
|
||||
def _encode(self, data: bytes) -> bytes:
|
||||
"""Encode data for transport."""
|
||||
packet = bytearray(10)
|
||||
p_len = 10 + len(data)
|
||||
struct.pack_into(
|
||||
"<HHHHH", packet, 0, p_len, 0x00BB, 0x8006, 0, len(data)
|
||||
)
|
||||
packet += data
|
||||
crc = CRC16.calculate(packet[0x02:], polynomial=0x9BE4)
|
||||
packet += crc.to_bytes(2, "little")
|
||||
return packet
|
||||
|
||||
def _decode(self, response: bytes) -> bytes:
|
||||
"""Decode data from transport."""
|
||||
# payload[0x2:0x8] == bytes([0xbb, 0x00, 0x07, 0x00, 0x00, 0x00])
|
||||
payload = self.decrypt(response[0x38:])
|
||||
p_len = int.from_bytes(payload[:0x02], "little")
|
||||
nom_crc = int.from_bytes(payload[p_len:p_len+2], "little")
|
||||
real_crc = CRC16.calculate(payload[0x02:p_len], polynomial=0x9BE4)
|
||||
|
||||
if nom_crc != real_crc:
|
||||
raise e.DataValidationError(
|
||||
-4008,
|
||||
"Received data packet check error",
|
||||
f"Expected a checksum of {nom_crc} and received {real_crc}",
|
||||
)
|
||||
|
||||
d_len = int.from_bytes(payload[0x08:0x0A], "little")
|
||||
return payload[0x0A:0x0A+d_len]
|
||||
|
||||
def _send(self, command: int, data: bytes = b"") -> bytes:
|
||||
"""Send a command to the unit."""
|
||||
prefix = bytes([((command << 4) | 1), 1])
|
||||
packet = self._encode(prefix + data)
|
||||
response = self.send_packet(0x6A, packet)
|
||||
e.check_error(response[0x22:0x24])
|
||||
return self._decode(response)[0x02:]
|
||||
|
||||
def _parse_state(self, data: bytes) -> dict:
|
||||
"""Parse state."""
|
||||
state = {}
|
||||
state["power"] = bool(data[0x08] & 1 << 5)
|
||||
state["target_temp"] = 8 + (data[0x00] >> 3) + (data[0x04] >> 7) * 0.5
|
||||
state["swing_v"] = self.SwVert(data[0x00] & 0b111)
|
||||
state["swing_h"] = self.SwHoriz(data[0x01] >> 5)
|
||||
state["mode"] = self.Mode(data[0x05] >> 5)
|
||||
state["speed"] = self.Speed(data[0x03] >> 5)
|
||||
state["preset"] = self.Preset(data[0x04] >> 6)
|
||||
state["sleep"] = bool(data[0x05] & 1 << 2)
|
||||
state["ifeel"] = bool(data[0x05] & 1 << 3)
|
||||
state["health"] = bool(data[0x08] & 1 << 1)
|
||||
state["clean"] = bool(data[0x08] & 1 << 2)
|
||||
state["display"] = bool(data[0x0A] & 1 << 4)
|
||||
state["mildew"] = bool(data[0x0A] & 1 << 3)
|
||||
return state
|
||||
|
||||
def set_state(
|
||||
self,
|
||||
power: bool,
|
||||
target_temp: float, # 16<=target_temp<=32
|
||||
mode: Mode,
|
||||
speed: Speed,
|
||||
preset: Preset,
|
||||
swing_h: SwHoriz,
|
||||
swing_v: SwVert,
|
||||
sleep: bool,
|
||||
ifeel: bool,
|
||||
display: bool,
|
||||
health: bool,
|
||||
clean: bool,
|
||||
mildew: bool,
|
||||
) -> dict:
|
||||
"""Set the state of the device."""
|
||||
# TODO: decode unknown bits
|
||||
UNK0 = 0b100
|
||||
UNK1 = 0b1101
|
||||
UNK2 = 0b101
|
||||
|
||||
target_temp = round(target_temp * 2) / 2
|
||||
|
||||
if preset == self.Preset.MUTE:
|
||||
if mode != self.Mode.FAN:
|
||||
raise ValueError("mute is only available in fan mode")
|
||||
speed = self.Speed.LOW
|
||||
|
||||
elif preset == self.Preset.TURBO:
|
||||
if mode not in {self.Mode.COOL, self.Mode.HEAT}:
|
||||
raise ValueError("turbo is only available in cooling/heating")
|
||||
speed = self.Speed.HIGH
|
||||
|
||||
data = bytearray(0x0D)
|
||||
data[0x00] = (int(target_temp) - 8 << 3) | swing_v
|
||||
data[0x01] = (swing_h << 5) | UNK0
|
||||
data[0x02] = ((target_temp % 1 == 0.5) << 7) | UNK1
|
||||
data[0x03] = speed << 5
|
||||
data[0x04] = preset << 6
|
||||
data[0x05] = mode << 5 | sleep << 2 | ifeel << 3
|
||||
data[0x08] = power << 5 | clean << 2 | (health and 0b11)
|
||||
data[0x0A] = display << 4 | mildew << 3
|
||||
data[0x0C] = UNK2
|
||||
|
||||
resp = self._send(0, data)
|
||||
return self._parse_state(resp)
|
||||
|
||||
def get_state(self) -> dict:
|
||||
"""Returns a dictionary with the unit's parameters.
|
||||
|
||||
Returns:
|
||||
dict:
|
||||
power (bool):
|
||||
target_temp (float): temperature set point 16<n<32
|
||||
mode (hvac.Mode):
|
||||
speed (hvac.Speed):
|
||||
preset (hvac.Preset):
|
||||
swing_h (hvac.SwHoriz):
|
||||
swing_v (hvac.SwVert):
|
||||
sleep (bool):
|
||||
ifeel (bool):
|
||||
display (bool):
|
||||
health (bool):
|
||||
clean (bool):
|
||||
mildew (bool):
|
||||
"""
|
||||
resp = self._send(1)
|
||||
|
||||
if len(resp) < 13:
|
||||
raise e.DataValidationError(
|
||||
-4007,
|
||||
"Received data packet length error",
|
||||
f"Expected at least 15 bytes and received {len(resp) + 2}",
|
||||
)
|
||||
|
||||
return self._parse_state(resp)
|
||||
|
||||
def get_ac_info(self) -> dict:
|
||||
"""Returns dictionary with AC info.
|
||||
|
||||
Returns:
|
||||
dict:
|
||||
power (bool): power
|
||||
ambient_temp (float): ambient temperature
|
||||
"""
|
||||
resp = self._send(2)
|
||||
|
||||
if len(resp) < 22:
|
||||
raise e.DataValidationError(
|
||||
-4007,
|
||||
"Received data packet length error",
|
||||
f"Expected at least 24 bytes and received {len(resp) + 2}",
|
||||
)
|
||||
|
||||
ac_info = {}
|
||||
ac_info["power"] = resp[0x1] & 1
|
||||
|
||||
ambient_temp = resp[0x05] & 0b11111, resp[0x15] & 0b11111
|
||||
if any(ambient_temp):
|
||||
ac_info["ambient_temp"] = ambient_temp[0] + ambient_temp[1] / 10.0
|
||||
|
||||
return ac_info
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
"""Support for covers."""
|
||||
import time
|
||||
from typing import Sequence
|
||||
|
||||
from . import exceptions as e
|
||||
from .device import Device
|
||||
|
@ -63,7 +64,7 @@ class dooya2(Device):
|
|||
|
||||
TYPE = "DT360E-2"
|
||||
|
||||
def _send(self, operation: int, data: bytes = b""):
|
||||
def _send(self, operation: int, data: Sequence = b""):
|
||||
"""Send a command to the device."""
|
||||
packet = bytearray(12)
|
||||
packet[0x02] = 0xA5
|
||||
|
@ -120,7 +121,7 @@ class wser(Device):
|
|||
|
||||
TYPE = "WSER"
|
||||
|
||||
def _send(self, operation: int, data: bytes = b""):
|
||||
def _send(self, operation: int, data: Sequence = b""):
|
||||
"""Send a command to the device."""
|
||||
packet = bytearray(12)
|
||||
packet[0x02] = 0xA5
|
||||
|
|
|
@ -3,7 +3,7 @@ import socket
|
|||
import threading
|
||||
import random
|
||||
import time
|
||||
import typing as t
|
||||
from typing import Generator, Optional, Tuple, Union
|
||||
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||||
|
@ -17,15 +17,15 @@ from .const import (
|
|||
)
|
||||
from .protocol import Datetime
|
||||
|
||||
HelloResponse = t.Tuple[int, t.Tuple[str, int], str, str, bool]
|
||||
HelloResponse = Tuple[int, Tuple[str, int], str, str, bool]
|
||||
|
||||
|
||||
def scan(
|
||||
timeout: int = DEFAULT_TIMEOUT,
|
||||
local_ip_address: str = None,
|
||||
local_ip_address: Optional[str] = None,
|
||||
discover_ip_address: str = DEFAULT_BCAST_ADDR,
|
||||
discover_ip_port: int = DEFAULT_PORT,
|
||||
) -> t.Generator[HelloResponse, None, None]:
|
||||
) -> Generator[HelloResponse, None, None]:
|
||||
"""Broadcast a hello message and yield responses."""
|
||||
conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
|
@ -100,8 +100,8 @@ class Device:
|
|||
|
||||
def __init__(
|
||||
self,
|
||||
host: t.Tuple[str, int],
|
||||
mac: t.Union[bytes, str],
|
||||
host: Tuple[str, int],
|
||||
mac: Union[bytes, str],
|
||||
devtype: int,
|
||||
timeout: int = DEFAULT_TIMEOUT,
|
||||
name: str = "",
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
"""Helper functions and classes."""
|
||||
import typing as t
|
||||
from typing import Dict, List, Sequence
|
||||
|
||||
|
||||
class CRC16:
|
||||
|
@ -8,10 +8,10 @@ class CRC16:
|
|||
CRC tables are cached for performance.
|
||||
"""
|
||||
|
||||
_cache: t.Dict[int, t.List[int]] = {}
|
||||
_cache: Dict[int, List[int]] = {}
|
||||
|
||||
@classmethod
|
||||
def get_table(cls, polynomial: int) -> t.List[int]:
|
||||
def get_table(cls, polynomial: int) -> List[int]:
|
||||
"""Return the CRC-16 table for a polynomial."""
|
||||
try:
|
||||
crc_table = cls._cache[polynomial]
|
||||
|
@ -31,7 +31,7 @@ class CRC16:
|
|||
@classmethod
|
||||
def calculate(
|
||||
cls,
|
||||
sequence: t.Sequence[int],
|
||||
sequence: Sequence[int],
|
||||
polynomial: int = 0xA001, # CRC-16-ANSI.
|
||||
init_value: int = 0xFFFF,
|
||||
) -> int:
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
"""Support for hubs."""
|
||||
import struct
|
||||
import json
|
||||
from typing import Optional
|
||||
|
||||
from . import exceptions as e
|
||||
from .device import Device
|
||||
|
@ -42,7 +43,7 @@ class s3(Device):
|
|||
|
||||
return sub_devices
|
||||
|
||||
def get_state(self, did: str = None) -> dict:
|
||||
def get_state(self, did: Optional[str] = None) -> dict:
|
||||
"""Return the power state of the device."""
|
||||
state = {}
|
||||
if did is not None:
|
||||
|
@ -55,10 +56,10 @@ class s3(Device):
|
|||
|
||||
def set_state(
|
||||
self,
|
||||
did: str = None,
|
||||
pwr1: bool = None,
|
||||
pwr2: bool = None,
|
||||
pwr3: bool = None,
|
||||
did: Optional[str] = None,
|
||||
pwr1: Optional[bool] = None,
|
||||
pwr2: Optional[bool] = None,
|
||||
pwr3: Optional[bool] = None,
|
||||
) -> dict:
|
||||
"""Set the power state of the device."""
|
||||
state = {}
|
||||
|
@ -81,7 +82,9 @@ class s3(Device):
|
|||
# flag: 1 for reading, 2 for writing.
|
||||
packet = bytearray(12)
|
||||
data = json.dumps(state, separators=(",", ":")).encode()
|
||||
struct.pack_into("<HHHBBI", packet, 0, 0xA5A5, 0x5A5A, 0, flag, 0x0B, len(data))
|
||||
struct.pack_into(
|
||||
"<HHHBBI", packet, 0, 0xA5A5, 0x5A5A, 0, flag, 0x0B, len(data)
|
||||
)
|
||||
packet.extend(data)
|
||||
checksum = sum(packet, 0xBEAF) & 0xFFFF
|
||||
packet[0x04:0x06] = checksum.to_bytes(2, "little")
|
||||
|
@ -91,5 +94,5 @@ class s3(Device):
|
|||
"""Decode a JSON packet."""
|
||||
payload = self.decrypt(response[0x38:])
|
||||
js_len = struct.unpack_from("<I", payload, 0x08)[0]
|
||||
state = json.loads(payload[0x0C : 0x0C + js_len])
|
||||
state = json.loads(payload[0x0C:0x0C+js_len])
|
||||
return state
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
import enum
|
||||
import json
|
||||
import struct
|
||||
from typing import Optional
|
||||
|
||||
from . import exceptions as e
|
||||
from .device import Device
|
||||
|
@ -32,20 +33,20 @@ class lb1(Device):
|
|||
|
||||
def set_state(
|
||||
self,
|
||||
pwr: bool = None,
|
||||
red: int = None,
|
||||
blue: int = None,
|
||||
green: int = None,
|
||||
brightness: int = None,
|
||||
colortemp: int = None,
|
||||
hue: int = None,
|
||||
saturation: int = None,
|
||||
transitionduration: int = None,
|
||||
maxworktime: int = None,
|
||||
bulb_colormode: int = None,
|
||||
bulb_scenes: str = None,
|
||||
bulb_scene: str = None,
|
||||
bulb_sceneidx: int = None,
|
||||
pwr: Optional[bool] = None,
|
||||
red: Optional[int] = None,
|
||||
blue: Optional[int] = None,
|
||||
green: Optional[int] = None,
|
||||
brightness: Optional[int] = None,
|
||||
colortemp: Optional[int] = None,
|
||||
hue: Optional[int] = None,
|
||||
saturation: Optional[int] = None,
|
||||
transitionduration: Optional[int] = None,
|
||||
maxworktime: Optional[int] = None,
|
||||
bulb_colormode: Optional[int] = None,
|
||||
bulb_scenes: Optional[str] = None,
|
||||
bulb_scene: Optional[str] = None,
|
||||
bulb_sceneidx: Optional[int] = None,
|
||||
) -> dict:
|
||||
"""Set the power state of the device."""
|
||||
state = {}
|
||||
|
@ -101,7 +102,7 @@ class lb1(Device):
|
|||
"""Decode a JSON packet."""
|
||||
payload = self.decrypt(response[0x38:])
|
||||
js_len = struct.unpack_from("<I", payload, 0xA)[0]
|
||||
state = json.loads(payload[0xE : 0xE + js_len])
|
||||
state = json.loads(payload[0xE:0xE+js_len])
|
||||
return state
|
||||
|
||||
|
||||
|
@ -130,19 +131,19 @@ class lb2(Device):
|
|||
|
||||
def set_state(
|
||||
self,
|
||||
pwr: bool = None,
|
||||
red: int = None,
|
||||
blue: int = None,
|
||||
green: int = None,
|
||||
brightness: int = None,
|
||||
colortemp: int = None,
|
||||
hue: int = None,
|
||||
saturation: int = None,
|
||||
transitionduration: int = None,
|
||||
maxworktime: int = None,
|
||||
bulb_colormode: int = None,
|
||||
bulb_scenes: str = None,
|
||||
bulb_scene: str = None,
|
||||
pwr: Optional[bool] = None,
|
||||
red: Optional[int] = None,
|
||||
blue: Optional[int] = None,
|
||||
green: Optional[int] = None,
|
||||
brightness: Optional[int] = None,
|
||||
colortemp: Optional[int] = None,
|
||||
hue: Optional[int] = None,
|
||||
saturation: Optional[int] = None,
|
||||
transitionduration: Optional[int] = None,
|
||||
maxworktime: Optional[int] = None,
|
||||
bulb_colormode: Optional[int] = None,
|
||||
bulb_scenes: Optional[str] = None,
|
||||
bulb_scene: Optional[str] = None,
|
||||
) -> dict:
|
||||
"""Set the power state of the device."""
|
||||
state = {}
|
||||
|
@ -183,7 +184,9 @@ class lb2(Device):
|
|||
# flag: 1 for reading, 2 for writing.
|
||||
packet = bytearray(12)
|
||||
data = json.dumps(state, separators=(",", ":")).encode()
|
||||
struct.pack_into("<HHHBBI", packet, 0, 0xA5A5, 0x5A5A, 0, flag, 0x0B, len(data))
|
||||
struct.pack_into(
|
||||
"<HHHBBI", packet, 0, 0xA5A5, 0x5A5A, 0, flag, 0x0B, len(data)
|
||||
)
|
||||
packet.extend(data)
|
||||
checksum = sum(packet, 0xBEAF) & 0xFFFF
|
||||
packet[0x04:0x06] = checksum.to_bytes(2, "little")
|
||||
|
@ -193,5 +196,5 @@ class lb2(Device):
|
|||
"""Decode a JSON packet."""
|
||||
payload = self.decrypt(response[0x38:])
|
||||
js_len = struct.unpack_from("<I", payload, 0x08)[0]
|
||||
state = json.loads(payload[0x0C : 0x0C + js_len])
|
||||
state = json.loads(payload[0x0C:0x0C+js_len])
|
||||
return state
|
||||
|
|
|
@ -1,12 +1,12 @@
|
|||
"""Support for universal remotes."""
|
||||
import struct
|
||||
import typing as t
|
||||
from typing import List, Optional, Tuple
|
||||
|
||||
from . import exceptions as e
|
||||
from .device import Device
|
||||
|
||||
|
||||
def pulses_to_data(pulses: t.List[int], tick: int = 32.84) -> None:
|
||||
def pulses_to_data(pulses: List[int], tick: float = 32.84) -> bytes:
|
||||
"""Convert a microsecond duration sequence into a Broadlink IR packet."""
|
||||
result = bytearray(4)
|
||||
result[0x00] = 0x26
|
||||
|
@ -25,7 +25,7 @@ def pulses_to_data(pulses: t.List[int], tick: int = 32.84) -> None:
|
|||
return result
|
||||
|
||||
|
||||
def data_to_pulses(data: bytes, tick: int = 32.84) -> t.List[int]:
|
||||
def data_to_pulses(data: bytes, tick: float = 32.84) -> List[int]:
|
||||
"""Parse a Broadlink packet into a microsecond duration sequence."""
|
||||
result = []
|
||||
index = 4
|
||||
|
@ -38,8 +38,8 @@ def data_to_pulses(data: bytes, tick: int = 32.84) -> t.List[int]:
|
|||
if chunk == 0:
|
||||
try:
|
||||
chunk = 256 * data[index] + data[index + 1]
|
||||
except IndexError:
|
||||
raise ValueError("Malformed data.")
|
||||
except IndexError as err:
|
||||
raise ValueError("Malformed data.") from err
|
||||
index += 2
|
||||
|
||||
result.append(int(chunk * tick))
|
||||
|
@ -88,14 +88,14 @@ class rmpro(rmmini):
|
|||
"""Sweep frequency."""
|
||||
self._send(0x19)
|
||||
|
||||
def check_frequency(self) -> t.Tuple[bool, float]:
|
||||
def check_frequency(self) -> Tuple[bool, float]:
|
||||
"""Return True if the frequency was identified successfully."""
|
||||
resp = self._send(0x1A)
|
||||
is_found = bool(resp[0])
|
||||
frequency = struct.unpack("<I", resp[1:5])[0] / 1000.0
|
||||
return is_found, frequency
|
||||
|
||||
def find_rf_packet(self, frequency: float = None) -> None:
|
||||
def find_rf_packet(self, frequency: Optional[float] = None) -> None:
|
||||
"""Enter radiofrequency learning mode."""
|
||||
payload = bytearray()
|
||||
if frequency:
|
||||
|
@ -129,7 +129,7 @@ class rmminib(rmmini):
|
|||
e.check_error(resp[0x22:0x24])
|
||||
payload = self.decrypt(resp[0x38:])
|
||||
p_len = struct.unpack("<H", payload[:0x2])[0]
|
||||
return payload[0x6 : p_len + 2]
|
||||
return payload[0x6:p_len+2]
|
||||
|
||||
|
||||
class rm4mini(rmminib):
|
||||
|
|
|
@ -1,4 +1,6 @@
|
|||
"""Support for sensors."""
|
||||
from typing import Sequence
|
||||
|
||||
from . import exceptions as e
|
||||
from .device import Device
|
||||
|
||||
|
@ -45,7 +47,7 @@ class a2(Device):
|
|||
|
||||
TYPE = "A2"
|
||||
|
||||
def _send(self, operation: int, data: bytes = b""):
|
||||
def _send(self, operation: int, data: Sequence = b""):
|
||||
"""Send a command to the device."""
|
||||
packet = bytearray(12)
|
||||
packet[0x02] = 0xA5
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
"""Support for switches."""
|
||||
import json
|
||||
import struct
|
||||
from typing import Optional
|
||||
|
||||
from . import exceptions as e
|
||||
from .device import Device
|
||||
|
@ -127,12 +128,12 @@ class sp4(Device):
|
|||
|
||||
def set_state(
|
||||
self,
|
||||
pwr: bool = None,
|
||||
ntlight: bool = None,
|
||||
indicator: bool = None,
|
||||
ntlbrightness: int = None,
|
||||
maxworktime: int = None,
|
||||
childlock: bool = None,
|
||||
pwr: Optional[bool] = None,
|
||||
ntlight: Optional[bool] = None,
|
||||
indicator: Optional[bool] = None,
|
||||
ntlbrightness: Optional[int] = None,
|
||||
maxworktime: Optional[int] = None,
|
||||
childlock: Optional[bool] = None,
|
||||
) -> dict:
|
||||
"""Set state of device."""
|
||||
state = {}
|
||||
|
@ -186,7 +187,7 @@ class sp4(Device):
|
|||
e.check_error(response[0x22:0x24])
|
||||
payload = self.decrypt(response[0x38:])
|
||||
js_len = struct.unpack_from("<I", payload, 0x08)[0]
|
||||
state = json.loads(payload[0x0C : 0x0C + js_len])
|
||||
state = json.loads(payload[0x0C:0x0C+js_len])
|
||||
return state
|
||||
|
||||
|
||||
|
@ -234,7 +235,7 @@ class sp4b(sp4):
|
|||
e.check_error(response[0x22:0x24])
|
||||
payload = self.decrypt(response[0x38:])
|
||||
js_len = struct.unpack_from("<I", payload, 0xA)[0]
|
||||
state = json.loads(payload[0x0E : 0x0E + js_len])
|
||||
state = json.loads(payload[0x0E:0x0E+js_len])
|
||||
return state
|
||||
|
||||
|
||||
|
@ -255,13 +256,13 @@ class bg1(Device):
|
|||
|
||||
def set_state(
|
||||
self,
|
||||
pwr: bool = None,
|
||||
pwr1: bool = None,
|
||||
pwr2: bool = None,
|
||||
maxworktime: int = None,
|
||||
maxworktime1: int = None,
|
||||
maxworktime2: int = None,
|
||||
idcbrightness: int = None,
|
||||
pwr: Optional[bool] = None,
|
||||
pwr1: Optional[bool] = None,
|
||||
pwr2: Optional[bool] = None,
|
||||
maxworktime: Optional[int] = None,
|
||||
maxworktime1: Optional[int] = None,
|
||||
maxworktime2: Optional[int] = None,
|
||||
idcbrightness: Optional[int] = None,
|
||||
) -> dict:
|
||||
"""Set the power state of the device."""
|
||||
state = {}
|
||||
|
@ -291,7 +292,16 @@ class bg1(Device):
|
|||
data = json.dumps(state).encode()
|
||||
length = 12 + len(data)
|
||||
struct.pack_into(
|
||||
"<HHHHBBI", packet, 0, length, 0xA5A5, 0x5A5A, 0x0000, flag, 0x0B, len(data)
|
||||
"<HHHHBBI",
|
||||
packet,
|
||||
0,
|
||||
length,
|
||||
0xA5A5,
|
||||
0x5A5A,
|
||||
0x0000,
|
||||
flag,
|
||||
0x0B,
|
||||
len(data),
|
||||
)
|
||||
packet.extend(data)
|
||||
checksum = sum(packet[0x2:], 0xBEAF) & 0xFFFF
|
||||
|
@ -302,7 +312,7 @@ class bg1(Device):
|
|||
"""Decode a message."""
|
||||
payload = self.decrypt(response[0x38:])
|
||||
js_len = struct.unpack_from("<I", payload, 0x0A)[0]
|
||||
state = json.loads(payload[0x0E : 0x0E + js_len])
|
||||
state = json.loads(payload[0x0E:0x0E+js_len])
|
||||
return state
|
||||
|
||||
|
||||
|
@ -313,19 +323,19 @@ class ehc31(bg1):
|
|||
|
||||
def set_state(
|
||||
self,
|
||||
pwr: bool = None,
|
||||
pwr1: bool = None,
|
||||
pwr2: bool = None,
|
||||
pwr3: bool = None,
|
||||
maxworktime1: int = None,
|
||||
maxworktime2: int = None,
|
||||
maxworktime3: int = None,
|
||||
idcbrightness: int = None,
|
||||
childlock: bool = None,
|
||||
childlock1: bool = None,
|
||||
childlock2: bool = None,
|
||||
childlock3: bool = None,
|
||||
childlock4: bool = None,
|
||||
pwr: Optional[bool] = None,
|
||||
pwr1: Optional[bool] = None,
|
||||
pwr2: Optional[bool] = None,
|
||||
pwr3: Optional[bool] = None,
|
||||
maxworktime1: Optional[int] = None,
|
||||
maxworktime2: Optional[int] = None,
|
||||
maxworktime3: Optional[int] = None,
|
||||
idcbrightness: Optional[int] = None,
|
||||
childlock: Optional[bool] = None,
|
||||
childlock1: Optional[bool] = None,
|
||||
childlock2: Optional[bool] = None,
|
||||
childlock3: Optional[bool] = None,
|
||||
childlock4: Optional[bool] = None,
|
||||
) -> dict:
|
||||
"""Set the power state of the device."""
|
||||
state = {}
|
||||
|
@ -449,7 +459,7 @@ class mp1s(mp1):
|
|||
|
||||
def get_value(start, end, factors):
|
||||
value = sum(
|
||||
int(payload_str[i - 2 : i]) * factor
|
||||
int(payload_str[i-2:i]) * factor
|
||||
for i, factor in zip(range(start, end, -2), factors)
|
||||
)
|
||||
return value
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
import argparse
|
||||
import base64
|
||||
import time
|
||||
import typing as t
|
||||
from typing import List
|
||||
|
||||
import broadlink
|
||||
from broadlink.const import DEFAULT_PORT
|
||||
|
@ -16,7 +16,7 @@ def auto_int(x):
|
|||
return int(x, 0)
|
||||
|
||||
|
||||
def format_pulses(pulses: t.List[int]) -> str:
|
||||
def format_pulses(pulses: List[int]) -> str:
|
||||
"""Format pulses."""
|
||||
return " ".join(
|
||||
f"+{pulse}" if i % 2 == 0 else f"-{pulse}"
|
||||
|
@ -24,7 +24,7 @@ def format_pulses(pulses: t.List[int]) -> str:
|
|||
)
|
||||
|
||||
|
||||
def parse_pulses(data: t.List[str]) -> t.List[int]:
|
||||
def parse_pulses(data: List[str]) -> List[int]:
|
||||
"""Parse pulses."""
|
||||
return [abs(int(s)) for s in data]
|
||||
|
||||
|
|
Loading…
Reference in New Issue