1
0
Fork 0

Compare commits

...

3 Commits

Author SHA1 Message Date
Felipe Martins Diel d9f822a1f7
Merge ab4311e799 into 84af992dcc 2024-04-10 21:27:12 +01:00
Felipe Martins Diel 84af992dcc
Add support for Wistar smart curtain (0x4F6C) (#786)
* Add support for Wistar smart curtain (0x4F6C)

* Rename wsrc to wser
2024-04-10 16:35:25 -03:00
Felipe Martins Diel ab4311e799 Add support for the FastCon technology 2022-03-19 19:53:29 -03:00
3 changed files with 139 additions and 36 deletions

View File

@ -7,7 +7,7 @@ from . import exceptions as e
from .const import DEFAULT_BCAST_ADDR, DEFAULT_PORT, DEFAULT_TIMEOUT
from .alarm import S1C
from .climate import hysen
from .cover import dooya, dooya2
from .cover import dooya, dooya2, wser
from .device import Device, ping, scan
from .hub import s3
from .light import lb1, lb2
@ -183,6 +183,9 @@ SUPPORTED_TYPES = {
dooya2: {
0x4F6E: ("DT360E-45/20", "Dooya"),
},
wser: {
0x4F6C: ("WSER", "Wistar"),
},
bg1: {
0x51E3: ("BG800/BG900", "BG Electrical"),
},

View File

@ -13,15 +13,16 @@ class dooya(Device):
def _send(self, command: int, attribute: int = 0) -> int:
"""Send a packet to the device."""
packet = bytearray(16)
packet[0] = 0x09
packet[2] = 0xBB
packet[3] = command
packet[4] = attribute
packet[9] = 0xFA
packet[10] = 0x44
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
packet[0x00] = 0x09
packet[0x02] = 0xBB
packet[0x03] = command
packet[0x04] = attribute
packet[0x09] = 0xFA
packet[0x0A] = 0x44
resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload[4]
def open(self) -> int:
@ -62,44 +63,117 @@ class dooya2(Device):
TYPE = "DT360E-2"
def _send(self, command: int, attribute: int = 0) -> int:
"""Send a packet to the device."""
checksum = 0xC0C4 + command + attribute & 0xFFFF
packet = bytearray(32)
packet[0] = 0x16
packet[2] = 0xA5
packet[3] = 0xA5
packet[4] = 0x5A
packet[5] = 0x5A
def _send(self, operation: int, data: bytes):
"""Send a command to the device."""
packet = bytearray(14)
packet[0x02] = 0xA5
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x08] = operation
packet[0x09] = 0x0B
data_len = len(data)
packet[0x0A] = data_len & 0xFF
packet[0x0B] = data_len >> 8
packet += bytes(data)
checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[6] = checksum & 0xFF
packet[7] = checksum >> 8
packet[8] = 0x02
packet[9] = 0x0B
packet[10] = 0x0A
packet[15] = command
packet[16] = attribute
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
return payload[0x11]
packet_len = len(packet) - 2
packet[0] = packet_len & 0xFF
packet[1] = packet_len >> 8
resp = self.send_packet(0x6a, packet)
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload
def open(self) -> None:
"""Open the curtain."""
self._send(0x01)
self._send(2, [0x00, 0x01, 0x00])
def close(self) -> None:
"""Close the curtain."""
self._send(0x02)
self._send(2, [0x00, 0x02, 0x00])
def stop(self) -> None:
"""Stop the curtain."""
self._send(0x03)
self._send(2, [0x00, 0x03, 0x00])
def get_percentage(self) -> int:
"""Return the position of the curtain."""
return self._send(0x06)
resp = self._send(1, [0x00, 0x06, 0x00])
return resp[0x11]
def set_percentage(self, new_percentage: int) -> None:
"""Set the position of the curtain."""
self._send(0x09, new_percentage)
self._send(2, [0x00, 0x09, new_percentage])
class wser(Device):
"""Controls a Wistar curtain motor"""
TYPE = "WSER"
def _send(self, operation: int, data: bytes):
"""Send a command to the device."""
packet = bytearray(14)
packet[0x02] = 0xA5
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x08] = operation
packet[0x09] = 0x0B
data_len = len(data)
packet[0x0A] = data_len & 0xFF
packet[0x0B] = data_len >> 8
packet += bytes(data)
checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[6] = checksum & 0xFF
packet[7] = checksum >> 8
packet_len = len(packet) - 2
packet[0] = packet_len & 0xFF
packet[1] = packet_len >> 8
resp = self.send_packet(0x6a, packet)
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload
def get_position(self) -> int:
"""Return the position of the curtain."""
resp = self._send(1, [])
position = resp[0x0E]
return position
def open(self) -> int:
"""Open the curtain."""
resp = self._send(2, [0x4a, 0x31, 0xa0])
position = resp[0x0E]
return position
def close(self) -> int:
"""Close the curtain."""
resp = self._send(2, [0x61, 0x32, 0xa0])
position = resp[0x0E]
return position
def stop(self) -> int:
"""Stop the curtain."""
resp = self._send(2, [0x4c, 0x73, 0xa0])
position = resp[0x0E]
return position
def set_position(self, position: int) -> int:
"""Set the position of the curtain."""
resp = self._send(2, [position, 0x70, 0xa0])
position = resp[0x0E]
return position

View File

@ -116,6 +116,25 @@ class sp4(Device):
"""Controls a Broadlink SP4."""
TYPE = "SP4"
MAX_SUBDEVICES = 8
def get_subdevices(self) -> list:
"""Return the lit of sub devices."""
sub_devices = []
step = 5
for index in range(0, self.MAX_SUBDEVICES, step):
state = {"count": step, "index": index}
packet = self._encode(14, state)
resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24])
resp = self._decode(resp)
sub_devices.extend(resp["list"])
if len(sub_devices) == resp["total"]:
break
return sub_devices
def set_power(self, pwr: bool) -> None:
"""Set the power state of the device."""
@ -127,6 +146,7 @@ class sp4(Device):
def set_state(
self,
did: str = None,
pwr: bool = None,
ntlight: bool = None,
indicator: bool = None,
@ -136,6 +156,8 @@ class sp4(Device):
) -> dict:
"""Set state of device."""
state = {}
if did is not None:
state["did"] = did
if pwr is not None:
state["pwr"] = int(bool(pwr))
if ntlight is not None:
@ -163,9 +185,13 @@ class sp4(Device):
state = self.get_state()
return bool(state["ntlight"])
def get_state(self) -> dict:
"""Get full state of device."""
packet = self._encode(1, {})
def get_state(self, did: str = None) -> dict:
"""Return the state of the device."""
state = {}
if did is not None:
state["did"] = did
packet = self._encode(1, state)
response = self.send_packet(0x6A, packet)
return self._decode(response)