mirror of
https://github.com/mjg59/python-broadlink.git
synced 2024-11-10 18:00:12 +01:00
Refactor the LB1 class (#517)
* Refactor the LB1 class * General improvements * Enumerate bulb color modes * Clean up encoder Co-authored-by: Felipe Martins Diel <felipe-diel@hotmail.com>
This commit is contained in:
parent
b33bbdbbcf
commit
9a04f68212
17
README.md
17
README.md
@ -129,3 +129,20 @@ Check power state on a SmartPowerStrip:
|
||||
```
|
||||
state = devices[0].check_power()
|
||||
```
|
||||
|
||||
Get state on a bulb
|
||||
```
|
||||
state=devices[0].get_state()
|
||||
```
|
||||
|
||||
Set a state on a bulb
|
||||
```
|
||||
devices[0].set_state(pwr=0)
|
||||
devices[0].set_state(pwr=1)
|
||||
devices[0].set_state(brightness=75)
|
||||
devices[0].set_state(bulb_colormode=0)
|
||||
devices[0].set_state(blue=255)
|
||||
devices[0].set_state(red=0)
|
||||
devices[0].set_state(green=128)
|
||||
devices[0].set_state(bulb_colormode=1)
|
||||
```
|
@ -1,6 +1,8 @@
|
||||
"""Support for lights."""
|
||||
import enum
|
||||
import json
|
||||
from typing import Union
|
||||
import struct
|
||||
import typing
|
||||
|
||||
from .device import device
|
||||
from .exceptions import check_error
|
||||
@ -9,66 +11,98 @@ from .exceptions import check_error
|
||||
class lb1(device):
|
||||
"""Controls a Broadlink LB1."""
|
||||
|
||||
state_dict = []
|
||||
effect_map_dict = {
|
||||
"lovely color": 0,
|
||||
"flashlight": 1,
|
||||
"lightning": 2,
|
||||
"color fading": 3,
|
||||
"color breathing": 4,
|
||||
"multicolor breathing": 5,
|
||||
"color jumping": 6,
|
||||
"multicolor jumping": 7,
|
||||
}
|
||||
@enum.unique
|
||||
class ColorMode(enum.IntEnum):
|
||||
"""Enumerates color modes."""
|
||||
RGB = 0
|
||||
WHITE = 1
|
||||
SCENE = 2
|
||||
|
||||
def __init__(self, *args, **kwargs) -> None:
|
||||
"""Initialize the controller."""
|
||||
device.__init__(self, *args, **kwargs)
|
||||
self.type = "SmartBulb"
|
||||
|
||||
def send_command(self, command: str, type: str = "set") -> None:
|
||||
"""Send a command to the device."""
|
||||
packet = bytearray(16 + (int(len(command) / 16) + 1) * 16)
|
||||
packet[0x00] = 0x0C + len(command) & 0xFF
|
||||
packet[0x02] = 0xA5
|
||||
packet[0x03] = 0xA5
|
||||
packet[0x04] = 0x5A
|
||||
packet[0x05] = 0x5A
|
||||
packet[0x08] = 0x02 if type == "set" else 0x01 # 0x01 => query, # 0x02 => set
|
||||
packet[0x09] = 0x0B
|
||||
packet[0x0A] = len(command)
|
||||
packet[0x0E:] = map(ord, command)
|
||||
|
||||
checksum = sum(packet, 0xBEAF) & 0xFFFF
|
||||
packet[0x06] = checksum & 0xFF # Checksum 1 position
|
||||
packet[0x07] = checksum >> 8 # Checksum 2 position
|
||||
|
||||
response = self.send_packet(0x6A, packet)
|
||||
check_error(response[0x36:0x38])
|
||||
payload = self.decrypt(response[0x38:])
|
||||
|
||||
responseLength = int(payload[0x0A]) | (int(payload[0x0B]) << 8)
|
||||
if responseLength > 0:
|
||||
self.state_dict = json.loads(payload[0x0E : 0x0E + responseLength])
|
||||
|
||||
def set_json(self, jsonstr: str) -> str:
|
||||
"""Send a command to the device and return state."""
|
||||
reconvert = json.loads(jsonstr)
|
||||
if "bulb_sceneidx" in reconvert.keys():
|
||||
reconvert["bulb_sceneidx"] = self.effect_map_dict.get(
|
||||
reconvert["bulb_sceneidx"], 255
|
||||
)
|
||||
|
||||
self.send_command(json.dumps(reconvert))
|
||||
return json.dumps(self.state_dict)
|
||||
|
||||
def set_state(self, state: Union[str, int]) -> None:
|
||||
"""Set the state of the device."""
|
||||
cmd = '{"pwr":%d}' % (1 if state == "ON" or state == 1 else 0)
|
||||
self.send_command(cmd)
|
||||
self.type = "LB1"
|
||||
|
||||
def get_state(self) -> dict:
|
||||
"""Return the state of the device."""
|
||||
cmd = "{}"
|
||||
self.send_command(cmd)
|
||||
return self.state_dict
|
||||
"""Return the power state of the device.
|
||||
|
||||
Example: `{'red': 128, 'blue': 255, 'green': 128, 'pwr': 1, 'brightness': 75, 'colortemp': 2700, 'hue': 240, 'saturation': 50, 'transitionduration': 1500, 'maxworktime': 0, 'bulb_colormode': 1, 'bulb_scenes': '["@01686464,0,0,0", "#ffffff,10,0,#000000,190,0,0", "2700+100,0,0,0", "#ff0000,500,2500,#00FF00,500,2500,#0000FF,500,2500,0", "@01686464,100,2400,@01686401,100,2400,0", "@01686464,100,2400,@01686401,100,2400,@005a6464,100,2400,@005a6401,100,2400,0", "@01686464,10,0,@00000000,190,0,0", "@01686464,200,0,@005a6464,200,0,0"]', 'bulb_scene': '', 'bulb_sceneidx': 255}`
|
||||
"""
|
||||
packet = self._encode(1, {})
|
||||
response = self.send_packet(0x6A, packet)
|
||||
check_error(response[0x22:0x24])
|
||||
return self._decode(response)
|
||||
|
||||
def set_state(
|
||||
self,
|
||||
pwr: bool = None,
|
||||
red: int = None,
|
||||
blue: int = None,
|
||||
green: int = None,
|
||||
brightness: int = None,
|
||||
colortemp: int = None,
|
||||
hue: int = None,
|
||||
saturation: int = None,
|
||||
transitionduration: int = None,
|
||||
maxworktime: int = None,
|
||||
bulb_colormode: int = None,
|
||||
bulb_scenes: str = None,
|
||||
bulb_scene: str = None,
|
||||
bulb_sceneidx: int = None,
|
||||
) -> dict:
|
||||
"""Set the power state of the device."""
|
||||
state = {}
|
||||
if pwr is not None:
|
||||
state["pwr"] = int(bool(pwr))
|
||||
if red is not None:
|
||||
state["red"] = int(red)
|
||||
if blue is not None:
|
||||
state["blue"] = int(blue)
|
||||
if green is not None:
|
||||
state["green"] = int(green)
|
||||
if brightness is not None:
|
||||
state["brightness"] = brightness
|
||||
if colortemp is not None:
|
||||
state["colortemp"] = colortemp
|
||||
if hue is not None:
|
||||
state["hue"] = hue
|
||||
if saturation is not None:
|
||||
state["saturation"] = saturation
|
||||
if transitionduration is not None:
|
||||
state["transitionduration"] = transitionduration
|
||||
if maxworktime is not None:
|
||||
state["maxworktime"] = maxworktime
|
||||
if bulb_colormode is not None:
|
||||
state["bulb_colormode"] = bulb_colormode
|
||||
if bulb_scenes is not None:
|
||||
state["bulb_scenes"] = bulb_scenes
|
||||
if bulb_scene is not None:
|
||||
state["bulb_scene"] = bulb_scene
|
||||
if bulb_sceneidx is not None:
|
||||
state["bulb_sceneidx"] = bulb_sceneidx
|
||||
|
||||
packet = self._encode(2, state)
|
||||
response = self.send_packet(0x6A, packet)
|
||||
check_error(response[0x22:0x24])
|
||||
return self._decode(response)
|
||||
|
||||
def _encode(self, flag: int, obj: typing.Any) -> bytes:
|
||||
"""Encode a JSON packet."""
|
||||
# flag: 1 for reading, 2 for writing.
|
||||
packet = bytearray(14)
|
||||
js = json.dumps(obj, separators=[',', ':']).encode()
|
||||
p_len = 12 + len(js)
|
||||
struct.pack_into(
|
||||
"<HHHHBBI", packet, 0, p_len, 0xA5A5, 0x5A5A, 0, flag, 0xB, len(js)
|
||||
)
|
||||
packet += js
|
||||
checksum = sum(packet[0x8:], 0xC0AD) & 0xFFFF
|
||||
packet[0x6:0x8] = checksum.to_bytes(2, "little")
|
||||
return packet
|
||||
|
||||
def _decode(self, response: bytes) -> typing.Any:
|
||||
"""Decode a JSON packet."""
|
||||
payload = self.decrypt(response[0x38:])
|
||||
js_len = struct.unpack_from("<I", payload, 0xA)[0]
|
||||
state = json.loads(payload[0xE : 0xE + js_len])
|
||||
return state
|
||||
|
Loading…
Reference in New Issue
Block a user