mirror of
https://github.com/mjg59/python-broadlink.git
synced 2024-11-21 06:40:12 +01:00
Merge branch 'master' into dev
This commit is contained in:
commit
70180cfbc6
37
README.md
37
README.md
@ -2,16 +2,17 @@
|
||||
|
||||
A Python module and CLI for controlling Broadlink devices locally. The following devices are supported:
|
||||
|
||||
- **Universal remotes**: RM home, RM mini 3, RM plus, RM pro, RM pro+, RM4 mini, RM4 pro, RM4C mini, RM4S
|
||||
- **Universal remotes**: RM home, RM mini 3, RM plus, RM pro, RM pro+, RM4 mini, RM4 pro, RM4C mini, RM4S, RM4 TV mate
|
||||
- **Smart plugs**: SP mini, SP mini 3, SP mini+, SP1, SP2, SP2-BR, SP2-CL, SP2-IN, SP2-UK, SP3, SP3-EU, SP3S-EU, SP3S-US, SP4L-AU, SP4L-EU, SP4L-UK, SP4M, SP4M-US, Ankuoo NEO, Ankuoo NEO PRO, Efergy Ego, BG AHC/U-01
|
||||
- **Switches**: MCB1, SC1, SCB1E, SCB2
|
||||
- **Outlets**: BG 800, BG 900
|
||||
- **Power strips**: MP1-1K3S2U, MP1-1K4S, MP2
|
||||
- **Environment sensors**: A1
|
||||
- **Alarm kits**: S1C, S2KIT
|
||||
- **Light bulbs**: LB1, LB2, SB800TD
|
||||
- **Light bulbs**: LB1, LB26 R1, LB27 R1, SB800TD
|
||||
- **Curtain motors**: Dooya DT360E-45/20
|
||||
- **Thermostats**: Hysen HY02B05H
|
||||
- **Hubs**: S3
|
||||
|
||||
## Installation
|
||||
|
||||
@ -213,4 +214,34 @@ devices[0].set_state(bulb_colormode=1)
|
||||
### Fetching sensor data
|
||||
```python3
|
||||
data = device.check_sensors()
|
||||
```
|
||||
```
|
||||
|
||||
## Hubs
|
||||
|
||||
### Discovering subdevices
|
||||
```python3
|
||||
device.get_subdevices()
|
||||
```
|
||||
|
||||
### Fetching data
|
||||
Use the DID obtained from get_subdevices() for the input parameter to query specific sub-device.
|
||||
|
||||
```python3
|
||||
device.get_state(did="00000000000000000000a043b0d06963")
|
||||
```
|
||||
|
||||
### Setting state attributes
|
||||
The parameters depend on the type of subdevice that is being controlled. In this example, we are controlling LC-1 switches:
|
||||
|
||||
#### Turn on
|
||||
```python3
|
||||
device.set_state(did="00000000000000000000a043b0d0783a", pwr=1)
|
||||
device.set_state(did="00000000000000000000a043b0d0783a", pwr1=1)
|
||||
device.set_state(did="00000000000000000000a043b0d0783a", pwr2=1)
|
||||
```
|
||||
#### Turn off
|
||||
```python3
|
||||
device.set_state(did="00000000000000000000a043b0d0783a", pwr=0)
|
||||
device.set_state(did="00000000000000000000a043b0d0783a", pwr1=0)
|
||||
device.set_state(did="00000000000000000000a043b0d0783a", pwr2=0)
|
||||
```
|
||||
|
@ -9,6 +9,7 @@ from .alarm import S1C
|
||||
from .climate import hysen
|
||||
from .cover import dooya
|
||||
from .device import Device, ping, scan
|
||||
from .hub import s3
|
||||
from .light import lb1, lb2
|
||||
from .remote import rm, rm4, rm4mini, rm4pro, rmmini, rmminib, rmpro
|
||||
from .sensor import a1
|
||||
@ -111,6 +112,7 @@ SUPPORTED_TYPES = {
|
||||
},
|
||||
rm4mini: {
|
||||
0x51DA: ("RM4 mini", "Broadlink"),
|
||||
0x5209: ("RM4 TV mate", "Broadlink"),
|
||||
0x6070: ("RM4C mini", "Broadlink"),
|
||||
0x610E: ("RM4 mini", "Broadlink"),
|
||||
0x610F: ("RM4C mini", "Broadlink"),
|
||||
@ -122,6 +124,7 @@ SUPPORTED_TYPES = {
|
||||
0x653A: ("RM4 mini", "Broadlink"),
|
||||
},
|
||||
rm4pro: {
|
||||
0x5213: ("RM4 pro", "Broadlink"),
|
||||
0x6026: ("RM4 pro", "Broadlink"),
|
||||
0x6184: ("RM4C pro", "Broadlink"),
|
||||
0x61A2: ("RM4 pro", "Broadlink"),
|
||||
@ -146,13 +149,20 @@ SUPPORTED_TYPES = {
|
||||
0x60C7: ("LB1", "Broadlink"),
|
||||
0x60C8: ("LB1", "Broadlink"),
|
||||
0x6112: ("LB1", "Broadlink"),
|
||||
0x644C: ("LB27 R1", "Broadlink"),
|
||||
0x644E: ("LB26 R1", "Broadlink"),
|
||||
},
|
||||
lb2: {
|
||||
0xA4F4: ("LB27 R1", "Broadlink"),
|
||||
0xA5F7: ("LB27 R1", "Broadlink"),
|
||||
},
|
||||
S1C: {
|
||||
0x2722: ("S2KIT", "Broadlink"),
|
||||
},
|
||||
s3: {
|
||||
0xA59C:("S3", "Broadlink"),
|
||||
0xA64D:("S3", "Broadlink"),
|
||||
},
|
||||
hysen: {
|
||||
0x4EAD: ("HY02/HY03", "Hysen"),
|
||||
},
|
||||
|
83
broadlink/hub.py
Normal file
83
broadlink/hub.py
Normal file
@ -0,0 +1,83 @@
|
||||
"""Support for hubs."""
|
||||
import struct
|
||||
import json
|
||||
|
||||
from . import exceptions as e
|
||||
from .device import Device
|
||||
|
||||
|
||||
class s3(Device):
|
||||
"""Controls a Broadlink S3."""
|
||||
|
||||
TYPE = "S3"
|
||||
MAX_SUBDEVICES = 8
|
||||
|
||||
def get_subdevices(self) -> list:
|
||||
"""Return the lit of sub devices."""
|
||||
sub_devices = []
|
||||
step = 5
|
||||
|
||||
for index in range(0, self.MAX_SUBDEVICES, step):
|
||||
state = {"count": step, "index": index}
|
||||
packet = self._encode(14, state)
|
||||
resp = self.send_packet(0x6A, packet)
|
||||
e.check_error(resp[0x22:0x24])
|
||||
resp = self._decode(resp)
|
||||
|
||||
sub_devices.extend(resp["list"])
|
||||
if len(sub_devices) == resp["total"]:
|
||||
break
|
||||
|
||||
return sub_devices
|
||||
|
||||
def get_state(self, did: str = None) -> dict:
|
||||
"""Return the power state of the device."""
|
||||
state = {}
|
||||
if did is not None:
|
||||
state["did"] = did
|
||||
|
||||
packet = self._encode(1, state)
|
||||
response = self.send_packet(0x6A, packet)
|
||||
e.check_error(response[0x22:0x24])
|
||||
return self._decode(response)
|
||||
|
||||
def set_state(
|
||||
self,
|
||||
did: str = None,
|
||||
pwr1: bool = None,
|
||||
pwr2: bool = None,
|
||||
pwr3: bool = None,
|
||||
) -> dict:
|
||||
"""Set the power state of the device."""
|
||||
state = {}
|
||||
if did is not None:
|
||||
state["did"] = did
|
||||
if pwr1 is not None:
|
||||
state["pwr1"] = int(bool(pwr1))
|
||||
if pwr2 is not None:
|
||||
state["pwr2"] = int(bool(pwr2))
|
||||
if pwr3 is not None:
|
||||
state["pwr3"] = int(bool(pwr3))
|
||||
|
||||
packet = self._encode(2, state)
|
||||
response = self.send_packet(0x6A, packet)
|
||||
e.check_error(response[0x22:0x24])
|
||||
return self._decode(response)
|
||||
|
||||
def _encode(self, flag: int, state: dict) -> bytes:
|
||||
"""Encode a JSON packet."""
|
||||
# flag: 1 for reading, 2 for writing.
|
||||
packet = bytearray(12)
|
||||
data = json.dumps(state, separators=(",", ":")).encode()
|
||||
struct.pack_into("<HHHBBI", packet, 0, 0xA5A5, 0x5A5A, 0, flag, 0x0B, len(data))
|
||||
packet.extend(data)
|
||||
checksum = sum(packet, 0xBEAF) & 0xFFFF
|
||||
packet[0x04:0x06] = checksum.to_bytes(2, "little")
|
||||
return packet
|
||||
|
||||
def _decode(self, response: bytes) -> dict:
|
||||
"""Decode a JSON packet."""
|
||||
payload = self.decrypt(response[0x38:])
|
||||
js_len = struct.unpack_from("<I", payload, 0x08)[0]
|
||||
state = json.loads(payload[0x0C : 0x0C + js_len])
|
||||
return state
|
Loading…
Reference in New Issue
Block a user