mirror of
https://github.com/mjg59/python-broadlink.git
synced 2024-11-21 22:51:41 +01:00
Add sp4 refactored (#429)
* Add SP4 device class * Remove legacy byte conversion * Implement suggested improvements * Apply some love to _encode() and _decode() too. * Remove unnecessary safeguards * Remove unnecessary return values * Add missing periods to the comments. Co-authored-by: Martin Samuelsson <httpov@columbiegg.com>
This commit is contained in:
parent
6ab23e9261
commit
9248ee6b0c
@ -13,7 +13,7 @@ from .helpers import get_local_ip
|
|||||||
from .light import lb1
|
from .light import lb1
|
||||||
from .remote import rm, rm2, rm4
|
from .remote import rm, rm2, rm4
|
||||||
from .sensor import a1
|
from .sensor import a1
|
||||||
from .switch import bg1, mp1, sp1, sp2
|
from .switch import bg1, mp1, sp1, sp2, sp4
|
||||||
|
|
||||||
|
|
||||||
def get_devices() -> Dict[int, Tuple[Type[device], str, str]]:
|
def get_devices() -> Dict[int, Tuple[Type[device], str, str]]:
|
||||||
@ -44,6 +44,7 @@ def get_devices() -> Dict[int, Tuple[Type[device], str, str]]:
|
|||||||
0x7d0d: (sp2, "SP mini 3", "Broadlink (OEM)"),
|
0x7d0d: (sp2, "SP mini 3", "Broadlink (OEM)"),
|
||||||
0x9479: (sp2, "SP3S-US", "Broadlink"),
|
0x9479: (sp2, "SP3S-US", "Broadlink"),
|
||||||
0x947a: (sp2, "SP3S-EU", "Broadlink"),
|
0x947a: (sp2, "SP3S-EU", "Broadlink"),
|
||||||
|
0x7579: (sp4, "SP4L-EU", "Broadlink"),
|
||||||
0x2712: (rm, "RM pro/pro+", "Broadlink"),
|
0x2712: (rm, "RM pro/pro+", "Broadlink"),
|
||||||
0x272a: (rm, "RM pro", "Broadlink"),
|
0x272a: (rm, "RM pro", "Broadlink"),
|
||||||
0x2737: (rm, "RM mini 3", "Broadlink"),
|
0x2737: (rm, "RM mini 3", "Broadlink"),
|
||||||
|
@ -216,3 +216,95 @@ class sp2(device):
|
|||||||
check_error(response[0x22:0x24])
|
check_error(response[0x22:0x24])
|
||||||
payload = self.decrypt(response[0x38:])
|
payload = self.decrypt(response[0x38:])
|
||||||
return int(hex(payload[0x07] * 256 + payload[0x06])[2:]) + int(hex(payload[0x05])[2:]) / 100.0
|
return int(hex(payload[0x07] * 256 + payload[0x06])[2:]) + int(hex(payload[0x05])[2:]) / 100.0
|
||||||
|
|
||||||
|
|
||||||
|
class sp4(device):
|
||||||
|
"""Controls a Broadlink SP4."""
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs) -> None:
|
||||||
|
"""Initialize the controller."""
|
||||||
|
device.__init__(self, *args, **kwargs)
|
||||||
|
self.type = "SP4"
|
||||||
|
|
||||||
|
def set_power(self, state: bool) -> None:
|
||||||
|
"""Set the power state of the device."""
|
||||||
|
self.set_state(pwr=state)
|
||||||
|
|
||||||
|
def set_nightlight(self, state: bool) -> None:
|
||||||
|
"""Set the night light state of the device."""
|
||||||
|
self.set_state(ntlight=state)
|
||||||
|
|
||||||
|
def set_state(
|
||||||
|
self,
|
||||||
|
pwr: bool = None,
|
||||||
|
ntlight: bool = None,
|
||||||
|
indicator: bool = None,
|
||||||
|
ntlbrightness: int = None,
|
||||||
|
maxworktime: int = None,
|
||||||
|
) -> dict:
|
||||||
|
"""Set state of device."""
|
||||||
|
data = {}
|
||||||
|
if pwr is not None:
|
||||||
|
data["pwr"] = pwr
|
||||||
|
if ntlight is not None:
|
||||||
|
data["ntlight"] = ntlight
|
||||||
|
if indicator is not None:
|
||||||
|
data["indicator"] = indicator
|
||||||
|
if ntlbrightness is not None:
|
||||||
|
data["ntlbrightness"] = ntlbrightness
|
||||||
|
if maxworktime is not None:
|
||||||
|
data["maxworktime"] = maxworktime
|
||||||
|
|
||||||
|
js = json.dumps(data).encode("utf8")
|
||||||
|
packet = self._encode(2, js)
|
||||||
|
response = self.send_packet(0x6A, packet)
|
||||||
|
return self._decode(response)
|
||||||
|
|
||||||
|
def check_power(self) -> bool:
|
||||||
|
"""Return the power state of the device."""
|
||||||
|
state = self.get_state()
|
||||||
|
return state["pwr"]
|
||||||
|
|
||||||
|
def check_nightlight(self) -> bool:
|
||||||
|
"""Return the night light state of the device."""
|
||||||
|
state = self.get_state()
|
||||||
|
return state["ntlight"]
|
||||||
|
|
||||||
|
def get_state(self) -> dict:
|
||||||
|
"""Get full state of device."""
|
||||||
|
packet = self._encode(1, b"{}")
|
||||||
|
response = self.send_packet(0x6A, packet)
|
||||||
|
return self._decode(response)
|
||||||
|
|
||||||
|
def _encode(self, flag: int, js: str) -> bytes:
|
||||||
|
"""Encode a message."""
|
||||||
|
# SP4 support added by Petter Olofsson
|
||||||
|
# packet format is:
|
||||||
|
# 0x00-0x03 header 0xa5a5, 0x5a5a
|
||||||
|
# 0x04-0x05 "0xbeaf" checksum
|
||||||
|
# 0x06 flag (1 for read or 2 write?)
|
||||||
|
# 0x07 unknown (0xb)
|
||||||
|
# 0x08-0x0b length of json
|
||||||
|
# 0x0c- json data
|
||||||
|
packet = bytearray(14)
|
||||||
|
struct.pack_into(
|
||||||
|
"<HHHBBI", packet, 0, 0xA5A5, 0x5A5A, 0x0000, flag, 0x0B, len(js)
|
||||||
|
)
|
||||||
|
for i in range(len(js)):
|
||||||
|
packet.append(js[i])
|
||||||
|
|
||||||
|
checksum = 0xBEAF
|
||||||
|
for c in packet:
|
||||||
|
checksum = (checksum + c) & 0xFFFF
|
||||||
|
packet[0x04] = checksum & 0xFF
|
||||||
|
packet[0x05] = checksum >> 8
|
||||||
|
return packet
|
||||||
|
|
||||||
|
def _decode(self, response: bytes) -> dict:
|
||||||
|
"""Decode a message."""
|
||||||
|
check_error(response[0x22:0x24])
|
||||||
|
|
||||||
|
payload = self.decrypt(response[0x38:])
|
||||||
|
js_len = struct.unpack_from("<I", payload, 0x08)[0]
|
||||||
|
state = json.loads(payload[0x0C : 0x0C + js_len])
|
||||||
|
return state
|
||||||
|
Loading…
Reference in New Issue
Block a user