From 247be74c33b533b42a14be98179d9f8b5293d385 Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel <41558831+felipediel@users.noreply.github.com> Date: Wed, 10 Apr 2024 22:51:41 -0300 Subject: [PATCH] Expose IR/RF conversion functions (#788) * Move IR duration<->Broadlink conversion down from CLI * Fix --learn base64 to not crash with --durations Also remove its b'...' wrapping. * Fix IR/RF conversions --------- Co-authored-by: William Grant --- broadlink/remote.py | 41 +++++++++++++++++ cli/broadlink_cli | 106 ++++++++++++++++---------------------------- 2 files changed, 79 insertions(+), 68 deletions(-) diff --git a/broadlink/remote.py b/broadlink/remote.py index f4db3d2..89cb71f 100644 --- a/broadlink/remote.py +++ b/broadlink/remote.py @@ -6,6 +6,47 @@ from . import exceptions as e from .device import Device +def pulses_to_data(pulses: t.List[int], tick: int = 32.84) -> None: + """Convert a microsecond duration sequence into a Broadlink IR packet.""" + result = bytearray(4) + result[0x00] = 0x26 + + for pulse in pulses: + div, mod = divmod(int(pulse // tick), 256) + if div: + result.append(0) + result.append(div) + result.append(mod) + + data_len = len(result) - 4 + result[0x02] = data_len & 0xFF + result[0x03] = data_len >> 8 + + return result + + +def data_to_pulses(data: bytes, tick: int = 32.84) -> t.List[int]: + """Parse a Broadlink packet into a microsecond duration sequence.""" + result = [] + index = 4 + end = min(256 * data[0x03] + data[0x02] + 4, len(data)) + + while index < end: + chunk = data[index] + index += 1 + + if chunk == 0: + try: + chunk = 256 * data[index] + data[index + 1] + except IndexError: + raise ValueError("Malformed data.") + index += 2 + + result.append(int(chunk * tick)) + + return result + + class rmmini(Device): """Controls a Broadlink RM mini 3.""" diff --git a/cli/broadlink_cli b/cli/broadlink_cli index 1083e59..35317ee 100755 --- a/cli/broadlink_cli +++ b/cli/broadlink_cli @@ -1,68 +1,32 @@ #!/usr/bin/env python3 import argparse import base64 -import codecs import time +import typing as t import broadlink from broadlink.const import DEFAULT_PORT from broadlink.exceptions import ReadError, StorageError +from broadlink.remote import data_to_pulses, pulses_to_data -TICK = 32.84 TIMEOUT = 30 -IR_TOKEN = 0x26 def auto_int(x): return int(x, 0) -def to_microseconds(bytes): - result = [] - # print bytes[0] # 0x26 = 38for IR - index = 4 - while index < len(bytes): - chunk = bytes[index] - index += 1 - if chunk == 0: - chunk = bytes[index] - chunk = 256 * chunk + bytes[index + 1] - index += 2 - result.append(int(round(chunk * TICK))) - if chunk == 0x0d05: - break - return result +def format_pulses(pulses: t.List[int]) -> str: + """Format pulses.""" + return " ".join( + f"+{pulse}" if i % 2 == 0 else f"-{pulse}" + for i, pulse in enumerate(pulses) + ) -def durations_to_broadlink(durations): - result = bytearray() - result.append(IR_TOKEN) - result.append(0) - result.append(len(durations) % 256) - result.append(len(durations) / 256) - for dur in durations: - num = int(round(dur / TICK)) - if num > 255: - result.append(0) - result.append(num / 256) - result.append(num % 256) - return result - - -def format_durations(data): - result = '' - for i in range(0, len(data)): - if len(result) > 0: - result += ' ' - result += ('+' if i % 2 == 0 else '-') + str(data[i]) - return result - - -def parse_durations(str): - result = [] - for s in str.split(): - result.append(abs(int(s))) - return result +def parse_pulses(data: t.List[str]) -> t.List[int]: + """Parse pulses.""" + return [abs(int(s)) for s in data] parser = argparse.ArgumentParser(fromfile_prefix_chars='@') @@ -112,8 +76,8 @@ if args.joinwifi: if args.convert: data = bytearray.fromhex(''.join(args.data)) - durations = to_microseconds(data) - print(format_durations(durations)) + pulses = data_to_pulses(data) + print(format_pulses(pulses)) if args.temperature: print(dev.check_temperature()) if args.humidity: @@ -125,8 +89,11 @@ if args.sensors: for key in data: print("{} {}".format(key, data[key])) if args.send: - data = durations_to_broadlink(parse_durations(' '.join(args.data))) \ - if args.durations else bytearray.fromhex(''.join(args.data)) + data = ( + pulses_to_data(parse_pulses(args.data)) + if args.durations + else bytes.fromhex(''.join(args.data)) + ) dev.send_data(data) if args.learn or (args.learnfile and not args.rflearn): dev.enter_learning() @@ -144,17 +111,19 @@ if args.learn or (args.learnfile and not args.rflearn): print("No data received...") exit(1) - learned = format_durations(to_microseconds(bytearray(data))) \ - if args.durations \ - else ''.join(format(x, '02x') for x in bytearray(data)) - if args.learn: - print(learned) - decode_hex = codecs.getdecoder("hex_codec") - print("Base64: " + str(base64.b64encode(decode_hex(learned)[0]))) + print("Packet found!") + raw_fmt = data.hex() + base64_fmt = base64.b64encode(data).decode('ascii') + pulse_fmt = format_pulses(data_to_pulses(data)) + + print("Raw:", raw_fmt) + print("Base64:", base64_fmt) + print("Pulses:", pulse_fmt) + if args.learnfile: print("Saving to {}".format(args.learnfile)) with open(args.learnfile, "w") as text_file: - text_file.write(learned) + text_file.write(pulse_fmt if args.durations else raw_fmt) if args.check: if dev.check_power(): print('* ON *') @@ -238,14 +207,15 @@ if args.rflearn: exit(1) print("Packet found!") - learned = format_durations(to_microseconds(bytearray(data))) \ - if args.durations \ - else ''.join(format(x, '02x') for x in bytearray(data)) - if args.learnfile is None: - print(learned) - decode_hex = codecs.getdecoder("hex_codec") - print("Base64: {}".format(str(base64.b64encode(decode_hex(learned)[0])))) - if args.learnfile is not None: + raw_fmt = data.hex() + base64_fmt = base64.b64encode(data).decode('ascii') + pulse_fmt = format_pulses(data_to_pulses(data)) + + print("Raw:", raw_fmt) + print("Base64:", base64_fmt) + print("Pulses:", pulse_fmt) + + if args.learnfile: print("Saving to {}".format(args.learnfile)) with open(args.learnfile, "w") as text_file: - text_file.write(learned) + text_file.write(pulse_fmt if args.durations else raw_fmt)