some additional bugfixes

This commit is contained in:
Manuel Kamper 2023-05-22 08:01:16 +00:00
parent 6630078686
commit 833f1092d0
2 changed files with 505 additions and 487 deletions

View File

@ -38,16 +38,26 @@ class Logger():
fsize = os.stat(logfilename)[6] fsize = os.stat(logfilename)[6]
iter = 0 iter = 0
with open(logfilename, "r") as logfile: with open(logfilename, "r") as logfile:
fetched_lines = [] line_count = 0
for line in logfile:
line_count += 1
if (lines > line_count):
lines = line_count
#todo if lines from todays logfile are less than the number of requested lines, read missing lines from yesterdays logfile - only if not tempLogFile!
if bufsize > fsize: if bufsize > fsize:
bufsize = fsize - 1 bufsize = fsize - 1
fetched_lines = []
while True: while True:
iter += 1 iter += 1
logfile.seek(fsize - bufsize * iter) try:
fetched_lines.extend(logfile.readlines()) logfile.seek(fsize - bufsize * iter)
fetched_lines.extend(logfile.readlines())
except:
return "eof"
break
if len(fetched_lines) >= lines or logfile.tell() == 0: if len(fetched_lines) >= lines or logfile.tell() == 0:
#todo if lines from todays logfile are less than the number of requested lines, read missing lines from yesterdays logfile - only if not tempLogFile!
return "<br>".join(fetched_lines[-lines:]) return "<br>".join(fetched_lines[-lines:])
break
def MergeTempLogfile(self): def MergeTempLogfile(self):
if not self.tempLogFile: if not self.tempLogFile:

974
main.py
View File

@ -1,484 +1,492 @@
import rp2 import rp2
import TimeUtils import TimeUtils
import Logger import Logger
import Oled import Oled
import Networking import Networking
import NTP import NTP
import uasyncio as asyncio import uasyncio as asyncio
import utime as time import utime as time
import Keypad import Keypad
import usocket as socket import usocket as socket
import urequests as requests import urequests as requests
from machine import ADC, Pin from machine import ADC, Pin
from secrets import secrets from secrets import secrets
from configs import configs from configs import configs
rp2.country(configs['country']) rp2.country(configs['country'])
version = "0.9-beta" version = "0.10-beta"
Oled = Oled.Oled() Oled = Oled.Oled()
TimeUtils = TimeUtils.TimeUtils() TimeUtils = TimeUtils.TimeUtils()
Logger = Logger.Logger(configs['log_housekeeping_days']) Logger = Logger.Logger(configs['log_housekeeping_days'])
Networking = Networking.Networking(Logger, secrets['ssid'], secrets['pw']) Networking = Networking.Networking(Logger, secrets['ssid'], secrets['pw'])
NTP = NTP.NTP(Logger) NTP = NTP.NTP(Logger)
Keypad = Keypad.Keypad() Keypad = Keypad.Keypad()
boottime = time.time() boottime = time.time()
lastActionTicks = 0 lastActionTicks = 0
subscreen = 0 subscreen = 0
sc = 0 sc = 0
partyMode = False partyMode = False
displayOff = False displayOff = False
busline = ADC(28) busline = ADC(28)
triggerline = Pin(15, Pin.OUT) triggerline = Pin(15, Pin.OUT)
writerActive = False writerActive = False
# Checks if string is integer # Checks if string is integer
def IsInt(possibleint): def IsInt(possibleint):
try: try:
int(possibleint) int(possibleint)
except: except:
return False return False
else: else:
return True return True
# Reboots the Pico W (f.e. in case of an error) # Reboots the Pico W (f.e. in case of an error)
def Reboot(): def Reboot():
Logger.LogMessage("Performing Reboot") Logger.LogMessage("Performing Reboot")
machine.reset() machine.reset()
# Calculates the uptime in hours and minutes # Calculates the uptime in hours and minutes
def Uptime(): def Uptime():
global boottime global boottime
seconds = (time.time() - boottime) % (24 * 3600) seconds = (time.time() - boottime) % (24 * 3600)
hours = seconds // 3600 hours = seconds // 3600
seconds %= 3600 seconds %= 3600
minutes = seconds // 60 minutes = seconds // 60
seconds %= 60 seconds %= 60
return "%d:%02d" % (hours, minutes) return "%d:%02d" % (hours, minutes)
# Shows text on the display # Shows text on the display
def ShowText(line1, line2, line3): def ShowText(line1, line2, line3):
Oled.fill(0x0000) Oled.fill(0x0000)
Oled.text(line1,1,2,Oled.white) Oled.text(line1,1,2,Oled.white)
Oled.text(line2,1,12,Oled.white) Oled.text(line2,1,12,Oled.white)
Oled.text(line3,1,22,Oled.white) Oled.text(line3,1,22,Oled.white)
Oled.show() Oled.show()
# Main method for display output # Main method for display output
heartbeat = "H" heartbeat = "H"
def BuildScreen(): def BuildScreen():
global subscreen, partyMode, lastActionTicks, displayOff, heartbeat global subscreen, partyMode, lastActionTicks, displayOff, heartbeat
if heartbeat == "H": if heartbeat == "H":
heartbeat = " " heartbeat = " "
else: else:
heartbeat = "H" heartbeat = "H"
lastActionTicks += 1 lastActionTicks += 1
# after 10 Seconds of no activity, go back to main screen # after 10 Seconds of no activity, go back to main screen
if (lastActionTicks >= 20): if (lastActionTicks >= 20):
subscreen = 0 subscreen = 0
# after X Minutes turn off the display # after X Minutes turn off the display
if (lastActionTicks >= (configs['displayoff'] * 60 * 2)): if (lastActionTicks >= (configs['displayoff'] * 60 * 2)):
displayOff = True displayOff = True
ShowText("","","") ShowText("","","")
else: else:
if (subscreen == 0): if (subscreen == 0):
ShowText("TCS<->FHEM " + PartyModeActive() + " " + Networking.IsWifiConnected() + " " + heartbeat + "", TimeUtils.DateTimeNow(), "Auf LiG PaM Chk") ShowText("TCS<->FHEM " + PartyModeActive() + " " + Networking.IsWifiConnected() + " " + heartbeat + "", TimeUtils.DateTimeNow(), "Auf LiG PaM Chk")
elif (subscreen == 1): elif (subscreen == 1):
ShowText("Eingangstuer", "getriggert", " Ext") ShowText("Eingangstuer", "getriggert", " Ext")
elif (subscreen == 2): elif (subscreen == 2):
ShowText("Licht am Gang", "getriggert", " Ext") ShowText("Licht am Gang", "getriggert", " Ext")
elif (subscreen == 3): elif (subscreen == 3):
ShowText("Party-Mode", ("aktiviert" if partyMode else "deaktiviert"), " Ext") ShowText("Party-Mode", ("aktiviert" if partyMode else "deaktiviert"), " Ext")
elif (subscreen == 4): elif (subscreen == 4):
ShowText("Hostname:", configs['hostname'], "Up Dwn Ext") ShowText("Hostname:", configs['hostname'], "Up Dwn Ext")
elif (subscreen == 5): elif (subscreen == 5):
ShowText("MAC Address:", Networking.GetMACAddress(), "Up Dwn Ext") ShowText("MAC Address:", Networking.GetMACAddress(), "Up Dwn Ext")
elif (subscreen == 6): elif (subscreen == 6):
ShowText("IP Address:", Networking.GetIPAddress(), "Up Dwn Ext") ShowText("IP Address:", Networking.GetIPAddress(), "Up Dwn Ext")
elif (subscreen == 7): elif (subscreen == 7):
ShowText("API key:", secrets['api'], "Up Dwn Ext") ShowText("API key:", secrets['api'], "Up Dwn Ext")
elif (subscreen == 8): elif (subscreen == 8):
ShowText("CPU Frequency:", str(machine.freq()/1000000) + " MHz", "Up Dwn Ext") ShowText("CPU Frequency:", str(machine.freq()/1000000) + " MHz", "Up Dwn Ext")
elif (subscreen == 9): elif (subscreen == 9):
ShowText("Firmware vers.:", version, "Up Dwn Ext") ShowText("Firmware vers.:", version, "Up Dwn Ext")
elif (subscreen == 10): elif (subscreen == 10):
ShowText("Uptime (H:m):", Uptime(), "Up Dwn Ext") ShowText("Uptime (H:m):", Uptime(), "Up Dwn Ext")
elif (subscreen == 11): elif (subscreen == 11):
ShowText("Perform", "reboot now", "Up Dwn OK Ext") ShowText("Perform", "reboot now", "Up Dwn OK Ext")
else: else:
ShowText("Error","Invalid Screen", " Ext") ShowText("Error","Invalid Screen", " Ext")
# Helper-method for the systemcheck-output on the display # Helper-method for the systemcheck-output on the display
def ShowSystemCheck(screen): def ShowSystemCheck(screen):
global subscreen, sc global subscreen, sc
if (screen == "start"): if (screen == "start"):
Logger.LogMessage("Showing Systemcheck") Logger.LogMessage("Showing Systemcheck")
sc = 0 sc = 0
elif (screen == "next"): elif (screen == "next"):
sc = sc + 1 sc = sc + 1
else: else:
sc = sc - 1 sc = sc - 1
if (sc > 7): if (sc > 7):
sc = 0 sc = 0
elif (sc < 0): elif (sc < 0):
sc = 7 sc = 7
subscreen = sc + 4 subscreen = sc + 4
# Activates/deactivates the party-mode # Activates/deactivates the party-mode
def TogglePartyMode(): def TogglePartyMode():
global subscreen, partyMode global subscreen, partyMode
if (partyMode): if (partyMode):
partyMode = False partyMode = False
ExternalAPI("partymode" + PartyModeState()) ExternalAPI("partymode" + PartyModeState())
Logger.LogMessage("Party-Mode off") Logger.LogMessage("Party-Mode off")
else: else:
partyMode = True partyMode = True
ExternalAPI("partymode" + PartyModeState()) ExternalAPI("partymode" + PartyModeState())
Logger.LogMessage("Party-Mode on") Logger.LogMessage("Party-Mode on")
subscreen = 3 subscreen = 3
# Sets the party-mode active/inactive # Sets the party-mode active/inactive
def SetPartyMode(newValue): def SetPartyMode(newValue):
global partyMode global partyMode
partyMode = newValue partyMode = newValue
ExternalAPI("partymode" + PartyModeState()) ExternalAPI("partymode" + PartyModeState())
Logger.LogMessage("Setting Party-Mode via API to " + PartyModeState()) Logger.LogMessage("Setting Party-Mode via API to " + PartyModeState())
# Returns status of party-mode # Returns status of party-mode
def PartyModeState(): def PartyModeState():
global partyMode global partyMode
if (partyMode): if (partyMode):
return "enabled" return "enabled"
else: else:
return "disabled" return "disabled"
# Returns status symbol if party-mode is active # Returns status symbol if party-mode is active
def PartyModeActive(): def PartyModeActive():
global PartyMode global PartyMode
if (partyMode): if (partyMode):
return "P" return "P"
else: else:
return " " return " "
##################################################################### #####################################################################
# Helper-method to allow error handling and output in asyncio # Helper-method to allow error handling and output in asyncio
def set_global_exception(): def set_global_exception():
def handle_exception(loop, context): def handle_exception(loop, context):
Logger.LogMessage("Fatal error: " + str(context["exception"])) Logger.LogMessage("Fatal error: " + str(context["exception"]))
import sys import sys
sys.print_exception(context["exception"]) sys.print_exception(context["exception"])
sys.exit() sys.exit()
Reboot() Reboot()
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
loop.set_exception_handler(handle_exception) loop.set_exception_handler(handle_exception)
# Main method for all the UI-handling # Main method for all the UI-handling
async def UiHandling(): async def UiHandling():
global interrupt_flag, lastActionTicks, displayOff, subscreen global interrupt_flag, lastActionTicks, displayOff, subscreen
Logger.LogMessage("UI handling started") Logger.LogMessage("UI handling started")
while True: while True:
BuildScreen() BuildScreen()
if Keypad.interrupt_flag is 1: if Keypad.interrupt_flag is 1:
Keypad.interrupt_flag = 0 Keypad.interrupt_flag = 0
lastActionTicks = 0 lastActionTicks = 0
if (displayOff): if (displayOff):
displayOff = False displayOff = False
else: else:
Keypad.interrupt_flag = 0 Keypad.interrupt_flag = 0
if (subscreen == 0): if (subscreen == 0):
if (Keypad.tastePressed == Keypad.taste1): if (Keypad.tastePressed == Keypad.taste1):
ShowSystemCheck("start") ShowSystemCheck("start")
elif (Keypad.tastePressed == Keypad.taste2): elif (Keypad.tastePressed == Keypad.taste2):
TogglePartyMode() TogglePartyMode()
elif (Keypad.tastePressed == Keypad.taste3): elif (Keypad.tastePressed == Keypad.taste3):
subscreen = 2 subscreen = 2
Logger.LogMessage("Triggering Licht") Logger.LogMessage("Triggering Licht")
await TCSBusWriter(configs['light_trigger_message']) await TCSBusWriter(configs['light_trigger_message'])
elif (Keypad.tastePressed == Keypad.taste4): elif (Keypad.tastePressed == Keypad.taste4):
subscreen = 1 subscreen = 1
Logger.LogMessage("Triggering Door") Logger.LogMessage("Triggering Door")
await TCSBusWriter(configs['door_trigger_message']) await TCSBusWriter(configs['door_trigger_message'])
else: else:
Logger.LogMessage("Error: Invalid Button pressed!") Logger.LogMessage("Error: Invalid Button pressed!")
elif (subscreen == 1 or subscreen == 2 or subscreen == 3): elif (subscreen == 1 or subscreen == 2 or subscreen == 3):
if (Keypad.tastePressed == Keypad.taste1): if (Keypad.tastePressed == Keypad.taste1):
subscreen = 0 subscreen = 0
elif (subscreen == 4 or subscreen == 5 or subscreen == 6 or subscreen == 7 or subscreen == 8 or subscreen == 9 or subscreen == 10 or subscreen == 11): elif (subscreen == 4 or subscreen == 5 or subscreen == 6 or subscreen == 7 or subscreen == 8 or subscreen == 9 or subscreen == 10 or subscreen == 11):
if (Keypad.tastePressed == Keypad.taste1): if (Keypad.tastePressed == Keypad.taste1):
subscreen = 0 subscreen = 0
elif (subscreen == 11 and Keypad.tastePressed == Keypad.taste2): elif (subscreen == 11 and Keypad.tastePressed == Keypad.taste2):
Reboot() Reboot()
elif (Keypad.tastePressed == Keypad.taste3): elif (Keypad.tastePressed == Keypad.taste3):
ShowSystemCheck("next") ShowSystemCheck("next")
elif (Keypad.tastePressed == Keypad.taste4): elif (Keypad.tastePressed == Keypad.taste4):
ShowSystemCheck("prev") ShowSystemCheck("prev")
else: else:
Logger.LogMessage("Error: Invalid Button pressed!") Logger.LogMessage("Error: Invalid Button pressed!")
else: else:
if (Keypad.tastePressed == Keypad.taste1): if (Keypad.tastePressed == Keypad.taste1):
subscreen = 0 subscreen = 0
await asyncio.sleep(0.5) await asyncio.sleep(0.5)
# Main method for the API # Main method for the API
html = """<!DOCTYPE html> html = """<!DOCTYPE html>
<html> <html>
<head> <title>TCS<->FHEM</title> </head> <head> <title>TCS<->FHEM</title> </head>
<body> <h1>TCS<->FHEM</h1> <body> <h1>TCS<->FHEM</h1>
<p>%s</p> <p>%s</p>
</body> </body>
</html>""" </html>"""
json = """{ "TCS<->FHEM API":"%s" }""" json = """{ "TCS<->FHEM API":"%s" }"""
async def APIHandling(reader, writer): async def APIHandling(reader, writer):
request_line = await reader.readline() request_line = await reader.readline()
while await reader.readline() != b"\r\n": while await reader.readline() != b"\r\n":
pass pass
request = str(request_line) request = str(request_line)
try: try:
request = request.split()[1] request = request.split()[1]
except IndexError: except IndexError:
pass pass
client_ip = writer.get_extra_info('peername')[0] client_ip = writer.get_extra_info('peername')[0]
Logger.LogMessage("API request: " + request + " - from client IP: " + client_ip) Logger.LogMessage("API request: " + request + " - from client IP: " + client_ip)
if (configs['api_client_ip'] != "") and (configs['api_client_ip'] != client_ip): if (configs['api_client_ip'] != "") and (configs['api_client_ip'] != client_ip):
Logger.LogMessage("Unauthorized client! Aborting API Handling now.") Logger.LogMessage("Unauthorized client! Aborting API Handling now.")
stateis = "<b>Error 401:</b> Client '" + client_ip + "' is not authorized to use the API!<br><br>Set authorized client IP in configs.py!" stateis = "<b>Error 401:</b> Client '" + client_ip + "' is not authorized to use the API!<br><br>Set authorized client IP in configs.py!"
response = html % stateis response = html % stateis
writer.write('HTTP/1.0 401 Unauthorized\r\nContent-type: text/html\r\n\r\n') writer.write('HTTP/1.0 401 Unauthorized\r\nContent-type: text/html\r\n\r\n')
else: else:
req = request.split('/') req = request.split('/')
stateis = "" stateis = ""
if (len(req) == 3 or len(req) == 4 or len(req) == 5): if (len(req) == 3 or len(req) == 4 or len(req) == 5):
if (req[1] == secrets['api']): if (req[1] == secrets['api']):
if (req[2] == "triggerdoor"): if (req[2] == "triggerdoor"):
Logger.LogMessage("Triggering Door") Logger.LogMessage("Triggering Door")
await TCSBusWriter(configs['door_trigger_message']) await TCSBusWriter(configs['door_trigger_message'])
stateis = "Triggered front door opener" stateis = "Triggered front door opener"
elif (req[2] == "triggerlight"): elif (req[2] == "triggerlight"):
Logger.LogMessage("Triggering Licht") Logger.LogMessage("Triggering Licht")
await TCSBusWriter(configs['light_trigger_message']) await TCSBusWriter(configs['light_trigger_message'])
stateis = "Triggered light" stateis = "Triggered light"
elif (req[2] == "togglepartymode"): elif (req[2] == "togglepartymode"):
TogglePartyMode() TogglePartyMode()
stateis = "Toggled Party-Mode" stateis = "Toggled Party-Mode"
elif (req[2] == "partymodeon"): elif (req[2] == "partymodeon"):
SetPartyMode(True) SetPartyMode(True)
stateis = "Enabled Party-Mode" stateis = "Enabled Party-Mode"
elif (req[2] == "partymodeoff"): elif (req[2] == "partymodeoff"):
SetPartyMode(False) SetPartyMode(False)
stateis = "Disabled Party-Mode" stateis = "Disabled Party-Mode"
elif (req[2] == "partymodestate"): elif (req[2] == "partymodestate"):
stateis = "Party-Mode is " + PartyModeState() stateis = "Party-Mode is " + PartyModeState()
elif (req[2] == "ping"): elif (req[2] == "ping"):
stateis = "OK" stateis = "OK"
elif (req[2] == "stats"): elif (req[2] == "stats"):
stateis = "IP address: " + Networking.GetIPAddress() + "<br>MAC address: " + Networking.GetMACAddress() + "<br>Hostname: " + configs['hostname'] + "<br>API Port: " + str(configs['api_port']) + "<br>Uptime (h:m): " + Uptime() + "<br>Date/Time: " + TimeUtils.DateTimeNow() + "<br>Version: " + version + "<br>GMT Timezone Offset (hours): " + str(configs['gmt_offset']) + "<br>Auto summertime: " + str(configs['auto_summertime']) + "<br>Display off time (mins): " + str(configs['displayoff']) + "<br>Log incoming bus messages: " + str(configs['log_incoming_bus_messages']) + "<br>Housekeep logfiles after days: " + str(configs['log_housekeeping_days']) + "<br>Message 'Front door ringing': " + str(configs['frontdoor_ringing_message']) + "<br>Message 'Door ringing': " + str(configs['door_ringing_message']) + "<br>Message 'Door opener triggered': " + str(configs['door_trigger_message']) + "<br>Message 'Light triggered': " + str(configs['light_trigger_message']) + "<br>CPU frequency (MHz): " + str(machine.freq()/1000000) stateis = "IP address: " + Networking.GetIPAddress() + "<br>MAC address: " + Networking.GetMACAddress() + "<br>Hostname: " + configs['hostname'] + "<br>API Port: " + str(configs['api_port']) + "<br>Uptime (h:m): " + Uptime() + "<br>Date/Time: " + TimeUtils.DateTimeNow() + "<br>Version: " + version + "<br>GMT Timezone Offset (hours): " + str(configs['gmt_offset']) + "<br>Auto summertime: " + str(configs['auto_summertime']) + "<br>Display off time (mins): " + str(configs['displayoff']) + "<br>Log incoming bus messages: " + str(configs['log_incoming_bus_messages']) + "<br>Housekeep logfiles after days: " + str(configs['log_housekeeping_days']) + "<br>Message 'Front door ringing': " + str(configs['frontdoor_ringing_message']) + "<br>Message 'Door ringing': " + str(configs['door_ringing_message']) + "<br>Message 'Door opener triggered': " + str(configs['door_trigger_message']) + "<br>Message 'Light triggered': " + str(configs['light_trigger_message']) + "<br>CPU frequency (MHz): " + str(machine.freq()/1000000) + "<br>Party-Mode is: " + PartyModeState()
elif (req[2] == "reboot"): elif (req[2] == "reboot"):
stateis = "Rebooting device now..." stateis = "Rebooting device now..."
Reboot() Reboot()
elif (req[2] == "ringdoor"): elif (req[2] == "ringdoor"):
stateis = "Ringing doorbell" stateis = "Ringing doorbell"
await TCSBusWriter(configs['door_ringing_message']) await TCSBusWriter(configs['door_ringing_message'])
elif (req[2] == "ringfrontdoor"): elif (req[2] == "ringfrontdoor"):
stateis = "Ringing front doorbell" stateis = "Ringing front doorbell"
await TCSBusWriter(configs['frontdoor_ringing_message']) await TCSBusWriter(configs['frontdoor_ringing_message'])
elif (req[2] == "logs"): elif (req[2] == "logs"):
if (len(req) >= 4 and req[3] != "json"): if (len(req) >= 4 and req[3] != "json"):
if (IsInt(req[3])): if (IsInt(req[3])):
stateis = Logger.LastLogs(int(req[3])) stateis = Logger.LastLogs(int(req[3]))
else: else:
stateis = "<b>Error:</b> Parameter for log length not an integer!" stateis = "<b>Error:</b> Parameter for log length not an integer!"
else: else:
stateis = Logger.LastLogs(50) stateis = Logger.LastLogs(12)
else: else:
stateis = "<b>Error:</b> Unknown command!" stateis = "<b>Error:</b> Unknown command!"
else: else:
stateis = "<b>Error:</b> API key is invalid!" stateis = "<b>Error:</b> API key is invalid!"
if ((len(req) == 4 and req[3] == "json") or (len(req) == 5 and req[4] == "json")): if ((len(req) == 4 and req[3] == "json") or (len(req) == 5 and req[4] == "json")):
response = json % stateis if (req[2] != "logs"):
writer.write('HTTP/1.0 200 OK\r\nContent-type: text/json\r\n\r\n') stateis = stateis.replace(": ", "\":\"")
else: stateis = stateis.replace("<br>", "\", \"")
response = html % stateis stateis = stateis.replace("&#176;", "°")
writer.write('HTTP/1.0 200 OK\r\nContent-type: text/html\r\n\r\n') else:
else: stateis = stateis.replace(";", "\":\"")
stateis = "<b>Error 400:</b> Invalid usage of API!<br><br><u>Usage:</u> http://servername/api_key/command[/json]<br><br><u>Commands:</u><ul><li>triggerdoor</li><li>triggerlight</li><li>togglepartymode</li><li>partymodeon</li><li>partymodeoff</li><li>partymodestate</li><li>ping</li><li>stats</li><li>reboot</li><li>ringdoor</li><li>ringfrontdoor</li></ul><br><u>API Key:</u> set 'api' in secrets.py file." stateis = stateis.replace("<br>", "\", \"")
response = html % stateis stateis = stateis.replace("\n", "").replace("\r", "")
writer.write('HTTP/1.0 400 Bad Request\r\nContent-type: text/html\r\n\r\n') response = json % stateis
writer.write(response) writer.write('HTTP/1.0 200 OK\r\nContent-type: text/json\r\n\r\n')
await writer.drain() else:
await writer.wait_closed() response = html % stateis
writer.write('HTTP/1.0 200 OK\r\nContent-type: text/html\r\n\r\n')
microsFlanke = 0 else:
def microsSeitLetzterFlanke(): stateis = "<b>Error 400:</b> Invalid usage of API!<br><br><u>Usage:</u> http://servername/api_key/command[/json]<br><br><u>Commands:</u><ul><li>triggerdoor</li><li>triggerlight</li><li>togglepartymode</li><li>partymodeon</li><li>partymodeoff</li><li>partymodestate</li><li>ping</li><li>stats</li><li>reboot</li><li>ringdoor</li><li>ringfrontdoor</li></ul><br><u>API Key:</u> set 'api' in secrets.py file."
global microsFlanke response = html % stateis
return time.ticks_us() - microsFlanke writer.write('HTTP/1.0 400 Bad Request\r\nContent-type: text/html\r\n\r\n')
writer.write(response)
# Main method for the TCS:Bus reader await writer.drain()
async def TCSBusReader(): await writer.wait_closed()
global busline, microsFlanke, partyMode, writerActive
zustand = False microsFlanke = 0
Logger.LogMessage("TCS Busreader started") def microsSeitLetzterFlanke():
message = [] global microsFlanke
while True: return time.ticks_us() - microsFlanke
if not writerActive:
busValue = busline.read_u16() # Main method for the TCS:Bus reader
val = 1 async def TCSBusReader():
if (busValue >= 30000): #voltage on TCS:Bus 0...65535 global busline, microsFlanke, partyMode, writerActive
val = 1 zustand = False
else: Logger.LogMessage("TCS Busreader started")
val = 0 message = []
#measure voltage changes and time in between while True:
dauer = microsSeitLetzterFlanke() if not writerActive:
if (dauer > 10000) and (message): #handle recieved message, and reset message busValue = busline.read_u16()
message.pop(0) #remove first timing, because we do not need it val = 1
for i in range(len(message)): #encode bus message (lazy, because the TCS:Bus is not as precise as we are ;) ) if (busValue >= 30000): #voltage on TCS:Bus 0...65535
if (message[i] >= 1000 and message[i] <= 2999): val = 1
message[i] = 0 else:
elif (message[i] >= 3000 and message[i] <= 4999): val = 0
message[i] = 1 #measure voltage changes and time in between
elif (message[i] >= 5000 and message[i] <= 6999): dauer = microsSeitLetzterFlanke()
message[i] = 2 if (dauer > 10000) and (message): #handle recieved message, and reset message
elif (message[i] >= 7000): #this may be an invalid message bit, but for not having numbers like '7543', we encode all this to '3' message.pop(0) #remove first timing, because we do not need it
message[i] = 3 for i in range(len(message)): #encode bus message (lazy, because the TCS:Bus is not as precise as we are ;) )
if (message == configs['light_trigger_message']): if (message[i] >= 1000 and message[i] <= 2999):
if (configs['log_incoming_bus_messages']): message[i] = 0
Logger.LogMessage("Incoming TCS:Bus message for triggering light: " + str(message)) elif (message[i] >= 3000 and message[i] <= 4999):
#nothing else to do message[i] = 1
pass elif (message[i] >= 5000 and message[i] <= 6999):
elif (message == configs['door_trigger_message']): message[i] = 2
if (configs['log_incoming_bus_messages']): elif (message[i] >= 7000): #this may be an invalid message bit, but for not having numbers like '7543', we encode all this to '3'
Logger.LogMessage("Incoming TCS:Bus message for door trigger: " + str(message)) message[i] = 3
#nothing else to do if (message == configs['light_trigger_message']):
pass if (configs['log_incoming_bus_messages']):
elif (message == configs['tcs_message']): Logger.LogMessage("Incoming TCS:Bus message for triggering light: " + str(message))
if (configs['log_incoming_bus_messages']): #nothing else to do
Logger.LogMessage("Incoming TCS:Bus message from device: " + str(message)) pass
#nothing else to do elif (message == configs['door_trigger_message']):
pass if (configs['log_incoming_bus_messages']):
elif (message == configs['door_ringing_message']): Logger.LogMessage("Incoming TCS:Bus message for door trigger: " + str(message))
if (configs['log_incoming_bus_messages']): #nothing else to do
Logger.LogMessage("Incoming TCS:Bus message for door ringing: " + str(message)) pass
await ExternalAPI("doorbell") elif (message == configs['tcs_message']):
elif (message == configs['frontdoor_ringing_message']): if (configs['log_incoming_bus_messages']):
if (configs['log_incoming_bus_messages']): Logger.LogMessage("Incoming TCS:Bus message from device: " + str(message))
Logger.LogMessage("Incoming TCS:Bus message for frontdoor ringing: " + str(message)) #nothing else to do
if (partyMode): pass
asyncio.sleep(1) elif (message == configs['door_ringing_message']):
Logger.LogMessage("Triggering Door and Light for Party-Mode") if (configs['log_incoming_bus_messages']):
await TCSBusWriter(configs['door_trigger_message']) Logger.LogMessage("Incoming TCS:Bus message for door ringing: " + str(message))
asyncio.sleep(1) await ExternalAPI("doorbell")
await TCSBusWriter(configs['light_trigger_message']) elif (message == configs['frontdoor_ringing_message']):
await ExternalAPI("frontdoorbell") if (configs['log_incoming_bus_messages']):
else: Logger.LogMessage("Incoming TCS:Bus message for frontdoor ringing: " + str(message))
if (configs['log_incoming_bus_messages']): if (partyMode):
Logger.LogMessage("Unknown TCS:Bus message: " + str(message)) asyncio.sleep(1)
message = [] Logger.LogMessage("Triggering Door and Light for Party-Mode")
else: await TCSBusWriter(configs['door_trigger_message'])
if (val == 0 and zustand == False): asyncio.sleep(1)
message.append(dauer) await TCSBusWriter(configs['light_trigger_message'])
zustand = True await ExternalAPI("frontdoorbell")
microsFlanke = time.ticks_us() else:
if (val == 1 and zustand == True): if (configs['log_incoming_bus_messages']):
message.append(dauer) Logger.LogMessage("Unknown TCS:Bus message: " + str(message))
zustand = False message = []
microsFlanke = time.ticks_us() else:
await asyncio.sleep(0) if (val == 0 and zustand == False):
message.append(dauer)
# Main method for the TCS:Bus writer zustand = True
async def TCSBusWriter(message): microsFlanke = time.ticks_us()
global writerActive if (val == 1 and zustand == True):
if (message): message.append(dauer)
busMessage = list(message) zustand = False
writerActive = True microsFlanke = time.ticks_us()
for i in range(len(busMessage)): #decode message await asyncio.sleep(0)
busMessage[i] = int((busMessage[i] + 1) * 2000)
#start sending message # Main method for the TCS:Bus writer
sendZero = True async def TCSBusWriter(message):
triggerline.on() global writerActive
for i in range(len(busMessage)): if (message):
time.sleep_us(busMessage[i]) busMessage = list(message)
sendZero = not sendZero writerActive = True
if sendZero: for i in range(len(busMessage)): #decode message
triggerline.on() busMessage[i] = int((busMessage[i] + 1) * 2000)
else: #start sending message
triggerline.off() sendZero = True
#finally end sending message triggerline.on()
triggerline.off() for i in range(len(busMessage)):
writerActive = False time.sleep_us(busMessage[i])
sendZero = not sendZero
# Main method for external API (=FHEM) if sendZero:
async def ExternalAPI(action): triggerline.on()
Logger.LogMessage("External API action: " + action) else:
if (action == "doorbell"): triggerline.off()
if (configs['fhem_door_ringed'] is not ""): #finally end sending message
response = requests.get(configs['fhem_door_ringed']) triggerline.off()
response.close() writerActive = False
else:
Logger.LogMessage("Warning: No fhem_door_ringed setting!") # Main method for external API (=FHEM)
elif (action == "frontdoorbell"): async def ExternalAPI(action):
if (configs['fhem_frontdoor_ringed'] is not ""): Logger.LogMessage("External API action: " + action)
response = requests.get(configs['fhem_frontdoor_ringed']) if (action == "doorbell"):
response.close() if (configs['fhem_door_ringed'] is not ""):
else: response = requests.get(configs['fhem_door_ringed'])
Logger.LogMessage("Warning: No fhem_frontdoor_ringed setting!") response.close()
else: else:
Logger.LogMessage("Error: Unknown ExternalAPI action!") Logger.LogMessage("Warning: No fhem_door_ringed setting!")
pass elif (action == "frontdoorbell"):
if (configs['fhem_frontdoor_ringed'] is not ""):
# Main method for daily housekeeping response = requests.get(configs['fhem_frontdoor_ringed'])
async def Housekeeper(): response.close()
Logger.LogMessage("Housekeeper started") else:
while True: Logger.LogMessage("Warning: No fhem_frontdoor_ringed setting!")
Logger.LogMessage("Housekeeper is performing actions") else:
Logger.Housekeeping() Logger.LogMessage("Error: Unknown ExternalAPI action!")
Logger.LogMessage("Housekeeper is performing NTP sync") pass
NTP.SetRTCTimeFromNTP(configs['ntp_host'], configs['gmt_offset'], configs['auto_summertime'])
Logger.LogMessage("Housekeeper has finished its jobs") # Main method for daily housekeeping
await asyncio.sleep(86400) async def Housekeeper():
Logger.LogMessage("Housekeeper started")
# Main entry point after booting while True:
async def Main(): Logger.LogMessage("Housekeeper is performing actions")
set_global_exception() Logger.Housekeeping()
Logger.LogMessage("Entering MainLoop") Logger.LogMessage("Housekeeper is performing NTP sync")
boottime = time.time() NTP.SetRTCTimeFromNTP(configs['ntp_host'], configs['gmt_offset'], configs['auto_summertime'])
loop = asyncio.get_event_loop() Logger.LogMessage("Housekeeper has finished its jobs")
ShowText("Booting [3/3]", "API Setup: key", secrets['api']) await asyncio.sleep(86400)
Logger.LogMessage("Setting up API on port " + str(configs['api_port']) + " with key " + secrets['api'])
loop.create_task(asyncio.start_server(APIHandling, Networking.GetIPAddress(), configs['api_port'])) # Main entry point after booting
Logger.LogMessage("API started") async def Main():
ShowText("TCS<->FHEM", "Firmware:", version) set_global_exception()
Logger.LogMessage("Booting complete with Firmware " + version) Logger.LogMessage("Entering MainLoop")
loop.create_task(UiHandling()) boottime = time.time()
loop.create_task(TCSBusReader()) loop = asyncio.get_event_loop()
loop.create_task(Housekeeper()) ShowText("Booting [3/3]", "API Setup: key", secrets['api'])
loop.run_forever() Logger.LogMessage("Setting up API on port " + str(configs['api_port']) + " with key " + secrets['api'])
loop.create_task(asyncio.start_server(APIHandling, Networking.GetIPAddress(), configs['api_port']))
# Booting the device Logger.LogMessage("API started")
def Boot(): ShowText("TCS<->FHEM", "Firmware:", version)
ShowText("Booting [1/3]", "Conn. Wifi:", secrets['ssid'] + "...") Logger.LogMessage("Booting complete with Firmware " + version)
ShowText("Booting [1/3]", "Conn. Wifi: MAC", Networking.GetMACAddress()) loop.create_task(UiHandling())
Networking.Connect(configs['disable_wifi_powersavingmode']) loop.create_task(TCSBusReader())
if (Networking.Status()): loop.create_task(Housekeeper())
ShowText("Booting [1/3]", "Conn. Wifi: IP", Networking.GetIPAddress()) loop.run_forever()
ShowText("Booting [2/3]", "NTP time:", configs['ntp_host'])
if (NTP.SetRTCTimeFromNTP(configs['ntp_host'], configs['gmt_offset'], configs['auto_summertime'])): # Booting the device
Logger.DisableTempLogfile() def Boot():
ShowText("Booting [2/3]", "NTP time:", TimeUtils.DateTimeNow()) ShowText("Booting [1/3]", "Conn. Wifi:", secrets['ssid'] + "...")
else: ShowText("Booting [1/3]", "Conn. Wifi: MAC", Networking.GetMACAddress())
ShowText("Booting [1/3]", "Conn. Wifi:", "ERROR!") Networking.Connect(configs['disable_wifi_powersavingmode'])
time.sleep(3) if (Networking.Status()):
Reboot() ShowText("Booting [1/3]", "Conn. Wifi: IP", Networking.GetIPAddress())
ShowText("Booting [2/3]", "NTP time:", configs['ntp_host'])
##################################################################### if (NTP.SetRTCTimeFromNTP(configs['ntp_host'], configs['gmt_offset'], configs['auto_summertime'])):
Logger.DisableTempLogfile()
Boot() ShowText("Booting [2/3]", "NTP time:", TimeUtils.DateTimeNow())
try: else:
asyncio.run(Main()) ShowText("Booting [1/3]", "Conn. Wifi:", "ERROR!")
except KeyboardInterrupt: time.sleep(3)
Logger.LogMessage("Shutdown.") Reboot()
Oled.fill(0x0000)
Oled.show() #####################################################################
finally:
Boot()
try:
asyncio.run(Main())
except KeyboardInterrupt:
Logger.LogMessage("Shutdown.")
Oled.fill(0x0000)
Oled.show()
finally:
asyncio.new_event_loop() asyncio.new_event_loop()