diff --git a/README.md b/README.md index 675dfe0..81c6de5 100644 --- a/README.md +++ b/README.md @@ -50,6 +50,13 @@ broadlink.setup('myssid', 'mynetworkpass', 3) Security mode options are (0 = none, 1 = WEP, 2 = WPA1, 3 = WPA2, 4 = WPA1/2) +#### Advanced options + +You may need to specify a broadcast address if setup is not working. +```python3 +broadlink.setup('myssid', 'mynetworkpass', 3, ip_address='192.168.0.255') +``` + ### Discovery Use this function to discover devices: @@ -61,17 +68,19 @@ devices = broadlink.discover() #### Advanced options You may need to specify `local_ip_address` or `discover_ip_address` if discovery does not return any devices. +Using the IP address of your local machine: ```python3 -devices = broadlink.discover(local_ip_address='192.168.0.100') # IP address of your local machine. +devices = broadlink.discover(local_ip_address='192.168.0.100') ``` +Using the broadcast address of your subnet: ```python3 -devices = broadlink.discover(discover_ip_address='192.168.0.255') # Broadcast address of your subnet. +devices = broadlink.discover(discover_ip_address='192.168.0.255') ``` If the device is locked, it may not be discoverable with broadcast. In such cases, you can use the unicast version `broadlink.hello()` for direct discovery: ```python3 -device = broadlink.hello('192.168.0.16') # IP address of your Broadlink device. +device = broadlink.hello('192.168.0.16') ``` If you are a perfomance freak, use `broadlink.xdiscover()` to create devices instantly: @@ -106,23 +115,33 @@ packet = device.check_data() ### Learning RF codes -Learning IR codes takes place in five steps. +Learning RF codes takes place in six steps. 1. Sweep the frequency: ```python3 device.sweep_frequency() ``` 2. When the LED blinks, point the remote at the Broadlink device for the first time and long press the button you want to learn. -3. Enter learning mode: +3. Check if the frequency was successfully identified: +```python3 +ok = device.check_frequency() +if ok: + print('Frequency found!') +``` +4. Enter learning mode: ```python3 device.find_rf_packet() ``` -4. When the LED blinks, point the remote at the Broadlink device for the second time and short press the button you want to learn. -5. Get the RF packet: +5. When the LED blinks, point the remote at the Broadlink device for the second time and short press the button you want to learn. +6. Get the RF packet: ```python3 packet = device.check_data() ``` +#### Notes + +Universal remotes with product id 0x2712 use the same method for learning IR and RF codes. They don't need to sweep frequency. Just call `device.enter_learning()` and `device.check_data()`. + ### Canceling learning You can exit the learning mode in the middle of the process by calling this method: diff --git a/broadlink/__init__.py b/broadlink/__init__.py index 1cf7deb..d313550 100644 --- a/broadlink/__init__.py +++ b/broadlink/__init__.py @@ -1,19 +1,19 @@ #!/usr/bin/env python3 """The python-broadlink library.""" import socket -import typing as t +from typing import Generator, List, Optional, Tuple, Union from . import exceptions as e from .const import DEFAULT_BCAST_ADDR, DEFAULT_PORT, DEFAULT_TIMEOUT from .alarm import S1C -from .climate import hysen -from .cover import dooya +from .climate import hvac, hysen +from .cover import dooya, dooya2, wser from .device import Device, ping, scan from .hub import s3 from .light import lb1, lb2 from .remote import rm, rm4, rm4mini, rm4pro, rmmini, rmminib, rmpro -from .sensor import a1 -from .switch import bg1, mp1, sp1, sp2, sp2s, sp3, sp3s, sp4, sp4b +from .sensor import a1, a2 +from .switch import bg1, ehc31, mp1, mp1s, sp1, sp2, sp2s, sp3, sp3s, sp4, sp4b SUPPORTED_TYPES = { sp1: { @@ -148,14 +148,19 @@ SUPPORTED_TYPES = { 0x653C: ("RM4 pro", "Broadlink"), }, a1: { - 0x2714: ("e-Sensor", "Broadlink"), + 0x2714: ("A1", "Broadlink"), + }, + a2: { + 0x4F60: ("A2", "Broadlink"), }, mp1: { 0x4EB5: ("MP1-1K4S", "Broadlink"), - 0x4EF7: ("MP1-1K4S", "Broadlink (OEM)"), 0x4F1B: ("MP1-1K3S2U", "Broadlink (OEM)"), 0x4F65: ("MP1-1K3S2U", "Broadlink"), }, + mp1s: { + 0x4EF7: ("MP1-1K4S", "Broadlink (OEM)"), + }, lb1: { 0x5043: ("SB800TD", "Broadlink (OEM)"), 0x504E: ("LB1", "Broadlink"), @@ -177,26 +182,38 @@ SUPPORTED_TYPES = { S1C: { 0x2722: ("S2KIT", "Broadlink"), }, - s3: { + s3: { 0xA59C: ("S3", "Broadlink"), 0xA64D: ("S3", "Broadlink"), }, + hvac: { + 0x4E2A: ("HVAC", "Licensed manufacturer"), + }, hysen: { 0x4EAD: ("HY02/HY03", "Hysen"), }, dooya: { 0x4E4D: ("DT360E-45/20", "Dooya"), }, + dooya2: { + 0x4F6E: ("DT360E-45/20", "Dooya"), + }, + wser: { + 0x4F6C: ("WSER", "Wistar"), + }, bg1: { 0x51E3: ("BG800/BG900", "BG Electrical"), }, + ehc31: { + 0x6480: ("EHC31", "BG Electrical"), + }, } def gendevice( dev_type: int, - host: t.Tuple[str, int], - mac: t.Union[bytes, str], + host: Tuple[str, int], + mac: Union[bytes, str], name: str = "", is_locked: bool = False, ) -> Device: @@ -222,7 +239,7 @@ def gendevice( def hello( - host: str, + ip_address: str, port: int = DEFAULT_PORT, timeout: int = DEFAULT_TIMEOUT, ) -> Device: @@ -232,7 +249,11 @@ def hello( """ try: return next( - xdiscover(timeout=timeout, discover_ip_address=host, discover_ip_port=port) + xdiscover( + timeout=timeout, + discover_ip_address=ip_address, + discover_ip_port=port, + ) ) except StopIteration as err: raise e.NetworkTimeoutError( @@ -244,33 +265,42 @@ def hello( def discover( timeout: int = DEFAULT_TIMEOUT, - local_ip_address: str = None, + local_ip_address: Optional[str] = None, discover_ip_address: str = DEFAULT_BCAST_ADDR, discover_ip_port: int = DEFAULT_PORT, -) -> t.List[Device]: +) -> List[Device]: """Discover devices connected to the local network.""" - responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port) + responses = scan( + timeout, local_ip_address, discover_ip_address, discover_ip_port + ) return [gendevice(*resp) for resp in responses] def xdiscover( timeout: int = DEFAULT_TIMEOUT, - local_ip_address: str = None, + local_ip_address: Optional[str] = None, discover_ip_address: str = DEFAULT_BCAST_ADDR, discover_ip_port: int = DEFAULT_PORT, -) -> t.Generator[Device, None, None]: +) -> Generator[Device, None, None]: """Discover devices connected to the local network. This function returns a generator that yields devices instantly. """ - responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port) + responses = scan( + timeout, local_ip_address, discover_ip_address, discover_ip_port + ) for resp in responses: yield gendevice(*resp) # Setup a new Broadlink device via AP Mode. Review the README to see how to enter AP Mode. # Only tested with Broadlink RM3 Mini (Blackbean) -def setup(ssid: str, password: str, security_mode: int) -> None: +def setup( + ssid: str, + password: str, + security_mode: int, + ip_address: str = DEFAULT_BCAST_ADDR, +) -> None: """Set up a new Broadlink device via AP mode.""" # Security mode options are (0 - none, 1 = WEP, 2 = WPA1, 3 = WPA2, 4 = WPA1/2) payload = bytearray(0x88) @@ -299,5 +329,5 @@ def setup(ssid: str, password: str, security_mode: int) -> None: sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Internet # UDP sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - sock.sendto(payload, (DEFAULT_BCAST_ADDR, DEFAULT_PORT)) + sock.sendto(payload, (ip_address, DEFAULT_PORT)) sock.close() diff --git a/broadlink/climate.py b/broadlink/climate.py old mode 100644 new mode 100755 index eee5f11..1a0c600 --- a/broadlink/climate.py +++ b/broadlink/climate.py @@ -1,5 +1,7 @@ -"""Support for HVAC units.""" -import typing as t +"""Support for climate control.""" +import enum +import struct +from typing import List, Sequence from . import exceptions as e from .device import Device @@ -19,7 +21,7 @@ class hysen(Device): TYPE = "HYS" - def send_request(self, request: t.Sequence[int]) -> bytes: + def send_request(self, request: Sequence[int]) -> bytes: """Send a request to the device.""" packet = bytearray() packet.extend((len(request) + 2).to_bytes(2, "little")) @@ -31,27 +33,34 @@ class hysen(Device): payload = self.decrypt(response[0x38:]) p_len = int.from_bytes(payload[:0x02], "little") - if p_len + 2 > len(payload): - raise ValueError( - "hysen_response_error", "first byte of response is not length" + nom_crc = int.from_bytes(payload[p_len:p_len+2], "little") + real_crc = CRC16.calculate(payload[0x02:p_len]) + + if nom_crc != real_crc: + raise e.DataValidationError( + -4008, + "Received data packet check error", + f"Expected a checksum of {nom_crc} and received {real_crc}", ) - nom_crc = int.from_bytes(payload[p_len : p_len + 2], "little") - real_crc = CRC16.calculate(payload[0x02:p_len]) - if nom_crc != real_crc: - raise ValueError("hysen_response_error", "CRC check on response failed") - return payload[0x02:p_len] + def _decode_temp(self, payload, base_index): + base_temp = payload[base_index] / 2.0 + add_offset = (payload[4] >> 3) & 1 # should offset be added? + offset_raw_value = (payload[17] >> 4) & 3 # offset value + offset = (offset_raw_value + 1) / 10 if add_offset else 0.0 + return base_temp + offset + def get_temp(self) -> float: """Return the room temperature in degrees celsius.""" payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x08]) - return payload[0x05] / 2.0 + return self._decode_temp(payload, 5) def get_external_temp(self) -> float: """Return the external temperature in degrees celsius.""" payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x08]) - return payload[18] / 2.0 + return self._decode_temp(payload, 18) def get_full_status(self) -> dict: """Return the state of the device. @@ -64,9 +73,10 @@ class hysen(Device): data["power"] = payload[4] & 1 data["active"] = (payload[4] >> 4) & 1 data["temp_manual"] = (payload[4] >> 6) & 1 - data["room_temp"] = payload[5] / 2.0 + data["heating_cooling"] = (payload[4] >> 7) & 1 + data["room_temp"] = self._decode_temp(payload, 5) data["thermostat_temp"] = payload[6] / 2.0 - data["auto_mode"] = payload[7] & 0xF + data["auto_mode"] = payload[7] & 0x0F data["loop_mode"] = payload[7] >> 4 data["sensor"] = payload[8] data["osv"] = payload[9] @@ -79,7 +89,7 @@ class hysen(Device): data["fre"] = payload[15] data["poweron"] = payload[16] data["unknown"] = payload[17] - data["external_temp"] = payload[18] / 2.0 + data["external_temp"] = self._decode_temp(payload, 18) data["hour"] = payload[19] data["min"] = payload[20] data["sec"] = payload[21] @@ -117,7 +127,9 @@ class hysen(Device): # E.g. loop_mode = 0 ("12345,67") means Saturday and Sunday (weekend schedule) # loop_mode = 2 ("1234567") means every day, including Saturday and Sunday (weekday schedule) # The sensor command is currently experimental - def set_mode(self, auto_mode: int, loop_mode: int, sensor: int = 0) -> None: + def set_mode( + self, auto_mode: int, loop_mode: int, sensor: int = 0 + ) -> None: """Set the mode of the device.""" mode_byte = ((loop_mode + 1) << 4) + auto_mode self.send_request([0x01, 0x06, 0x00, 0x02, mode_byte, sensor]) @@ -185,16 +197,32 @@ class hysen(Device): # Set device on(1) or off(0), does not deactivate Wifi connectivity. # Remote lock disables control by buttons on thermostat. - def set_power(self, power: int = 1, remote_lock: int = 0) -> None: + # heating_cooling: heating(0) cooling(1) + def set_power( + self, power: int = 1, remote_lock: int = 0, heating_cooling: int = 0 + ) -> None: """Set the power state of the device.""" - self.send_request([0x01, 0x06, 0x00, 0x00, remote_lock, power]) + state = (heating_cooling << 7) + power + self.send_request([0x01, 0x06, 0x00, 0x00, remote_lock, state]) # set time on device # n.b. day=1 is Monday, ..., day=7 is Sunday def set_time(self, hour: int, minute: int, second: int, day: int) -> None: """Set the time.""" self.send_request( - [0x01, 0x10, 0x00, 0x08, 0x00, 0x02, 0x04, hour, minute, second, day] + [ + 0x01, + 0x10, + 0x00, + 0x08, + 0x00, + 0x02, + 0x04, + hour, + minute, + second, + day + ] ) # Set timer schedule @@ -203,7 +231,7 @@ class hysen(Device): # {'start_hour':17, 'start_minute':30, 'temp': 22 } # Each one specifies the thermostat temp that will become effective at start_hour:start_minute # weekend is similar but only has 2 (e.g. switch on in morning and off in afternoon) - def set_schedule(self, weekday: t.List[dict], weekend: t.List[dict]) -> None: + def set_schedule(self, weekday: List[dict], weekend: List[dict]) -> None: """Set timer schedule.""" request = [0x01, 0x10, 0x00, 0x0A, 0x00, 0x0C, 0x18] @@ -226,3 +254,221 @@ class hysen(Device): request.append(int(weekend[i]["temp"] * 2)) self.send_request(request) + + +class hvac(Device): + """Controls a HVAC. + + Supported models: + - Tornado SMART X SQ series + - Aux ASW-H12U3/JIR1DI-US + - Aux ASW-H36U2/LFR1DI-US + """ + + TYPE = "HVAC" + + @enum.unique + class Mode(enum.IntEnum): + """Enumerates modes.""" + + AUTO = 0 + COOL = 1 + DRY = 2 + HEAT = 3 + FAN = 4 + + @enum.unique + class Speed(enum.IntEnum): + """Enumerates fan speed.""" + + HIGH = 1 + MID = 2 + LOW = 3 + AUTO = 5 + + @enum.unique + class Preset(enum.IntEnum): + """Enumerates presets.""" + + NORMAL = 0 + TURBO = 1 + MUTE = 2 + + @enum.unique + class SwHoriz(enum.IntEnum): + """Enumerates horizontal swing.""" + + ON = 0 + OFF = 7 + + @enum.unique + class SwVert(enum.IntEnum): + """Enumerates vertical swing.""" + + ON = 0 + POS1 = 1 + POS2 = 2 + POS3 = 3 + POS4 = 4 + POS5 = 5 + OFF = 7 + + def _encode(self, data: bytes) -> bytes: + """Encode data for transport.""" + packet = bytearray(10) + p_len = 10 + len(data) + struct.pack_into( + " bytes: + """Decode data from transport.""" + # payload[0x2:0x8] == bytes([0xbb, 0x00, 0x07, 0x00, 0x00, 0x00]) + payload = self.decrypt(response[0x38:]) + p_len = int.from_bytes(payload[:0x02], "little") + nom_crc = int.from_bytes(payload[p_len:p_len+2], "little") + real_crc = CRC16.calculate(payload[0x02:p_len], polynomial=0x9BE4) + + if nom_crc != real_crc: + raise e.DataValidationError( + -4008, + "Received data packet check error", + f"Expected a checksum of {nom_crc} and received {real_crc}", + ) + + d_len = int.from_bytes(payload[0x08:0x0A], "little") + return payload[0x0A:0x0A+d_len] + + def _send(self, command: int, data: bytes = b"") -> bytes: + """Send a command to the unit.""" + prefix = bytes([((command << 4) | 1), 1]) + packet = self._encode(prefix + data) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response)[0x02:] + + def _parse_state(self, data: bytes) -> dict: + """Parse state.""" + state = {} + state["power"] = bool(data[0x08] & 1 << 5) + state["target_temp"] = 8 + (data[0x00] >> 3) + (data[0x04] >> 7) * 0.5 + state["swing_v"] = self.SwVert(data[0x00] & 0b111) + state["swing_h"] = self.SwHoriz(data[0x01] >> 5) + state["mode"] = self.Mode(data[0x05] >> 5) + state["speed"] = self.Speed(data[0x03] >> 5) + state["preset"] = self.Preset(data[0x04] >> 6) + state["sleep"] = bool(data[0x05] & 1 << 2) + state["ifeel"] = bool(data[0x05] & 1 << 3) + state["health"] = bool(data[0x08] & 1 << 1) + state["clean"] = bool(data[0x08] & 1 << 2) + state["display"] = bool(data[0x0A] & 1 << 4) + state["mildew"] = bool(data[0x0A] & 1 << 3) + return state + + def set_state( + self, + power: bool, + target_temp: float, # 16<=target_temp<=32 + mode: Mode, + speed: Speed, + preset: Preset, + swing_h: SwHoriz, + swing_v: SwVert, + sleep: bool, + ifeel: bool, + display: bool, + health: bool, + clean: bool, + mildew: bool, + ) -> dict: + """Set the state of the device.""" + # TODO: decode unknown bits + UNK0 = 0b100 + UNK1 = 0b1101 + UNK2 = 0b101 + + target_temp = round(target_temp * 2) / 2 + + if preset == self.Preset.MUTE: + if mode != self.Mode.FAN: + raise ValueError("mute is only available in fan mode") + speed = self.Speed.LOW + + elif preset == self.Preset.TURBO: + if mode not in {self.Mode.COOL, self.Mode.HEAT}: + raise ValueError("turbo is only available in cooling/heating") + speed = self.Speed.HIGH + + data = bytearray(0x0D) + data[0x00] = (int(target_temp) - 8 << 3) | swing_v + data[0x01] = (swing_h << 5) | UNK0 + data[0x02] = ((target_temp % 1 == 0.5) << 7) | UNK1 + data[0x03] = speed << 5 + data[0x04] = preset << 6 + data[0x05] = mode << 5 | sleep << 2 | ifeel << 3 + data[0x08] = power << 5 | clean << 2 | (health and 0b11) + data[0x0A] = display << 4 | mildew << 3 + data[0x0C] = UNK2 + + resp = self._send(0, data) + return self._parse_state(resp) + + def get_state(self) -> dict: + """Returns a dictionary with the unit's parameters. + + Returns: + dict: + power (bool): + target_temp (float): temperature set point 16 dict: + """Returns dictionary with AC info. + + Returns: + dict: + power (bool): power + ambient_temp (float): ambient temperature + """ + resp = self._send(2) + + if len(resp) < 22: + raise e.DataValidationError( + -4007, + "Received data packet length error", + f"Expected at least 24 bytes and received {len(resp) + 2}", + ) + + ac_info = {} + ac_info["power"] = resp[0x1] & 1 + + ambient_temp = resp[0x05] & 0b11111, resp[0x15] & 0b11111 + if any(ambient_temp): + ac_info["ambient_temp"] = ambient_temp[0] + ambient_temp[1] / 10.0 + + return ac_info diff --git a/broadlink/cover.py b/broadlink/cover.py index c0f08ab..7531794 100644 --- a/broadlink/cover.py +++ b/broadlink/cover.py @@ -1,5 +1,6 @@ """Support for covers.""" import time +from typing import Sequence from . import exceptions as e from .device import Device @@ -8,33 +9,34 @@ from .device import Device class dooya(Device): """Controls a Dooya curtain motor.""" - TYPE = "Dooya DT360E" + TYPE = "DT360E" - def _send(self, magic1: int, magic2: int) -> int: + def _send(self, command: int, attribute: int = 0) -> int: """Send a packet to the device.""" packet = bytearray(16) - packet[0] = 0x09 - packet[2] = 0xBB - packet[3] = magic1 - packet[4] = magic2 - packet[9] = 0xFA - packet[10] = 0x44 - response = self.send_packet(0x6A, packet) - e.check_error(response[0x22:0x24]) - payload = self.decrypt(response[0x38:]) + packet[0x00] = 0x09 + packet[0x02] = 0xBB + packet[0x03] = command + packet[0x04] = attribute + packet[0x09] = 0xFA + packet[0x0A] = 0x44 + + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + payload = self.decrypt(resp[0x38:]) return payload[4] def open(self) -> int: """Open the curtain.""" - return self._send(0x01, 0x00) + return self._send(0x01) def close(self) -> int: """Close the curtain.""" - return self._send(0x02, 0x00) + return self._send(0x02) def stop(self) -> int: """Stop the curtain.""" - return self._send(0x03, 0x00) + return self._send(0x03) def get_percentage(self) -> int: """Return the position of the curtain.""" @@ -55,3 +57,126 @@ class dooya(Device): time.sleep(0.2) current = self.get_percentage() self.stop() + + +class dooya2(Device): + """Controls a Dooya curtain motor (version 2).""" + + TYPE = "DT360E-2" + + def _send(self, operation: int, data: Sequence = b""): + """Send a command to the device.""" + packet = bytearray(12) + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x08] = operation + packet[0x09] = 0x0B + + if data: + data_len = len(data) + packet[0x0A] = data_len & 0xFF + packet[0x0B] = data_len >> 8 + packet += bytes(2) + packet.extend(data) + + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x06] = checksum & 0xFF + packet[0x07] = checksum >> 8 + + packet_len = len(packet) - 2 + packet[0x00] = packet_len & 0xFF + packet[0x01] = packet_len >> 8 + + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + payload = self.decrypt(resp[0x38:]) + return payload + + def open(self) -> None: + """Open the curtain.""" + self._send(2, [0x00, 0x01, 0x00]) + + def close(self) -> None: + """Close the curtain.""" + self._send(2, [0x00, 0x02, 0x00]) + + def stop(self) -> None: + """Stop the curtain.""" + self._send(2, [0x00, 0x03, 0x00]) + + def get_percentage(self) -> int: + """Return the position of the curtain.""" + resp = self._send(1, [0x00, 0x06, 0x00]) + return resp[0x11] + + def set_percentage(self, new_percentage: int) -> None: + """Set the position of the curtain.""" + self._send(2, [0x00, 0x09, new_percentage]) + + +class wser(Device): + """Controls a Wistar curtain motor""" + + TYPE = "WSER" + + def _send(self, operation: int, data: Sequence = b""): + """Send a command to the device.""" + packet = bytearray(12) + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x08] = operation + packet[0x09] = 0x0B + + if data: + data_len = len(data) + packet[0x0A] = data_len & 0xFF + packet[0x0B] = data_len >> 8 + packet += bytes(2) + packet.extend(data) + + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x06] = checksum & 0xFF + packet[0x07] = checksum >> 8 + + packet_len = len(packet) - 2 + packet[0x00] = packet_len & 0xFF + packet[0x01] = packet_len >> 8 + + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + payload = self.decrypt(resp[0x38:]) + return payload + + def get_position(self) -> int: + """Return the position of the curtain.""" + resp = self._send(1, []) + position = resp[0x0E] + return position + + def open(self) -> int: + """Open the curtain.""" + resp = self._send(2, [0x4A, 0x31, 0xA0]) + position = resp[0x0E] + return position + + def close(self) -> int: + """Close the curtain.""" + resp = self._send(2, [0x61, 0x32, 0xA0]) + position = resp[0x0E] + return position + + def stop(self) -> int: + """Stop the curtain.""" + resp = self._send(2, [0x4C, 0x73, 0xA0]) + position = resp[0x0E] + return position + + def set_position(self, position: int) -> int: + """Set the position of the curtain.""" + resp = self._send(2, [position, 0x70, 0xA0]) + position = resp[0x0E] + return position diff --git a/broadlink/device.py b/broadlink/device.py index 74e916f..5a10bc0 100644 --- a/broadlink/device.py +++ b/broadlink/device.py @@ -3,7 +3,7 @@ import socket import threading import random import time -import typing as t +from typing import Generator, Optional, Tuple, Union from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes @@ -17,15 +17,15 @@ from .const import ( ) from .protocol import Datetime -HelloResponse = t.Tuple[int, t.Tuple[str, int], str, str, bool] +HelloResponse = Tuple[int, Tuple[str, int], str, str, bool] def scan( timeout: int = DEFAULT_TIMEOUT, - local_ip_address: str = None, + local_ip_address: Optional[str] = None, discover_ip_address: str = DEFAULT_BCAST_ADDR, discover_ip_port: int = DEFAULT_PORT, -) -> t.Generator[HelloResponse, None, None]: +) -> Generator[HelloResponse, None, None]: """Broadcast a hello message and yield responses.""" conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) @@ -76,7 +76,7 @@ def scan( conn.close() -def ping(address: str, port: int = DEFAULT_PORT) -> None: +def ping(ip_address: str, port: int = DEFAULT_PORT) -> None: """Send a ping packet to an address. This packet feeds the watchdog timer of firmwares >= v53. @@ -87,7 +87,7 @@ def ping(address: str, port: int = DEFAULT_PORT) -> None: conn.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) packet = bytearray(0x30) packet[0x26] = 1 - conn.sendto(packet, (address, port)) + conn.sendto(packet, (ip_address, port)) class Device: @@ -100,8 +100,8 @@ class Device: def __init__( self, - host: t.Tuple[str, int], - mac: t.Union[bytes, str], + host: Tuple[str, int], + mac: Union[bytes, str], devtype: int, timeout: int = DEFAULT_TIMEOUT, name: str = "", diff --git a/broadlink/helpers.py b/broadlink/helpers.py index 6ee5499..e7b3d4c 100644 --- a/broadlink/helpers.py +++ b/broadlink/helpers.py @@ -1,5 +1,5 @@ """Helper functions and classes.""" -import typing as t +from typing import Dict, List, Sequence class CRC16: @@ -8,10 +8,10 @@ class CRC16: CRC tables are cached for performance. """ - _cache: t.Dict[int, t.List[int]] = {} + _cache: Dict[int, List[int]] = {} @classmethod - def get_table(cls, polynomial: int) -> t.List[int]: + def get_table(cls, polynomial: int) -> List[int]: """Return the CRC-16 table for a polynomial.""" try: crc_table = cls._cache[polynomial] @@ -31,7 +31,7 @@ class CRC16: @classmethod def calculate( cls, - sequence: t.Sequence[int], + sequence: Sequence[int], polynomial: int = 0xA001, # CRC-16-ANSI. init_value: int = 0xFFFF, ) -> int: diff --git a/broadlink/hub.py b/broadlink/hub.py index 07b02e8..0fd4ae5 100644 --- a/broadlink/hub.py +++ b/broadlink/hub.py @@ -1,6 +1,7 @@ """Support for hubs.""" import struct import json +from typing import Optional from . import exceptions as e from .device import Device @@ -12,25 +13,37 @@ class s3(Device): TYPE = "S3" MAX_SUBDEVICES = 8 - def get_subdevices(self) -> list: - """Return the lit of sub devices.""" + def get_subdevices(self, step: int = 5) -> list: + """Return a list of sub devices.""" + total = self.MAX_SUBDEVICES sub_devices = [] - step = 5 + seen = set() + index = 0 - for index in range(0, self.MAX_SUBDEVICES, step): + while index < total: state = {"count": step, "index": index} packet = self._encode(14, state) resp = self.send_packet(0x6A, packet) e.check_error(resp[0x22:0x24]) resp = self._decode(resp) - sub_devices.extend(resp["list"]) - if len(sub_devices) == resp["total"]: + for device in resp["list"]: + did = device["did"] + if did in seen: + continue + + seen.add(did) + sub_devices.append(device) + + total = resp["total"] + if len(seen) >= total: break + index += step + return sub_devices - def get_state(self, did: str = None) -> dict: + def get_state(self, did: Optional[str] = None) -> dict: """Return the power state of the device.""" state = {} if did is not None: @@ -43,10 +56,10 @@ class s3(Device): def set_state( self, - did: str = None, - pwr1: bool = None, - pwr2: bool = None, - pwr3: bool = None, + did: Optional[str] = None, + pwr1: Optional[bool] = None, + pwr2: Optional[bool] = None, + pwr3: Optional[bool] = None, ) -> dict: """Set the power state of the device.""" state = {} @@ -69,7 +82,9 @@ class s3(Device): # flag: 1 for reading, 2 for writing. packet = bytearray(12) data = json.dumps(state, separators=(",", ":")).encode() - struct.pack_into(" dict: """Set the power state of the device.""" state = {} @@ -101,7 +102,7 @@ class lb1(Device): """Decode a JSON packet.""" payload = self.decrypt(response[0x38:]) js_len = struct.unpack_from(" dict: """Set the power state of the device.""" state = {} @@ -183,7 +184,9 @@ class lb2(Device): # flag: 1 for reading, 2 for writing. packet = bytearray(12) data = json.dumps(state, separators=(",", ":")).encode() - struct.pack_into(" bytes: + """Convert a microsecond duration sequence into a Broadlink IR packet.""" + result = bytearray(4) + result[0x00] = 0x26 + + for pulse in pulses: + div, mod = divmod(int(pulse // tick), 256) + if div: + result.append(0) + result.append(div) + result.append(mod) + + data_len = len(result) - 4 + result[0x02] = data_len & 0xFF + result[0x03] = data_len >> 8 + + return result + + +def data_to_pulses(data: bytes, tick: float = 32.84) -> List[int]: + """Parse a Broadlink packet into a microsecond duration sequence.""" + result = [] + index = 4 + end = min(256 * data[0x03] + data[0x02] + 4, len(data)) + + while index < end: + chunk = data[index] + index += 1 + + if chunk == 0: + try: + chunk = 256 * data[index] + data[index + 1] + except IndexError as err: + raise ValueError("Malformed data.") from err + index += 2 + + result.append(int(chunk * tick)) + + return result + + class rmmini(Device): """Controls a Broadlink RM mini 3.""" @@ -46,14 +88,19 @@ class rmpro(rmmini): """Sweep frequency.""" self._send(0x19) - def check_frequency(self) -> bool: + def check_frequency(self) -> Tuple[bool, float]: """Return True if the frequency was identified successfully.""" resp = self._send(0x1A) - return resp[0] == 1 + is_found = bool(resp[0]) + frequency = struct.unpack(" None: + def find_rf_packet(self, frequency: Optional[float] = None) -> None: """Enter radiofrequency learning mode.""" - self._send(0x1B) + payload = bytearray() + if frequency: + payload += struct.pack(" None: """Cancel sweep frequency.""" @@ -82,7 +129,7 @@ class rmminib(rmmini): e.check_error(resp[0x22:0x24]) payload = self.decrypt(resp[0x38:]) p_len = struct.unpack(" dict: """Return the state of the sensors in raw format.""" packet = bytearray([0x1]) - response = self.send_packet(0x6A, packet) - e.check_error(response[0x22:0x24]) - payload = self.decrypt(response[0x38:]) - data = payload[0x4:] - - temperature = struct.unpack("> 8 + packet += bytes(2) + packet.extend(data) + + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x06] = checksum & 0xFF + packet[0x07] = checksum >> 8 + + packet_len = len(packet) - 2 + packet[0x00] = packet_len & 0xFF + packet[0x01] = packet_len >> 8 + + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + payload = self.decrypt(resp[0x38:]) + return payload + + def check_sensors_raw(self) -> dict: + """Return the state of the sensors in raw format.""" + data = self._send(1) + + return { + "temperature": data[0x13] * 256 + data[0x14], + "humidity": data[0x15] * 256 + data[0x16], + "pm10": data[0x0D] * 256 + data[0x0E], + "pm2_5": data[0x0F] * 256 + data[0x10], + "pm1": data[0x11] * 256 + data[0x12], } diff --git a/broadlink/switch.py b/broadlink/switch.py index 1079cde..8393f6b 100644 --- a/broadlink/switch.py +++ b/broadlink/switch.py @@ -1,6 +1,7 @@ """Support for switches.""" import json import struct +from typing import Optional from . import exceptions as e from .device import Device @@ -127,12 +128,12 @@ class sp4(Device): def set_state( self, - pwr: bool = None, - ntlight: bool = None, - indicator: bool = None, - ntlbrightness: int = None, - maxworktime: int = None, - childlock: bool = None, + pwr: Optional[bool] = None, + ntlight: Optional[bool] = None, + indicator: Optional[bool] = None, + ntlbrightness: Optional[int] = None, + maxworktime: Optional[int] = None, + childlock: Optional[bool] = None, ) -> dict: """Set state of device.""" state = {} @@ -186,7 +187,7 @@ class sp4(Device): e.check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) js_len = struct.unpack_from(" dict: """Set the power state of the device.""" state = {} @@ -291,7 +292,16 @@ class bg1(Device): data = json.dumps(state).encode() length = 12 + len(data) struct.pack_into( - " dict: + """Set the power state of the device.""" + state = {} + if pwr is not None: + state["pwr"] = int(bool(pwr)) + if pwr1 is not None: + state["pwr1"] = int(bool(pwr1)) + if pwr2 is not None: + state["pwr2"] = int(bool(pwr2)) + if pwr3 is not None: + state["pwr3"] = int(bool(pwr3)) + if maxworktime1 is not None: + state["maxworktime1"] = maxworktime1 + if maxworktime2 is not None: + state["maxworktime2"] = maxworktime2 + if maxworktime3 is not None: + state["maxworktime3"] = maxworktime3 + if idcbrightness is not None: + state["idcbrightness"] = idcbrightness + if childlock is not None: + state["childlock"] = int(bool(childlock)) + if childlock1 is not None: + state["childlock1"] = int(bool(childlock1)) + if childlock2 is not None: + state["childlock2"] = int(bool(childlock2)) + if childlock3 is not None: + state["childlock3"] = int(bool(childlock3)) + if childlock4 is not None: + state["childlock4"] = int(bool(childlock4)) + + packet = self._encode(2, state) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + class mp1(Device): """Controls a Broadlink MP1.""" @@ -360,3 +426,47 @@ class mp1(Device): "s3": bool(data & 4), "s4": bool(data & 8), } + + +class mp1s(mp1): + """Controls a Broadlink MP1S.""" + + TYPE = "MP1S" + + def get_state(self) -> dict: + """Return the power state of the device. + + voltage in V. + current in A. + power in W. + power consumption in kW·h. + """ + packet = bytearray(16) + packet[0x00] = 0x0E + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x06] = 0xB2 + packet[0x07] = 0xC0 + packet[0x08] = 0x01 + packet[0x0A] = 0x04 + + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + payload_str = payload.hex()[4:-6] + + def get_value(start, end, factors): + value = sum( + int(payload_str[i-2:i]) * factor + for i, factor in zip(range(start, end, -2), factors) + ) + return value + + return { + "volt": get_value(34, 30, [10, 0.1]), + "current": get_value(40, 34, [1, 0.01, 0.0001]), + "power": get_value(46, 40, [100, 1, 0.01]), + "totalconsum": get_value(54, 46, [10000, 100, 1, 0.01]), + } diff --git a/cli/README.md b/cli/README.md index 7b40b4c..b7e48dc 100644 --- a/cli/README.md +++ b/cli/README.md @@ -97,7 +97,7 @@ broadlink_cli --device @BEDROOM.device --temperature #### Check humidity ``` -broadlink_cli --device @BEDROOM.device --temperature +broadlink_cli --device @BEDROOM.device --humidity ``` ### Smart plugs diff --git a/cli/broadlink_cli b/cli/broadlink_cli index f7a24ad..7913e33 100755 --- a/cli/broadlink_cli +++ b/cli/broadlink_cli @@ -1,68 +1,32 @@ #!/usr/bin/env python3 import argparse import base64 -import codecs import time +from typing import List import broadlink from broadlink.const import DEFAULT_PORT from broadlink.exceptions import ReadError, StorageError +from broadlink.remote import data_to_pulses, pulses_to_data -TICK = 32.84 TIMEOUT = 30 -IR_TOKEN = 0x26 def auto_int(x): return int(x, 0) -def to_microseconds(bytes): - result = [] - # print bytes[0] # 0x26 = 38for IR - index = 4 - while index < len(bytes): - chunk = bytes[index] - index += 1 - if chunk == 0: - chunk = bytes[index] - chunk = 256 * chunk + bytes[index + 1] - index += 2 - result.append(int(round(chunk * TICK))) - if chunk == 0x0d05: - break - return result +def format_pulses(pulses: List[int]) -> str: + """Format pulses.""" + return " ".join( + f"+{pulse}" if i % 2 == 0 else f"-{pulse}" + for i, pulse in enumerate(pulses) + ) -def durations_to_broadlink(durations): - result = bytearray() - result.append(IR_TOKEN) - result.append(0) - result.append(len(durations) % 256) - result.append(len(durations) / 256) - for dur in durations: - num = int(round(dur / TICK)) - if num > 255: - result.append(0) - result.append(num / 256) - result.append(num % 256) - return result - - -def format_durations(data): - result = '' - for i in range(0, len(data)): - if len(result) > 0: - result += ' ' - result += ('+' if i % 2 == 0 else '-') + str(data[i]) - return result - - -def parse_durations(str): - result = [] - for s in str.split(): - result.append(abs(int(s))) - return result +def parse_pulses(data: List[str]) -> List[int]: + """Parse pulses.""" + return [abs(int(s)) for s in data] parser = argparse.ArgumentParser(fromfile_prefix_chars='@') @@ -83,7 +47,8 @@ parser.add_argument("--switch", action="store_true", help="switch state from on parser.add_argument("--send", action="store_true", help="send command") parser.add_argument("--sensors", action="store_true", help="check all sensors") parser.add_argument("--learn", action="store_true", help="learn command") -parser.add_argument("--rfscanlearn", action="store_true", help="rf scan learning") +parser.add_argument("--rflearn", action="store_true", help="rf scan learning") +parser.add_argument("--frequency", type=float, help="specify radiofrequency for learning") parser.add_argument("--learnfile", help="save learned command to a specified file") parser.add_argument("--durations", action="store_true", help="use durations in micro seconds instead of the Broadlink format") @@ -111,8 +76,8 @@ if args.joinwifi: if args.convert: data = bytearray.fromhex(''.join(args.data)) - durations = to_microseconds(data) - print(format_durations(durations)) + pulses = data_to_pulses(data) + print(format_pulses(pulses)) if args.temperature: print(dev.check_temperature()) if args.humidity: @@ -124,10 +89,13 @@ if args.sensors: for key in data: print("{} {}".format(key, data[key])) if args.send: - data = durations_to_broadlink(parse_durations(' '.join(args.data))) \ - if args.durations else bytearray.fromhex(''.join(args.data)) + data = ( + pulses_to_data(parse_pulses(args.data)) + if args.durations + else bytes.fromhex(''.join(args.data)) + ) dev.send_data(data) -if args.learn or (args.learnfile and not args.rfscanlearn): +if args.learn or (args.learnfile and not args.rflearn): dev.enter_learning() print("Learning...") start = time.time() @@ -143,17 +111,19 @@ if args.learn or (args.learnfile and not args.rfscanlearn): print("No data received...") exit(1) - learned = format_durations(to_microseconds(bytearray(data))) \ - if args.durations \ - else ''.join(format(x, '02x') for x in bytearray(data)) - if args.learn: - print(learned) - decode_hex = codecs.getdecoder("hex_codec") - print("Base64: " + str(base64.b64encode(decode_hex(learned)[0]))) + print("Packet found!") + raw_fmt = data.hex() + base64_fmt = base64.b64encode(data).decode('ascii') + pulse_fmt = format_pulses(data_to_pulses(data)) + + print("Raw:", raw_fmt) + print("Base64:", base64_fmt) + print("Pulses:", pulse_fmt) + if args.learnfile: print("Saving to {}".format(args.learnfile)) with open(args.learnfile, "w") as text_file: - text_file.write(learned) + text_file.write(pulse_fmt if args.durations else raw_fmt) if args.check: if dev.check_power(): print('* ON *') @@ -195,28 +165,33 @@ if args.switch: else: dev.set_power(True) print('* Switch to ON *') -if args.rfscanlearn: - dev.sweep_frequency() - print("Learning RF Frequency, press and hold the button to learn...") - - start = time.time() - while time.time() - start < TIMEOUT: - time.sleep(1) - if dev.check_frequency(): - break +if args.rflearn: + if args.frequency: + frequency = args.frequency + print("Press the button you want to learn, a short press...") else: - print("RF Frequency not found") - dev.cancel_sweep_frequency() - exit(1) + dev.sweep_frequency() + print("Detecting radiofrequency, press and hold the button to learn...") - print("Found RF Frequency - 1 of 2!") - print("You can now let go of the button") + start = time.time() + while time.time() - start < TIMEOUT: + time.sleep(1) + locked, frequency = dev.check_frequency() + if locked: + break + else: + print("Radiofrequency not found") + dev.cancel_sweep_frequency() + exit(1) - input("Press enter to continue...") + print("Radiofrequency detected: {}MHz".format(frequency)) + print("You can now let go of the button") - print("To complete learning, single press the button you want to learn") + input("Press enter to continue...") - dev.find_rf_packet() + print("Press the button again, now a short press.") + + dev.find_rf_packet(frequency) start = time.time() while time.time() - start < TIMEOUT: @@ -231,15 +206,16 @@ if args.rfscanlearn: print("No data received...") exit(1) - print("Found RF Frequency - 2 of 2!") - learned = format_durations(to_microseconds(bytearray(data))) \ - if args.durations \ - else ''.join(format(x, '02x') for x in bytearray(data)) - if args.learnfile is None: - print(learned) - decode_hex = codecs.getdecoder("hex_codec") - print("Base64: {}".format(str(base64.b64encode(decode_hex(learned)[0])))) - if args.learnfile is not None: + print("Packet found!") + raw_fmt = data.hex() + base64_fmt = base64.b64encode(data).decode('ascii') + pulse_fmt = format_pulses(data_to_pulses(data)) + + print("Raw:", raw_fmt) + print("Base64:", base64_fmt) + print("Pulses:", pulse_fmt) + + if args.learnfile: print("Saving to {}".format(args.learnfile)) with open(args.learnfile, "w") as text_file: - text_file.write(learned) + text_file.write(pulse_fmt if args.durations else raw_fmt)