1
0
Fork 0

Compare commits

...

8 Commits

Author SHA1 Message Date
Felipe Martins Diel 730853e5fa
Bump version to 0.19.0 (#798) 2024-04-17 03:26:53 -03:00
Felipe Martins Diel cee6a1da59
Merge dev into master (#796) 2024-04-17 03:23:20 -03:00
Felipe Martins Diel 0a9acab2b8
Make type hints compatible with Python 3.6 (#797) 2024-04-17 03:20:13 -03:00
Felipe Martins Diel eb56e7a46f
Merge branch 'master' into dev 2024-04-17 02:58:03 -03:00
Felipe Martins Diel ff4628de1b
Merge new product ids into master (#795)
* Add support for Broadlink SP4M-JP (0x756B) (#782)

* Add support for Luceco/BG Electrical A60 bulb (0x606D) (#766)

* Add support for Luceco EFCF60WSMT (0xA6EF) (#787)

* Add support for Broadlink WS4 (#792)

* Add support for Broadlink SP4D-US (0xA6F4) (#793)
2024-04-17 02:55:38 -03:00
Felipe Martins Diel c4979562c8
Fix type hints (#794) 2024-04-17 02:49:13 -03:00
Felipe Martins Diel 1e11558613
Add support for Tornado 16X SQ air conditioner (0x4E2A) (#520)
* Add support for Tornado 16X SQ air conditioner

* Make Tornado a generic HVAC class

* Better names

* Clean up IntEnums

* Clean up encoders

* Fix indexes

* Improve set_state() interface

* Enumerate presets

* Rename state to power in get_ac_info()

* Paint it black

* Use CRC16 helper class

* Remove log messages

* Fix bugs

* Return state in set_state()
2024-04-17 02:06:36 -03:00
Felipe Martins Diel 66744707f5
Merge new product ids into master (#781)
* Add support for Broadlink SP4L-AU (0xA576) (#731)

* Add support for Broadlink RM mini 3 (0x27B7) (#751)

* Add support for Broadlink LB27 C1 (0x6488) (#752)

* Add support for Broadlink SP mini 3 (0x7549) (#753)
2024-04-09 16:26:43 -03:00
12 changed files with 386 additions and 117 deletions

View File

@ -1,12 +1,12 @@
#!/usr/bin/env python3
"""The python-broadlink library."""
import socket
import typing as t
from typing import Generator, List, Optional, Tuple, Union
from . import exceptions as e
from .const import DEFAULT_BCAST_ADDR, DEFAULT_PORT, DEFAULT_TIMEOUT
from .alarm import S1C
from .climate import hysen
from .climate import hvac, hysen
from .cover import dooya, dooya2, wser
from .device import Device, ping, scan
from .hub import s3
@ -33,6 +33,7 @@ SUPPORTED_TYPES = {
0x7544: ("SP2-CL", "Broadlink"),
0x7546: ("SP2-UK/BR/IN", "Broadlink (OEM)"),
0x7547: ("SC1", "Broadlink"),
0x7549: ("SP mini 3", "Broadlink (OEM)"),
0x7918: ("SP2", "Broadlink (OEM)"),
0x7919: ("SP2-compatible", "Honeywell"),
0x791A: ("SP2-compatible", "Honeywell"),
@ -54,6 +55,7 @@ SUPPORTED_TYPES = {
},
sp4: {
0x7568: ("SP4L-CN", "Broadlink"),
0x756B: ("SP4M-JP", "Broadlink"),
0x756C: ("SP4M", "Broadlink"),
0x756F: ("MCB1", "Broadlink"),
0x7579: ("SP4L-EU", "Broadlink"),
@ -61,12 +63,15 @@ SUPPORTED_TYPES = {
0x7583: ("SP mini 3", "Broadlink"),
0x7587: ("SP4L-UK", "Broadlink"),
0x7D11: ("SP mini 3", "Broadlink"),
0xA4F9: ("WS4", "Broadlink (OEM)"),
0xA569: ("SP4L-UK", "Broadlink"),
0xA56A: ("MCB1", "Broadlink"),
0xA56B: ("SCB1E", "Broadlink"),
0xA56C: ("SP4L-EU", "Broadlink"),
0xA576: ("SP4L-AU", "Broadlink"),
0xA589: ("SP4L-UK", "Broadlink"),
0xA5D3: ("SP4L-EU", "Broadlink"),
0xA6F4: ("SP4D-US", "Broadlink"),
},
sp4b: {
0x5115: ("SCB1E", "Broadlink"),
@ -82,6 +87,7 @@ SUPPORTED_TYPES = {
rmmini: {
0x2737: ("RM mini 3", "Broadlink"),
0x278F: ("RM mini", "Broadlink"),
0x27B7: ("RM mini 3", "Broadlink"),
0x27C2: ("RM mini 3", "Broadlink"),
0x27C7: ("RM mini 3", "Broadlink"),
0x27CC: ("RM mini 3", "Broadlink"),
@ -158,6 +164,7 @@ SUPPORTED_TYPES = {
lb1: {
0x5043: ("SB800TD", "Broadlink (OEM)"),
0x504E: ("LB1", "Broadlink"),
0x606D: ("SLA22RGB9W81/SLA27RGB9W81", "Luceco"),
0x606E: ("SB500TD", "Broadlink (OEM)"),
0x60C7: ("LB1", "Broadlink"),
0x60C8: ("LB1", "Broadlink"),
@ -165,10 +172,12 @@ SUPPORTED_TYPES = {
0x644B: ("LB1", "Broadlink"),
0x644C: ("LB27 R1", "Broadlink"),
0x644E: ("LB26 R1", "Broadlink"),
0x6488: ("LB27 C1", "Broadlink"),
},
lb2: {
0xA4F4: ("LB27 R1", "Broadlink"),
0xA5F7: ("LB27 R1", "Broadlink"),
0xA6EF: ("EFCF60WSMT", "Luceco"),
},
S1C: {
0x2722: ("S2KIT", "Broadlink"),
@ -177,6 +186,9 @@ SUPPORTED_TYPES = {
0xA59C: ("S3", "Broadlink"),
0xA64D: ("S3", "Broadlink"),
},
hvac: {
0x4E2A: ("HVAC", "Licensed manufacturer"),
},
hysen: {
0x4EAD: ("HY02/HY03", "Hysen"),
},
@ -200,8 +212,8 @@ SUPPORTED_TYPES = {
def gendevice(
dev_type: int,
host: t.Tuple[str, int],
mac: t.Union[bytes, str],
host: Tuple[str, int],
mac: Union[bytes, str],
name: str = "",
is_locked: bool = False,
) -> Device:
@ -253,26 +265,30 @@ def hello(
def discover(
timeout: int = DEFAULT_TIMEOUT,
local_ip_address: str = None,
local_ip_address: Optional[str] = None,
discover_ip_address: str = DEFAULT_BCAST_ADDR,
discover_ip_port: int = DEFAULT_PORT,
) -> t.List[Device]:
) -> List[Device]:
"""Discover devices connected to the local network."""
responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port)
responses = scan(
timeout, local_ip_address, discover_ip_address, discover_ip_port
)
return [gendevice(*resp) for resp in responses]
def xdiscover(
timeout: int = DEFAULT_TIMEOUT,
local_ip_address: str = None,
local_ip_address: Optional[str] = None,
discover_ip_address: str = DEFAULT_BCAST_ADDR,
discover_ip_port: int = DEFAULT_PORT,
) -> t.Generator[Device, None, None]:
) -> Generator[Device, None, None]:
"""Discover devices connected to the local network.
This function returns a generator that yields devices instantly.
"""
responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port)
responses = scan(
timeout, local_ip_address, discover_ip_address, discover_ip_port
)
for resp in responses:
yield gendevice(*resp)

View File

@ -1,5 +1,7 @@
"""Support for HVAC units."""
import typing as t
"""Support for climate control."""
import enum
import struct
from typing import List, Sequence
from . import exceptions as e
from .device import Device
@ -19,7 +21,7 @@ class hysen(Device):
TYPE = "HYS"
def send_request(self, request: t.Sequence[int]) -> bytes:
def send_request(self, request: Sequence[int]) -> bytes:
"""Send a request to the device."""
packet = bytearray()
packet.extend((len(request) + 2).to_bytes(2, "little"))
@ -31,15 +33,15 @@ class hysen(Device):
payload = self.decrypt(response[0x38:])
p_len = int.from_bytes(payload[:0x02], "little")
if p_len + 2 > len(payload):
raise ValueError(
"hysen_response_error", "first byte of response is not length"
)
nom_crc = int.from_bytes(payload[p_len : p_len + 2], "little")
nom_crc = int.from_bytes(payload[p_len:p_len+2], "little")
real_crc = CRC16.calculate(payload[0x02:p_len])
if nom_crc != real_crc:
raise ValueError("hysen_response_error", "CRC check on response failed")
raise e.DataValidationError(
-4008,
"Received data packet check error",
f"Expected a checksum of {nom_crc} and received {real_crc}",
)
return payload[0x02:p_len]
@ -74,7 +76,7 @@ class hysen(Device):
data["heating_cooling"] = (payload[4] >> 7) & 1
data["room_temp"] = self._decode_temp(payload, 5)
data["thermostat_temp"] = payload[6] / 2.0
data["auto_mode"] = payload[7] & 0xF
data["auto_mode"] = payload[7] & 0x0F
data["loop_mode"] = payload[7] >> 4
data["sensor"] = payload[8]
data["osv"] = payload[9]
@ -125,7 +127,9 @@ class hysen(Device):
# E.g. loop_mode = 0 ("12345,67") means Saturday and Sunday (weekend schedule)
# loop_mode = 2 ("1234567") means every day, including Saturday and Sunday (weekday schedule)
# The sensor command is currently experimental
def set_mode(self, auto_mode: int, loop_mode: int, sensor: int = 0) -> None:
def set_mode(
self, auto_mode: int, loop_mode: int, sensor: int = 0
) -> None:
"""Set the mode of the device."""
mode_byte = ((loop_mode + 1) << 4) + auto_mode
self.send_request([0x01, 0x06, 0x00, 0x02, mode_byte, sensor])
@ -206,7 +210,19 @@ class hysen(Device):
def set_time(self, hour: int, minute: int, second: int, day: int) -> None:
"""Set the time."""
self.send_request(
[0x01, 0x10, 0x00, 0x08, 0x00, 0x02, 0x04, hour, minute, second, day]
[
0x01,
0x10,
0x00,
0x08,
0x00,
0x02,
0x04,
hour,
minute,
second,
day
]
)
# Set timer schedule
@ -215,7 +231,7 @@ class hysen(Device):
# {'start_hour':17, 'start_minute':30, 'temp': 22 }
# Each one specifies the thermostat temp that will become effective at start_hour:start_minute
# weekend is similar but only has 2 (e.g. switch on in morning and off in afternoon)
def set_schedule(self, weekday: t.List[dict], weekend: t.List[dict]) -> None:
def set_schedule(self, weekday: List[dict], weekend: List[dict]) -> None:
"""Set timer schedule."""
request = [0x01, 0x10, 0x00, 0x0A, 0x00, 0x0C, 0x18]
@ -238,3 +254,221 @@ class hysen(Device):
request.append(int(weekend[i]["temp"] * 2))
self.send_request(request)
class hvac(Device):
"""Controls a HVAC.
Supported models:
- Tornado SMART X SQ series
- Aux ASW-H12U3/JIR1DI-US
- Aux ASW-H36U2/LFR1DI-US
"""
TYPE = "HVAC"
@enum.unique
class Mode(enum.IntEnum):
"""Enumerates modes."""
AUTO = 0
COOL = 1
DRY = 2
HEAT = 3
FAN = 4
@enum.unique
class Speed(enum.IntEnum):
"""Enumerates fan speed."""
HIGH = 1
MID = 2
LOW = 3
AUTO = 5
@enum.unique
class Preset(enum.IntEnum):
"""Enumerates presets."""
NORMAL = 0
TURBO = 1
MUTE = 2
@enum.unique
class SwHoriz(enum.IntEnum):
"""Enumerates horizontal swing."""
ON = 0
OFF = 7
@enum.unique
class SwVert(enum.IntEnum):
"""Enumerates vertical swing."""
ON = 0
POS1 = 1
POS2 = 2
POS3 = 3
POS4 = 4
POS5 = 5
OFF = 7
def _encode(self, data: bytes) -> bytes:
"""Encode data for transport."""
packet = bytearray(10)
p_len = 10 + len(data)
struct.pack_into(
"<HHHHH", packet, 0, p_len, 0x00BB, 0x8006, 0, len(data)
)
packet += data
crc = CRC16.calculate(packet[0x02:], polynomial=0x9BE4)
packet += crc.to_bytes(2, "little")
return packet
def _decode(self, response: bytes) -> bytes:
"""Decode data from transport."""
# payload[0x2:0x8] == bytes([0xbb, 0x00, 0x07, 0x00, 0x00, 0x00])
payload = self.decrypt(response[0x38:])
p_len = int.from_bytes(payload[:0x02], "little")
nom_crc = int.from_bytes(payload[p_len:p_len+2], "little")
real_crc = CRC16.calculate(payload[0x02:p_len], polynomial=0x9BE4)
if nom_crc != real_crc:
raise e.DataValidationError(
-4008,
"Received data packet check error",
f"Expected a checksum of {nom_crc} and received {real_crc}",
)
d_len = int.from_bytes(payload[0x08:0x0A], "little")
return payload[0x0A:0x0A+d_len]
def _send(self, command: int, data: bytes = b"") -> bytes:
"""Send a command to the unit."""
prefix = bytes([((command << 4) | 1), 1])
packet = self._encode(prefix + data)
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
return self._decode(response)[0x02:]
def _parse_state(self, data: bytes) -> dict:
"""Parse state."""
state = {}
state["power"] = bool(data[0x08] & 1 << 5)
state["target_temp"] = 8 + (data[0x00] >> 3) + (data[0x04] >> 7) * 0.5
state["swing_v"] = self.SwVert(data[0x00] & 0b111)
state["swing_h"] = self.SwHoriz(data[0x01] >> 5)
state["mode"] = self.Mode(data[0x05] >> 5)
state["speed"] = self.Speed(data[0x03] >> 5)
state["preset"] = self.Preset(data[0x04] >> 6)
state["sleep"] = bool(data[0x05] & 1 << 2)
state["ifeel"] = bool(data[0x05] & 1 << 3)
state["health"] = bool(data[0x08] & 1 << 1)
state["clean"] = bool(data[0x08] & 1 << 2)
state["display"] = bool(data[0x0A] & 1 << 4)
state["mildew"] = bool(data[0x0A] & 1 << 3)
return state
def set_state(
self,
power: bool,
target_temp: float, # 16<=target_temp<=32
mode: Mode,
speed: Speed,
preset: Preset,
swing_h: SwHoriz,
swing_v: SwVert,
sleep: bool,
ifeel: bool,
display: bool,
health: bool,
clean: bool,
mildew: bool,
) -> dict:
"""Set the state of the device."""
# TODO: decode unknown bits
UNK0 = 0b100
UNK1 = 0b1101
UNK2 = 0b101
target_temp = round(target_temp * 2) / 2
if preset == self.Preset.MUTE:
if mode != self.Mode.FAN:
raise ValueError("mute is only available in fan mode")
speed = self.Speed.LOW
elif preset == self.Preset.TURBO:
if mode not in {self.Mode.COOL, self.Mode.HEAT}:
raise ValueError("turbo is only available in cooling/heating")
speed = self.Speed.HIGH
data = bytearray(0x0D)
data[0x00] = (int(target_temp) - 8 << 3) | swing_v
data[0x01] = (swing_h << 5) | UNK0
data[0x02] = ((target_temp % 1 == 0.5) << 7) | UNK1
data[0x03] = speed << 5
data[0x04] = preset << 6
data[0x05] = mode << 5 | sleep << 2 | ifeel << 3
data[0x08] = power << 5 | clean << 2 | (health and 0b11)
data[0x0A] = display << 4 | mildew << 3
data[0x0C] = UNK2
resp = self._send(0, data)
return self._parse_state(resp)
def get_state(self) -> dict:
"""Returns a dictionary with the unit's parameters.
Returns:
dict:
power (bool):
target_temp (float): temperature set point 16<n<32
mode (hvac.Mode):
speed (hvac.Speed):
preset (hvac.Preset):
swing_h (hvac.SwHoriz):
swing_v (hvac.SwVert):
sleep (bool):
ifeel (bool):
display (bool):
health (bool):
clean (bool):
mildew (bool):
"""
resp = self._send(1)
if len(resp) < 13:
raise e.DataValidationError(
-4007,
"Received data packet length error",
f"Expected at least 15 bytes and received {len(resp) + 2}",
)
return self._parse_state(resp)
def get_ac_info(self) -> dict:
"""Returns dictionary with AC info.
Returns:
dict:
power (bool): power
ambient_temp (float): ambient temperature
"""
resp = self._send(2)
if len(resp) < 22:
raise e.DataValidationError(
-4007,
"Received data packet length error",
f"Expected at least 24 bytes and received {len(resp) + 2}",
)
ac_info = {}
ac_info["power"] = resp[0x1] & 1
ambient_temp = resp[0x05] & 0b11111, resp[0x15] & 0b11111
if any(ambient_temp):
ac_info["ambient_temp"] = ambient_temp[0] + ambient_temp[1] / 10.0
return ac_info

View File

@ -1,5 +1,6 @@
"""Support for covers."""
import time
from typing import Sequence
from . import exceptions as e
from .device import Device
@ -63,7 +64,7 @@ class dooya2(Device):
TYPE = "DT360E-2"
def _send(self, operation: int, data: bytes = b""):
def _send(self, operation: int, data: Sequence = b""):
"""Send a command to the device."""
packet = bytearray(12)
packet[0x02] = 0xA5
@ -120,7 +121,7 @@ class wser(Device):
TYPE = "WSER"
def _send(self, operation: int, data: bytes = b""):
def _send(self, operation: int, data: Sequence = b""):
"""Send a command to the device."""
packet = bytearray(12)
packet[0x02] = 0xA5

View File

@ -3,7 +3,7 @@ import socket
import threading
import random
import time
import typing as t
from typing import Generator, Optional, Tuple, Union
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
@ -17,15 +17,15 @@ from .const import (
)
from .protocol import Datetime
HelloResponse = t.Tuple[int, t.Tuple[str, int], str, str, bool]
HelloResponse = Tuple[int, Tuple[str, int], str, str, bool]
def scan(
timeout: int = DEFAULT_TIMEOUT,
local_ip_address: str = None,
local_ip_address: Optional[str] = None,
discover_ip_address: str = DEFAULT_BCAST_ADDR,
discover_ip_port: int = DEFAULT_PORT,
) -> t.Generator[HelloResponse, None, None]:
) -> Generator[HelloResponse, None, None]:
"""Broadcast a hello message and yield responses."""
conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
@ -100,8 +100,8 @@ class Device:
def __init__(
self,
host: t.Tuple[str, int],
mac: t.Union[bytes, str],
host: Tuple[str, int],
mac: Union[bytes, str],
devtype: int,
timeout: int = DEFAULT_TIMEOUT,
name: str = "",

View File

@ -1,5 +1,5 @@
"""Helper functions and classes."""
import typing as t
from typing import Dict, List, Sequence
class CRC16:
@ -8,10 +8,10 @@ class CRC16:
CRC tables are cached for performance.
"""
_cache: t.Dict[int, t.List[int]] = {}
_cache: Dict[int, List[int]] = {}
@classmethod
def get_table(cls, polynomial: int) -> t.List[int]:
def get_table(cls, polynomial: int) -> List[int]:
"""Return the CRC-16 table for a polynomial."""
try:
crc_table = cls._cache[polynomial]
@ -31,7 +31,7 @@ class CRC16:
@classmethod
def calculate(
cls,
sequence: t.Sequence[int],
sequence: Sequence[int],
polynomial: int = 0xA001, # CRC-16-ANSI.
init_value: int = 0xFFFF,
) -> int:

View File

@ -1,6 +1,7 @@
"""Support for hubs."""
import struct
import json
from typing import Optional
from . import exceptions as e
from .device import Device
@ -42,7 +43,7 @@ class s3(Device):
return sub_devices
def get_state(self, did: str = None) -> dict:
def get_state(self, did: Optional[str] = None) -> dict:
"""Return the power state of the device."""
state = {}
if did is not None:
@ -55,10 +56,10 @@ class s3(Device):
def set_state(
self,
did: str = None,
pwr1: bool = None,
pwr2: bool = None,
pwr3: bool = None,
did: Optional[str] = None,
pwr1: Optional[bool] = None,
pwr2: Optional[bool] = None,
pwr3: Optional[bool] = None,
) -> dict:
"""Set the power state of the device."""
state = {}
@ -81,7 +82,9 @@ class s3(Device):
# flag: 1 for reading, 2 for writing.
packet = bytearray(12)
data = json.dumps(state, separators=(",", ":")).encode()
struct.pack_into("<HHHBBI", packet, 0, 0xA5A5, 0x5A5A, 0, flag, 0x0B, len(data))
struct.pack_into(
"<HHHBBI", packet, 0, 0xA5A5, 0x5A5A, 0, flag, 0x0B, len(data)
)
packet.extend(data)
checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[0x04:0x06] = checksum.to_bytes(2, "little")
@ -91,5 +94,5 @@ class s3(Device):
"""Decode a JSON packet."""
payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0x08)[0]
state = json.loads(payload[0x0C : 0x0C + js_len])
state = json.loads(payload[0x0C:0x0C+js_len])
return state

View File

@ -2,6 +2,7 @@
import enum
import json
import struct
from typing import Optional
from . import exceptions as e
from .device import Device
@ -32,20 +33,20 @@ class lb1(Device):
def set_state(
self,
pwr: bool = None,
red: int = None,
blue: int = None,
green: int = None,
brightness: int = None,
colortemp: int = None,
hue: int = None,
saturation: int = None,
transitionduration: int = None,
maxworktime: int = None,
bulb_colormode: int = None,
bulb_scenes: str = None,
bulb_scene: str = None,
bulb_sceneidx: int = None,
pwr: Optional[bool] = None,
red: Optional[int] = None,
blue: Optional[int] = None,
green: Optional[int] = None,
brightness: Optional[int] = None,
colortemp: Optional[int] = None,
hue: Optional[int] = None,
saturation: Optional[int] = None,
transitionduration: Optional[int] = None,
maxworktime: Optional[int] = None,
bulb_colormode: Optional[int] = None,
bulb_scenes: Optional[str] = None,
bulb_scene: Optional[str] = None,
bulb_sceneidx: Optional[int] = None,
) -> dict:
"""Set the power state of the device."""
state = {}
@ -101,7 +102,7 @@ class lb1(Device):
"""Decode a JSON packet."""
payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0xA)[0]
state = json.loads(payload[0xE : 0xE + js_len])
state = json.loads(payload[0xE:0xE+js_len])
return state
@ -130,19 +131,19 @@ class lb2(Device):
def set_state(
self,
pwr: bool = None,
red: int = None,
blue: int = None,
green: int = None,
brightness: int = None,
colortemp: int = None,
hue: int = None,
saturation: int = None,
transitionduration: int = None,
maxworktime: int = None,
bulb_colormode: int = None,
bulb_scenes: str = None,
bulb_scene: str = None,
pwr: Optional[bool] = None,
red: Optional[int] = None,
blue: Optional[int] = None,
green: Optional[int] = None,
brightness: Optional[int] = None,
colortemp: Optional[int] = None,
hue: Optional[int] = None,
saturation: Optional[int] = None,
transitionduration: Optional[int] = None,
maxworktime: Optional[int] = None,
bulb_colormode: Optional[int] = None,
bulb_scenes: Optional[str] = None,
bulb_scene: Optional[str] = None,
) -> dict:
"""Set the power state of the device."""
state = {}
@ -183,7 +184,9 @@ class lb2(Device):
# flag: 1 for reading, 2 for writing.
packet = bytearray(12)
data = json.dumps(state, separators=(",", ":")).encode()
struct.pack_into("<HHHBBI", packet, 0, 0xA5A5, 0x5A5A, 0, flag, 0x0B, len(data))
struct.pack_into(
"<HHHBBI", packet, 0, 0xA5A5, 0x5A5A, 0, flag, 0x0B, len(data)
)
packet.extend(data)
checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[0x04:0x06] = checksum.to_bytes(2, "little")
@ -193,5 +196,5 @@ class lb2(Device):
"""Decode a JSON packet."""
payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0x08)[0]
state = json.loads(payload[0x0C : 0x0C + js_len])
state = json.loads(payload[0x0C:0x0C+js_len])
return state

View File

@ -1,12 +1,12 @@
"""Support for universal remotes."""
import struct
import typing as t
from typing import List, Optional, Tuple
from . import exceptions as e
from .device import Device
def pulses_to_data(pulses: t.List[int], tick: int = 32.84) -> None:
def pulses_to_data(pulses: List[int], tick: float = 32.84) -> bytes:
"""Convert a microsecond duration sequence into a Broadlink IR packet."""
result = bytearray(4)
result[0x00] = 0x26
@ -25,7 +25,7 @@ def pulses_to_data(pulses: t.List[int], tick: int = 32.84) -> None:
return result
def data_to_pulses(data: bytes, tick: int = 32.84) -> t.List[int]:
def data_to_pulses(data: bytes, tick: float = 32.84) -> List[int]:
"""Parse a Broadlink packet into a microsecond duration sequence."""
result = []
index = 4
@ -38,8 +38,8 @@ def data_to_pulses(data: bytes, tick: int = 32.84) -> t.List[int]:
if chunk == 0:
try:
chunk = 256 * data[index] + data[index + 1]
except IndexError:
raise ValueError("Malformed data.")
except IndexError as err:
raise ValueError("Malformed data.") from err
index += 2
result.append(int(chunk * tick))
@ -88,14 +88,14 @@ class rmpro(rmmini):
"""Sweep frequency."""
self._send(0x19)
def check_frequency(self) -> t.Tuple[bool, float]:
def check_frequency(self) -> Tuple[bool, float]:
"""Return True if the frequency was identified successfully."""
resp = self._send(0x1A)
is_found = bool(resp[0])
frequency = struct.unpack("<I", resp[1:5])[0] / 1000.0
return is_found, frequency
def find_rf_packet(self, frequency: float = None) -> None:
def find_rf_packet(self, frequency: Optional[float] = None) -> None:
"""Enter radiofrequency learning mode."""
payload = bytearray()
if frequency:
@ -129,7 +129,7 @@ class rmminib(rmmini):
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
p_len = struct.unpack("<H", payload[:0x2])[0]
return payload[0x6 : p_len + 2]
return payload[0x6:p_len+2]
class rm4mini(rmminib):

View File

@ -1,4 +1,6 @@
"""Support for sensors."""
from typing import Sequence
from . import exceptions as e
from .device import Device
@ -45,7 +47,7 @@ class a2(Device):
TYPE = "A2"
def _send(self, operation: int, data: bytes = b""):
def _send(self, operation: int, data: Sequence = b""):
"""Send a command to the device."""
packet = bytearray(12)
packet[0x02] = 0xA5

View File

@ -1,6 +1,7 @@
"""Support for switches."""
import json
import struct
from typing import Optional
from . import exceptions as e
from .device import Device
@ -127,12 +128,12 @@ class sp4(Device):
def set_state(
self,
pwr: bool = None,
ntlight: bool = None,
indicator: bool = None,
ntlbrightness: int = None,
maxworktime: int = None,
childlock: bool = None,
pwr: Optional[bool] = None,
ntlight: Optional[bool] = None,
indicator: Optional[bool] = None,
ntlbrightness: Optional[int] = None,
maxworktime: Optional[int] = None,
childlock: Optional[bool] = None,
) -> dict:
"""Set state of device."""
state = {}
@ -186,7 +187,7 @@ class sp4(Device):
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0x08)[0]
state = json.loads(payload[0x0C : 0x0C + js_len])
state = json.loads(payload[0x0C:0x0C+js_len])
return state
@ -234,7 +235,7 @@ class sp4b(sp4):
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0xA)[0]
state = json.loads(payload[0x0E : 0x0E + js_len])
state = json.loads(payload[0x0E:0x0E+js_len])
return state
@ -255,13 +256,13 @@ class bg1(Device):
def set_state(
self,
pwr: bool = None,
pwr1: bool = None,
pwr2: bool = None,
maxworktime: int = None,
maxworktime1: int = None,
maxworktime2: int = None,
idcbrightness: int = None,
pwr: Optional[bool] = None,
pwr1: Optional[bool] = None,
pwr2: Optional[bool] = None,
maxworktime: Optional[int] = None,
maxworktime1: Optional[int] = None,
maxworktime2: Optional[int] = None,
idcbrightness: Optional[int] = None,
) -> dict:
"""Set the power state of the device."""
state = {}
@ -291,7 +292,16 @@ class bg1(Device):
data = json.dumps(state).encode()
length = 12 + len(data)
struct.pack_into(
"<HHHHBBI", packet, 0, length, 0xA5A5, 0x5A5A, 0x0000, flag, 0x0B, len(data)
"<HHHHBBI",
packet,
0,
length,
0xA5A5,
0x5A5A,
0x0000,
flag,
0x0B,
len(data),
)
packet.extend(data)
checksum = sum(packet[0x2:], 0xBEAF) & 0xFFFF
@ -302,7 +312,7 @@ class bg1(Device):
"""Decode a message."""
payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0x0A)[0]
state = json.loads(payload[0x0E : 0x0E + js_len])
state = json.loads(payload[0x0E:0x0E+js_len])
return state
@ -313,19 +323,19 @@ class ehc31(bg1):
def set_state(
self,
pwr: bool = None,
pwr1: bool = None,
pwr2: bool = None,
pwr3: bool = None,
maxworktime1: int = None,
maxworktime2: int = None,
maxworktime3: int = None,
idcbrightness: int = None,
childlock: bool = None,
childlock1: bool = None,
childlock2: bool = None,
childlock3: bool = None,
childlock4: bool = None,
pwr: Optional[bool] = None,
pwr1: Optional[bool] = None,
pwr2: Optional[bool] = None,
pwr3: Optional[bool] = None,
maxworktime1: Optional[int] = None,
maxworktime2: Optional[int] = None,
maxworktime3: Optional[int] = None,
idcbrightness: Optional[int] = None,
childlock: Optional[bool] = None,
childlock1: Optional[bool] = None,
childlock2: Optional[bool] = None,
childlock3: Optional[bool] = None,
childlock4: Optional[bool] = None,
) -> dict:
"""Set the power state of the device."""
state = {}
@ -449,7 +459,7 @@ class mp1s(mp1):
def get_value(start, end, factors):
value = sum(
int(payload_str[i - 2 : i]) * factor
int(payload_str[i-2:i]) * factor
for i, factor in zip(range(start, end, -2), factors)
)
return value

View File

@ -2,7 +2,7 @@
import argparse
import base64
import time
import typing as t
from typing import List
import broadlink
from broadlink.const import DEFAULT_PORT
@ -16,7 +16,7 @@ def auto_int(x):
return int(x, 0)
def format_pulses(pulses: t.List[int]) -> str:
def format_pulses(pulses: List[int]) -> str:
"""Format pulses."""
return " ".join(
f"+{pulse}" if i % 2 == 0 else f"-{pulse}"
@ -24,7 +24,7 @@ def format_pulses(pulses: t.List[int]) -> str:
)
def parse_pulses(data: t.List[str]) -> t.List[int]:
def parse_pulses(data: List[str]) -> List[int]:
"""Parse pulses."""
return [abs(int(s)) for s in data]

View File

@ -5,7 +5,7 @@
from setuptools import setup, find_packages
version = '0.18.3'
version = '0.19.0'
setup(
name="broadlink",