Compare commits
3 Commits
d7b6ec3481
...
b420c5d0ba
Author | SHA1 | Date |
---|---|---|
Bengt Martensson | b420c5d0ba | |
Felipe Martins Diel | 84af992dcc | |
Bengt Martensson | 61e6419e79 |
|
@ -7,7 +7,7 @@ from . import exceptions as e
|
|||
from .const import DEFAULT_BCAST_ADDR, DEFAULT_PORT, DEFAULT_TIMEOUT
|
||||
from .alarm import S1C
|
||||
from .climate import hysen
|
||||
from .cover import dooya, dooya2
|
||||
from .cover import dooya, dooya2, wser
|
||||
from .device import Device, ping, scan
|
||||
from .hub import s3
|
||||
from .light import lb1, lb2
|
||||
|
@ -183,6 +183,9 @@ SUPPORTED_TYPES = {
|
|||
dooya2: {
|
||||
0x4F6E: ("DT360E-45/20", "Dooya"),
|
||||
},
|
||||
wser: {
|
||||
0x4F6C: ("WSER", "Wistar"),
|
||||
},
|
||||
bg1: {
|
||||
0x51E3: ("BG800/BG900", "BG Electrical"),
|
||||
},
|
||||
|
|
|
@ -13,15 +13,16 @@ class dooya(Device):
|
|||
def _send(self, command: int, attribute: int = 0) -> int:
|
||||
"""Send a packet to the device."""
|
||||
packet = bytearray(16)
|
||||
packet[0] = 0x09
|
||||
packet[2] = 0xBB
|
||||
packet[3] = command
|
||||
packet[4] = attribute
|
||||
packet[9] = 0xFA
|
||||
packet[10] = 0x44
|
||||
response = self.send_packet(0x6A, packet)
|
||||
e.check_error(response[0x22:0x24])
|
||||
payload = self.decrypt(response[0x38:])
|
||||
packet[0x00] = 0x09
|
||||
packet[0x02] = 0xBB
|
||||
packet[0x03] = command
|
||||
packet[0x04] = attribute
|
||||
packet[0x09] = 0xFA
|
||||
packet[0x0A] = 0x44
|
||||
|
||||
resp = self.send_packet(0x6A, packet)
|
||||
e.check_error(resp[0x22:0x24])
|
||||
payload = self.decrypt(resp[0x38:])
|
||||
return payload[4]
|
||||
|
||||
def open(self) -> int:
|
||||
|
@ -62,44 +63,117 @@ class dooya2(Device):
|
|||
|
||||
TYPE = "DT360E-2"
|
||||
|
||||
def _send(self, command: int, attribute: int = 0) -> int:
|
||||
"""Send a packet to the device."""
|
||||
checksum = 0xC0C4 + command + attribute & 0xFFFF
|
||||
packet = bytearray(32)
|
||||
packet[0] = 0x16
|
||||
packet[2] = 0xA5
|
||||
packet[3] = 0xA5
|
||||
packet[4] = 0x5A
|
||||
packet[5] = 0x5A
|
||||
def _send(self, operation: int, data: bytes):
|
||||
"""Send a command to the device."""
|
||||
packet = bytearray(14)
|
||||
packet[0x02] = 0xA5
|
||||
packet[0x03] = 0xA5
|
||||
packet[0x04] = 0x5A
|
||||
packet[0x05] = 0x5A
|
||||
packet[0x08] = operation
|
||||
packet[0x09] = 0x0B
|
||||
|
||||
data_len = len(data)
|
||||
packet[0x0A] = data_len & 0xFF
|
||||
packet[0x0B] = data_len >> 8
|
||||
|
||||
packet += bytes(data)
|
||||
|
||||
checksum = sum(packet, 0xBEAF) & 0xFFFF
|
||||
packet[6] = checksum & 0xFF
|
||||
packet[7] = checksum >> 8
|
||||
packet[8] = 0x02
|
||||
packet[9] = 0x0B
|
||||
packet[10] = 0x0A
|
||||
packet[15] = command
|
||||
packet[16] = attribute
|
||||
|
||||
response = self.send_packet(0x6A, packet)
|
||||
e.check_error(response[0x22:0x24])
|
||||
payload = self.decrypt(response[0x38:])
|
||||
return payload[0x11]
|
||||
packet_len = len(packet) - 2
|
||||
packet[0] = packet_len & 0xFF
|
||||
packet[1] = packet_len >> 8
|
||||
|
||||
resp = self.send_packet(0x6a, packet)
|
||||
e.check_error(resp[0x22:0x24])
|
||||
payload = self.decrypt(resp[0x38:])
|
||||
return payload
|
||||
|
||||
def open(self) -> None:
|
||||
"""Open the curtain."""
|
||||
self._send(0x01)
|
||||
self._send(2, [0x00, 0x01, 0x00])
|
||||
|
||||
def close(self) -> None:
|
||||
"""Close the curtain."""
|
||||
self._send(0x02)
|
||||
self._send(2, [0x00, 0x02, 0x00])
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Stop the curtain."""
|
||||
self._send(0x03)
|
||||
self._send(2, [0x00, 0x03, 0x00])
|
||||
|
||||
def get_percentage(self) -> int:
|
||||
"""Return the position of the curtain."""
|
||||
return self._send(0x06)
|
||||
resp = self._send(1, [0x00, 0x06, 0x00])
|
||||
return resp[0x11]
|
||||
|
||||
def set_percentage(self, new_percentage: int) -> None:
|
||||
"""Set the position of the curtain."""
|
||||
self._send(0x09, new_percentage)
|
||||
self._send(2, [0x00, 0x09, new_percentage])
|
||||
|
||||
|
||||
class wser(Device):
|
||||
"""Controls a Wistar curtain motor"""
|
||||
|
||||
TYPE = "WSER"
|
||||
|
||||
def _send(self, operation: int, data: bytes):
|
||||
"""Send a command to the device."""
|
||||
packet = bytearray(14)
|
||||
packet[0x02] = 0xA5
|
||||
packet[0x03] = 0xA5
|
||||
packet[0x04] = 0x5A
|
||||
packet[0x05] = 0x5A
|
||||
packet[0x08] = operation
|
||||
packet[0x09] = 0x0B
|
||||
|
||||
data_len = len(data)
|
||||
packet[0x0A] = data_len & 0xFF
|
||||
packet[0x0B] = data_len >> 8
|
||||
|
||||
packet += bytes(data)
|
||||
|
||||
checksum = sum(packet, 0xBEAF) & 0xFFFF
|
||||
packet[6] = checksum & 0xFF
|
||||
packet[7] = checksum >> 8
|
||||
|
||||
packet_len = len(packet) - 2
|
||||
packet[0] = packet_len & 0xFF
|
||||
packet[1] = packet_len >> 8
|
||||
|
||||
resp = self.send_packet(0x6a, packet)
|
||||
e.check_error(resp[0x22:0x24])
|
||||
payload = self.decrypt(resp[0x38:])
|
||||
return payload
|
||||
|
||||
def get_position(self) -> int:
|
||||
"""Return the position of the curtain."""
|
||||
resp = self._send(1, [])
|
||||
position = resp[0x0E]
|
||||
return position
|
||||
|
||||
def open(self) -> int:
|
||||
"""Open the curtain."""
|
||||
resp = self._send(2, [0x4a, 0x31, 0xa0])
|
||||
position = resp[0x0E]
|
||||
return position
|
||||
|
||||
def close(self) -> int:
|
||||
"""Close the curtain."""
|
||||
resp = self._send(2, [0x61, 0x32, 0xa0])
|
||||
position = resp[0x0E]
|
||||
return position
|
||||
|
||||
def stop(self) -> int:
|
||||
"""Stop the curtain."""
|
||||
resp = self._send(2, [0x4c, 0x73, 0xa0])
|
||||
position = resp[0x0E]
|
||||
return position
|
||||
|
||||
def set_position(self, position: int) -> int:
|
||||
"""Set the position of the curtain."""
|
||||
resp = self._send(2, [position, 0x70, 0xa0])
|
||||
position = resp[0x0E]
|
||||
return position
|
||||
|
|
|
@ -62,7 +62,12 @@ broadlink_discovery
|
|||
|
||||
#### Learn IR code and show at console
|
||||
```
|
||||
broadlink_cli --device @BEDROOM.device --learn
|
||||
broadlink_cli --device @BEDROOM.device --learn
|
||||
```
|
||||
|
||||
#### Learn IR code and show at console as durations in micro seconds
|
||||
```
|
||||
broadlink_cli --device @BEDROOM.device --learn --durations
|
||||
```
|
||||
|
||||
#### Learn RF code and show at console
|
||||
|
@ -85,6 +90,11 @@ broadlink_cli --device @BEDROOM.device --rfscanlearn --learnfile LG-TV.power
|
|||
broadlink_cli --device @BEDROOM.device --send DATA
|
||||
```
|
||||
|
||||
#### Send code from durations in micro seconds
|
||||
```
|
||||
broadlink_cli --device @BEDROOM.device --send --durations DURATIONS
|
||||
```
|
||||
|
||||
#### Send code from file
|
||||
```
|
||||
broadlink_cli --device @BEDROOM.device --send @LG-TV.power
|
||||
|
|
|
@ -38,14 +38,24 @@ def durations_to_broadlink(durations):
|
|||
result = bytearray()
|
||||
result.append(IR_TOKEN)
|
||||
result.append(0)
|
||||
result.append(len(durations) % 256)
|
||||
result.append(len(durations) / 256)
|
||||
for dur in durations:
|
||||
result.append(0) # overwrite later
|
||||
result.append(0) # overwrite later
|
||||
|
||||
# Discard the final duration (a gap), ...
|
||||
for dur in durations[:-1]:
|
||||
num = int(round(dur / TICK))
|
||||
if num > 255:
|
||||
result.append(0)
|
||||
result.append(num / 256)
|
||||
result.append(num // 256)
|
||||
result.append(num % 256)
|
||||
|
||||
# ... and replace it by the Broadlink IR ending token.
|
||||
result.append(0x00)
|
||||
result.append(0x0D)
|
||||
result.append(0x05)
|
||||
length = len(result)
|
||||
result[2] = length % 256
|
||||
result[3] = length // 256
|
||||
return result
|
||||
|
||||
|
||||
|
@ -58,9 +68,9 @@ def format_durations(data):
|
|||
return result
|
||||
|
||||
|
||||
def parse_durations(str):
|
||||
def parse_durations(data):
|
||||
result = []
|
||||
for s in str.split():
|
||||
for s in data:
|
||||
result.append(abs(int(s)))
|
||||
return result
|
||||
|
||||
|
@ -87,8 +97,8 @@ parser.add_argument("--rflearn", action="store_true", help="rf scan learning")
|
|||
parser.add_argument("--frequency", type=float, help="specify radiofrequency for learning")
|
||||
parser.add_argument("--learnfile", help="save learned command to a specified file")
|
||||
parser.add_argument("--durations", action="store_true",
|
||||
help="use durations in micro seconds instead of the Broadlink format")
|
||||
parser.add_argument("--convert", action="store_true", help="convert input data to durations")
|
||||
help="use durations in micro seconds instead of the Broadlink format; for sending and learning. @-form not supported.")
|
||||
parser.add_argument("--convert", action="store_true", help="convert input data to durations and print")
|
||||
parser.add_argument("--joinwifi", nargs=2, help="Args are SSID PASSPHRASE to configure Broadlink device with")
|
||||
parser.add_argument("data", nargs='*', help="Data to send or convert")
|
||||
args = parser.parse_args()
|
||||
|
@ -125,7 +135,7 @@ if args.sensors:
|
|||
for key in data:
|
||||
print("{} {}".format(key, data[key]))
|
||||
if args.send:
|
||||
data = durations_to_broadlink(parse_durations(' '.join(args.data))) \
|
||||
data = durations_to_broadlink(parse_durations(args.data)) \
|
||||
if args.durations else bytearray.fromhex(''.join(args.data))
|
||||
dev.send_data(data)
|
||||
if args.learn or (args.learnfile and not args.rflearn):
|
||||
|
@ -149,8 +159,9 @@ if args.learn or (args.learnfile and not args.rflearn):
|
|||
else ''.join(format(x, '02x') for x in bytearray(data))
|
||||
if args.learn:
|
||||
print(learned)
|
||||
decode_hex = codecs.getdecoder("hex_codec")
|
||||
print("Base64: " + str(base64.b64encode(decode_hex(learned)[0])))
|
||||
if not args.durations:
|
||||
decode_hex = codecs.getdecoder("hex_codec")
|
||||
print("Base64: " + str(base64.b64encode(decode_hex(learned)[0])))
|
||||
if args.learnfile:
|
||||
print("Saving to {}".format(args.learnfile))
|
||||
with open(args.learnfile, "w") as text_file:
|
||||
|
|
Loading…
Reference in New Issue