1
0
Fork 0

Compare commits

...

27 Commits

Author SHA1 Message Date
Felipe Martins Diel 730853e5fa
Bump version to 0.19.0 (#798) 2024-04-17 03:26:53 -03:00
Felipe Martins Diel cee6a1da59
Merge dev into master (#796) 2024-04-17 03:23:20 -03:00
Felipe Martins Diel 0a9acab2b8
Make type hints compatible with Python 3.6 (#797) 2024-04-17 03:20:13 -03:00
Felipe Martins Diel eb56e7a46f
Merge branch 'master' into dev 2024-04-17 02:58:03 -03:00
Felipe Martins Diel ff4628de1b
Merge new product ids into master (#795)
* Add support for Broadlink SP4M-JP (0x756B) (#782)

* Add support for Luceco/BG Electrical A60 bulb (0x606D) (#766)

* Add support for Luceco EFCF60WSMT (0xA6EF) (#787)

* Add support for Broadlink WS4 (#792)

* Add support for Broadlink SP4D-US (0xA6F4) (#793)
2024-04-17 02:55:38 -03:00
Felipe Martins Diel c4979562c8
Fix type hints (#794) 2024-04-17 02:49:13 -03:00
Felipe Martins Diel 1e11558613
Add support for Tornado 16X SQ air conditioner (0x4E2A) (#520)
* Add support for Tornado 16X SQ air conditioner

* Make Tornado a generic HVAC class

* Better names

* Clean up IntEnums

* Clean up encoders

* Fix indexes

* Improve set_state() interface

* Enumerate presets

* Rename state to power in get_ac_info()

* Paint it black

* Use CRC16 helper class

* Remove log messages

* Fix bugs

* Return state in set_state()
2024-04-17 02:06:36 -03:00
Felipe Martins Diel fa44b54d88
Add support for Broadlink A2 (#791)
* Add support for Broadlink A2

* Add supported type

* Fix bugs

* Improve device name
2024-04-12 02:10:06 -03:00
Felipe Martins Diel 24b9d308b6
Fix s3.get_subdevices() (#790)
* Fix s3.get_subdevices()

* Fix docstring
2024-04-10 23:55:41 -03:00
Felipe Martins Diel eb0f98a410
Fix README.md (#789) 2024-04-10 23:15:46 -03:00
Felipe Martins Diel 247be74c33
Expose IR/RF conversion functions (#788)
* Move IR duration<->Broadlink conversion down from CLI

* Fix --learn base64 to not crash with --durations

Also remove its b'...' wrapping.

* Fix IR/RF conversions

---------

Co-authored-by: William Grant <me@williamgrant.id.au>
2024-04-10 22:51:41 -03:00
Felipe Martins Diel 84af992dcc
Add support for Wistar smart curtain (0x4F6C) (#786)
* Add support for Wistar smart curtain (0x4F6C)

* Rename wsrc to wser
2024-04-10 16:35:25 -03:00
Felipe Martins Diel 4766d68289
Add support for Dooya DT360E (v2) (#785) 2024-04-09 20:32:41 -03:00
Felipe Martins Diel 821820c61e
Add support for BG Electrical EHC31 (0x6480) (#784) 2024-04-09 19:56:30 -03:00
Felipe Martins Diel cacebe7f3c
Rename MP1S state parameters (#783)
* Rename MP1S state parameters

* Rename get_status to get_state
2024-04-09 19:43:29 -03:00
Hozoy c6bf96da47
Add mp1s get_status function (#762) 2024-04-09 19:23:35 -03:00
Felipe Martins Diel 06c91ae394
Remove auxiliary functions from hysen class (#780) 2024-04-09 16:14:04 -03:00
irsl d7ed9855b9
Thermostat: get the 1st decimal place (#772) 2024-04-09 16:06:38 -03:00
Ian Munsie 634370d878
Add ability to RF scan a specific frequency (#613)
* Add ability to RF scan a specific frequency

This adds an optional parameter to find_rf_packet(), along with a
corresponding --rflearn parameter (defaulting to 433.92) to
broadlink_cli that specifies the frequency to tune to, rather than
requiring the frequency be found via sweeping. This is almost mandatory
for certain types of remotes that do not repeat their signals while the
button is held, and saves significant time when the frequency is known
in advance or when many buttons are to be captured in a row.

Additionally:

- A get_frequency() API is added to return the current frequency the
  device is tuned to.

- A check_frequency_ex() API is added to perform functions of both
  check_frequency() and get_frequency() in a single call.

- broadlink_cli --rfscanlearn will now report the current frequency at 1
  second intervals during sweeping, and will report the frequency it
  finally locks on to.

* Clean up remote.py

* Clean up broadlink_cli

* Update conditional

* Fix message

---------

Co-authored-by: Felipe Martins Diel <41558831+felipediel@users.noreply.github.com>
2024-04-09 15:40:00 -03:00
fustom abcc9aaeed
Add heating_cooling state to Hysen (#722) 2023-01-22 01:50:37 -03:00
Felipe Martins Diel d4dafa386c
Merge 'master' into 'dev' (#734) 2023-01-14 02:01:06 -03:00
Felipe Martins Diel 70180cfbc6 Merge branch 'master' into dev 2022-03-19 19:37:31 -03:00
Felipe Martins Diel bb19504314
Fix instructions for learning RF codes (#632) 2021-10-18 18:23:05 -03:00
Felipe Martins Diel f2a582b8f9
Add support for Broadlink MP1 with power meter (#631) 2021-10-18 15:59:47 -03:00
Felipe Martins Diel 9873af9bc4
Standardize ip_address option (#630) 2021-10-18 14:19:41 -03:00
Felipe Martins Diel 11febb043b
Improve README.md (#629) 2021-10-18 14:05:39 -03:00
Felipe Martins Diel b596984b44
Add ip_address option to setup() (#628)
* Add ip_address option to setup()

* Update README.md
2021-10-18 13:42:32 -03:00
14 changed files with 864 additions and 245 deletions

View File

@ -50,6 +50,13 @@ broadlink.setup('myssid', 'mynetworkpass', 3)
Security mode options are (0 = none, 1 = WEP, 2 = WPA1, 3 = WPA2, 4 = WPA1/2)
#### Advanced options
You may need to specify a broadcast address if setup is not working.
```python3
broadlink.setup('myssid', 'mynetworkpass', 3, ip_address='192.168.0.255')
```
### Discovery
Use this function to discover devices:
@ -61,17 +68,19 @@ devices = broadlink.discover()
#### Advanced options
You may need to specify `local_ip_address` or `discover_ip_address` if discovery does not return any devices.
Using the IP address of your local machine:
```python3
devices = broadlink.discover(local_ip_address='192.168.0.100') # IP address of your local machine.
devices = broadlink.discover(local_ip_address='192.168.0.100')
```
Using the broadcast address of your subnet:
```python3
devices = broadlink.discover(discover_ip_address='192.168.0.255') # Broadcast address of your subnet.
devices = broadlink.discover(discover_ip_address='192.168.0.255')
```
If the device is locked, it may not be discoverable with broadcast. In such cases, you can use the unicast version `broadlink.hello()` for direct discovery:
```python3
device = broadlink.hello('192.168.0.16') # IP address of your Broadlink device.
device = broadlink.hello('192.168.0.16')
```
If you are a perfomance freak, use `broadlink.xdiscover()` to create devices instantly:
@ -106,23 +115,33 @@ packet = device.check_data()
### Learning RF codes
Learning IR codes takes place in five steps.
Learning RF codes takes place in six steps.
1. Sweep the frequency:
```python3
device.sweep_frequency()
```
2. When the LED blinks, point the remote at the Broadlink device for the first time and long press the button you want to learn.
3. Enter learning mode:
3. Check if the frequency was successfully identified:
```python3
ok = device.check_frequency()
if ok:
print('Frequency found!')
```
4. Enter learning mode:
```python3
device.find_rf_packet()
```
4. When the LED blinks, point the remote at the Broadlink device for the second time and short press the button you want to learn.
5. Get the RF packet:
5. When the LED blinks, point the remote at the Broadlink device for the second time and short press the button you want to learn.
6. Get the RF packet:
```python3
packet = device.check_data()
```
#### Notes
Universal remotes with product id 0x2712 use the same method for learning IR and RF codes. They don't need to sweep frequency. Just call `device.enter_learning()` and `device.check_data()`.
### Canceling learning
You can exit the learning mode in the middle of the process by calling this method:

View File

@ -1,19 +1,19 @@
#!/usr/bin/env python3
"""The python-broadlink library."""
import socket
import typing as t
from typing import Generator, List, Optional, Tuple, Union
from . import exceptions as e
from .const import DEFAULT_BCAST_ADDR, DEFAULT_PORT, DEFAULT_TIMEOUT
from .alarm import S1C
from .climate import hysen
from .cover import dooya
from .climate import hvac, hysen
from .cover import dooya, dooya2, wser
from .device import Device, ping, scan
from .hub import s3
from .light import lb1, lb2
from .remote import rm, rm4, rm4mini, rm4pro, rmmini, rmminib, rmpro
from .sensor import a1
from .switch import bg1, mp1, sp1, sp2, sp2s, sp3, sp3s, sp4, sp4b
from .sensor import a1, a2
from .switch import bg1, ehc31, mp1, mp1s, sp1, sp2, sp2s, sp3, sp3s, sp4, sp4b
SUPPORTED_TYPES = {
sp1: {
@ -55,6 +55,7 @@ SUPPORTED_TYPES = {
},
sp4: {
0x7568: ("SP4L-CN", "Broadlink"),
0x756B: ("SP4M-JP", "Broadlink"),
0x756C: ("SP4M", "Broadlink"),
0x756F: ("MCB1", "Broadlink"),
0x7579: ("SP4L-EU", "Broadlink"),
@ -62,6 +63,7 @@ SUPPORTED_TYPES = {
0x7583: ("SP mini 3", "Broadlink"),
0x7587: ("SP4L-UK", "Broadlink"),
0x7D11: ("SP mini 3", "Broadlink"),
0xA4F9: ("WS4", "Broadlink (OEM)"),
0xA569: ("SP4L-UK", "Broadlink"),
0xA56A: ("MCB1", "Broadlink"),
0xA56B: ("SCB1E", "Broadlink"),
@ -69,6 +71,7 @@ SUPPORTED_TYPES = {
0xA576: ("SP4L-AU", "Broadlink"),
0xA589: ("SP4L-UK", "Broadlink"),
0xA5D3: ("SP4L-EU", "Broadlink"),
0xA6F4: ("SP4D-US", "Broadlink"),
},
sp4b: {
0x5115: ("SCB1E", "Broadlink"),
@ -145,17 +148,23 @@ SUPPORTED_TYPES = {
0x653C: ("RM4 pro", "Broadlink"),
},
a1: {
0x2714: ("e-Sensor", "Broadlink"),
0x2714: ("A1", "Broadlink"),
},
a2: {
0x4F60: ("A2", "Broadlink"),
},
mp1: {
0x4EB5: ("MP1-1K4S", "Broadlink"),
0x4EF7: ("MP1-1K4S", "Broadlink (OEM)"),
0x4F1B: ("MP1-1K3S2U", "Broadlink (OEM)"),
0x4F65: ("MP1-1K3S2U", "Broadlink"),
},
mp1s: {
0x4EF7: ("MP1-1K4S", "Broadlink (OEM)"),
},
lb1: {
0x5043: ("SB800TD", "Broadlink (OEM)"),
0x504E: ("LB1", "Broadlink"),
0x606D: ("SLA22RGB9W81/SLA27RGB9W81", "Luceco"),
0x606E: ("SB500TD", "Broadlink (OEM)"),
0x60C7: ("LB1", "Broadlink"),
0x60C8: ("LB1", "Broadlink"),
@ -168,13 +177,17 @@ SUPPORTED_TYPES = {
lb2: {
0xA4F4: ("LB27 R1", "Broadlink"),
0xA5F7: ("LB27 R1", "Broadlink"),
0xA6EF: ("EFCF60WSMT", "Luceco"),
},
S1C: {
0x2722: ("S2KIT", "Broadlink"),
},
s3: {
0xA59C:("S3", "Broadlink"),
0xA64D:("S3", "Broadlink"),
s3: {
0xA59C: ("S3", "Broadlink"),
0xA64D: ("S3", "Broadlink"),
},
hvac: {
0x4E2A: ("HVAC", "Licensed manufacturer"),
},
hysen: {
0x4EAD: ("HY02/HY03", "Hysen"),
@ -182,16 +195,25 @@ SUPPORTED_TYPES = {
dooya: {
0x4E4D: ("DT360E-45/20", "Dooya"),
},
dooya2: {
0x4F6E: ("DT360E-45/20", "Dooya"),
},
wser: {
0x4F6C: ("WSER", "Wistar"),
},
bg1: {
0x51E3: ("BG800/BG900", "BG Electrical"),
},
ehc31: {
0x6480: ("EHC31", "BG Electrical"),
},
}
def gendevice(
dev_type: int,
host: t.Tuple[str, int],
mac: t.Union[bytes, str],
host: Tuple[str, int],
mac: Union[bytes, str],
name: str = "",
is_locked: bool = False,
) -> Device:
@ -217,7 +239,7 @@ def gendevice(
def hello(
host: str,
ip_address: str,
port: int = DEFAULT_PORT,
timeout: int = DEFAULT_TIMEOUT,
) -> Device:
@ -227,7 +249,11 @@ def hello(
"""
try:
return next(
xdiscover(timeout=timeout, discover_ip_address=host, discover_ip_port=port)
xdiscover(
timeout=timeout,
discover_ip_address=ip_address,
discover_ip_port=port,
)
)
except StopIteration as err:
raise e.NetworkTimeoutError(
@ -239,33 +265,42 @@ def hello(
def discover(
timeout: int = DEFAULT_TIMEOUT,
local_ip_address: str = None,
local_ip_address: Optional[str] = None,
discover_ip_address: str = DEFAULT_BCAST_ADDR,
discover_ip_port: int = DEFAULT_PORT,
) -> t.List[Device]:
) -> List[Device]:
"""Discover devices connected to the local network."""
responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port)
responses = scan(
timeout, local_ip_address, discover_ip_address, discover_ip_port
)
return [gendevice(*resp) for resp in responses]
def xdiscover(
timeout: int = DEFAULT_TIMEOUT,
local_ip_address: str = None,
local_ip_address: Optional[str] = None,
discover_ip_address: str = DEFAULT_BCAST_ADDR,
discover_ip_port: int = DEFAULT_PORT,
) -> t.Generator[Device, None, None]:
) -> Generator[Device, None, None]:
"""Discover devices connected to the local network.
This function returns a generator that yields devices instantly.
"""
responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port)
responses = scan(
timeout, local_ip_address, discover_ip_address, discover_ip_port
)
for resp in responses:
yield gendevice(*resp)
# Setup a new Broadlink device via AP Mode. Review the README to see how to enter AP Mode.
# Only tested with Broadlink RM3 Mini (Blackbean)
def setup(ssid: str, password: str, security_mode: int) -> None:
def setup(
ssid: str,
password: str,
security_mode: int,
ip_address: str = DEFAULT_BCAST_ADDR,
) -> None:
"""Set up a new Broadlink device via AP mode."""
# Security mode options are (0 - none, 1 = WEP, 2 = WPA1, 3 = WPA2, 4 = WPA1/2)
payload = bytearray(0x88)
@ -294,5 +329,5 @@ def setup(ssid: str, password: str, security_mode: int) -> None:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Internet # UDP
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.sendto(payload, (DEFAULT_BCAST_ADDR, DEFAULT_PORT))
sock.sendto(payload, (ip_address, DEFAULT_PORT))
sock.close()

288
broadlink/climate.py Normal file → Executable file
View File

@ -1,5 +1,7 @@
"""Support for HVAC units."""
import typing as t
"""Support for climate control."""
import enum
import struct
from typing import List, Sequence
from . import exceptions as e
from .device import Device
@ -19,7 +21,7 @@ class hysen(Device):
TYPE = "HYS"
def send_request(self, request: t.Sequence[int]) -> bytes:
def send_request(self, request: Sequence[int]) -> bytes:
"""Send a request to the device."""
packet = bytearray()
packet.extend((len(request) + 2).to_bytes(2, "little"))
@ -31,27 +33,34 @@ class hysen(Device):
payload = self.decrypt(response[0x38:])
p_len = int.from_bytes(payload[:0x02], "little")
if p_len + 2 > len(payload):
raise ValueError(
"hysen_response_error", "first byte of response is not length"
nom_crc = int.from_bytes(payload[p_len:p_len+2], "little")
real_crc = CRC16.calculate(payload[0x02:p_len])
if nom_crc != real_crc:
raise e.DataValidationError(
-4008,
"Received data packet check error",
f"Expected a checksum of {nom_crc} and received {real_crc}",
)
nom_crc = int.from_bytes(payload[p_len : p_len + 2], "little")
real_crc = CRC16.calculate(payload[0x02:p_len])
if nom_crc != real_crc:
raise ValueError("hysen_response_error", "CRC check on response failed")
return payload[0x02:p_len]
def _decode_temp(self, payload, base_index):
base_temp = payload[base_index] / 2.0
add_offset = (payload[4] >> 3) & 1 # should offset be added?
offset_raw_value = (payload[17] >> 4) & 3 # offset value
offset = (offset_raw_value + 1) / 10 if add_offset else 0.0
return base_temp + offset
def get_temp(self) -> float:
"""Return the room temperature in degrees celsius."""
payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x08])
return payload[0x05] / 2.0
return self._decode_temp(payload, 5)
def get_external_temp(self) -> float:
"""Return the external temperature in degrees celsius."""
payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x08])
return payload[18] / 2.0
return self._decode_temp(payload, 18)
def get_full_status(self) -> dict:
"""Return the state of the device.
@ -64,9 +73,10 @@ class hysen(Device):
data["power"] = payload[4] & 1
data["active"] = (payload[4] >> 4) & 1
data["temp_manual"] = (payload[4] >> 6) & 1
data["room_temp"] = payload[5] / 2.0
data["heating_cooling"] = (payload[4] >> 7) & 1
data["room_temp"] = self._decode_temp(payload, 5)
data["thermostat_temp"] = payload[6] / 2.0
data["auto_mode"] = payload[7] & 0xF
data["auto_mode"] = payload[7] & 0x0F
data["loop_mode"] = payload[7] >> 4
data["sensor"] = payload[8]
data["osv"] = payload[9]
@ -79,7 +89,7 @@ class hysen(Device):
data["fre"] = payload[15]
data["poweron"] = payload[16]
data["unknown"] = payload[17]
data["external_temp"] = payload[18] / 2.0
data["external_temp"] = self._decode_temp(payload, 18)
data["hour"] = payload[19]
data["min"] = payload[20]
data["sec"] = payload[21]
@ -117,7 +127,9 @@ class hysen(Device):
# E.g. loop_mode = 0 ("12345,67") means Saturday and Sunday (weekend schedule)
# loop_mode = 2 ("1234567") means every day, including Saturday and Sunday (weekday schedule)
# The sensor command is currently experimental
def set_mode(self, auto_mode: int, loop_mode: int, sensor: int = 0) -> None:
def set_mode(
self, auto_mode: int, loop_mode: int, sensor: int = 0
) -> None:
"""Set the mode of the device."""
mode_byte = ((loop_mode + 1) << 4) + auto_mode
self.send_request([0x01, 0x06, 0x00, 0x02, mode_byte, sensor])
@ -185,16 +197,32 @@ class hysen(Device):
# Set device on(1) or off(0), does not deactivate Wifi connectivity.
# Remote lock disables control by buttons on thermostat.
def set_power(self, power: int = 1, remote_lock: int = 0) -> None:
# heating_cooling: heating(0) cooling(1)
def set_power(
self, power: int = 1, remote_lock: int = 0, heating_cooling: int = 0
) -> None:
"""Set the power state of the device."""
self.send_request([0x01, 0x06, 0x00, 0x00, remote_lock, power])
state = (heating_cooling << 7) + power
self.send_request([0x01, 0x06, 0x00, 0x00, remote_lock, state])
# set time on device
# n.b. day=1 is Monday, ..., day=7 is Sunday
def set_time(self, hour: int, minute: int, second: int, day: int) -> None:
"""Set the time."""
self.send_request(
[0x01, 0x10, 0x00, 0x08, 0x00, 0x02, 0x04, hour, minute, second, day]
[
0x01,
0x10,
0x00,
0x08,
0x00,
0x02,
0x04,
hour,
minute,
second,
day
]
)
# Set timer schedule
@ -203,7 +231,7 @@ class hysen(Device):
# {'start_hour':17, 'start_minute':30, 'temp': 22 }
# Each one specifies the thermostat temp that will become effective at start_hour:start_minute
# weekend is similar but only has 2 (e.g. switch on in morning and off in afternoon)
def set_schedule(self, weekday: t.List[dict], weekend: t.List[dict]) -> None:
def set_schedule(self, weekday: List[dict], weekend: List[dict]) -> None:
"""Set timer schedule."""
request = [0x01, 0x10, 0x00, 0x0A, 0x00, 0x0C, 0x18]
@ -226,3 +254,221 @@ class hysen(Device):
request.append(int(weekend[i]["temp"] * 2))
self.send_request(request)
class hvac(Device):
"""Controls a HVAC.
Supported models:
- Tornado SMART X SQ series
- Aux ASW-H12U3/JIR1DI-US
- Aux ASW-H36U2/LFR1DI-US
"""
TYPE = "HVAC"
@enum.unique
class Mode(enum.IntEnum):
"""Enumerates modes."""
AUTO = 0
COOL = 1
DRY = 2
HEAT = 3
FAN = 4
@enum.unique
class Speed(enum.IntEnum):
"""Enumerates fan speed."""
HIGH = 1
MID = 2
LOW = 3
AUTO = 5
@enum.unique
class Preset(enum.IntEnum):
"""Enumerates presets."""
NORMAL = 0
TURBO = 1
MUTE = 2
@enum.unique
class SwHoriz(enum.IntEnum):
"""Enumerates horizontal swing."""
ON = 0
OFF = 7
@enum.unique
class SwVert(enum.IntEnum):
"""Enumerates vertical swing."""
ON = 0
POS1 = 1
POS2 = 2
POS3 = 3
POS4 = 4
POS5 = 5
OFF = 7
def _encode(self, data: bytes) -> bytes:
"""Encode data for transport."""
packet = bytearray(10)
p_len = 10 + len(data)
struct.pack_into(
"<HHHHH", packet, 0, p_len, 0x00BB, 0x8006, 0, len(data)
)
packet += data
crc = CRC16.calculate(packet[0x02:], polynomial=0x9BE4)
packet += crc.to_bytes(2, "little")
return packet
def _decode(self, response: bytes) -> bytes:
"""Decode data from transport."""
# payload[0x2:0x8] == bytes([0xbb, 0x00, 0x07, 0x00, 0x00, 0x00])
payload = self.decrypt(response[0x38:])
p_len = int.from_bytes(payload[:0x02], "little")
nom_crc = int.from_bytes(payload[p_len:p_len+2], "little")
real_crc = CRC16.calculate(payload[0x02:p_len], polynomial=0x9BE4)
if nom_crc != real_crc:
raise e.DataValidationError(
-4008,
"Received data packet check error",
f"Expected a checksum of {nom_crc} and received {real_crc}",
)
d_len = int.from_bytes(payload[0x08:0x0A], "little")
return payload[0x0A:0x0A+d_len]
def _send(self, command: int, data: bytes = b"") -> bytes:
"""Send a command to the unit."""
prefix = bytes([((command << 4) | 1), 1])
packet = self._encode(prefix + data)
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
return self._decode(response)[0x02:]
def _parse_state(self, data: bytes) -> dict:
"""Parse state."""
state = {}
state["power"] = bool(data[0x08] & 1 << 5)
state["target_temp"] = 8 + (data[0x00] >> 3) + (data[0x04] >> 7) * 0.5
state["swing_v"] = self.SwVert(data[0x00] & 0b111)
state["swing_h"] = self.SwHoriz(data[0x01] >> 5)
state["mode"] = self.Mode(data[0x05] >> 5)
state["speed"] = self.Speed(data[0x03] >> 5)
state["preset"] = self.Preset(data[0x04] >> 6)
state["sleep"] = bool(data[0x05] & 1 << 2)
state["ifeel"] = bool(data[0x05] & 1 << 3)
state["health"] = bool(data[0x08] & 1 << 1)
state["clean"] = bool(data[0x08] & 1 << 2)
state["display"] = bool(data[0x0A] & 1 << 4)
state["mildew"] = bool(data[0x0A] & 1 << 3)
return state
def set_state(
self,
power: bool,
target_temp: float, # 16<=target_temp<=32
mode: Mode,
speed: Speed,
preset: Preset,
swing_h: SwHoriz,
swing_v: SwVert,
sleep: bool,
ifeel: bool,
display: bool,
health: bool,
clean: bool,
mildew: bool,
) -> dict:
"""Set the state of the device."""
# TODO: decode unknown bits
UNK0 = 0b100
UNK1 = 0b1101
UNK2 = 0b101
target_temp = round(target_temp * 2) / 2
if preset == self.Preset.MUTE:
if mode != self.Mode.FAN:
raise ValueError("mute is only available in fan mode")
speed = self.Speed.LOW
elif preset == self.Preset.TURBO:
if mode not in {self.Mode.COOL, self.Mode.HEAT}:
raise ValueError("turbo is only available in cooling/heating")
speed = self.Speed.HIGH
data = bytearray(0x0D)
data[0x00] = (int(target_temp) - 8 << 3) | swing_v
data[0x01] = (swing_h << 5) | UNK0
data[0x02] = ((target_temp % 1 == 0.5) << 7) | UNK1
data[0x03] = speed << 5
data[0x04] = preset << 6
data[0x05] = mode << 5 | sleep << 2 | ifeel << 3
data[0x08] = power << 5 | clean << 2 | (health and 0b11)
data[0x0A] = display << 4 | mildew << 3
data[0x0C] = UNK2
resp = self._send(0, data)
return self._parse_state(resp)
def get_state(self) -> dict:
"""Returns a dictionary with the unit's parameters.
Returns:
dict:
power (bool):
target_temp (float): temperature set point 16<n<32
mode (hvac.Mode):
speed (hvac.Speed):
preset (hvac.Preset):
swing_h (hvac.SwHoriz):
swing_v (hvac.SwVert):
sleep (bool):
ifeel (bool):
display (bool):
health (bool):
clean (bool):
mildew (bool):
"""
resp = self._send(1)
if len(resp) < 13:
raise e.DataValidationError(
-4007,
"Received data packet length error",
f"Expected at least 15 bytes and received {len(resp) + 2}",
)
return self._parse_state(resp)
def get_ac_info(self) -> dict:
"""Returns dictionary with AC info.
Returns:
dict:
power (bool): power
ambient_temp (float): ambient temperature
"""
resp = self._send(2)
if len(resp) < 22:
raise e.DataValidationError(
-4007,
"Received data packet length error",
f"Expected at least 24 bytes and received {len(resp) + 2}",
)
ac_info = {}
ac_info["power"] = resp[0x1] & 1
ambient_temp = resp[0x05] & 0b11111, resp[0x15] & 0b11111
if any(ambient_temp):
ac_info["ambient_temp"] = ambient_temp[0] + ambient_temp[1] / 10.0
return ac_info

View File

@ -1,5 +1,6 @@
"""Support for covers."""
import time
from typing import Sequence
from . import exceptions as e
from .device import Device
@ -8,33 +9,34 @@ from .device import Device
class dooya(Device):
"""Controls a Dooya curtain motor."""
TYPE = "Dooya DT360E"
TYPE = "DT360E"
def _send(self, magic1: int, magic2: int) -> int:
def _send(self, command: int, attribute: int = 0) -> int:
"""Send a packet to the device."""
packet = bytearray(16)
packet[0] = 0x09
packet[2] = 0xBB
packet[3] = magic1
packet[4] = magic2
packet[9] = 0xFA
packet[10] = 0x44
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
packet[0x00] = 0x09
packet[0x02] = 0xBB
packet[0x03] = command
packet[0x04] = attribute
packet[0x09] = 0xFA
packet[0x0A] = 0x44
resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload[4]
def open(self) -> int:
"""Open the curtain."""
return self._send(0x01, 0x00)
return self._send(0x01)
def close(self) -> int:
"""Close the curtain."""
return self._send(0x02, 0x00)
return self._send(0x02)
def stop(self) -> int:
"""Stop the curtain."""
return self._send(0x03, 0x00)
return self._send(0x03)
def get_percentage(self) -> int:
"""Return the position of the curtain."""
@ -55,3 +57,126 @@ class dooya(Device):
time.sleep(0.2)
current = self.get_percentage()
self.stop()
class dooya2(Device):
"""Controls a Dooya curtain motor (version 2)."""
TYPE = "DT360E-2"
def _send(self, operation: int, data: Sequence = b""):
"""Send a command to the device."""
packet = bytearray(12)
packet[0x02] = 0xA5
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x08] = operation
packet[0x09] = 0x0B
if data:
data_len = len(data)
packet[0x0A] = data_len & 0xFF
packet[0x0B] = data_len >> 8
packet += bytes(2)
packet.extend(data)
checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[0x06] = checksum & 0xFF
packet[0x07] = checksum >> 8
packet_len = len(packet) - 2
packet[0x00] = packet_len & 0xFF
packet[0x01] = packet_len >> 8
resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload
def open(self) -> None:
"""Open the curtain."""
self._send(2, [0x00, 0x01, 0x00])
def close(self) -> None:
"""Close the curtain."""
self._send(2, [0x00, 0x02, 0x00])
def stop(self) -> None:
"""Stop the curtain."""
self._send(2, [0x00, 0x03, 0x00])
def get_percentage(self) -> int:
"""Return the position of the curtain."""
resp = self._send(1, [0x00, 0x06, 0x00])
return resp[0x11]
def set_percentage(self, new_percentage: int) -> None:
"""Set the position of the curtain."""
self._send(2, [0x00, 0x09, new_percentage])
class wser(Device):
"""Controls a Wistar curtain motor"""
TYPE = "WSER"
def _send(self, operation: int, data: Sequence = b""):
"""Send a command to the device."""
packet = bytearray(12)
packet[0x02] = 0xA5
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x08] = operation
packet[0x09] = 0x0B
if data:
data_len = len(data)
packet[0x0A] = data_len & 0xFF
packet[0x0B] = data_len >> 8
packet += bytes(2)
packet.extend(data)
checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[0x06] = checksum & 0xFF
packet[0x07] = checksum >> 8
packet_len = len(packet) - 2
packet[0x00] = packet_len & 0xFF
packet[0x01] = packet_len >> 8
resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload
def get_position(self) -> int:
"""Return the position of the curtain."""
resp = self._send(1, [])
position = resp[0x0E]
return position
def open(self) -> int:
"""Open the curtain."""
resp = self._send(2, [0x4A, 0x31, 0xA0])
position = resp[0x0E]
return position
def close(self) -> int:
"""Close the curtain."""
resp = self._send(2, [0x61, 0x32, 0xA0])
position = resp[0x0E]
return position
def stop(self) -> int:
"""Stop the curtain."""
resp = self._send(2, [0x4C, 0x73, 0xA0])
position = resp[0x0E]
return position
def set_position(self, position: int) -> int:
"""Set the position of the curtain."""
resp = self._send(2, [position, 0x70, 0xA0])
position = resp[0x0E]
return position

View File

@ -3,7 +3,7 @@ import socket
import threading
import random
import time
import typing as t
from typing import Generator, Optional, Tuple, Union
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
@ -17,15 +17,15 @@ from .const import (
)
from .protocol import Datetime
HelloResponse = t.Tuple[int, t.Tuple[str, int], str, str, bool]
HelloResponse = Tuple[int, Tuple[str, int], str, str, bool]
def scan(
timeout: int = DEFAULT_TIMEOUT,
local_ip_address: str = None,
local_ip_address: Optional[str] = None,
discover_ip_address: str = DEFAULT_BCAST_ADDR,
discover_ip_port: int = DEFAULT_PORT,
) -> t.Generator[HelloResponse, None, None]:
) -> Generator[HelloResponse, None, None]:
"""Broadcast a hello message and yield responses."""
conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
@ -76,7 +76,7 @@ def scan(
conn.close()
def ping(address: str, port: int = DEFAULT_PORT) -> None:
def ping(ip_address: str, port: int = DEFAULT_PORT) -> None:
"""Send a ping packet to an address.
This packet feeds the watchdog timer of firmwares >= v53.
@ -87,7 +87,7 @@ def ping(address: str, port: int = DEFAULT_PORT) -> None:
conn.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
packet = bytearray(0x30)
packet[0x26] = 1
conn.sendto(packet, (address, port))
conn.sendto(packet, (ip_address, port))
class Device:
@ -100,8 +100,8 @@ class Device:
def __init__(
self,
host: t.Tuple[str, int],
mac: t.Union[bytes, str],
host: Tuple[str, int],
mac: Union[bytes, str],
devtype: int,
timeout: int = DEFAULT_TIMEOUT,
name: str = "",

View File

@ -1,5 +1,5 @@
"""Helper functions and classes."""
import typing as t
from typing import Dict, List, Sequence
class CRC16:
@ -8,10 +8,10 @@ class CRC16:
CRC tables are cached for performance.
"""
_cache: t.Dict[int, t.List[int]] = {}
_cache: Dict[int, List[int]] = {}
@classmethod
def get_table(cls, polynomial: int) -> t.List[int]:
def get_table(cls, polynomial: int) -> List[int]:
"""Return the CRC-16 table for a polynomial."""
try:
crc_table = cls._cache[polynomial]
@ -31,7 +31,7 @@ class CRC16:
@classmethod
def calculate(
cls,
sequence: t.Sequence[int],
sequence: Sequence[int],
polynomial: int = 0xA001, # CRC-16-ANSI.
init_value: int = 0xFFFF,
) -> int:

View File

@ -1,6 +1,7 @@
"""Support for hubs."""
import struct
import json
from typing import Optional
from . import exceptions as e
from .device import Device
@ -12,25 +13,37 @@ class s3(Device):
TYPE = "S3"
MAX_SUBDEVICES = 8
def get_subdevices(self) -> list:
"""Return the lit of sub devices."""
def get_subdevices(self, step: int = 5) -> list:
"""Return a list of sub devices."""
total = self.MAX_SUBDEVICES
sub_devices = []
step = 5
seen = set()
index = 0
for index in range(0, self.MAX_SUBDEVICES, step):
while index < total:
state = {"count": step, "index": index}
packet = self._encode(14, state)
resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24])
resp = self._decode(resp)
sub_devices.extend(resp["list"])
if len(sub_devices) == resp["total"]:
for device in resp["list"]:
did = device["did"]
if did in seen:
continue
seen.add(did)
sub_devices.append(device)
total = resp["total"]
if len(seen) >= total:
break
index += step
return sub_devices
def get_state(self, did: str = None) -> dict:
def get_state(self, did: Optional[str] = None) -> dict:
"""Return the power state of the device."""
state = {}
if did is not None:
@ -43,10 +56,10 @@ class s3(Device):
def set_state(
self,
did: str = None,
pwr1: bool = None,
pwr2: bool = None,
pwr3: bool = None,
did: Optional[str] = None,
pwr1: Optional[bool] = None,
pwr2: Optional[bool] = None,
pwr3: Optional[bool] = None,
) -> dict:
"""Set the power state of the device."""
state = {}
@ -69,7 +82,9 @@ class s3(Device):
# flag: 1 for reading, 2 for writing.
packet = bytearray(12)
data = json.dumps(state, separators=(",", ":")).encode()
struct.pack_into("<HHHBBI", packet, 0, 0xA5A5, 0x5A5A, 0, flag, 0x0B, len(data))
struct.pack_into(
"<HHHBBI", packet, 0, 0xA5A5, 0x5A5A, 0, flag, 0x0B, len(data)
)
packet.extend(data)
checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[0x04:0x06] = checksum.to_bytes(2, "little")
@ -79,5 +94,5 @@ class s3(Device):
"""Decode a JSON packet."""
payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0x08)[0]
state = json.loads(payload[0x0C : 0x0C + js_len])
state = json.loads(payload[0x0C:0x0C+js_len])
return state

View File

@ -2,6 +2,7 @@
import enum
import json
import struct
from typing import Optional
from . import exceptions as e
from .device import Device
@ -32,20 +33,20 @@ class lb1(Device):
def set_state(
self,
pwr: bool = None,
red: int = None,
blue: int = None,
green: int = None,
brightness: int = None,
colortemp: int = None,
hue: int = None,
saturation: int = None,
transitionduration: int = None,
maxworktime: int = None,
bulb_colormode: int = None,
bulb_scenes: str = None,
bulb_scene: str = None,
bulb_sceneidx: int = None,
pwr: Optional[bool] = None,
red: Optional[int] = None,
blue: Optional[int] = None,
green: Optional[int] = None,
brightness: Optional[int] = None,
colortemp: Optional[int] = None,
hue: Optional[int] = None,
saturation: Optional[int] = None,
transitionduration: Optional[int] = None,
maxworktime: Optional[int] = None,
bulb_colormode: Optional[int] = None,
bulb_scenes: Optional[str] = None,
bulb_scene: Optional[str] = None,
bulb_sceneidx: Optional[int] = None,
) -> dict:
"""Set the power state of the device."""
state = {}
@ -101,7 +102,7 @@ class lb1(Device):
"""Decode a JSON packet."""
payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0xA)[0]
state = json.loads(payload[0xE : 0xE + js_len])
state = json.loads(payload[0xE:0xE+js_len])
return state
@ -130,19 +131,19 @@ class lb2(Device):
def set_state(
self,
pwr: bool = None,
red: int = None,
blue: int = None,
green: int = None,
brightness: int = None,
colortemp: int = None,
hue: int = None,
saturation: int = None,
transitionduration: int = None,
maxworktime: int = None,
bulb_colormode: int = None,
bulb_scenes: str = None,
bulb_scene: str = None,
pwr: Optional[bool] = None,
red: Optional[int] = None,
blue: Optional[int] = None,
green: Optional[int] = None,
brightness: Optional[int] = None,
colortemp: Optional[int] = None,
hue: Optional[int] = None,
saturation: Optional[int] = None,
transitionduration: Optional[int] = None,
maxworktime: Optional[int] = None,
bulb_colormode: Optional[int] = None,
bulb_scenes: Optional[str] = None,
bulb_scene: Optional[str] = None,
) -> dict:
"""Set the power state of the device."""
state = {}
@ -183,7 +184,9 @@ class lb2(Device):
# flag: 1 for reading, 2 for writing.
packet = bytearray(12)
data = json.dumps(state, separators=(",", ":")).encode()
struct.pack_into("<HHHBBI", packet, 0, 0xA5A5, 0x5A5A, 0, flag, 0x0B, len(data))
struct.pack_into(
"<HHHBBI", packet, 0, 0xA5A5, 0x5A5A, 0, flag, 0x0B, len(data)
)
packet.extend(data)
checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[0x04:0x06] = checksum.to_bytes(2, "little")
@ -193,5 +196,5 @@ class lb2(Device):
"""Decode a JSON packet."""
payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0x08)[0]
state = json.loads(payload[0x0C : 0x0C + js_len])
state = json.loads(payload[0x0C:0x0C+js_len])
return state

View File

@ -1,10 +1,52 @@
"""Support for universal remotes."""
import struct
from typing import List, Optional, Tuple
from . import exceptions as e
from .device import Device
def pulses_to_data(pulses: List[int], tick: float = 32.84) -> bytes:
"""Convert a microsecond duration sequence into a Broadlink IR packet."""
result = bytearray(4)
result[0x00] = 0x26
for pulse in pulses:
div, mod = divmod(int(pulse // tick), 256)
if div:
result.append(0)
result.append(div)
result.append(mod)
data_len = len(result) - 4
result[0x02] = data_len & 0xFF
result[0x03] = data_len >> 8
return result
def data_to_pulses(data: bytes, tick: float = 32.84) -> List[int]:
"""Parse a Broadlink packet into a microsecond duration sequence."""
result = []
index = 4
end = min(256 * data[0x03] + data[0x02] + 4, len(data))
while index < end:
chunk = data[index]
index += 1
if chunk == 0:
try:
chunk = 256 * data[index] + data[index + 1]
except IndexError as err:
raise ValueError("Malformed data.") from err
index += 2
result.append(int(chunk * tick))
return result
class rmmini(Device):
"""Controls a Broadlink RM mini 3."""
@ -46,14 +88,19 @@ class rmpro(rmmini):
"""Sweep frequency."""
self._send(0x19)
def check_frequency(self) -> bool:
def check_frequency(self) -> Tuple[bool, float]:
"""Return True if the frequency was identified successfully."""
resp = self._send(0x1A)
return resp[0] == 1
is_found = bool(resp[0])
frequency = struct.unpack("<I", resp[1:5])[0] / 1000.0
return is_found, frequency
def find_rf_packet(self) -> None:
def find_rf_packet(self, frequency: Optional[float] = None) -> None:
"""Enter radiofrequency learning mode."""
self._send(0x1B)
payload = bytearray()
if frequency:
payload += struct.pack("<I", int(frequency * 1000))
self._send(0x1B, payload)
def cancel_sweep_frequency(self) -> None:
"""Cancel sweep frequency."""
@ -82,7 +129,7 @@ class rmminib(rmmini):
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
p_len = struct.unpack("<H", payload[:0x2])[0]
return payload[0x6 : p_len + 2]
return payload[0x6:p_len+2]
class rm4mini(rmminib):

View File

@ -1,5 +1,5 @@
"""Support for sensors."""
import struct
from typing import Sequence
from . import exceptions as e
from .device import Device
@ -29,19 +29,62 @@ class a1(Device):
def check_sensors_raw(self) -> dict:
"""Return the state of the sensors in raw format."""
packet = bytearray([0x1])
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
data = payload[0x4:]
temperature = struct.unpack("<bb", data[:0x2])
temperature = temperature[0x0] + temperature[0x1] / 10.0
humidity = data[0x2] + data[0x3] / 10.0
resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24])
data = self.decrypt(resp[0x38:])
return {
"temperature": temperature,
"humidity": humidity,
"light": data[0x4],
"air_quality": data[0x6],
"noise": data[0x8],
"temperature": data[0x04] + data[0x05] / 10.0,
"humidity": data[0x06] + data[0x07] / 10.0,
"light": data[0x08],
"air_quality": data[0x0A],
"noise": data[0x0C],
}
class a2(Device):
"""Controls a Broadlink A2."""
TYPE = "A2"
def _send(self, operation: int, data: Sequence = b""):
"""Send a command to the device."""
packet = bytearray(12)
packet[0x02] = 0xA5
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x08] = operation
packet[0x09] = 0x0B
if data:
data_len = len(data)
packet[0x0A] = data_len & 0xFF
packet[0x0B] = data_len >> 8
packet += bytes(2)
packet.extend(data)
checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[0x06] = checksum & 0xFF
packet[0x07] = checksum >> 8
packet_len = len(packet) - 2
packet[0x00] = packet_len & 0xFF
packet[0x01] = packet_len >> 8
resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload
def check_sensors_raw(self) -> dict:
"""Return the state of the sensors in raw format."""
data = self._send(1)
return {
"temperature": data[0x13] * 256 + data[0x14],
"humidity": data[0x15] * 256 + data[0x16],
"pm10": data[0x0D] * 256 + data[0x0E],
"pm2_5": data[0x0F] * 256 + data[0x10],
"pm1": data[0x11] * 256 + data[0x12],
}

View File

@ -1,6 +1,7 @@
"""Support for switches."""
import json
import struct
from typing import Optional
from . import exceptions as e
from .device import Device
@ -127,12 +128,12 @@ class sp4(Device):
def set_state(
self,
pwr: bool = None,
ntlight: bool = None,
indicator: bool = None,
ntlbrightness: int = None,
maxworktime: int = None,
childlock: bool = None,
pwr: Optional[bool] = None,
ntlight: Optional[bool] = None,
indicator: Optional[bool] = None,
ntlbrightness: Optional[int] = None,
maxworktime: Optional[int] = None,
childlock: Optional[bool] = None,
) -> dict:
"""Set state of device."""
state = {}
@ -186,7 +187,7 @@ class sp4(Device):
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0x08)[0]
state = json.loads(payload[0x0C : 0x0C + js_len])
state = json.loads(payload[0x0C:0x0C+js_len])
return state
@ -234,7 +235,7 @@ class sp4b(sp4):
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0xA)[0]
state = json.loads(payload[0x0E : 0x0E + js_len])
state = json.loads(payload[0x0E:0x0E+js_len])
return state
@ -255,13 +256,13 @@ class bg1(Device):
def set_state(
self,
pwr: bool = None,
pwr1: bool = None,
pwr2: bool = None,
maxworktime: int = None,
maxworktime1: int = None,
maxworktime2: int = None,
idcbrightness: int = None,
pwr: Optional[bool] = None,
pwr1: Optional[bool] = None,
pwr2: Optional[bool] = None,
maxworktime: Optional[int] = None,
maxworktime1: Optional[int] = None,
maxworktime2: Optional[int] = None,
idcbrightness: Optional[int] = None,
) -> dict:
"""Set the power state of the device."""
state = {}
@ -291,7 +292,16 @@ class bg1(Device):
data = json.dumps(state).encode()
length = 12 + len(data)
struct.pack_into(
"<HHHHBBI", packet, 0, length, 0xA5A5, 0x5A5A, 0x0000, flag, 0x0B, len(data)
"<HHHHBBI",
packet,
0,
length,
0xA5A5,
0x5A5A,
0x0000,
flag,
0x0B,
len(data),
)
packet.extend(data)
checksum = sum(packet[0x2:], 0xBEAF) & 0xFFFF
@ -302,10 +312,66 @@ class bg1(Device):
"""Decode a message."""
payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0x0A)[0]
state = json.loads(payload[0x0E : 0x0E + js_len])
state = json.loads(payload[0x0E:0x0E+js_len])
return state
class ehc31(bg1):
"""Controls a BG Electrical smart extension lead."""
TYPE = "EHC31"
def set_state(
self,
pwr: Optional[bool] = None,
pwr1: Optional[bool] = None,
pwr2: Optional[bool] = None,
pwr3: Optional[bool] = None,
maxworktime1: Optional[int] = None,
maxworktime2: Optional[int] = None,
maxworktime3: Optional[int] = None,
idcbrightness: Optional[int] = None,
childlock: Optional[bool] = None,
childlock1: Optional[bool] = None,
childlock2: Optional[bool] = None,
childlock3: Optional[bool] = None,
childlock4: Optional[bool] = None,
) -> dict:
"""Set the power state of the device."""
state = {}
if pwr is not None:
state["pwr"] = int(bool(pwr))
if pwr1 is not None:
state["pwr1"] = int(bool(pwr1))
if pwr2 is not None:
state["pwr2"] = int(bool(pwr2))
if pwr3 is not None:
state["pwr3"] = int(bool(pwr3))
if maxworktime1 is not None:
state["maxworktime1"] = maxworktime1
if maxworktime2 is not None:
state["maxworktime2"] = maxworktime2
if maxworktime3 is not None:
state["maxworktime3"] = maxworktime3
if idcbrightness is not None:
state["idcbrightness"] = idcbrightness
if childlock is not None:
state["childlock"] = int(bool(childlock))
if childlock1 is not None:
state["childlock1"] = int(bool(childlock1))
if childlock2 is not None:
state["childlock2"] = int(bool(childlock2))
if childlock3 is not None:
state["childlock3"] = int(bool(childlock3))
if childlock4 is not None:
state["childlock4"] = int(bool(childlock4))
packet = self._encode(2, state)
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
return self._decode(response)
class mp1(Device):
"""Controls a Broadlink MP1."""
@ -360,3 +426,47 @@ class mp1(Device):
"s3": bool(data & 4),
"s4": bool(data & 8),
}
class mp1s(mp1):
"""Controls a Broadlink MP1S."""
TYPE = "MP1S"
def get_state(self) -> dict:
"""Return the power state of the device.
voltage in V.
current in A.
power in W.
power consumption in kW·h.
"""
packet = bytearray(16)
packet[0x00] = 0x0E
packet[0x02] = 0xA5
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x06] = 0xB2
packet[0x07] = 0xC0
packet[0x08] = 0x01
packet[0x0A] = 0x04
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
payload_str = payload.hex()[4:-6]
def get_value(start, end, factors):
value = sum(
int(payload_str[i-2:i]) * factor
for i, factor in zip(range(start, end, -2), factors)
)
return value
return {
"volt": get_value(34, 30, [10, 0.1]),
"current": get_value(40, 34, [1, 0.01, 0.0001]),
"power": get_value(46, 40, [100, 1, 0.01]),
"totalconsum": get_value(54, 46, [10000, 100, 1, 0.01]),
}

View File

@ -97,7 +97,7 @@ broadlink_cli --device @BEDROOM.device --temperature
#### Check humidity
```
broadlink_cli --device @BEDROOM.device --temperature
broadlink_cli --device @BEDROOM.device --humidity
```
### Smart plugs

View File

@ -1,68 +1,32 @@
#!/usr/bin/env python3
import argparse
import base64
import codecs
import time
from typing import List
import broadlink
from broadlink.const import DEFAULT_PORT
from broadlink.exceptions import ReadError, StorageError
from broadlink.remote import data_to_pulses, pulses_to_data
TICK = 32.84
TIMEOUT = 30
IR_TOKEN = 0x26
def auto_int(x):
return int(x, 0)
def to_microseconds(bytes):
result = []
# print bytes[0] # 0x26 = 38for IR
index = 4
while index < len(bytes):
chunk = bytes[index]
index += 1
if chunk == 0:
chunk = bytes[index]
chunk = 256 * chunk + bytes[index + 1]
index += 2
result.append(int(round(chunk * TICK)))
if chunk == 0x0d05:
break
return result
def format_pulses(pulses: List[int]) -> str:
"""Format pulses."""
return " ".join(
f"+{pulse}" if i % 2 == 0 else f"-{pulse}"
for i, pulse in enumerate(pulses)
)
def durations_to_broadlink(durations):
result = bytearray()
result.append(IR_TOKEN)
result.append(0)
result.append(len(durations) % 256)
result.append(len(durations) / 256)
for dur in durations:
num = int(round(dur / TICK))
if num > 255:
result.append(0)
result.append(num / 256)
result.append(num % 256)
return result
def format_durations(data):
result = ''
for i in range(0, len(data)):
if len(result) > 0:
result += ' '
result += ('+' if i % 2 == 0 else '-') + str(data[i])
return result
def parse_durations(str):
result = []
for s in str.split():
result.append(abs(int(s)))
return result
def parse_pulses(data: List[str]) -> List[int]:
"""Parse pulses."""
return [abs(int(s)) for s in data]
parser = argparse.ArgumentParser(fromfile_prefix_chars='@')
@ -83,7 +47,8 @@ parser.add_argument("--switch", action="store_true", help="switch state from on
parser.add_argument("--send", action="store_true", help="send command")
parser.add_argument("--sensors", action="store_true", help="check all sensors")
parser.add_argument("--learn", action="store_true", help="learn command")
parser.add_argument("--rfscanlearn", action="store_true", help="rf scan learning")
parser.add_argument("--rflearn", action="store_true", help="rf scan learning")
parser.add_argument("--frequency", type=float, help="specify radiofrequency for learning")
parser.add_argument("--learnfile", help="save learned command to a specified file")
parser.add_argument("--durations", action="store_true",
help="use durations in micro seconds instead of the Broadlink format")
@ -111,8 +76,8 @@ if args.joinwifi:
if args.convert:
data = bytearray.fromhex(''.join(args.data))
durations = to_microseconds(data)
print(format_durations(durations))
pulses = data_to_pulses(data)
print(format_pulses(pulses))
if args.temperature:
print(dev.check_temperature())
if args.humidity:
@ -124,10 +89,13 @@ if args.sensors:
for key in data:
print("{} {}".format(key, data[key]))
if args.send:
data = durations_to_broadlink(parse_durations(' '.join(args.data))) \
if args.durations else bytearray.fromhex(''.join(args.data))
data = (
pulses_to_data(parse_pulses(args.data))
if args.durations
else bytes.fromhex(''.join(args.data))
)
dev.send_data(data)
if args.learn or (args.learnfile and not args.rfscanlearn):
if args.learn or (args.learnfile and not args.rflearn):
dev.enter_learning()
print("Learning...")
start = time.time()
@ -143,17 +111,19 @@ if args.learn or (args.learnfile and not args.rfscanlearn):
print("No data received...")
exit(1)
learned = format_durations(to_microseconds(bytearray(data))) \
if args.durations \
else ''.join(format(x, '02x') for x in bytearray(data))
if args.learn:
print(learned)
decode_hex = codecs.getdecoder("hex_codec")
print("Base64: " + str(base64.b64encode(decode_hex(learned)[0])))
print("Packet found!")
raw_fmt = data.hex()
base64_fmt = base64.b64encode(data).decode('ascii')
pulse_fmt = format_pulses(data_to_pulses(data))
print("Raw:", raw_fmt)
print("Base64:", base64_fmt)
print("Pulses:", pulse_fmt)
if args.learnfile:
print("Saving to {}".format(args.learnfile))
with open(args.learnfile, "w") as text_file:
text_file.write(learned)
text_file.write(pulse_fmt if args.durations else raw_fmt)
if args.check:
if dev.check_power():
print('* ON *')
@ -195,28 +165,33 @@ if args.switch:
else:
dev.set_power(True)
print('* Switch to ON *')
if args.rfscanlearn:
dev.sweep_frequency()
print("Learning RF Frequency, press and hold the button to learn...")
start = time.time()
while time.time() - start < TIMEOUT:
time.sleep(1)
if dev.check_frequency():
break
if args.rflearn:
if args.frequency:
frequency = args.frequency
print("Press the button you want to learn, a short press...")
else:
print("RF Frequency not found")
dev.cancel_sweep_frequency()
exit(1)
dev.sweep_frequency()
print("Detecting radiofrequency, press and hold the button to learn...")
print("Found RF Frequency - 1 of 2!")
print("You can now let go of the button")
start = time.time()
while time.time() - start < TIMEOUT:
time.sleep(1)
locked, frequency = dev.check_frequency()
if locked:
break
else:
print("Radiofrequency not found")
dev.cancel_sweep_frequency()
exit(1)
input("Press enter to continue...")
print("Radiofrequency detected: {}MHz".format(frequency))
print("You can now let go of the button")
print("To complete learning, single press the button you want to learn")
input("Press enter to continue...")
dev.find_rf_packet()
print("Press the button again, now a short press.")
dev.find_rf_packet(frequency)
start = time.time()
while time.time() - start < TIMEOUT:
@ -231,15 +206,16 @@ if args.rfscanlearn:
print("No data received...")
exit(1)
print("Found RF Frequency - 2 of 2!")
learned = format_durations(to_microseconds(bytearray(data))) \
if args.durations \
else ''.join(format(x, '02x') for x in bytearray(data))
if args.learnfile is None:
print(learned)
decode_hex = codecs.getdecoder("hex_codec")
print("Base64: {}".format(str(base64.b64encode(decode_hex(learned)[0]))))
if args.learnfile is not None:
print("Packet found!")
raw_fmt = data.hex()
base64_fmt = base64.b64encode(data).decode('ascii')
pulse_fmt = format_pulses(data_to_pulses(data))
print("Raw:", raw_fmt)
print("Base64:", base64_fmt)
print("Pulses:", pulse_fmt)
if args.learnfile:
print("Saving to {}".format(args.learnfile))
with open(args.learnfile, "w") as text_file:
text_file.write(learned)
text_file.write(pulse_fmt if args.durations else raw_fmt)

View File

@ -5,7 +5,7 @@
from setuptools import setup, find_packages
version = '0.18.3'
version = '0.19.0'
setup(
name="broadlink",