From d7cbe304e05385fbae3fb2784227268b5e54b063 Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel Date: Sat, 9 Jan 2021 19:21:54 -0300 Subject: [PATCH] Improve RM4 communication (#490) --- broadlink/remote.py | 108 +++++++++++++++++--------------------------- 1 file changed, 41 insertions(+), 67 deletions(-) diff --git a/broadlink/remote.py b/broadlink/remote.py index b6528d6..88498b6 100644 --- a/broadlink/remote.py +++ b/broadlink/remote.py @@ -1,4 +1,6 @@ """Support for universal remotes.""" +import struct + from .device import device from .exceptions import check_error @@ -10,87 +12,53 @@ class rm(device): """Initialize the controller.""" device.__init__(self, *args, **kwargs) self.type = "RM2" - self._request_header = bytes() - self._code_sending_header = bytes() + + def _send(self, command: int, data: bytes = b'') -> bytes: + """Send a packet to the device.""" + packet = struct.pack(" bytes: """Return the last captured code.""" - packet = bytearray(self._request_header) - packet.append(0x04) - response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(response[0x38:]) - return payload[len(self._request_header) + 4 :] + return self._send(0x4) def send_data(self, data: bytes) -> None: """Send a code to the device.""" - packet = bytearray(self._code_sending_header) - packet += bytearray([0x02, 0x00, 0x00, 0x00]) - packet += data - response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + self._send(0x2, data) def enter_learning(self) -> None: """Enter infrared learning mode.""" - packet = bytearray(self._request_header) - packet.append(0x03) - response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + self._send(0x3) def sweep_frequency(self) -> None: """Sweep frequency.""" - packet = bytearray(self._request_header) - packet.append(0x19) - response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + self._send(0x19) def cancel_sweep_frequency(self) -> None: """Cancel sweep frequency.""" - packet = bytearray(self._request_header) - packet.append(0x1E) - response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + self._send(0x1E) def check_frequency(self) -> bool: """Return True if the frequency was identified successfully.""" - packet = bytearray(self._request_header) - packet.append(0x1A) - response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(response[0x38:]) - if payload[len(self._request_header) + 4] == 1: - return True - return False + resp = self._send(0x1A) + return resp[0] == 1 def find_rf_packet(self) -> bool: """Enter radiofrequency learning mode.""" - packet = bytearray(self._request_header) - packet.append(0x1B) - response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(response[0x38:]) - if payload[len(self._request_header) + 4] == 1: - return True - return False + resp = self._send(0x1B) + return resp[0] == 1 - def _check_sensors(self, command: int) -> bytes: - """Return the state of the sensors in raw format.""" - packet = bytearray(self._request_header) - packet.append(command) - response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(response[0x38:]) - return bytearray(payload[len(self._request_header) + 4 :]) - - def check_temperature(self) -> int: + def check_temperature(self) -> float: """Return the temperature.""" - data = self._check_sensors(0x1) - return data[0x0] + data[0x1] / 10.0 + return self.check_sensors()["temperature"] def check_sensors(self) -> dict: """Return the state of the sensors.""" - data = self._check_sensors(0x1) - return {"temperature": data[0x0] + data[0x1] / 10.0} + resp = self._send(0x1) + return {"temperature": resp[0x0] + resp[0x1] / 10.0} class rm4(rm): @@ -100,23 +68,29 @@ class rm4(rm): """Initialize the controller.""" device.__init__(self, *args, **kwargs) self.type = "RM4" - self._request_header = b"\x04\x00" - self._code_sending_header = b"\xda\x00" - def check_temperature(self) -> int: - """Return the temperature.""" - data = self._check_sensors(0x24) - return data[0x0] + data[0x1] / 100.0 + def _send(self, command: int, data: bytes = b'') -> bytes: + """Send a packet to the device.""" + packet = struct.pack(" int: + def find_rf_packet(self) -> bool: + """Enter radiofrequency learning mode.""" + self._send(0x1B) + return True + + def check_humidity(self) -> float: """Return the humidity.""" - data = self._check_sensors(0x24) - return data[0x2] + data[0x3] / 100.0 + return self.check_sensors()["humidity"] def check_sensors(self) -> dict: """Return the state of the sensors.""" - data = self._check_sensors(0x24) + resp = self._send(0x24) return { - "temperature": data[0x0] + data[0x1] / 100.0, - "humidity": data[0x2] + data[0x3] / 100.0, + "temperature": resp[0x0] + resp[0x1] / 100.0, + "humidity": resp[0x2] + resp[0x3] / 100.0, }