mirror of
https://github.com/mjg59/python-broadlink.git
synced 2024-11-22 07:00:12 +01:00
Add support for Broadlink LB27 R1 (0xA4F4) (#557)
* Add support for Broadlink LB27 R1 (0xA4F4) * Improve typing
This commit is contained in:
parent
f3e4edcad4
commit
2198400ad6
@ -8,7 +8,7 @@ from .climate import hysen
|
|||||||
from .cover import dooya
|
from .cover import dooya
|
||||||
from .device import device, ping, scan
|
from .device import device, ping, scan
|
||||||
from .exceptions import exception
|
from .exceptions import exception
|
||||||
from .light import lb1
|
from .light import lb1, lb27r1
|
||||||
from .remote import rm, rm4, rm4mini, rm4pro, rmmini, rmminib, rmpro
|
from .remote import rm, rm4, rm4mini, rm4pro, rmmini, rmminib, rmpro
|
||||||
from .sensor import a1
|
from .sensor import a1
|
||||||
from .switch import bg1, mp1, sp1, sp2, sp2s, sp3, sp3s, sp4, sp4b
|
from .switch import bg1, mp1, sp1, sp2, sp2s, sp3, sp3s, sp4, sp4b
|
||||||
@ -105,6 +105,7 @@ SUPPORTED_TYPES = {
|
|||||||
0x60C7: (lb1, "LB1", "Broadlink"),
|
0x60C7: (lb1, "LB1", "Broadlink"),
|
||||||
0x60C8: (lb1, "LB1", "Broadlink"),
|
0x60C8: (lb1, "LB1", "Broadlink"),
|
||||||
0x6112: (lb1, "LB1", "Broadlink"),
|
0x6112: (lb1, "LB1", "Broadlink"),
|
||||||
|
0xA4F4: (lb27r1, "LB27 R1", "Broadlink"),
|
||||||
0x2722: (S1C, "S2KIT", "Broadlink"),
|
0x2722: (S1C, "S2KIT", "Broadlink"),
|
||||||
0x4EAD: (hysen, "HY02B05H", "Hysen"),
|
0x4EAD: (hysen, "HY02B05H", "Hysen"),
|
||||||
0x4E4D: (dooya, "DT360E-45/20", "Dooya"),
|
0x4E4D: (dooya, "DT360E-45/20", "Dooya"),
|
||||||
|
@ -2,7 +2,7 @@
|
|||||||
import enum
|
import enum
|
||||||
import json
|
import json
|
||||||
import struct
|
import struct
|
||||||
import typing
|
import typing as t
|
||||||
|
|
||||||
from .device import device
|
from .device import device
|
||||||
from .exceptions import check_error
|
from .exceptions import check_error
|
||||||
@ -58,32 +58,32 @@ class lb1(device):
|
|||||||
if green is not None:
|
if green is not None:
|
||||||
state["green"] = int(green)
|
state["green"] = int(green)
|
||||||
if brightness is not None:
|
if brightness is not None:
|
||||||
state["brightness"] = brightness
|
state["brightness"] = int(brightness)
|
||||||
if colortemp is not None:
|
if colortemp is not None:
|
||||||
state["colortemp"] = colortemp
|
state["colortemp"] = int(colortemp)
|
||||||
if hue is not None:
|
if hue is not None:
|
||||||
state["hue"] = hue
|
state["hue"] = int(hue)
|
||||||
if saturation is not None:
|
if saturation is not None:
|
||||||
state["saturation"] = saturation
|
state["saturation"] = int(saturation)
|
||||||
if transitionduration is not None:
|
if transitionduration is not None:
|
||||||
state["transitionduration"] = transitionduration
|
state["transitionduration"] = int(transitionduration)
|
||||||
if maxworktime is not None:
|
if maxworktime is not None:
|
||||||
state["maxworktime"] = maxworktime
|
state["maxworktime"] = int(maxworktime)
|
||||||
if bulb_colormode is not None:
|
if bulb_colormode is not None:
|
||||||
state["bulb_colormode"] = bulb_colormode
|
state["bulb_colormode"] = int(bulb_colormode)
|
||||||
if bulb_scenes is not None:
|
if bulb_scenes is not None:
|
||||||
state["bulb_scenes"] = bulb_scenes
|
state["bulb_scenes"] = str(bulb_scenes)
|
||||||
if bulb_scene is not None:
|
if bulb_scene is not None:
|
||||||
state["bulb_scene"] = bulb_scene
|
state["bulb_scene"] = str(bulb_scene)
|
||||||
if bulb_sceneidx is not None:
|
if bulb_sceneidx is not None:
|
||||||
state["bulb_sceneidx"] = bulb_sceneidx
|
state["bulb_sceneidx"] = int(bulb_sceneidx)
|
||||||
|
|
||||||
packet = self._encode(2, state)
|
packet = self._encode(2, state)
|
||||||
response = self.send_packet(0x6A, packet)
|
response = self.send_packet(0x6A, packet)
|
||||||
check_error(response[0x22:0x24])
|
check_error(response[0x22:0x24])
|
||||||
return self._decode(response)
|
return self._decode(response)
|
||||||
|
|
||||||
def _encode(self, flag: int, obj: typing.Any) -> bytes:
|
def _encode(self, flag: int, obj: t.Any) -> bytes:
|
||||||
"""Encode a JSON packet."""
|
"""Encode a JSON packet."""
|
||||||
# flag: 1 for reading, 2 for writing.
|
# flag: 1 for reading, 2 for writing.
|
||||||
packet = bytearray(14)
|
packet = bytearray(14)
|
||||||
@ -97,9 +97,102 @@ class lb1(device):
|
|||||||
packet[0x6:0x8] = checksum.to_bytes(2, "little")
|
packet[0x6:0x8] = checksum.to_bytes(2, "little")
|
||||||
return packet
|
return packet
|
||||||
|
|
||||||
def _decode(self, response: bytes) -> typing.Any:
|
def _decode(self, response: bytes) -> t.Any:
|
||||||
"""Decode a JSON packet."""
|
"""Decode a JSON packet."""
|
||||||
payload = self.decrypt(response[0x38:])
|
payload = self.decrypt(response[0x38:])
|
||||||
js_len = struct.unpack_from("<I", payload, 0xA)[0]
|
js_len = struct.unpack_from("<I", payload, 0xA)[0]
|
||||||
state = json.loads(payload[0xE : 0xE + js_len])
|
state = json.loads(payload[0xE : 0xE + js_len])
|
||||||
return state
|
return state
|
||||||
|
|
||||||
|
|
||||||
|
class lb27r1(device):
|
||||||
|
"""Controls a Broadlink LB27 R1."""
|
||||||
|
|
||||||
|
TYPE = "LB27R1"
|
||||||
|
|
||||||
|
@enum.unique
|
||||||
|
class ColorMode(enum.IntEnum):
|
||||||
|
"""Enumerates color modes."""
|
||||||
|
RGB = 0
|
||||||
|
WHITE = 1
|
||||||
|
SCENE = 2
|
||||||
|
|
||||||
|
def get_state(self) -> dict:
|
||||||
|
"""Return the power state of the device.
|
||||||
|
|
||||||
|
Example: `{'red': 128, 'blue': 255, 'green': 128, 'pwr': 1, 'brightness': 75, 'colortemp': 2700, 'hue': 240, 'saturation': 50, 'transitionduration': 1500, 'maxworktime': 0, 'bulb_colormode': 1, 'bulb_scenes': '["@01686464,0,0,0", "#ffffff,10,0,#000000,190,0,0", "2700+100,0,0,0", "#ff0000,500,2500,#00FF00,500,2500,#0000FF,500,2500,0", "@01686464,100,2400,@01686401,100,2400,0", "@01686464,100,2400,@01686401,100,2400,@005a6464,100,2400,@005a6401,100,2400,0", "@01686464,10,0,@00000000,190,0,0", "@01686464,200,0,@005a6464,200,0,0"]', 'bulb_scene': ''}`
|
||||||
|
"""
|
||||||
|
packet = self._encode(1, {})
|
||||||
|
response = self.send_packet(0x6A, packet)
|
||||||
|
check_error(response[0x22:0x24])
|
||||||
|
return self._decode(response)
|
||||||
|
|
||||||
|
def set_state(
|
||||||
|
self,
|
||||||
|
pwr: bool = None,
|
||||||
|
red: int = None,
|
||||||
|
blue: int = None,
|
||||||
|
green: int = None,
|
||||||
|
brightness: int = None,
|
||||||
|
colortemp: int = None,
|
||||||
|
hue: int = None,
|
||||||
|
saturation: int = None,
|
||||||
|
transitionduration: int = None,
|
||||||
|
maxworktime: int = None,
|
||||||
|
bulb_colormode: int = None,
|
||||||
|
bulb_scenes: str = None,
|
||||||
|
bulb_scene: str = None,
|
||||||
|
) -> dict:
|
||||||
|
"""Set the power state of the device."""
|
||||||
|
state = {}
|
||||||
|
if pwr is not None:
|
||||||
|
state["pwr"] = int(bool(pwr))
|
||||||
|
if red is not None:
|
||||||
|
state["red"] = int(red)
|
||||||
|
if blue is not None:
|
||||||
|
state["blue"] = int(blue)
|
||||||
|
if green is not None:
|
||||||
|
state["green"] = int(green)
|
||||||
|
if brightness is not None:
|
||||||
|
state["brightness"] = int(brightness)
|
||||||
|
if colortemp is not None:
|
||||||
|
state["colortemp"] = int(colortemp)
|
||||||
|
if hue is not None:
|
||||||
|
state["hue"] = int(hue)
|
||||||
|
if saturation is not None:
|
||||||
|
state["saturation"] = int(saturation)
|
||||||
|
if transitionduration is not None:
|
||||||
|
state["transitionduration"] = int(transitionduration)
|
||||||
|
if maxworktime is not None:
|
||||||
|
state["maxworktime"] = int(maxworktime)
|
||||||
|
if bulb_colormode is not None:
|
||||||
|
state["bulb_colormode"] = int(bulb_colormode)
|
||||||
|
if bulb_scenes is not None:
|
||||||
|
state["bulb_scenes"] = str(bulb_scenes)
|
||||||
|
if bulb_scene is not None:
|
||||||
|
state["bulb_scene"] = str(bulb_scene)
|
||||||
|
|
||||||
|
packet = self._encode(2, state)
|
||||||
|
response = self.send_packet(0x6A, packet)
|
||||||
|
check_error(response[0x22:0x24])
|
||||||
|
return self._decode(response)
|
||||||
|
|
||||||
|
def _encode(self, flag: int, obj: t.Any) -> bytes:
|
||||||
|
"""Encode a JSON packet."""
|
||||||
|
# flag: 1 for reading, 2 for writing.
|
||||||
|
packet = bytearray(12)
|
||||||
|
js = json.dumps(obj, separators=[',', ':']).encode()
|
||||||
|
struct.pack_into(
|
||||||
|
"<HHHBBI", packet, 0, 0xA5A5, 0x5A5A, 0, flag, 0xB, len(js)
|
||||||
|
)
|
||||||
|
packet += js
|
||||||
|
checksum = sum(packet[0x6:], 0xC0AD) & 0xFFFF
|
||||||
|
packet[0x4:0x6] = checksum.to_bytes(2, "little")
|
||||||
|
return packet
|
||||||
|
|
||||||
|
def _decode(self, response: bytes) -> t.Any:
|
||||||
|
"""Decode a JSON packet."""
|
||||||
|
payload = self.decrypt(response[0x38:])
|
||||||
|
js_len = struct.unpack_from("<I", payload, 0x8)[0]
|
||||||
|
state = json.loads(payload[0xC : 0xC + js_len])
|
||||||
|
return state
|
||||||
|
Loading…
Reference in New Issue
Block a user