1
0
Fork 0

Compare commits

...

6 Commits

Author SHA1 Message Date
Felipe Martins Diel 5d53fce011
Merge ab4311e799 into fa44b54d88 2024-04-12 02:27:40 -03:00
Felipe Martins Diel fa44b54d88
Add support for Broadlink A2 (#791)
* Add support for Broadlink A2

* Add supported type

* Fix bugs

* Improve device name
2024-04-12 02:10:06 -03:00
Felipe Martins Diel 24b9d308b6
Fix s3.get_subdevices() (#790)
* Fix s3.get_subdevices()

* Fix docstring
2024-04-10 23:55:41 -03:00
Felipe Martins Diel eb0f98a410
Fix README.md (#789) 2024-04-10 23:15:46 -03:00
Felipe Martins Diel 247be74c33
Expose IR/RF conversion functions (#788)
* Move IR duration<->Broadlink conversion down from CLI

* Fix --learn base64 to not crash with --durations

Also remove its b'...' wrapping.

* Fix IR/RF conversions

---------

Co-authored-by: William Grant <me@williamgrant.id.au>
2024-04-10 22:51:41 -03:00
Felipe Martins Diel ab4311e799 Add support for the FastCon technology 2022-03-19 19:53:29 -03:00
8 changed files with 218 additions and 123 deletions

View File

@ -12,7 +12,7 @@ from .device import Device, ping, scan
from .hub import s3
from .light import lb1, lb2
from .remote import rm, rm4, rm4mini, rm4pro, rmmini, rmminib, rmpro
from .sensor import a1
from .sensor import a1, a2
from .switch import bg1, ehc31, mp1, mp1s, sp1, sp2, sp2s, sp3, sp3s, sp4, sp4b
SUPPORTED_TYPES = {
@ -142,7 +142,10 @@ SUPPORTED_TYPES = {
0x653C: ("RM4 pro", "Broadlink"),
},
a1: {
0x2714: ("e-Sensor", "Broadlink"),
0x2714: ("A1", "Broadlink"),
},
a2: {
0x4F60: ("A2", "Broadlink"),
},
mp1: {
0x4EB5: ("MP1-1K4S", "Broadlink"),

View File

@ -63,31 +63,32 @@ class dooya2(Device):
TYPE = "DT360E-2"
def _send(self, operation: int, data: bytes):
def _send(self, operation: int, data: bytes = b""):
"""Send a command to the device."""
packet = bytearray(14)
packet = bytearray(12)
packet[0x02] = 0xA5
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x08] = operation
packet[0x09] = 0x0B
data_len = len(data)
packet[0x0A] = data_len & 0xFF
packet[0x0B] = data_len >> 8
packet += bytes(data)
if data:
data_len = len(data)
packet[0x0A] = data_len & 0xFF
packet[0x0B] = data_len >> 8
packet += bytes(2)
packet.extend(data)
checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[6] = checksum & 0xFF
packet[7] = checksum >> 8
packet[0x06] = checksum & 0xFF
packet[0x07] = checksum >> 8
packet_len = len(packet) - 2
packet[0] = packet_len & 0xFF
packet[1] = packet_len >> 8
packet[0x00] = packet_len & 0xFF
packet[0x01] = packet_len >> 8
resp = self.send_packet(0x6a, packet)
resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload
@ -119,31 +120,32 @@ class wser(Device):
TYPE = "WSER"
def _send(self, operation: int, data: bytes):
def _send(self, operation: int, data: bytes = b""):
"""Send a command to the device."""
packet = bytearray(14)
packet = bytearray(12)
packet[0x02] = 0xA5
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x08] = operation
packet[0x09] = 0x0B
data_len = len(data)
packet[0x0A] = data_len & 0xFF
packet[0x0B] = data_len >> 8
packet += bytes(data)
if data:
data_len = len(data)
packet[0x0A] = data_len & 0xFF
packet[0x0B] = data_len >> 8
packet += bytes(2)
packet.extend(data)
checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[6] = checksum & 0xFF
packet[7] = checksum >> 8
packet[0x06] = checksum & 0xFF
packet[0x07] = checksum >> 8
packet_len = len(packet) - 2
packet[0] = packet_len & 0xFF
packet[1] = packet_len >> 8
packet[0x00] = packet_len & 0xFF
packet[0x01] = packet_len >> 8
resp = self.send_packet(0x6a, packet)
resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload
@ -156,24 +158,24 @@ class wser(Device):
def open(self) -> int:
"""Open the curtain."""
resp = self._send(2, [0x4a, 0x31, 0xa0])
resp = self._send(2, [0x4A, 0x31, 0xA0])
position = resp[0x0E]
return position
def close(self) -> int:
"""Close the curtain."""
resp = self._send(2, [0x61, 0x32, 0xa0])
resp = self._send(2, [0x61, 0x32, 0xA0])
position = resp[0x0E]
return position
def stop(self) -> int:
"""Stop the curtain."""
resp = self._send(2, [0x4c, 0x73, 0xa0])
resp = self._send(2, [0x4C, 0x73, 0xA0])
position = resp[0x0E]
return position
def set_position(self, position: int) -> int:
"""Set the position of the curtain."""
resp = self._send(2, [position, 0x70, 0xa0])
resp = self._send(2, [position, 0x70, 0xA0])
position = resp[0x0E]
return position

View File

@ -12,22 +12,34 @@ class s3(Device):
TYPE = "S3"
MAX_SUBDEVICES = 8
def get_subdevices(self) -> list:
"""Return the lit of sub devices."""
def get_subdevices(self, step: int = 5) -> list:
"""Return a list of sub devices."""
total = self.MAX_SUBDEVICES
sub_devices = []
step = 5
seen = set()
index = 0
for index in range(0, self.MAX_SUBDEVICES, step):
while index < total:
state = {"count": step, "index": index}
packet = self._encode(14, state)
resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24])
resp = self._decode(resp)
sub_devices.extend(resp["list"])
if len(sub_devices) == resp["total"]:
for device in resp["list"]:
did = device["did"]
if did in seen:
continue
seen.add(did)
sub_devices.append(device)
total = resp["total"]
if len(seen) >= total:
break
index += step
return sub_devices
def get_state(self, did: str = None) -> dict:

View File

@ -6,6 +6,47 @@ from . import exceptions as e
from .device import Device
def pulses_to_data(pulses: t.List[int], tick: int = 32.84) -> None:
"""Convert a microsecond duration sequence into a Broadlink IR packet."""
result = bytearray(4)
result[0x00] = 0x26
for pulse in pulses:
div, mod = divmod(int(pulse // tick), 256)
if div:
result.append(0)
result.append(div)
result.append(mod)
data_len = len(result) - 4
result[0x02] = data_len & 0xFF
result[0x03] = data_len >> 8
return result
def data_to_pulses(data: bytes, tick: int = 32.84) -> t.List[int]:
"""Parse a Broadlink packet into a microsecond duration sequence."""
result = []
index = 4
end = min(256 * data[0x03] + data[0x02] + 4, len(data))
while index < end:
chunk = data[index]
index += 1
if chunk == 0:
try:
chunk = 256 * data[index] + data[index + 1]
except IndexError:
raise ValueError("Malformed data.")
index += 2
result.append(int(chunk * tick))
return result
class rmmini(Device):
"""Controls a Broadlink RM mini 3."""

View File

@ -1,6 +1,4 @@
"""Support for sensors."""
import struct
from . import exceptions as e
from .device import Device
@ -29,19 +27,62 @@ class a1(Device):
def check_sensors_raw(self) -> dict:
"""Return the state of the sensors in raw format."""
packet = bytearray([0x1])
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
data = payload[0x4:]
temperature = struct.unpack("<bb", data[:0x2])
temperature = temperature[0x0] + temperature[0x1] / 10.0
humidity = data[0x2] + data[0x3] / 10.0
resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24])
data = self.decrypt(resp[0x38:])
return {
"temperature": temperature,
"humidity": humidity,
"light": data[0x4],
"air_quality": data[0x6],
"noise": data[0x8],
"temperature": data[0x04] + data[0x05] / 10.0,
"humidity": data[0x06] + data[0x07] / 10.0,
"light": data[0x08],
"air_quality": data[0x0A],
"noise": data[0x0C],
}
class a2(Device):
"""Controls a Broadlink A2."""
TYPE = "A2"
def _send(self, operation: int, data: bytes = b""):
"""Send a command to the device."""
packet = bytearray(12)
packet[0x02] = 0xA5
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x08] = operation
packet[0x09] = 0x0B
if data:
data_len = len(data)
packet[0x0A] = data_len & 0xFF
packet[0x0B] = data_len >> 8
packet += bytes(2)
packet.extend(data)
checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[0x06] = checksum & 0xFF
packet[0x07] = checksum >> 8
packet_len = len(packet) - 2
packet[0x00] = packet_len & 0xFF
packet[0x01] = packet_len >> 8
resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload
def check_sensors_raw(self) -> dict:
"""Return the state of the sensors in raw format."""
data = self._send(1)
return {
"temperature": data[0x13] * 256 + data[0x14],
"humidity": data[0x15] * 256 + data[0x16],
"pm10": data[0x0D] * 256 + data[0x0E],
"pm2_5": data[0x0F] * 256 + data[0x10],
"pm1": data[0x11] * 256 + data[0x12],
}

View File

@ -116,6 +116,25 @@ class sp4(Device):
"""Controls a Broadlink SP4."""
TYPE = "SP4"
MAX_SUBDEVICES = 8
def get_subdevices(self) -> list:
"""Return the lit of sub devices."""
sub_devices = []
step = 5
for index in range(0, self.MAX_SUBDEVICES, step):
state = {"count": step, "index": index}
packet = self._encode(14, state)
resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24])
resp = self._decode(resp)
sub_devices.extend(resp["list"])
if len(sub_devices) == resp["total"]:
break
return sub_devices
def set_power(self, pwr: bool) -> None:
"""Set the power state of the device."""
@ -127,6 +146,7 @@ class sp4(Device):
def set_state(
self,
did: str = None,
pwr: bool = None,
ntlight: bool = None,
indicator: bool = None,
@ -136,6 +156,8 @@ class sp4(Device):
) -> dict:
"""Set state of device."""
state = {}
if did is not None:
state["did"] = did
if pwr is not None:
state["pwr"] = int(bool(pwr))
if ntlight is not None:
@ -163,9 +185,13 @@ class sp4(Device):
state = self.get_state()
return bool(state["ntlight"])
def get_state(self) -> dict:
"""Get full state of device."""
packet = self._encode(1, {})
def get_state(self, did: str = None) -> dict:
"""Return the state of the device."""
state = {}
if did is not None:
state["did"] = did
packet = self._encode(1, state)
response = self.send_packet(0x6A, packet)
return self._decode(response)

View File

@ -97,7 +97,7 @@ broadlink_cli --device @BEDROOM.device --temperature
#### Check humidity
```
broadlink_cli --device @BEDROOM.device --temperature
broadlink_cli --device @BEDROOM.device --humidity
```
### Smart plugs

View File

@ -1,68 +1,32 @@
#!/usr/bin/env python3
import argparse
import base64
import codecs
import time
import typing as t
import broadlink
from broadlink.const import DEFAULT_PORT
from broadlink.exceptions import ReadError, StorageError
from broadlink.remote import data_to_pulses, pulses_to_data
TICK = 32.84
TIMEOUT = 30
IR_TOKEN = 0x26
def auto_int(x):
return int(x, 0)
def to_microseconds(bytes):
result = []
# print bytes[0] # 0x26 = 38for IR
index = 4
while index < len(bytes):
chunk = bytes[index]
index += 1
if chunk == 0:
chunk = bytes[index]
chunk = 256 * chunk + bytes[index + 1]
index += 2
result.append(int(round(chunk * TICK)))
if chunk == 0x0d05:
break
return result
def format_pulses(pulses: t.List[int]) -> str:
"""Format pulses."""
return " ".join(
f"+{pulse}" if i % 2 == 0 else f"-{pulse}"
for i, pulse in enumerate(pulses)
)
def durations_to_broadlink(durations):
result = bytearray()
result.append(IR_TOKEN)
result.append(0)
result.append(len(durations) % 256)
result.append(len(durations) / 256)
for dur in durations:
num = int(round(dur / TICK))
if num > 255:
result.append(0)
result.append(num / 256)
result.append(num % 256)
return result
def format_durations(data):
result = ''
for i in range(0, len(data)):
if len(result) > 0:
result += ' '
result += ('+' if i % 2 == 0 else '-') + str(data[i])
return result
def parse_durations(str):
result = []
for s in str.split():
result.append(abs(int(s)))
return result
def parse_pulses(data: t.List[str]) -> t.List[int]:
"""Parse pulses."""
return [abs(int(s)) for s in data]
parser = argparse.ArgumentParser(fromfile_prefix_chars='@')
@ -112,8 +76,8 @@ if args.joinwifi:
if args.convert:
data = bytearray.fromhex(''.join(args.data))
durations = to_microseconds(data)
print(format_durations(durations))
pulses = data_to_pulses(data)
print(format_pulses(pulses))
if args.temperature:
print(dev.check_temperature())
if args.humidity:
@ -125,8 +89,11 @@ if args.sensors:
for key in data:
print("{} {}".format(key, data[key]))
if args.send:
data = durations_to_broadlink(parse_durations(' '.join(args.data))) \
if args.durations else bytearray.fromhex(''.join(args.data))
data = (
pulses_to_data(parse_pulses(args.data))
if args.durations
else bytes.fromhex(''.join(args.data))
)
dev.send_data(data)
if args.learn or (args.learnfile and not args.rflearn):
dev.enter_learning()
@ -144,17 +111,19 @@ if args.learn or (args.learnfile and not args.rflearn):
print("No data received...")
exit(1)
learned = format_durations(to_microseconds(bytearray(data))) \
if args.durations \
else ''.join(format(x, '02x') for x in bytearray(data))
if args.learn:
print(learned)
decode_hex = codecs.getdecoder("hex_codec")
print("Base64: " + str(base64.b64encode(decode_hex(learned)[0])))
print("Packet found!")
raw_fmt = data.hex()
base64_fmt = base64.b64encode(data).decode('ascii')
pulse_fmt = format_pulses(data_to_pulses(data))
print("Raw:", raw_fmt)
print("Base64:", base64_fmt)
print("Pulses:", pulse_fmt)
if args.learnfile:
print("Saving to {}".format(args.learnfile))
with open(args.learnfile, "w") as text_file:
text_file.write(learned)
text_file.write(pulse_fmt if args.durations else raw_fmt)
if args.check:
if dev.check_power():
print('* ON *')
@ -238,14 +207,15 @@ if args.rflearn:
exit(1)
print("Packet found!")
learned = format_durations(to_microseconds(bytearray(data))) \
if args.durations \
else ''.join(format(x, '02x') for x in bytearray(data))
if args.learnfile is None:
print(learned)
decode_hex = codecs.getdecoder("hex_codec")
print("Base64: {}".format(str(base64.b64encode(decode_hex(learned)[0]))))
if args.learnfile is not None:
raw_fmt = data.hex()
base64_fmt = base64.b64encode(data).decode('ascii')
pulse_fmt = format_pulses(data_to_pulses(data))
print("Raw:", raw_fmt)
print("Base64:", base64_fmt)
print("Pulses:", pulse_fmt)
if args.learnfile:
print("Saving to {}".format(args.learnfile))
with open(args.learnfile, "w") as text_file:
text_file.write(learned)
text_file.write(pulse_fmt if args.durations else raw_fmt)