1
0
Fork 0

Improve code quality (#572)

* Improve typing

* Use better names

* Clean up switch.py

* Remove unused import

* Run black

* Remove unnecessary comments

* Clean up climate.py
This commit is contained in:
Felipe Martins Diel 2021-04-05 14:02:56 -03:00 committed by Felipe Martins Diel
parent b77e803864
commit 12fdf01631
9 changed files with 158 additions and 210 deletions

View File

@ -118,8 +118,8 @@ def gendevice(
dev_type: int,
host: t.Tuple[str, int],
mac: t.Union[bytes, str],
name: str = None,
is_locked: bool = None,
name: str = "",
is_locked: bool = False,
) -> Device:
"""Generate a device."""
try:

View File

@ -9,20 +9,18 @@ class S1C(Device):
TYPE = "S1C"
_SENSORS_TYPES = {
0x31: "Door Sensor", # 49 as hex
0x91: "Key Fob", # 145 as hex, as serial on fob corpse
0x21: "Motion Sensor", # 33 as hex
0x31: "Door Sensor",
0x91: "Key Fob",
0x21: "Motion Sensor",
}
def get_sensors_status(self) -> dict:
"""Return the state of the sensors."""
packet = bytearray(16)
packet[0] = 0x06 # 0x06 - get sensors info, 0x07 - probably add sensors
packet[0] = 0x06
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
if not payload:
return None
count = payload[0x4]
sensor_data = payload[0x6:]
sensors = [

View File

@ -1,5 +1,5 @@
"""Support for climate control."""
from typing import List
import typing as t
from . import exceptions as e
from .device import Device
@ -11,50 +11,38 @@ class hysen(Device):
TYPE = "Hysen heating controller"
# Send a request
# input_payload should be a bytearray, usually 6 bytes, e.g. bytearray([0x01,0x06,0x00,0x02,0x10,0x00])
# Returns decrypted payload
# New behaviour: raises a ValueError if the device response indicates an error or CRC check fails
# The function prepends length (2 bytes) and appends CRC
def send_request(self, input_payload: bytes) -> bytes:
def send_request(self, request: t.Sequence[int]) -> bytes:
"""Send a request to the device."""
crc = CRC16.calculate(input_payload)
packet = bytearray()
packet.extend((len(request) + 2).to_bytes(2, "little"))
packet.extend(request)
packet.extend(CRC16.calculate(request).to_bytes(2, "little"))
# first byte is length, +2 for CRC16
request_payload = bytearray([len(input_payload) + 2, 0x00])
request_payload.extend(input_payload)
# append CRC
request_payload.append(crc & 0xFF)
request_payload.append((crc >> 8) & 0xFF)
# send to device
response = self.send_packet(0x6A, request_payload)
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
response_payload = self.decrypt(response[0x38:])
payload = self.decrypt(response[0x38:])
# experimental check on CRC in response (first 2 bytes are len, and trailing bytes are crc)
response_payload_len = response_payload[0]
if response_payload_len + 2 > len(response_payload):
p_len = int.from_bytes(payload[:0x02], "little")
if p_len + 2 > len(payload):
raise ValueError(
"hysen_response_error", "first byte of response is not length"
)
crc = CRC16.calculate(response_payload[2:response_payload_len])
if (response_payload[response_payload_len] == crc & 0xFF) and (
response_payload[response_payload_len + 1] == (crc >> 8) & 0xFF
):
return response_payload[2:response_payload_len]
raise ValueError("hysen_response_error", "CRC check on response failed")
def get_temp(self) -> int:
nom_crc = int.from_bytes(payload[p_len : p_len + 2], "little")
real_crc = CRC16.calculate(payload[0x02:p_len])
if nom_crc != real_crc:
raise ValueError("hysen_response_error", "CRC check on response failed")
return payload[0x02:p_len]
def get_temp(self) -> float:
"""Return the room temperature in degrees celsius."""
payload = self.send_request(bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x08]))
payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x08])
return payload[0x05] / 2.0
def get_external_temp(self) -> int:
def get_external_temp(self) -> float:
"""Return the external temperature in degrees celsius."""
payload = self.send_request(bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x08]))
payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x08])
return payload[18] / 2.0
def get_full_status(self) -> dict:
@ -62,28 +50,28 @@ class hysen(Device):
Timer schedule included.
"""
payload = self.send_request(bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x16]))
payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x16])
data = {}
data["remote_lock"] = payload[3] & 1
data["power"] = payload[4] & 1
data["active"] = (payload[4] >> 4) & 1
data["temp_manual"] = (payload[4] >> 6) & 1
data["room_temp"] = (payload[5] & 255) / 2.0
data["thermostat_temp"] = (payload[6] & 255) / 2.0
data["auto_mode"] = payload[7] & 15
data["loop_mode"] = (payload[7] >> 4) & 15
data["room_temp"] = payload[5] / 2.0
data["thermostat_temp"] = payload[6] / 2.0
data["auto_mode"] = payload[7] & 0xF
data["loop_mode"] = payload[7] >> 4
data["sensor"] = payload[8]
data["osv"] = payload[9]
data["dif"] = payload[10]
data["svh"] = payload[11]
data["svl"] = payload[12]
data["room_temp_adj"] = ((payload[13] << 8) + payload[14]) / 2.0
if data["room_temp_adj"] > 32767:
data["room_temp_adj"] = 32767 - data["room_temp_adj"]
data["room_temp_adj"] = (
int.from_bytes(payload[13:15], "big", signed=True) / 10.0
)
data["fre"] = payload[15]
data["poweron"] = payload[16]
data["unknown"] = payload[17]
data["external_temp"] = (payload[18] & 255) / 2.0
data["external_temp"] = payload[18] / 2.0
data["hour"] = payload[19]
data["min"] = payload[20]
data["sec"] = payload[21]
@ -124,7 +112,7 @@ class hysen(Device):
def set_mode(self, auto_mode: int, loop_mode: int, sensor: int = 0) -> None:
"""Set the mode of the device."""
mode_byte = ((loop_mode + 1) << 4) + auto_mode
self.send_request(bytearray([0x01, 0x06, 0x00, 0x02, mode_byte, sensor]))
self.send_request([0x01, 0x06, 0x00, 0x02, mode_byte, sensor])
# Advanced settings
# Sensor mode (SEN) sensor = 0 for internal sensor, 1 for external sensor,
@ -133,7 +121,7 @@ class hysen(Device):
# Deadzone for floor temprature (dIF) dif = 1..9. Factory default: 2C
# Upper temperature limit for internal sensor (SVH) svh = 5..99. Factory default: 35C
# Lower temperature limit for internal sensor (SVL) svl = 5..99. Factory default: 5C
# Actual temperature calibration (AdJ) adj = -0.5. Prescision 0.1C
# Actual temperature calibration (AdJ) adj = -0.5. Precision 0.1C
# Anti-freezing function (FrE) fre = 0 for anti-freezing function shut down,
# 1 for anti-freezing function open. Factory default: 0
# Power on memory (POn) poweron = 0 for power on memory off, 1 for power on memory on. Factory default: 0
@ -150,7 +138,7 @@ class hysen(Device):
poweron: int,
) -> None:
"""Set advanced options."""
input_payload = bytearray(
self.send_request(
[
0x01,
0x10,
@ -165,13 +153,12 @@ class hysen(Device):
dif,
svh,
svl,
(int(adj * 2) >> 8 & 0xFF),
(int(adj * 2) & 0xFF),
int(adj * 10) >> 8 & 0xFF,
int(adj * 10) & 0xFF,
fre,
poweron,
]
)
self.send_request(input_payload)
# For backwards compatibility only. Prefer calling set_mode directly.
# Note this function invokes loop_mode=0 and sensor=0.
@ -186,22 +173,20 @@ class hysen(Device):
# Set temperature for manual mode (also activates manual mode if currently in automatic)
def set_temp(self, temp: float) -> None:
"""Set the target temperature."""
self.send_request(bytearray([0x01, 0x06, 0x00, 0x01, 0x00, int(temp * 2)]))
self.send_request([0x01, 0x06, 0x00, 0x01, 0x00, int(temp * 2)])
# Set device on(1) or off(0), does not deactivate Wifi connectivity.
# Remote lock disables control by buttons on thermostat.
def set_power(self, power: int = 1, remote_lock: int = 0) -> None:
"""Set the power state of the device."""
self.send_request(bytearray([0x01, 0x06, 0x00, 0x00, remote_lock, power]))
self.send_request([0x01, 0x06, 0x00, 0x00, remote_lock, power])
# set time on device
# n.b. day=1 is Monday, ..., day=7 is Sunday
def set_time(self, hour: int, minute: int, second: int, day: int) -> None:
"""Set the time."""
self.send_request(
bytearray(
[0x01, 0x10, 0x00, 0x08, 0x00, 0x02, 0x04, hour, minute, second, day]
)
[0x01, 0x10, 0x00, 0x08, 0x00, 0x02, 0x04, hour, minute, second, day]
)
# Set timer schedule
@ -210,28 +195,26 @@ class hysen(Device):
# {'start_hour':17, 'start_minute':30, 'temp': 22 }
# Each one specifies the thermostat temp that will become effective at start_hour:start_minute
# weekend is similar but only has 2 (e.g. switch on in morning and off in afternoon)
def set_schedule(self, weekday: List[dict], weekend: List[dict]) -> None:
def set_schedule(self, weekday: t.List[dict], weekend: t.List[dict]) -> None:
"""Set timer schedule."""
# Begin with some magic values ...
input_payload = bytearray([0x01, 0x10, 0x00, 0x0A, 0x00, 0x0C, 0x18])
request = [0x01, 0x10, 0x00, 0x0A, 0x00, 0x0C, 0x18]
# Now simply append times/temps
# weekday times
for i in range(0, 6):
input_payload.append(weekday[i]["start_hour"])
input_payload.append(weekday[i]["start_minute"])
request.append(weekday[i]["start_hour"])
request.append(weekday[i]["start_minute"])
# weekend times
for i in range(0, 2):
input_payload.append(weekend[i]["start_hour"])
input_payload.append(weekend[i]["start_minute"])
request.append(weekend[i]["start_hour"])
request.append(weekend[i]["start_minute"])
# weekday temperatures
for i in range(0, 6):
input_payload.append(int(weekday[i]["temp"] * 2))
request.append(int(weekday[i]["temp"] * 2))
# weekend temperatures
for i in range(0, 2):
input_payload.append(int(weekend[i]["temp"] * 2))
request.append(int(weekend[i]["temp"] * 2))
self.send_request(input_payload)
self.send_request(request)

View File

@ -141,20 +141,12 @@ class Device:
def __str__(self) -> str:
"""Return a readable representation of the device."""
model = []
if self.manufacturer:
model.append(self.manufacturer)
if self.model:
model.append(self.model)
model.append(hex(self.devtype))
model = " ".join(model)
info = []
info.append(model)
info.append(f"{self.host[0]}:{self.host[1]}")
info.append(":".join(format(x, "02x") for x in self.mac).upper())
info = " / ".join(info)
return "%s (%s)" % (self.name or "Unknown", info)
return "%s (%s / %s:%s / %s)" % (
self.name or "Unknown",
" ".join(filter(None, [self.manufacturer, self.model, hex(self.devtype)])),
*self.host,
":".join(format(x, "02X") for x in self.mac),
)
def update_aes(self, key: bytes) -> None:
"""Update AES."""
@ -177,22 +169,18 @@ class Device:
self.id = 0
self.update_aes(bytes.fromhex(self.__INIT_KEY))
payload = bytearray(0x50)
payload[0x04:0x14] = [0x31]*16
payload[0x1E] = 0x01
payload[0x2D] = 0x01
payload[0x30:0x36] = "Test 1".encode()
packet = bytearray(0x50)
packet[0x04:0x14] = [0x31] * 16
packet[0x1E] = 0x01
packet[0x2D] = 0x01
packet[0x30:0x36] = "Test 1".encode()
response = self.send_packet(0x65, payload)
response = self.send_packet(0x65, packet)
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
key = payload[0x04:0x14]
if len(key) % 16 != 0:
return False
self.id = int.from_bytes(payload[:0x4], "little")
self.update_aes(key)
self.update_aes(payload[0x04:0x14])
return True
def hello(self, local_ip_address=None) -> bool:
@ -284,8 +272,8 @@ class Device:
packet[0x00:0x08] = bytes.fromhex("5aa5aa555aa5aa55")
packet[0x24:0x26] = self.devtype.to_bytes(2, "little")
packet[0x26:0x28] = packet_type.to_bytes(2, "little")
packet[0x28:0x2a] = self.count.to_bytes(2, "little")
packet[0x2a:0x30] = self.mac[::-1]
packet[0x28:0x2A] = self.count.to_bytes(2, "little")
packet[0x2A:0x30] = self.mac[::-1]
packet[0x30:0x34] = self.id.to_bytes(4, "little")
p_checksum = sum(payload, 0xBEAF) & 0xFFFF

View File

@ -8,10 +8,10 @@ class CRC16:
CRC tables are cached for performance.
"""
_cache = {}
_cache: t.Dict[int, t.List[int]] = {}
@classmethod
def get_table(cls, polynomial: int):
def get_table(cls, polynomial: int) -> t.List[int]:
"""Return the CRC-16 table for a polynomial."""
try:
crc_table = cls._cache[polynomial]

View File

@ -2,7 +2,6 @@
import enum
import json
import struct
import typing as t
from . import exceptions as e
from .device import Device
@ -16,6 +15,7 @@ class lb1(Device):
@enum.unique
class ColorMode(enum.IntEnum):
"""Enumerates color modes."""
RGB = 0
WHITE = 1
SCENE = 2
@ -83,21 +83,21 @@ class lb1(Device):
e.check_error(response[0x22:0x24])
return self._decode(response)
def _encode(self, flag: int, obj: t.Any) -> bytes:
def _encode(self, flag: int, state: dict) -> bytes:
"""Encode a JSON packet."""
# flag: 1 for reading, 2 for writing.
packet = bytearray(14)
js = json.dumps(obj, separators=[',', ':']).encode()
p_len = 12 + len(js)
data = json.dumps(state, separators=(",", ":")).encode()
p_len = 12 + len(data)
struct.pack_into(
"<HHHHBBI", packet, 0, p_len, 0xA5A5, 0x5A5A, 0, flag, 0xB, len(js)
"<HHHHBBI", packet, 0, p_len, 0xA5A5, 0x5A5A, 0, flag, 0x0B, len(data)
)
packet += js
checksum = sum(packet[0x8:], 0xC0AD) & 0xFFFF
packet[0x6:0x8] = checksum.to_bytes(2, "little")
packet.extend(data)
checksum = sum(packet[0x02:], 0xBEAF) & 0xFFFF
packet[0x06:0x08] = checksum.to_bytes(2, "little")
return packet
def _decode(self, response: bytes) -> t.Any:
def _decode(self, response: bytes) -> dict:
"""Decode a JSON packet."""
payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0xA)[0]
@ -113,6 +113,7 @@ class lb27r1(Device):
@enum.unique
class ColorMode(enum.IntEnum):
"""Enumerates color modes."""
RGB = 0
WHITE = 1
SCENE = 2
@ -177,22 +178,20 @@ class lb27r1(Device):
e.check_error(response[0x22:0x24])
return self._decode(response)
def _encode(self, flag: int, obj: t.Any) -> bytes:
def _encode(self, flag: int, state: dict) -> bytes:
"""Encode a JSON packet."""
# flag: 1 for reading, 2 for writing.
packet = bytearray(12)
js = json.dumps(obj, separators=[',', ':']).encode()
struct.pack_into(
"<HHHBBI", packet, 0, 0xA5A5, 0x5A5A, 0, flag, 0xB, len(js)
)
packet += js
checksum = sum(packet[0x6:], 0xC0AD) & 0xFFFF
packet[0x4:0x6] = checksum.to_bytes(2, "little")
data = json.dumps(state, separators=(",", ":")).encode()
struct.pack_into("<HHHBBI", packet, 0, 0xA5A5, 0x5A5A, 0, flag, 0x0B, len(data))
packet.extend(data)
checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[0x04:0x06] = checksum.to_bytes(2, "little")
return packet
def _decode(self, response: bytes) -> t.Any:
def _decode(self, response: bytes) -> dict:
"""Decode a JSON packet."""
payload = self.decrypt(response[0x38:])
js_len = struct.unpack_from("<I", payload, 0x8)[0]
state = json.loads(payload[0xC : 0xC + js_len])
js_len = struct.unpack_from("<I", payload, 0x08)[0]
state = json.loads(payload[0x0C : 0x0C + js_len])
return state

View File

@ -14,7 +14,7 @@ class Datetime:
data[0x04:0x06] = datetime.year.to_bytes(2, "little")
data[0x06] = datetime.minute
data[0x07] = datetime.hour
data[0x08] = int(datetime.strftime('%y'))
data[0x08] = int(datetime.strftime("%y"))
data[0x09] = datetime.isoweekday()
data[0x0A] = datetime.day
data[0x0B] = datetime.month
@ -37,7 +37,7 @@ class Datetime:
if datetime.isoweekday() != isoweekday:
raise ValueError("isoweekday does not match")
if int(datetime.strftime('%y')) != subyear:
if int(datetime.strftime("%y")) != subyear:
raise ValueError("subyear does not match")
return datetime

View File

@ -10,7 +10,7 @@ class rmmini(Device):
TYPE = "RMMINI"
def _send(self, command: int, data: bytes = b'') -> bytes:
def _send(self, command: int, data: bytes = b"") -> bytes:
"""Send a packet to the device."""
packet = struct.pack("<I", command) + data
resp = self.send_packet(0x6A, packet)
@ -75,19 +75,19 @@ class rmminib(rmmini):
TYPE = "RMMINIB"
def _send(self, command: int, data: bytes = b'') -> bytes:
def _send(self, command: int, data: bytes = b"") -> bytes:
"""Send a packet to the device."""
packet = struct.pack("<HI", len(data) + 4, command) + data
resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
p_len = struct.unpack("<H", payload[:0x2])[0]
return payload[0x6:p_len+2]
return payload[0x6 : p_len + 2]
class rm4mini(rmminib):
"""Controls a Broadlink RM4 mini."""
TYPE = "RM4MINI"
def check_sensors(self) -> dict:
@ -96,7 +96,7 @@ class rm4mini(rmminib):
temp = struct.unpack("<bb", resp[:0x2])
return {
"temperature": temp[0x0] + temp[0x1] / 100.0,
"humidity": resp[0x2] + resp[0x3] / 100.0
"humidity": resp[0x2] + resp[0x3] / 100.0,
}
def check_temperature(self) -> float:

View File

@ -11,7 +11,7 @@ class mp1(Device):
TYPE = "MP1"
def set_power_mask(self, sid_mask: int, state: bool) -> None:
def set_power_mask(self, sid_mask: int, pwr: bool) -> None:
"""Set the power state of the device."""
packet = bytearray(16)
packet[0x00] = 0x0D
@ -19,20 +19,20 @@ class mp1(Device):
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x06] = 0xB2 + ((sid_mask << 1) if state else sid_mask)
packet[0x06] = 0xB2 + ((sid_mask << 1) if pwr else sid_mask)
packet[0x07] = 0xC0
packet[0x08] = 0x02
packet[0x0A] = 0x03
packet[0x0D] = sid_mask
packet[0x0E] = sid_mask if state else 0
packet[0x0E] = sid_mask if pwr else 0
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
def set_power(self, sid: int, state: bool) -> None:
def set_power(self, sid: int, pwr: bool) -> None:
"""Set the power state of the device."""
sid_mask = 0x01 << (sid - 1)
self.set_power_mask(sid_mask, state)
self.set_power_mask(sid_mask, pwr)
def check_power_raw(self) -> int:
"""Return the power state of the device in raw format."""
@ -53,15 +53,13 @@ class mp1(Device):
def check_power(self) -> dict:
"""Return the power state of the device."""
state = self.check_power_raw()
if state is None:
return {"s1": None, "s2": None, "s3": None, "s4": None}
data = {}
data["s1"] = bool(state & 0x01)
data["s2"] = bool(state & 0x02)
data["s3"] = bool(state & 0x04)
data["s4"] = bool(state & 0x08)
return data
data = self.check_power_raw()
return {
"s1": bool(data & 1),
"s2": bool(data & 2),
"s3": bool(data & 4),
"s4": bool(data & 8),
}
class bg1(Device):
@ -74,7 +72,7 @@ class bg1(Device):
Example: `{"pwr":1,"pwr1":1,"pwr2":0,"maxworktime":60,"maxworktime1":60,"maxworktime2":0,"idcbrightness":50}`
"""
packet = self._encode(1, b"{}")
packet = self._encode(1, {})
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
return self._decode(response)
@ -90,48 +88,38 @@ class bg1(Device):
idcbrightness: int = None,
) -> dict:
"""Set the power state of the device."""
data = {}
state = {}
if pwr is not None:
data["pwr"] = int(bool(pwr))
state["pwr"] = int(bool(pwr))
if pwr1 is not None:
data["pwr1"] = int(bool(pwr1))
state["pwr1"] = int(bool(pwr1))
if pwr2 is not None:
data["pwr2"] = int(bool(pwr2))
state["pwr2"] = int(bool(pwr2))
if maxworktime is not None:
data["maxworktime"] = maxworktime
state["maxworktime"] = maxworktime
if maxworktime1 is not None:
data["maxworktime1"] = maxworktime1
state["maxworktime1"] = maxworktime1
if maxworktime2 is not None:
data["maxworktime2"] = maxworktime2
state["maxworktime2"] = maxworktime2
if idcbrightness is not None:
data["idcbrightness"] = idcbrightness
js = json.dumps(data).encode("utf8")
packet = self._encode(2, js)
state["idcbrightness"] = idcbrightness
packet = self._encode(2, state)
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
return self._decode(response)
def _encode(self, flag: int, js: str) -> bytes:
def _encode(self, flag: int, state: dict) -> bytes:
"""Encode a message."""
# The packet format is:
# 0x00-0x01 length
# 0x02-0x05 header
# 0x06-0x07 00
# 0x08 flag (1 for read or 2 write?)
# 0x09 unknown (0xb)
# 0x0a-0x0d length of json
# 0x0e- json data
packet = bytearray(14)
length = 4 + 2 + 2 + 4 + len(js)
data = json.dumps(state).encode()
length = 12 + len(data)
struct.pack_into(
"<HHHHBBI", packet, 0, length, 0xA5A5, 0x5A5A, 0x0000, flag, 0x0B, len(js)
"<HHHHBBI", packet, 0, length, 0xA5A5, 0x5A5A, 0x0000, flag, 0x0B, len(data)
)
for i in range(len(js)):
packet.append(js[i])
checksum = sum(packet[0x08:], 0xC0AD) & 0xFFFF
packet[0x06] = checksum & 0xFF
packet[0x07] = checksum >> 8
packet.extend(data)
checksum = sum(packet[0x2:], 0xBEAF) & 0xFFFF
packet[0x06:0x08] = checksum.to_bytes(2, "little")
return packet
def _decode(self, response: bytes) -> dict:
@ -147,10 +135,10 @@ class sp1(Device):
TYPE = "SP1"
def set_power(self, state: bool) -> None:
def set_power(self, pwr: bool) -> None:
"""Set the power state of the device."""
packet = bytearray(4)
packet[0] = state
packet[0] = bool(pwr)
response = self.send_packet(0x66, packet)
e.check_error(response[0x22:0x24])
@ -160,11 +148,11 @@ class sp2(Device):
TYPE = "SP2"
def set_power(self, state: bool) -> None:
def set_power(self, pwr: bool) -> None:
"""Set the power state of the device."""
packet = bytearray(16)
packet[0] = 2
packet[4] = int(bool(state))
packet[4] = bool(pwr)
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
@ -198,25 +186,19 @@ class sp3(Device):
TYPE = "SP3"
def set_power(self, state: bool) -> None:
def set_power(self, pwr: bool) -> None:
"""Set the power state of the device."""
packet = bytearray(16)
packet[0] = 2
if self.check_nightlight():
packet[4] = 3 if state else 2
else:
packet[4] = 1 if state else 0
packet[4] = self.check_nightlight() << 1 | bool(pwr)
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
def set_nightlight(self, state: bool) -> None:
def set_nightlight(self, ntlight: bool) -> None:
"""Set the night light state of the device."""
packet = bytearray(16)
packet[0] = 2
if self.check_power():
packet[4] = 3 if state else 1
else:
packet[4] = 2 if state else 0
packet[4] = bool(ntlight) << 1 | self.check_power()
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
@ -227,7 +209,7 @@ class sp3(Device):
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
return bool(payload[0x4] == 1 or payload[0x4] == 3 or payload[0x4] == 0xFD)
return bool(payload[0x4] & 1)
def check_nightlight(self) -> bool:
"""Return the state of the night light."""
@ -236,7 +218,7 @@ class sp3(Device):
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
return bool(payload[0x4] == 2 or payload[0x4] == 3 or payload[0x4] == 0xFF)
return bool(payload[0x4] & 2)
class sp3s(sp2):
@ -259,13 +241,13 @@ class sp4(Device):
TYPE = "SP4"
def set_power(self, state: bool) -> None:
def set_power(self, pwr: bool) -> None:
"""Set the power state of the device."""
self.set_state(pwr=state)
self.set_state(pwr=pwr)
def set_nightlight(self, state: bool) -> None:
def set_nightlight(self, ntlight: bool) -> None:
"""Set the night light state of the device."""
self.set_state(ntlight=state)
self.set_state(ntlight=ntlight)
def set_state(
self,
@ -277,21 +259,21 @@ class sp4(Device):
childlock: bool = None,
) -> dict:
"""Set state of device."""
data = {}
state = {}
if pwr is not None:
data["pwr"] = int(bool(pwr))
state["pwr"] = int(bool(pwr))
if ntlight is not None:
data["ntlight"] = int(bool(ntlight))
state["ntlight"] = int(bool(ntlight))
if indicator is not None:
data["indicator"] = int(bool(indicator))
state["indicator"] = int(bool(indicator))
if ntlbrightness is not None:
data["ntlbrightness"] = ntlbrightness
state["ntlbrightness"] = ntlbrightness
if maxworktime is not None:
data["maxworktime"] = maxworktime
state["maxworktime"] = maxworktime
if childlock is not None:
data["childlock"] = int(bool(childlock))
state["childlock"] = int(bool(childlock))
packet = self._encode(2, data)
packet = self._encode(2, state)
response = self.send_packet(0x6A, packet)
return self._decode(response)
@ -313,15 +295,14 @@ class sp4(Device):
def _encode(self, flag: int, state: dict) -> bytes:
"""Encode a message."""
payload = json.dumps(state, separators=(",", ":")).encode()
packet = bytearray(12)
data = json.dumps(state, separators=(",", ":")).encode()
struct.pack_into(
"<HHHBBI", packet, 0, 0xA5A5, 0x5A5A, 0x0000, flag, 0x0B, len(payload)
"<HHHBBI", packet, 0, 0xA5A5, 0x5A5A, 0x0000, flag, 0x0B, len(data)
)
packet.extend(payload)
packet.extend(data)
checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[0x04] = checksum & 0xFF
packet[0x05] = checksum >> 8
packet[0x04:0x06] = checksum.to_bytes(2, "little")
return packet
def _decode(self, response: bytes) -> dict:
@ -352,9 +333,9 @@ class sp4b(sp4):
def _encode(self, flag: int, state: dict) -> bytes:
"""Encode a message."""
payload = json.dumps(state, separators=(",", ":")).encode()
packet = bytearray(14)
length = 4 + 2 + 2 + 4 + len(payload)
data = json.dumps(state, separators=(",", ":")).encode()
length = 12 + len(data)
struct.pack_into(
"<HHHHBBI",
packet,
@ -365,12 +346,11 @@ class sp4b(sp4):
0x0000,
flag,
0x0B,
len(payload),
len(data),
)
packet.extend(payload)
checksum = sum(packet[0x8:], 0xC0AD) & 0xFFFF
packet[0x06] = checksum & 0xFF
packet[0x07] = checksum >> 8
packet.extend(data)
checksum = sum(packet[0x02:], 0xBEAF) & 0xFFFF
packet[0x06:0x08] = checksum.to_bytes(2, "little")
return packet
def _decode(self, response: bytes) -> dict: