1
0
Fork 0

Compare commits

...

3 Commits

Author SHA1 Message Date
Bengt Martensson b420c5d0ba
Merge 61e6419e79 into 84af992dcc 2024-04-10 17:14:41 -03:00
Felipe Martins Diel 84af992dcc
Add support for Wistar smart curtain (0x4F6C) (#786)
* Add support for Wistar smart curtain (0x4F6C)

* Rename wsrc to wser
2024-04-10 16:35:25 -03:00
Bengt Martensson 61e6419e79
Fix some problems with --durations in the current version.
Resolves #740.
2023-01-13 16:07:22 +01:00
4 changed files with 143 additions and 45 deletions

View File

@ -7,7 +7,7 @@ from . import exceptions as e
from .const import DEFAULT_BCAST_ADDR, DEFAULT_PORT, DEFAULT_TIMEOUT
from .alarm import S1C
from .climate import hysen
from .cover import dooya, dooya2
from .cover import dooya, dooya2, wser
from .device import Device, ping, scan
from .hub import s3
from .light import lb1, lb2
@ -183,6 +183,9 @@ SUPPORTED_TYPES = {
dooya2: {
0x4F6E: ("DT360E-45/20", "Dooya"),
},
wser: {
0x4F6C: ("WSER", "Wistar"),
},
bg1: {
0x51E3: ("BG800/BG900", "BG Electrical"),
},

View File

@ -13,15 +13,16 @@ class dooya(Device):
def _send(self, command: int, attribute: int = 0) -> int:
"""Send a packet to the device."""
packet = bytearray(16)
packet[0] = 0x09
packet[2] = 0xBB
packet[3] = command
packet[4] = attribute
packet[9] = 0xFA
packet[10] = 0x44
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
packet[0x00] = 0x09
packet[0x02] = 0xBB
packet[0x03] = command
packet[0x04] = attribute
packet[0x09] = 0xFA
packet[0x0A] = 0x44
resp = self.send_packet(0x6A, packet)
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload[4]
def open(self) -> int:
@ -62,44 +63,117 @@ class dooya2(Device):
TYPE = "DT360E-2"
def _send(self, command: int, attribute: int = 0) -> int:
"""Send a packet to the device."""
checksum = 0xC0C4 + command + attribute & 0xFFFF
packet = bytearray(32)
packet[0] = 0x16
packet[2] = 0xA5
packet[3] = 0xA5
packet[4] = 0x5A
packet[5] = 0x5A
def _send(self, operation: int, data: bytes):
"""Send a command to the device."""
packet = bytearray(14)
packet[0x02] = 0xA5
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x08] = operation
packet[0x09] = 0x0B
data_len = len(data)
packet[0x0A] = data_len & 0xFF
packet[0x0B] = data_len >> 8
packet += bytes(data)
checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[6] = checksum & 0xFF
packet[7] = checksum >> 8
packet[8] = 0x02
packet[9] = 0x0B
packet[10] = 0x0A
packet[15] = command
packet[16] = attribute
response = self.send_packet(0x6A, packet)
e.check_error(response[0x22:0x24])
payload = self.decrypt(response[0x38:])
return payload[0x11]
packet_len = len(packet) - 2
packet[0] = packet_len & 0xFF
packet[1] = packet_len >> 8
resp = self.send_packet(0x6a, packet)
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload
def open(self) -> None:
"""Open the curtain."""
self._send(0x01)
self._send(2, [0x00, 0x01, 0x00])
def close(self) -> None:
"""Close the curtain."""
self._send(0x02)
self._send(2, [0x00, 0x02, 0x00])
def stop(self) -> None:
"""Stop the curtain."""
self._send(0x03)
self._send(2, [0x00, 0x03, 0x00])
def get_percentage(self) -> int:
"""Return the position of the curtain."""
return self._send(0x06)
resp = self._send(1, [0x00, 0x06, 0x00])
return resp[0x11]
def set_percentage(self, new_percentage: int) -> None:
"""Set the position of the curtain."""
self._send(0x09, new_percentage)
self._send(2, [0x00, 0x09, new_percentage])
class wser(Device):
"""Controls a Wistar curtain motor"""
TYPE = "WSER"
def _send(self, operation: int, data: bytes):
"""Send a command to the device."""
packet = bytearray(14)
packet[0x02] = 0xA5
packet[0x03] = 0xA5
packet[0x04] = 0x5A
packet[0x05] = 0x5A
packet[0x08] = operation
packet[0x09] = 0x0B
data_len = len(data)
packet[0x0A] = data_len & 0xFF
packet[0x0B] = data_len >> 8
packet += bytes(data)
checksum = sum(packet, 0xBEAF) & 0xFFFF
packet[6] = checksum & 0xFF
packet[7] = checksum >> 8
packet_len = len(packet) - 2
packet[0] = packet_len & 0xFF
packet[1] = packet_len >> 8
resp = self.send_packet(0x6a, packet)
e.check_error(resp[0x22:0x24])
payload = self.decrypt(resp[0x38:])
return payload
def get_position(self) -> int:
"""Return the position of the curtain."""
resp = self._send(1, [])
position = resp[0x0E]
return position
def open(self) -> int:
"""Open the curtain."""
resp = self._send(2, [0x4a, 0x31, 0xa0])
position = resp[0x0E]
return position
def close(self) -> int:
"""Close the curtain."""
resp = self._send(2, [0x61, 0x32, 0xa0])
position = resp[0x0E]
return position
def stop(self) -> int:
"""Stop the curtain."""
resp = self._send(2, [0x4c, 0x73, 0xa0])
position = resp[0x0E]
return position
def set_position(self, position: int) -> int:
"""Set the position of the curtain."""
resp = self._send(2, [position, 0x70, 0xa0])
position = resp[0x0E]
return position

View File

@ -62,7 +62,12 @@ broadlink_discovery
#### Learn IR code and show at console
```
broadlink_cli --device @BEDROOM.device --learn
broadlink_cli --device @BEDROOM.device --learn
```
#### Learn IR code and show at console as durations in micro seconds
```
broadlink_cli --device @BEDROOM.device --learn --durations
```
#### Learn RF code and show at console
@ -85,6 +90,11 @@ broadlink_cli --device @BEDROOM.device --rfscanlearn --learnfile LG-TV.power
broadlink_cli --device @BEDROOM.device --send DATA
```
#### Send code from durations in micro seconds
```
broadlink_cli --device @BEDROOM.device --send --durations DURATIONS
```
#### Send code from file
```
broadlink_cli --device @BEDROOM.device --send @LG-TV.power

View File

@ -38,14 +38,24 @@ def durations_to_broadlink(durations):
result = bytearray()
result.append(IR_TOKEN)
result.append(0)
result.append(len(durations) % 256)
result.append(len(durations) / 256)
for dur in durations:
result.append(0) # overwrite later
result.append(0) # overwrite later
# Discard the final duration (a gap), ...
for dur in durations[:-1]:
num = int(round(dur / TICK))
if num > 255:
result.append(0)
result.append(num / 256)
result.append(num // 256)
result.append(num % 256)
# ... and replace it by the Broadlink IR ending token.
result.append(0x00)
result.append(0x0D)
result.append(0x05)
length = len(result)
result[2] = length % 256
result[3] = length // 256
return result
@ -58,9 +68,9 @@ def format_durations(data):
return result
def parse_durations(str):
def parse_durations(data):
result = []
for s in str.split():
for s in data:
result.append(abs(int(s)))
return result
@ -87,8 +97,8 @@ parser.add_argument("--rflearn", action="store_true", help="rf scan learning")
parser.add_argument("--frequency", type=float, help="specify radiofrequency for learning")
parser.add_argument("--learnfile", help="save learned command to a specified file")
parser.add_argument("--durations", action="store_true",
help="use durations in micro seconds instead of the Broadlink format")
parser.add_argument("--convert", action="store_true", help="convert input data to durations")
help="use durations in micro seconds instead of the Broadlink format; for sending and learning. @-form not supported.")
parser.add_argument("--convert", action="store_true", help="convert input data to durations and print")
parser.add_argument("--joinwifi", nargs=2, help="Args are SSID PASSPHRASE to configure Broadlink device with")
parser.add_argument("data", nargs='*', help="Data to send or convert")
args = parser.parse_args()
@ -125,7 +135,7 @@ if args.sensors:
for key in data:
print("{} {}".format(key, data[key]))
if args.send:
data = durations_to_broadlink(parse_durations(' '.join(args.data))) \
data = durations_to_broadlink(parse_durations(args.data)) \
if args.durations else bytearray.fromhex(''.join(args.data))
dev.send_data(data)
if args.learn or (args.learnfile and not args.rflearn):
@ -149,8 +159,9 @@ if args.learn or (args.learnfile and not args.rflearn):
else ''.join(format(x, '02x') for x in bytearray(data))
if args.learn:
print(learned)
decode_hex = codecs.getdecoder("hex_codec")
print("Base64: " + str(base64.b64encode(decode_hex(learned)[0])))
if not args.durations:
decode_hex = codecs.getdecoder("hex_codec")
print("Base64: " + str(base64.b64encode(decode_hex(learned)[0])))
if args.learnfile:
print("Saving to {}".format(args.learnfile))
with open(args.learnfile, "w") as text_file: