Add support for Wistar smart curtain (0x4F6C) (#786)
* Add support for Wistar smart curtain (0x4F6C) * Rename wsrc to wser
This commit is contained in:
parent
4766d68289
commit
84af992dcc
|
@ -7,7 +7,7 @@ from . import exceptions as e
|
|||
from .const import DEFAULT_BCAST_ADDR, DEFAULT_PORT, DEFAULT_TIMEOUT
|
||||
from .alarm import S1C
|
||||
from .climate import hysen
|
||||
from .cover import dooya, dooya2
|
||||
from .cover import dooya, dooya2, wser
|
||||
from .device import Device, ping, scan
|
||||
from .hub import s3
|
||||
from .light import lb1, lb2
|
||||
|
@ -183,6 +183,9 @@ SUPPORTED_TYPES = {
|
|||
dooya2: {
|
||||
0x4F6E: ("DT360E-45/20", "Dooya"),
|
||||
},
|
||||
wser: {
|
||||
0x4F6C: ("WSER", "Wistar"),
|
||||
},
|
||||
bg1: {
|
||||
0x51E3: ("BG800/BG900", "BG Electrical"),
|
||||
},
|
||||
|
|
|
@ -13,15 +13,16 @@ class dooya(Device):
|
|||
def _send(self, command: int, attribute: int = 0) -> int:
|
||||
"""Send a packet to the device."""
|
||||
packet = bytearray(16)
|
||||
packet[0] = 0x09
|
||||
packet[2] = 0xBB
|
||||
packet[3] = command
|
||||
packet[4] = attribute
|
||||
packet[9] = 0xFA
|
||||
packet[10] = 0x44
|
||||
response = self.send_packet(0x6A, packet)
|
||||
e.check_error(response[0x22:0x24])
|
||||
payload = self.decrypt(response[0x38:])
|
||||
packet[0x00] = 0x09
|
||||
packet[0x02] = 0xBB
|
||||
packet[0x03] = command
|
||||
packet[0x04] = attribute
|
||||
packet[0x09] = 0xFA
|
||||
packet[0x0A] = 0x44
|
||||
|
||||
resp = self.send_packet(0x6A, packet)
|
||||
e.check_error(resp[0x22:0x24])
|
||||
payload = self.decrypt(resp[0x38:])
|
||||
return payload[4]
|
||||
|
||||
def open(self) -> int:
|
||||
|
@ -62,44 +63,117 @@ class dooya2(Device):
|
|||
|
||||
TYPE = "DT360E-2"
|
||||
|
||||
def _send(self, command: int, attribute: int = 0) -> int:
|
||||
"""Send a packet to the device."""
|
||||
checksum = 0xC0C4 + command + attribute & 0xFFFF
|
||||
packet = bytearray(32)
|
||||
packet[0] = 0x16
|
||||
packet[2] = 0xA5
|
||||
packet[3] = 0xA5
|
||||
packet[4] = 0x5A
|
||||
packet[5] = 0x5A
|
||||
def _send(self, operation: int, data: bytes):
|
||||
"""Send a command to the device."""
|
||||
packet = bytearray(14)
|
||||
packet[0x02] = 0xA5
|
||||
packet[0x03] = 0xA5
|
||||
packet[0x04] = 0x5A
|
||||
packet[0x05] = 0x5A
|
||||
packet[0x08] = operation
|
||||
packet[0x09] = 0x0B
|
||||
|
||||
data_len = len(data)
|
||||
packet[0x0A] = data_len & 0xFF
|
||||
packet[0x0B] = data_len >> 8
|
||||
|
||||
packet += bytes(data)
|
||||
|
||||
checksum = sum(packet, 0xBEAF) & 0xFFFF
|
||||
packet[6] = checksum & 0xFF
|
||||
packet[7] = checksum >> 8
|
||||
packet[8] = 0x02
|
||||
packet[9] = 0x0B
|
||||
packet[10] = 0x0A
|
||||
packet[15] = command
|
||||
packet[16] = attribute
|
||||
|
||||
response = self.send_packet(0x6A, packet)
|
||||
e.check_error(response[0x22:0x24])
|
||||
payload = self.decrypt(response[0x38:])
|
||||
return payload[0x11]
|
||||
packet_len = len(packet) - 2
|
||||
packet[0] = packet_len & 0xFF
|
||||
packet[1] = packet_len >> 8
|
||||
|
||||
resp = self.send_packet(0x6a, packet)
|
||||
e.check_error(resp[0x22:0x24])
|
||||
payload = self.decrypt(resp[0x38:])
|
||||
return payload
|
||||
|
||||
def open(self) -> None:
|
||||
"""Open the curtain."""
|
||||
self._send(0x01)
|
||||
self._send(2, [0x00, 0x01, 0x00])
|
||||
|
||||
def close(self) -> None:
|
||||
"""Close the curtain."""
|
||||
self._send(0x02)
|
||||
self._send(2, [0x00, 0x02, 0x00])
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Stop the curtain."""
|
||||
self._send(0x03)
|
||||
self._send(2, [0x00, 0x03, 0x00])
|
||||
|
||||
def get_percentage(self) -> int:
|
||||
"""Return the position of the curtain."""
|
||||
return self._send(0x06)
|
||||
resp = self._send(1, [0x00, 0x06, 0x00])
|
||||
return resp[0x11]
|
||||
|
||||
def set_percentage(self, new_percentage: int) -> None:
|
||||
"""Set the position of the curtain."""
|
||||
self._send(0x09, new_percentage)
|
||||
self._send(2, [0x00, 0x09, new_percentage])
|
||||
|
||||
|
||||
class wser(Device):
|
||||
"""Controls a Wistar curtain motor"""
|
||||
|
||||
TYPE = "WSER"
|
||||
|
||||
def _send(self, operation: int, data: bytes):
|
||||
"""Send a command to the device."""
|
||||
packet = bytearray(14)
|
||||
packet[0x02] = 0xA5
|
||||
packet[0x03] = 0xA5
|
||||
packet[0x04] = 0x5A
|
||||
packet[0x05] = 0x5A
|
||||
packet[0x08] = operation
|
||||
packet[0x09] = 0x0B
|
||||
|
||||
data_len = len(data)
|
||||
packet[0x0A] = data_len & 0xFF
|
||||
packet[0x0B] = data_len >> 8
|
||||
|
||||
packet += bytes(data)
|
||||
|
||||
checksum = sum(packet, 0xBEAF) & 0xFFFF
|
||||
packet[6] = checksum & 0xFF
|
||||
packet[7] = checksum >> 8
|
||||
|
||||
packet_len = len(packet) - 2
|
||||
packet[0] = packet_len & 0xFF
|
||||
packet[1] = packet_len >> 8
|
||||
|
||||
resp = self.send_packet(0x6a, packet)
|
||||
e.check_error(resp[0x22:0x24])
|
||||
payload = self.decrypt(resp[0x38:])
|
||||
return payload
|
||||
|
||||
def get_position(self) -> int:
|
||||
"""Return the position of the curtain."""
|
||||
resp = self._send(1, [])
|
||||
position = resp[0x0E]
|
||||
return position
|
||||
|
||||
def open(self) -> int:
|
||||
"""Open the curtain."""
|
||||
resp = self._send(2, [0x4a, 0x31, 0xa0])
|
||||
position = resp[0x0E]
|
||||
return position
|
||||
|
||||
def close(self) -> int:
|
||||
"""Close the curtain."""
|
||||
resp = self._send(2, [0x61, 0x32, 0xa0])
|
||||
position = resp[0x0E]
|
||||
return position
|
||||
|
||||
def stop(self) -> int:
|
||||
"""Stop the curtain."""
|
||||
resp = self._send(2, [0x4c, 0x73, 0xa0])
|
||||
position = resp[0x0E]
|
||||
return position
|
||||
|
||||
def set_position(self, position: int) -> int:
|
||||
"""Set the position of the curtain."""
|
||||
resp = self._send(2, [position, 0x70, 0xa0])
|
||||
position = resp[0x0E]
|
||||
return position
|
||||
|
|
Loading…
Reference in New Issue