Compare commits
3 Commits
d9cb84e566
...
d9f822a1f7
Author | SHA1 | Date |
---|---|---|
Felipe Martins Diel | d9f822a1f7 | |
Felipe Martins Diel | 84af992dcc | |
Felipe Martins Diel | ab4311e799 |
|
@ -7,7 +7,7 @@ from . import exceptions as e
|
|||
from .const import DEFAULT_BCAST_ADDR, DEFAULT_PORT, DEFAULT_TIMEOUT
|
||||
from .alarm import S1C
|
||||
from .climate import hysen
|
||||
from .cover import dooya, dooya2
|
||||
from .cover import dooya, dooya2, wser
|
||||
from .device import Device, ping, scan
|
||||
from .hub import s3
|
||||
from .light import lb1, lb2
|
||||
|
@ -183,6 +183,9 @@ SUPPORTED_TYPES = {
|
|||
dooya2: {
|
||||
0x4F6E: ("DT360E-45/20", "Dooya"),
|
||||
},
|
||||
wser: {
|
||||
0x4F6C: ("WSER", "Wistar"),
|
||||
},
|
||||
bg1: {
|
||||
0x51E3: ("BG800/BG900", "BG Electrical"),
|
||||
},
|
||||
|
|
|
@ -13,15 +13,16 @@ class dooya(Device):
|
|||
def _send(self, command: int, attribute: int = 0) -> int:
|
||||
"""Send a packet to the device."""
|
||||
packet = bytearray(16)
|
||||
packet[0] = 0x09
|
||||
packet[2] = 0xBB
|
||||
packet[3] = command
|
||||
packet[4] = attribute
|
||||
packet[9] = 0xFA
|
||||
packet[10] = 0x44
|
||||
response = self.send_packet(0x6A, packet)
|
||||
e.check_error(response[0x22:0x24])
|
||||
payload = self.decrypt(response[0x38:])
|
||||
packet[0x00] = 0x09
|
||||
packet[0x02] = 0xBB
|
||||
packet[0x03] = command
|
||||
packet[0x04] = attribute
|
||||
packet[0x09] = 0xFA
|
||||
packet[0x0A] = 0x44
|
||||
|
||||
resp = self.send_packet(0x6A, packet)
|
||||
e.check_error(resp[0x22:0x24])
|
||||
payload = self.decrypt(resp[0x38:])
|
||||
return payload[4]
|
||||
|
||||
def open(self) -> int:
|
||||
|
@ -62,44 +63,117 @@ class dooya2(Device):
|
|||
|
||||
TYPE = "DT360E-2"
|
||||
|
||||
def _send(self, command: int, attribute: int = 0) -> int:
|
||||
"""Send a packet to the device."""
|
||||
checksum = 0xC0C4 + command + attribute & 0xFFFF
|
||||
packet = bytearray(32)
|
||||
packet[0] = 0x16
|
||||
packet[2] = 0xA5
|
||||
packet[3] = 0xA5
|
||||
packet[4] = 0x5A
|
||||
packet[5] = 0x5A
|
||||
def _send(self, operation: int, data: bytes):
|
||||
"""Send a command to the device."""
|
||||
packet = bytearray(14)
|
||||
packet[0x02] = 0xA5
|
||||
packet[0x03] = 0xA5
|
||||
packet[0x04] = 0x5A
|
||||
packet[0x05] = 0x5A
|
||||
packet[0x08] = operation
|
||||
packet[0x09] = 0x0B
|
||||
|
||||
data_len = len(data)
|
||||
packet[0x0A] = data_len & 0xFF
|
||||
packet[0x0B] = data_len >> 8
|
||||
|
||||
packet += bytes(data)
|
||||
|
||||
checksum = sum(packet, 0xBEAF) & 0xFFFF
|
||||
packet[6] = checksum & 0xFF
|
||||
packet[7] = checksum >> 8
|
||||
packet[8] = 0x02
|
||||
packet[9] = 0x0B
|
||||
packet[10] = 0x0A
|
||||
packet[15] = command
|
||||
packet[16] = attribute
|
||||
|
||||
response = self.send_packet(0x6A, packet)
|
||||
e.check_error(response[0x22:0x24])
|
||||
payload = self.decrypt(response[0x38:])
|
||||
return payload[0x11]
|
||||
packet_len = len(packet) - 2
|
||||
packet[0] = packet_len & 0xFF
|
||||
packet[1] = packet_len >> 8
|
||||
|
||||
resp = self.send_packet(0x6a, packet)
|
||||
e.check_error(resp[0x22:0x24])
|
||||
payload = self.decrypt(resp[0x38:])
|
||||
return payload
|
||||
|
||||
def open(self) -> None:
|
||||
"""Open the curtain."""
|
||||
self._send(0x01)
|
||||
self._send(2, [0x00, 0x01, 0x00])
|
||||
|
||||
def close(self) -> None:
|
||||
"""Close the curtain."""
|
||||
self._send(0x02)
|
||||
self._send(2, [0x00, 0x02, 0x00])
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Stop the curtain."""
|
||||
self._send(0x03)
|
||||
self._send(2, [0x00, 0x03, 0x00])
|
||||
|
||||
def get_percentage(self) -> int:
|
||||
"""Return the position of the curtain."""
|
||||
return self._send(0x06)
|
||||
resp = self._send(1, [0x00, 0x06, 0x00])
|
||||
return resp[0x11]
|
||||
|
||||
def set_percentage(self, new_percentage: int) -> None:
|
||||
"""Set the position of the curtain."""
|
||||
self._send(0x09, new_percentage)
|
||||
self._send(2, [0x00, 0x09, new_percentage])
|
||||
|
||||
|
||||
class wser(Device):
|
||||
"""Controls a Wistar curtain motor"""
|
||||
|
||||
TYPE = "WSER"
|
||||
|
||||
def _send(self, operation: int, data: bytes):
|
||||
"""Send a command to the device."""
|
||||
packet = bytearray(14)
|
||||
packet[0x02] = 0xA5
|
||||
packet[0x03] = 0xA5
|
||||
packet[0x04] = 0x5A
|
||||
packet[0x05] = 0x5A
|
||||
packet[0x08] = operation
|
||||
packet[0x09] = 0x0B
|
||||
|
||||
data_len = len(data)
|
||||
packet[0x0A] = data_len & 0xFF
|
||||
packet[0x0B] = data_len >> 8
|
||||
|
||||
packet += bytes(data)
|
||||
|
||||
checksum = sum(packet, 0xBEAF) & 0xFFFF
|
||||
packet[6] = checksum & 0xFF
|
||||
packet[7] = checksum >> 8
|
||||
|
||||
packet_len = len(packet) - 2
|
||||
packet[0] = packet_len & 0xFF
|
||||
packet[1] = packet_len >> 8
|
||||
|
||||
resp = self.send_packet(0x6a, packet)
|
||||
e.check_error(resp[0x22:0x24])
|
||||
payload = self.decrypt(resp[0x38:])
|
||||
return payload
|
||||
|
||||
def get_position(self) -> int:
|
||||
"""Return the position of the curtain."""
|
||||
resp = self._send(1, [])
|
||||
position = resp[0x0E]
|
||||
return position
|
||||
|
||||
def open(self) -> int:
|
||||
"""Open the curtain."""
|
||||
resp = self._send(2, [0x4a, 0x31, 0xa0])
|
||||
position = resp[0x0E]
|
||||
return position
|
||||
|
||||
def close(self) -> int:
|
||||
"""Close the curtain."""
|
||||
resp = self._send(2, [0x61, 0x32, 0xa0])
|
||||
position = resp[0x0E]
|
||||
return position
|
||||
|
||||
def stop(self) -> int:
|
||||
"""Stop the curtain."""
|
||||
resp = self._send(2, [0x4c, 0x73, 0xa0])
|
||||
position = resp[0x0E]
|
||||
return position
|
||||
|
||||
def set_position(self, position: int) -> int:
|
||||
"""Set the position of the curtain."""
|
||||
resp = self._send(2, [position, 0x70, 0xa0])
|
||||
position = resp[0x0E]
|
||||
return position
|
||||
|
|
|
@ -116,6 +116,25 @@ class sp4(Device):
|
|||
"""Controls a Broadlink SP4."""
|
||||
|
||||
TYPE = "SP4"
|
||||
MAX_SUBDEVICES = 8
|
||||
|
||||
def get_subdevices(self) -> list:
|
||||
"""Return the lit of sub devices."""
|
||||
sub_devices = []
|
||||
step = 5
|
||||
|
||||
for index in range(0, self.MAX_SUBDEVICES, step):
|
||||
state = {"count": step, "index": index}
|
||||
packet = self._encode(14, state)
|
||||
resp = self.send_packet(0x6A, packet)
|
||||
e.check_error(resp[0x22:0x24])
|
||||
resp = self._decode(resp)
|
||||
|
||||
sub_devices.extend(resp["list"])
|
||||
if len(sub_devices) == resp["total"]:
|
||||
break
|
||||
|
||||
return sub_devices
|
||||
|
||||
def set_power(self, pwr: bool) -> None:
|
||||
"""Set the power state of the device."""
|
||||
|
@ -127,6 +146,7 @@ class sp4(Device):
|
|||
|
||||
def set_state(
|
||||
self,
|
||||
did: str = None,
|
||||
pwr: bool = None,
|
||||
ntlight: bool = None,
|
||||
indicator: bool = None,
|
||||
|
@ -136,6 +156,8 @@ class sp4(Device):
|
|||
) -> dict:
|
||||
"""Set state of device."""
|
||||
state = {}
|
||||
if did is not None:
|
||||
state["did"] = did
|
||||
if pwr is not None:
|
||||
state["pwr"] = int(bool(pwr))
|
||||
if ntlight is not None:
|
||||
|
@ -163,9 +185,13 @@ class sp4(Device):
|
|||
state = self.get_state()
|
||||
return bool(state["ntlight"])
|
||||
|
||||
def get_state(self) -> dict:
|
||||
"""Get full state of device."""
|
||||
packet = self._encode(1, {})
|
||||
def get_state(self, did: str = None) -> dict:
|
||||
"""Return the state of the device."""
|
||||
state = {}
|
||||
if did is not None:
|
||||
state["did"] = did
|
||||
|
||||
packet = self._encode(1, state)
|
||||
response = self.send_packet(0x6A, packet)
|
||||
return self._decode(response)
|
||||
|
||||
|
|
Loading…
Reference in New Issue