This commit is contained in:
Manuel Kamper 2023-01-07 17:56:55 +00:00
parent 578865daf6
commit 9d26fe5abe
3 changed files with 551 additions and 553 deletions

View File

@ -1,46 +1,44 @@
import machine
import uos as os
import TimeUtils
class Logger():
def __init__(self, hk_days):
self.hk_days = hk_days
self.TimeUtils = TimeUtils.TimeUtils()
if not "logs" in os.listdir():
os.mkdir("/logs")
def LogMessage(self, message):
print(message)
dt = machine.RTC().datetime()
file = open(("/logs/%04d-%02d-%02d.txt" % (dt[0], dt[1], dt[2])), "a")
file.write(self.TimeUtils.DateTimeNow() + ";" + message + "\n")
file.close()
def Housekeeping(self):
for file in os.listdir("/logs"):
if (file.endswith(".txt")):
fd = file.split('.')[0].split('-')
if (self.TimeUtils.IsOlderThanDays(fd, self.hk_days)):
os.remove("/logs/" + file)
self.LogMessage("Housekeeping: deleted logfile " + file)
def LastLogs(self, lines):
dt = machine.RTC().datetime()
logfilename = "/logs/%04d-%02d-%02d.txt" % (dt[0], dt[1], dt[2])
bufsize = 4096
fsize = os.stat(logfilename).st_size
iter = 0
logfile = open(logfilename, "r")
#lines = file.readlines()
if bufsize > fsize:
bufsize = fsize - 1
fetched_lines = []
while True:
iter += 1
logfile.seek(fsize-bufsize * iter)
fetched_lines.extend(logfile.readlines())
if len(fetched_lines) >= lines or logfile.tell() == 0:
#print(''.join(fetched_lines[-N:]))
break
file.close()
return "<br>".join(fetched_lines[-lines:])
import machine
import os
import TimeUtils
class Logger():
def __init__(self, hk_days):
self.hk_days = hk_days
self.TimeUtils = TimeUtils.TimeUtils()
if not "logs" in os.listdir():
os.mkdir("/logs")
def LogMessage(self, message):
print(message)
dt = machine.RTC().datetime()
file = open(("/logs/%04d-%02d-%02d.txt" % (dt[0], dt[1], dt[2])), "a")
file.write(self.TimeUtils.DateTimeNow() + ";" + message + "\n")
file.close()
def Housekeeping(self):
for file in os.listdir("/logs"):
if (file.endswith(".txt")):
fd = file.split('.')[0].split('-')
if (self.TimeUtils.IsOlderThanDays(fd, self.hk_days)):
os.remove("/logs/" + file)
self.LogMessage("Housekeeping: deleted logfile " + file)
def LastLogs(self, lines):
dt = machine.RTC().datetime()
logfilename = ("/logs/%04d-%02d-%02d.txt" % (dt[0], dt[1], dt[2]))
bufsize = 4096
print(os.stat(logfilename)[6])
fsize = os.stat(logfilename)[6]
iter = 0
with open(logfilename, "r") as logfile:
fetched_lines = []
if bufsize > fsize:
bufsize = fsize - 1
while True:
iter += 1
logfile.seek(fsize - bufsize * iter)
fetched_lines.extend(logfile.readlines())
if len(fetched_lines) >= lines or logfile.tell() == 0:
#todo if lines from todays logfile are less than the number of requested lines, read missing lines from yesterdays logfile
return "<br>".join(fetched_lines[-lines:])

View File

@ -1,80 +1,80 @@
import network
import ubinascii
import Logger
import machine
import time
wlan = network.WLAN(network.STA_IF)
mac = ubinascii.hexlify(network.WLAN().config('mac'),':').decode()
led = machine.Pin('LED', machine.Pin.OUT)
class Networking():
def __init__(self, Logger, ssid, pw):
self.ssid = ssid
self.pw = pw
self.Logger = Logger
def Connect(self, disable_wifi_powersavingmode):
wlan.active(True)
#wlan.config(hostname=configs['hostname'])
if(disable_wifi_powersavingmode):
wlan.config(pm = 0xa11140)
self.Logger.LogMessage("mac = " + str(mac))
self.Logger.LogMessage("channel = " + str(wlan.config('channel')))
self.Logger.LogMessage("essid = " + str(wlan.config('essid')))
self.Logger.LogMessage("txpower = " + str(wlan.config('txpower')))
wlan.connect(self.ssid, self.pw)
timeout = 30
while timeout > 0:
if wlan.status() < 0 or wlan.status() >= 3:
break
timeout -= 1
self.Logger.LogMessage('Waiting for connection...')
time.sleep(1)
# Wlan Status on LED
# Handle connection error
# Error meanings
# 0 Link Down
# 1 Link Join
# 2 Link NoIp
# 3 Link Up
# -1 Link Fail
# -2 Link NoNet
# -3 Link BadAuth
def Status(self):
global wlan
wlan_status = wlan.status()
self.BlinkOnboardLED(wlan_status)
if wlan_status != 3:
#raise RuntimeError('Wi-Fi connection failed')
return False
else:
self.Logger.LogMessage('Connected')
status = wlan.ifconfig()
self.Logger.LogMessage('ip = ' + status[0])
return True
# Blinkfunktion der Onboard-LED für Errorcodes
def BlinkOnboardLED(self, num_blinks):
global led
for i in range(num_blinks):
led.on()
time.sleep(.2)
led.off()
time.sleep(.2)
def GetMACAddress(self):
global mac
return mac
def GetIPAddress(self):
global wlan
return wlan.ifconfig()[0]
def IsWifiConnected(self):
global wlan
if wlan.status() == 3:
return "W"
else:
import network
import ubinascii
import Logger
import machine
import time
wlan = network.WLAN(network.STA_IF)
mac = ubinascii.hexlify(network.WLAN().config('mac'),':').decode()
led = machine.Pin('LED', machine.Pin.OUT)
class Networking():
def __init__(self, Logger, ssid, pw):
self.ssid = ssid
self.pw = pw
self.Logger = Logger
def Connect(self, disable_wifi_powersavingmode):
wlan.active(True)
#wlan.config(hostname=configs['hostname'])
if(disable_wifi_powersavingmode):
wlan.config(pm = 0xa11140)
self.Logger.LogMessage("mac = " + str(mac))
self.Logger.LogMessage("channel = " + str(wlan.config('channel')))
self.Logger.LogMessage("essid = " + str(wlan.config('essid')))
self.Logger.LogMessage("txpower = " + str(wlan.config('txpower')))
wlan.connect(self.ssid, self.pw)
timeout = 30
while timeout > 0:
if wlan.status() < 0 or wlan.status() >= 3:
break
timeout -= 1
self.Logger.LogMessage('Waiting for connection...')
time.sleep(1)
# Wlan Status on LED
# Handle connection error
# Error meanings
# 0 Link Down
# 1 Link Join
# 2 Link NoIp
# 3 Link Up
# -1 Link Fail
# -2 Link NoNet
# -3 Link BadAuth
def Status(self):
global wlan
wlan_status = wlan.status()
self.BlinkOnboardLED(wlan_status)
if wlan_status != 3:
#raise RuntimeError('Wi-Fi connection failed')
return False
else:
self.Logger.LogMessage('Connected')
status = wlan.ifconfig()
self.Logger.LogMessage('ip = ' + status[0])
return True
# Blinkfunktion der Onboard-LED für Errorcodes
def BlinkOnboardLED(self, num_blinks):
global led
for i in range(num_blinks):
led.on()
time.sleep(.2)
led.off()
time.sleep(.2)
def GetMACAddress(self):
global mac
return mac
def GetIPAddress(self):
global wlan
return wlan.ifconfig()[0]
def IsWifiConnected(self):
global wlan
if wlan.status() == 3:
return "W"
else:
return " "

856
main.py
View File

@ -1,429 +1,429 @@
import rp2
import TimeUtils
import Logger
import Oled
import Networking
import NTP
import uasyncio as asyncio
import utime as time
import Keypad
import usocket as socket
from machine import ADC, Pin
from secrets import secrets
from configs import configs
rp2.country(configs['country'])
version = "0.8-beta"
Oled = Oled.Oled()
TimeUtils = TimeUtils.TimeUtils()
Logger = Logger.Logger(configs['log_housekeeping_days'])
Networking = Networking.Networking(Logger, secrets['ssid'], secrets['pw'])
NTP = NTP.NTP(Logger)
Keypad = Keypad.Keypad()
boottime = time.time()
lastActionTicks = 0
subscreen = 0
sc = 0
partyMode = False
displayOff = False
busline = ADC(28)
triggerline = Pin(15, Pin.OUT)
writerActive = False
# Reboots the Pico W (f.e. in case of an error)
def Reboot():
Logger.LogMessage("Performing Reboot")
machine.reset()
# Calculates the uptime in hours and minutes
def Uptime():
global boottime
seconds = (time.time() - boottime) % (24 * 3600)
hours = seconds // 3600
seconds %= 3600
minutes = seconds // 60
seconds %= 60
return "%d:%02d" % (hours, minutes)
# Shows text on the display
def ShowText(line1, line2, line3):
Oled.fill(0x0000)
Oled.text(line1,1,2,Oled.white)
Oled.text(line2,1,12,Oled.white)
Oled.text(line3,1,22,Oled.white)
Oled.show()
# Main method for display output
heartbeat = "H"
def BuildScreen():
global subscreen, partyMode, lastActionTicks, displayOff, heartbeat
if heartbeat == "H":
heartbeat = " "
else:
heartbeat = "H"
lastActionTicks += 1
# after 10 Seconds of no activity, go back to main screen
if (lastActionTicks >= 20):
subscreen = 0
# after X Minutes turn off the display
if (lastActionTicks >= (configs['displayoff'] * 60 * 2)):
displayOff = True
ShowText("","","")
else:
if (subscreen == 0):
ShowText("TCS<->FHEM " + PartyModeActive() + " " + Networking.IsWifiConnected() + " " + heartbeat + "", TimeUtils.DateTimeNow(), "Auf LiG PaM Chk")
elif (subscreen == 1):
ShowText("Eingangstuer", "getriggert", " Ext")
elif (subscreen == 2):
ShowText("Licht im Gang", "getriggert", " Ext")
elif (subscreen == 3):
ShowText("Party-Mode", ("aktiviert" if partyMode else "deaktiviert"), " Ext")
elif (subscreen == 4):
ShowText("Hostname:", configs['hostname'], "Up Dwn Ext")
elif (subscreen == 5):
ShowText("MAC Address:", Networking.GetMACAddress(), "Up Dwn Ext")
elif (subscreen == 6):
ShowText("IP Address:", Networking.GetIPAddress(), "Up Dwn Ext")
elif (subscreen == 7):
ShowText("API key:", secrets['api'], "Up Dwn Ext")
elif (subscreen == 8):
ShowText("CPU Frequency:", str(machine.freq()/1000000) + " MHz", "Up Dwn Ext")
elif (subscreen == 9):
ShowText("Firmware vers.:", version, "Up Dwn Ext")
elif (subscreen == 10):
ShowText("Uptime (H:m):", Uptime(), "Up Dwn Ext")
elif (subscreen == 11):
ShowText("Perform", "reboot now", "Up Dwn OK Ext")
else:
ShowText("Error","Invalid Screen", " Ext")
# Helper-method for the systemcheck-output on the display
def ShowSystemCheck(screen):
global subscreen, sc
if (screen == "start"):
Logger.LogMessage("Showing Systemcheck")
sc = 0
elif (screen == "next"):
sc = sc + 1
else:
sc = sc - 1
if (sc > 7):
sc = 0
elif (sc < 0):
sc = 7
subscreen = sc + 4
# Activates/deactivates the party-mode
def TogglePartyMode():
global subscreen, partyMode
if (partyMode):
partyMode = False
Logger.LogMessage("Party-Mode off")
else:
partyMode = True
Logger.LogMessage("Party-Mode on")
subscreen = 3
# Sets the party-mode active/inactive
def SetPartyMode(newValue):
global partyMode
partyMode = newValue
Logger.LogMessage("Setting Party-Mode via API to " + PartyModeState())
# Returns status of party-mode
def PartyModeState():
global partyMode
if (partyMode):
return "enabled"
else:
return "disabled"
# Returns status symbol if party-mode is active
def PartyModeActive():
global PartyMode
if (partyMode):
return "P"
else:
return " "
#####################################################################
# Helper-method to allow error handling and output in asyncio
def set_global_exception():
def handle_exception(loop, context):
Logger.LogMessage("Fatal error: " + str(context["exception"]))
import sys
sys.print_exception(context["exception"])
sys.exit()
Reboot()
loop = asyncio.get_event_loop()
loop.set_exception_handler(handle_exception)
# Main method for all the UI-handling
async def UiHandling():
global interrupt_flag, lastActionTicks, displayOff, subscreen
Logger.LogMessage("UI handling started")
while True:
BuildScreen()
if Keypad.interrupt_flag is 1:
Keypad.interrupt_flag = 0
lastActionTicks = 0
if (displayOff):
displayOff = False
else:
Keypad.interrupt_flag = 0
if (subscreen == 0):
if (Keypad.tastePressed == Keypad.taste1):
ShowSystemCheck("start")
elif (Keypad.tastePressed == Keypad.taste2):
TogglePartyMode()
elif (Keypad.tastePressed == Keypad.taste3):
subscreen = 2
Logger.LogMessage("Triggering Licht")
await TCSBusWriter(configs['light_trigger_message'])
elif (Keypad.tastePressed == Keypad.taste4):
subscreen = 1
Logger.LogMessage("Triggering Door")
await TCSBusWriter(configs['door_trigger_message'])
else:
Logger.LogMessage("Error: Invalid Button pressed!")
elif (subscreen == 1 or subscreen == 2 or subscreen == 3):
if (Keypad.tastePressed == Keypad.taste1):
subscreen = 0
elif (subscreen == 4 or subscreen == 5 or subscreen == 6 or subscreen == 7 or subscreen == 8 or subscreen == 9 or subscreen == 10 or subscreen == 11):
if (Keypad.tastePressed == Keypad.taste1):
subscreen = 0
elif (subscreen == 11 and Keypad.tastePressed == Keypad.taste2):
Reboot()
elif (Keypad.tastePressed == Keypad.taste3):
ShowSystemCheck("next")
elif (Keypad.tastePressed == Keypad.taste4):
ShowSystemCheck("prev")
else:
Logger.LogMessage("Error: Invalid Button pressed!")
else:
if (Keypad.tastePressed == Keypad.taste1):
subscreen = 0
await asyncio.sleep(0.5)
# Main method for the API
html = """<!DOCTYPE html>
<html>
<head> <title>TCS<->FHEM</title> </head>
<body> <h1>TCS<->FHEM</h1>
<p>%s</p>
</body>
</html>"""
json = """{ "TCS<->FHEM API":"%s" }"""
async def APIHandling(reader, writer):
request_line = await reader.readline()
while await reader.readline() != b"\r\n":
pass
request = str(request_line)
try:
request = request.split()[1]
except IndexError:
pass
client_ip = writer.get_extra_info('peername')[0]
Logger.LogMessage("API request: " + request + " - from client IP: " + client_ip)
if (configs['api_client_ip'] != "") and (configs['api_client_ip'] != client_ip):
Logger.LogMessage("Unauthorized client! Aborting API Handling now.")
stateis = "<b>Error 401:</b> Client '" + client_ip + "' is not authorized to use the API!<br><br>Set authorized client IP in configs.py!"
response = html % stateis
writer.write('HTTP/1.0 401 Unauthorized\r\nContent-type: text/html\r\n\r\n')
else:
req = request.split('/')
stateis = ""
if (len(req) == 3 or len(req) == 4):
if (req[1] == secrets['api']):
if (req[2] == "triggerdoor"):
Logger.LogMessage("Triggering Door")
await TCSBusWriter(configs['door_trigger_message'])
stateis = "Triggered front door opener"
elif (req[2] == "triggerlight"):
Logger.LogMessage("Triggering Licht")
await TCSBusWriter(configs['light_trigger_message'])
stateis = "Triggered light"
elif (req[2] == "togglepartymode"):
TogglePartyMode()
stateis = "Toggled Party-Mode"
elif (req[2] == "partymodeon"):
SetPartyMode(True)
stateis = "Enabled Party-Mode"
elif (req[2] == "partymodeoff"):
SetPartyMode(False)
stateis = "Disabled Party-Mode"
elif (req[2] == "partymodestate"):
stateis = "Party-Mode is " + PartyModeState()
elif (req[2] == "ping"):
stateis = "OK"
elif (req[2] == "stats"):
stateis = "IP address: " + Networking.GetIPAddress() + "<br>MAC address: " + Networking.GetMACAddress() + "<br>Hostname: " + configs['hostname'] + "<br>API Port: " + str(configs['api_port']) + "<br>Uptime (h:m): " + Uptime() + "<br>Date/Time: " + TimeUtils.DateTimeNow() + "<br>Version: " + version + "<br>GMT Timezone Offset (hours): " + str(configs['gmt_offset']) + "<br>Auto summertime: " + str(configs['auto_summertime']) + "<br>Display off time (mins): " + str(configs['displayoff']) + "<br>Log incoming bus messages: " + str(configs['log_incoming_bus_messages']) + "<br>Housekeep logfiles after days: " + str(configs['log_housekeeping_days']) + "<br>Message 'Front door ringing': " + str(configs['frontdoor_ringing_message']) + "<br>Message 'Door ringing': " + str(configs['door_ringing_message']) + "<br>Message 'Door opener triggered': " + str(configs['door_trigger_message']) + "<br>Message 'Light triggered': " + str(configs['light_trigger_message']) + "<br>CPU frequency (MHz): " + str(machine.freq()/1000000)
elif (req[2] == "reboot"):
stateis = "Rebooting device now..."
Reboot()
elif (req[2] == "ringdoor"):
stateis = "Ringing doorbell"
await TCSBusWriter(configs['door_ringing_message'])
elif (req[2] == "ringfrontdoor"):
stateis = "Ringing front doorbell"
await TCSBusWriter(configs['frontdoor_ringing_message'])
elif (req[2] == "logs"):
stateis = Logger.LastLogs()
else:
stateis = "<b>Error:</b> Unknown command!"
else:
stateis = "<b>Error:</b> API key is invalid!"
if (len(req) == 4 and req[3] == "json"):
response = json % stateis
writer.write('HTTP/1.0 200 OK\r\nContent-type: text/json\r\n\r\n')
else:
response = html % stateis
writer.write('HTTP/1.0 200 OK\r\nContent-type: text/html\r\n\r\n')
else:
stateis = "<b>Error 400:</b> Invalid usage of API!<br><br><u>Usage:</u> http://servername/api_key/command[/json]<br><br><u>Commands:</u><ul><li>triggerdoor</li><li>triggerlight</li><li>togglepartymode</li><li>partymodeon</li><li>partymodeoff</li><li>partymodestate</li><li>ping</li><li>stats</li><li>reboot</li><li>ringdoor</li><li>ringfrontdoor</li></ul><br><u>API Key:</u> set 'api' in secrets.py file."
response = html % stateis
writer.write('HTTP/1.0 400 Bad Request\r\nContent-type: text/html\r\n\r\n')
writer.write(response)
await writer.drain()
await writer.wait_closed()
microsFlanke = 0
def microsSeitLetzterFlanke():
global microsFlanke
return time.ticks_us() - microsFlanke
# Main method for the TCS:Bus reader
async def TCSBusReader():
global busline, microsFlanke, partyMode, writerActive
zustand = False
Logger.LogMessage("TCS Busreader started")
message = []
while True:
if not writerActive:
busValue = busline.read_u16()
val = 1
if (busValue >= 50000): #voltage on TCS:Bus 0...65535
val = 1
else:
val = 0
#measure voltage changes and time in between
dauer = microsSeitLetzterFlanke()
if (dauer > 10000) and (message): #handle recieved message, and reset message
message.pop(0) #remove first timing, because we do not need it
for i in range(len(message)): #encode message
message[i] = int(((round(message[i] / 1000.0) * 1000.0) / 2000) - 1)
if (message == configs['light_trigger_message']):
if (configs['log_incoming_bus_messages']):
Logger.LogMessage("Incoming TCS:Bus message for triggering light: " + str(message))
#nothing else to do
elif (message == configs['door_trigger_message']):
if (configs['log_incoming_bus_messages']):
Logger.LogMessage("Incoming TCS:Bus message for door trigger: " + str(message))
#nothing else to do
elif (message == configs['door_ringing_message']):
if (configs['log_incoming_bus_messages']):
Logger.LogMessage("Incoming TCS:Bus message for door ringing: " + str(message))
print ("türklingel")
#todo trigger external api
elif (message == configs['frontdoor_ringing_message']):
if (configs['log_incoming_bus_messages']):
Logger.LogMessage("Incoming TCS:Bus message for frontdoor ringing: " + str(message))
if (partyMode):
asyncio.sleep(1)
Logger.LogMessage("Triggering Door and Light for Party-Mode")
await TCSBusWriter(configs['door_trigger_message'])
asyncio.sleep(1)
await TCSBusWriter(configs['light_trigger_message'])
print ("haustürklingel")
#todo trigger external api
else:
if (configs['log_incoming_bus_messages']):
Logger.LogMessage("Unknown TCS:Bus message: " + str(message))
message = []
else:
if (val == 0 and zustand == False):
message.append(dauer)
zustand = True
microsFlanke = time.ticks_us()
if (val == 1 and zustand == True):
message.append(dauer)
zustand = False
microsFlanke = time.ticks_us()
await asyncio.sleep(0)
# Main method for the TCS:Bus writer
async def TCSBusWriter(message):
global writerActive
if (message):
busMessage = list(message)
writerActive = True
for i in range(len(busMessage)): #decode message
busMessage[i] = int((busMessage[i] + 1) * 2000)
#start sending message
sendZero = True
triggerline.on()
for i in range(len(busMessage)):
time.sleep_us(busMessage[i])
sendZero = not sendZero
if sendZero:
triggerline.on()
else:
triggerline.off()
#finally end sending message
triggerline.off()
writerActive = False
# Main method for daily housekeeping
async def Housekeeper():
Logger.LogMessage("Housekeeper started")
while True:
Logger.LogMessage("Housekeeper is performing actions")
Logger.Housekeeping()
#todo also do a ntp sync
await asyncio.sleep(86400)
# Main entry point after booting
async def Main():
set_global_exception()
Logger.LogMessage("Entering MainLoop")
boottime = time.time()
loop = asyncio.get_event_loop()
ShowText("Booting [3/3]", "API Setup: key", secrets['api'])
Logger.LogMessage("Setting up API on port " + str(configs['api_port']) + " with key " + secrets['api'])
loop.create_task(asyncio.start_server(APIHandling, Networking.GetIPAddress(), configs['api_port']))
Logger.LogMessage("API started")
ShowText("TCS<->FHEM", "Firmware:", version)
Logger.LogMessage("Booting complete with Firmware " + version)
loop.create_task(UiHandling())
loop.create_task(TCSBusReader())
loop.create_task(Housekeeper())
loop.run_forever()
# Booting the device
def Boot():
ShowText("Booting [1/3]", "Conn. Wifi:", secrets['ssid'] + "...")
ShowText("Booting [1/3]", "Conn. Wifi: MAC", Networking.GetMACAddress())
Networking.Connect(configs['disable_wifi_powersavingmode'])
if (Networking.Status()):
ShowText("Booting [1/3]", "Conn. Wifi: IP", Networking.GetIPAddress())
ShowText("Booting [2/3]", "NTP time:", configs['ntp_host'])
NTP.SetRTCTimeFromNTP(configs['ntp_host'], configs['gmt_offset'], configs['auto_summertime'])
ShowText("Booting [2/3]", "NTP time:", TimeUtils.DateTimeNow())
else:
ShowText("Booting [1/3]", "Conn. Wifi:", "ERROR!")
#####################################################################
Boot()
try:
asyncio.run(Main())
except KeyboardInterrupt:
Logger.LogMessage("Shutdown.")
Oled.fill(0x0000)
Oled.show()
finally:
import rp2
import TimeUtils
import Logger
import Oled
import Networking
import NTP
import uasyncio as asyncio
import utime as time
import Keypad
import usocket as socket
from machine import ADC, Pin
from secrets import secrets
from configs import configs
rp2.country(configs['country'])
version = "0.8-beta"
Oled = Oled.Oled()
TimeUtils = TimeUtils.TimeUtils()
Logger = Logger.Logger(configs['log_housekeeping_days'])
Networking = Networking.Networking(Logger, secrets['ssid'], secrets['pw'])
NTP = NTP.NTP(Logger)
Keypad = Keypad.Keypad()
boottime = time.time()
lastActionTicks = 0
subscreen = 0
sc = 0
partyMode = False
displayOff = False
busline = ADC(28)
triggerline = Pin(15, Pin.OUT)
writerActive = False
# Reboots the Pico W (f.e. in case of an error)
def Reboot():
Logger.LogMessage("Performing Reboot")
machine.reset()
# Calculates the uptime in hours and minutes
def Uptime():
global boottime
seconds = (time.time() - boottime) % (24 * 3600)
hours = seconds // 3600
seconds %= 3600
minutes = seconds // 60
seconds %= 60
return "%d:%02d" % (hours, minutes)
# Shows text on the display
def ShowText(line1, line2, line3):
Oled.fill(0x0000)
Oled.text(line1,1,2,Oled.white)
Oled.text(line2,1,12,Oled.white)
Oled.text(line3,1,22,Oled.white)
Oled.show()
# Main method for display output
heartbeat = "H"
def BuildScreen():
global subscreen, partyMode, lastActionTicks, displayOff, heartbeat
if heartbeat == "H":
heartbeat = " "
else:
heartbeat = "H"
lastActionTicks += 1
# after 10 Seconds of no activity, go back to main screen
if (lastActionTicks >= 20):
subscreen = 0
# after X Minutes turn off the display
if (lastActionTicks >= (configs['displayoff'] * 60 * 2)):
displayOff = True
ShowText("","","")
else:
if (subscreen == 0):
ShowText("TCS<->FHEM " + PartyModeActive() + " " + Networking.IsWifiConnected() + " " + heartbeat + "", TimeUtils.DateTimeNow(), "Auf LiG PaM Chk")
elif (subscreen == 1):
ShowText("Eingangstuer", "getriggert", " Ext")
elif (subscreen == 2):
ShowText("Licht im Gang", "getriggert", " Ext")
elif (subscreen == 3):
ShowText("Party-Mode", ("aktiviert" if partyMode else "deaktiviert"), " Ext")
elif (subscreen == 4):
ShowText("Hostname:", configs['hostname'], "Up Dwn Ext")
elif (subscreen == 5):
ShowText("MAC Address:", Networking.GetMACAddress(), "Up Dwn Ext")
elif (subscreen == 6):
ShowText("IP Address:", Networking.GetIPAddress(), "Up Dwn Ext")
elif (subscreen == 7):
ShowText("API key:", secrets['api'], "Up Dwn Ext")
elif (subscreen == 8):
ShowText("CPU Frequency:", str(machine.freq()/1000000) + " MHz", "Up Dwn Ext")
elif (subscreen == 9):
ShowText("Firmware vers.:", version, "Up Dwn Ext")
elif (subscreen == 10):
ShowText("Uptime (H:m):", Uptime(), "Up Dwn Ext")
elif (subscreen == 11):
ShowText("Perform", "reboot now", "Up Dwn OK Ext")
else:
ShowText("Error","Invalid Screen", " Ext")
# Helper-method for the systemcheck-output on the display
def ShowSystemCheck(screen):
global subscreen, sc
if (screen == "start"):
Logger.LogMessage("Showing Systemcheck")
sc = 0
elif (screen == "next"):
sc = sc + 1
else:
sc = sc - 1
if (sc > 7):
sc = 0
elif (sc < 0):
sc = 7
subscreen = sc + 4
# Activates/deactivates the party-mode
def TogglePartyMode():
global subscreen, partyMode
if (partyMode):
partyMode = False
Logger.LogMessage("Party-Mode off")
else:
partyMode = True
Logger.LogMessage("Party-Mode on")
subscreen = 3
# Sets the party-mode active/inactive
def SetPartyMode(newValue):
global partyMode
partyMode = newValue
Logger.LogMessage("Setting Party-Mode via API to " + PartyModeState())
# Returns status of party-mode
def PartyModeState():
global partyMode
if (partyMode):
return "enabled"
else:
return "disabled"
# Returns status symbol if party-mode is active
def PartyModeActive():
global PartyMode
if (partyMode):
return "P"
else:
return " "
#####################################################################
# Helper-method to allow error handling and output in asyncio
def set_global_exception():
def handle_exception(loop, context):
Logger.LogMessage("Fatal error: " + str(context["exception"]))
import sys
sys.print_exception(context["exception"])
sys.exit()
Reboot()
loop = asyncio.get_event_loop()
loop.set_exception_handler(handle_exception)
# Main method for all the UI-handling
async def UiHandling():
global interrupt_flag, lastActionTicks, displayOff, subscreen
Logger.LogMessage("UI handling started")
while True:
BuildScreen()
if Keypad.interrupt_flag is 1:
Keypad.interrupt_flag = 0
lastActionTicks = 0
if (displayOff):
displayOff = False
else:
Keypad.interrupt_flag = 0
if (subscreen == 0):
if (Keypad.tastePressed == Keypad.taste1):
ShowSystemCheck("start")
elif (Keypad.tastePressed == Keypad.taste2):
TogglePartyMode()
elif (Keypad.tastePressed == Keypad.taste3):
subscreen = 2
Logger.LogMessage("Triggering Licht")
await TCSBusWriter(configs['light_trigger_message'])
elif (Keypad.tastePressed == Keypad.taste4):
subscreen = 1
Logger.LogMessage("Triggering Door")
await TCSBusWriter(configs['door_trigger_message'])
else:
Logger.LogMessage("Error: Invalid Button pressed!")
elif (subscreen == 1 or subscreen == 2 or subscreen == 3):
if (Keypad.tastePressed == Keypad.taste1):
subscreen = 0
elif (subscreen == 4 or subscreen == 5 or subscreen == 6 or subscreen == 7 or subscreen == 8 or subscreen == 9 or subscreen == 10 or subscreen == 11):
if (Keypad.tastePressed == Keypad.taste1):
subscreen = 0
elif (subscreen == 11 and Keypad.tastePressed == Keypad.taste2):
Reboot()
elif (Keypad.tastePressed == Keypad.taste3):
ShowSystemCheck("next")
elif (Keypad.tastePressed == Keypad.taste4):
ShowSystemCheck("prev")
else:
Logger.LogMessage("Error: Invalid Button pressed!")
else:
if (Keypad.tastePressed == Keypad.taste1):
subscreen = 0
await asyncio.sleep(0.5)
# Main method for the API
html = """<!DOCTYPE html>
<html>
<head> <title>TCS<->FHEM</title> </head>
<body> <h1>TCS<->FHEM</h1>
<p>%s</p>
</body>
</html>"""
json = """{ "TCS<->FHEM API":"%s" }"""
async def APIHandling(reader, writer):
request_line = await reader.readline()
while await reader.readline() != b"\r\n":
pass
request = str(request_line)
try:
request = request.split()[1]
except IndexError:
pass
client_ip = writer.get_extra_info('peername')[0]
Logger.LogMessage("API request: " + request + " - from client IP: " + client_ip)
if (configs['api_client_ip'] != "") and (configs['api_client_ip'] != client_ip):
Logger.LogMessage("Unauthorized client! Aborting API Handling now.")
stateis = "<b>Error 401:</b> Client '" + client_ip + "' is not authorized to use the API!<br><br>Set authorized client IP in configs.py!"
response = html % stateis
writer.write('HTTP/1.0 401 Unauthorized\r\nContent-type: text/html\r\n\r\n')
else:
req = request.split('/')
stateis = ""
if (len(req) == 3 or len(req) == 4):
if (req[1] == secrets['api']):
if (req[2] == "triggerdoor"):
Logger.LogMessage("Triggering Door")
await TCSBusWriter(configs['door_trigger_message'])
stateis = "Triggered front door opener"
elif (req[2] == "triggerlight"):
Logger.LogMessage("Triggering Licht")
await TCSBusWriter(configs['light_trigger_message'])
stateis = "Triggered light"
elif (req[2] == "togglepartymode"):
TogglePartyMode()
stateis = "Toggled Party-Mode"
elif (req[2] == "partymodeon"):
SetPartyMode(True)
stateis = "Enabled Party-Mode"
elif (req[2] == "partymodeoff"):
SetPartyMode(False)
stateis = "Disabled Party-Mode"
elif (req[2] == "partymodestate"):
stateis = "Party-Mode is " + PartyModeState()
elif (req[2] == "ping"):
stateis = "OK"
elif (req[2] == "stats"):
stateis = "IP address: " + Networking.GetIPAddress() + "<br>MAC address: " + Networking.GetMACAddress() + "<br>Hostname: " + configs['hostname'] + "<br>API Port: " + str(configs['api_port']) + "<br>Uptime (h:m): " + Uptime() + "<br>Date/Time: " + TimeUtils.DateTimeNow() + "<br>Version: " + version + "<br>GMT Timezone Offset (hours): " + str(configs['gmt_offset']) + "<br>Auto summertime: " + str(configs['auto_summertime']) + "<br>Display off time (mins): " + str(configs['displayoff']) + "<br>Log incoming bus messages: " + str(configs['log_incoming_bus_messages']) + "<br>Housekeep logfiles after days: " + str(configs['log_housekeeping_days']) + "<br>Message 'Front door ringing': " + str(configs['frontdoor_ringing_message']) + "<br>Message 'Door ringing': " + str(configs['door_ringing_message']) + "<br>Message 'Door opener triggered': " + str(configs['door_trigger_message']) + "<br>Message 'Light triggered': " + str(configs['light_trigger_message']) + "<br>CPU frequency (MHz): " + str(machine.freq()/1000000)
elif (req[2] == "reboot"):
stateis = "Rebooting device now..."
Reboot()
elif (req[2] == "ringdoor"):
stateis = "Ringing doorbell"
await TCSBusWriter(configs['door_ringing_message'])
elif (req[2] == "ringfrontdoor"):
stateis = "Ringing front doorbell"
await TCSBusWriter(configs['frontdoor_ringing_message'])
elif (req[2] == "logs"):
stateis = Logger.LastLogs(50)
else:
stateis = "<b>Error:</b> Unknown command!"
else:
stateis = "<b>Error:</b> API key is invalid!"
if (len(req) == 4 and req[3] == "json"):
response = json % stateis
writer.write('HTTP/1.0 200 OK\r\nContent-type: text/json\r\n\r\n')
else:
response = html % stateis
writer.write('HTTP/1.0 200 OK\r\nContent-type: text/html\r\n\r\n')
else:
stateis = "<b>Error 400:</b> Invalid usage of API!<br><br><u>Usage:</u> http://servername/api_key/command[/json]<br><br><u>Commands:</u><ul><li>triggerdoor</li><li>triggerlight</li><li>togglepartymode</li><li>partymodeon</li><li>partymodeoff</li><li>partymodestate</li><li>ping</li><li>stats</li><li>reboot</li><li>ringdoor</li><li>ringfrontdoor</li></ul><br><u>API Key:</u> set 'api' in secrets.py file."
response = html % stateis
writer.write('HTTP/1.0 400 Bad Request\r\nContent-type: text/html\r\n\r\n')
writer.write(response)
await writer.drain()
await writer.wait_closed()
microsFlanke = 0
def microsSeitLetzterFlanke():
global microsFlanke
return time.ticks_us() - microsFlanke
# Main method for the TCS:Bus reader
async def TCSBusReader():
global busline, microsFlanke, partyMode, writerActive
zustand = False
Logger.LogMessage("TCS Busreader started")
message = []
while True:
if not writerActive:
busValue = busline.read_u16()
val = 1
if (busValue >= 50000): #voltage on TCS:Bus 0...65535
val = 1
else:
val = 0
#measure voltage changes and time in between
dauer = microsSeitLetzterFlanke()
if (dauer > 10000) and (message): #handle recieved message, and reset message
message.pop(0) #remove first timing, because we do not need it
for i in range(len(message)): #encode message
message[i] = int(((round(message[i] / 1000.0) * 1000.0) / 2000) - 1)
if (message == configs['light_trigger_message']):
if (configs['log_incoming_bus_messages']):
Logger.LogMessage("Incoming TCS:Bus message for triggering light: " + str(message))
#nothing else to do
elif (message == configs['door_trigger_message']):
if (configs['log_incoming_bus_messages']):
Logger.LogMessage("Incoming TCS:Bus message for door trigger: " + str(message))
#nothing else to do
elif (message == configs['door_ringing_message']):
if (configs['log_incoming_bus_messages']):
Logger.LogMessage("Incoming TCS:Bus message for door ringing: " + str(message))
print ("türklingel")
#todo trigger external api
elif (message == configs['frontdoor_ringing_message']):
if (configs['log_incoming_bus_messages']):
Logger.LogMessage("Incoming TCS:Bus message for frontdoor ringing: " + str(message))
if (partyMode):
asyncio.sleep(1)
Logger.LogMessage("Triggering Door and Light for Party-Mode")
await TCSBusWriter(configs['door_trigger_message'])
asyncio.sleep(1)
await TCSBusWriter(configs['light_trigger_message'])
print ("haustürklingel")
#todo trigger external api
else:
if (configs['log_incoming_bus_messages']):
Logger.LogMessage("Unknown TCS:Bus message: " + str(message))
message = []
else:
if (val == 0 and zustand == False):
message.append(dauer)
zustand = True
microsFlanke = time.ticks_us()
if (val == 1 and zustand == True):
message.append(dauer)
zustand = False
microsFlanke = time.ticks_us()
await asyncio.sleep(0)
# Main method for the TCS:Bus writer
async def TCSBusWriter(message):
global writerActive
if (message):
busMessage = list(message)
writerActive = True
for i in range(len(busMessage)): #decode message
busMessage[i] = int((busMessage[i] + 1) * 2000)
#start sending message
sendZero = True
triggerline.on()
for i in range(len(busMessage)):
time.sleep_us(busMessage[i])
sendZero = not sendZero
if sendZero:
triggerline.on()
else:
triggerline.off()
#finally end sending message
triggerline.off()
writerActive = False
# Main method for daily housekeeping
async def Housekeeper():
Logger.LogMessage("Housekeeper started")
while True:
Logger.LogMessage("Housekeeper is performing actions")
Logger.Housekeeping()
#todo also do a ntp sync
await asyncio.sleep(86400)
# Main entry point after booting
async def Main():
set_global_exception()
Logger.LogMessage("Entering MainLoop")
boottime = time.time()
loop = asyncio.get_event_loop()
ShowText("Booting [3/3]", "API Setup: key", secrets['api'])
Logger.LogMessage("Setting up API on port " + str(configs['api_port']) + " with key " + secrets['api'])
loop.create_task(asyncio.start_server(APIHandling, Networking.GetIPAddress(), configs['api_port']))
Logger.LogMessage("API started")
ShowText("TCS<->FHEM", "Firmware:", version)
Logger.LogMessage("Booting complete with Firmware " + version)
loop.create_task(UiHandling())
loop.create_task(TCSBusReader())
loop.create_task(Housekeeper())
loop.run_forever()
# Booting the device
def Boot():
ShowText("Booting [1/3]", "Conn. Wifi:", secrets['ssid'] + "...")
ShowText("Booting [1/3]", "Conn. Wifi: MAC", Networking.GetMACAddress())
Networking.Connect(configs['disable_wifi_powersavingmode'])
if (Networking.Status()):
ShowText("Booting [1/3]", "Conn. Wifi: IP", Networking.GetIPAddress())
ShowText("Booting [2/3]", "NTP time:", configs['ntp_host'])
NTP.SetRTCTimeFromNTP(configs['ntp_host'], configs['gmt_offset'], configs['auto_summertime'])
ShowText("Booting [2/3]", "NTP time:", TimeUtils.DateTimeNow())
else:
ShowText("Booting [1/3]", "Conn. Wifi:", "ERROR!")
#####################################################################
Boot()
try:
asyncio.run(Main())
except KeyboardInterrupt:
Logger.LogMessage("Shutdown.")
Oled.fill(0x0000)
Oled.show()
finally:
asyncio.new_event_loop()