1
0
mirror of https://github.com/mjg59/python-broadlink.git synced 2024-11-21 22:51:41 +01:00

Merge dev into master (#796)

This commit is contained in:
Felipe Martins Diel 2024-04-17 03:23:20 -03:00 committed by GitHub
commit cee6a1da59
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 856 additions and 242 deletions

View File

@ -50,6 +50,13 @@ broadlink.setup('myssid', 'mynetworkpass', 3)
Security mode options are (0 = none, 1 = WEP, 2 = WPA1, 3 = WPA2, 4 = WPA1/2) Security mode options are (0 = none, 1 = WEP, 2 = WPA1, 3 = WPA2, 4 = WPA1/2)
#### Advanced options
You may need to specify a broadcast address if setup is not working.
```python3
broadlink.setup('myssid', 'mynetworkpass', 3, ip_address='192.168.0.255')
```
### Discovery ### Discovery
Use this function to discover devices: Use this function to discover devices:
@ -61,17 +68,19 @@ devices = broadlink.discover()
#### Advanced options #### Advanced options
You may need to specify `local_ip_address` or `discover_ip_address` if discovery does not return any devices. You may need to specify `local_ip_address` or `discover_ip_address` if discovery does not return any devices.
Using the IP address of your local machine:
```python3 ```python3
devices = broadlink.discover(local_ip_address='192.168.0.100') # IP address of your local machine. devices = broadlink.discover(local_ip_address='192.168.0.100')
``` ```
Using the broadcast address of your subnet:
```python3 ```python3
devices = broadlink.discover(discover_ip_address='192.168.0.255') # Broadcast address of your subnet. devices = broadlink.discover(discover_ip_address='192.168.0.255')
``` ```
If the device is locked, it may not be discoverable with broadcast. In such cases, you can use the unicast version `broadlink.hello()` for direct discovery: If the device is locked, it may not be discoverable with broadcast. In such cases, you can use the unicast version `broadlink.hello()` for direct discovery:
```python3 ```python3
device = broadlink.hello('192.168.0.16') # IP address of your Broadlink device. device = broadlink.hello('192.168.0.16')
``` ```
If you are a perfomance freak, use `broadlink.xdiscover()` to create devices instantly: If you are a perfomance freak, use `broadlink.xdiscover()` to create devices instantly:
@ -106,23 +115,33 @@ packet = device.check_data()
### Learning RF codes ### Learning RF codes
Learning IR codes takes place in five steps. Learning RF codes takes place in six steps.
1. Sweep the frequency: 1. Sweep the frequency:
```python3 ```python3
device.sweep_frequency() device.sweep_frequency()
``` ```
2. When the LED blinks, point the remote at the Broadlink device for the first time and long press the button you want to learn. 2. When the LED blinks, point the remote at the Broadlink device for the first time and long press the button you want to learn.
3. Enter learning mode: 3. Check if the frequency was successfully identified:
```python3
ok = device.check_frequency()
if ok:
print('Frequency found!')
```
4. Enter learning mode:
```python3 ```python3
device.find_rf_packet() device.find_rf_packet()
``` ```
4. When the LED blinks, point the remote at the Broadlink device for the second time and short press the button you want to learn. 5. When the LED blinks, point the remote at the Broadlink device for the second time and short press the button you want to learn.
5. Get the RF packet: 6. Get the RF packet:
```python3 ```python3
packet = device.check_data() packet = device.check_data()
``` ```
#### Notes
Universal remotes with product id 0x2712 use the same method for learning IR and RF codes. They don't need to sweep frequency. Just call `device.enter_learning()` and `device.check_data()`.
### Canceling learning ### Canceling learning
You can exit the learning mode in the middle of the process by calling this method: You can exit the learning mode in the middle of the process by calling this method:

View File

@ -1,19 +1,19 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""The python-broadlink library.""" """The python-broadlink library."""
import socket import socket
import typing as t from typing import Generator, List, Optional, Tuple, Union
from . import exceptions as e from . import exceptions as e
from .const import DEFAULT_BCAST_ADDR, DEFAULT_PORT, DEFAULT_TIMEOUT from .const import DEFAULT_BCAST_ADDR, DEFAULT_PORT, DEFAULT_TIMEOUT
from .alarm import S1C from .alarm import S1C
from .climate import hysen from .climate import hvac, hysen
from .cover import dooya from .cover import dooya, dooya2, wser
from .device import Device, ping, scan from .device import Device, ping, scan
from .hub import s3 from .hub import s3
from .light import lb1, lb2 from .light import lb1, lb2
from .remote import rm, rm4, rm4mini, rm4pro, rmmini, rmminib, rmpro from .remote import rm, rm4, rm4mini, rm4pro, rmmini, rmminib, rmpro
from .sensor import a1 from .sensor import a1, a2
from .switch import bg1, mp1, sp1, sp2, sp2s, sp3, sp3s, sp4, sp4b from .switch import bg1, ehc31, mp1, mp1s, sp1, sp2, sp2s, sp3, sp3s, sp4, sp4b
SUPPORTED_TYPES = { SUPPORTED_TYPES = {
sp1: { sp1: {
@ -148,14 +148,19 @@ SUPPORTED_TYPES = {
0x653C: ("RM4 pro", "Broadlink"), 0x653C: ("RM4 pro", "Broadlink"),
}, },
a1: { a1: {
0x2714: ("e-Sensor", "Broadlink"), 0x2714: ("A1", "Broadlink"),
},
a2: {
0x4F60: ("A2", "Broadlink"),
}, },
mp1: { mp1: {
0x4EB5: ("MP1-1K4S", "Broadlink"), 0x4EB5: ("MP1-1K4S", "Broadlink"),
0x4EF7: ("MP1-1K4S", "Broadlink (OEM)"),
0x4F1B: ("MP1-1K3S2U", "Broadlink (OEM)"), 0x4F1B: ("MP1-1K3S2U", "Broadlink (OEM)"),
0x4F65: ("MP1-1K3S2U", "Broadlink"), 0x4F65: ("MP1-1K3S2U", "Broadlink"),
}, },
mp1s: {
0x4EF7: ("MP1-1K4S", "Broadlink (OEM)"),
},
lb1: { lb1: {
0x5043: ("SB800TD", "Broadlink (OEM)"), 0x5043: ("SB800TD", "Broadlink (OEM)"),
0x504E: ("LB1", "Broadlink"), 0x504E: ("LB1", "Broadlink"),
@ -181,22 +186,34 @@ SUPPORTED_TYPES = {
0xA59C: ("S3", "Broadlink"), 0xA59C: ("S3", "Broadlink"),
0xA64D: ("S3", "Broadlink"), 0xA64D: ("S3", "Broadlink"),
}, },
hvac: {
0x4E2A: ("HVAC", "Licensed manufacturer"),
},
hysen: { hysen: {
0x4EAD: ("HY02/HY03", "Hysen"), 0x4EAD: ("HY02/HY03", "Hysen"),
}, },
dooya: { dooya: {
0x4E4D: ("DT360E-45/20", "Dooya"), 0x4E4D: ("DT360E-45/20", "Dooya"),
}, },
dooya2: {
0x4F6E: ("DT360E-45/20", "Dooya"),
},
wser: {
0x4F6C: ("WSER", "Wistar"),
},
bg1: { bg1: {
0x51E3: ("BG800/BG900", "BG Electrical"), 0x51E3: ("BG800/BG900", "BG Electrical"),
}, },
ehc31: {
0x6480: ("EHC31", "BG Electrical"),
},
} }
def gendevice( def gendevice(
dev_type: int, dev_type: int,
host: t.Tuple[str, int], host: Tuple[str, int],
mac: t.Union[bytes, str], mac: Union[bytes, str],
name: str = "", name: str = "",
is_locked: bool = False, is_locked: bool = False,
) -> Device: ) -> Device:
@ -222,7 +239,7 @@ def gendevice(
def hello( def hello(
host: str, ip_address: str,
port: int = DEFAULT_PORT, port: int = DEFAULT_PORT,
timeout: int = DEFAULT_TIMEOUT, timeout: int = DEFAULT_TIMEOUT,
) -> Device: ) -> Device:
@ -232,7 +249,11 @@ def hello(
""" """
try: try:
return next( return next(
xdiscover(timeout=timeout, discover_ip_address=host, discover_ip_port=port) xdiscover(
timeout=timeout,
discover_ip_address=ip_address,
discover_ip_port=port,
)
) )
except StopIteration as err: except StopIteration as err:
raise e.NetworkTimeoutError( raise e.NetworkTimeoutError(
@ -244,33 +265,42 @@ def hello(
def discover( def discover(
timeout: int = DEFAULT_TIMEOUT, timeout: int = DEFAULT_TIMEOUT,
local_ip_address: str = None, local_ip_address: Optional[str] = None,
discover_ip_address: str = DEFAULT_BCAST_ADDR, discover_ip_address: str = DEFAULT_BCAST_ADDR,
discover_ip_port: int = DEFAULT_PORT, discover_ip_port: int = DEFAULT_PORT,
) -> t.List[Device]: ) -> List[Device]:
"""Discover devices connected to the local network.""" """Discover devices connected to the local network."""
responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port) responses = scan(
timeout, local_ip_address, discover_ip_address, discover_ip_port
)
return [gendevice(*resp) for resp in responses] return [gendevice(*resp) for resp in responses]
def xdiscover( def xdiscover(
timeout: int = DEFAULT_TIMEOUT, timeout: int = DEFAULT_TIMEOUT,
local_ip_address: str = None, local_ip_address: Optional[str] = None,
discover_ip_address: str = DEFAULT_BCAST_ADDR, discover_ip_address: str = DEFAULT_BCAST_ADDR,
discover_ip_port: int = DEFAULT_PORT, discover_ip_port: int = DEFAULT_PORT,
) -> t.Generator[Device, None, None]: ) -> Generator[Device, None, None]:
"""Discover devices connected to the local network. """Discover devices connected to the local network.
This function returns a generator that yields devices instantly. This function returns a generator that yields devices instantly.
""" """
responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port) responses = scan(
timeout, local_ip_address, discover_ip_address, discover_ip_port
)
for resp in responses: for resp in responses:
yield gendevice(*resp) yield gendevice(*resp)
# Setup a new Broadlink device via AP Mode. Review the README to see how to enter AP Mode. # Setup a new Broadlink device via AP Mode. Review the README to see how to enter AP Mode.
# Only tested with Broadlink RM3 Mini (Blackbean) # Only tested with Broadlink RM3 Mini (Blackbean)
def setup(ssid: str, password: str, security_mode: int) -> None: def setup(
ssid: str,
password: str,
security_mode: int,
ip_address: str = DEFAULT_BCAST_ADDR,
) -> None:
"""Set up a new Broadlink device via AP mode.""" """Set up a new Broadlink device via AP mode."""
# Security mode options are (0 - none, 1 = WEP, 2 = WPA1, 3 = WPA2, 4 = WPA1/2) # Security mode options are (0 - none, 1 = WEP, 2 = WPA1, 3 = WPA2, 4 = WPA1/2)
payload = bytearray(0x88) payload = bytearray(0x88)
@ -299,5 +329,5 @@ def setup(ssid: str, password: str, security_mode: int) -> None:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Internet # UDP sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Internet # UDP
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.sendto(payload, (DEFAULT_BCAST_ADDR, DEFAULT_PORT)) sock.sendto(payload, (ip_address, DEFAULT_PORT))
sock.close() sock.close()

288
broadlink/climate.py Normal file → Executable file
View File

@ -1,5 +1,7 @@
"""Support for HVAC units.""" """Support for climate control."""
import typing as t import enum
import struct
from typing import List, Sequence
from . import exceptions as e from . import exceptions as e
from .device import Device from .device import Device
@ -19,7 +21,7 @@ class hysen(Device):
TYPE = "HYS" TYPE = "HYS"
def send_request(self, request: t.Sequence[int]) -> bytes: def send_request(self, request: Sequence[int]) -> bytes:
"""Send a request to the device.""" """Send a request to the device."""
packet = bytearray() packet = bytearray()
packet.extend((len(request) + 2).to_bytes(2, "little")) packet.extend((len(request) + 2).to_bytes(2, "little"))
@ -31,27 +33,34 @@ class hysen(Device):
payload = self.decrypt(response[0x38:]) payload = self.decrypt(response[0x38:])
p_len = int.from_bytes(payload[:0x02], "little") p_len = int.from_bytes(payload[:0x02], "little")
if p_len + 2 > len(payload): nom_crc = int.from_bytes(payload[p_len:p_len+2], "little")
raise ValueError( real_crc = CRC16.calculate(payload[0x02:p_len])
"hysen_response_error", "first byte of response is not length"
if nom_crc != real_crc:
raise e.DataValidationError(
-4008,
"Received data packet check error",
f"Expected a checksum of {nom_crc} and received {real_crc}",
) )
nom_crc = int.from_bytes(payload[p_len : p_len + 2], "little")
real_crc = CRC16.calculate(payload[0x02:p_len])
if nom_crc != real_crc:
raise ValueError("hysen_response_error", "CRC check on response failed")
return payload[0x02:p_len] return payload[0x02:p_len]
def _decode_temp(self, payload, base_index):
base_temp = payload[base_index] / 2.0
add_offset = (payload[4] >> 3) & 1 # should offset be added?
offset_raw_value = (payload[17] >> 4) & 3 # offset value
offset = (offset_raw_value + 1) / 10 if add_offset else 0.0
return base_temp + offset
def get_temp(self) -> float: def get_temp(self) -> float:
"""Return the room temperature in degrees celsius.""" """Return the room temperature in degrees celsius."""
payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x08]) payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x08])
return payload[0x05] / 2.0 return self._decode_temp(payload, 5)
def get_external_temp(self) -> float: def get_external_temp(self) -> float:
"""Return the external temperature in degrees celsius.""" """Return the external temperature in degrees celsius."""
payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x08]) payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x08])
return payload[18] / 2.0 return self._decode_temp(payload, 18)
def get_full_status(self) -> dict: def get_full_status(self) -> dict:
"""Return the state of the device. """Return the state of the device.
@ -64,9 +73,10 @@ class hysen(Device):
data["power"] = payload[4] & 1 data["power"] = payload[4] & 1
data["active"] = (payload[4] >> 4) & 1 data["active"] = (payload[4] >> 4) & 1
data["temp_manual"] = (payload[4] >> 6) & 1 data["temp_manual"] = (payload[4] >> 6) & 1
data["room_temp"] = payload[5] / 2.0 data["heating_cooling"] = (payload[4] >> 7) & 1
data["room_temp"] = self._decode_temp(payload, 5)
data["thermostat_temp"] = payload[6] / 2.0 data["thermostat_temp"] = payload[6] / 2.0
data["auto_mode"] = payload[7] & 0xF data["auto_mode"] = payload[7] & 0x0F
data["loop_mode"] = payload[7] >> 4 data["loop_mode"] = payload[7] >> 4
data["sensor"] = payload[8] data["sensor"] = payload[8]
data["osv"] = payload[9] data["osv"] = payload[9]
@ -79,7 +89,7 @@ class hysen(Device):
data["fre"] = payload[15] data["fre"] = payload[15]
data["poweron"] = payload[16] data["poweron"] = payload[16]
data["unknown"] = payload[17] data["unknown"] = payload[17]
data["external_temp"] = payload[18] / 2.0 data["external_temp"] = self._decode_temp(payload, 18)
data["hour"] = payload[19] data["hour"] = payload[19]
data["min"] = payload[20] data["min"] = payload[20]
data["sec"] = payload[21] data["sec"] = payload[21]
@ -117,7 +127,9 @@ class hysen(Device):
# E.g. loop_mode = 0 ("12345,67") means Saturday and Sunday (weekend schedule) # E.g. loop_mode = 0 ("12345,67") means Saturday and Sunday (weekend schedule)
# loop_mode = 2 ("1234567") means every day, including Saturday and Sunday (weekday schedule) # loop_mode = 2 ("1234567") means every day, including Saturday and Sunday (weekday schedule)
# The sensor command is currently experimental # The sensor command is currently experimental
def set_mode(self, auto_mode: int, loop_mode: int, sensor: int = 0) -> None: def set_mode(
self, auto_mode: int, loop_mode: int, sensor: int = 0
) -> None:
"""Set the mode of the device.""" """Set the mode of the device."""
mode_byte = ((loop_mode + 1) << 4) + auto_mode mode_byte = ((loop_mode + 1) << 4) + auto_mode
self.send_request([0x01, 0x06, 0x00, 0x02, mode_byte, sensor]) self.send_request([0x01, 0x06, 0x00, 0x02, mode_byte, sensor])
@ -185,16 +197,32 @@ class hysen(Device):
# Set device on(1) or off(0), does not deactivate Wifi connectivity. # Set device on(1) or off(0), does not deactivate Wifi connectivity.
# Remote lock disables control by buttons on thermostat. # Remote lock disables control by buttons on thermostat.
def set_power(self, power: int = 1, remote_lock: int = 0) -> None: # heating_cooling: heating(0) cooling(1)
def set_power(
self, power: int = 1, remote_lock: int = 0, heating_cooling: int = 0
) -> None:
"""Set the power state of the device.""" """Set the power state of the device."""
self.send_request([0x01, 0x06, 0x00, 0x00, remote_lock, power]) state = (heating_cooling << 7) + power
self.send_request([0x01, 0x06, 0x00, 0x00, remote_lock, state])
# set time on device # set time on device
# n.b. day=1 is Monday, ..., day=7 is Sunday # n.b. day=1 is Monday, ..., day=7 is Sunday
def set_time(self, hour: int, minute: int, second: int, day: int) -> None: def set_time(self, hour: int, minute: int, second: int, day: int) -> None:
"""Set the time.""" """Set the time."""
self.send_request( self.send_request(
[0x01, 0x10, 0x00, 0x08, 0x00, 0x02, 0x04, hour, minute, second, day] [
0x01,
0x10,
0x00,
0x08,
0x00,
0x02,
0x04,
hour,
minute,
second,
day
]
) )
# Set timer schedule # Set timer schedule
@ -203,7 +231,7 @@ class hysen(Device):
# {'start_hour':17, 'start_minute':30, 'temp': 22 } # {'start_hour':17, 'start_minute':30, 'temp': 22 }
# Each one specifies the thermostat temp that will become effective at start_hour:start_minute # Each one specifies the thermostat temp that will become effective at start_hour:start_minute
# weekend is similar but only has 2 (e.g. switch on in morning and off in afternoon) # weekend is similar but only has 2 (e.g. switch on in morning and off in afternoon)
def set_schedule(self, weekday: t.List[dict], weekend: t.List[dict]) -> None: def set_schedule(self, weekday: List[dict], weekend: List[dict]) -> None:
"""Set timer schedule.""" """Set timer schedule."""
request = [0x01, 0x10, 0x00, 0x0A, 0x00, 0x0C, 0x18] request = [0x01, 0x10, 0x00, 0x0A, 0x00, 0x0C, 0x18]
@ -226,3 +254,221 @@ class hysen(Device):
request.append(int(weekend[i]["temp"] * 2)) request.append(int(weekend[i]["temp"] * 2))
self.send_request(request) self.send_request(request)
class hvac(Device):
"""Controls a HVAC.
Supported models:
- Tornado SMART X SQ series
- Aux ASW-H12U3/JIR1DI-US
- Aux ASW-H36U2/LFR1DI-US
"""
TYPE = "HVAC"
@enum.unique
class Mode(enum.IntEnum):
"""Enumerates modes."""
AUTO = 0
COOL = 1
DRY = 2
HEAT = 3
FAN = 4
@enum.unique
class Speed(enum.IntEnum):
"""Enumerates fan speed."""
HIGH = 1
MID = 2
LOW = 3
AUTO = 5
@enum.unique
class Preset(enum.IntEnum):
"""Enumerates presets."""
NORMAL = 0
TURBO = 1
MUTE = 2
@enum.unique
class SwHoriz(enum.IntEnum):
"""Enumerates horizontal swing."""
ON = 0
OFF = 7
@enum.unique
class SwVert(enum.IntEnum):
"""Enumerates vertical swing."""
ON = 0
POS1 = 1
POS2 = 2
POS3 = 3
POS4 = 4
POS5 = 5
OFF = 7
def _encode(self, data: bytes) -> bytes:
"""Encode data for transport."""
packet = bytearray(10)
p_len = 10 + len(data)
struct.pack_into(
"<HHHHH", packet, 0, p_len, 0x00BB, 0x8006, 0, len(data)
)
packet += data
crc = CRC16.calculate(packet[0x02:], polynomial=0x9BE4)
packet += crc.to_bytes(2, "little")
return packet
def _decode(self, response: bytes) -> bytes:
"""Decode data from transport."""
# payload[0x2:0x8] == bytes([0xbb, 0x00, 0x07, 0x00, 0x00, 0x00])
payload = self.decrypt(response[0x38:])
p_len = int.from_bytes(payload[:0x02], "little")
nom_crc = int.from_bytes(payload[p_len:p_len+2], "little")
real_crc = CRC16.calculate(payload[0x02:p_len], polynomial=0x9BE4)
if nom_crc != real_crc:
raise e.DataValidationError(
-4008,
"Received data packet check error",
f"Expected a checksum of {nom_crc} and received {real_crc}",
)
d_len = int.from_bytes(payload[0x08:0x0A], "little")
return payload[0x0A:0x0A+d_len]
def _send(self, command: int, data: bytes = b"") -> bytes:
"""Send a command to the unit."""
prefix = bytes([((command << 4) | 1), 1])
packet = self._encode(prefix + data)
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
return self._decode(response)[0x02:]
def _parse_state(self, data: bytes) -> dict:
"""Parse state."""
state = {}
state["power"] = bool(data[0x08] & 1 << 5)
state["target_temp"] = 8 + (data[0x00] >> 3) + (data[0x04] >> 7) * 0.5
state["swing_v"] = self.SwVert(data[0x00] & 0b111)
state["swing_h"] = self.SwHoriz(data[0x01] >> 5)
state["mode"] = self.Mode(data[0x05] >> 5)
state["speed"] = self.Speed(data[0x03] >> 5)
state["preset"] = self.Preset(data[0x04] >> 6)
state["sleep"] = bool(data[0x05] & 1 << 2)
state["ifeel"] = bool(data[0x05] & 1 << 3)
state["health"] = bool(data[0x08] & 1 << 1)
state["clean"] = bool(data[0x08] & 1 << 2)
state["display"] = bool(data[0x0A] & 1 << 4)
state["mildew"] = bool(data[0x0A] & 1 << 3)
return state
def set_state(
self,
power: bool,
target_temp: float, # 16<=target_temp<=32
mode: Mode,
speed: Speed,
preset: Preset,
swing_h: SwHoriz,
swing_v: SwVert,
sleep: bool,
ifeel: bool,
display: bool,
health: bool,
clean: bool,
mildew: bool,
) -> dict:
"""Set the state of the device."""
# TODO: decode unknown bits
UNK0 = 0b100
UNK1 = 0b1101
UNK2 = 0b101
target_temp = round(target_temp * 2) / 2
if preset == self.Preset.MUTE:
if mode != self.Mode.FAN:
raise ValueError("mute is only available in fan mode")
speed = self.Speed.LOW
elif preset == self.Preset.TURBO:
if mode not in {self.Mode.COOL, self.Mode.HEAT}:
raise ValueError("turbo is only available in cooling/heating")
speed = self.Speed.HIGH
data = bytearray(0x0D)
data[0x00] = (int(target_temp) - 8 << 3) | swing_v
data[0x01] = (swing_h << 5) | UNK0
data[0x02] = ((target_temp % 1 == 0.5) << 7) | UNK1
data[0x03] = speed << 5
data[0x04] = preset << 6
data[0x05] = mode << 5 | sleep << 2 | ifeel << 3
data[0x08] = power << 5 | clean << 2 | (health and 0b11)
data[0x0A] = display << 4 | mildew << 3
data[0x0C] = UNK2
resp = self._send(0, data)
return self._parse_state(resp)
def get_state(self) -> dict:
"""Returns a dictionary with the unit's parameters.
Returns:
dict:
power (bool):
target_temp (float): temperature set point 16<n<32
mode (hvac.Mode):
speed (hvac.Speed):
preset (hvac.Preset):
swing_h (hvac.SwHoriz):
swing_v (hvac.SwVert):
sleep (bool):
ifeel (bool):
display (bool):
health (bool):
clean (bool):
mildew (bool):
"""
resp = self._send(1)
if len(resp) < 13:
raise e.DataValidationError(
-4007,
"Received data packet length error",
f"Expected at least 15 bytes and received {len(resp) + 2}",
)
return self._parse_state(resp)
def get_ac_info(self) -> dict:
"""Returns dictionary with AC info.
Returns:
dict:
power (bool): power
ambient_temp (float): ambient temperature
"""
resp = self._send(2)
if len(resp) < 22:
raise e.DataValidationError(
-4007,
"Received data packet length error",
f"Expected at least 24 bytes and received {len(resp) + 2}",
)
ac_info = {}
ac_info["power"] = resp[0x1] & 1
ambient_temp = resp[0x05] & 0b11111, resp[0x15] & 0b11111
if any(ambient_temp):
ac_info["ambient_temp"] = ambient_temp[0] + ambient_temp[1] / 10.0
return ac_info

View File

@ -1,5 +1,6 @@
"""Support for covers.""" """Support for covers."""
import time import time
from typing import Sequence
from . import exceptions as e from . import exceptions as e
from .device import Device from .device import Device
@ -8,33 +9,34 @@ from .device import Device
class dooya(Device): class dooya(Device):
"""Controls a Dooya curtain motor.""" """Controls a Dooya curtain motor."""
TYPE = "Dooya DT360E" TYPE = "DT360E"
def _send(self, magic1: int, magic2: int) -> int: def _send(self, command: int, attribute: int = 0) -> int:
"""Send a packet to the device.""" """Send a packet to the device."""
packet = bytearray(16) packet = bytearray(16)
packet[0] = 0x09 packet[0x00] = 0x09
packet[2] = 0xBB packet[0x02] = 0xBB
packet[3] = magic1 packet[0x03] = command
packet[4] = magic2 packet[0x04] = attribute
packet[9] = 0xFA packet[0x09] = 0xFA
packet[10] = 0x44 packet[0x0A] = 0x44
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24]) resp = self.send_packet(0x6A, packet)
payload = self.decrypt(response[0x38:]) e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload[4] return payload[4]
def open(self) -> int: def open(self) -> int:
"""Open the curtain.""" """Open the curtain."""
return self._send(0x01, 0x00) return self._send(0x01)
def close(self) -> int: def close(self) -> int:
"""Close the curtain.""" """Close the curtain."""
return self._send(0x02, 0x00) return self._send(0x02)
def stop(self) -> int: def stop(self) -> int:
"""Stop the curtain.""" """Stop the curtain."""
return self._send(0x03, 0x00) return self._send(0x03)
def get_percentage(self) -> int: def get_percentage(self) -> int:
"""Return the position of the curtain.""" """Return the position of the curtain."""
@ -55,3 +57,126 @@ class dooya(Device):
time.sleep(0.2) time.sleep(0.2)
current = self.get_percentage() current = self.get_percentage()
self.stop() self.stop()
class dooya2(Device):
"""Controls a Dooya curtain motor (version 2)."""
TYPE = "DT360E-2"
def _send(self, operation: int, data: Sequence = b""):
"""Send a command to the device."""
packet = bytearray(12)
packet[0x02] = 0xA5
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x08] = operation
packet[0x09] = 0x0B
if data:
data_len = len(data)
packet[0x0A] = data_len & 0xFF
packet[0x0B] = data_len >> 8
packet += bytes(2)
packet.extend(data)
checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[0x06] = checksum & 0xFF
packet[0x07] = checksum >> 8
packet_len = len(packet) - 2
packet[0x00] = packet_len & 0xFF
packet[0x01] = packet_len >> 8
resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload
def open(self) -> None:
"""Open the curtain."""
self._send(2, [0x00, 0x01, 0x00])
def close(self) -> None:
"""Close the curtain."""
self._send(2, [0x00, 0x02, 0x00])
def stop(self) -> None:
"""Stop the curtain."""
self._send(2, [0x00, 0x03, 0x00])
def get_percentage(self) -> int:
"""Return the position of the curtain."""
resp = self._send(1, [0x00, 0x06, 0x00])
return resp[0x11]
def set_percentage(self, new_percentage: int) -> None:
"""Set the position of the curtain."""
self._send(2, [0x00, 0x09, new_percentage])
class wser(Device):
"""Controls a Wistar curtain motor"""
TYPE = "WSER"
def _send(self, operation: int, data: Sequence = b""):
"""Send a command to the device."""
packet = bytearray(12)
packet[0x02] = 0xA5
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x08] = operation
packet[0x09] = 0x0B
if data:
data_len = len(data)
packet[0x0A] = data_len & 0xFF
packet[0x0B] = data_len >> 8
packet += bytes(2)
packet.extend(data)
checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[0x06] = checksum & 0xFF
packet[0x07] = checksum >> 8
packet_len = len(packet) - 2
packet[0x00] = packet_len & 0xFF
packet[0x01] = packet_len >> 8
resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload
def get_position(self) -> int:
"""Return the position of the curtain."""
resp = self._send(1, [])
position = resp[0x0E]
return position
def open(self) -> int:
"""Open the curtain."""
resp = self._send(2, [0x4A, 0x31, 0xA0])
position = resp[0x0E]
return position
def close(self) -> int:
"""Close the curtain."""
resp = self._send(2, [0x61, 0x32, 0xA0])
position = resp[0x0E]
return position
def stop(self) -> int:
"""Stop the curtain."""
resp = self._send(2, [0x4C, 0x73, 0xA0])
position = resp[0x0E]
return position
def set_position(self, position: int) -> int:
"""Set the position of the curtain."""
resp = self._send(2, [position, 0x70, 0xA0])
position = resp[0x0E]
return position

View File

@ -3,7 +3,7 @@ import socket
import threading import threading
import random import random
import time import time
import typing as t from typing import Generator, Optional, Tuple, Union
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
@ -17,15 +17,15 @@ from .const import (
) )
from .protocol import Datetime from .protocol import Datetime
HelloResponse = t.Tuple[int, t.Tuple[str, int], str, str, bool] HelloResponse = Tuple[int, Tuple[str, int], str, str, bool]
def scan( def scan(
timeout: int = DEFAULT_TIMEOUT, timeout: int = DEFAULT_TIMEOUT,
local_ip_address: str = None, local_ip_address: Optional[str] = None,
discover_ip_address: str = DEFAULT_BCAST_ADDR, discover_ip_address: str = DEFAULT_BCAST_ADDR,
discover_ip_port: int = DEFAULT_PORT, discover_ip_port: int = DEFAULT_PORT,
) -> t.Generator[HelloResponse, None, None]: ) -> Generator[HelloResponse, None, None]:
"""Broadcast a hello message and yield responses.""" """Broadcast a hello message and yield responses."""
conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
@ -76,7 +76,7 @@ def scan(
conn.close() conn.close()
def ping(address: str, port: int = DEFAULT_PORT) -> None: def ping(ip_address: str, port: int = DEFAULT_PORT) -> None:
"""Send a ping packet to an address. """Send a ping packet to an address.
This packet feeds the watchdog timer of firmwares >= v53. This packet feeds the watchdog timer of firmwares >= v53.
@ -87,7 +87,7 @@ def ping(address: str, port: int = DEFAULT_PORT) -> None:
conn.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) conn.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
packet = bytearray(0x30) packet = bytearray(0x30)
packet[0x26] = 1 packet[0x26] = 1
conn.sendto(packet, (address, port)) conn.sendto(packet, (ip_address, port))
class Device: class Device:
@ -100,8 +100,8 @@ class Device:
def __init__( def __init__(
self, self,
host: t.Tuple[str, int], host: Tuple[str, int],
mac: t.Union[bytes, str], mac: Union[bytes, str],
devtype: int, devtype: int,
timeout: int = DEFAULT_TIMEOUT, timeout: int = DEFAULT_TIMEOUT,
name: str = "", name: str = "",

View File

@ -1,5 +1,5 @@
"""Helper functions and classes.""" """Helper functions and classes."""
import typing as t from typing import Dict, List, Sequence
class CRC16: class CRC16:
@ -8,10 +8,10 @@ class CRC16:
CRC tables are cached for performance. CRC tables are cached for performance.
""" """
_cache: t.Dict[int, t.List[int]] = {} _cache: Dict[int, List[int]] = {}
@classmethod @classmethod
def get_table(cls, polynomial: int) -> t.List[int]: def get_table(cls, polynomial: int) -> List[int]:
"""Return the CRC-16 table for a polynomial.""" """Return the CRC-16 table for a polynomial."""
try: try:
crc_table = cls._cache[polynomial] crc_table = cls._cache[polynomial]
@ -31,7 +31,7 @@ class CRC16:
@classmethod @classmethod
def calculate( def calculate(
cls, cls,
sequence: t.Sequence[int], sequence: Sequence[int],
polynomial: int = 0xA001, # CRC-16-ANSI. polynomial: int = 0xA001, # CRC-16-ANSI.
init_value: int = 0xFFFF, init_value: int = 0xFFFF,
) -> int: ) -> int:

View File

@ -1,6 +1,7 @@
"""Support for hubs.""" """Support for hubs."""
import struct import struct
import json import json
from typing import Optional
from . import exceptions as e from . import exceptions as e
from .device import Device from .device import Device
@ -12,25 +13,37 @@ class s3(Device):
TYPE = "S3" TYPE = "S3"
MAX_SUBDEVICES = 8 MAX_SUBDEVICES = 8
def get_subdevices(self) -> list: def get_subdevices(self, step: int = 5) -> list:
"""Return the lit of sub devices.""" """Return a list of sub devices."""
total = self.MAX_SUBDEVICES
sub_devices = [] sub_devices = []
step = 5 seen = set()
index = 0
for index in range(0, self.MAX_SUBDEVICES, step): while index < total:
state = {"count": step, "index": index} state = {"count": step, "index": index}
packet = self._encode(14, state) packet = self._encode(14, state)
resp = self.send_packet(0x6A, packet) resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24]) e.check_error(resp[0x22:0x24])
resp = self._decode(resp) resp = self._decode(resp)
sub_devices.extend(resp["list"]) for device in resp["list"]:
if len(sub_devices) == resp["total"]: did = device["did"]
if did in seen:
continue
seen.add(did)
sub_devices.append(device)
total = resp["total"]
if len(seen) >= total:
break break
index += step
return sub_devices return sub_devices
def get_state(self, did: str = None) -> dict: def get_state(self, did: Optional[str] = None) -> dict:
"""Return the power state of the device.""" """Return the power state of the device."""
state = {} state = {}
if did is not None: if did is not None:
@ -43,10 +56,10 @@ class s3(Device):
def set_state( def set_state(
self, self,
did: str = None, did: Optional[str] = None,
pwr1: bool = None, pwr1: Optional[bool] = None,
pwr2: bool = None, pwr2: Optional[bool] = None,
pwr3: bool = None, pwr3: Optional[bool] = None,
) -> dict: ) -> dict:
"""Set the power state of the device.""" """Set the power state of the device."""
state = {} state = {}
@ -69,7 +82,9 @@ class s3(Device):
# flag: 1 for reading, 2 for writing. # flag: 1 for reading, 2 for writing.
packet = bytearray(12) packet = bytearray(12)
data = json.dumps(state, separators=(",", ":")).encode() data = json.dumps(state, separators=(",", ":")).encode()
struct.pack_into("<HHHBBI", packet, 0, 0xA5A5, 0x5A5A, 0, flag, 0x0B, len(data)) struct.pack_into(
"<HHHBBI", packet, 0, 0xA5A5, 0x5A5A, 0, flag, 0x0B, len(data)
)
packet.extend(data) packet.extend(data)
checksum = sum(packet, 0xBEAF) & 0xFFFF checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[0x04:0x06] = checksum.to_bytes(2, "little") packet[0x04:0x06] = checksum.to_bytes(2, "little")
@ -79,5 +94,5 @@ class s3(Device):
"""Decode a JSON packet.""" """Decode a JSON packet."""
payload = self.decrypt(response[0x38:]) payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0x08)[0] js_len = struct.unpack_from("<I", payload, 0x08)[0]
state = json.loads(payload[0x0C : 0x0C + js_len]) state = json.loads(payload[0x0C:0x0C+js_len])
return state return state

View File

@ -2,6 +2,7 @@
import enum import enum
import json import json
import struct import struct
from typing import Optional
from . import exceptions as e from . import exceptions as e
from .device import Device from .device import Device
@ -32,20 +33,20 @@ class lb1(Device):
def set_state( def set_state(
self, self,
pwr: bool = None, pwr: Optional[bool] = None,
red: int = None, red: Optional[int] = None,
blue: int = None, blue: Optional[int] = None,
green: int = None, green: Optional[int] = None,
brightness: int = None, brightness: Optional[int] = None,
colortemp: int = None, colortemp: Optional[int] = None,
hue: int = None, hue: Optional[int] = None,
saturation: int = None, saturation: Optional[int] = None,
transitionduration: int = None, transitionduration: Optional[int] = None,
maxworktime: int = None, maxworktime: Optional[int] = None,
bulb_colormode: int = None, bulb_colormode: Optional[int] = None,
bulb_scenes: str = None, bulb_scenes: Optional[str] = None,
bulb_scene: str = None, bulb_scene: Optional[str] = None,
bulb_sceneidx: int = None, bulb_sceneidx: Optional[int] = None,
) -> dict: ) -> dict:
"""Set the power state of the device.""" """Set the power state of the device."""
state = {} state = {}
@ -101,7 +102,7 @@ class lb1(Device):
"""Decode a JSON packet.""" """Decode a JSON packet."""
payload = self.decrypt(response[0x38:]) payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0xA)[0] js_len = struct.unpack_from("<I", payload, 0xA)[0]
state = json.loads(payload[0xE : 0xE + js_len]) state = json.loads(payload[0xE:0xE+js_len])
return state return state
@ -130,19 +131,19 @@ class lb2(Device):
def set_state( def set_state(
self, self,
pwr: bool = None, pwr: Optional[bool] = None,
red: int = None, red: Optional[int] = None,
blue: int = None, blue: Optional[int] = None,
green: int = None, green: Optional[int] = None,
brightness: int = None, brightness: Optional[int] = None,
colortemp: int = None, colortemp: Optional[int] = None,
hue: int = None, hue: Optional[int] = None,
saturation: int = None, saturation: Optional[int] = None,
transitionduration: int = None, transitionduration: Optional[int] = None,
maxworktime: int = None, maxworktime: Optional[int] = None,
bulb_colormode: int = None, bulb_colormode: Optional[int] = None,
bulb_scenes: str = None, bulb_scenes: Optional[str] = None,
bulb_scene: str = None, bulb_scene: Optional[str] = None,
) -> dict: ) -> dict:
"""Set the power state of the device.""" """Set the power state of the device."""
state = {} state = {}
@ -183,7 +184,9 @@ class lb2(Device):
# flag: 1 for reading, 2 for writing. # flag: 1 for reading, 2 for writing.
packet = bytearray(12) packet = bytearray(12)
data = json.dumps(state, separators=(",", ":")).encode() data = json.dumps(state, separators=(",", ":")).encode()
struct.pack_into("<HHHBBI", packet, 0, 0xA5A5, 0x5A5A, 0, flag, 0x0B, len(data)) struct.pack_into(
"<HHHBBI", packet, 0, 0xA5A5, 0x5A5A, 0, flag, 0x0B, len(data)
)
packet.extend(data) packet.extend(data)
checksum = sum(packet, 0xBEAF) & 0xFFFF checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[0x04:0x06] = checksum.to_bytes(2, "little") packet[0x04:0x06] = checksum.to_bytes(2, "little")
@ -193,5 +196,5 @@ class lb2(Device):
"""Decode a JSON packet.""" """Decode a JSON packet."""
payload = self.decrypt(response[0x38:]) payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0x08)[0] js_len = struct.unpack_from("<I", payload, 0x08)[0]
state = json.loads(payload[0x0C : 0x0C + js_len]) state = json.loads(payload[0x0C:0x0C+js_len])
return state return state

View File

@ -1,10 +1,52 @@
"""Support for universal remotes.""" """Support for universal remotes."""
import struct import struct
from typing import List, Optional, Tuple
from . import exceptions as e from . import exceptions as e
from .device import Device from .device import Device
def pulses_to_data(pulses: List[int], tick: float = 32.84) -> bytes:
"""Convert a microsecond duration sequence into a Broadlink IR packet."""
result = bytearray(4)
result[0x00] = 0x26
for pulse in pulses:
div, mod = divmod(int(pulse // tick), 256)
if div:
result.append(0)
result.append(div)
result.append(mod)
data_len = len(result) - 4
result[0x02] = data_len & 0xFF
result[0x03] = data_len >> 8
return result
def data_to_pulses(data: bytes, tick: float = 32.84) -> List[int]:
"""Parse a Broadlink packet into a microsecond duration sequence."""
result = []
index = 4
end = min(256 * data[0x03] + data[0x02] + 4, len(data))
while index < end:
chunk = data[index]
index += 1
if chunk == 0:
try:
chunk = 256 * data[index] + data[index + 1]
except IndexError as err:
raise ValueError("Malformed data.") from err
index += 2
result.append(int(chunk * tick))
return result
class rmmini(Device): class rmmini(Device):
"""Controls a Broadlink RM mini 3.""" """Controls a Broadlink RM mini 3."""
@ -46,14 +88,19 @@ class rmpro(rmmini):
"""Sweep frequency.""" """Sweep frequency."""
self._send(0x19) self._send(0x19)
def check_frequency(self) -> bool: def check_frequency(self) -> Tuple[bool, float]:
"""Return True if the frequency was identified successfully.""" """Return True if the frequency was identified successfully."""
resp = self._send(0x1A) resp = self._send(0x1A)
return resp[0] == 1 is_found = bool(resp[0])
frequency = struct.unpack("<I", resp[1:5])[0] / 1000.0
return is_found, frequency
def find_rf_packet(self) -> None: def find_rf_packet(self, frequency: Optional[float] = None) -> None:
"""Enter radiofrequency learning mode.""" """Enter radiofrequency learning mode."""
self._send(0x1B) payload = bytearray()
if frequency:
payload += struct.pack("<I", int(frequency * 1000))
self._send(0x1B, payload)
def cancel_sweep_frequency(self) -> None: def cancel_sweep_frequency(self) -> None:
"""Cancel sweep frequency.""" """Cancel sweep frequency."""
@ -82,7 +129,7 @@ class rmminib(rmmini):
e.check_error(resp[0x22:0x24]) e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:]) payload = self.decrypt(resp[0x38:])
p_len = struct.unpack("<H", payload[:0x2])[0] p_len = struct.unpack("<H", payload[:0x2])[0]
return payload[0x6 : p_len + 2] return payload[0x6:p_len+2]
class rm4mini(rmminib): class rm4mini(rmminib):

View File

@ -1,5 +1,5 @@
"""Support for sensors.""" """Support for sensors."""
import struct from typing import Sequence
from . import exceptions as e from . import exceptions as e
from .device import Device from .device import Device
@ -29,19 +29,62 @@ class a1(Device):
def check_sensors_raw(self) -> dict: def check_sensors_raw(self) -> dict:
"""Return the state of the sensors in raw format.""" """Return the state of the sensors in raw format."""
packet = bytearray([0x1]) packet = bytearray([0x1])
response = self.send_packet(0x6A, packet) resp = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24]) e.check_error(resp[0x22:0x24])
payload = self.decrypt(response[0x38:]) data = self.decrypt(resp[0x38:])
data = payload[0x4:]
temperature = struct.unpack("<bb", data[:0x2])
temperature = temperature[0x0] + temperature[0x1] / 10.0
humidity = data[0x2] + data[0x3] / 10.0
return { return {
"temperature": temperature, "temperature": data[0x04] + data[0x05] / 10.0,
"humidity": humidity, "humidity": data[0x06] + data[0x07] / 10.0,
"light": data[0x4], "light": data[0x08],
"air_quality": data[0x6], "air_quality": data[0x0A],
"noise": data[0x8], "noise": data[0x0C],
}
class a2(Device):
"""Controls a Broadlink A2."""
TYPE = "A2"
def _send(self, operation: int, data: Sequence = b""):
"""Send a command to the device."""
packet = bytearray(12)
packet[0x02] = 0xA5
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x08] = operation
packet[0x09] = 0x0B
if data:
data_len = len(data)
packet[0x0A] = data_len & 0xFF
packet[0x0B] = data_len >> 8
packet += bytes(2)
packet.extend(data)
checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[0x06] = checksum & 0xFF
packet[0x07] = checksum >> 8
packet_len = len(packet) - 2
packet[0x00] = packet_len & 0xFF
packet[0x01] = packet_len >> 8
resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload
def check_sensors_raw(self) -> dict:
"""Return the state of the sensors in raw format."""
data = self._send(1)
return {
"temperature": data[0x13] * 256 + data[0x14],
"humidity": data[0x15] * 256 + data[0x16],
"pm10": data[0x0D] * 256 + data[0x0E],
"pm2_5": data[0x0F] * 256 + data[0x10],
"pm1": data[0x11] * 256 + data[0x12],
} }

View File

@ -1,6 +1,7 @@
"""Support for switches.""" """Support for switches."""
import json import json
import struct import struct
from typing import Optional
from . import exceptions as e from . import exceptions as e
from .device import Device from .device import Device
@ -127,12 +128,12 @@ class sp4(Device):
def set_state( def set_state(
self, self,
pwr: bool = None, pwr: Optional[bool] = None,
ntlight: bool = None, ntlight: Optional[bool] = None,
indicator: bool = None, indicator: Optional[bool] = None,
ntlbrightness: int = None, ntlbrightness: Optional[int] = None,
maxworktime: int = None, maxworktime: Optional[int] = None,
childlock: bool = None, childlock: Optional[bool] = None,
) -> dict: ) -> dict:
"""Set state of device.""" """Set state of device."""
state = {} state = {}
@ -186,7 +187,7 @@ class sp4(Device):
e.check_error(response[0x22:0x24]) e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:]) payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0x08)[0] js_len = struct.unpack_from("<I", payload, 0x08)[0]
state = json.loads(payload[0x0C : 0x0C + js_len]) state = json.loads(payload[0x0C:0x0C+js_len])
return state return state
@ -234,7 +235,7 @@ class sp4b(sp4):
e.check_error(response[0x22:0x24]) e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:]) payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0xA)[0] js_len = struct.unpack_from("<I", payload, 0xA)[0]
state = json.loads(payload[0x0E : 0x0E + js_len]) state = json.loads(payload[0x0E:0x0E+js_len])
return state return state
@ -255,13 +256,13 @@ class bg1(Device):
def set_state( def set_state(
self, self,
pwr: bool = None, pwr: Optional[bool] = None,
pwr1: bool = None, pwr1: Optional[bool] = None,
pwr2: bool = None, pwr2: Optional[bool] = None,
maxworktime: int = None, maxworktime: Optional[int] = None,
maxworktime1: int = None, maxworktime1: Optional[int] = None,
maxworktime2: int = None, maxworktime2: Optional[int] = None,
idcbrightness: int = None, idcbrightness: Optional[int] = None,
) -> dict: ) -> dict:
"""Set the power state of the device.""" """Set the power state of the device."""
state = {} state = {}
@ -291,7 +292,16 @@ class bg1(Device):
data = json.dumps(state).encode() data = json.dumps(state).encode()
length = 12 + len(data) length = 12 + len(data)
struct.pack_into( struct.pack_into(
"<HHHHBBI", packet, 0, length, 0xA5A5, 0x5A5A, 0x0000, flag, 0x0B, len(data) "<HHHHBBI",
packet,
0,
length,
0xA5A5,
0x5A5A,
0x0000,
flag,
0x0B,
len(data),
) )
packet.extend(data) packet.extend(data)
checksum = sum(packet[0x2:], 0xBEAF) & 0xFFFF checksum = sum(packet[0x2:], 0xBEAF) & 0xFFFF
@ -302,10 +312,66 @@ class bg1(Device):
"""Decode a message.""" """Decode a message."""
payload = self.decrypt(response[0x38:]) payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0x0A)[0] js_len = struct.unpack_from("<I", payload, 0x0A)[0]
state = json.loads(payload[0x0E : 0x0E + js_len]) state = json.loads(payload[0x0E:0x0E+js_len])
return state return state
class ehc31(bg1):
"""Controls a BG Electrical smart extension lead."""
TYPE = "EHC31"
def set_state(
self,
pwr: Optional[bool] = None,
pwr1: Optional[bool] = None,
pwr2: Optional[bool] = None,
pwr3: Optional[bool] = None,
maxworktime1: Optional[int] = None,
maxworktime2: Optional[int] = None,
maxworktime3: Optional[int] = None,
idcbrightness: Optional[int] = None,
childlock: Optional[bool] = None,
childlock1: Optional[bool] = None,
childlock2: Optional[bool] = None,
childlock3: Optional[bool] = None,
childlock4: Optional[bool] = None,
) -> dict:
"""Set the power state of the device."""
state = {}
if pwr is not None:
state["pwr"] = int(bool(pwr))
if pwr1 is not None:
state["pwr1"] = int(bool(pwr1))
if pwr2 is not None:
state["pwr2"] = int(bool(pwr2))
if pwr3 is not None:
state["pwr3"] = int(bool(pwr3))
if maxworktime1 is not None:
state["maxworktime1"] = maxworktime1
if maxworktime2 is not None:
state["maxworktime2"] = maxworktime2
if maxworktime3 is not None:
state["maxworktime3"] = maxworktime3
if idcbrightness is not None:
state["idcbrightness"] = idcbrightness
if childlock is not None:
state["childlock"] = int(bool(childlock))
if childlock1 is not None:
state["childlock1"] = int(bool(childlock1))
if childlock2 is not None:
state["childlock2"] = int(bool(childlock2))
if childlock3 is not None:
state["childlock3"] = int(bool(childlock3))
if childlock4 is not None:
state["childlock4"] = int(bool(childlock4))
packet = self._encode(2, state)
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
return self._decode(response)
class mp1(Device): class mp1(Device):
"""Controls a Broadlink MP1.""" """Controls a Broadlink MP1."""
@ -360,3 +426,47 @@ class mp1(Device):
"s3": bool(data & 4), "s3": bool(data & 4),
"s4": bool(data & 8), "s4": bool(data & 8),
} }
class mp1s(mp1):
"""Controls a Broadlink MP1S."""
TYPE = "MP1S"
def get_state(self) -> dict:
"""Return the power state of the device.
voltage in V.
current in A.
power in W.
power consumption in kW·h.
"""
packet = bytearray(16)
packet[0x00] = 0x0E
packet[0x02] = 0xA5
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x06] = 0xB2
packet[0x07] = 0xC0
packet[0x08] = 0x01
packet[0x0A] = 0x04
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
payload_str = payload.hex()[4:-6]
def get_value(start, end, factors):
value = sum(
int(payload_str[i-2:i]) * factor
for i, factor in zip(range(start, end, -2), factors)
)
return value
return {
"volt": get_value(34, 30, [10, 0.1]),
"current": get_value(40, 34, [1, 0.01, 0.0001]),
"power": get_value(46, 40, [100, 1, 0.01]),
"totalconsum": get_value(54, 46, [10000, 100, 1, 0.01]),
}

View File

@ -97,7 +97,7 @@ broadlink_cli --device @BEDROOM.device --temperature
#### Check humidity #### Check humidity
``` ```
broadlink_cli --device @BEDROOM.device --temperature broadlink_cli --device @BEDROOM.device --humidity
``` ```
### Smart plugs ### Smart plugs

View File

@ -1,68 +1,32 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import argparse import argparse
import base64 import base64
import codecs
import time import time
from typing import List
import broadlink import broadlink
from broadlink.const import DEFAULT_PORT from broadlink.const import DEFAULT_PORT
from broadlink.exceptions import ReadError, StorageError from broadlink.exceptions import ReadError, StorageError
from broadlink.remote import data_to_pulses, pulses_to_data
TICK = 32.84
TIMEOUT = 30 TIMEOUT = 30
IR_TOKEN = 0x26
def auto_int(x): def auto_int(x):
return int(x, 0) return int(x, 0)
def to_microseconds(bytes): def format_pulses(pulses: List[int]) -> str:
result = [] """Format pulses."""
# print bytes[0] # 0x26 = 38for IR return " ".join(
index = 4 f"+{pulse}" if i % 2 == 0 else f"-{pulse}"
while index < len(bytes): for i, pulse in enumerate(pulses)
chunk = bytes[index] )
index += 1
if chunk == 0:
chunk = bytes[index]
chunk = 256 * chunk + bytes[index + 1]
index += 2
result.append(int(round(chunk * TICK)))
if chunk == 0x0d05:
break
return result
def durations_to_broadlink(durations): def parse_pulses(data: List[str]) -> List[int]:
result = bytearray() """Parse pulses."""
result.append(IR_TOKEN) return [abs(int(s)) for s in data]
result.append(0)
result.append(len(durations) % 256)
result.append(len(durations) / 256)
for dur in durations:
num = int(round(dur / TICK))
if num > 255:
result.append(0)
result.append(num / 256)
result.append(num % 256)
return result
def format_durations(data):
result = ''
for i in range(0, len(data)):
if len(result) > 0:
result += ' '
result += ('+' if i % 2 == 0 else '-') + str(data[i])
return result
def parse_durations(str):
result = []
for s in str.split():
result.append(abs(int(s)))
return result
parser = argparse.ArgumentParser(fromfile_prefix_chars='@') parser = argparse.ArgumentParser(fromfile_prefix_chars='@')
@ -83,7 +47,8 @@ parser.add_argument("--switch", action="store_true", help="switch state from on
parser.add_argument("--send", action="store_true", help="send command") parser.add_argument("--send", action="store_true", help="send command")
parser.add_argument("--sensors", action="store_true", help="check all sensors") parser.add_argument("--sensors", action="store_true", help="check all sensors")
parser.add_argument("--learn", action="store_true", help="learn command") parser.add_argument("--learn", action="store_true", help="learn command")
parser.add_argument("--rfscanlearn", action="store_true", help="rf scan learning") parser.add_argument("--rflearn", action="store_true", help="rf scan learning")
parser.add_argument("--frequency", type=float, help="specify radiofrequency for learning")
parser.add_argument("--learnfile", help="save learned command to a specified file") parser.add_argument("--learnfile", help="save learned command to a specified file")
parser.add_argument("--durations", action="store_true", parser.add_argument("--durations", action="store_true",
help="use durations in micro seconds instead of the Broadlink format") help="use durations in micro seconds instead of the Broadlink format")
@ -111,8 +76,8 @@ if args.joinwifi:
if args.convert: if args.convert:
data = bytearray.fromhex(''.join(args.data)) data = bytearray.fromhex(''.join(args.data))
durations = to_microseconds(data) pulses = data_to_pulses(data)
print(format_durations(durations)) print(format_pulses(pulses))
if args.temperature: if args.temperature:
print(dev.check_temperature()) print(dev.check_temperature())
if args.humidity: if args.humidity:
@ -124,10 +89,13 @@ if args.sensors:
for key in data: for key in data:
print("{} {}".format(key, data[key])) print("{} {}".format(key, data[key]))
if args.send: if args.send:
data = durations_to_broadlink(parse_durations(' '.join(args.data))) \ data = (
if args.durations else bytearray.fromhex(''.join(args.data)) pulses_to_data(parse_pulses(args.data))
if args.durations
else bytes.fromhex(''.join(args.data))
)
dev.send_data(data) dev.send_data(data)
if args.learn or (args.learnfile and not args.rfscanlearn): if args.learn or (args.learnfile and not args.rflearn):
dev.enter_learning() dev.enter_learning()
print("Learning...") print("Learning...")
start = time.time() start = time.time()
@ -143,17 +111,19 @@ if args.learn or (args.learnfile and not args.rfscanlearn):
print("No data received...") print("No data received...")
exit(1) exit(1)
learned = format_durations(to_microseconds(bytearray(data))) \ print("Packet found!")
if args.durations \ raw_fmt = data.hex()
else ''.join(format(x, '02x') for x in bytearray(data)) base64_fmt = base64.b64encode(data).decode('ascii')
if args.learn: pulse_fmt = format_pulses(data_to_pulses(data))
print(learned)
decode_hex = codecs.getdecoder("hex_codec") print("Raw:", raw_fmt)
print("Base64: " + str(base64.b64encode(decode_hex(learned)[0]))) print("Base64:", base64_fmt)
print("Pulses:", pulse_fmt)
if args.learnfile: if args.learnfile:
print("Saving to {}".format(args.learnfile)) print("Saving to {}".format(args.learnfile))
with open(args.learnfile, "w") as text_file: with open(args.learnfile, "w") as text_file:
text_file.write(learned) text_file.write(pulse_fmt if args.durations else raw_fmt)
if args.check: if args.check:
if dev.check_power(): if dev.check_power():
print('* ON *') print('* ON *')
@ -195,28 +165,33 @@ if args.switch:
else: else:
dev.set_power(True) dev.set_power(True)
print('* Switch to ON *') print('* Switch to ON *')
if args.rfscanlearn: if args.rflearn:
if args.frequency:
frequency = args.frequency
print("Press the button you want to learn, a short press...")
else:
dev.sweep_frequency() dev.sweep_frequency()
print("Learning RF Frequency, press and hold the button to learn...") print("Detecting radiofrequency, press and hold the button to learn...")
start = time.time() start = time.time()
while time.time() - start < TIMEOUT: while time.time() - start < TIMEOUT:
time.sleep(1) time.sleep(1)
if dev.check_frequency(): locked, frequency = dev.check_frequency()
if locked:
break break
else: else:
print("RF Frequency not found") print("Radiofrequency not found")
dev.cancel_sweep_frequency() dev.cancel_sweep_frequency()
exit(1) exit(1)
print("Found RF Frequency - 1 of 2!") print("Radiofrequency detected: {}MHz".format(frequency))
print("You can now let go of the button") print("You can now let go of the button")
input("Press enter to continue...") input("Press enter to continue...")
print("To complete learning, single press the button you want to learn") print("Press the button again, now a short press.")
dev.find_rf_packet() dev.find_rf_packet(frequency)
start = time.time() start = time.time()
while time.time() - start < TIMEOUT: while time.time() - start < TIMEOUT:
@ -231,15 +206,16 @@ if args.rfscanlearn:
print("No data received...") print("No data received...")
exit(1) exit(1)
print("Found RF Frequency - 2 of 2!") print("Packet found!")
learned = format_durations(to_microseconds(bytearray(data))) \ raw_fmt = data.hex()
if args.durations \ base64_fmt = base64.b64encode(data).decode('ascii')
else ''.join(format(x, '02x') for x in bytearray(data)) pulse_fmt = format_pulses(data_to_pulses(data))
if args.learnfile is None:
print(learned) print("Raw:", raw_fmt)
decode_hex = codecs.getdecoder("hex_codec") print("Base64:", base64_fmt)
print("Base64: {}".format(str(base64.b64encode(decode_hex(learned)[0])))) print("Pulses:", pulse_fmt)
if args.learnfile is not None:
if args.learnfile:
print("Saving to {}".format(args.learnfile)) print("Saving to {}".format(args.learnfile))
with open(args.learnfile, "w") as text_file: with open(args.learnfile, "w") as text_file:
text_file.write(learned) text_file.write(pulse_fmt if args.durations else raw_fmt)