Add support for S3 Hub and LC-1 (1,2&3 gang) light switches (#654)
* https://github.com/mjg59/python-broadlink/issues/647 * Added get_state(did) and update documentation * Fixed pwr3 set_state * Added get_subdevices() * Cleaned up get_subdevices * Updated S3 documentation * Added device id 0xA59C:("S3", "Broadlink") * Improve logic of get_subdevices() Prevents infinite loop. * Black * Move s3 closer to s1c * Update README.md Co-authored-by: Felipe Martins Diel <41558831+felipediel@users.noreply.github.com>
This commit is contained in:
parent
d870560e6e
commit
2b70440786
33
README.md
33
README.md
|
@ -12,6 +12,7 @@ A Python module and CLI for controlling Broadlink devices locally. The following
|
|||
- **Light bulbs**: LB1, LB26 R1, LB27 R1, SB800TD
|
||||
- **Curtain motors**: Dooya DT360E-45/20
|
||||
- **Thermostats**: Hysen HY02B05H
|
||||
- **Hubs**: S3
|
||||
|
||||
## Installation
|
||||
|
||||
|
@ -194,4 +195,34 @@ devices[0].set_state(bulb_colormode=1)
|
|||
### Fetching sensor data
|
||||
```python3
|
||||
data = device.check_sensors()
|
||||
```
|
||||
```
|
||||
|
||||
## Hubs
|
||||
|
||||
### Discovering subdevices
|
||||
```python3
|
||||
device.get_subdevices()
|
||||
```
|
||||
|
||||
### Fetching data
|
||||
Use the DID obtained from get_subdevices() for the input parameter to query specific sub-device.
|
||||
|
||||
```python3
|
||||
device.get_state(did="00000000000000000000a043b0d06963")
|
||||
```
|
||||
|
||||
### Setting state attributes
|
||||
The parameters depend on the type of subdevice that is being controlled. In this example, we are controlling LC-1 switches:
|
||||
|
||||
#### Turn on
|
||||
```python3
|
||||
device.set_state(did="00000000000000000000a043b0d0783a", pwr=1)
|
||||
device.set_state(did="00000000000000000000a043b0d0783a", pwr1=1)
|
||||
device.set_state(did="00000000000000000000a043b0d0783a", pwr2=1)
|
||||
```
|
||||
#### Turn off
|
||||
```python3
|
||||
device.set_state(did="00000000000000000000a043b0d0783a", pwr=0)
|
||||
device.set_state(did="00000000000000000000a043b0d0783a", pwr1=0)
|
||||
device.set_state(did="00000000000000000000a043b0d0783a", pwr2=0)
|
||||
```
|
||||
|
|
|
@ -9,6 +9,7 @@ from .alarm import S1C
|
|||
from .climate import hysen
|
||||
from .cover import dooya
|
||||
from .device import Device, ping, scan
|
||||
from .hub import s3
|
||||
from .light import lb1, lb2
|
||||
from .remote import rm, rm4, rm4mini, rm4pro, rmmini, rmminib, rmpro
|
||||
from .sensor import a1
|
||||
|
@ -156,6 +157,10 @@ SUPPORTED_TYPES = {
|
|||
S1C: {
|
||||
0x2722: ("S2KIT", "Broadlink"),
|
||||
},
|
||||
s3: {
|
||||
0xA59C:("S3", "Broadlink"),
|
||||
0xA64D:("S3", "Broadlink"),
|
||||
},
|
||||
hysen: {
|
||||
0x4EAD: ("HY02/HY03", "Hysen"),
|
||||
},
|
||||
|
|
|
@ -0,0 +1,83 @@
|
|||
"""Support for hubs."""
|
||||
import struct
|
||||
import json
|
||||
|
||||
from . import exceptions as e
|
||||
from .device import Device
|
||||
|
||||
|
||||
class s3(Device):
|
||||
"""Controls a Broadlink S3."""
|
||||
|
||||
TYPE = "S3"
|
||||
MAX_SUBDEVICES = 8
|
||||
|
||||
def get_subdevices(self) -> list:
|
||||
"""Return the lit of sub devices."""
|
||||
sub_devices = []
|
||||
step = 5
|
||||
|
||||
for index in range(0, self.MAX_SUBDEVICES, step):
|
||||
state = {"count": step, "index": index}
|
||||
packet = self._encode(14, state)
|
||||
resp = self.send_packet(0x6A, packet)
|
||||
e.check_error(resp[0x22:0x24])
|
||||
resp = self._decode(resp)
|
||||
|
||||
sub_devices.extend(resp["list"])
|
||||
if len(sub_devices) == resp["total"]:
|
||||
break
|
||||
|
||||
return sub_devices
|
||||
|
||||
def get_state(self, did: str = None) -> dict:
|
||||
"""Return the power state of the device."""
|
||||
state = {}
|
||||
if did is not None:
|
||||
state["did"] = did
|
||||
|
||||
packet = self._encode(1, state)
|
||||
response = self.send_packet(0x6A, packet)
|
||||
e.check_error(response[0x22:0x24])
|
||||
return self._decode(response)
|
||||
|
||||
def set_state(
|
||||
self,
|
||||
did: str = None,
|
||||
pwr1: bool = None,
|
||||
pwr2: bool = None,
|
||||
pwr3: bool = None,
|
||||
) -> dict:
|
||||
"""Set the power state of the device."""
|
||||
state = {}
|
||||
if did is not None:
|
||||
state["did"] = did
|
||||
if pwr1 is not None:
|
||||
state["pwr1"] = int(bool(pwr1))
|
||||
if pwr2 is not None:
|
||||
state["pwr2"] = int(bool(pwr2))
|
||||
if pwr3 is not None:
|
||||
state["pwr3"] = int(bool(pwr3))
|
||||
|
||||
packet = self._encode(2, state)
|
||||
response = self.send_packet(0x6A, packet)
|
||||
e.check_error(response[0x22:0x24])
|
||||
return self._decode(response)
|
||||
|
||||
def _encode(self, flag: int, state: dict) -> bytes:
|
||||
"""Encode a JSON packet."""
|
||||
# flag: 1 for reading, 2 for writing.
|
||||
packet = bytearray(12)
|
||||
data = json.dumps(state, separators=(",", ":")).encode()
|
||||
struct.pack_into("<HHHBBI", packet, 0, 0xA5A5, 0x5A5A, 0, flag, 0x0B, len(data))
|
||||
packet.extend(data)
|
||||
checksum = sum(packet, 0xBEAF) & 0xFFFF
|
||||
packet[0x04:0x06] = checksum.to_bytes(2, "little")
|
||||
return packet
|
||||
|
||||
def _decode(self, response: bytes) -> dict:
|
||||
"""Decode a JSON packet."""
|
||||
payload = self.decrypt(response[0x38:])
|
||||
js_len = struct.unpack_from("<I", payload, 0x08)[0]
|
||||
state = json.loads(payload[0x0C : 0x0C + js_len])
|
||||
return state
|
Loading…
Reference in New Issue