1
0
mirror of https://github.com/mjg59/python-broadlink.git synced 2024-09-21 04:20:36 +02:00

Improve A1, RM and RM4 sensors (#374)

* Clean up A1 sensor

* Improve communication with RM4 sensors

* Add check_sensors() to RM class

* Fix parenthesis

* Fix noise levels

* Fix precision error
This commit is contained in:
Felipe Martins Diel 2020-06-08 07:21:23 -03:00 committed by GitHub
parent 47087bfc7f
commit 89cd7d2970
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -525,79 +525,39 @@ class sp2(device):
class a1(device): class a1(device):
_SENSORS_AND_LEVELS = (
('light', ('dark', 'dim', 'normal', 'bright')),
('air_quality', ('excellent', 'good', 'normal', 'bad')),
('noise', ('quiet', 'normal', 'noisy')),
)
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
device.__init__(self, *args, **kwargs) device.__init__(self, *args, **kwargs)
self.type = "A1" self.type = "A1"
def check_sensors(self): def check_sensors(self):
packet = bytearray(16) data = self.check_sensors_raw()
packet[0] = 1 for sensor, levels in self._SENSORS_AND_LEVELS:
response = self.send_packet(0x6a, packet) try:
check_error(response[0x22:0x24]) data[sensor] = levels[data[sensor]]
data = {} except IndexError:
payload = self.decrypt(bytes(response[0x38:])) data[sensor] = 'unknown'
if isinstance(payload[0x4], int):
data['temperature'] = (payload[0x4] * 10 + payload[0x5]) / 10.0
data['humidity'] = (payload[0x6] * 10 + payload[0x7]) / 10.0
light = payload[0x8]
air_quality = payload[0x0a]
noise = payload[0xc]
else:
data['temperature'] = (ord(payload[0x4]) * 10 + ord(payload[0x5])) / 10.0
data['humidity'] = (ord(payload[0x6]) * 10 + ord(payload[0x7])) / 10.0
light = ord(payload[0x8])
air_quality = ord(payload[0x0a])
noise = ord(payload[0xc])
if light == 0:
data['light'] = 'dark'
elif light == 1:
data['light'] = 'dim'
elif light == 2:
data['light'] = 'normal'
elif light == 3:
data['light'] = 'bright'
else:
data['light'] = 'unknown'
if air_quality == 0:
data['air_quality'] = 'excellent'
elif air_quality == 1:
data['air_quality'] = 'good'
elif air_quality == 2:
data['air_quality'] = 'normal'
elif air_quality == 3:
data['air_quality'] = 'bad'
else:
data['air_quality'] = 'unknown'
if noise == 0:
data['noise'] = 'quiet'
elif noise == 1:
data['noise'] = 'normal'
elif noise == 2:
data['noise'] = 'noisy'
else:
data['noise'] = 'unknown'
return data return data
def check_sensors_raw(self): def check_sensors_raw(self):
packet = bytearray(16) packet = bytearray([0x1])
packet[0] = 1
response = self.send_packet(0x6a, packet) response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24]) check_error(response[0x22:0x24])
data = {}
payload = self.decrypt(bytes(response[0x38:])) payload = self.decrypt(bytes(response[0x38:]))
if isinstance(payload[0x4], int): data = bytearray(payload[0x4:])
data['temperature'] = (payload[0x4] * 10 + payload[0x5]) / 10.0 return {
data['humidity'] = (payload[0x6] * 10 + payload[0x7]) / 10.0 'temperature': data[0x0] + data[0x1] / 10.0,
data['light'] = payload[0x8] 'humidity': data[0x2] + data[0x3] / 10.0,
data['air_quality'] = payload[0x0a] 'light': data[0x4],
data['noise'] = payload[0xc] 'air_quality': data[0x6],
else: 'noise': data[0x8],
data['temperature'] = (ord(payload[0x4]) * 10 + ord(payload[0x5])) / 10.0 }
data['humidity'] = (ord(payload[0x6]) * 10 + ord(payload[0x7])) / 10.0
data['light'] = ord(payload[0x8])
data['air_quality'] = ord(payload[0x0a])
data['noise'] = ord(payload[0xc])
return data
class rm(device): class rm(device):
@ -660,21 +620,22 @@ class rm(device):
return True return True
return False return False
def _read_sensor(self, type, offset, divider): def _check_sensors(self, command):
packet = bytearray(self._request_header) packet = bytearray(self._request_header)
packet.append(type) packet.append(command)
response = self.send_packet(0x6a, packet) response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24]) check_error(response[0x22:0x24])
payload = self.decrypt(bytes(response[0x38:])) payload = self.decrypt(bytes(response[0x38:]))
value_pos = len(self._request_header) + offset return bytearray(payload[len(self._request_header) + 4:])
if isinstance(payload[value_pos], int):
value = (payload[value_pos] + payload[value_pos+1] / divider)
else:
value = (ord(payload[value_pos]) + ord(payload[value_pos+1]) / divider)
return value
def check_temperature(self): def check_temperature(self):
return self._read_sensor( 0x01, 4, 10.0 ) data = self._check_sensors(0x1)
return data[0x0] + data[0x1] / 10.0
def check_sensors(self):
data = self._check_sensors(0x1)
return {'temperature': data[0x0] + data[0x1] / 10.0}
class rm4(rm): class rm4(rm):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
@ -684,17 +645,21 @@ class rm4(rm):
self._code_sending_header = b'\xd0\x00' self._code_sending_header = b'\xd0\x00'
def check_temperature(self): def check_temperature(self):
return self._read_sensor( 0x24, 4, 100.0 ) data = self._check_sensors(0x24)
return data[0x0] + data[0x1] / 100.0
def check_humidity(self): def check_humidity(self):
return self._read_sensor( 0x24, 6, 100.0 ) data = self._check_sensors(0x24)
return data[0x2] + data[0x3] / 100.0
def check_sensors(self): def check_sensors(self):
data = self._check_sensors(0x24)
return { return {
'temperature': self.check_temperature(), 'temperature': data[0x0] + data[0x1] / 100.0,
'humidity': self.check_humidity() 'humidity': data[0x2] + data[0x3] / 100.0
} }
# For legacy compatibility - don't use this # For legacy compatibility - don't use this
class rm2(rm): class rm2(rm):
def __init__(self): def __init__(self):