from .device import device from .exceptions import check_error class rm(device): """Controls a Broadlink RM.""" def __init__(self, *args, **kwargs) -> None: """Initialize the controller.""" device.__init__(self, *args, **kwargs) self.type = "RM2" self._request_header = bytes() self._code_sending_header = bytes() def check_data(self) -> bytes: """Return the last captured code.""" packet = bytearray(self._request_header) packet.append(0x04) response = self.send_packet(0x6a, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) return payload[len(self._request_header) + 4:] def send_data(self, data: bytes) -> None: """Send a code to the device.""" packet = bytearray(self._code_sending_header) packet += bytearray([0x02, 0x00, 0x00, 0x00]) packet += data response = self.send_packet(0x6a, packet) check_error(response[0x22:0x24]) def enter_learning(self) -> None: """Enter infrared learning mode.""" packet = bytearray(self._request_header) packet.append(0x03) response = self.send_packet(0x6a, packet) check_error(response[0x22:0x24]) def sweep_frequency(self) -> None: """Sweep frequency.""" packet = bytearray(self._request_header) packet.append(0x19) response = self.send_packet(0x6a, packet) check_error(response[0x22:0x24]) def cancel_sweep_frequency(self) -> None: """Cancel sweep frequency.""" packet = bytearray(self._request_header) packet.append(0x1e) response = self.send_packet(0x6a, packet) check_error(response[0x22:0x24]) def check_frequency(self) -> bool: """Return True if the frequency was identified successfully.""" packet = bytearray(self._request_header) packet.append(0x1a) response = self.send_packet(0x6a, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) if payload[len(self._request_header) + 4] == 1: return True return False def find_rf_packet(self) -> bool: """Enter radiofrequency learning mode.""" packet = bytearray(self._request_header) packet.append(0x1b) response = self.send_packet(0x6a, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) if payload[len(self._request_header) + 4] == 1: return True return False def _check_sensors(self, command: int) -> bytes: """Return the state of the sensors in raw format.""" packet = bytearray(self._request_header) packet.append(command) response = self.send_packet(0x6a, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) return bytearray(payload[len(self._request_header) + 4:]) def check_temperature(self) -> int: """Return the temperature.""" data = self._check_sensors(0x1) return data[0x0] + data[0x1] / 10.0 def check_sensors(self) -> dict: """Return the state of the sensors.""" data = self._check_sensors(0x1) return {'temperature': data[0x0] + data[0x1] / 10.0} class rm4(rm): """Controls a Broadlink RM4.""" def __init__(self, *args, **kwargs) -> None: """Initialize the controller.""" device.__init__(self, *args, **kwargs) self.type = "RM4" self._request_header = b'\x04\x00' self._code_sending_header = b'\xda\x00' def check_temperature(self) -> int: """Return the temperature.""" data = self._check_sensors(0x24) return data[0x0] + data[0x1] / 100.0 def check_humidity(self) -> int: """Return the humidity.""" data = self._check_sensors(0x24) return data[0x2] + data[0x3] / 100.0 def check_sensors(self) -> dict: """Return the state of the sensors.""" data = self._check_sensors(0x24) return { 'temperature': data[0x0] + data[0x1] / 100.0, 'humidity': data[0x2] + data[0x3] / 100.0 } # For legacy compatibility - don't use this class rm2(rm): def __init__(self): device.__init__(self, None, None, None) def discover(self): dev = discover() self.host = dev.host self.mac = dev.mac