mirror of
https://github.com/mjg59/python-broadlink.git
synced 2024-11-21 22:51:41 +01:00
247be74c33
* Move IR duration<->Broadlink conversion down from CLI * Fix --learn base64 to not crash with --durations Also remove its b'...' wrapping. * Fix IR/RF conversions --------- Co-authored-by: William Grant <me@williamgrant.id.au>
174 lines
4.6 KiB
Python
174 lines
4.6 KiB
Python
"""Support for universal remotes."""
|
|
import struct
|
|
import typing as t
|
|
|
|
from . import exceptions as e
|
|
from .device import Device
|
|
|
|
|
|
def pulses_to_data(pulses: t.List[int], tick: int = 32.84) -> None:
|
|
"""Convert a microsecond duration sequence into a Broadlink IR packet."""
|
|
result = bytearray(4)
|
|
result[0x00] = 0x26
|
|
|
|
for pulse in pulses:
|
|
div, mod = divmod(int(pulse // tick), 256)
|
|
if div:
|
|
result.append(0)
|
|
result.append(div)
|
|
result.append(mod)
|
|
|
|
data_len = len(result) - 4
|
|
result[0x02] = data_len & 0xFF
|
|
result[0x03] = data_len >> 8
|
|
|
|
return result
|
|
|
|
|
|
def data_to_pulses(data: bytes, tick: int = 32.84) -> t.List[int]:
|
|
"""Parse a Broadlink packet into a microsecond duration sequence."""
|
|
result = []
|
|
index = 4
|
|
end = min(256 * data[0x03] + data[0x02] + 4, len(data))
|
|
|
|
while index < end:
|
|
chunk = data[index]
|
|
index += 1
|
|
|
|
if chunk == 0:
|
|
try:
|
|
chunk = 256 * data[index] + data[index + 1]
|
|
except IndexError:
|
|
raise ValueError("Malformed data.")
|
|
index += 2
|
|
|
|
result.append(int(chunk * tick))
|
|
|
|
return result
|
|
|
|
|
|
class rmmini(Device):
|
|
"""Controls a Broadlink RM mini 3."""
|
|
|
|
TYPE = "RMMINI"
|
|
|
|
def _send(self, command: int, data: bytes = b"") -> bytes:
|
|
"""Send a packet to the device."""
|
|
packet = struct.pack("<I", command) + data
|
|
resp = self.send_packet(0x6A, packet)
|
|
e.check_error(resp[0x22:0x24])
|
|
payload = self.decrypt(resp[0x38:])
|
|
return payload[0x4:]
|
|
|
|
def update(self) -> None:
|
|
"""Update device name and lock status."""
|
|
resp = self._send(0x1)
|
|
self.name = resp[0x48:].split(b"\x00")[0].decode()
|
|
self.is_locked = bool(resp[0x87])
|
|
|
|
def send_data(self, data: bytes) -> None:
|
|
"""Send a code to the device."""
|
|
self._send(0x2, data)
|
|
|
|
def enter_learning(self) -> None:
|
|
"""Enter infrared learning mode."""
|
|
self._send(0x3)
|
|
|
|
def check_data(self) -> bytes:
|
|
"""Return the last captured code."""
|
|
return self._send(0x4)
|
|
|
|
|
|
class rmpro(rmmini):
|
|
"""Controls a Broadlink RM pro."""
|
|
|
|
TYPE = "RMPRO"
|
|
|
|
def sweep_frequency(self) -> None:
|
|
"""Sweep frequency."""
|
|
self._send(0x19)
|
|
|
|
def check_frequency(self) -> t.Tuple[bool, float]:
|
|
"""Return True if the frequency was identified successfully."""
|
|
resp = self._send(0x1A)
|
|
is_found = bool(resp[0])
|
|
frequency = struct.unpack("<I", resp[1:5])[0] / 1000.0
|
|
return is_found, frequency
|
|
|
|
def find_rf_packet(self, frequency: float = None) -> None:
|
|
"""Enter radiofrequency learning mode."""
|
|
payload = bytearray()
|
|
if frequency:
|
|
payload += struct.pack("<I", int(frequency * 1000))
|
|
self._send(0x1B, payload)
|
|
|
|
def cancel_sweep_frequency(self) -> None:
|
|
"""Cancel sweep frequency."""
|
|
self._send(0x1E)
|
|
|
|
def check_sensors(self) -> dict:
|
|
"""Return the state of the sensors."""
|
|
resp = self._send(0x1)
|
|
temp = struct.unpack("<bb", resp[:0x2])
|
|
return {"temperature": temp[0x0] + temp[0x1] / 10.0}
|
|
|
|
def check_temperature(self) -> float:
|
|
"""Return the temperature."""
|
|
return self.check_sensors()["temperature"]
|
|
|
|
|
|
class rmminib(rmmini):
|
|
"""Controls a Broadlink RM mini 3 (new firmware)."""
|
|
|
|
TYPE = "RMMINIB"
|
|
|
|
def _send(self, command: int, data: bytes = b"") -> bytes:
|
|
"""Send a packet to the device."""
|
|
packet = struct.pack("<HI", len(data) + 4, command) + data
|
|
resp = self.send_packet(0x6A, packet)
|
|
e.check_error(resp[0x22:0x24])
|
|
payload = self.decrypt(resp[0x38:])
|
|
p_len = struct.unpack("<H", payload[:0x2])[0]
|
|
return payload[0x6 : p_len + 2]
|
|
|
|
|
|
class rm4mini(rmminib):
|
|
"""Controls a Broadlink RM4 mini."""
|
|
|
|
TYPE = "RM4MINI"
|
|
|
|
def check_sensors(self) -> dict:
|
|
"""Return the state of the sensors."""
|
|
resp = self._send(0x24)
|
|
temp = struct.unpack("<bb", resp[:0x2])
|
|
return {
|
|
"temperature": temp[0x0] + temp[0x1] / 100.0,
|
|
"humidity": resp[0x2] + resp[0x3] / 100.0,
|
|
}
|
|
|
|
def check_temperature(self) -> float:
|
|
"""Return the temperature."""
|
|
return self.check_sensors()["temperature"]
|
|
|
|
def check_humidity(self) -> float:
|
|
"""Return the humidity."""
|
|
return self.check_sensors()["humidity"]
|
|
|
|
|
|
class rm4pro(rm4mini, rmpro):
|
|
"""Controls a Broadlink RM4 pro."""
|
|
|
|
TYPE = "RM4PRO"
|
|
|
|
|
|
class rm(rmpro):
|
|
"""For backwards compatibility."""
|
|
|
|
TYPE = "RM2"
|
|
|
|
|
|
class rm4(rm4pro):
|
|
"""For backwards compatibility."""
|
|
|
|
TYPE = "RM4"
|