1
0
mirror of https://github.com/mjg59/python-broadlink.git synced 2024-09-21 04:20:36 +02:00

Add support for BG Electrical Smart Sockets

This commit is contained in:
Barnaby Gray 2019-10-13 22:12:13 +01:00
parent cbb1d67df8
commit 660c2269e4

View File

@ -1,8 +1,10 @@
#!/usr/bin/python
import codecs
import json
import random
import socket
import struct
import threading
import time
from datetime import datetime
@ -48,7 +50,8 @@ def gendevice(devtype, host, mac):
],
hysen: [0x4EAD], # Hysen controller
S1C: [0x2722], # S1 (SmartOne Alarm Kit)
dooya: [0x4E4D] # Dooya DT360E (DOOYA_CURTAIN_V2)
dooya: [0x4E4D], # Dooya DT360E (DOOYA_CURTAIN_V2)
bg1: [0x51E3] # BG Electrical Smart Power Socket
}
# Look for the class associated to devtype in devices
@ -349,6 +352,71 @@ class mp1(device):
return data
class bg1(device):
def __init__(self, host, mac, devtype):
device.__init__(self, host, mac, devtype)
self.type = "BG1"
def get_state(self):
"""Get state of device"""
packet = self._encode(1, b'{}')
response = self.send_packet(0x6a, packet)
return self._decode(response)
def set_state(self, pwr=None, pwr1=None, pwr2=None, maxworktime=None, maxworktime1=None, maxworktime2=None, idcbrightness=None):
data = {}
if pwr is not None:
data['pwr'] = int(bool(pwr))
if pwr1 is not None:
data['pwr1'] = int(bool(pwr1))
if pwr2 is not None:
data['pwr2'] = int(bool(pwr2))
if maxworktime is not None:
data['maxworktime'] = maxworktime
if maxworktime1 is not None:
data['maxworktime1'] = maxworktime1
if maxworktime2 is not None:
data['maxworktime2'] = maxworktime2
if idcbrightness is not None:
data['idcbrightness'] = idcbrightness
js = json.dumps(data).encode('utf8')
packet = self._encode(2, js)
response = self.send_packet(0x6a, packet)
return self._decode(response)
def _encode(self, flag, js):
# packet format is:
# 0x00-0x01 length
# 0x02-0x05 header
# 0x06-0x07 00
# 0x08 flag (1 for read or 2 write?)
# 0x09 unknown (0xb)
# 0x0a-0x0d length of json
# 0x0e- json data
packet = bytearray(14)
length = 4 + 2 + 2 + 4 + len(js)
struct.pack_into('<HHHHBBI', packet, 0, length, 0xa5a5, 0x5a5a, 0x0000, flag, 0x0b, len(js))
for i in range(len(js)):
packet.append(js[i])
checksum = 0xc0ad
for c in packet[0x08:]:
checksum = (checksum + c) & 0xffff
packet[0x06] = checksum & 0xff
packet[0x07] = checksum >> 8
return packet
def _decode(self, response):
err = response[0x22] | (response[0x23] << 8)
if err != 0:
return None
payload = self.decrypt(bytes(response[0x38:]))
js_len = struct.unpack_from('<I', payload, 0x0a)[0]
state = json.loads(payload[0x0e:0x0e+js_len])
return state
class sp1(device):
def __init__(self, host, mac, devtype):
device.__init__(self, host, mac, devtype)