1
0
mirror of https://github.com/mjg59/python-broadlink.git synced 2024-11-21 22:51:41 +01:00

Remove legacy byte conversions

This commit is contained in:
Felipe Martins Diel 2020-09-20 06:16:49 -03:00 committed by Matthew Garrett
parent de38e237ca
commit 5ef4124491
9 changed files with 43 additions and 66 deletions

View File

@ -1,5 +1,3 @@
import codecs
from .device import device
from .exceptions import check_error
@ -25,7 +23,7 @@ class S1C(device):
packet[0] = 0x06 # 0x06 - get sensors info, 0x07 - probably add sensors
response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24])
payload = self.decrypt(bytes(response[0x38:]))
payload = self.decrypt(response[0x38:])
if not payload:
return None
count = payload[0x4]
@ -34,11 +32,11 @@ class S1C(device):
sens_res = []
for sens in sensors_a:
status = ord(chr(sens[0]))
_name = str(bytes(sens[4:26]).decode())
_order = ord(chr(sens[1]))
_type = ord(chr(sens[3]))
_serial = bytes(codecs.encode(sens[26:30], "hex")).decode()
status = sens[0]
_name = sens[4:26].decode()
_order = sens[1]
_type = sens[3]
_serial = sens[26:30].hex()
type_str = S1C_SENSORS_TYPES.get(_type, 'Unknown')

View File

@ -19,9 +19,9 @@ class hysen(device):
# New behaviour: raises a ValueError if the device response indicates an error or CRC check fails
# The function prepends length (2 bytes) and appends CRC
def send_request(self, input_payload: bytearray) -> bytes:
def send_request(self, input_payload: bytes) -> bytes:
"""Send a request to the device."""
crc = calculate_crc16(bytes(input_payload))
crc = calculate_crc16(input_payload)
# first byte is length, +2 for CRC16
request_payload = bytearray([len(input_payload) + 2, 0x00])
@ -34,13 +34,13 @@ class hysen(device):
# send to device
response = self.send_packet(0x6a, request_payload)
check_error(response[0x22:0x24])
response_payload = bytearray(self.decrypt(bytes(response[0x38:])))
response_payload = self.decrypt(response[0x38:])
# experimental check on CRC in response (first 2 bytes are len, and trailing bytes are crc)
response_payload_len = response_payload[0]
if response_payload_len + 2 > len(response_payload):
raise ValueError('hysen_response_error', 'first byte of response is not length')
crc = calculate_crc16(bytes(response_payload[2:response_payload_len]))
crc = calculate_crc16(response_payload[2:response_payload_len])
if (response_payload[response_payload_len] == crc & 0xFF) and (
response_payload[response_payload_len + 1] == (crc >> 8) & 0xFF):
return response_payload[2:response_payload_len]

View File

@ -23,8 +23,8 @@ class dooya(device):
packet[10] = 0x44
response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24])
payload = self.decrypt(bytes(response[0x38:]))
return ord(payload[4])
payload = self.decrypt(response[0x38:])
return payload[4]
def open(self) -> int:
"""Open the curtain."""

View File

@ -34,14 +34,14 @@ class device:
self.manufacturer = manufacturer
self.is_locked = is_locked
self.count = random.randrange(0xffff)
self.iv = bytearray(
self.iv = bytes(
[0x56, 0x2e, 0x17, 0x99, 0x6d, 0x09, 0x3d, 0x28, 0xdd, 0xb3, 0xba, 0x69, 0x5a, 0x2e, 0x6f, 0x58])
self.id = bytearray([0, 0, 0, 0])
self.id = bytes(4)
self.type = "Unknown"
self.lock = threading.Lock()
self.aes = None
key = bytearray(
key = bytes(
[0x09, 0x76, 0x28, 0x34, 0x3f, 0xe9, 0x9e, 0x23, 0x76, 0x5c, 0x15, 0x13, 0xac, 0xcf, 0x8b, 0x02])
self.update_aes(key)
@ -133,7 +133,7 @@ class device:
"""Return device type."""
return self.type
def send_packet(self, command: int, payload: bytearray) -> bytearray:
def send_packet(self, command: int, payload: bytes) -> bytes:
"""Send a packet to the device."""
self.count = (self.count + 1) & 0xffff
packet = bytearray(0x38)
@ -162,8 +162,10 @@ class device:
packet[0x33] = self.id[0]
# pad the payload for AES encryption
if payload:
payload += bytearray((16 - len(payload)) % 16)
padding = (16 - len(payload)) % 16
if padding:
payload = bytearray(payload)
payload += bytearray(padding)
checksum = sum(payload, 0xbeaf) & 0xffff
packet[0x34] = checksum & 0xff
@ -187,7 +189,6 @@ class device:
cs.sendto(packet, self.host)
cs.settimeout(1)
resp, _ = cs.recvfrom(2048)
resp = bytearray(resp)
break
except socket.timeout:
if (time.time() - start_time) > self.timeout:

View File

@ -21,8 +21,8 @@ def get_local_ip() -> str:
return s.getsockname()[0]
def calculate_crc16(input_data) -> int:
"""Calculate CRC-16."""
def calculate_crc16(input_data: bytes) -> int:
"""Calculate the CRC-16 of a byte string."""
crc16_tab = []
crc16_constant = 0xA001
@ -35,24 +35,11 @@ def calculate_crc16(input_data) -> int:
crc = c_ushort(crc >> 1).value
crc16_tab.append(hex(crc))
try:
is_string = isinstance(input_data, str)
is_bytes = isinstance(input_data, bytes)
crcValue = 0xFFFF
if not is_string and not is_bytes:
raise Exception(
"Please provide a string or a byte sequence "
"as argument for calculation."
)
for c in input_data:
tmp = crcValue ^ c
rotated = c_ushort(crcValue >> 8).value
crcValue = rotated ^ int(crc16_tab[(tmp & 0x00FF)], 0)
crcValue = 0xFFFF
for c in input_data:
d = ord(c) if is_string else c
tmp = crcValue ^ d
rotated = c_ushort(crcValue >> 8).value
crcValue = rotated ^ int(crc16_tab[(tmp & 0x00FF)], 0)
return crcValue
except Exception as e:
print("EXCEPTION(calculate): {}".format(e))
return crcValue

View File

@ -44,7 +44,7 @@ class lb1(device):
response = self.send_packet(0x6a, packet)
check_error(response[0x36:0x38])
payload = self.decrypt(bytes(response[0x38:]))
payload = self.decrypt(response[0x38:])
responseLength = int(payload[0x0a]) | (int(payload[0x0b]) << 8)
if responseLength > 0:

View File

@ -18,7 +18,7 @@ class rm(device):
packet.append(0x04)
response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24])
payload = self.decrypt(bytes(response[0x38:]))
payload = self.decrypt(response[0x38:])
return payload[len(self._request_header) + 4:]
def send_data(self, data: bytes) -> None:
@ -56,7 +56,7 @@ class rm(device):
packet.append(0x1a)
response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24])
payload = self.decrypt(bytes(response[0x38:]))
payload = self.decrypt(response[0x38:])
if payload[len(self._request_header) + 4] == 1:
return True
return False
@ -67,7 +67,7 @@ class rm(device):
packet.append(0x1b)
response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24])
payload = self.decrypt(bytes(response[0x38:]))
payload = self.decrypt(response[0x38:])
if payload[len(self._request_header) + 4] == 1:
return True
return False
@ -78,7 +78,7 @@ class rm(device):
packet.append(command)
response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24])
payload = self.decrypt(bytes(response[0x38:]))
payload = self.decrypt(response[0x38:])
return bytearray(payload[len(self._request_header) + 4:])
def check_temperature(self) -> int:

View File

@ -31,7 +31,7 @@ class a1(device):
packet = bytearray([0x1])
response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24])
payload = self.decrypt(bytes(response[0x38:]))
payload = self.decrypt(response[0x38:])
data = bytearray(payload[0x4:])
return {
'temperature': data[0x0] + data[0x1] / 10.0,

View File

@ -50,7 +50,7 @@ class mp1(device):
response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24])
payload = self.decrypt(bytes(response[0x38:]))
payload = self.decrypt(response[0x38:])
return payload[0x0e]
def check_power(self) -> dict:
@ -116,7 +116,7 @@ class bg1(device):
check_error(response[0x22:0x24])
return self._decode(response)
def _encode(self, flag: int, js: str) -> bytearray:
def _encode(self, flag: int, js: str) -> bytes:
"""Encode a message."""
# The packet format is:
# 0x00-0x01 length
@ -139,7 +139,7 @@ class bg1(device):
def _decode(self, response: bytes) -> dict:
"""Decode a message."""
payload = self.decrypt(bytes(response[0x38:]))
payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from('<I', payload, 0x0a)[0]
state = json.loads(payload[0x0e:0x0e+js_len])
return state
@ -197,10 +197,8 @@ class sp2(device):
packet[0] = 1
response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24])
payload = self.decrypt(bytes(response[0x38:]))
if isinstance(payload[0x4], int):
return bool(payload[0x4] == 1 or payload[0x4] == 3 or payload[0x4] == 0xFD)
return bool(ord(payload[0x4]) == 1 or ord(payload[0x4]) == 3 or ord(payload[0x4]) == 0xFD)
payload = self.decrypt(response[0x38:])
return bool(payload[0x4] == 1 or payload[0x4] == 3 or payload[0x4] == 0xFD)
def check_nightlight(self) -> bool:
"""Return the state of the night light."""
@ -208,20 +206,13 @@ class sp2(device):
packet[0] = 1
response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24])
payload = self.decrypt(bytes(response[0x38:]))
if isinstance(payload[0x4], int):
return bool(payload[0x4] == 2 or payload[0x4] == 3 or payload[0x4] == 0xFF)
return bool(ord(payload[0x4]) == 2 or ord(payload[0x4]) == 3 or ord(payload[0x4]) == 0xFF)
payload = self.decrypt(response[0x38:])
return bool(payload[0x4] == 2 or payload[0x4] == 3 or payload[0x4] == 0xFF)
def get_energy(self) -> int:
"""Return the energy state of the device."""
packet = bytearray([8, 0, 254, 1, 5, 1, 0, 0, 0, 45])
response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24])
payload = self.decrypt(bytes(response[0x38:]))
if isinstance(payload[0x7], int):
energy = int(hex(payload[0x07] * 256 + payload[0x06])[2:]) + int(hex(payload[0x05])[2:]) / 100.0
else:
energy = int(hex(ord(payload[0x07]) * 256 + ord(payload[0x06]))[2:]) + int(
hex(ord(payload[0x05]))[2:]) / 100.0
return energy
payload = self.decrypt(response[0x38:])
return int(hex(payload[0x07] * 256 + payload[0x06])[2:]) + int(hex(payload[0x05])[2:]) / 100.0