2020-09-17 05:41:32 +02:00
|
|
|
import json
|
|
|
|
from typing import Union
|
|
|
|
|
|
|
|
from .device import device
|
|
|
|
from .exceptions import check_error
|
|
|
|
|
|
|
|
|
|
|
|
class lb1(device):
|
|
|
|
"""Controls a Broadlink LB1."""
|
|
|
|
|
|
|
|
state_dict = []
|
|
|
|
effect_map_dict = {
|
|
|
|
'lovely color': 0,
|
|
|
|
'flashlight': 1,
|
|
|
|
'lightning': 2,
|
|
|
|
'color fading': 3,
|
|
|
|
'color breathing': 4,
|
|
|
|
'multicolor breathing': 5,
|
|
|
|
'color jumping': 6,
|
|
|
|
'multicolor jumping': 7,
|
|
|
|
}
|
|
|
|
|
|
|
|
def __init__(self, *args, **kwargs) -> None:
|
|
|
|
"""Initialize the controller."""
|
|
|
|
device.__init__(self, *args, **kwargs)
|
|
|
|
self.type = "SmartBulb"
|
|
|
|
|
|
|
|
def send_command(self, command: str, type: str = 'set') -> None:
|
|
|
|
"""Send a command to the device."""
|
|
|
|
packet = bytearray(16+(int(len(command)/16) + 1)*16)
|
|
|
|
packet[0x00] = 0x0c + len(command) & 0xff
|
|
|
|
packet[0x02] = 0xa5
|
|
|
|
packet[0x03] = 0xa5
|
|
|
|
packet[0x04] = 0x5a
|
|
|
|
packet[0x05] = 0x5a
|
|
|
|
packet[0x08] = 0x02 if type == "set" else 0x01 # 0x01 => query, # 0x02 => set
|
|
|
|
packet[0x09] = 0x0b
|
|
|
|
packet[0x0a] = len(command)
|
|
|
|
packet[0x0e:] = map(ord, command)
|
|
|
|
|
|
|
|
checksum = sum(packet, 0xbeaf) & 0xffff
|
|
|
|
packet[0x06] = checksum & 0xff # Checksum 1 position
|
|
|
|
packet[0x07] = checksum >> 8 # Checksum 2 position
|
|
|
|
|
|
|
|
response = self.send_packet(0x6a, packet)
|
|
|
|
check_error(response[0x36:0x38])
|
2020-09-20 11:16:49 +02:00
|
|
|
payload = self.decrypt(response[0x38:])
|
2020-09-17 05:41:32 +02:00
|
|
|
|
|
|
|
responseLength = int(payload[0x0a]) | (int(payload[0x0b]) << 8)
|
|
|
|
if responseLength > 0:
|
|
|
|
self.state_dict = json.loads(payload[0x0e:0x0e+responseLength])
|
|
|
|
|
|
|
|
def set_json(self, jsonstr: str) -> str:
|
|
|
|
"""Send a command to the device and return state."""
|
|
|
|
reconvert = json.loads(jsonstr)
|
|
|
|
if 'bulb_sceneidx' in reconvert.keys():
|
|
|
|
reconvert['bulb_sceneidx'] = self.effect_map_dict.get(reconvert['bulb_sceneidx'], 255)
|
|
|
|
|
|
|
|
self.send_command(json.dumps(reconvert))
|
|
|
|
return json.dumps(self.state_dict)
|
|
|
|
|
|
|
|
def set_state(self, state: Union[str, int]) -> None:
|
|
|
|
"""Set the state of the device."""
|
|
|
|
cmd = '{"pwr":%d}' % (1 if state == "ON" or state == 1 else 0)
|
|
|
|
self.send_command(cmd)
|
|
|
|
|
|
|
|
def get_state(self) -> dict:
|
|
|
|
"""Return the state of the device."""
|
|
|
|
cmd = "{}"
|
|
|
|
self.send_command(cmd)
|
|
|
|
return self.state_dict
|