1
0
Fork 0

Compare commits

...

78 Commits

Author SHA1 Message Date
Felipe Martins Diel b07de4cc29 Return state in set_state() 2024-04-17 02:01:26 -03:00
Felipe Martins Diel fbc0f01994 Fix bugs 2024-04-17 01:52:52 -03:00
Felipe Martins Diel f840d5d187 Remove log messages 2024-04-17 01:16:41 -03:00
Felipe Martins Diel d97f887888 Use CRC16 helper class 2024-04-17 01:10:48 -03:00
Felipe Martins Diel bcde9f644f Paint it black 2024-04-17 00:31:23 -03:00
Felipe Martins Diel 855254e65a Merge remote-tracking branch 'upstream/dev' into hvac-support 2024-04-17 00:12:04 -03:00
Felipe Martins Diel fa44b54d88
Add support for Broadlink A2 (#791)
* Add support for Broadlink A2

* Add supported type

* Fix bugs

* Improve device name
2024-04-12 02:10:06 -03:00
Felipe Martins Diel 24b9d308b6
Fix s3.get_subdevices() (#790)
* Fix s3.get_subdevices()

* Fix docstring
2024-04-10 23:55:41 -03:00
Felipe Martins Diel eb0f98a410
Fix README.md (#789) 2024-04-10 23:15:46 -03:00
Felipe Martins Diel 247be74c33
Expose IR/RF conversion functions (#788)
* Move IR duration<->Broadlink conversion down from CLI

* Fix --learn base64 to not crash with --durations

Also remove its b'...' wrapping.

* Fix IR/RF conversions

---------

Co-authored-by: William Grant <me@williamgrant.id.au>
2024-04-10 22:51:41 -03:00
Felipe Martins Diel 84af992dcc
Add support for Wistar smart curtain (0x4F6C) (#786)
* Add support for Wistar smart curtain (0x4F6C)

* Rename wsrc to wser
2024-04-10 16:35:25 -03:00
Felipe Martins Diel 4766d68289
Add support for Dooya DT360E (v2) (#785) 2024-04-09 20:32:41 -03:00
Felipe Martins Diel 821820c61e
Add support for BG Electrical EHC31 (0x6480) (#784) 2024-04-09 19:56:30 -03:00
Felipe Martins Diel cacebe7f3c
Rename MP1S state parameters (#783)
* Rename MP1S state parameters

* Rename get_status to get_state
2024-04-09 19:43:29 -03:00
Hozoy c6bf96da47
Add mp1s get_status function (#762) 2024-04-09 19:23:35 -03:00
Felipe Martins Diel 06c91ae394
Remove auxiliary functions from hysen class (#780) 2024-04-09 16:14:04 -03:00
irsl d7ed9855b9
Thermostat: get the 1st decimal place (#772) 2024-04-09 16:06:38 -03:00
Ian Munsie 634370d878
Add ability to RF scan a specific frequency (#613)
* Add ability to RF scan a specific frequency

This adds an optional parameter to find_rf_packet(), along with a
corresponding --rflearn parameter (defaulting to 433.92) to
broadlink_cli that specifies the frequency to tune to, rather than
requiring the frequency be found via sweeping. This is almost mandatory
for certain types of remotes that do not repeat their signals while the
button is held, and saves significant time when the frequency is known
in advance or when many buttons are to be captured in a row.

Additionally:

- A get_frequency() API is added to return the current frequency the
  device is tuned to.

- A check_frequency_ex() API is added to perform functions of both
  check_frequency() and get_frequency() in a single call.

- broadlink_cli --rfscanlearn will now report the current frequency at 1
  second intervals during sweeping, and will report the frequency it
  finally locks on to.

* Clean up remote.py

* Clean up broadlink_cli

* Update conditional

* Fix message

---------

Co-authored-by: Felipe Martins Diel <41558831+felipediel@users.noreply.github.com>
2024-04-09 15:40:00 -03:00
fustom abcc9aaeed
Add heating_cooling state to Hysen (#722) 2023-01-22 01:50:37 -03:00
Felipe Martins Diel d4dafa386c
Merge 'master' into 'dev' (#734) 2023-01-14 02:01:06 -03:00
Felipe Martins Diel 3c183eaaef
Bump version to 0.18.3 (#728) 2022-11-20 15:48:08 -03:00
Felipe Martins Diel 9d5339ab8e
Merge new product ids into master (#726)
* Add support for RM4C mini (0x520D) (#694)

* Add support for SP4L-US (0x648C) (#707)

* Add support for RM4C mate (0x5211) (#709)

* Add support for RM4 mini (0x521C) (#710)

* Add support for LB1 (0x644B) (#717)
2022-11-20 15:44:34 -03:00
Felipe Martins Diel 47b3245050
Fix Github actions (#727)
* Bump checkout to v3 and setup-python to v4

* Remove unused branches

* Fix ubuntu at v20.04
2022-11-20 15:37:31 -03:00
Felipe Martins Diel a86e9cbb9c
Bump version to 0.18.2 (#687) 2022-05-23 02:38:55 -03:00
Felipe Martins Diel d5cf63bf45
Merge new product ids into master (#686)
* Add support for Broadlink RM4 mini (0x5216) (#671)

* Add support for Broadlink RM4 pro 0x520B (#673)

* Add support for SP4L-UK 0xA569 (#677)

* Fixing typo in rm4pro device definitions (#682)

* Add support for Bestcon RM4C pro (0x5218) (#683)

* Add support for Broadlink RM4 TV mate (0x5212) (#684)

* Add support for Broadlink RM4 mini (0x520C) (#685)
2022-05-23 02:35:28 -03:00
Felipe Martins Diel 70180cfbc6 Merge branch 'master' into dev 2022-03-19 19:37:31 -03:00
Steven Dodd 2b70440786
Add support for S3 Hub and LC-1 (1,2&3 gang) light switches (#654)
* https://github.com/mjg59/python-broadlink/issues/647

* Added get_state(did) and update documentation

* Fixed pwr3 set_state

* Added get_subdevices()

* Cleaned up get_subdevices

* Updated S3 documentation

* Added device id 0xA59C:("S3", "Broadlink")

* Improve logic of get_subdevices()

Prevents infinite loop.

* Black

* Move s3 closer to s1c

* Update README.md

Co-authored-by: Felipe Martins Diel <41558831+felipediel@users.noreply.github.com>
2022-03-19 19:31:14 -03:00
Felipe Martins Diel d870560e6e
Bump version to 0.18.1 (#668) 2022-03-19 15:43:00 -03:00
Felipe Martins Diel 3bdb6dfb92
Merge new product ids into master (#667)
* Add support for Broadlink LB26 R1 (0x644E) (#636)

* Add support for Broadlink LB26 R1 (0x644E)

* Add Broadlink LB26 R1 to README.md

* Add missing device codes for LB27 R1 Smart Bulbs (#644)

These are two missing codes.
This closes issue #639

* Add support for Broadlink RM4 pro (0x5213) (#649)

* Add support for Broadlink RM4 TV mate (0x5209) (#655)

* Move 0x644C and 0x644E to the LB1 class (#666)

Co-authored-by: Mathew Verdouw <mathew@vtc.com.au>
2022-03-19 15:28:54 -03:00
Felipe Martins Diel bb19504314
Fix instructions for learning RF codes (#632) 2021-10-18 18:23:05 -03:00
Felipe Martins Diel f2a582b8f9
Add support for Broadlink MP1 with power meter (#631) 2021-10-18 15:59:47 -03:00
Felipe Martins Diel 9873af9bc4
Standardize ip_address option (#630) 2021-10-18 14:19:41 -03:00
Felipe Martins Diel 11febb043b
Improve README.md (#629) 2021-10-18 14:05:39 -03:00
Felipe Martins Diel b596984b44
Add ip_address option to setup() (#628)
* Add ip_address option to setup()

* Update README.md
2021-10-18 13:42:32 -03:00
Felipe Martins Diel 24ef7302bd
Bump version to 0.18.0 (#621) 2021-10-17 13:56:11 -03:00
Felipe Martins Diel 26ee3192d9 Change the type and model of the hysen class (#627) 2021-10-17 13:25:00 -03:00
Felipe Martins Diel dc3cf509fc
Add pull request template (#626) 2021-10-17 10:20:41 -03:00
Felipe Martins Diel a721087c07
Improve README.md (#625) 2021-10-17 09:20:33 -03:00
Felipe Martins Diel e29170c754
Fix indentation of README.md (#624) 2021-10-17 06:11:45 -03:00
Felipe Martins Diel 62f81bc281
Add support for Broadlink SCB1E (0xA56B) (#623) 2021-10-17 05:36:42 -03:00
Felipe Martins Diel 84bec957ad
Fix flake8 tests (#622) 2021-10-13 03:31:19 -03:00
1UPNuke ca1634575e Add support for Clas Ohlson SL-2 E14 (0x6065) (#600) 2021-09-12 16:55:08 -03:00
Felipe Martins Diel a84a628d1c
Add support for Broadlink RM mini 3 (0x27DC) (#594) 2021-05-15 15:06:38 -03:00
Felipe Martins Diel 3f92850a5f
Add support for Broadlink SP4L-EU (0xA56C) (#593) 2021-05-11 17:25:22 -03:00
Felipe Martins Diel bc44166702
Refactor SUPPORTED_TYPES (#592) 2021-05-06 14:52:11 -03:00
Felipe Martins Diel 1ae12e7d1c
Remove local_ip_address option from hello() (#591) 2021-05-03 17:34:22 -03:00
Felipe Martins Diel c6390924bf
Add support for Broadlink SP4L-AU (0x757B) (#590) 2021-05-03 16:02:54 -03:00
Felipe Martins Diel b43b296ff3
Add support for Broadlink RM mini 3 (0x6507) (#589) 2021-05-01 15:05:31 -03:00
Felipe Martins Diel 2d863bd6c1
Rename the lb27r1 class to lb2 (#586) 2021-04-29 20:31:30 -03:00
Felipe Martins Diel 6a54803a36
Improve code quality (#585)
* Improve docstrings

* Fix line-too-long

* Disable unidiomatic-typecheck

* Move smart plugs to the top

* Use constants from const.py

* Run black
2021-04-29 19:51:22 -03:00
Felipe Martins Diel d48d1347a3
Move constants to const.py (#584) 2021-04-29 18:59:58 -03:00
Felipe Martins Diel e1f3b83efd Add support for Broadlink SP4L-EU (0xA5D3) (#582) 2021-04-26 17:57:02 -03:00
Felipe Martins Diel 49322ddaae Add support for Broadlink SP4L-CN (0x7568) (#577) 2021-04-26 17:57:02 -03:00
Felipe Martins Diel fc5c33ee97 Use the absolute position to read the lock status (#575) 2021-04-26 17:57:02 -03:00
Felipe Martins Diel 12fdf01631 Improve code quality (#572)
* Improve typing

* Use better names

* Clean up switch.py

* Remove unused import

* Run black

* Remove unnecessary comments

* Clean up climate.py
2021-04-26 17:57:02 -03:00
Felipe Martins Diel b77e803864 Use CamelCase for the Device class (#570) 2021-04-26 17:57:02 -03:00
Felipe Martins Diel 36b293bf05 Raise exceptions explicitly (#569) 2021-04-26 17:57:02 -03:00
Felipe Martins Diel 056434ab46 Add support for Broadlink RM4C pro (0x6184) (#568) 2021-04-26 17:57:02 -03:00
Felipe Martins Diel 67b674859f Segregate CRC16.get_table() from CRC16.calculate() (#567) 2021-04-26 17:57:02 -03:00
Felipe Martins Diel 4e1e690762 Encapsulate crc16() to avoid global (#566) 2021-04-26 17:57:02 -03:00
Felipe Martins Diel 86b5d0727c Improve CRC-16 function (#565)
* Rename calculate_crc16 to crc16

* Apply PEP-8 naming conventions

* Remove unnecessary import

* Accept any sequence type

* Remove unnecessary conversions

* Expose polynomial and initial value as kwargs

* Remove unnecessary bitwise operations

* Store the CRC-16 table for performance

* Add missing type hints

* Update docstring

* General improvements
2021-04-26 17:57:02 -03:00
Felipe Martins Diel 1a8ee21a34 Make better use of namespaces (#564)
Use namespaces for typing and exceptions.
2021-04-26 17:57:02 -03:00
Felipe Martins Diel 2198400ad6 Add support for Broadlink LB27 R1 (0xA4F4) (#557)
* Add support for Broadlink LB27 R1 (0xA4F4)

* Improve typing
2021-04-26 17:57:02 -03:00
Johnson Chin f3e4edcad4 Add support for Broadlink SP4L-UK (0x7587) (#561)
* Add new SP4L-UK type

* Switch: SP4 check power and nightlight to return as boolean

Co-authored-by: Felipe Martins Diel <41558831+felipediel@users.noreply.github.com>
2021-04-26 17:57:02 -03:00
Felipe Martins Diel de0cebc00f Add support for Broadlink SCB2 (0x6494) (#558) 2021-04-26 17:57:02 -03:00
Felipe Martins Diel d45c9d0850 Improve the CLI (#555)
* Add check humidity option

* Rename type to devtype

* Remove unnecessary try except clause

* Add commands to README.md
2021-04-26 17:57:01 -03:00
Andrew Berry 822b3c3266
Add a TROUBLESHOOTING doc with WiFi password notes (#563)
* Add a TROUBLESHOOTING doc with WiFi password notes

* Update TROUBLESHOOTING.md

Co-authored-by: Felipe Martins Diel <41558831+felipediel@users.noreply.github.com>
2021-04-11 21:41:07 -03:00
Felipe Martins Diel 9ff6b2d48e 0.17.0 2021-03-11 18:10:04 -03:00
Felipe Martins Diel 9eeee0deda
Bump cryptography from 2.6.1 to 3.2 (#553) 2021-03-11 18:07:56 -03:00
Felipe Martins Diel 335399ef2f
Add new devices to README.md (#552) 2021-03-11 07:36:20 -03:00
Felipe Martins Diel 7c0b4d529f
Improve exceptions (#551) 2021-03-11 04:33:14 -03:00
Felipe Martins Diel a11b7233c9
Improve repr(device) and str(device) (#550) 2021-03-11 02:51:29 -03:00
Felipe Martins Diel 90a43835e8
Reset connection ID and AES key before sending Client Key Exchange packets (#549) 2021-03-11 01:25:57 -03:00
Felipe Martins Diel 20b9eed6bc
Add a method to update device name and lock status (#537) 2021-03-11 01:00:10 -03:00
Felipe Martins Diel 5dee06c815
Make 0x2711 a sp2s device (#538) 2021-02-17 00:38:18 -03:00
Felipe Martins Diel 39ee67bb98
Split the rm and rm4 classes into smaller classes (#529) 2021-02-16 18:14:11 -03:00
Felipe Martins Diel 1b73cfce3a
Split the sp2 class into smaller classes (#521) 2021-02-16 16:38:10 -03:00
Felipe Martins Diel 008846ba41
Fix index (#533) 2021-02-04 17:16:17 -03:00
23 changed files with 1981 additions and 1019 deletions

55
.github/PULL_REQUEST_TEMPLATE.md vendored Normal file
View File

@ -0,0 +1,55 @@
<!--
You are amazing! Thanks for contributing to our project!
Please fill the template to help maintainers processing your PR.
Don't forget to create the PR against the correct branch:
- new product id -> new_product_ids
- anything else -> dev
-->
## Context
<!--
Summarize the motivation and context of the change.
Which issue are you dealing with?
-->
## Proposed change
<!--
Describe the change. How are you fixing the issue?
-->
## Type of change
<!--
What type of change does your PR introduce?
Please, check only 1 box!
-->
- [ ] Dependency upgrade
- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New device
- [ ] New product id (the device is already supported with a different id)
- [ ] New feature (which adds functionality to an existing device)
- [ ] Breaking change (fix/feature causing existing functionality to break)
- [ ] Code quality improvements to existing code or addition of tests
- [ ] Documentation
## Additional information
<!--
Link docs and related issues, when applicable.
-->
- This PR fixes issue: fixes #
- This PR is related to:
- Link to documentation pull request:
## Checklist
<!--
Please do your best to check these boxes.
-->
- [ ] The code change is tested and works locally.
- [ ] The code has been formatted using Black.
- [ ] The code follows the [Zen of Python](https://www.python.org/dev/peps/pep-0020/).
- [ ] I am creating the Pull Request against the correct branch.
- [ ] Documentation added/updated.

View File

@ -2,26 +2,27 @@ name: Python flake8
on:
push:
branches: [ main, master, dev, development ]
branches: [ master, dev ]
pull_request:
branches: [ main, master, dev, development ]
branches: [ master, dev ]
jobs:
test:
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
strategy:
matrix:
python-version: [3.6, 3.7, 3.8, 3.9]
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install flake8 wemake-python-styleguide
pip install wheel
pip install flake8 flake8-quotes
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
- name: Lint with flake8
run: |

287
README.md
View File

@ -1,150 +1,204 @@
Python control for Broadlink devices
===============================================
# python-broadlink
A simple Python API for controlling Broadlink devices. At present, the following devices are supported:
A Python module and CLI for controlling Broadlink devices locally. The following devices are supported:
- Universal remotes: `RM pro`, `RM pro+`, `RM pro plus`, `RM mini 3`, `RM4 pro`, `RM4 mini`, `RM4C mini`, `RM4S`
- Smart plugs: `SP1`, `SP2`, `SP mini`, `SP mini+`, `SP3`, `SP3S`, `SP4L`, `SP4M`
- Power strips: `MP1-1K4S`, `MP1-1K3S2U`, `MP2`
- Control box: `SC1`, `SCB1E`, `MCB1`
- Sensors: `A1`
- Alarm kit: `S1C`, `S2KIT`
- Light bulb: `LB1`, `SB800TD`
- **Universal remotes**: RM home, RM mini 3, RM plus, RM pro, RM pro+, RM4 mini, RM4 pro, RM4C mini, RM4S, RM4 TV mate
- **Smart plugs**: SP mini, SP mini 3, SP mini+, SP1, SP2, SP2-BR, SP2-CL, SP2-IN, SP2-UK, SP3, SP3-EU, SP3S-EU, SP3S-US, SP4L-AU, SP4L-EU, SP4L-UK, SP4M, SP4M-US, Ankuoo NEO, Ankuoo NEO PRO, Efergy Ego, BG AHC/U-01
- **Switches**: MCB1, SC1, SCB1E, SCB2
- **Outlets**: BG 800, BG 900
- **Power strips**: MP1-1K3S2U, MP1-1K4S, MP2
- **Environment sensors**: A1
- **Alarm kits**: S1C, S2KIT
- **Light bulbs**: LB1, LB26 R1, LB27 R1, SB800TD
- **Curtain motors**: Dooya DT360E-45/20
- **Thermostats**: Hysen HY02B05H
- **Hubs**: S3
Other devices with Broadlink DNA:
- Smart plugs: `Ankuoo NEO`, `Ankuoo NEO PRO`, `Efergy Ego`, `BG AHC/U-01`
- Outlet: `BG 800`, `BG 900`
- Curtain motor: `Dooya DT360E-45/20`
- Thermostat: `Hysen HY02B05H`
## Installation
There is currently no support for the cloud API.
Use pip3 to install the latest version of this module.
Example use
-----------
Setup a new device on your local wireless network:
1. Put the device into AP Mode
1. Long press the reset button until the blue LED is blinking quickly.
2. Long press again until blue LED is blinking slowly.
3. Manually connect to the WiFi SSID named BroadlinkProv.
2. Run setup() and provide your ssid, network password (if secured), and set the security mode
1. Security mode options are (0 = none, 1 = WEP, 2 = WPA1, 3 = WPA2, 4 = WPA1/2)
```
import broadlink
pip3 install broadlink
```
## Basic functions
First, open Python 3 and import this module.
```
python3
```
```python3
import broadlink
```
Now let's try some functions...
### Setup
In order to control the device, you need to connect it to your local network. If you have already configured the device with the Broadlink app, this step is not necessary.
1. Put the device into AP Mode.
- Long press the reset button until the blue LED is blinking quickly.
- Long press again until blue LED is blinking slowly.
- Manually connect to the WiFi SSID named BroadlinkProv.
2. Connect the device to your local network with the setup function.
```python3
broadlink.setup('myssid', 'mynetworkpass', 3)
```
Discover available devices on the local network:
```
import broadlink
Security mode options are (0 = none, 1 = WEP, 2 = WPA1, 3 = WPA2, 4 = WPA1/2)
devices = broadlink.discover(timeout=5)
#### Advanced options
You may need to specify a broadcast address if setup is not working.
```python3
broadlink.setup('myssid', 'mynetworkpass', 3, ip_address='192.168.0.255')
```
### Discovery
Use this function to discover devices:
```python3
devices = broadlink.discover()
```
#### Advanced options
You may need to specify `local_ip_address` or `discover_ip_address` if discovery does not return any devices.
Using your machine's IP address with `local_ip_address`
```
import broadlink
devices = broadlink.discover(timeout=5, local_ip_address='192.168.0.100')
Using the IP address of your local machine:
```python3
devices = broadlink.discover(local_ip_address='192.168.0.100')
```
Using your subnet's broadcast address with `discover_ip_address`
```
import broadlink
devices = broadlink.discover(timeout=5, discover_ip_address='192.168.0.255')
Using the broadcast address of your subnet:
```python3
devices = broadlink.discover(discover_ip_address='192.168.0.255')
```
Obtain the authentication key required for further communication:
```
devices[0].auth()
If the device is locked, it may not be discoverable with broadcast. In such cases, you can use the unicast version `broadlink.hello()` for direct discovery:
```python3
device = broadlink.hello('192.168.0.16')
```
Enter learning mode:
```
devices[0].enter_learning()
If you are a perfomance freak, use `broadlink.xdiscover()` to create devices instantly:
```python3
for device in broadlink.xdiscover():
print(device) # Example action. Do whatever you want here.
```
Sweep RF frequencies:
```
devices[0].sweep_frequency()
### Authentication
After discovering the device, call the `auth()` method to obtain the authentication key required for further communication:
```python3
device.auth()
```
Cancel sweep RF frequencies:
```
devices[0].cancel_sweep_frequency()
```
Check whether a frequency has been found:
```
found = devices[0].check_frequency()
```
(This will return True if the RM has locked onto a frequency, False otherwise)
The next steps depend on the type of device you want to control.
Attempt to learn an RF packet:
```
found = devices[0].find_rf_packet()
```
(This will return True if a packet has been found, False otherwise)
## Universal remotes
Obtain an IR or RF packet while in learning mode:
```
ir_packet = devices[0].check_data()
```
(This will return None if the device does not have a packet to return)
### Learning IR codes
Send an IR or RF packet:
Learning IR codes takes place in three steps.
1. Enter learning mode:
```python3
device.enter_learning()
```
devices[0].send_data(ir_packet)
2. When the LED blinks, point the remote at the Broadlink device and press the button you want to learn.
3. Get the IR packet.
```python3
packet = device.check_data()
```
Obtain temperature data from an RM2:
### Learning RF codes
Learning RF codes takes place in six steps.
1. Sweep the frequency:
```python3
device.sweep_frequency()
```
devices[0].check_temperature()
2. When the LED blinks, point the remote at the Broadlink device for the first time and long press the button you want to learn.
3. Check if the frequency was successfully identified:
```python3
ok = device.check_frequency()
if ok:
print('Frequency found!')
```
4. Enter learning mode:
```python3
device.find_rf_packet()
```
5. When the LED blinks, point the remote at the Broadlink device for the second time and short press the button you want to learn.
6. Get the RF packet:
```python3
packet = device.check_data()
```
Obtain sensor data from an A1:
```
data = devices[0].check_sensors()
#### Notes
Universal remotes with product id 0x2712 use the same method for learning IR and RF codes. They don't need to sweep frequency. Just call `device.enter_learning()` and `device.check_data()`.
### Canceling learning
You can exit the learning mode in the middle of the process by calling this method:
```python3
device.cancel_sweep_frequency()
```
Set power state on a SmartPlug SP2/SP3/SP4:
```
devices[0].set_power(True)
### Sending IR/RF packets
```python3
device.send_data(packet)
```
Check power state on a SmartPlug:
```
state = devices[0].check_power()
### Fetching sensor data
```python3
data = device.check_sensors()
```
Check energy consumption on a SmartPlug:
```
state = devices[0].get_energy()
## Switches
### Setting power state
```python3
device.set_power(True)
device.set_power(False)
```
Set power state for S1 on a SmartPowerStrip MP1:
```
devices[0].set_power(1, True)
### Checking power state
```python3
state = device.check_power()
```
Check power state on a SmartPowerStrip:
```
state = devices[0].check_power()
### Checking energy consumption
```python3
state = device.get_energy()
```
Get state on a bulb
```
state=devices[0].get_state()
## Power strips
### Setting power state
```python3
device.set_power(1, True) # Example socket. It could be 2 or 3.
device.set_power(1, False)
```
Set a state on a bulb
### Checking power state
```python3
state = device.check_power()
```
## Light bulbs
### Fetching data
```python3
state = device.get_state()
```
### Setting state attributes
```python3
devices[0].set_state(pwr=0)
devices[0].set_state(pwr=1)
devices[0].set_state(brightness=75)
@ -154,3 +208,40 @@ devices[0].set_state(red=0)
devices[0].set_state(green=128)
devices[0].set_state(bulb_colormode=1)
```
## Environment sensors
### Fetching sensor data
```python3
data = device.check_sensors()
```
## Hubs
### Discovering subdevices
```python3
device.get_subdevices()
```
### Fetching data
Use the DID obtained from get_subdevices() for the input parameter to query specific sub-device.
```python3
device.get_state(did="00000000000000000000a043b0d06963")
```
### Setting state attributes
The parameters depend on the type of subdevice that is being controlled. In this example, we are controlling LC-1 switches:
#### Turn on
```python3
device.set_state(did="00000000000000000000a043b0d0783a", pwr=1)
device.set_state(did="00000000000000000000a043b0d0783a", pwr1=1)
device.set_state(did="00000000000000000000a043b0d0783a", pwr2=1)
```
#### Turn off
```python3
device.set_state(did="00000000000000000000a043b0d0783a", pwr=0)
device.set_state(did="00000000000000000000a043b0d0783a", pwr1=0)
device.set_state(did="00000000000000000000a043b0d0783a", pwr2=0)
```

9
TROUBLESHOOTING.md Normal file
View File

@ -0,0 +1,9 @@
# Troubleshooting
## Firmware issues
### AP setup fails with non-alphanumeric passwords
Some devices ship with firmware that cannot connect to WLANs with non-alphanumeric passwords. To fix this, update the firmware to the latest version. You can also change the password to one with just letters and numbers or create a separate guest network with a simpler password.
_First seen on Broadlink RM4 pro 0x6026. Already fixed in firmware v52079._

View File

@ -1,186 +1,297 @@
#!/usr/bin/env python3
"""The python-broadlink library."""
import socket
from typing import Generator, List, Union, Tuple
import typing as t
from . import exceptions as e
from .const import DEFAULT_BCAST_ADDR, DEFAULT_PORT, DEFAULT_TIMEOUT
from .alarm import S1C
from .climate import hysen, hvac
from .cover import dooya
from .device import device, ping, scan
from .exceptions import exception
from .light import lb1
from .remote import rm, rm4
from .sensor import a1
from .switch import bg1, mp1, sp1, sp2, sp4, sp4b
from .climate import hvac, hysen
from .cover import dooya, dooya2, wser
from .device import Device, ping, scan
from .hub import s3
from .light import lb1, lb2
from .remote import rm, rm4, rm4mini, rm4pro, rmmini, rmminib, rmpro
from .sensor import a1, a2
from .switch import bg1, ehc31, mp1, mp1s, sp1, sp2, sp2s, sp3, sp3s, sp4, sp4b
SUPPORTED_TYPES = {
0x0000: (sp1, "SP1", "Broadlink"),
0x2711: (sp2, "SP2", "Broadlink"),
0x2716: (sp2, "NEO PRO", "Ankuoo"),
0x2717: (sp2, "NEO", "Ankuoo"),
0x2719: (sp2, "SP2-compatible", "Honeywell"),
0x271A: (sp2, "SP2-compatible", "Honeywell"),
0x271D: (sp2, "Ego", "Efergy"),
0x2720: (sp2, "SP mini", "Broadlink"),
0x2728: (sp2, "SP2-compatible", "URANT"),
0x2733: (sp2, "SP3", "Broadlink"),
0x2736: (sp2, "SP mini+", "Broadlink"),
0x273E: (sp2, "SP mini", "Broadlink"),
0x7530: (sp2, "SP2", "Broadlink (OEM)"),
0x7539: (sp2, "SP2-IL", "Broadlink (OEM)"),
0x753E: (sp2, "SP mini 3", "Broadlink"),
0x7540: (sp2, "MP2", "Broadlink"),
0x7544: (sp2, "SP2-CL", "Broadlink"),
0x7546: (sp2, "SP2-UK/BR/IN", "Broadlink (OEM)"),
0x7547: (sp2, "SC1", "Broadlink"),
0x7918: (sp2, "SP2", "Broadlink (OEM)"),
0x7919: (sp2, "SP2-compatible", "Honeywell"),
0x791A: (sp2, "SP2-compatible", "Honeywell"),
0x7D00: (sp2, "SP3-EU", "Broadlink (OEM)"),
0x7D0D: (sp2, "SP mini 3", "Broadlink (OEM)"),
0x9479: (sp2, "SP3S-US", "Broadlink"),
0x947A: (sp2, "SP3S-EU", "Broadlink"),
0x756C: (sp4, "SP4M", "Broadlink"),
0x756F: (sp4, "MCB1", "Broadlink"),
0x7579: (sp4, "SP4L-EU", "Broadlink"),
0x7583: (sp4, "SP mini 3", "Broadlink"),
0x7D11: (sp4, "SP mini 3", "Broadlink"),
0xA56A: (sp4, "MCB1", "Broadlink"),
0xA589: (sp4, "SP4L-UK", "Broadlink"),
0x5115: (sp4b, "SCB1E", "Broadlink"),
0x51E2: (sp4b, "AHC/U-01", "BG Electrical"),
0x6111: (sp4b, "MCB1", "Broadlink"),
0x6113: (sp4b, "SCB1E", "Broadlink"),
0x618B: (sp4b, "SP4L-EU", "Broadlink"),
0x6489: (sp4b, "SP4L-AU", "Broadlink"),
0x648B: (sp4b, "SP4M-US", "Broadlink"),
0x2712: (rm, "RM pro/pro+", "Broadlink"),
0x272A: (rm, "RM pro", "Broadlink"),
0x2737: (rm, "RM mini 3", "Broadlink"),
0x273D: (rm, "RM pro", "Broadlink"),
0x277C: (rm, "RM home", "Broadlink"),
0x2783: (rm, "RM home", "Broadlink"),
0x2787: (rm, "RM pro", "Broadlink"),
0x278B: (rm, "RM plus", "Broadlink"),
0x278F: (rm, "RM mini", "Broadlink"),
0x2797: (rm, "RM pro+", "Broadlink"),
0x279D: (rm, "RM pro+", "Broadlink"),
0x27A1: (rm, "RM plus", "Broadlink"),
0x27A6: (rm, "RM plus", "Broadlink"),
0x27A9: (rm, "RM pro+", "Broadlink"),
0x27C2: (rm, "RM mini 3", "Broadlink"),
0x27C3: (rm, "RM pro+", "Broadlink"),
0x27C7: (rm, "RM mini 3", "Broadlink"),
0x27CC: (rm, "RM mini 3", "Broadlink"),
0x27CD: (rm, "RM mini 3", "Broadlink"),
0x27D0: (rm, "RM mini 3", "Broadlink"),
0x27D1: (rm, "RM mini 3", "Broadlink"),
0x27D3: (rm, "RM mini 3", "Broadlink"),
0x27DE: (rm, "RM mini 3", "Broadlink"),
0x51DA: (rm4, "RM4 mini", "Broadlink"),
0x5F36: (rm4, "RM mini 3", "Broadlink"),
0x6026: (rm4, "RM4 pro", "Broadlink"),
0x6070: (rm4, "RM4C mini", "Broadlink"),
0x610E: (rm4, "RM4 mini", "Broadlink"),
0x610F: (rm4, "RM4C mini", "Broadlink"),
0x61A2: (rm4, "RM4 pro", "Broadlink"),
0x62BC: (rm4, "RM4 mini", "Broadlink"),
0x62BE: (rm4, "RM4C mini", "Broadlink"),
0x6364: (rm4, "RM4S", "Broadlink"),
0x648D: (rm4, "RM4 mini", "Broadlink"),
0x649B: (rm4, "RM4 pro", "Broadlink"),
0x6508: (rm4, "RM mini 3", "Broadlink"),
0x6539: (rm4, "RM4C mini", "Broadlink"),
0x653A: (rm4, "RM4 mini", "Broadlink"),
0x653C: (rm4, "RM4 pro", "Broadlink"),
0x2714: (a1, "e-Sensor", "Broadlink"),
0x4EB5: (mp1, "MP1-1K4S", "Broadlink"),
0x4EF7: (mp1, "MP1-1K4S", "Broadlink (OEM)"),
0x4F1B: (mp1, "MP1-1K3S2U", "Broadlink (OEM)"),
0x4F65: (mp1, "MP1-1K3S2U", "Broadlink"),
0x5043: (lb1, "SB800TD", "Broadlink (OEM)"),
0x504E: (lb1, "LB1", "Broadlink"),
0x60C7: (lb1, "LB1", "Broadlink"),
0x60C8: (lb1, "LB1", "Broadlink"),
0x6112: (lb1, "LB1", "Broadlink"),
0x2722: (S1C, "S2KIT", "Broadlink"),
0X4E2A: (hvac, "HVAC", "Licensed manufacturer"),
0x4EAD: (hysen, "HY02B05H", "Hysen"),
0x4E4D: (dooya, "DT360E-45/20", "Dooya"),
0x51E3: (bg1, "BG800/BG900", "BG Electrical"),
sp1: {
0x0000: ("SP1", "Broadlink"),
},
sp2: {
0x2717: ("NEO", "Ankuoo"),
0x2719: ("SP2-compatible", "Honeywell"),
0x271A: ("SP2-compatible", "Honeywell"),
0x2720: ("SP mini", "Broadlink"),
0x2728: ("SP2-compatible", "URANT"),
0x273E: ("SP mini", "Broadlink"),
0x7530: ("SP2", "Broadlink (OEM)"),
0x7539: ("SP2-IL", "Broadlink (OEM)"),
0x753E: ("SP mini 3", "Broadlink"),
0x7540: ("MP2", "Broadlink"),
0x7544: ("SP2-CL", "Broadlink"),
0x7546: ("SP2-UK/BR/IN", "Broadlink (OEM)"),
0x7547: ("SC1", "Broadlink"),
0x7918: ("SP2", "Broadlink (OEM)"),
0x7919: ("SP2-compatible", "Honeywell"),
0x791A: ("SP2-compatible", "Honeywell"),
0x7D0D: ("SP mini 3", "Broadlink (OEM)"),
},
sp2s: {
0x2711: ("SP2", "Broadlink"),
0x2716: ("NEO PRO", "Ankuoo"),
0x271D: ("Ego", "Efergy"),
0x2736: ("SP mini+", "Broadlink"),
},
sp3: {
0x2733: ("SP3", "Broadlink"),
0x7D00: ("SP3-EU", "Broadlink (OEM)"),
},
sp3s: {
0x9479: ("SP3S-US", "Broadlink"),
0x947A: ("SP3S-EU", "Broadlink"),
},
sp4: {
0x7568: ("SP4L-CN", "Broadlink"),
0x756C: ("SP4M", "Broadlink"),
0x756F: ("MCB1", "Broadlink"),
0x7579: ("SP4L-EU", "Broadlink"),
0x757B: ("SP4L-AU", "Broadlink"),
0x7583: ("SP mini 3", "Broadlink"),
0x7587: ("SP4L-UK", "Broadlink"),
0x7D11: ("SP mini 3", "Broadlink"),
0xA569: ("SP4L-UK", "Broadlink"),
0xA56A: ("MCB1", "Broadlink"),
0xA56B: ("SCB1E", "Broadlink"),
0xA56C: ("SP4L-EU", "Broadlink"),
0xA589: ("SP4L-UK", "Broadlink"),
0xA5D3: ("SP4L-EU", "Broadlink"),
},
sp4b: {
0x5115: ("SCB1E", "Broadlink"),
0x51E2: ("AHC/U-01", "BG Electrical"),
0x6111: ("MCB1", "Broadlink"),
0x6113: ("SCB1E", "Broadlink"),
0x618B: ("SP4L-EU", "Broadlink"),
0x6489: ("SP4L-AU", "Broadlink"),
0x648B: ("SP4M-US", "Broadlink"),
0x648C: ("SP4L-US", "Broadlink"),
0x6494: ("SCB2", "Broadlink"),
},
rmmini: {
0x2737: ("RM mini 3", "Broadlink"),
0x278F: ("RM mini", "Broadlink"),
0x27C2: ("RM mini 3", "Broadlink"),
0x27C7: ("RM mini 3", "Broadlink"),
0x27CC: ("RM mini 3", "Broadlink"),
0x27CD: ("RM mini 3", "Broadlink"),
0x27D0: ("RM mini 3", "Broadlink"),
0x27D1: ("RM mini 3", "Broadlink"),
0x27D3: ("RM mini 3", "Broadlink"),
0x27DC: ("RM mini 3", "Broadlink"),
0x27DE: ("RM mini 3", "Broadlink"),
},
rmpro: {
0x2712: ("RM pro/pro+", "Broadlink"),
0x272A: ("RM pro", "Broadlink"),
0x273D: ("RM pro", "Broadlink"),
0x277C: ("RM home", "Broadlink"),
0x2783: ("RM home", "Broadlink"),
0x2787: ("RM pro", "Broadlink"),
0x278B: ("RM plus", "Broadlink"),
0x2797: ("RM pro+", "Broadlink"),
0x279D: ("RM pro+", "Broadlink"),
0x27A1: ("RM plus", "Broadlink"),
0x27A6: ("RM plus", "Broadlink"),
0x27A9: ("RM pro+", "Broadlink"),
0x27C3: ("RM pro+", "Broadlink"),
},
rmminib: {
0x5F36: ("RM mini 3", "Broadlink"),
0x6507: ("RM mini 3", "Broadlink"),
0x6508: ("RM mini 3", "Broadlink"),
},
rm4mini: {
0x51DA: ("RM4 mini", "Broadlink"),
0x5209: ("RM4 TV mate", "Broadlink"),
0x520C: ("RM4 mini", "Broadlink"),
0x520D: ("RM4C mini", "Broadlink"),
0x5211: ("RM4C mate", "Broadlink"),
0x5212: ("RM4 TV mate", "Broadlink"),
0x5216: ("RM4 mini", "Broadlink"),
0x521C: ("RM4 mini", "Broadlink"),
0x6070: ("RM4C mini", "Broadlink"),
0x610E: ("RM4 mini", "Broadlink"),
0x610F: ("RM4C mini", "Broadlink"),
0x62BC: ("RM4 mini", "Broadlink"),
0x62BE: ("RM4C mini", "Broadlink"),
0x6364: ("RM4S", "Broadlink"),
0x648D: ("RM4 mini", "Broadlink"),
0x6539: ("RM4C mini", "Broadlink"),
0x653A: ("RM4 mini", "Broadlink"),
},
rm4pro: {
0x520B: ("RM4 pro", "Broadlink"),
0x5213: ("RM4 pro", "Broadlink"),
0x5218: ("RM4C pro", "Broadlink"),
0x6026: ("RM4 pro", "Broadlink"),
0x6184: ("RM4C pro", "Broadlink"),
0x61A2: ("RM4 pro", "Broadlink"),
0x649B: ("RM4 pro", "Broadlink"),
0x653C: ("RM4 pro", "Broadlink"),
},
a1: {
0x2714: ("A1", "Broadlink"),
},
a2: {
0x4F60: ("A2", "Broadlink"),
},
mp1: {
0x4EB5: ("MP1-1K4S", "Broadlink"),
0x4F1B: ("MP1-1K3S2U", "Broadlink (OEM)"),
0x4F65: ("MP1-1K3S2U", "Broadlink"),
},
mp1s: {
0x4EF7: ("MP1-1K4S", "Broadlink (OEM)"),
},
lb1: {
0x5043: ("SB800TD", "Broadlink (OEM)"),
0x504E: ("LB1", "Broadlink"),
0x606E: ("SB500TD", "Broadlink (OEM)"),
0x60C7: ("LB1", "Broadlink"),
0x60C8: ("LB1", "Broadlink"),
0x6112: ("LB1", "Broadlink"),
0x644B: ("LB1", "Broadlink"),
0x644C: ("LB27 R1", "Broadlink"),
0x644E: ("LB26 R1", "Broadlink"),
},
lb2: {
0xA4F4: ("LB27 R1", "Broadlink"),
0xA5F7: ("LB27 R1", "Broadlink"),
},
S1C: {
0x2722: ("S2KIT", "Broadlink"),
},
s3: {
0xA59C: ("S3", "Broadlink"),
0xA64D: ("S3", "Broadlink"),
},
hvac: {
0x4E2A: ("HVAC", "Licensed manufacturer"),
},
hysen: {
0x4EAD: ("HY02/HY03", "Hysen"),
},
dooya: {
0x4E4D: ("DT360E-45/20", "Dooya"),
},
dooya2: {
0x4F6E: ("DT360E-45/20", "Dooya"),
},
wser: {
0x4F6C: ("WSER", "Wistar"),
},
bg1: {
0x51E3: ("BG800/BG900", "BG Electrical"),
},
ehc31: {
0x6480: ("EHC31", "BG Electrical"),
},
}
def gendevice(
dev_type: int,
host: Tuple[str, int],
mac: Union[bytes, str],
name: str = None,
is_locked: bool = None,
) -> device:
host: t.Tuple[str, int],
mac: t.Union[bytes, str],
name: str = "",
is_locked: bool = False,
) -> Device:
"""Generate a device."""
try:
dev_class, model, manufacturer = SUPPORTED_TYPES[dev_type]
for dev_cls, products in SUPPORTED_TYPES.items():
try:
model, manufacturer = products[dev_type]
except KeyError:
return device(host, mac, dev_type, name=name, is_locked=is_locked)
except KeyError:
continue
return dev_class(
host,
mac,
dev_type,
name=name,
model=model,
manufacturer=manufacturer,
is_locked=is_locked,
)
return dev_cls(
host,
mac,
dev_type,
name=name,
model=model,
manufacturer=manufacturer,
is_locked=is_locked,
)
return Device(host, mac, dev_type, name=name, is_locked=is_locked)
def hello(
host: str,
port: int = 80,
timeout: int = 10,
local_ip_address: str = None,
) -> device:
ip_address: str,
port: int = DEFAULT_PORT,
timeout: int = DEFAULT_TIMEOUT,
) -> Device:
"""Direct device discovery.
Useful if the device is locked.
"""
try:
return next(xdiscover(timeout, local_ip_address, host, port))
except StopIteration:
raise exception(-4000) # Network timeout.
return next(
xdiscover(
timeout=timeout,
discover_ip_address=ip_address,
discover_ip_port=port,
)
)
except StopIteration as err:
raise e.NetworkTimeoutError(
-4000,
"Network timeout",
f"No response received within {timeout}s",
) from err
def discover(
timeout: int = 10,
timeout: int = DEFAULT_TIMEOUT,
local_ip_address: str = None,
discover_ip_address: str = "255.255.255.255",
discover_ip_port: int = 80,
) -> List[device]:
discover_ip_address: str = DEFAULT_BCAST_ADDR,
discover_ip_port: int = DEFAULT_PORT,
) -> t.List[Device]:
"""Discover devices connected to the local network."""
responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port)
responses = scan(
timeout, local_ip_address, discover_ip_address, discover_ip_port
)
return [gendevice(*resp) for resp in responses]
def xdiscover(
timeout: int = 10,
timeout: int = DEFAULT_TIMEOUT,
local_ip_address: str = None,
discover_ip_address: str = "255.255.255.255",
discover_ip_port: int = 80,
) -> Generator[device, None, None]:
discover_ip_address: str = DEFAULT_BCAST_ADDR,
discover_ip_port: int = DEFAULT_PORT,
) -> t.Generator[Device, None, None]:
"""Discover devices connected to the local network.
This function returns a generator that yields devices instantly.
"""
responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port)
responses = scan(
timeout, local_ip_address, discover_ip_address, discover_ip_port
)
for resp in responses:
yield gendevice(*resp)
# Setup a new Broadlink device via AP Mode. Review the README to see how to enter AP Mode.
# Only tested with Broadlink RM3 Mini (Blackbean)
def setup(ssid: str, password: str, security_mode: int) -> None:
def setup(
ssid: str,
password: str,
security_mode: int,
ip_address: str = DEFAULT_BCAST_ADDR,
) -> None:
"""Set up a new Broadlink device via AP mode."""
# Security mode options are (0 - none, 1 = WEP, 2 = WPA1, 3 = WPA2, 4 = WPA1/2)
payload = bytearray(0x88)
@ -209,5 +320,5 @@ def setup(ssid: str, password: str, security_mode: int) -> None:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Internet # UDP
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.sendto(payload, ("255.255.255.255", 80))
sock.sendto(payload, (ip_address, DEFAULT_PORT))
sock.close()

View File

@ -1,28 +1,26 @@
"""Support for alarm kits."""
from .device import device
from .exceptions import check_error
from . import exceptions as e
from .device import Device
class S1C(device):
class S1C(Device):
"""Controls a Broadlink S1C."""
TYPE = "S1C"
_SENSORS_TYPES = {
0x31: "Door Sensor", # 49 as hex
0x91: "Key Fob", # 145 as hex, as serial on fob corpse
0x21: "Motion Sensor", # 33 as hex
0x31: "Door Sensor",
0x91: "Key Fob",
0x21: "Motion Sensor",
}
def get_sensors_status(self) -> dict:
"""Return the state of the sensors."""
packet = bytearray(16)
packet[0] = 0x06 # 0x06 - get sensors info, 0x07 - probably add sensors
packet[0] = 0x06
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
if not payload:
return None
count = payload[0x4]
sensor_data = payload[0x6:]
sensors = [

420
broadlink/climate.py Normal file → Executable file
View File

@ -1,92 +1,95 @@
"""Support for climate control."""
from typing import List
import logging
from enum import IntEnum, unique
import enum
import struct
from typing import List, Sequence
from .device import device
from .exceptions import check_error
from .helpers import calculate_crc16
from . import exceptions as e
from .device import Device
from .helpers import CRC16
class hysen(device):
"""Controls a Hysen HVAC."""
class hysen(Device):
"""Controls a Hysen heating thermostat.
TYPE = "Hysen heating controller"
This device is manufactured by Hysen and sold under different
brands, including Floureon, Beca Energy, Beok and Decdeal.
# Send a request
# input_payload should be a bytearray, usually 6 bytes, e.g. bytearray([0x01,0x06,0x00,0x02,0x10,0x00])
# Returns decrypted payload
# New behaviour: raises a ValueError if the device response indicates an error or CRC check fails
# The function prepends length (2 bytes) and appends CRC
Supported models:
- HY02B05H
- HY03WE
"""
def send_request(self, input_payload: bytes) -> bytes:
TYPE = "HYS"
def send_request(self, request: Sequence[int]) -> bytes:
"""Send a request to the device."""
crc = calculate_crc16(input_payload)
packet = bytearray()
packet.extend((len(request) + 2).to_bytes(2, "little"))
packet.extend(request)
packet.extend(CRC16.calculate(request).to_bytes(2, "little"))
# first byte is length, +2 for CRC16
request_payload = bytearray([len(input_payload) + 2, 0x00])
request_payload.extend(input_payload)
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
# append CRC
request_payload.append(crc & 0xFF)
request_payload.append((crc >> 8) & 0xFF)
p_len = int.from_bytes(payload[:0x02], "little")
nom_crc = int.from_bytes(payload[p_len:p_len+2], "little")
real_crc = CRC16.calculate(payload[0x02:p_len])
# send to device
response = self.send_packet(0x6A, request_payload)
check_error(response[0x22:0x24])
response_payload = self.decrypt(response[0x38:])
# experimental check on CRC in response (first 2 bytes are len, and trailing bytes are crc)
response_payload_len = response_payload[0]
if response_payload_len + 2 > len(response_payload):
raise ValueError(
"hysen_response_error", "first byte of response is not length"
if nom_crc != real_crc:
raise e.DataValidationError(
-4008,
"Received data packet check error",
f"Expected a checksum of {nom_crc} and received {real_crc}",
)
crc = calculate_crc16(response_payload[2:response_payload_len])
if (response_payload[response_payload_len] == crc & 0xFF) and (
response_payload[response_payload_len + 1] == (crc >> 8) & 0xFF
):
return response_payload[2:response_payload_len]
raise ValueError("hysen_response_error", "CRC check on response failed")
def get_temp(self) -> int:
return payload[0x02:p_len]
def _decode_temp(self, payload, base_index):
base_temp = payload[base_index] / 2.0
add_offset = (payload[4] >> 3) & 1 # should offset be added?
offset_raw_value = (payload[17] >> 4) & 3 # offset value
offset = (offset_raw_value + 1) / 10 if add_offset else 0.0
return base_temp + offset
def get_temp(self) -> float:
"""Return the room temperature in degrees celsius."""
payload = self.send_request(bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x08]))
return payload[0x05] / 2.0
payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x08])
return self._decode_temp(payload, 5)
def get_external_temp(self) -> int:
def get_external_temp(self) -> float:
"""Return the external temperature in degrees celsius."""
payload = self.send_request(bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x08]))
return payload[18] / 2.0
payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x08])
return self._decode_temp(payload, 18)
def get_full_status(self) -> dict:
"""Return the state of the device.
Timer schedule included.
"""
payload = self.send_request(bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x16]))
payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x16])
data = {}
data["remote_lock"] = payload[3] & 1
data["power"] = payload[4] & 1
data["active"] = (payload[4] >> 4) & 1
data["temp_manual"] = (payload[4] >> 6) & 1
data["room_temp"] = (payload[5] & 255) / 2.0
data["thermostat_temp"] = (payload[6] & 255) / 2.0
data["auto_mode"] = payload[7] & 15
data["loop_mode"] = (payload[7] >> 4) & 15
data["heating_cooling"] = (payload[4] >> 7) & 1
data["room_temp"] = self._decode_temp(payload, 5)
data["thermostat_temp"] = payload[6] / 2.0
data["auto_mode"] = payload[7] & 0x0F
data["loop_mode"] = payload[7] >> 4
data["sensor"] = payload[8]
data["osv"] = payload[9]
data["dif"] = payload[10]
data["svh"] = payload[11]
data["svl"] = payload[12]
data["room_temp_adj"] = ((payload[13] << 8) + payload[14]) / 2.0
if data["room_temp_adj"] > 32767:
data["room_temp_adj"] = 32767 - data["room_temp_adj"]
data["room_temp_adj"] = (
int.from_bytes(payload[13:15], "big", signed=True) / 10.0
)
data["fre"] = payload[15]
data["poweron"] = payload[16]
data["unknown"] = payload[17]
data["external_temp"] = (payload[18] & 255) / 2.0
data["external_temp"] = self._decode_temp(payload, 18)
data["hour"] = payload[19]
data["min"] = payload[20]
data["sec"] = payload[21]
@ -121,13 +124,15 @@ class hysen(device):
# Manual mode will activate last used temperature.
# In typical usage call set_temp to activate manual control and set temp.
# loop_mode refers to index in [ "12345,67", "123456,7", "1234567" ]
# E.g. loop_mode = 0 ("12345,67") means Saturday and Sunday follow the "weekend" schedule
# loop_mode = 2 ("1234567") means every day (including Saturday and Sunday) follows the "weekday" schedule
# E.g. loop_mode = 0 ("12345,67") means Saturday and Sunday (weekend schedule)
# loop_mode = 2 ("1234567") means every day, including Saturday and Sunday (weekday schedule)
# The sensor command is currently experimental
def set_mode(self, auto_mode: int, loop_mode: int, sensor: int = 0) -> None:
def set_mode(
self, auto_mode: int, loop_mode: int, sensor: int = 0
) -> None:
"""Set the mode of the device."""
mode_byte = ((loop_mode + 1) << 4) + auto_mode
self.send_request(bytearray([0x01, 0x06, 0x00, 0x02, mode_byte, sensor]))
self.send_request([0x01, 0x06, 0x00, 0x02, mode_byte, sensor])
# Advanced settings
# Sensor mode (SEN) sensor = 0 for internal sensor, 1 for external sensor,
@ -136,10 +141,10 @@ class hysen(device):
# Deadzone for floor temprature (dIF) dif = 1..9. Factory default: 2C
# Upper temperature limit for internal sensor (SVH) svh = 5..99. Factory default: 35C
# Lower temperature limit for internal sensor (SVL) svl = 5..99. Factory default: 5C
# Actual temperature calibration (AdJ) adj = -0.5. Prescision 0.1C
# Actual temperature calibration (AdJ) adj = -0.5. Precision 0.1C
# Anti-freezing function (FrE) fre = 0 for anti-freezing function shut down,
# 1 for anti-freezing function open. Factory default: 0
# Power on memory (POn) poweron = 0 for power on memory off, 1 for power on memory on. Factory default: 0
# Power on memory (POn) poweron = 0 for off, 1 for on. Default: 0
def set_advanced(
self,
loop_mode: int,
@ -153,7 +158,7 @@ class hysen(device):
poweron: int,
) -> None:
"""Set advanced options."""
input_payload = bytearray(
self.send_request(
[
0x01,
0x10,
@ -168,13 +173,12 @@ class hysen(device):
dif,
svh,
svl,
(int(adj * 2) >> 8 & 0xFF),
(int(adj * 2) & 0xFF),
int(adj * 10) >> 8 & 0xFF,
int(adj * 10) & 0xFF,
fre,
poweron,
]
)
self.send_request(input_payload)
# For backwards compatibility only. Prefer calling set_mode directly.
# Note this function invokes loop_mode=0 and sensor=0.
@ -189,22 +193,36 @@ class hysen(device):
# Set temperature for manual mode (also activates manual mode if currently in automatic)
def set_temp(self, temp: float) -> None:
"""Set the target temperature."""
self.send_request(bytearray([0x01, 0x06, 0x00, 0x01, 0x00, int(temp * 2)]))
self.send_request([0x01, 0x06, 0x00, 0x01, 0x00, int(temp * 2)])
# Set device on(1) or off(0), does not deactivate Wifi connectivity.
# Remote lock disables control by buttons on thermostat.
def set_power(self, power: int = 1, remote_lock: int = 0) -> None:
# heating_cooling: heating(0) cooling(1)
def set_power(
self, power: int = 1, remote_lock: int = 0, heating_cooling: int = 0
) -> None:
"""Set the power state of the device."""
self.send_request(bytearray([0x01, 0x06, 0x00, 0x00, remote_lock, power]))
state = (heating_cooling << 7) + power
self.send_request([0x01, 0x06, 0x00, 0x00, remote_lock, state])
# set time on device
# n.b. day=1 is Monday, ..., day=7 is Sunday
def set_time(self, hour: int, minute: int, second: int, day: int) -> None:
"""Set the time."""
self.send_request(
bytearray(
[0x01, 0x10, 0x00, 0x08, 0x00, 0x02, 0x04, hour, minute, second, day]
)
[
0x01,
0x10,
0x00,
0x08,
0x00,
0x02,
0x04,
hour,
minute,
second,
day
]
)
# Set timer schedule
@ -215,71 +233,78 @@ class hysen(device):
# weekend is similar but only has 2 (e.g. switch on in morning and off in afternoon)
def set_schedule(self, weekday: List[dict], weekend: List[dict]) -> None:
"""Set timer schedule."""
# Begin with some magic values ...
input_payload = bytearray([0x01, 0x10, 0x00, 0x0A, 0x00, 0x0C, 0x18])
request = [0x01, 0x10, 0x00, 0x0A, 0x00, 0x0C, 0x18]
# Now simply append times/temps
# weekday times
for i in range(0, 6):
input_payload.append(weekday[i]["start_hour"])
input_payload.append(weekday[i]["start_minute"])
request.append(weekday[i]["start_hour"])
request.append(weekday[i]["start_minute"])
# weekend times
for i in range(0, 2):
input_payload.append(weekend[i]["start_hour"])
input_payload.append(weekend[i]["start_minute"])
request.append(weekend[i]["start_hour"])
request.append(weekend[i]["start_minute"])
# weekday temperatures
for i in range(0, 6):
input_payload.append(int(weekday[i]["temp"] * 2))
request.append(int(weekday[i]["temp"] * 2))
# weekend temperatures
for i in range(0, 2):
input_payload.append(int(weekend[i]["temp"] * 2))
request.append(int(weekend[i]["temp"] * 2))
self.send_request(input_payload)
self.send_request(request)
class hvac(device):
class hvac(Device):
"""Controls a HVAC.
Supported models:
- Tornado SMART X SQ series.
- Tornado SMART X SQ series
- Aux ASW-H12U3/JIR1DI-US
- Aux ASW-H36U2/LFR1DI-US
"""
@unique
class Mode(IntEnum):
TYPE = "HVAC"
@enum.unique
class Mode(enum.IntEnum):
"""Enumerates modes."""
AUTO = 0
COOL = 1
DRY = 2
HEAT = 3
FAN = 4
@unique
class Speed(IntEnum):
@enum.unique
class Speed(enum.IntEnum):
"""Enumerates fan speed."""
HIGH = 1
MID = 2
LOW = 3
AUTO = 5
@unique
class Preset(IntEnum):
@enum.unique
class Preset(enum.IntEnum):
"""Enumerates presets."""
NORMAL = 0
TURBO = 1
MUTE = 2
@unique
class SwHoriz(IntEnum):
@enum.unique
class SwHoriz(enum.IntEnum):
"""Enumerates horizontal swing."""
ON = 0
OFF = 7
@unique
class SwVert(IntEnum):
@enum.unique
class SwVert(enum.IntEnum):
"""Enumerates vertical swing."""
ON = 0
POS1 = 1
POS2 = 2
@ -288,50 +313,109 @@ class hvac(device):
POS5 = 5
OFF = 7
def __init__(self, *args, **kwargs):
device.__init__(self, *args, **kwargs)
self.type = "HVAC"
def _crc(self, data: bytes) -> int:
"""Calculate CRC of a byte object."""
s = sum([v if i % 2 == 0 else v << 8 for i, v in enumerate(data)])
# trim the overflow and add it to smallest bit
s = (s & 0xffff) + (s >> 16)
return (0xffff - s) # TODO: fix: we can't return negative values
def _encode(self, data: bytes) -> bytes:
"""Encode data for transport."""
packet = bytearray(10)
p_len = 8 + len(data)
struct.pack_into("<HHHHH", packet, 0, p_len, 0x00BB, 0x8006, 0, len(data))
p_len = 10 + len(data)
struct.pack_into(
"<HHHHH", packet, 0, p_len, 0x00BB, 0x8006, 0, len(data)
)
packet += data
packet += self._crc(packet[2:]).to_bytes(2, "little")
crc = CRC16.calculate(packet[0x02:], polynomial=0x9BE4)
packet += crc.to_bytes(2, "little")
return packet
def _decode(self, response: bytes) -> bytes:
"""Decode data from transport."""
# payload[0x2:0x8] == bytes([0xbb, 0x00, 0x07, 0x00, 0x00, 0x00])
payload = self.decrypt(response[0x38:])
p_len = int.from_bytes(payload[:0x2], "little")
checksum = int.from_bytes(payload[p_len:p_len+2], "little")
p_len = int.from_bytes(payload[:0x02], "little")
nom_crc = int.from_bytes(payload[p_len:p_len+2], "little")
real_crc = CRC16.calculate(payload[0x02:p_len], polynomial=0x9BE4)
if checksum != self._crc(payload[0x2:p_len]):
logging.debug(
"Checksum incorrect (calculated %s actual %s).",
checksum.hex(), payload[p_len:p_len+2].hex()
if nom_crc != real_crc:
raise e.DataValidationError(
-4008,
"Received data packet check error",
f"Expected a checksum of {nom_crc} and received {real_crc}",
)
d_len = int.from_bytes(payload[0x8:0xA], "little")
return payload[0xA:0xA+d_len]
d_len = int.from_bytes(payload[0x08:0x0A], "little")
return payload[0x0A:0x0A+d_len]
def _send(self, command: int, data: bytes = b'') -> bytes:
def _send(self, command: int, data: bytes = b"") -> bytes:
"""Send a command to the unit."""
command = bytes([((command << 4) | 1), 1])
packet = self._encode(command + data)
logging.debug("Payload:\n%s", packet.hex(' '))
response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24])
return self._decode(response)[0x2:]
prefix = bytes([((command << 4) | 1), 1])
packet = self._encode(prefix + data)
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
return self._decode(response)[0x02:]
def _parse_state(self, data: bytes) -> dict:
"""Parse state."""
state = {}
state["power"] = bool(data[0x08] & 1 << 5)
state["target_temp"] = 8 + (data[0x00] >> 3) + (data[0x04] >> 7) * 0.5
state["swing_v"] = self.SwVert(data[0x00] & 0b111)
state["swing_h"] = self.SwHoriz(data[0x01] >> 5)
state["mode"] = self.Mode(data[0x05] >> 5)
state["speed"] = self.Speed(data[0x03] >> 5)
state["preset"] = self.Preset(data[0x04] >> 6)
state["sleep"] = bool(data[0x05] & 1 << 2)
state["ifeel"] = bool(data[0x05] & 1 << 3)
state["health"] = bool(data[0x08] & 1 << 1)
state["clean"] = bool(data[0x08] & 1 << 2)
state["display"] = bool(data[0x0A] & 1 << 4)
state["mildew"] = bool(data[0x0A] & 1 << 3)
return state
def set_state(
self,
power: bool,
target_temp: float, # 16<=target_temp<=32
mode: Mode,
speed: Speed,
preset: Preset,
swing_h: SwHoriz,
swing_v: SwVert,
sleep: bool,
ifeel: bool,
display: bool,
health: bool,
clean: bool,
mildew: bool,
) -> dict:
"""Set the state of the device."""
# TODO: decode unknown bits
UNK0 = 0b100
UNK1 = 0b1101
UNK2 = 0b101
target_temp = round(target_temp * 2) / 2
if preset == self.Preset.MUTE:
if mode != self.Mode.FAN:
raise ValueError("mute is only available in fan mode")
speed = self.Speed.LOW
elif preset == self.Preset.TURBO:
if mode not in {self.Mode.COOL, self.Mode.HEAT}:
raise ValueError("turbo is only available in cooling/heating")
speed = self.Speed.HIGH
data = bytearray(0x0D)
data[0x00] = (int(target_temp) - 8 << 3) | swing_v
data[0x01] = (swing_h << 5) | UNK0
data[0x02] = ((target_temp % 1 == 0.5) << 7) | UNK1
data[0x03] = speed << 5
data[0x04] = preset << 6
data[0x05] = mode << 5 | sleep << 2 | ifeel << 3
data[0x08] = power << 5 | clean << 2 | (health and 0b11)
data[0x0A] = display << 4 | mildew << 3
data[0x0C] = UNK2
resp = self._send(0, data)
return self._parse_state(resp)
def get_state(self) -> dict:
"""Returns a dictionary with the unit's parameters.
@ -346,37 +430,22 @@ class hvac(device):
swing_h (hvac.SwHoriz):
swing_v (hvac.SwVert):
sleep (bool):
ifeel (bool):
display (bool):
health (bool):
clean (bool):
mildew (bool):
"""
resp = self._send(0x1)
resp = self._send(1)
if (len(resp) != 0xF):
raise ValueError(f"unexpected resp size: {len(resp)}")
if len(resp) < 13:
raise e.DataValidationError(
-4007,
"Received data packet length error",
f"Expected at least 15 bytes and received {len(resp) + 2}",
)
logging.debug("Received resp:\n%s", resp.hex(' '))
logging.debug("0b[R] mask: %x, 0c[R] mask: %x, cmnd_16: %x",
resp[0x3] & 0xF, resp[0x4] & 0xF, resp[0x4])
state = {}
state['power'] = resp[0x8] & 1 << 5
state['target_temp'] = 8 + (resp[0x0] >> 3) + (resp[0x4] >> 7) * 0.5
state['swing_v'] = self.SwVert(resp[0x0] & 0b111)
state['swing_h'] = self.SwHoriz(resp[0x1] >> 5)
state['mode'] = self.Mode(resp[0x5] >> 5)
state['speed'] = self.Speed(resp[0x3] >> 5)
state['preset'] = self.Preset(resp[0x4] >> 6)
state['sleep'] = bool(resp[0x5] & 1 << 2)
state['health'] = bool(resp[0x8] & 1 << 1)
state['clean'] = bool(resp[0x8] & 1 << 2)
state['display'] = bool(resp[0xA] & 1 << 4)
state['mildew'] = bool(resp[0xA] & 1 << 3)
logging.debug("State: %s", state)
return state
return self._parse_state(resp)
def get_ac_info(self) -> dict:
"""Returns dictionary with AC info.
@ -387,68 +456,19 @@ class hvac(device):
ambient_temp (float): ambient temperature
"""
resp = self._send(2)
if (len(resp) != 0x18):
raise ValueError(f"unexpected resp size: {len(resp)}")
logging.debug("Received resp:\n%s", resp.hex(' '))
if len(resp) < 22:
raise e.DataValidationError(
-4007,
"Received data packet length error",
f"Expected at least 24 bytes and received {len(resp) + 2}",
)
ac_info = {}
ac_info["power"] = resp[0x1] & 1
ambient_temp = resp[0x5] & 0b11111, resp[0x15] & 0b11111
ambient_temp = resp[0x05] & 0b11111, resp[0x15] & 0b11111
if any(ambient_temp):
ac_info["ambient_temp"] = ambient_temp[0] + ambient_temp[1] / 10.0
logging.debug("AC info: %s", ac_info)
return ac_info
def set_state(
self,
power: bool,
target_temp: float, # 16<=target_temp<=32
mode: int, # hvac.Mode
speed: int, # hvac.Speed
preset: int, # hvac.Preset
swing_h: int, # hvac.SwHoriz
swing_v: int, # hvac.SwVert
sleep: bool,
display: bool,
health: bool,
clean: bool,
mildew: bool,
) -> None:
"""Set the state of the device."""
# TODO: What does these values represent?
UNK0 = 0b100
UNK1 = 0b1101
UNK2 = 0b101
target_temp = round(target_temp * 2) / 2
if not (16 <= target_temp <= 32):
raise ValueError(f"target_temp out of range: {target_temp}")
if preset == self.Preset.MUTE:
if mode != self.Mode.FAN:
raise ValueError("mute is only available in fan mode")
speed = self.Speed.LOW
elif preset == self.Preset.TURBO:
if mode not in {self.Mode.COOL, self.Mode.HEAT}:
raise ValueError("turbo is only available in cooling/heating")
speed = self.Speed.HIGH
data = bytearray(0xD)
data[0x0] = (int(target_temp) - 8 << 3) | swing_v
data[0x1] = (swing_h << 5) | UNK0
data[0x2] = ((target_temp % 1 == 0.5) << 7) | UNK1
data[0x3] = speed << 5
data[0x4] = preset << 6
data[0x5] = mode << 5 | (sleep << 2)
data[0x8] = (power << 5 | clean << 2 | health * 0b11)
data[0xA] = display << 4 | mildew << 3
data[0xC] = UNK2
logging.debug("Constructed payload data:\n%s", data.hex(' '))
response_payload = self._send(0, data)
logging.debug("Response payload:\n%s", response_payload.hex(' '))

5
broadlink/const.py Normal file
View File

@ -0,0 +1,5 @@
"""Constants."""
DEFAULT_BCAST_ADDR = "255.255.255.255"
DEFAULT_PORT = 80
DEFAULT_RETRY_INTVL = 1
DEFAULT_TIMEOUT = 10

View File

@ -1,40 +1,41 @@
"""Support for covers."""
import time
from .device import device
from .exceptions import check_error
from . import exceptions as e
from .device import Device
class dooya(device):
class dooya(Device):
"""Controls a Dooya curtain motor."""
TYPE = "Dooya DT360E"
TYPE = "DT360E"
def _send(self, magic1: int, magic2: int) -> int:
def _send(self, command: int, attribute: int = 0) -> int:
"""Send a packet to the device."""
packet = bytearray(16)
packet[0] = 0x09
packet[2] = 0xBB
packet[3] = magic1
packet[4] = magic2
packet[9] = 0xFA
packet[10] = 0x44
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
packet[0x00] = 0x09
packet[0x02] = 0xBB
packet[0x03] = command
packet[0x04] = attribute
packet[0x09] = 0xFA
packet[0x0A] = 0x44
resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload[4]
def open(self) -> int:
"""Open the curtain."""
return self._send(0x01, 0x00)
return self._send(0x01)
def close(self) -> int:
"""Close the curtain."""
return self._send(0x02, 0x00)
return self._send(0x02)
def stop(self) -> int:
"""Stop the curtain."""
return self._send(0x03, 0x00)
return self._send(0x03)
def get_percentage(self) -> int:
"""Return the position of the curtain."""
@ -55,3 +56,126 @@ class dooya(device):
time.sleep(0.2)
current = self.get_percentage()
self.stop()
class dooya2(Device):
"""Controls a Dooya curtain motor (version 2)."""
TYPE = "DT360E-2"
def _send(self, operation: int, data: bytes = b""):
"""Send a command to the device."""
packet = bytearray(12)
packet[0x02] = 0xA5
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x08] = operation
packet[0x09] = 0x0B
if data:
data_len = len(data)
packet[0x0A] = data_len & 0xFF
packet[0x0B] = data_len >> 8
packet += bytes(2)
packet.extend(data)
checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[0x06] = checksum & 0xFF
packet[0x07] = checksum >> 8
packet_len = len(packet) - 2
packet[0x00] = packet_len & 0xFF
packet[0x01] = packet_len >> 8
resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload
def open(self) -> None:
"""Open the curtain."""
self._send(2, [0x00, 0x01, 0x00])
def close(self) -> None:
"""Close the curtain."""
self._send(2, [0x00, 0x02, 0x00])
def stop(self) -> None:
"""Stop the curtain."""
self._send(2, [0x00, 0x03, 0x00])
def get_percentage(self) -> int:
"""Return the position of the curtain."""
resp = self._send(1, [0x00, 0x06, 0x00])
return resp[0x11]
def set_percentage(self, new_percentage: int) -> None:
"""Set the position of the curtain."""
self._send(2, [0x00, 0x09, new_percentage])
class wser(Device):
"""Controls a Wistar curtain motor"""
TYPE = "WSER"
def _send(self, operation: int, data: bytes = b""):
"""Send a command to the device."""
packet = bytearray(12)
packet[0x02] = 0xA5
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x08] = operation
packet[0x09] = 0x0B
if data:
data_len = len(data)
packet[0x0A] = data_len & 0xFF
packet[0x0B] = data_len >> 8
packet += bytes(2)
packet.extend(data)
checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[0x06] = checksum & 0xFF
packet[0x07] = checksum >> 8
packet_len = len(packet) - 2
packet[0x00] = packet_len & 0xFF
packet[0x01] = packet_len >> 8
resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload
def get_position(self) -> int:
"""Return the position of the curtain."""
resp = self._send(1, [])
position = resp[0x0E]
return position
def open(self) -> int:
"""Open the curtain."""
resp = self._send(2, [0x4A, 0x31, 0xA0])
position = resp[0x0E]
return position
def close(self) -> int:
"""Close the curtain."""
resp = self._send(2, [0x61, 0x32, 0xA0])
position = resp[0x0E]
return position
def stop(self) -> int:
"""Stop the curtain."""
resp = self._send(2, [0x4C, 0x73, 0xA0])
position = resp[0x0E]
return position
def set_position(self, position: int) -> int:
"""Set the position of the curtain."""
resp = self._send(2, [position, 0x70, 0xA0])
position = resp[0x0E]
return position

View File

@ -3,23 +3,29 @@ import socket
import threading
import random
import time
from typing import Generator, Tuple, Union
import typing as t
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from .exceptions import check_error, exception
from . import exceptions as e
from .const import (
DEFAULT_BCAST_ADDR,
DEFAULT_PORT,
DEFAULT_RETRY_INTVL,
DEFAULT_TIMEOUT,
)
from .protocol import Datetime
HelloResponse = Tuple[int, Tuple[str, int], str, str, bool]
HelloResponse = t.Tuple[int, t.Tuple[str, int], str, str, bool]
def scan(
timeout: int = 10,
timeout: int = DEFAULT_TIMEOUT,
local_ip_address: str = None,
discover_ip_address: str = "255.255.255.255",
discover_ip_port: int = 80,
) -> Generator[HelloResponse, None, None]:
discover_ip_address: str = DEFAULT_BCAST_ADDR,
discover_ip_port: int = DEFAULT_PORT,
) -> t.Generator[HelloResponse, None, None]:
"""Broadcast a hello message and yield responses."""
conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
@ -47,7 +53,7 @@ def scan(
try:
while (time.time() - start_time) < timeout:
time_left = timeout - (time.time() - start_time)
conn.settimeout(min(1, time_left))
conn.settimeout(min(DEFAULT_RETRY_INTVL, time_left))
conn.sendto(packet, (discover_ip_address, discover_ip_port))
while True:
@ -64,13 +70,13 @@ def scan(
discovered.append((host, mac, devtype))
name = resp[0x40:].split(b"\x00")[0].decode()
is_locked = bool(resp[-1])
is_locked = bool(resp[0x7F])
yield devtype, host, mac, name, is_locked
finally:
conn.close()
def ping(address: str, port: int = 80) -> None:
def ping(ip_address: str, port: int = DEFAULT_PORT) -> None:
"""Send a ping packet to an address.
This packet feeds the watchdog timer of firmwares >= v53.
@ -81,62 +87,71 @@ def ping(address: str, port: int = 80) -> None:
conn.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
packet = bytearray(0x30)
packet[0x26] = 1
conn.sendto(packet, (address, port))
conn.sendto(packet, (ip_address, port))
class device:
class Device:
"""Controls a Broadlink device."""
TYPE = "Unknown"
__INIT_KEY = "097628343fe99e23765c1513accf8b02"
__INIT_VECT = "562e17996d093d28ddb3ba695a2e6f58"
def __init__(
self,
host: Tuple[str, int],
mac: Union[bytes, str],
host: t.Tuple[str, int],
mac: t.Union[bytes, str],
devtype: int,
timeout: int = 10,
name: str = None,
model: str = None,
manufacturer: str = None,
is_locked: bool = None,
timeout: int = DEFAULT_TIMEOUT,
name: str = "",
model: str = "",
manufacturer: str = "",
is_locked: bool = False,
) -> None:
"""Initialize the controller."""
self.host = host
self.mac = bytes.fromhex(mac) if isinstance(mac, str) else mac
self.devtype = devtype if devtype is not None else 0x272A
self.devtype = devtype
self.timeout = timeout
self.name = name
self.model = model
self.manufacturer = manufacturer
self.is_locked = is_locked
self.count = random.randint(0x8000, 0xFFFF)
self.iv = bytes.fromhex("562e17996d093d28ddb3ba695a2e6f58")
self.iv = bytes.fromhex(self.__INIT_VECT)
self.id = 0
self.type = self.TYPE # For backwards compatibility.
self.lock = threading.Lock()
self.aes = None
key = bytes.fromhex("097628343fe99e23765c1513accf8b02")
self.update_aes(key)
self.update_aes(bytes.fromhex(self.__INIT_KEY))
def __repr__(self):
return "<%s: %s %s (%s) at %s:%s | %s | %s | %s>" % (
type(self).__name__,
self.manufacturer,
self.model,
hex(self.devtype),
self.host[0],
self.host[1],
":".join(format(x, "02x") for x in self.mac),
def __repr__(self) -> str:
"""Return a formal representation of the device."""
return (
"%s.%s(%s, mac=%r, devtype=%r, timeout=%r, name=%r, "
"model=%r, manufacturer=%r, is_locked=%r)"
) % (
self.__class__.__module__,
self.__class__.__qualname__,
self.host,
self.mac,
self.devtype,
self.timeout,
self.name,
"Locked" if self.is_locked else "Unlocked",
self.model,
self.manufacturer,
self.is_locked,
)
def __str__(self):
return "%s (%s at %s)" % (
self.name,
self.model or hex(self.devtype),
self.host[0],
def __str__(self) -> str:
"""Return a readable representation of the device."""
return "%s (%s / %s:%s / %s)" % (
self.name or "Unknown",
" ".join(filter(None, [self.manufacturer, self.model, hex(self.devtype)])),
*self.host,
":".join(format(x, "02X") for x in self.mac),
)
def update_aes(self, key: bytes) -> None:
@ -157,22 +172,21 @@ class device:
def auth(self) -> bool:
"""Authenticate to the device."""
payload = bytearray(0x50)
payload[0x04:0x14] = [0x31]*16
payload[0x1E] = 0x01
payload[0x2D] = 0x01
payload[0x30:0x37] = "Test 1".encode()
self.id = 0
self.update_aes(bytes.fromhex(self.__INIT_KEY))
response = self.send_packet(0x65, payload)
check_error(response[0x22:0x24])
packet = bytearray(0x50)
packet[0x04:0x14] = [0x31] * 16
packet[0x1E] = 0x01
packet[0x2D] = 0x01
packet[0x30:0x36] = "Test 1".encode()
response = self.send_packet(0x65, packet)
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
key = payload[0x04:0x14]
if len(key) % 16 != 0:
return False
self.id = int.from_bytes(payload[:0x4], "little")
self.update_aes(key)
self.update_aes(payload[0x04:0x14])
return True
def hello(self, local_ip_address=None) -> bool:
@ -187,12 +201,30 @@ class device:
discover_ip_port=self.host[1],
)
try:
devtype, host, mac, name, is_locked = next(responses)
except StopIteration:
raise exception(-4000) # Network timeout.
devtype, _, mac, name, is_locked = next(responses)
if (devtype, host, mac) != (self.devtype, self.host, self.mac):
raise exception(-2040) # Device information is not intact.
except StopIteration as err:
raise e.NetworkTimeoutError(
-4000,
"Network timeout",
f"No response received within {self.timeout}s",
) from err
if mac != self.mac:
raise e.DataValidationError(
-2040,
"Device information is not intact",
"The MAC address is different",
f"Expected {self.mac} and received {mac}",
)
if devtype != self.devtype:
raise e.DataValidationError(
-2040,
"Device information is not intact",
"The product ID is different",
f"Expected {self.devtype} and received {devtype}",
)
self.name = name
self.is_locked = is_locked
@ -211,7 +243,7 @@ class device:
"""Get firmware version."""
packet = bytearray([0x68])
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
return payload[0x4] | payload[0x5] << 8
@ -220,9 +252,9 @@ class device:
packet = bytearray(4)
packet += name.encode("utf-8")
packet += bytearray(0x50 - len(packet))
packet[0x43] = bool(self.is_locked)
packet[0x43] = self.is_locked
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
self.name = name
def set_lock(self, state: bool) -> None:
@ -232,7 +264,7 @@ class device:
packet += bytearray(0x50 - len(packet))
packet[0x43] = bool(state)
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
self.is_locked = bool(state)
def get_type(self) -> str:
@ -246,8 +278,8 @@ class device:
packet[0x00:0x08] = bytes.fromhex("5aa5aa555aa5aa55")
packet[0x24:0x26] = self.devtype.to_bytes(2, "little")
packet[0x26:0x28] = packet_type.to_bytes(2, "little")
packet[0x28:0x2a] = self.count.to_bytes(2, "little")
packet[0x2a:0x30] = self.mac[::-1]
packet[0x28:0x2A] = self.count.to_bytes(2, "little")
packet[0x2A:0x30] = self.mac[::-1]
packet[0x30:0x34] = self.id.to_bytes(4, "little")
p_checksum = sum(payload, 0xBEAF) & 0xFFFF
@ -266,21 +298,35 @@ class device:
while True:
time_left = timeout - (time.time() - start_time)
conn.settimeout(min(1, time_left))
conn.settimeout(min(DEFAULT_RETRY_INTVL, time_left))
conn.sendto(packet, self.host)
try:
resp = conn.recvfrom(2048)[0]
break
except socket.timeout:
except socket.timeout as err:
if (time.time() - start_time) > timeout:
raise exception(-4000) # Network timeout.
raise e.NetworkTimeoutError(
-4000,
"Network timeout",
f"No response received within {timeout}s",
) from err
if len(resp) < 0x30:
raise exception(-4007) # Length error.
raise e.DataValidationError(
-4007,
"Received data packet length error",
f"Expected at least 48 bytes and received {len(resp)}",
)
checksum = int.from_bytes(resp[0x20:0x22], "little")
if sum(resp, 0xBEAF) - sum(resp[0x20:0x22]) & 0xFFFF != checksum:
raise exception(-4008) # Checksum error.
nom_checksum = int.from_bytes(resp[0x20:0x22], "little")
real_checksum = sum(resp, 0xBEAF) - sum(resp[0x20:0x22]) & 0xFFFF
if nom_checksum != real_checksum:
raise e.DataValidationError(
-4008,
"Received data packet check error",
f"Expected a checksum of {nom_checksum} and received {real_checksum}",
)
return resp

View File

@ -1,19 +1,17 @@
"""Exceptions for Broadlink devices."""
import collections
import struct
class BroadlinkException(Exception):
"""Common base class for all Broadlink exceptions."""
"""Base class common to all Broadlink exceptions."""
def __init__(self, *args, **kwargs):
"""Initialize the exception."""
super().__init__(*args, **kwargs)
if len(args) >= 3:
if len(args) >= 2:
self.errno = args[0]
self.strerror = "%s: %s" % (args[1], args[2])
elif len(args) == 2:
self.errno = args[0]
self.strerror = str(args[1])
self.strerror = ": ".join(str(arg) for arg in args[1:])
elif len(args) == 1:
self.errno = None
self.strerror = str(args[0])
@ -22,80 +20,93 @@ class BroadlinkException(Exception):
self.strerror = ""
def __str__(self):
"""Return the error message."""
"""Return str(self)."""
if self.errno is not None:
return "[Errno %s] %s" % (self.errno, self.strerror)
return self.strerror
def __eq__(self, other):
"""Return self==value."""
# pylint: disable=unidiomatic-typecheck
return type(self) == type(other) and self.args == other.args
class FirmwareException(BroadlinkException):
"""Common base class for all firmware exceptions."""
def __hash__(self):
"""Return hash(self)."""
return hash((type(self), self.args))
class AuthenticationError(FirmwareException):
class MultipleErrors(BroadlinkException):
"""Multiple errors."""
def __init__(self, *args, **kwargs):
"""Initialize the exception."""
errors = args[0][:] if args else []
counter = collections.Counter(errors)
strerror = "Multiple errors occurred: %s" % counter
super().__init__(strerror, **kwargs)
self.errors = errors
def __repr__(self):
"""Return repr(self)."""
return "MultipleErrors(%r)" % self.errors
def __str__(self):
"""Return str(self)."""
return self.strerror
class AuthenticationError(BroadlinkException):
"""Authentication error."""
class AuthorizationError(FirmwareException):
class AuthorizationError(BroadlinkException):
"""Authorization error."""
class CommandNotSupportedError(FirmwareException):
class CommandNotSupportedError(BroadlinkException):
"""Command not supported error."""
class ConnectionClosedError(FirmwareException):
class ConnectionClosedError(BroadlinkException):
"""Connection closed error."""
class DataValidationError(FirmwareException):
"""Data validation error."""
class StructureAbnormalError(BroadlinkException):
"""Structure abnormal error."""
class DeviceOfflineError(FirmwareException):
class DeviceOfflineError(BroadlinkException):
"""Device offline error."""
class ReadError(FirmwareException):
class ReadError(BroadlinkException):
"""Read error."""
class SendError(FirmwareException):
class SendError(BroadlinkException):
"""Send error."""
class SSIDNotFoundError(FirmwareException):
class SSIDNotFoundError(BroadlinkException):
"""SSID not found error."""
class StorageError(FirmwareException):
class StorageError(BroadlinkException):
"""Storage error."""
class WriteError(FirmwareException):
class WriteError(BroadlinkException):
"""Write error."""
class SDKException(BroadlinkException):
"""Common base class for all SDK exceptions."""
class DeviceInformationError(SDKException):
"""Device information is not intact."""
class ChecksumError(SDKException):
"""Received data packet check error."""
class LengthError(SDKException):
"""Received data packet length error."""
class NetworkTimeoutError(SDKException):
class NetworkTimeoutError(BroadlinkException):
"""Network timeout error."""
class DataValidationError(BroadlinkException):
"""Data validation error."""
class UnknownError(BroadlinkException):
"""Unknown error."""
@ -107,30 +118,34 @@ BROADLINK_EXCEPTIONS = {
-3: (DeviceOfflineError, "The device is offline"),
-4: (CommandNotSupportedError, "Command not supported"),
-5: (StorageError, "The device storage is full"),
-6: (DataValidationError, "Structure is abnormal"),
-6: (StructureAbnormalError, "Structure is abnormal"),
-7: (AuthorizationError, "Control key is expired"),
-8: (SendError, "Send error"),
-9: (WriteError, "Write error"),
-10: (ReadError, "Read error"),
-11: (SSIDNotFoundError, "SSID could not be found in AP configuration"),
# SDK related errors are generated by this module.
-2040: (DeviceInformationError, "Device information is not intact"),
-2040: (DataValidationError, "Device information is not intact"),
-4000: (NetworkTimeoutError, "Network timeout"),
-4007: (LengthError, "Received data packet length error"),
-4008: (ChecksumError, "Received data packet check error"),
-4007: (DataValidationError, "Received data packet length error"),
-4008: (DataValidationError, "Received data packet check error"),
-4009: (DataValidationError, "Received data packet information type error"),
-4010: (DataValidationError, "Received encrypted data packet length error"),
-4011: (DataValidationError, "Received encrypted data packet check error"),
-4012: (AuthorizationError, "Device control ID error"),
}
def exception(error_code):
def exception(err_code: int) -> BroadlinkException:
"""Return exception corresponding to an error code."""
try:
exc, msg = BROADLINK_EXCEPTIONS[error_code]
return exc(error_code, msg)
exc, msg = BROADLINK_EXCEPTIONS[err_code]
return exc(err_code, msg)
except KeyError:
return UnknownError(error_code, "Unknown error")
return UnknownError(err_code, "Unknown error")
def check_error(error):
def check_error(error: bytes) -> None:
"""Raise exception if an error occurred."""
error_code = struct.unpack("h", error)[0]
if error_code:

View File

@ -1,26 +1,43 @@
"""Helper functions."""
from ctypes import c_ushort
"""Helper functions and classes."""
import typing as t
def calculate_crc16(input_data: bytes) -> int:
"""Calculate the CRC-16 of a byte string."""
crc16_tab = []
crc16_constant = 0xA001
class CRC16:
"""Helps with CRC-16 calculation.
for i in range(0, 256):
crc = c_ushort(i).value
for j in range(0, 8):
if crc & 0x0001:
crc = c_ushort(crc >> 1).value ^ crc16_constant
else:
crc = c_ushort(crc >> 1).value
crc16_tab.append(hex(crc))
CRC tables are cached for performance.
"""
crcValue = 0xFFFF
_cache: t.Dict[int, t.List[int]] = {}
for c in input_data:
tmp = crcValue ^ c
rotated = c_ushort(crcValue >> 8).value
crcValue = rotated ^ int(crc16_tab[(tmp & 0x00FF)], 0)
@classmethod
def get_table(cls, polynomial: int) -> t.List[int]:
"""Return the CRC-16 table for a polynomial."""
try:
crc_table = cls._cache[polynomial]
except KeyError:
crc_table = []
for dividend in range(0, 256):
remainder = dividend
for _ in range(0, 8):
if remainder & 1:
remainder = remainder >> 1 ^ polynomial
else:
remainder = remainder >> 1
crc_table.append(remainder)
cls._cache[polynomial] = crc_table
return crc_table
return crcValue
@classmethod
def calculate(
cls,
sequence: t.Sequence[int],
polynomial: int = 0xA001, # CRC-16-ANSI.
init_value: int = 0xFFFF,
) -> int:
"""Calculate the CRC-16 of a sequence of integers."""
crc_table = cls.get_table(polynomial)
crc = init_value
for item in sequence:
crc = crc >> 8 ^ crc_table[(crc ^ item) & 0xFF]
return crc

95
broadlink/hub.py Normal file
View File

@ -0,0 +1,95 @@
"""Support for hubs."""
import struct
import json
from . import exceptions as e
from .device import Device
class s3(Device):
"""Controls a Broadlink S3."""
TYPE = "S3"
MAX_SUBDEVICES = 8
def get_subdevices(self, step: int = 5) -> list:
"""Return a list of sub devices."""
total = self.MAX_SUBDEVICES
sub_devices = []
seen = set()
index = 0
while index < total:
state = {"count": step, "index": index}
packet = self._encode(14, state)
resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24])
resp = self._decode(resp)
for device in resp["list"]:
did = device["did"]
if did in seen:
continue
seen.add(did)
sub_devices.append(device)
total = resp["total"]
if len(seen) >= total:
break
index += step
return sub_devices
def get_state(self, did: str = None) -> dict:
"""Return the power state of the device."""
state = {}
if did is not None:
state["did"] = did
packet = self._encode(1, state)
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
return self._decode(response)
def set_state(
self,
did: str = None,
pwr1: bool = None,
pwr2: bool = None,
pwr3: bool = None,
) -> dict:
"""Set the power state of the device."""
state = {}
if did is not None:
state["did"] = did
if pwr1 is not None:
state["pwr1"] = int(bool(pwr1))
if pwr2 is not None:
state["pwr2"] = int(bool(pwr2))
if pwr3 is not None:
state["pwr3"] = int(bool(pwr3))
packet = self._encode(2, state)
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
return self._decode(response)
def _encode(self, flag: int, state: dict) -> bytes:
"""Encode a JSON packet."""
# flag: 1 for reading, 2 for writing.
packet = bytearray(12)
data = json.dumps(state, separators=(",", ":")).encode()
struct.pack_into("<HHHBBI", packet, 0, 0xA5A5, 0x5A5A, 0, flag, 0x0B, len(data))
packet.extend(data)
checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[0x04:0x06] = checksum.to_bytes(2, "little")
return packet
def _decode(self, response: bytes) -> dict:
"""Decode a JSON packet."""
payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0x08)[0]
state = json.loads(payload[0x0C : 0x0C + js_len])
return state

View File

@ -2,13 +2,12 @@
import enum
import json
import struct
import typing
from .device import device
from .exceptions import check_error
from . import exceptions as e
from .device import Device
class lb1(device):
class lb1(Device):
"""Controls a Broadlink LB1."""
TYPE = "LB1"
@ -16,6 +15,7 @@ class lb1(device):
@enum.unique
class ColorMode(enum.IntEnum):
"""Enumerates color modes."""
RGB = 0
WHITE = 1
SCENE = 2
@ -27,7 +27,7 @@ class lb1(device):
"""
packet = self._encode(1, {})
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
return self._decode(response)
def set_state(
@ -58,48 +58,140 @@ class lb1(device):
if green is not None:
state["green"] = int(green)
if brightness is not None:
state["brightness"] = brightness
state["brightness"] = int(brightness)
if colortemp is not None:
state["colortemp"] = colortemp
state["colortemp"] = int(colortemp)
if hue is not None:
state["hue"] = hue
state["hue"] = int(hue)
if saturation is not None:
state["saturation"] = saturation
state["saturation"] = int(saturation)
if transitionduration is not None:
state["transitionduration"] = transitionduration
state["transitionduration"] = int(transitionduration)
if maxworktime is not None:
state["maxworktime"] = maxworktime
state["maxworktime"] = int(maxworktime)
if bulb_colormode is not None:
state["bulb_colormode"] = bulb_colormode
state["bulb_colormode"] = int(bulb_colormode)
if bulb_scenes is not None:
state["bulb_scenes"] = bulb_scenes
state["bulb_scenes"] = str(bulb_scenes)
if bulb_scene is not None:
state["bulb_scene"] = bulb_scene
state["bulb_scene"] = str(bulb_scene)
if bulb_sceneidx is not None:
state["bulb_sceneidx"] = bulb_sceneidx
state["bulb_sceneidx"] = int(bulb_sceneidx)
packet = self._encode(2, state)
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
return self._decode(response)
def _encode(self, flag: int, obj: typing.Any) -> bytes:
def _encode(self, flag: int, state: dict) -> bytes:
"""Encode a JSON packet."""
# flag: 1 for reading, 2 for writing.
packet = bytearray(14)
js = json.dumps(obj, separators=[',', ':']).encode()
p_len = 12 + len(js)
data = json.dumps(state, separators=(",", ":")).encode()
p_len = 12 + len(data)
struct.pack_into(
"<HHHHBBI", packet, 0, p_len, 0xA5A5, 0x5A5A, 0, flag, 0xB, len(js)
"<HHHHBBI", packet, 0, p_len, 0xA5A5, 0x5A5A, 0, flag, 0x0B, len(data)
)
packet += js
checksum = sum(packet[0x8:], 0xC0AD) & 0xFFFF
packet[0x6:0x8] = checksum.to_bytes(2, "little")
packet.extend(data)
checksum = sum(packet[0x02:], 0xBEAF) & 0xFFFF
packet[0x06:0x08] = checksum.to_bytes(2, "little")
return packet
def _decode(self, response: bytes) -> typing.Any:
def _decode(self, response: bytes) -> dict:
"""Decode a JSON packet."""
payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0xA)[0]
state = json.loads(payload[0xE : 0xE + js_len])
return state
class lb2(Device):
"""Controls a Broadlink LB26/LB27."""
TYPE = "LB2"
@enum.unique
class ColorMode(enum.IntEnum):
"""Enumerates color modes."""
RGB = 0
WHITE = 1
SCENE = 2
def get_state(self) -> dict:
"""Return the power state of the device.
Example: `{'red': 128, 'blue': 255, 'green': 128, 'pwr': 1, 'brightness': 75, 'colortemp': 2700, 'hue': 240, 'saturation': 50, 'transitionduration': 1500, 'maxworktime': 0, 'bulb_colormode': 1, 'bulb_scenes': '["@01686464,0,0,0", "#ffffff,10,0,#000000,190,0,0", "2700+100,0,0,0", "#ff0000,500,2500,#00FF00,500,2500,#0000FF,500,2500,0", "@01686464,100,2400,@01686401,100,2400,0", "@01686464,100,2400,@01686401,100,2400,@005a6464,100,2400,@005a6401,100,2400,0", "@01686464,10,0,@00000000,190,0,0", "@01686464,200,0,@005a6464,200,0,0"]', 'bulb_scene': ''}`
"""
packet = self._encode(1, {})
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
return self._decode(response)
def set_state(
self,
pwr: bool = None,
red: int = None,
blue: int = None,
green: int = None,
brightness: int = None,
colortemp: int = None,
hue: int = None,
saturation: int = None,
transitionduration: int = None,
maxworktime: int = None,
bulb_colormode: int = None,
bulb_scenes: str = None,
bulb_scene: str = None,
) -> dict:
"""Set the power state of the device."""
state = {}
if pwr is not None:
state["pwr"] = int(bool(pwr))
if red is not None:
state["red"] = int(red)
if blue is not None:
state["blue"] = int(blue)
if green is not None:
state["green"] = int(green)
if brightness is not None:
state["brightness"] = int(brightness)
if colortemp is not None:
state["colortemp"] = int(colortemp)
if hue is not None:
state["hue"] = int(hue)
if saturation is not None:
state["saturation"] = int(saturation)
if transitionduration is not None:
state["transitionduration"] = int(transitionduration)
if maxworktime is not None:
state["maxworktime"] = int(maxworktime)
if bulb_colormode is not None:
state["bulb_colormode"] = int(bulb_colormode)
if bulb_scenes is not None:
state["bulb_scenes"] = str(bulb_scenes)
if bulb_scene is not None:
state["bulb_scene"] = str(bulb_scene)
packet = self._encode(2, state)
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
return self._decode(response)
def _encode(self, flag: int, state: dict) -> bytes:
"""Encode a JSON packet."""
# flag: 1 for reading, 2 for writing.
packet = bytearray(12)
data = json.dumps(state, separators=(",", ":")).encode()
struct.pack_into("<HHHBBI", packet, 0, 0xA5A5, 0x5A5A, 0, flag, 0x0B, len(data))
packet.extend(data)
checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[0x04:0x06] = checksum.to_bytes(2, "little")
return packet
def _decode(self, response: bytes) -> dict:
"""Decode a JSON packet."""
payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0x08)[0]
state = json.loads(payload[0x0C : 0x0C + js_len])
return state

View File

@ -1,3 +1,4 @@
"""The networking part of the python-broadlink library."""
import datetime as dt
import time
@ -14,7 +15,7 @@ class Datetime:
data[0x04:0x06] = datetime.year.to_bytes(2, "little")
data[0x06] = datetime.minute
data[0x07] = datetime.hour
data[0x08] = int(datetime.strftime('%y'))
data[0x08] = int(datetime.strftime("%y"))
data[0x09] = datetime.isoweekday()
data[0x0A] = datetime.day
data[0x0B] = datetime.month
@ -37,7 +38,7 @@ class Datetime:
if datetime.isoweekday() != isoweekday:
raise ValueError("isoweekday does not match")
if int(datetime.strftime('%y')) != subyear:
if int(datetime.strftime("%y")) != subyear:
raise ValueError("subyear does not match")
return datetime

View File

@ -1,26 +1,70 @@
"""Support for universal remotes."""
import struct
import typing as t
from .device import device
from .exceptions import check_error
from . import exceptions as e
from .device import Device
class rm(device):
"""Controls a Broadlink RM."""
def pulses_to_data(pulses: t.List[int], tick: int = 32.84) -> None:
"""Convert a microsecond duration sequence into a Broadlink IR packet."""
result = bytearray(4)
result[0x00] = 0x26
TYPE = "RM2"
for pulse in pulses:
div, mod = divmod(int(pulse // tick), 256)
if div:
result.append(0)
result.append(div)
result.append(mod)
def _send(self, command: int, data: bytes = b'') -> bytes:
data_len = len(result) - 4
result[0x02] = data_len & 0xFF
result[0x03] = data_len >> 8
return result
def data_to_pulses(data: bytes, tick: int = 32.84) -> t.List[int]:
"""Parse a Broadlink packet into a microsecond duration sequence."""
result = []
index = 4
end = min(256 * data[0x03] + data[0x02] + 4, len(data))
while index < end:
chunk = data[index]
index += 1
if chunk == 0:
try:
chunk = 256 * data[index] + data[index + 1]
except IndexError:
raise ValueError("Malformed data.")
index += 2
result.append(int(chunk * tick))
return result
class rmmini(Device):
"""Controls a Broadlink RM mini 3."""
TYPE = "RMMINI"
def _send(self, command: int, data: bytes = b"") -> bytes:
"""Send a packet to the device."""
packet = struct.pack("<I", command) + data
resp = self.send_packet(0x6A, packet)
check_error(resp[0x22:0x24])
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload[0x4:]
def check_data(self) -> bytes:
"""Return the last captured code."""
return self._send(0x4)
def update(self) -> None:
"""Update device name and lock status."""
resp = self._send(0x1)
self.name = resp[0x48:].split(b"\x00")[0].decode()
self.is_locked = bool(resp[0x87])
def send_data(self, data: bytes) -> None:
"""Send a code to the device."""
@ -30,57 +74,100 @@ class rm(device):
"""Enter infrared learning mode."""
self._send(0x3)
def check_data(self) -> bytes:
"""Return the last captured code."""
return self._send(0x4)
class rmpro(rmmini):
"""Controls a Broadlink RM pro."""
TYPE = "RMPRO"
def sweep_frequency(self) -> None:
"""Sweep frequency."""
self._send(0x19)
def check_frequency(self) -> t.Tuple[bool, float]:
"""Return True if the frequency was identified successfully."""
resp = self._send(0x1A)
is_found = bool(resp[0])
frequency = struct.unpack("<I", resp[1:5])[0] / 1000.0
return is_found, frequency
def find_rf_packet(self, frequency: float = None) -> None:
"""Enter radiofrequency learning mode."""
payload = bytearray()
if frequency:
payload += struct.pack("<I", int(frequency * 1000))
self._send(0x1B, payload)
def cancel_sweep_frequency(self) -> None:
"""Cancel sweep frequency."""
self._send(0x1E)
def check_frequency(self) -> bool:
"""Return True if the frequency was identified successfully."""
resp = self._send(0x1A)
return resp[0] == 1
def find_rf_packet(self) -> None:
"""Enter radiofrequency learning mode."""
self._send(0x1B)
def check_sensors(self) -> dict:
"""Return the state of the sensors."""
resp = self._send(0x1)
temp = struct.unpack("<bb", resp[:0x2])
return {"temperature": temp[0x0] + temp[0x1] / 10.0}
def check_temperature(self) -> float:
"""Return the temperature."""
return self.check_sensors()["temperature"]
def check_sensors(self) -> dict:
"""Return the state of the sensors."""
resp = self._send(0x1)
temperature = struct.unpack("<bb", resp[:0x2])
temperature = temperature[0x0] + temperature[0x1] / 10.0
return {"temperature": temperature}
class rmminib(rmmini):
"""Controls a Broadlink RM mini 3 (new firmware)."""
class rm4(rm):
"""Controls a Broadlink RM4."""
TYPE = "RMMINIB"
TYPE = "RM4"
def _send(self, command: int, data: bytes = b'') -> bytes:
def _send(self, command: int, data: bytes = b"") -> bytes:
"""Send a packet to the device."""
packet = struct.pack("<HI", len(data) + 4, command) + data
resp = self.send_packet(0x6A, packet)
check_error(resp[0x22:0x24])
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
p_len = struct.unpack("<H", payload[:0x2])[0]
return payload[0x6:p_len+2]
return payload[0x6 : p_len + 2]
class rm4mini(rmminib):
"""Controls a Broadlink RM4 mini."""
TYPE = "RM4MINI"
def check_sensors(self) -> dict:
"""Return the state of the sensors."""
resp = self._send(0x24)
temp = struct.unpack("<bb", resp[:0x2])
return {
"temperature": temp[0x0] + temp[0x1] / 100.0,
"humidity": resp[0x2] + resp[0x3] / 100.0,
}
def check_temperature(self) -> float:
"""Return the temperature."""
return self.check_sensors()["temperature"]
def check_humidity(self) -> float:
"""Return the humidity."""
return self.check_sensors()["humidity"]
def check_sensors(self) -> dict:
"""Return the state of the sensors."""
resp = self._send(0x24)
temperature = struct.unpack("<bb", resp[:0x2])
temperature = temperature[0x0] + temperature[0x1] / 100.0
humidity = resp[0x2] + resp[0x3] / 100.0
return {"temperature": temperature, "humidity": humidity}
class rm4pro(rm4mini, rmpro):
"""Controls a Broadlink RM4 pro."""
TYPE = "RM4PRO"
class rm(rmpro):
"""For backwards compatibility."""
TYPE = "RM2"
class rm4(rm4pro):
"""For backwards compatibility."""
TYPE = "RM4"

View File

@ -1,11 +1,9 @@
"""Support for sensors."""
import struct
from .device import device
from .exceptions import check_error
from . import exceptions as e
from .device import Device
class a1(device):
class a1(Device):
"""Controls a Broadlink A1."""
TYPE = "A1"
@ -29,19 +27,62 @@ class a1(device):
def check_sensors_raw(self) -> dict:
"""Return the state of the sensors in raw format."""
packet = bytearray([0x1])
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
data = payload[0x4:]
temperature = struct.unpack("<bb", data[:0x2])
temperature = temperature[0x0] + temperature[0x1] / 10.0
humidity = data[0x2] + data[0x3] / 10.0
resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24])
data = self.decrypt(resp[0x38:])
return {
"temperature": temperature,
"humidity": humidity,
"light": data[0x4],
"air_quality": data[0x6],
"noise": data[0x8],
"temperature": data[0x04] + data[0x05] / 10.0,
"humidity": data[0x06] + data[0x07] / 10.0,
"light": data[0x08],
"air_quality": data[0x0A],
"noise": data[0x0C],
}
class a2(Device):
"""Controls a Broadlink A2."""
TYPE = "A2"
def _send(self, operation: int, data: bytes = b""):
"""Send a command to the device."""
packet = bytearray(12)
packet[0x02] = 0xA5
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x08] = operation
packet[0x09] = 0x0B
if data:
data_len = len(data)
packet[0x0A] = data_len & 0xFF
packet[0x0B] = data_len >> 8
packet += bytes(2)
packet.extend(data)
checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[0x06] = checksum & 0xFF
packet[0x07] = checksum >> 8
packet_len = len(packet) - 2
packet[0x00] = packet_len & 0xFF
packet[0x01] = packet_len >> 8
resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload
def check_sensors_raw(self) -> dict:
"""Return the state of the sensors in raw format."""
data = self._send(1)
return {
"temperature": data[0x13] * 256 + data[0x14],
"humidity": data[0x15] * 256 + data[0x16],
"pm10": data[0x0D] * 256 + data[0x0E],
"pm2_5": data[0x0F] * 256 + data[0x10],
"pm1": data[0x11] * 256 + data[0x12],
}

View File

@ -2,226 +2,128 @@
import json
import struct
from .device import device
from .exceptions import check_error
from . import exceptions as e
from .device import Device
class mp1(device):
"""Controls a Broadlink MP1."""
TYPE = "MP1"
def set_power_mask(self, sid_mask: int, state: bool) -> None:
"""Set the power state of the device."""
packet = bytearray(16)
packet[0x00] = 0x0D
packet[0x02] = 0xA5
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x06] = 0xB2 + ((sid_mask << 1) if state else sid_mask)
packet[0x07] = 0xC0
packet[0x08] = 0x02
packet[0x0A] = 0x03
packet[0x0D] = sid_mask
packet[0x0E] = sid_mask if state else 0
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
def set_power(self, sid: int, state: bool) -> None:
"""Set the power state of the device."""
sid_mask = 0x01 << (sid - 1)
self.set_power_mask(sid_mask, state)
def check_power_raw(self) -> int:
"""Return the power state of the device in raw format."""
packet = bytearray(16)
packet[0x00] = 0x0A
packet[0x02] = 0xA5
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x06] = 0xAE
packet[0x07] = 0xC0
packet[0x08] = 0x01
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
return payload[0x0E]
def check_power(self) -> dict:
"""Return the power state of the device."""
state = self.check_power_raw()
if state is None:
return {"s1": None, "s2": None, "s3": None, "s4": None}
data = {}
data["s1"] = bool(state & 0x01)
data["s2"] = bool(state & 0x02)
data["s3"] = bool(state & 0x04)
data["s4"] = bool(state & 0x08)
return data
class bg1(device):
"""Controls a BG Electrical smart outlet."""
TYPE = "BG1"
def get_state(self) -> dict:
"""Return the power state of the device.
Example: `{"pwr":1,"pwr1":1,"pwr2":0,"maxworktime":60,"maxworktime1":60,"maxworktime2":0,"idcbrightness":50}`
"""
packet = self._encode(1, b"{}")
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
return self._decode(response)
def set_state(
self,
pwr: bool = None,
pwr1: bool = None,
pwr2: bool = None,
maxworktime: int = None,
maxworktime1: int = None,
maxworktime2: int = None,
idcbrightness: int = None,
) -> dict:
"""Set the power state of the device."""
data = {}
if pwr is not None:
data["pwr"] = int(bool(pwr))
if pwr1 is not None:
data["pwr1"] = int(bool(pwr1))
if pwr2 is not None:
data["pwr2"] = int(bool(pwr2))
if maxworktime is not None:
data["maxworktime"] = maxworktime
if maxworktime1 is not None:
data["maxworktime1"] = maxworktime1
if maxworktime2 is not None:
data["maxworktime2"] = maxworktime2
if idcbrightness is not None:
data["idcbrightness"] = idcbrightness
js = json.dumps(data).encode("utf8")
packet = self._encode(2, js)
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
return self._decode(response)
def _encode(self, flag: int, js: str) -> bytes:
"""Encode a message."""
# The packet format is:
# 0x00-0x01 length
# 0x02-0x05 header
# 0x06-0x07 00
# 0x08 flag (1 for read or 2 write?)
# 0x09 unknown (0xb)
# 0x0a-0x0d length of json
# 0x0e- json data
packet = bytearray(14)
length = 4 + 2 + 2 + 4 + len(js)
struct.pack_into(
"<HHHHBBI", packet, 0, length, 0xA5A5, 0x5A5A, 0x0000, flag, 0x0B, len(js)
)
for i in range(len(js)):
packet.append(js[i])
checksum = sum(packet[0x08:], 0xC0AD) & 0xFFFF
packet[0x06] = checksum & 0xFF
packet[0x07] = checksum >> 8
return packet
def _decode(self, response: bytes) -> dict:
"""Decode a message."""
payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0x0A)[0]
state = json.loads(payload[0x0E : 0x0E + js_len])
return state
class sp1(device):
class sp1(Device):
"""Controls a Broadlink SP1."""
TYPE = "SP1"
def set_power(self, state: bool) -> None:
def set_power(self, pwr: bool) -> None:
"""Set the power state of the device."""
packet = bytearray(4)
packet[0] = state
packet[0] = bool(pwr)
response = self.send_packet(0x66, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
class sp2(device):
class sp2(Device):
"""Controls a Broadlink SP2."""
TYPE = "SP2"
def set_power(self, state: bool) -> None:
def set_power(self, pwr: bool) -> None:
"""Set the power state of the device."""
packet = bytearray(16)
packet[0] = 2
if self.check_nightlight():
packet[4] = 3 if state else 2
else:
packet[4] = 1 if state else 0
packet[4] = bool(pwr)
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
def set_nightlight(self, state: bool) -> None:
"""Set the night light state of the device."""
packet = bytearray(16)
packet[0] = 2
if self.check_power():
packet[4] = 3 if state else 1
else:
packet[4] = 2 if state else 0
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
def check_power(self) -> bool:
"""Return the power state of the device."""
packet = bytearray(16)
packet[0] = 1
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
return bool(payload[0x4] == 1 or payload[0x4] == 3 or payload[0x4] == 0xFD)
return bool(payload[0x4])
class sp2s(sp2):
"""Controls a Broadlink SP2S."""
TYPE = "SP2S"
def get_energy(self) -> float:
"""Return the power consumption in W."""
packet = bytearray(16)
packet[0] = 4
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
return int.from_bytes(payload[0x4:0x7], "little") / 1000
class sp3(Device):
"""Controls a Broadlink SP3."""
TYPE = "SP3"
def set_power(self, pwr: bool) -> None:
"""Set the power state of the device."""
packet = bytearray(16)
packet[0] = 2
packet[4] = self.check_nightlight() << 1 | bool(pwr)
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
def set_nightlight(self, ntlight: bool) -> None:
"""Set the night light state of the device."""
packet = bytearray(16)
packet[0] = 2
packet[4] = bool(ntlight) << 1 | self.check_power()
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
def check_power(self) -> bool:
"""Return the power state of the device."""
packet = bytearray(16)
packet[0] = 1
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
return bool(payload[0x4] & 1)
def check_nightlight(self) -> bool:
"""Return the state of the night light."""
packet = bytearray(16)
packet[0] = 1
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
return bool(payload[0x4] == 2 or payload[0x4] == 3 or payload[0x4] == 0xFF)
return bool(payload[0x4] & 2)
class sp3s(sp2):
"""Controls a Broadlink SP3S."""
TYPE = "SP3S"
def get_energy(self) -> float:
"""Return the power consumption in W."""
packet = bytearray([8, 0, 254, 1, 5, 1, 0, 0, 0, 45])
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
energy = payload[0x7:0x4:-1].hex()
return int(energy) / 100
class sp4(device):
class sp4(Device):
"""Controls a Broadlink SP4."""
TYPE = "SP4"
def set_power(self, state: bool) -> None:
def set_power(self, pwr: bool) -> None:
"""Set the power state of the device."""
self.set_state(pwr=state)
self.set_state(pwr=pwr)
def set_nightlight(self, state: bool) -> None:
def set_nightlight(self, ntlight: bool) -> None:
"""Set the night light state of the device."""
self.set_state(ntlight=state)
self.set_state(ntlight=ntlight)
def set_state(
self,
@ -233,33 +135,33 @@ class sp4(device):
childlock: bool = None,
) -> dict:
"""Set state of device."""
data = {}
state = {}
if pwr is not None:
data["pwr"] = int(bool(pwr))
state["pwr"] = int(bool(pwr))
if ntlight is not None:
data["ntlight"] = int(bool(ntlight))
state["ntlight"] = int(bool(ntlight))
if indicator is not None:
data["indicator"] = int(bool(indicator))
state["indicator"] = int(bool(indicator))
if ntlbrightness is not None:
data["ntlbrightness"] = ntlbrightness
state["ntlbrightness"] = ntlbrightness
if maxworktime is not None:
data["maxworktime"] = maxworktime
state["maxworktime"] = maxworktime
if childlock is not None:
data["childlock"] = int(bool(childlock))
state["childlock"] = int(bool(childlock))
packet = self._encode(2, data)
packet = self._encode(2, state)
response = self.send_packet(0x6A, packet)
return self._decode(response)
def check_power(self) -> bool:
"""Return the power state of the device."""
state = self.get_state()
return state["pwr"]
return bool(state["pwr"])
def check_nightlight(self) -> bool:
"""Return the state of the night light."""
state = self.get_state()
return state["ntlight"]
return bool(state["ntlight"])
def get_state(self) -> dict:
"""Get full state of device."""
@ -269,20 +171,19 @@ class sp4(device):
def _encode(self, flag: int, state: dict) -> bytes:
"""Encode a message."""
payload = json.dumps(state, separators=(",", ":")).encode()
packet = bytearray(12)
data = json.dumps(state, separators=(",", ":")).encode()
struct.pack_into(
"<HHHBBI", packet, 0, 0xA5A5, 0x5A5A, 0x0000, flag, 0x0B, len(payload)
"<HHHBBI", packet, 0, 0xA5A5, 0x5A5A, 0x0000, flag, 0x0B, len(data)
)
packet.extend(payload)
packet.extend(data)
checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[0x04] = checksum & 0xFF
packet[0x05] = checksum >> 8
packet[0x04:0x06] = checksum.to_bytes(2, "little")
return packet
def _decode(self, response: bytes) -> dict:
"""Decode a message."""
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0x08)[0]
state = json.loads(payload[0x0C : 0x0C + js_len])
@ -308,9 +209,9 @@ class sp4b(sp4):
def _encode(self, flag: int, state: dict) -> bytes:
"""Encode a message."""
payload = json.dumps(state, separators=(",", ":")).encode()
packet = bytearray(14)
length = 4 + 2 + 2 + 4 + len(payload)
data = json.dumps(state, separators=(",", ":")).encode()
length = 12 + len(data)
struct.pack_into(
"<HHHHBBI",
packet,
@ -321,18 +222,241 @@ class sp4b(sp4):
0x0000,
flag,
0x0B,
len(payload),
len(data),
)
packet.extend(payload)
checksum = sum(packet[0x8:], 0xC0AD) & 0xFFFF
packet[0x06] = checksum & 0xFF
packet[0x07] = checksum >> 8
packet.extend(data)
checksum = sum(packet[0x02:], 0xBEAF) & 0xFFFF
packet[0x06:0x08] = checksum.to_bytes(2, "little")
return packet
def _decode(self, response: bytes) -> dict:
"""Decode a message."""
check_error(response[0x22:0x24])
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0xA)[0]
state = json.loads(payload[0x0E : 0x0E + js_len])
return state
class bg1(Device):
"""Controls a BG Electrical smart outlet."""
TYPE = "BG1"
def get_state(self) -> dict:
"""Return the power state of the device.
Example: `{"pwr":1,"pwr1":1,"pwr2":0,"maxworktime":60,"maxworktime1":60,"maxworktime2":0,"idcbrightness":50}`
"""
packet = self._encode(1, {})
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
return self._decode(response)
def set_state(
self,
pwr: bool = None,
pwr1: bool = None,
pwr2: bool = None,
maxworktime: int = None,
maxworktime1: int = None,
maxworktime2: int = None,
idcbrightness: int = None,
) -> dict:
"""Set the power state of the device."""
state = {}
if pwr is not None:
state["pwr"] = int(bool(pwr))
if pwr1 is not None:
state["pwr1"] = int(bool(pwr1))
if pwr2 is not None:
state["pwr2"] = int(bool(pwr2))
if maxworktime is not None:
state["maxworktime"] = maxworktime
if maxworktime1 is not None:
state["maxworktime1"] = maxworktime1
if maxworktime2 is not None:
state["maxworktime2"] = maxworktime2
if idcbrightness is not None:
state["idcbrightness"] = idcbrightness
packet = self._encode(2, state)
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
return self._decode(response)
def _encode(self, flag: int, state: dict) -> bytes:
"""Encode a message."""
packet = bytearray(14)
data = json.dumps(state).encode()
length = 12 + len(data)
struct.pack_into(
"<HHHHBBI", packet, 0, length, 0xA5A5, 0x5A5A, 0x0000, flag, 0x0B, len(data)
)
packet.extend(data)
checksum = sum(packet[0x2:], 0xBEAF) & 0xFFFF
packet[0x06:0x08] = checksum.to_bytes(2, "little")
return packet
def _decode(self, response: bytes) -> dict:
"""Decode a message."""
payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0x0A)[0]
state = json.loads(payload[0x0E : 0x0E + js_len])
return state
class ehc31(bg1):
"""Controls a BG Electrical smart extension lead."""
TYPE = "EHC31"
def set_state(
self,
pwr: bool = None,
pwr1: bool = None,
pwr2: bool = None,
pwr3: bool = None,
maxworktime1: int = None,
maxworktime2: int = None,
maxworktime3: int = None,
idcbrightness: int = None,
childlock: bool = None,
childlock1: bool = None,
childlock2: bool = None,
childlock3: bool = None,
childlock4: bool = None,
) -> dict:
"""Set the power state of the device."""
state = {}
if pwr is not None:
state["pwr"] = int(bool(pwr))
if pwr1 is not None:
state["pwr1"] = int(bool(pwr1))
if pwr2 is not None:
state["pwr2"] = int(bool(pwr2))
if pwr3 is not None:
state["pwr3"] = int(bool(pwr3))
if maxworktime1 is not None:
state["maxworktime1"] = maxworktime1
if maxworktime2 is not None:
state["maxworktime2"] = maxworktime2
if maxworktime3 is not None:
state["maxworktime3"] = maxworktime3
if idcbrightness is not None:
state["idcbrightness"] = idcbrightness
if childlock is not None:
state["childlock"] = int(bool(childlock))
if childlock1 is not None:
state["childlock1"] = int(bool(childlock1))
if childlock2 is not None:
state["childlock2"] = int(bool(childlock2))
if childlock3 is not None:
state["childlock3"] = int(bool(childlock3))
if childlock4 is not None:
state["childlock4"] = int(bool(childlock4))
packet = self._encode(2, state)
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
return self._decode(response)
class mp1(Device):
"""Controls a Broadlink MP1."""
TYPE = "MP1"
def set_power_mask(self, sid_mask: int, pwr: bool) -> None:
"""Set the power state of the device."""
packet = bytearray(16)
packet[0x00] = 0x0D
packet[0x02] = 0xA5
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x06] = 0xB2 + ((sid_mask << 1) if pwr else sid_mask)
packet[0x07] = 0xC0
packet[0x08] = 0x02
packet[0x0A] = 0x03
packet[0x0D] = sid_mask
packet[0x0E] = sid_mask if pwr else 0
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
def set_power(self, sid: int, pwr: bool) -> None:
"""Set the power state of the device."""
sid_mask = 0x01 << (sid - 1)
self.set_power_mask(sid_mask, pwr)
def check_power_raw(self) -> int:
"""Return the power state of the device in raw format."""
packet = bytearray(16)
packet[0x00] = 0x0A
packet[0x02] = 0xA5
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x06] = 0xAE
packet[0x07] = 0xC0
packet[0x08] = 0x01
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
return payload[0x0E]
def check_power(self) -> dict:
"""Return the power state of the device."""
data = self.check_power_raw()
return {
"s1": bool(data & 1),
"s2": bool(data & 2),
"s3": bool(data & 4),
"s4": bool(data & 8),
}
class mp1s(mp1):
"""Controls a Broadlink MP1S."""
TYPE = "MP1S"
def get_state(self) -> dict:
"""Return the power state of the device.
voltage in V.
current in A.
power in W.
power consumption in kW·h.
"""
packet = bytearray(16)
packet[0x00] = 0x0E
packet[0x02] = 0xA5
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x06] = 0xB2
packet[0x07] = 0xC0
packet[0x08] = 0x01
packet[0x0A] = 0x04
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
payload_str = payload.hex()[4:-6]
def get_value(start, end, factors):
value = sum(
int(payload_str[i - 2 : i]) * factor
for i, factor in zip(range(start, end, -2), factors)
)
return value
return {
"volt": get_value(34, 30, [10, 0.1]),
"current": get_value(40, 34, [1, 0.01, 0.0001]),
"power": get_value(46, 40, [100, 1, 0.01]),
"totalconsum": get_value(54, 46, [10000, 100, 1, 0.01]),
}

View File

@ -1,37 +1,29 @@
Command line interface for python-broadlink
===========================================
This is a command line interface for broadlink python library
Tested with BroadLink RMPRO / RM2
This is a command line interface for the python-broadlink API.
Requirements
------------
You should have the broadlink python installed, this can be made in many linux distributions using :
You need to install the module first:
```
sudo pip install broadlink
pip3 install broadlink
```
Installation
-----------
Just copy this files
Download "broadlink_cli" and "broadlink_discovery".
Programs
--------
* broadlink_discovery: Discover Broadlink devices connected to the local network.
* broadlink_cli: Send commands and query the Broadlink device.
* broadlink_discovery
used to run the discovery in the network
this program withh show the command line parameters to be used with
broadlink_cli to select broadlink device
* broadlink_cli
used to send commands and query the broadlink device
device specification formats
Device specification formats
----------------------------
Using separate parameters for each information:
@ -48,38 +40,99 @@ Using file with parameters:
```
broadlink_cli --device @BEDROOM.device --temp
```
This is prefered as the configuration is stored in file and you can change
just a file to point to a different hardware
This is prefered as the configuration is stored in a file and you can change
it later to point to a different device.
Sample usage
------------
Example usage
-------------
Learn commands :
### Common commands
#### Join device to the Wi-Fi network
```
broadlink_cli --joinwifi SSID PASSWORD
```
#### Discover devices connected to the local network
```
broadlink_discovery
```
### Universal remotes
#### Learn IR code and show at console
```
# Learn and save to file
broadlink_cli --device @BEDROOM.device --learnfile LG-TV.power
# LEard and show at console
broadlink_cli --device @BEDROOM.device --learn
```
#### Learn RF code and show at console
```
broadlink_cli --device @BEDROOM.device --rfscanlearn
```
Send command :
#### Learn IR code and save to file
```
broadlink_cli --device @BEDROOM.device --learnfile LG-TV.power
```
#### Learn RF code and save to file
```
broadlink_cli --device @BEDROOM.device --rfscanlearn --learnfile LG-TV.power
```
#### Send code
```
broadlink_cli --device @BEDROOM.device --send DATA
```
#### Send code from file
```
broadlink_cli --device @BEDROOM.device --send @LG-TV.power
broadlink_cli --device @BEDROOM.device --send ....datafromlearncommand...
```
Get Temperature :
#### Check temperature
```
broadlink_cli --device @BEDROOM.device --temperature
```
Get Energy Consumption (For a SmartPlug) :
#### Check humidity
```
broadlink_cli --device @BEDROOM.device --humidity
```
### Smart plugs
#### Turn on
```
broadlink_cli --device @BEDROOM.device --turnon
```
#### Turn off
```
broadlink_cli --device @BEDROOM.device --turnoff
```
#### Turn on nightlight
```
broadlink_cli --device @BEDROOM.device --turnnlon
```
#### Turn off nightlight
```
broadlink_cli --device @BEDROOM.device --turnnloff
```
#### Check power state
```
broadlink_cli --device @BEDROOM.device --check
```
#### Check nightlight state
```
broadlink_cli --device @BEDROOM.device --checknl
```
#### Check power consumption
```
broadlink_cli --device @BEDROOM.device --energy
```
Once joined to the Broadlink provisioning Wi-Fi, configure it with your Wi-Fi details:
```
broadlink_cli --joinwifi MySSID MyWifiPassword
```

View File

@ -1,67 +1,32 @@
#!/usr/bin/env python3
import argparse
import base64
import codecs
import time
import typing as t
import broadlink
from broadlink.const import DEFAULT_PORT
from broadlink.exceptions import ReadError, StorageError
from broadlink.remote import data_to_pulses, pulses_to_data
TICK = 32.84
TIMEOUT = 30
IR_TOKEN = 0x26
def auto_int(x):
return int(x, 0)
def to_microseconds(bytes):
result = []
# print bytes[0] # 0x26 = 38for IR
index = 4
while index < len(bytes):
chunk = bytes[index]
index += 1
if chunk == 0:
chunk = bytes[index]
chunk = 256 * chunk + bytes[index + 1]
index += 2
result.append(int(round(chunk * TICK)))
if chunk == 0x0d05:
break
return result
def format_pulses(pulses: t.List[int]) -> str:
"""Format pulses."""
return " ".join(
f"+{pulse}" if i % 2 == 0 else f"-{pulse}"
for i, pulse in enumerate(pulses)
)
def durations_to_broadlink(durations):
result = bytearray()
result.append(IR_TOKEN)
result.append(0)
result.append(len(durations) % 256)
result.append(len(durations) / 256)
for dur in durations:
num = int(round(dur / TICK))
if num > 255:
result.append(0)
result.append(num / 256)
result.append(num % 256)
return result
def format_durations(data):
result = ''
for i in range(0, len(data)):
if len(result) > 0:
result += ' '
result += ('+' if i % 2 == 0 else '-') + str(data[i])
return result
def parse_durations(str):
result = []
for s in str.split():
result.append(abs(int(s)))
return result
def parse_pulses(data: t.List[str]) -> t.List[int]:
"""Parse pulses."""
return [abs(int(s)) for s in data]
parser = argparse.ArgumentParser(fromfile_prefix_chars='@')
@ -70,6 +35,7 @@ parser.add_argument("--type", type=auto_int, default=0x2712, help="type of devic
parser.add_argument("--host", help="host address")
parser.add_argument("--mac", help="mac address (hex reverse), as used by python-broadlink library")
parser.add_argument("--temperature", action="store_true", help="request temperature from device")
parser.add_argument("--humidity", action="store_true", help="request humidity from device")
parser.add_argument("--energy", action="store_true", help="request energy consumption from device")
parser.add_argument("--check", action="store_true", help="check current power state")
parser.add_argument("--checknl", action="store_true", help="check current nightlight state")
@ -81,7 +47,8 @@ parser.add_argument("--switch", action="store_true", help="switch state from on
parser.add_argument("--send", action="store_true", help="send command")
parser.add_argument("--sensors", action="store_true", help="check all sensors")
parser.add_argument("--learn", action="store_true", help="learn command")
parser.add_argument("--rfscanlearn", action="store_true", help="rf scan learning")
parser.add_argument("--rflearn", action="store_true", help="rf scan learning")
parser.add_argument("--frequency", type=float, help="specify radiofrequency for learning")
parser.add_argument("--learnfile", help="save learned command to a specified file")
parser.add_argument("--durations", action="store_true",
help="use durations in micro seconds instead of the Broadlink format")
@ -92,16 +59,16 @@ args = parser.parse_args()
if args.device:
values = args.device.split()
type = int(values[0], 0)
devtype = int(values[0], 0)
host = values[1]
mac = bytearray.fromhex(values[2])
elif args.mac:
type = args.type
devtype = args.type
host = args.host
mac = bytearray.fromhex(args.mac)
if args.host or args.device:
dev = broadlink.gendevice(type, (host, 80), mac)
dev = broadlink.gendevice(devtype, (host, DEFAULT_PORT), mac)
dev.auth()
if args.joinwifi:
@ -109,25 +76,26 @@ if args.joinwifi:
if args.convert:
data = bytearray.fromhex(''.join(args.data))
durations = to_microseconds(data)
print(format_durations(durations))
pulses = data_to_pulses(data)
print(format_pulses(pulses))
if args.temperature:
print(dev.check_temperature())
if args.humidity:
print(dev.check_humidity())
if args.energy:
print(dev.get_energy())
if args.sensors:
try:
data = dev.check_sensors()
except:
data = {}
data['temperature'] = dev.check_temperature()
data = dev.check_sensors()
for key in data:
print("{} {}".format(key, data[key]))
if args.send:
data = durations_to_broadlink(parse_durations(' '.join(args.data))) \
if args.durations else bytearray.fromhex(''.join(args.data))
data = (
pulses_to_data(parse_pulses(args.data))
if args.durations
else bytes.fromhex(''.join(args.data))
)
dev.send_data(data)
if args.learn or (args.learnfile and not args.rfscanlearn):
if args.learn or (args.learnfile and not args.rflearn):
dev.enter_learning()
print("Learning...")
start = time.time()
@ -143,17 +111,19 @@ if args.learn or (args.learnfile and not args.rfscanlearn):
print("No data received...")
exit(1)
learned = format_durations(to_microseconds(bytearray(data))) \
if args.durations \
else ''.join(format(x, '02x') for x in bytearray(data))
if args.learn:
print(learned)
decode_hex = codecs.getdecoder("hex_codec")
print("Base64: " + str(base64.b64encode(decode_hex(learned)[0])))
print("Packet found!")
raw_fmt = data.hex()
base64_fmt = base64.b64encode(data).decode('ascii')
pulse_fmt = format_pulses(data_to_pulses(data))
print("Raw:", raw_fmt)
print("Base64:", base64_fmt)
print("Pulses:", pulse_fmt)
if args.learnfile:
print("Saving to {}".format(args.learnfile))
with open(args.learnfile, "w") as text_file:
text_file.write(learned)
text_file.write(pulse_fmt if args.durations else raw_fmt)
if args.check:
if dev.check_power():
print('* ON *')
@ -195,28 +165,33 @@ if args.switch:
else:
dev.set_power(True)
print('* Switch to ON *')
if args.rfscanlearn:
dev.sweep_frequency()
print("Learning RF Frequency, press and hold the button to learn...")
start = time.time()
while time.time() - start < TIMEOUT:
time.sleep(1)
if dev.check_frequency():
break
if args.rflearn:
if args.frequency:
frequency = args.frequency
print("Press the button you want to learn, a short press...")
else:
print("RF Frequency not found")
dev.cancel_sweep_frequency()
exit(1)
dev.sweep_frequency()
print("Detecting radiofrequency, press and hold the button to learn...")
print("Found RF Frequency - 1 of 2!")
print("You can now let go of the button")
start = time.time()
while time.time() - start < TIMEOUT:
time.sleep(1)
locked, frequency = dev.check_frequency()
if locked:
break
else:
print("Radiofrequency not found")
dev.cancel_sweep_frequency()
exit(1)
input("Press enter to continue...")
print("Radiofrequency detected: {}MHz".format(frequency))
print("You can now let go of the button")
print("To complete learning, single press the button you want to learn")
input("Press enter to continue...")
dev.find_rf_packet()
print("Press the button again, now a short press.")
dev.find_rf_packet(frequency)
start = time.time()
while time.time() - start < TIMEOUT:
@ -231,15 +206,16 @@ if args.rfscanlearn:
print("No data received...")
exit(1)
print("Found RF Frequency - 2 of 2!")
learned = format_durations(to_microseconds(bytearray(data))) \
if args.durations \
else ''.join(format(x, '02x') for x in bytearray(data))
if args.learnfile is None:
print(learned)
decode_hex = codecs.getdecoder("hex_codec")
print("Base64: {}".format(str(base64.b64encode(decode_hex(learned)[0]))))
if args.learnfile is not None:
print("Packet found!")
raw_fmt = data.hex()
base64_fmt = base64.b64encode(data).decode('ascii')
pulse_fmt = format_pulses(data_to_pulses(data))
print("Raw:", raw_fmt)
print("Base64:", base64_fmt)
print("Pulses:", pulse_fmt)
if args.learnfile:
print("Saving to {}".format(args.learnfile))
with open(args.learnfile, "w") as text_file:
text_file.write(learned)
text_file.write(pulse_fmt if args.durations else raw_fmt)

View File

@ -2,12 +2,13 @@
import argparse
import broadlink
from broadlink.const import DEFAULT_BCAST_ADDR, DEFAULT_TIMEOUT
from broadlink.exceptions import StorageError
parser = argparse.ArgumentParser(fromfile_prefix_chars='@')
parser.add_argument("--timeout", type=int, default=5, help="timeout to wait for receiving discovery responses")
parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT, help="timeout to wait for receiving discovery responses")
parser.add_argument("--ip", default=None, help="ip address to use in the discovery")
parser.add_argument("--dst-ip", default="255.255.255.255", help="destination ip address to use in the discovery")
parser.add_argument("--dst-ip", default=DEFAULT_BCAST_ADDR, help="destination ip address to use in the discovery")
args = parser.parse_args()
print("Discovering...")

View File

@ -1 +1 @@
cryptography==2.6.1
cryptography==3.2

View File

@ -5,7 +5,7 @@
from setuptools import setup, find_packages
version = '0.16.0'
version = '0.18.3'
setup(
name="broadlink",
@ -15,8 +15,8 @@ setup(
url="http://github.com/mjg59/python-broadlink",
packages=find_packages(),
scripts=[],
install_requires=["cryptography>=2.1.1"],
description="Python API for controlling Broadlink IR controllers",
install_requires=["cryptography>=3.2"],
description="Python API for controlling Broadlink devices",
classifiers=[
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",