diff --git a/broadlink/__init__.py b/broadlink/__init__.py index 40a4cd5..67d86d9 100644 --- a/broadlink/__init__.py +++ b/broadlink/__init__.py @@ -12,6 +12,7 @@ from datetime import datetime from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from .exceptions import check_error, exception def gendevice(devtype, host, mac, name=None, cloud=None): devices = { @@ -221,10 +222,7 @@ class device: payload[0x36] = ord('1') response = self.send_packet(0x65, payload) - - if any(response[0x22:0x24]): - return False - + check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) key = payload[0x04:0x14] @@ -301,7 +299,7 @@ class device: break except socket.timeout: if (time.time() - start_time) > self.timeout: - raise + raise exception(0xfffd) finally: cs.close() return bytearray(response[0]) @@ -328,7 +326,8 @@ class mp1(device): packet[0x0d] = sid_mask packet[0x0e] = sid_mask if state else 0 - self.send_packet(0x6a, packet) + response = self.send_packet(0x6a, packet) + check_error(response[0x22:0x24]) def set_power(self, sid, state): """Sets the power state of the smart power strip.""" @@ -348,9 +347,7 @@ class mp1(device): packet[0x08] = 0x01 response = self.send_packet(0x6a, packet) - err = response[0x22] | (response[0x23] << 8) - if err != 0: - return None + check_error(response[0x22:0x24]) payload = self.decrypt(bytes(response[0x38:])) if isinstance(payload[0x4], int): state = payload[0x0e] @@ -384,6 +381,7 @@ class bg1(device): eg. `{"pwr":1,"pwr1":1,"pwr2":0,"maxworktime":60,"maxworktime1":60,"maxworktime2":0,"idcbrightness":50}`""" packet = self._encode(1, b'{}') response = self.send_packet(0x6a, packet) + check_error(response[0x22:0x24]) return self._decode(response) def set_state(self, pwr=None, pwr1=None, pwr2=None, maxworktime=None, maxworktime1=None, maxworktime2=None, idcbrightness=None): @@ -405,6 +403,7 @@ class bg1(device): js = json.dumps(data).encode('utf8') packet = self._encode(2, js) response = self.send_packet(0x6a, packet) + check_error(response[0x22:0x24]) return self._decode(response) def _encode(self, flag, js): @@ -432,10 +431,6 @@ class bg1(device): return packet def _decode(self, response): - err = response[0x22] | (response[0x23] << 8) - if err != 0: - return None - payload = self.decrypt(bytes(response[0x38:])) js_len = struct.unpack_from('> 8 # Checksum 2 position response = self.send_packet(0x6a, packet) - - err = response[0x36] | (response[0x37] << 8) - if err != 0: - return None + check_error(response[0x36:0x38]) payload = self.decrypt(bytes(response[0x38:])) responseLength = int(payload[0x0a]) | (int(payload[0x0b]) << 8) diff --git a/broadlink/exceptions.py b/broadlink/exceptions.py new file mode 100644 index 0000000..daf1e00 --- /dev/null +++ b/broadlink/exceptions.py @@ -0,0 +1,97 @@ +"""Exceptions for Broadlink devices.""" + + +class BroadlinkException(Exception): + """Common base class for all Broadlink exceptions.""" + pass + + +class AuthenticationError(BroadlinkException): + """Authentication error.""" + pass + + +class AuthorizationError(BroadlinkException): + """Authorization error.""" + pass + + +class CommandNotSupportedError(BroadlinkException): + """Command not supported error.""" + pass + + +class ConnectionClosedError(BroadlinkException): + """Connection closed error.""" + pass + + +class DataValidationError(BroadlinkException): + """Data validation error.""" + pass + + +class DeviceOfflineError(BroadlinkException): + """Device offline error.""" + pass + + +class ReadError(BroadlinkException): + """Read error.""" + pass + + +class SendError(BroadlinkException): + """Send error.""" + pass + + +class SSIDNotFoundError(BroadlinkException): + """SSID not found error.""" + pass + + +class StorageError(BroadlinkException): + """Storage error.""" + pass + + +class UnknownError(BroadlinkException): + """Unknown error.""" + pass + + +class WriteError(BroadlinkException): + """Write error.""" + pass + + +FIRMWARE_ERRORS = { + 0xffff: (AuthenticationError, "Authentication failed"), + 0xfffe: (ConnectionClosedError, "You have been logged out"), + 0xfffd: (DeviceOfflineError, "The device is offline"), + 0xfffc: (CommandNotSupportedError, "Command not supported"), + 0xfffb: (StorageError, "The device storage is full"), + 0xfffa: (DataValidationError, "Structure is abnormal"), + 0xfff9: (AuthorizationError, "Control key is expired"), + 0xfff8: (SendError, "Send error"), + 0xfff7: (WriteError, "Write error"), + 0xfff6: (ReadError, "Read error"), + 0xfff5: (SSIDNotFoundError, "SSID could not be found in AP configuration"), +} + + +def exception(error_code): + """Return exception corresponding to an error code.""" + try: + exc, msg = FIRMWARE_ERRORS[error_code] + return exc(msg) + except KeyError: + return UnknownError("Unknown error: " + hex(error_code)) + + +def check_error(error): + """Raise exception if an error occurred.""" + error_code = error[0] | (error[1] << 8) + if error_code: + raise exception(error_code)