diff --git a/README.md b/README.md index 81c6de5..68c9a98 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ A Python module and CLI for controlling Broadlink devices locally. The following - **Curtain motors**: Dooya DT360E-45/20 - **Thermostats**: Hysen HY02B05H - **Hubs**: S3 +- **Air purifiers**: LIFAair LM05 ## Installation @@ -216,6 +217,19 @@ devices[0].set_state(bulb_colormode=1) data = device.check_sensors() ``` +## Air purifiers + +### Fetching purifier state +```python3 +data = device.get_state() +``` + +### Controlling purifier fan +```python3 +device.set_fan_mode(FanMode.TURBO) +device.set_fan_speed(50) +``` + ## Hubs ### Discovering subdevices diff --git a/broadlink/__init__.py b/broadlink/__init__.py index d313550..09c9ed8 100644 --- a/broadlink/__init__.py +++ b/broadlink/__init__.py @@ -14,6 +14,7 @@ from .light import lb1, lb2 from .remote import rm, rm4, rm4mini, rm4pro, rmmini, rmminib, rmpro from .sensor import a1, a2 from .switch import bg1, ehc31, mp1, mp1s, sp1, sp2, sp2s, sp3, sp3s, sp4, sp4b +from .purifier import lifaair SUPPORTED_TYPES = { sp1: { @@ -207,6 +208,9 @@ SUPPORTED_TYPES = { ehc31: { 0x6480: ("EHC31", "BG Electrical"), }, + lifaair: { + 0x4ec2: ("LM05", "LIFAair"), + } } diff --git a/broadlink/purifier.py b/broadlink/purifier.py new file mode 100644 index 0000000..bf27093 --- /dev/null +++ b/broadlink/purifier.py @@ -0,0 +1,156 @@ +"""Support for air purifiers.""" + +import enum + +from . import exceptions as e +from .device import Device + +@enum.unique +class FanMode(enum.IntEnum): + """Represents mode of the fan.""" + + OFF = 0 + AUTO = 1 + NIGHT = 2 + TURBO = 3 + ANTI_ALLERGY = 4 + MANUAL = 5 + + UNKNOWN = -1 + + +class lifaair(Device): + """Controls a broadcom-based LIFAair air purifier.""" + + TYPE = "LIFAAIR" + + FAN_STATE_TO_MODE = { + 0x81: FanMode.OFF, + 0xA5: FanMode.AUTO, + 0x95: FanMode.NIGHT, + 0x8D: FanMode.TURBO, + 0x85: FanMode.MANUAL, + 0x01: None, # fan is offline + } + + @enum.unique + class _Operation(enum.IntEnum): + SET_STATE = 1 + GET_STATE = 2 + + @enum.unique + class _Action(enum.IntEnum): + SET_FAN_SPEED = 1 + SET_FAN_MODE = 2 + + FAN_MODE_TO_ACTION_ARG = { + FanMode.OFF: 1, + FanMode.AUTO: 2, + FanMode.NIGHT: 6, + FanMode.TURBO: 7, + FanMode.ANTI_ALLERGY: 11, + } + + def set_fan_mode(self, fan_mode: FanMode) -> dict: + """Set mode of the fan. Returns updated state.""" + if fan_mode == FanMode.MANUAL: + return self.set_fan_speed(50) + + action_arg = self.FAN_MODE_TO_ACTION_ARG.get(fan_mode) + if action_arg is not None: + data = self._send( + self._Operation.SET_STATE, self._Action.SET_FAN_MODE, action_arg + ) + return self._decode_state(data) + + return self.get_state() + + def set_fan_speed(self, fan_speed: int) -> dict: + """Set fan speed (0-121). Returns updated state. Note that fan mode will be changed to MANUAL by the device.""" + data = self._send( + self._Operation.SET_STATE, self._Action.SET_FAN_SPEED, fan_speed + ) + return self._decode_state(data) + + def get_state(self) -> dict: + """ + Return the current state of the purifier as python dict. + + Note that the smart remote we're communicating with contains co2, tvoc and PM2.5 sensors, + while temperature, humidity and fan-state are fetched remotely from main unit which can be + offline (unplugged from mains, out of range) in which case those keys will be None. + + Format: + { + "temperature": 24.5, # float, deg C, can be None if main-unit offline + "humidity": 41, # int, %, can be None if main-unit offline + "co2": 425 # int, ppm + "tvoc": 150 # int, ug/m3 + "pm10": 9 # int, ug/m3 (unsure if this is PM10) + "pm2_5": 7 # int, ug/m3 (confirmed PM2.5) + "pm1": 5 # int, ug/m3 (unsure if this is PM1.0) + "fan_mode": FanMode.AUTO # FanMode enum, can be None if main-unit offline + "fan_speed": 50 # int, 0-121 + } + """ + data = self._send(self._Operation.GET_STATE) + return self._decode_state(data) + + def _decode_state(self, data: bytes) -> dict: + raw = self._decode_state_raw(data) + fan_mode = self._decode_fan_mode(raw["fan_state"], raw["fan_flags"]) + isOffline = fan_mode is None + return { + "temperature": None if isOffline else raw["temperature"] / 10.0, + "humidity": None if isOffline else raw["humidity"], + "co2": raw["co2"], + "tvoc": raw["tvoc"] * 10, + "pm10": raw["pm10"], + "pm2_5": raw["pm2_5"], + "pm1": raw["pm1"], + "fan_mode": fan_mode, + "fan_speed": raw["fan_speed"], + } + + def _decode_state_raw(self, data: bytes) -> dict: + return { + "temperature": data[27] + 256 * data[28], + "humidity": data[29], + "co2": data[31] + 256 * data[32], + "tvoc": data[35] + 256 * data[36], + "pm10": data[37] + 256 * data[38], + "pm2_5": data[39] + 256 * data[40], + "pm1": data[41] + 256 * data[42], + "fan_state": data[55], + "fan_speed": data[56], + "fan_flags": data[57], + } + + def _decode_fan_mode(self, fan_state: int, fan_flags: int) -> FanMode: + if fan_flags & 0x40 == 0: + return FanMode.ANTI_ALLERGY + return self.FAN_STATE_TO_MODE.get(fan_state, FanMode.UNKNOWN) + + def _send(self, operation: int, action: int = 0, action_arg: int = 0) -> bytes: + """Send a command to the device.""" + packet = bytearray(26) + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x08] = operation & 0xFF + packet[0x0A] = 0x0C + packet[0x0E] = action & 0xFF + packet[0x0F] = action_arg & 0xFF + + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x06] = checksum & 0xFF + packet[0x07] = checksum >> 8 + + packet_len = len(packet) - 2 + packet[0x00] = packet_len & 0xFF + packet[0x01] = packet_len >> 8 + + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + return self.decrypt(resp[0x38:]) diff --git a/cli/test_lifaair b/cli/test_lifaair new file mode 100644 index 0000000..f2d361b --- /dev/null +++ b/cli/test_lifaair @@ -0,0 +1,44 @@ +import time +import broadlink +from broadlink.purifier import lifaair, FanMode + +def print_state(state: dict): + temperature = state["temperature"] + humidity = state["humidity"] + print("-> Temperature=%s Humidity=%s CO2=%4dppm TVOC=%4dug/m3 PM2.5=%2dug/m3 FanSpeed=%d (%s)" % ( + "%2.1fC" % temperature if temperature is not None else "-----", + "%2d%%" %humidity if humidity is not None else "---", + state["co2"], + state["tvoc"], + state["pm2_5"], + state["fan_speed"], + state["fan_mode"])) + +print("Searching for lifaair devices... ") +dev: lifaair = next(dev for dev in broadlink.discover() if isinstance(dev, lifaair)) +print("Found %s" % dev) + +print("Authenticating... ", end="") +dev.auth() +print("OK (id=%d, key=%s)" % (dev.id, dev.aes.algorithm.key.hex())) + +print("Getting firmware version... ", end="") +print(dev.get_fwversion()) + +print("Getting state...") +print_state(dev.get_state()) + +for mode in FanMode: + print("Setting fan mode to %s" % mode) + print_state(dev.set_fan_mode(mode)) + time.sleep(5) + +for speed in (0, 60, 121): + print("Setting fan speed to %d" % speed) + print_state(dev.set_fan_speed(speed)) + time.sleep(5) + +print("Monitoring state...") +while True: + print_state(dev.get_state()) + time.sleep(1)