"""Support for universal remotes.""" import struct import typing as t from . import exceptions as e from .device import Device def pulses_to_data(pulses: t.List[int], tick: float = 32.84) -> bytes: """Convert a microsecond duration sequence into a Broadlink IR packet.""" result = bytearray(4) result[0x00] = 0x26 for pulse in pulses: div, mod = divmod(int(pulse // tick), 256) if div: result.append(0) result.append(div) result.append(mod) data_len = len(result) - 4 result[0x02] = data_len & 0xFF result[0x03] = data_len >> 8 return result def data_to_pulses(data: bytes, tick: float = 32.84) -> t.List[int]: """Parse a Broadlink packet into a microsecond duration sequence.""" result = [] index = 4 end = min(256 * data[0x03] + data[0x02] + 4, len(data)) while index < end: chunk = data[index] index += 1 if chunk == 0: try: chunk = 256 * data[index] + data[index + 1] except IndexError as err: raise ValueError("Malformed data.") from err index += 2 result.append(int(chunk * tick)) return result class rmmini(Device): """Controls a Broadlink RM mini 3.""" TYPE = "RMMINI" def _send(self, command: int, data: bytes = b"") -> bytes: """Send a packet to the device.""" packet = struct.pack(" None: """Update device name and lock status.""" resp = self._send(0x1) self.name = resp[0x48:].split(b"\x00")[0].decode() self.is_locked = bool(resp[0x87]) def send_data(self, data: bytes) -> None: """Send a code to the device.""" self._send(0x2, data) def enter_learning(self) -> None: """Enter infrared learning mode.""" self._send(0x3) def check_data(self) -> bytes: """Return the last captured code.""" return self._send(0x4) class rmpro(rmmini): """Controls a Broadlink RM pro.""" TYPE = "RMPRO" def sweep_frequency(self) -> None: """Sweep frequency.""" self._send(0x19) def check_frequency(self) -> t.Tuple[bool, float]: """Return True if the frequency was identified successfully.""" resp = self._send(0x1A) is_found = bool(resp[0]) frequency = struct.unpack(" None: """Enter radiofrequency learning mode.""" payload = bytearray() if frequency: payload += struct.pack(" None: """Cancel sweep frequency.""" self._send(0x1E) def check_sensors(self) -> dict: """Return the state of the sensors.""" resp = self._send(0x1) temp = struct.unpack(" float: """Return the temperature.""" return self.check_sensors()["temperature"] class rmminib(rmmini): """Controls a Broadlink RM mini 3 (new firmware).""" TYPE = "RMMINIB" def _send(self, command: int, data: bytes = b"") -> bytes: """Send a packet to the device.""" packet = struct.pack(" dict: """Return the state of the sensors.""" resp = self._send(0x24) temp = struct.unpack(" float: """Return the temperature.""" return self.check_sensors()["temperature"] def check_humidity(self) -> float: """Return the humidity.""" return self.check_sensors()["humidity"] class rm4pro(rm4mini, rmpro): """Controls a Broadlink RM4 pro.""" TYPE = "RM4PRO" class rm(rmpro): """For backwards compatibility.""" TYPE = "RM2" class rm4(rm4pro): """For backwards compatibility.""" TYPE = "RM4"