Add support for Broadlink SP4M-US (0x648b) (#447)
This commit is contained in:
parent
c8a5ea02fd
commit
957baf5447
|
@ -11,7 +11,7 @@ from .exceptions import exception
|
|||
from .light import lb1
|
||||
from .remote import rm, rm4
|
||||
from .sensor import a1
|
||||
from .switch import bg1, mp1, sp1, sp2, sp4
|
||||
from .switch import bg1, mp1, sp1, sp2, sp4l, sp4m
|
||||
|
||||
|
||||
SUPPORTED_TYPES = {
|
||||
|
@ -40,9 +40,10 @@ SUPPORTED_TYPES = {
|
|||
0x7d0d: (sp2, "SP mini 3", "Broadlink (OEM)"),
|
||||
0x9479: (sp2, "SP3S-US", "Broadlink"),
|
||||
0x947a: (sp2, "SP3S-EU", "Broadlink"),
|
||||
0x7579: (sp4, "SP4L-EU", "Broadlink"),
|
||||
0x7583: (sp4, "SP mini 3", "Broadlink (OEM)"),
|
||||
0x7d11: (sp4, "SP mini 3", "Broadlink (OEM)"),
|
||||
0x648b: (sp4m, "SP4M-US", "Broadlink"),
|
||||
0x7579: (sp4l, "SP4L-EU", "Broadlink"),
|
||||
0x7583: (sp4l, "SP mini 3", "Broadlink"),
|
||||
0x7d11: (sp4l, "SP mini 3", "Broadlink"),
|
||||
0x2712: (rm, "RM pro/pro+", "Broadlink"),
|
||||
0x272a: (rm, "RM pro", "Broadlink"),
|
||||
0x2737: (rm, "RM mini 3", "Broadlink"),
|
||||
|
|
|
@ -219,13 +219,13 @@ class sp2(device):
|
|||
return int(hex(payload[0x07] * 256 + payload[0x06])[2:]) + int(hex(payload[0x05])[2:]) / 100.0
|
||||
|
||||
|
||||
class sp4(device):
|
||||
"""Controls a Broadlink SP4."""
|
||||
class sp4l(device):
|
||||
"""Controls a Broadlink SP4L."""
|
||||
|
||||
def __init__(self, *args, **kwargs) -> None:
|
||||
"""Initialize the controller."""
|
||||
device.__init__(self, *args, **kwargs)
|
||||
self.type = "SP4"
|
||||
self.type = "SP4L"
|
||||
|
||||
def set_power(self, state: bool) -> None:
|
||||
"""Set the power state of the device."""
|
||||
|
@ -242,6 +242,7 @@ class sp4(device):
|
|||
indicator: bool = None,
|
||||
ntlbrightness: int = None,
|
||||
maxworktime: int = None,
|
||||
childlock: bool = None,
|
||||
) -> dict:
|
||||
"""Set state of device."""
|
||||
data = {}
|
||||
|
@ -255,9 +256,10 @@ class sp4(device):
|
|||
data["ntlbrightness"] = ntlbrightness
|
||||
if maxworktime is not None:
|
||||
data["maxworktime"] = maxworktime
|
||||
if childlock is not None:
|
||||
data["childlock"] = int(bool(childlock))
|
||||
|
||||
js = json.dumps(data).encode("utf8")
|
||||
packet = self._encode(2, js)
|
||||
packet = self._encode(2, data)
|
||||
response = self.send_packet(0x6A, packet)
|
||||
return self._decode(response)
|
||||
|
||||
|
@ -267,36 +269,25 @@ class sp4(device):
|
|||
return state["pwr"]
|
||||
|
||||
def check_nightlight(self) -> bool:
|
||||
"""Return the night light state of the device."""
|
||||
"""Return the state of the night light."""
|
||||
state = self.get_state()
|
||||
return state["ntlight"]
|
||||
|
||||
def get_state(self) -> dict:
|
||||
"""Get full state of device."""
|
||||
packet = self._encode(1, b"{}")
|
||||
packet = self._encode(1, {})
|
||||
response = self.send_packet(0x6A, packet)
|
||||
return self._decode(response)
|
||||
|
||||
def _encode(self, flag: int, js: str) -> bytes:
|
||||
def _encode(self, flag: int, state: dict) -> bytes:
|
||||
"""Encode a message."""
|
||||
# SP4 support added by Petter Olofsson
|
||||
# packet format is:
|
||||
# 0x00-0x03 header 0xa5a5, 0x5a5a
|
||||
# 0x04-0x05 "0xbeaf" checksum
|
||||
# 0x06 flag (1 for read or 2 write?)
|
||||
# 0x07 unknown (0xb)
|
||||
# 0x08-0x0b length of json
|
||||
# 0x0c- json data
|
||||
packet = bytearray(14)
|
||||
payload = json.dumps(state, separators=(",", ":")).encode()
|
||||
packet = bytearray(12)
|
||||
struct.pack_into(
|
||||
"<HHHBBI", packet, 0, 0xA5A5, 0x5A5A, 0x0000, flag, 0x0B, len(js)
|
||||
"<HHHBBI", packet, 0, 0xA5A5, 0x5A5A, 0x0000, flag, 0x0B, len(payload)
|
||||
)
|
||||
for i in range(len(js)):
|
||||
packet.append(js[i])
|
||||
|
||||
checksum = 0xBEAF
|
||||
for c in packet:
|
||||
checksum = (checksum + c) & 0xFFFF
|
||||
packet.extend(payload)
|
||||
checksum = sum(packet, 0xBEAF) & 0xFFFF
|
||||
packet[0x04] = checksum & 0xFF
|
||||
packet[0x05] = checksum >> 8
|
||||
return packet
|
||||
|
@ -304,8 +295,36 @@ class sp4(device):
|
|||
def _decode(self, response: bytes) -> dict:
|
||||
"""Decode a message."""
|
||||
check_error(response[0x22:0x24])
|
||||
|
||||
payload = self.decrypt(response[0x38:])
|
||||
js_len = struct.unpack_from("<I", payload, 0x08)[0]
|
||||
state = json.loads(payload[0x0C : 0x0C + js_len])
|
||||
return state
|
||||
|
||||
|
||||
class sp4m(sp4l):
|
||||
"""Controls a Broadlink SP4M."""
|
||||
|
||||
def __init__(self, *args, **kwargs) -> None:
|
||||
"""Initialize the controller."""
|
||||
device.__init__(self, *args, **kwargs)
|
||||
self.type = "SP4M"
|
||||
|
||||
def _encode(self, flag: int, state: dict) -> bytes:
|
||||
"""Encode a message."""
|
||||
payload = json.dumps(state, separators=(",", ":")).encode()
|
||||
packet = bytearray(14)
|
||||
length = 4 + 2 + 2 + 4 + len(payload)
|
||||
struct.pack_into('<HHHHBBI', packet, 0, length, 0xA5A5, 0x5A5A, 0x0000, flag, 0x0B, len(payload))
|
||||
packet.extend(payload)
|
||||
checksum = sum(packet[0x8:], 0xC0AD) & 0xFFFF
|
||||
packet[0x06] = checksum & 0xFF
|
||||
packet[0x07] = checksum >> 8
|
||||
return packet
|
||||
|
||||
def _decode(self, response: bytes) -> dict:
|
||||
"""Decode a message."""
|
||||
check_error(response[0x22:0x24])
|
||||
payload = self.decrypt(response[0x38:])
|
||||
js_len = struct.unpack_from("<I", payload, 0xA)[0]
|
||||
state = json.loads(payload[0x0E : 0x0E + js_len])
|
||||
return state
|
||||
|
|
Loading…
Reference in New Issue