1
0
mirror of https://github.com/mjg59/python-broadlink.git synced 2024-11-22 07:00:12 +01:00

Raise exceptions for errors (#356)

* Raise exceptions for errors

* Fix lb1 error check

* Fix typo

* Improve code readability

* Fix exception
This commit is contained in:
Felipe Martins Diel 2020-05-07 02:59:21 -03:00 committed by GitHub
parent 5e444988e1
commit 3216b069f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 132 additions and 64 deletions

View File

@ -12,6 +12,7 @@ from datetime import datetime
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from .exceptions import check_error, exception
def gendevice(devtype, host, mac, name=None, cloud=None): def gendevice(devtype, host, mac, name=None, cloud=None):
devices = { devices = {
@ -221,10 +222,7 @@ class device:
payload[0x36] = ord('1') payload[0x36] = ord('1')
response = self.send_packet(0x65, payload) response = self.send_packet(0x65, payload)
check_error(response[0x22:0x24])
if any(response[0x22:0x24]):
return False
payload = self.decrypt(response[0x38:]) payload = self.decrypt(response[0x38:])
key = payload[0x04:0x14] key = payload[0x04:0x14]
@ -301,7 +299,7 @@ class device:
break break
except socket.timeout: except socket.timeout:
if (time.time() - start_time) > self.timeout: if (time.time() - start_time) > self.timeout:
raise raise exception(0xfffd)
finally: finally:
cs.close() cs.close()
return bytearray(response[0]) return bytearray(response[0])
@ -328,7 +326,8 @@ class mp1(device):
packet[0x0d] = sid_mask packet[0x0d] = sid_mask
packet[0x0e] = sid_mask if state else 0 packet[0x0e] = sid_mask if state else 0
self.send_packet(0x6a, packet) response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24])
def set_power(self, sid, state): def set_power(self, sid, state):
"""Sets the power state of the smart power strip.""" """Sets the power state of the smart power strip."""
@ -348,9 +347,7 @@ class mp1(device):
packet[0x08] = 0x01 packet[0x08] = 0x01
response = self.send_packet(0x6a, packet) response = self.send_packet(0x6a, packet)
err = response[0x22] | (response[0x23] << 8) check_error(response[0x22:0x24])
if err != 0:
return None
payload = self.decrypt(bytes(response[0x38:])) payload = self.decrypt(bytes(response[0x38:]))
if isinstance(payload[0x4], int): if isinstance(payload[0x4], int):
state = payload[0x0e] state = payload[0x0e]
@ -384,6 +381,7 @@ class bg1(device):
eg. `{"pwr":1,"pwr1":1,"pwr2":0,"maxworktime":60,"maxworktime1":60,"maxworktime2":0,"idcbrightness":50}`""" eg. `{"pwr":1,"pwr1":1,"pwr2":0,"maxworktime":60,"maxworktime1":60,"maxworktime2":0,"idcbrightness":50}`"""
packet = self._encode(1, b'{}') packet = self._encode(1, b'{}')
response = self.send_packet(0x6a, packet) response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24])
return self._decode(response) return self._decode(response)
def set_state(self, pwr=None, pwr1=None, pwr2=None, maxworktime=None, maxworktime1=None, maxworktime2=None, idcbrightness=None): def set_state(self, pwr=None, pwr1=None, pwr2=None, maxworktime=None, maxworktime1=None, maxworktime2=None, idcbrightness=None):
@ -405,6 +403,7 @@ class bg1(device):
js = json.dumps(data).encode('utf8') js = json.dumps(data).encode('utf8')
packet = self._encode(2, js) packet = self._encode(2, js)
response = self.send_packet(0x6a, packet) response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24])
return self._decode(response) return self._decode(response)
def _encode(self, flag, js): def _encode(self, flag, js):
@ -432,10 +431,6 @@ class bg1(device):
return packet return packet
def _decode(self, response): def _decode(self, response):
err = response[0x22] | (response[0x23] << 8)
if err != 0:
return None
payload = self.decrypt(bytes(response[0x38:])) payload = self.decrypt(bytes(response[0x38:]))
js_len = struct.unpack_from('<I', payload, 0x0a)[0] js_len = struct.unpack_from('<I', payload, 0x0a)[0]
state = json.loads(payload[0x0e:0x0e+js_len]) state = json.loads(payload[0x0e:0x0e+js_len])
@ -449,7 +444,8 @@ class sp1(device):
def set_power(self, state): def set_power(self, state):
packet = bytearray(4) packet = bytearray(4)
packet[0] = state packet[0] = state
self.send_packet(0x66, packet) response = self.send_packet(0x66, packet)
check_error(response[0x22:0x24])
class sp2(device): class sp2(device):
@ -465,7 +461,8 @@ class sp2(device):
packet[4] = 3 if state else 2 packet[4] = 3 if state else 2
else: else:
packet[4] = 1 if state else 0 packet[4] = 1 if state else 0
self.send_packet(0x6a, packet) response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24])
def set_nightlight(self, state): def set_nightlight(self, state):
"""Sets the night light state of the smart plug""" """Sets the night light state of the smart plug"""
@ -475,16 +472,15 @@ class sp2(device):
packet[4] = 3 if state else 1 packet[4] = 3 if state else 1
else: else:
packet[4] = 2 if state else 0 packet[4] = 2 if state else 0
self.send_packet(0x6a, packet) response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24])
def check_power(self): def check_power(self):
"""Returns the power state of the smart plug.""" """Returns the power state of the smart plug."""
packet = bytearray(16) packet = bytearray(16)
packet[0] = 1 packet[0] = 1
response = self.send_packet(0x6a, packet) response = self.send_packet(0x6a, packet)
err = response[0x22] | (response[0x23] << 8) check_error(response[0x22:0x24])
if err != 0:
return None
payload = self.decrypt(bytes(response[0x38:])) payload = self.decrypt(bytes(response[0x38:]))
if isinstance(payload[0x4], int): if isinstance(payload[0x4], int):
return bool(payload[0x4] == 1 or payload[0x4] == 3 or payload[0x4] == 0xFD) return bool(payload[0x4] == 1 or payload[0x4] == 3 or payload[0x4] == 0xFD)
@ -495,9 +491,7 @@ class sp2(device):
packet = bytearray(16) packet = bytearray(16)
packet[0] = 1 packet[0] = 1
response = self.send_packet(0x6a, packet) response = self.send_packet(0x6a, packet)
err = response[0x22] | (response[0x23] << 8) check_error(response[0x22:0x24])
if err != 0:
return None
payload = self.decrypt(bytes(response[0x38:])) payload = self.decrypt(bytes(response[0x38:]))
if isinstance(payload[0x4], int): if isinstance(payload[0x4], int):
return bool(payload[0x4] == 2 or payload[0x4] == 3 or payload[0x4] == 0xFF) return bool(payload[0x4] == 2 or payload[0x4] == 3 or payload[0x4] == 0xFF)
@ -506,9 +500,7 @@ class sp2(device):
def get_energy(self): def get_energy(self):
packet = bytearray([8, 0, 254, 1, 5, 1, 0, 0, 0, 45]) packet = bytearray([8, 0, 254, 1, 5, 1, 0, 0, 0, 45])
response = self.send_packet(0x6a, packet) response = self.send_packet(0x6a, packet)
err = response[0x22] | (response[0x23] << 8) check_error(response[0x22:0x24])
if err != 0:
return None
payload = self.decrypt(bytes(response[0x38:])) payload = self.decrypt(bytes(response[0x38:]))
if isinstance(payload[0x7], int): if isinstance(payload[0x7], int):
energy = int(hex(payload[0x07] * 256 + payload[0x06])[2:]) + int(hex(payload[0x05])[2:]) / 100.0 energy = int(hex(payload[0x07] * 256 + payload[0x06])[2:]) + int(hex(payload[0x05])[2:]) / 100.0
@ -527,9 +519,7 @@ class a1(device):
packet = bytearray(16) packet = bytearray(16)
packet[0] = 1 packet[0] = 1
response = self.send_packet(0x6a, packet) response = self.send_packet(0x6a, packet)
err = response[0x22] | (response[0x23] << 8) check_error(response[0x22:0x24])
if err != 0:
return None
data = {} data = {}
payload = self.decrypt(bytes(response[0x38:])) payload = self.decrypt(bytes(response[0x38:]))
if isinstance(payload[0x4], int): if isinstance(payload[0x4], int):
@ -578,9 +568,7 @@ class a1(device):
packet = bytearray(16) packet = bytearray(16)
packet[0] = 1 packet[0] = 1
response = self.send_packet(0x6a, packet) response = self.send_packet(0x6a, packet)
err = response[0x22] | (response[0x23] << 8) check_error(response[0x22:0x24])
if err != 0:
return None
data = {} data = {}
payload = self.decrypt(bytes(response[0x38:])) payload = self.decrypt(bytes(response[0x38:]))
if isinstance(payload[0x4], int): if isinstance(payload[0x4], int):
@ -609,9 +597,7 @@ class rm(device):
packet = bytearray(self._request_header) packet = bytearray(self._request_header)
packet.append(0x04) packet.append(0x04)
response = self.send_packet(0x6a, packet) response = self.send_packet(0x6a, packet)
err = response[0x22] | (response[0x23] << 8) check_error(response[0x22:0x24])
if err != 0:
return None
payload = self.decrypt(bytes(response[0x38:])) payload = self.decrypt(bytes(response[0x38:]))
return payload[len(self._request_header) + 4:] return payload[len(self._request_header) + 4:]
@ -619,30 +605,32 @@ class rm(device):
packet = bytearray(self._code_sending_header) packet = bytearray(self._code_sending_header)
packet += bytes([0x02, 0x00, 0x00, 0x00]) packet += bytes([0x02, 0x00, 0x00, 0x00])
packet += data packet += data
self.send_packet(0x6a, packet) response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24])
def enter_learning(self): def enter_learning(self):
packet = bytearray(self._request_header) packet = bytearray(self._request_header)
packet.append(0x03) packet.append(0x03)
self.send_packet(0x6a, packet) response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24])
def sweep_frequency(self): def sweep_frequency(self):
packet = bytearray(self._request_header) packet = bytearray(self._request_header)
packet.append(0x19) packet.append(0x19)
self.send_packet(0x6a, packet) response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24])
def cancel_sweep_frequency(self): def cancel_sweep_frequency(self):
packet = bytearray(self._request_header) packet = bytearray(self._request_header)
packet.append(0x1e) packet.append(0x1e)
self.send_packet(0x6a, packet) response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24])
def check_frequency(self): def check_frequency(self):
packet = bytearray(self._request_header) packet = bytearray(self._request_header)
packet.append(0x1a) packet.append(0x1a)
response = self.send_packet(0x6a, packet) response = self.send_packet(0x6a, packet)
err = response[0x22] | (response[0x23] << 8) check_error(response[0x22:0x24])
if err != 0:
return False
payload = self.decrypt(bytes(response[0x38:])) payload = self.decrypt(bytes(response[0x38:]))
if payload[len(self._request_header) + 4] == 1: if payload[len(self._request_header) + 4] == 1:
return True return True
@ -652,9 +640,7 @@ class rm(device):
packet = bytearray(self._request_header) packet = bytearray(self._request_header)
packet.append(0x1b) packet.append(0x1b)
response = self.send_packet(0x6a, packet) response = self.send_packet(0x6a, packet)
err = response[0x22] | (response[0x23] << 8) check_error(response[0x22:0x24])
if err != 0:
return False
payload = self.decrypt(bytes(response[0x38:])) payload = self.decrypt(bytes(response[0x38:]))
if payload[len(self._request_header) + 4] == 1: if payload[len(self._request_header) + 4] == 1:
return True return True
@ -664,9 +650,7 @@ class rm(device):
packet = bytearray(self._request_header) packet = bytearray(self._request_header)
packet.append(type) packet.append(type)
response = self.send_packet(0x6a, packet) response = self.send_packet(0x6a, packet)
err = response[0x22] | (response[0x23] << 8) check_error(response[0x22:0x24])
if err != 0:
return False
payload = self.decrypt(bytes(response[0x38:])) payload = self.decrypt(bytes(response[0x38:]))
value_pos = len(self._request_header) + offset value_pos = len(self._request_header) + offset
if isinstance(payload[value_pos], int): if isinstance(payload[value_pos], int):
@ -767,12 +751,7 @@ class hysen(device):
# send to device # send to device
response = self.send_packet(0x6a, request_payload) response = self.send_packet(0x6a, request_payload)
check_error(response[0x22:0x24])
# check for error
err = response[0x22] | (response[0x23] << 8)
if err:
raise ValueError('broadlink_response_error', err)
response_payload = bytearray(self.decrypt(bytes(response[0x38:]))) response_payload = bytearray(self.decrypt(bytes(response[0x38:])))
# experimental check on CRC in response (first 2 bytes are len, and trailing bytes are crc) # experimental check on CRC in response (first 2 bytes are len, and trailing bytes are crc)
@ -940,10 +919,7 @@ class S1C(device):
packet = bytearray(16) packet = bytearray(16)
packet[0] = 0x06 # 0x06 - get sensors info, 0x07 - probably add sensors packet[0] = 0x06 # 0x06 - get sensors info, 0x07 - probably add sensors
response = self.send_packet(0x6a, packet) response = self.send_packet(0x6a, packet)
err = response[0x22] | (response[0x23] << 8) check_error(response[0x22:0x24])
if err != 0:
return None
payload = self.decrypt(bytes(response[0x38:])) payload = self.decrypt(bytes(response[0x38:]))
if not payload: if not payload:
return None return None
@ -991,9 +967,7 @@ class dooya(device):
packet[9] = 0xfa packet[9] = 0xfa
packet[10] = 0x44 packet[10] = 0x44
response = self.send_packet(0x6a, packet) response = self.send_packet(0x6a, packet)
err = response[0x22] | (response[0x23] << 8) check_error(response[0x22:0x24])
if err != 0:
return None
payload = self.decrypt(bytes(response[0x38:])) payload = self.decrypt(bytes(response[0x38:]))
return ord(payload[4]) return ord(payload[4])
@ -1059,10 +1033,7 @@ class lb1(device):
packet[0x07] = checksum >> 8 # Checksum 2 position packet[0x07] = checksum >> 8 # Checksum 2 position
response = self.send_packet(0x6a, packet) response = self.send_packet(0x6a, packet)
check_error(response[0x36:0x38])
err = response[0x36] | (response[0x37] << 8)
if err != 0:
return None
payload = self.decrypt(bytes(response[0x38:])) payload = self.decrypt(bytes(response[0x38:]))
responseLength = int(payload[0x0a]) | (int(payload[0x0b]) << 8) responseLength = int(payload[0x0a]) | (int(payload[0x0b]) << 8)

97
broadlink/exceptions.py Normal file
View File

@ -0,0 +1,97 @@
"""Exceptions for Broadlink devices."""
class BroadlinkException(Exception):
"""Common base class for all Broadlink exceptions."""
pass
class AuthenticationError(BroadlinkException):
"""Authentication error."""
pass
class AuthorizationError(BroadlinkException):
"""Authorization error."""
pass
class CommandNotSupportedError(BroadlinkException):
"""Command not supported error."""
pass
class ConnectionClosedError(BroadlinkException):
"""Connection closed error."""
pass
class DataValidationError(BroadlinkException):
"""Data validation error."""
pass
class DeviceOfflineError(BroadlinkException):
"""Device offline error."""
pass
class ReadError(BroadlinkException):
"""Read error."""
pass
class SendError(BroadlinkException):
"""Send error."""
pass
class SSIDNotFoundError(BroadlinkException):
"""SSID not found error."""
pass
class StorageError(BroadlinkException):
"""Storage error."""
pass
class UnknownError(BroadlinkException):
"""Unknown error."""
pass
class WriteError(BroadlinkException):
"""Write error."""
pass
FIRMWARE_ERRORS = {
0xffff: (AuthenticationError, "Authentication failed"),
0xfffe: (ConnectionClosedError, "You have been logged out"),
0xfffd: (DeviceOfflineError, "The device is offline"),
0xfffc: (CommandNotSupportedError, "Command not supported"),
0xfffb: (StorageError, "The device storage is full"),
0xfffa: (DataValidationError, "Structure is abnormal"),
0xfff9: (AuthorizationError, "Control key is expired"),
0xfff8: (SendError, "Send error"),
0xfff7: (WriteError, "Write error"),
0xfff6: (ReadError, "Read error"),
0xfff5: (SSIDNotFoundError, "SSID could not be found in AP configuration"),
}
def exception(error_code):
"""Return exception corresponding to an error code."""
try:
exc, msg = FIRMWARE_ERRORS[error_code]
return exc(msg)
except KeyError:
return UnknownError("Unknown error: " + hex(error_code))
def check_error(error):
"""Raise exception if an error occurred."""
error_code = error[0] | (error[1] << 8)
if error_code:
raise exception(error_code)