2020-09-24 07:36:12 +02:00
|
|
|
"""Support for sensors."""
|
2021-03-31 19:27:05 +02:00
|
|
|
from . import exceptions as e
|
2021-04-03 06:01:38 +02:00
|
|
|
from .device import Device
|
2020-09-17 05:41:32 +02:00
|
|
|
|
|
|
|
|
2021-04-03 06:01:38 +02:00
|
|
|
class a1(Device):
|
2020-09-17 05:41:32 +02:00
|
|
|
"""Controls a Broadlink A1."""
|
|
|
|
|
2021-01-28 23:16:25 +01:00
|
|
|
TYPE = "A1"
|
|
|
|
|
2020-09-17 05:41:32 +02:00
|
|
|
_SENSORS_AND_LEVELS = (
|
2020-11-05 20:00:42 +01:00
|
|
|
("light", ("dark", "dim", "normal", "bright")),
|
|
|
|
("air_quality", ("excellent", "good", "normal", "bad")),
|
|
|
|
("noise", ("quiet", "normal", "noisy")),
|
2020-09-17 05:41:32 +02:00
|
|
|
)
|
|
|
|
|
|
|
|
def check_sensors(self) -> dict:
|
|
|
|
"""Return the state of the sensors."""
|
|
|
|
data = self.check_sensors_raw()
|
|
|
|
for sensor, levels in self._SENSORS_AND_LEVELS:
|
|
|
|
try:
|
|
|
|
data[sensor] = levels[data[sensor]]
|
|
|
|
except IndexError:
|
2020-11-05 20:00:42 +01:00
|
|
|
data[sensor] = "unknown"
|
2020-09-17 05:41:32 +02:00
|
|
|
return data
|
|
|
|
|
|
|
|
def check_sensors_raw(self) -> dict:
|
|
|
|
"""Return the state of the sensors in raw format."""
|
|
|
|
packet = bytearray([0x1])
|
2024-04-12 07:10:06 +02:00
|
|
|
resp = self.send_packet(0x6A, packet)
|
|
|
|
e.check_error(resp[0x22:0x24])
|
|
|
|
data = self.decrypt(resp[0x38:])
|
|
|
|
|
|
|
|
return {
|
|
|
|
"temperature": data[0x04] + data[0x05] / 10.0,
|
|
|
|
"humidity": data[0x06] + data[0x07] / 10.0,
|
|
|
|
"light": data[0x08],
|
|
|
|
"air_quality": data[0x0A],
|
|
|
|
"noise": data[0x0C],
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
class a2(Device):
|
|
|
|
"""Controls a Broadlink A2."""
|
|
|
|
|
|
|
|
TYPE = "A2"
|
2021-01-10 02:18:56 +01:00
|
|
|
|
2024-04-12 07:10:06 +02:00
|
|
|
def _send(self, operation: int, data: bytes = b""):
|
|
|
|
"""Send a command to the device."""
|
|
|
|
packet = bytearray(12)
|
|
|
|
packet[0x02] = 0xA5
|
|
|
|
packet[0x03] = 0xA5
|
|
|
|
packet[0x04] = 0x5A
|
|
|
|
packet[0x05] = 0x5A
|
|
|
|
packet[0x08] = operation
|
|
|
|
packet[0x09] = 0x0B
|
|
|
|
|
|
|
|
if data:
|
|
|
|
data_len = len(data)
|
|
|
|
packet[0x0A] = data_len & 0xFF
|
|
|
|
packet[0x0B] = data_len >> 8
|
|
|
|
packet += bytes(2)
|
|
|
|
packet.extend(data)
|
|
|
|
|
|
|
|
checksum = sum(packet, 0xBEAF) & 0xFFFF
|
|
|
|
packet[0x06] = checksum & 0xFF
|
|
|
|
packet[0x07] = checksum >> 8
|
|
|
|
|
|
|
|
packet_len = len(packet) - 2
|
|
|
|
packet[0x00] = packet_len & 0xFF
|
|
|
|
packet[0x01] = packet_len >> 8
|
|
|
|
|
|
|
|
resp = self.send_packet(0x6A, packet)
|
|
|
|
e.check_error(resp[0x22:0x24])
|
|
|
|
payload = self.decrypt(resp[0x38:])
|
|
|
|
return payload
|
|
|
|
|
|
|
|
def check_sensors_raw(self) -> dict:
|
|
|
|
"""Return the state of the sensors in raw format."""
|
|
|
|
data = self._send(1)
|
2021-01-10 02:18:56 +01:00
|
|
|
|
2020-09-17 05:41:32 +02:00
|
|
|
return {
|
2024-04-12 07:10:06 +02:00
|
|
|
"temperature": data[0x13] * 256 + data[0x14],
|
|
|
|
"humidity": data[0x15] * 256 + data[0x16],
|
|
|
|
"pm10": data[0x0D] * 256 + data[0x0E],
|
|
|
|
"pm2_5": data[0x0F] * 256 + data[0x10],
|
|
|
|
"pm1": data[0x11] * 256 + data[0x12],
|
2020-09-17 05:41:32 +02:00
|
|
|
}
|