1
0
mirror of https://github.com/mjg59/python-broadlink.git synced 2024-11-22 07:00:12 +01:00

Improve RM4 communication (#490)

This commit is contained in:
Felipe Martins Diel 2021-01-09 19:21:54 -03:00
parent 1b3fd16321
commit d7cbe304e0

View File

@ -1,4 +1,6 @@
"""Support for universal remotes.""" """Support for universal remotes."""
import struct
from .device import device from .device import device
from .exceptions import check_error from .exceptions import check_error
@ -10,87 +12,53 @@ class rm(device):
"""Initialize the controller.""" """Initialize the controller."""
device.__init__(self, *args, **kwargs) device.__init__(self, *args, **kwargs)
self.type = "RM2" self.type = "RM2"
self._request_header = bytes()
self._code_sending_header = bytes() def _send(self, command: int, data: bytes = b'') -> bytes:
"""Send a packet to the device."""
packet = struct.pack("<I", command) + data
resp = self.send_packet(0x6A, packet)
check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload[0x4:]
def check_data(self) -> bytes: def check_data(self) -> bytes:
"""Return the last captured code.""" """Return the last captured code."""
packet = bytearray(self._request_header) return self._send(0x4)
packet.append(0x04)
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
return payload[len(self._request_header) + 4 :]
def send_data(self, data: bytes) -> None: def send_data(self, data: bytes) -> None:
"""Send a code to the device.""" """Send a code to the device."""
packet = bytearray(self._code_sending_header) self._send(0x2, data)
packet += bytearray([0x02, 0x00, 0x00, 0x00])
packet += data
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
def enter_learning(self) -> None: def enter_learning(self) -> None:
"""Enter infrared learning mode.""" """Enter infrared learning mode."""
packet = bytearray(self._request_header) self._send(0x3)
packet.append(0x03)
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
def sweep_frequency(self) -> None: def sweep_frequency(self) -> None:
"""Sweep frequency.""" """Sweep frequency."""
packet = bytearray(self._request_header) self._send(0x19)
packet.append(0x19)
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
def cancel_sweep_frequency(self) -> None: def cancel_sweep_frequency(self) -> None:
"""Cancel sweep frequency.""" """Cancel sweep frequency."""
packet = bytearray(self._request_header) self._send(0x1E)
packet.append(0x1E)
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
def check_frequency(self) -> bool: def check_frequency(self) -> bool:
"""Return True if the frequency was identified successfully.""" """Return True if the frequency was identified successfully."""
packet = bytearray(self._request_header) resp = self._send(0x1A)
packet.append(0x1A) return resp[0] == 1
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
if payload[len(self._request_header) + 4] == 1:
return True
return False
def find_rf_packet(self) -> bool: def find_rf_packet(self) -> bool:
"""Enter radiofrequency learning mode.""" """Enter radiofrequency learning mode."""
packet = bytearray(self._request_header) resp = self._send(0x1B)
packet.append(0x1B) return resp[0] == 1
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
if payload[len(self._request_header) + 4] == 1:
return True
return False
def _check_sensors(self, command: int) -> bytes: def check_temperature(self) -> float:
"""Return the state of the sensors in raw format."""
packet = bytearray(self._request_header)
packet.append(command)
response = self.send_packet(0x6A, packet)
check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
return bytearray(payload[len(self._request_header) + 4 :])
def check_temperature(self) -> int:
"""Return the temperature.""" """Return the temperature."""
data = self._check_sensors(0x1) return self.check_sensors()["temperature"]
return data[0x0] + data[0x1] / 10.0
def check_sensors(self) -> dict: def check_sensors(self) -> dict:
"""Return the state of the sensors.""" """Return the state of the sensors."""
data = self._check_sensors(0x1) resp = self._send(0x1)
return {"temperature": data[0x0] + data[0x1] / 10.0} return {"temperature": resp[0x0] + resp[0x1] / 10.0}
class rm4(rm): class rm4(rm):
@ -100,23 +68,29 @@ class rm4(rm):
"""Initialize the controller.""" """Initialize the controller."""
device.__init__(self, *args, **kwargs) device.__init__(self, *args, **kwargs)
self.type = "RM4" self.type = "RM4"
self._request_header = b"\x04\x00"
self._code_sending_header = b"\xda\x00"
def check_temperature(self) -> int: def _send(self, command: int, data: bytes = b'') -> bytes:
"""Return the temperature.""" """Send a packet to the device."""
data = self._check_sensors(0x24) packet = struct.pack("<HI", len(data) + 4, command) + data
return data[0x0] + data[0x1] / 100.0 resp = self.send_packet(0x6A, packet)
check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
p_len = struct.unpack("<H", payload[:0x2])[0]
return payload[0x6:p_len+2]
def check_humidity(self) -> int: def find_rf_packet(self) -> bool:
"""Enter radiofrequency learning mode."""
self._send(0x1B)
return True
def check_humidity(self) -> float:
"""Return the humidity.""" """Return the humidity."""
data = self._check_sensors(0x24) return self.check_sensors()["humidity"]
return data[0x2] + data[0x3] / 100.0
def check_sensors(self) -> dict: def check_sensors(self) -> dict:
"""Return the state of the sensors.""" """Return the state of the sensors."""
data = self._check_sensors(0x24) resp = self._send(0x24)
return { return {
"temperature": data[0x0] + data[0x1] / 100.0, "temperature": resp[0x0] + resp[0x1] / 100.0,
"humidity": data[0x2] + data[0x3] / 100.0, "humidity": resp[0x2] + resp[0x3] / 100.0,
} }