1
0
Fork 0

Initial commit

This commit is contained in:
Matthew Garrett 2016-09-15 08:06:26 -07:00
commit ecab016a73
4 changed files with 262 additions and 0 deletions

22
LICENSE Normal file
View File

@ -0,0 +1,22 @@
The MIT License (MIT)
Copyright (c) 2014 Mike Ryan
Copyright (c) 2016 Matthew Garrett
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

36
README.md Normal file
View File

@ -0,0 +1,36 @@
Python control for Broadlink RM2 IR controllers
===============================================
A simple Python API for controlling IR controllers from [Broadlink](http://www.ibroadlink.com/rm/). At present, only RM Pro (referred to as RM2 in the codebase) devices are supported and only one device per network will be used. There is currently no support for the cloud API.
Example use
-----------
Discover an available device on the local network:
```
import broadlink
device = broadlink.rm2()
device.discover()
```
Obtain the authentication key required for further communication:
```
device.auth()
```
Enter learning mode:
```
device.enter_learning()
```
Obtain an IR packet while in learning mode:
```
ir_packet = device.check_data()
```
(This will return None if the device does not have a packet to return)
Send an IR packet:
```
device.send_data(ir_packet)
```

172
broadlink/__init__.py Executable file
View File

@ -0,0 +1,172 @@
#!/usr/bin/python
from datetime import datetime
from socket import *
from Crypto.Cipher import AES
import time
import random
class rm2:
def __init__(self):
self.count = random.randrange(0xffff)
self.key = bytearray([0x09, 0x76, 0x28, 0x34, 0x3f, 0xe9, 0x9e, 0x23, 0x76, 0x5c, 0x15, 0x13, 0xac, 0xcf, 0x8b, 0x02])
self.iv = bytearray([0x56, 0x2e, 0x17, 0x99, 0x6d, 0x09, 0x3d, 0x28, 0xdd, 0xb3, 0xba, 0x69, 0x5a, 0x2e, 0x6f, 0x58])
s = socket(AF_INET, SOCK_DGRAM)
s.connect(('8.8.8.8', 0)) # connecting to a UDP address doesn't send packets
local_ip_address = s.getsockname()[0]
self.address = local_ip_address.split('.')
self.id = bytearray([0, 0, 0, 0])
def discover(self):
self.cs = socket(AF_INET, SOCK_DGRAM)
self.cs.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
self.cs.setsockopt(SOL_SOCKET, SO_BROADCAST, 1)
self.cs.bind(('',0))
self.port = self.cs.getsockname()[1]
packet = bytearray(0x30)
year = datetime.now().year
packet[0x08] = 0xf9
packet[0x09] = 0xff
packet[0x0a] = 0xff
packet[0x0b] = 0xff
packet[0x0c] = year & 0xff
packet[0x0d] = year >> 8
packet[0x0e] = datetime.now().minute
packet[0x0f] = datetime.now().hour
subyear = str(year)[2:]
packet[0x10] = int(subyear)
packet[0x11] = datetime.now().isoweekday()
packet[0x12] = datetime.now().day
packet[0x13] = datetime.now().month
packet[0x18] = int(self.address[0])
packet[0x19] = int(self.address[1])
packet[0x1a] = int(self.address[2])
packet[0x1b] = int(self.address[3])
packet[0x1c] = self.port & 0xff
packet[0x1d] = self.port >> 8
packet[0x26] = 6
checksum = 0xbeaf
for i in range(len(packet)):
checksum += packet[i]
checksum = checksum & 0xffff
packet[0x20] = checksum & 0xff
packet[0x21] = checksum >> 8
self.cs.sendto(packet, ('255.255.255.255', 80))
response = self.cs.recvfrom(1024)
responsepacket = bytearray(response[0])
self.host = response[1]
self.mac = responsepacket[0x3a:0x40]
def auth(self):
payload = bytearray(0x50)
payload[0x04] = 0x31
payload[0x05] = 0x31
payload[0x06] = 0x31
payload[0x07] = 0x31
payload[0x08] = 0x31
payload[0x09] = 0x31
payload[0x0a] = 0x31
payload[0x0b] = 0x31
payload[0x0c] = 0x31
payload[0x0d] = 0x31
payload[0x0e] = 0x31
payload[0x0f] = 0x31
payload[0x10] = 0x31
payload[0x11] = 0x31
payload[0x12] = 0x31
payload[0x1e] = 0x01
payload[0x2d] = 0x01
payload[0x30] = 'T'
payload[0x31] = 'e'
payload[0x32] = 's'
payload[0x33] = 't'
payload[0x34] = ' '
payload[0x35] = ' '
payload[0x36] = '1'
response = self.send_packet(0x65, payload)
enc_payload = response[0x38:]
aes = AES.new(str(self.key), AES.MODE_CBC, str(self.iv))
payload = aes.decrypt(str(response[0x38:]))
self.id = payload[0x00:0x04]
self.key = payload[0x04:0x14]
def send_packet(self, command, payload):
packet = bytearray(0x38)
packet[0x00] = 0x5a
packet[0x01] = 0xa5
packet[0x02] = 0xaa
packet[0x03] = 0x55
packet[0x04] = 0x5a
packet[0x05] = 0xa5
packet[0x06] = 0xaa
packet[0x07] = 0x55
packet[0x24] = 0x2a
packet[0x25] = 0x27
packet[0x26] = command
packet[0x28] = self.count & 0xff
packet[0x29] = self.count >> 8
packet[0x2a] = self.mac[0]
packet[0x2b] = self.mac[1]
packet[0x2c] = self.mac[2]
packet[0x2d] = self.mac[3]
packet[0x2e] = self.mac[4]
packet[0x2f] = self.mac[5]
packet[0x30] = self.id[0]
packet[0x31] = self.id[1]
packet[0x32] = self.id[2]
packet[0x33] = self.id[3]
checksum = 0xbeaf
for i in range(len(payload)):
checksum += payload[i]
checksum = checksum & 0xffff
aes = AES.new(str(self.key), AES.MODE_CBC, str(self.iv))
payload = aes.encrypt(str(payload))
packet[0x34] = checksum & 0xff
packet[0x35] = checksum >> 8
for i in range(len(payload)):
packet.append(payload[i])
checksum = 0xbeaf
for i in range(len(packet)):
checksum += packet[i]
checksum = checksum & 0xffff
packet[0x20] = checksum & 0xff
packet[0x21] = checksum >> 8
self.cs.sendto(packet, self.host)
response = self.cs.recvfrom(1024)
return response[0]
def send_data(self, data):
packet = bytearray([0x02, 0x00, 0x00, 0x00])
packet += data
self.send_packet(0x6a, packet)
def enter_learning(self):
packet = bytearray(16)
packet[0] = 3
self.send_packet(0x6a, packet)
def check_data(self):
packet = bytearray(16)
packet[0] = 4
response = self.send_packet(0x6a, packet)
err = ord(response[0x22]) | (ord(response[0x23]) << 8)
if err == 0:
aes = AES.new(str(self.key), AES.MODE_CBC, str(self.iv))
payload = aes.decrypt(str(response[0x38:]))
return payload[0x04:]

32
setup.py Normal file
View File

@ -0,0 +1,32 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import re
from setuptools import setup, find_packages
import sys
import warnings
dynamic_requires = []
version = 0.1
setup(
name='broadlink',
version=0.1,
author='Matthew Garrett',
author_email='mjg59@srcf.ucam.org',
url='http://github.com/mjg59/python-broadlink',
packages=find_packages(),
scripts=[],
license=open('LICENSE').read(),
description='Python API for controlling Broadlink IR controllers',
classifiers=[
'Development Status :: 4 - Beta',
'Intended Audience :: Developers',
'License :: OSI Approved :: MIT License',
'Operating System :: OS Independent',
'Programming Language :: Python',
],
include_package_data=True,
zip_safe=False,
)