diff --git a/Logger.py b/Logger.py
index 979c3e2..b9420cf 100644
--- a/Logger.py
+++ b/Logger.py
@@ -38,16 +38,26 @@ class Logger():
fsize = os.stat(logfilename)[6]
iter = 0
with open(logfilename, "r") as logfile:
- fetched_lines = []
+ line_count = 0
+ for line in logfile:
+ line_count += 1
+ if (lines > line_count):
+ lines = line_count
+ #todo if lines from todays logfile are less than the number of requested lines, read missing lines from yesterdays logfile - only if not tempLogFile!
if bufsize > fsize:
bufsize = fsize - 1
+ fetched_lines = []
while True:
iter += 1
- logfile.seek(fsize - bufsize * iter)
- fetched_lines.extend(logfile.readlines())
+ try:
+ logfile.seek(fsize - bufsize * iter)
+ fetched_lines.extend(logfile.readlines())
+ except:
+ return "eof"
+ break
if len(fetched_lines) >= lines or logfile.tell() == 0:
- #todo if lines from todays logfile are less than the number of requested lines, read missing lines from yesterdays logfile - only if not tempLogFile!
return "
".join(fetched_lines[-lines:])
+ break
def MergeTempLogfile(self):
if not self.tempLogFile:
diff --git a/main.py b/main.py
index 7965431..91ace34 100644
--- a/main.py
+++ b/main.py
@@ -1,484 +1,492 @@
-import rp2
-import TimeUtils
-import Logger
-import Oled
-import Networking
-import NTP
-import uasyncio as asyncio
-import utime as time
-import Keypad
-import usocket as socket
-import urequests as requests
-from machine import ADC, Pin
-
-from secrets import secrets
-from configs import configs
-
-rp2.country(configs['country'])
-version = "0.9-beta"
-
-Oled = Oled.Oled()
-TimeUtils = TimeUtils.TimeUtils()
-Logger = Logger.Logger(configs['log_housekeeping_days'])
-Networking = Networking.Networking(Logger, secrets['ssid'], secrets['pw'])
-NTP = NTP.NTP(Logger)
-Keypad = Keypad.Keypad()
-
-boottime = time.time()
-lastActionTicks = 0
-subscreen = 0
-sc = 0
-partyMode = False
-displayOff = False
-busline = ADC(28)
-triggerline = Pin(15, Pin.OUT)
-writerActive = False
-
-# Checks if string is integer
-def IsInt(possibleint):
- try:
- int(possibleint)
- except:
- return False
- else:
- return True
-
-# Reboots the Pico W (f.e. in case of an error)
-def Reboot():
- Logger.LogMessage("Performing Reboot")
- machine.reset()
-
-# Calculates the uptime in hours and minutes
-def Uptime():
- global boottime
- seconds = (time.time() - boottime) % (24 * 3600)
- hours = seconds // 3600
- seconds %= 3600
- minutes = seconds // 60
- seconds %= 60
- return "%d:%02d" % (hours, minutes)
-
-# Shows text on the display
-def ShowText(line1, line2, line3):
- Oled.fill(0x0000)
- Oled.text(line1,1,2,Oled.white)
- Oled.text(line2,1,12,Oled.white)
- Oled.text(line3,1,22,Oled.white)
- Oled.show()
-
-# Main method for display output
-heartbeat = "H"
-def BuildScreen():
- global subscreen, partyMode, lastActionTicks, displayOff, heartbeat
- if heartbeat == "H":
- heartbeat = " "
- else:
- heartbeat = "H"
- lastActionTicks += 1
- # after 10 Seconds of no activity, go back to main screen
- if (lastActionTicks >= 20):
- subscreen = 0
- # after X Minutes turn off the display
- if (lastActionTicks >= (configs['displayoff'] * 60 * 2)):
- displayOff = True
- ShowText("","","")
- else:
- if (subscreen == 0):
- ShowText("TCS<->FHEM " + PartyModeActive() + " " + Networking.IsWifiConnected() + " " + heartbeat + "", TimeUtils.DateTimeNow(), "Auf LiG PaM Chk")
- elif (subscreen == 1):
- ShowText("Eingangstuer", "getriggert", " Ext")
- elif (subscreen == 2):
- ShowText("Licht am Gang", "getriggert", " Ext")
- elif (subscreen == 3):
- ShowText("Party-Mode", ("aktiviert" if partyMode else "deaktiviert"), " Ext")
- elif (subscreen == 4):
- ShowText("Hostname:", configs['hostname'], "Up Dwn Ext")
- elif (subscreen == 5):
- ShowText("MAC Address:", Networking.GetMACAddress(), "Up Dwn Ext")
- elif (subscreen == 6):
- ShowText("IP Address:", Networking.GetIPAddress(), "Up Dwn Ext")
- elif (subscreen == 7):
- ShowText("API key:", secrets['api'], "Up Dwn Ext")
- elif (subscreen == 8):
- ShowText("CPU Frequency:", str(machine.freq()/1000000) + " MHz", "Up Dwn Ext")
- elif (subscreen == 9):
- ShowText("Firmware vers.:", version, "Up Dwn Ext")
- elif (subscreen == 10):
- ShowText("Uptime (H:m):", Uptime(), "Up Dwn Ext")
- elif (subscreen == 11):
- ShowText("Perform", "reboot now", "Up Dwn OK Ext")
- else:
- ShowText("Error","Invalid Screen", " Ext")
-
-# Helper-method for the systemcheck-output on the display
-def ShowSystemCheck(screen):
- global subscreen, sc
- if (screen == "start"):
- Logger.LogMessage("Showing Systemcheck")
- sc = 0
- elif (screen == "next"):
- sc = sc + 1
- else:
- sc = sc - 1
- if (sc > 7):
- sc = 0
- elif (sc < 0):
- sc = 7
- subscreen = sc + 4
-
-# Activates/deactivates the party-mode
-def TogglePartyMode():
- global subscreen, partyMode
- if (partyMode):
- partyMode = False
- ExternalAPI("partymode" + PartyModeState())
- Logger.LogMessage("Party-Mode off")
- else:
- partyMode = True
- ExternalAPI("partymode" + PartyModeState())
- Logger.LogMessage("Party-Mode on")
- subscreen = 3
-
-# Sets the party-mode active/inactive
-def SetPartyMode(newValue):
- global partyMode
- partyMode = newValue
- ExternalAPI("partymode" + PartyModeState())
- Logger.LogMessage("Setting Party-Mode via API to " + PartyModeState())
-
-# Returns status of party-mode
-def PartyModeState():
- global partyMode
- if (partyMode):
- return "enabled"
- else:
- return "disabled"
-
-# Returns status symbol if party-mode is active
-def PartyModeActive():
- global PartyMode
- if (partyMode):
- return "P"
- else:
- return " "
-
-#####################################################################
-
-# Helper-method to allow error handling and output in asyncio
-def set_global_exception():
- def handle_exception(loop, context):
- Logger.LogMessage("Fatal error: " + str(context["exception"]))
- import sys
- sys.print_exception(context["exception"])
- sys.exit()
- Reboot()
- loop = asyncio.get_event_loop()
- loop.set_exception_handler(handle_exception)
-
-# Main method for all the UI-handling
-async def UiHandling():
- global interrupt_flag, lastActionTicks, displayOff, subscreen
- Logger.LogMessage("UI handling started")
- while True:
- BuildScreen()
- if Keypad.interrupt_flag is 1:
- Keypad.interrupt_flag = 0
- lastActionTicks = 0
- if (displayOff):
- displayOff = False
- else:
- Keypad.interrupt_flag = 0
- if (subscreen == 0):
- if (Keypad.tastePressed == Keypad.taste1):
- ShowSystemCheck("start")
- elif (Keypad.tastePressed == Keypad.taste2):
- TogglePartyMode()
- elif (Keypad.tastePressed == Keypad.taste3):
- subscreen = 2
- Logger.LogMessage("Triggering Licht")
- await TCSBusWriter(configs['light_trigger_message'])
- elif (Keypad.tastePressed == Keypad.taste4):
- subscreen = 1
- Logger.LogMessage("Triggering Door")
- await TCSBusWriter(configs['door_trigger_message'])
- else:
- Logger.LogMessage("Error: Invalid Button pressed!")
- elif (subscreen == 1 or subscreen == 2 or subscreen == 3):
- if (Keypad.tastePressed == Keypad.taste1):
- subscreen = 0
- elif (subscreen == 4 or subscreen == 5 or subscreen == 6 or subscreen == 7 or subscreen == 8 or subscreen == 9 or subscreen == 10 or subscreen == 11):
- if (Keypad.tastePressed == Keypad.taste1):
- subscreen = 0
- elif (subscreen == 11 and Keypad.tastePressed == Keypad.taste2):
- Reboot()
- elif (Keypad.tastePressed == Keypad.taste3):
- ShowSystemCheck("next")
- elif (Keypad.tastePressed == Keypad.taste4):
- ShowSystemCheck("prev")
- else:
- Logger.LogMessage("Error: Invalid Button pressed!")
- else:
- if (Keypad.tastePressed == Keypad.taste1):
- subscreen = 0
- await asyncio.sleep(0.5)
-
-# Main method for the API
-html = """
-
-
TCS<->FHEM
- TCS<->FHEM
- %s
-
-"""
-json = """{ "TCS<->FHEM API":"%s" }"""
-async def APIHandling(reader, writer):
- request_line = await reader.readline()
- while await reader.readline() != b"\r\n":
- pass
- request = str(request_line)
- try:
- request = request.split()[1]
- except IndexError:
- pass
- client_ip = writer.get_extra_info('peername')[0]
- Logger.LogMessage("API request: " + request + " - from client IP: " + client_ip)
- if (configs['api_client_ip'] != "") and (configs['api_client_ip'] != client_ip):
- Logger.LogMessage("Unauthorized client! Aborting API Handling now.")
- stateis = "Error 401: Client '" + client_ip + "' is not authorized to use the API!
Set authorized client IP in configs.py!"
- response = html % stateis
- writer.write('HTTP/1.0 401 Unauthorized\r\nContent-type: text/html\r\n\r\n')
- else:
- req = request.split('/')
- stateis = ""
- if (len(req) == 3 or len(req) == 4 or len(req) == 5):
- if (req[1] == secrets['api']):
- if (req[2] == "triggerdoor"):
- Logger.LogMessage("Triggering Door")
- await TCSBusWriter(configs['door_trigger_message'])
- stateis = "Triggered front door opener"
- elif (req[2] == "triggerlight"):
- Logger.LogMessage("Triggering Licht")
- await TCSBusWriter(configs['light_trigger_message'])
- stateis = "Triggered light"
- elif (req[2] == "togglepartymode"):
- TogglePartyMode()
- stateis = "Toggled Party-Mode"
- elif (req[2] == "partymodeon"):
- SetPartyMode(True)
- stateis = "Enabled Party-Mode"
- elif (req[2] == "partymodeoff"):
- SetPartyMode(False)
- stateis = "Disabled Party-Mode"
- elif (req[2] == "partymodestate"):
- stateis = "Party-Mode is " + PartyModeState()
- elif (req[2] == "ping"):
- stateis = "OK"
- elif (req[2] == "stats"):
- stateis = "IP address: " + Networking.GetIPAddress() + "
MAC address: " + Networking.GetMACAddress() + "
Hostname: " + configs['hostname'] + "
API Port: " + str(configs['api_port']) + "
Uptime (h:m): " + Uptime() + "
Date/Time: " + TimeUtils.DateTimeNow() + "
Version: " + version + "
GMT Timezone Offset (hours): " + str(configs['gmt_offset']) + "
Auto summertime: " + str(configs['auto_summertime']) + "
Display off time (mins): " + str(configs['displayoff']) + "
Log incoming bus messages: " + str(configs['log_incoming_bus_messages']) + "
Housekeep logfiles after days: " + str(configs['log_housekeeping_days']) + "
Message 'Front door ringing': " + str(configs['frontdoor_ringing_message']) + "
Message 'Door ringing': " + str(configs['door_ringing_message']) + "
Message 'Door opener triggered': " + str(configs['door_trigger_message']) + "
Message 'Light triggered': " + str(configs['light_trigger_message']) + "
CPU frequency (MHz): " + str(machine.freq()/1000000)
- elif (req[2] == "reboot"):
- stateis = "Rebooting device now..."
- Reboot()
- elif (req[2] == "ringdoor"):
- stateis = "Ringing doorbell"
- await TCSBusWriter(configs['door_ringing_message'])
- elif (req[2] == "ringfrontdoor"):
- stateis = "Ringing front doorbell"
- await TCSBusWriter(configs['frontdoor_ringing_message'])
- elif (req[2] == "logs"):
- if (len(req) >= 4 and req[3] != "json"):
- if (IsInt(req[3])):
- stateis = Logger.LastLogs(int(req[3]))
- else:
- stateis = "Error: Parameter for log length not an integer!"
- else:
- stateis = Logger.LastLogs(50)
- else:
- stateis = "Error: Unknown command!"
- else:
- stateis = "Error: API key is invalid!"
- if ((len(req) == 4 and req[3] == "json") or (len(req) == 5 and req[4] == "json")):
- response = json % stateis
- writer.write('HTTP/1.0 200 OK\r\nContent-type: text/json\r\n\r\n')
- else:
- response = html % stateis
- writer.write('HTTP/1.0 200 OK\r\nContent-type: text/html\r\n\r\n')
- else:
- stateis = "Error 400: Invalid usage of API!
Usage: http://servername/api_key/command[/json]
Commands:- triggerdoor
- triggerlight
- togglepartymode
- partymodeon
- partymodeoff
- partymodestate
- ping
- stats
- reboot
- ringdoor
- ringfrontdoor
API Key: set 'api' in secrets.py file."
- response = html % stateis
- writer.write('HTTP/1.0 400 Bad Request\r\nContent-type: text/html\r\n\r\n')
- writer.write(response)
- await writer.drain()
- await writer.wait_closed()
-
-microsFlanke = 0
-def microsSeitLetzterFlanke():
- global microsFlanke
- return time.ticks_us() - microsFlanke
-
-# Main method for the TCS:Bus reader
-async def TCSBusReader():
- global busline, microsFlanke, partyMode, writerActive
- zustand = False
- Logger.LogMessage("TCS Busreader started")
- message = []
- while True:
- if not writerActive:
- busValue = busline.read_u16()
- val = 1
- if (busValue >= 30000): #voltage on TCS:Bus 0...65535
- val = 1
- else:
- val = 0
- #measure voltage changes and time in between
- dauer = microsSeitLetzterFlanke()
- if (dauer > 10000) and (message): #handle recieved message, and reset message
- message.pop(0) #remove first timing, because we do not need it
- for i in range(len(message)): #encode bus message (lazy, because the TCS:Bus is not as precise as we are ;) )
- if (message[i] >= 1000 and message[i] <= 2999):
- message[i] = 0
- elif (message[i] >= 3000 and message[i] <= 4999):
- message[i] = 1
- elif (message[i] >= 5000 and message[i] <= 6999):
- message[i] = 2
- elif (message[i] >= 7000): #this may be an invalid message bit, but for not having numbers like '7543', we encode all this to '3'
- message[i] = 3
- if (message == configs['light_trigger_message']):
- if (configs['log_incoming_bus_messages']):
- Logger.LogMessage("Incoming TCS:Bus message for triggering light: " + str(message))
- #nothing else to do
- pass
- elif (message == configs['door_trigger_message']):
- if (configs['log_incoming_bus_messages']):
- Logger.LogMessage("Incoming TCS:Bus message for door trigger: " + str(message))
- #nothing else to do
- pass
- elif (message == configs['tcs_message']):
- if (configs['log_incoming_bus_messages']):
- Logger.LogMessage("Incoming TCS:Bus message from device: " + str(message))
- #nothing else to do
- pass
- elif (message == configs['door_ringing_message']):
- if (configs['log_incoming_bus_messages']):
- Logger.LogMessage("Incoming TCS:Bus message for door ringing: " + str(message))
- await ExternalAPI("doorbell")
- elif (message == configs['frontdoor_ringing_message']):
- if (configs['log_incoming_bus_messages']):
- Logger.LogMessage("Incoming TCS:Bus message for frontdoor ringing: " + str(message))
- if (partyMode):
- asyncio.sleep(1)
- Logger.LogMessage("Triggering Door and Light for Party-Mode")
- await TCSBusWriter(configs['door_trigger_message'])
- asyncio.sleep(1)
- await TCSBusWriter(configs['light_trigger_message'])
- await ExternalAPI("frontdoorbell")
- else:
- if (configs['log_incoming_bus_messages']):
- Logger.LogMessage("Unknown TCS:Bus message: " + str(message))
- message = []
- else:
- if (val == 0 and zustand == False):
- message.append(dauer)
- zustand = True
- microsFlanke = time.ticks_us()
- if (val == 1 and zustand == True):
- message.append(dauer)
- zustand = False
- microsFlanke = time.ticks_us()
- await asyncio.sleep(0)
-
-# Main method for the TCS:Bus writer
-async def TCSBusWriter(message):
- global writerActive
- if (message):
- busMessage = list(message)
- writerActive = True
- for i in range(len(busMessage)): #decode message
- busMessage[i] = int((busMessage[i] + 1) * 2000)
- #start sending message
- sendZero = True
- triggerline.on()
- for i in range(len(busMessage)):
- time.sleep_us(busMessage[i])
- sendZero = not sendZero
- if sendZero:
- triggerline.on()
- else:
- triggerline.off()
- #finally end sending message
- triggerline.off()
- writerActive = False
-
-# Main method for external API (=FHEM)
-async def ExternalAPI(action):
- Logger.LogMessage("External API action: " + action)
- if (action == "doorbell"):
- if (configs['fhem_door_ringed'] is not ""):
- response = requests.get(configs['fhem_door_ringed'])
- response.close()
- else:
- Logger.LogMessage("Warning: No fhem_door_ringed setting!")
- elif (action == "frontdoorbell"):
- if (configs['fhem_frontdoor_ringed'] is not ""):
- response = requests.get(configs['fhem_frontdoor_ringed'])
- response.close()
- else:
- Logger.LogMessage("Warning: No fhem_frontdoor_ringed setting!")
- else:
- Logger.LogMessage("Error: Unknown ExternalAPI action!")
- pass
-
-# Main method for daily housekeeping
-async def Housekeeper():
- Logger.LogMessage("Housekeeper started")
- while True:
- Logger.LogMessage("Housekeeper is performing actions")
- Logger.Housekeeping()
- Logger.LogMessage("Housekeeper is performing NTP sync")
- NTP.SetRTCTimeFromNTP(configs['ntp_host'], configs['gmt_offset'], configs['auto_summertime'])
- Logger.LogMessage("Housekeeper has finished its jobs")
- await asyncio.sleep(86400)
-
-# Main entry point after booting
-async def Main():
- set_global_exception()
- Logger.LogMessage("Entering MainLoop")
- boottime = time.time()
- loop = asyncio.get_event_loop()
- ShowText("Booting [3/3]", "API Setup: key", secrets['api'])
- Logger.LogMessage("Setting up API on port " + str(configs['api_port']) + " with key " + secrets['api'])
- loop.create_task(asyncio.start_server(APIHandling, Networking.GetIPAddress(), configs['api_port']))
- Logger.LogMessage("API started")
- ShowText("TCS<->FHEM", "Firmware:", version)
- Logger.LogMessage("Booting complete with Firmware " + version)
- loop.create_task(UiHandling())
- loop.create_task(TCSBusReader())
- loop.create_task(Housekeeper())
- loop.run_forever()
-
-# Booting the device
-def Boot():
- ShowText("Booting [1/3]", "Conn. Wifi:", secrets['ssid'] + "...")
- ShowText("Booting [1/3]", "Conn. Wifi: MAC", Networking.GetMACAddress())
- Networking.Connect(configs['disable_wifi_powersavingmode'])
- if (Networking.Status()):
- ShowText("Booting [1/3]", "Conn. Wifi: IP", Networking.GetIPAddress())
- ShowText("Booting [2/3]", "NTP time:", configs['ntp_host'])
- if (NTP.SetRTCTimeFromNTP(configs['ntp_host'], configs['gmt_offset'], configs['auto_summertime'])):
- Logger.DisableTempLogfile()
- ShowText("Booting [2/3]", "NTP time:", TimeUtils.DateTimeNow())
- else:
- ShowText("Booting [1/3]", "Conn. Wifi:", "ERROR!")
- time.sleep(3)
- Reboot()
-
-#####################################################################
-
-Boot()
-try:
- asyncio.run(Main())
-except KeyboardInterrupt:
- Logger.LogMessage("Shutdown.")
- Oled.fill(0x0000)
- Oled.show()
-finally:
+import rp2
+import TimeUtils
+import Logger
+import Oled
+import Networking
+import NTP
+import uasyncio as asyncio
+import utime as time
+import Keypad
+import usocket as socket
+import urequests as requests
+from machine import ADC, Pin
+
+from secrets import secrets
+from configs import configs
+
+rp2.country(configs['country'])
+version = "0.10-beta"
+
+Oled = Oled.Oled()
+TimeUtils = TimeUtils.TimeUtils()
+Logger = Logger.Logger(configs['log_housekeeping_days'])
+Networking = Networking.Networking(Logger, secrets['ssid'], secrets['pw'])
+NTP = NTP.NTP(Logger)
+Keypad = Keypad.Keypad()
+
+boottime = time.time()
+lastActionTicks = 0
+subscreen = 0
+sc = 0
+partyMode = False
+displayOff = False
+busline = ADC(28)
+triggerline = Pin(15, Pin.OUT)
+writerActive = False
+
+# Checks if string is integer
+def IsInt(possibleint):
+ try:
+ int(possibleint)
+ except:
+ return False
+ else:
+ return True
+
+# Reboots the Pico W (f.e. in case of an error)
+def Reboot():
+ Logger.LogMessage("Performing Reboot")
+ machine.reset()
+
+# Calculates the uptime in hours and minutes
+def Uptime():
+ global boottime
+ seconds = (time.time() - boottime) % (24 * 3600)
+ hours = seconds // 3600
+ seconds %= 3600
+ minutes = seconds // 60
+ seconds %= 60
+ return "%d:%02d" % (hours, minutes)
+
+# Shows text on the display
+def ShowText(line1, line2, line3):
+ Oled.fill(0x0000)
+ Oled.text(line1,1,2,Oled.white)
+ Oled.text(line2,1,12,Oled.white)
+ Oled.text(line3,1,22,Oled.white)
+ Oled.show()
+
+# Main method for display output
+heartbeat = "H"
+def BuildScreen():
+ global subscreen, partyMode, lastActionTicks, displayOff, heartbeat
+ if heartbeat == "H":
+ heartbeat = " "
+ else:
+ heartbeat = "H"
+ lastActionTicks += 1
+ # after 10 Seconds of no activity, go back to main screen
+ if (lastActionTicks >= 20):
+ subscreen = 0
+ # after X Minutes turn off the display
+ if (lastActionTicks >= (configs['displayoff'] * 60 * 2)):
+ displayOff = True
+ ShowText("","","")
+ else:
+ if (subscreen == 0):
+ ShowText("TCS<->FHEM " + PartyModeActive() + " " + Networking.IsWifiConnected() + " " + heartbeat + "", TimeUtils.DateTimeNow(), "Auf LiG PaM Chk")
+ elif (subscreen == 1):
+ ShowText("Eingangstuer", "getriggert", " Ext")
+ elif (subscreen == 2):
+ ShowText("Licht am Gang", "getriggert", " Ext")
+ elif (subscreen == 3):
+ ShowText("Party-Mode", ("aktiviert" if partyMode else "deaktiviert"), " Ext")
+ elif (subscreen == 4):
+ ShowText("Hostname:", configs['hostname'], "Up Dwn Ext")
+ elif (subscreen == 5):
+ ShowText("MAC Address:", Networking.GetMACAddress(), "Up Dwn Ext")
+ elif (subscreen == 6):
+ ShowText("IP Address:", Networking.GetIPAddress(), "Up Dwn Ext")
+ elif (subscreen == 7):
+ ShowText("API key:", secrets['api'], "Up Dwn Ext")
+ elif (subscreen == 8):
+ ShowText("CPU Frequency:", str(machine.freq()/1000000) + " MHz", "Up Dwn Ext")
+ elif (subscreen == 9):
+ ShowText("Firmware vers.:", version, "Up Dwn Ext")
+ elif (subscreen == 10):
+ ShowText("Uptime (H:m):", Uptime(), "Up Dwn Ext")
+ elif (subscreen == 11):
+ ShowText("Perform", "reboot now", "Up Dwn OK Ext")
+ else:
+ ShowText("Error","Invalid Screen", " Ext")
+
+# Helper-method for the systemcheck-output on the display
+def ShowSystemCheck(screen):
+ global subscreen, sc
+ if (screen == "start"):
+ Logger.LogMessage("Showing Systemcheck")
+ sc = 0
+ elif (screen == "next"):
+ sc = sc + 1
+ else:
+ sc = sc - 1
+ if (sc > 7):
+ sc = 0
+ elif (sc < 0):
+ sc = 7
+ subscreen = sc + 4
+
+# Activates/deactivates the party-mode
+def TogglePartyMode():
+ global subscreen, partyMode
+ if (partyMode):
+ partyMode = False
+ ExternalAPI("partymode" + PartyModeState())
+ Logger.LogMessage("Party-Mode off")
+ else:
+ partyMode = True
+ ExternalAPI("partymode" + PartyModeState())
+ Logger.LogMessage("Party-Mode on")
+ subscreen = 3
+
+# Sets the party-mode active/inactive
+def SetPartyMode(newValue):
+ global partyMode
+ partyMode = newValue
+ ExternalAPI("partymode" + PartyModeState())
+ Logger.LogMessage("Setting Party-Mode via API to " + PartyModeState())
+
+# Returns status of party-mode
+def PartyModeState():
+ global partyMode
+ if (partyMode):
+ return "enabled"
+ else:
+ return "disabled"
+
+# Returns status symbol if party-mode is active
+def PartyModeActive():
+ global PartyMode
+ if (partyMode):
+ return "P"
+ else:
+ return " "
+
+#####################################################################
+
+# Helper-method to allow error handling and output in asyncio
+def set_global_exception():
+ def handle_exception(loop, context):
+ Logger.LogMessage("Fatal error: " + str(context["exception"]))
+ import sys
+ sys.print_exception(context["exception"])
+ sys.exit()
+ Reboot()
+ loop = asyncio.get_event_loop()
+ loop.set_exception_handler(handle_exception)
+
+# Main method for all the UI-handling
+async def UiHandling():
+ global interrupt_flag, lastActionTicks, displayOff, subscreen
+ Logger.LogMessage("UI handling started")
+ while True:
+ BuildScreen()
+ if Keypad.interrupt_flag is 1:
+ Keypad.interrupt_flag = 0
+ lastActionTicks = 0
+ if (displayOff):
+ displayOff = False
+ else:
+ Keypad.interrupt_flag = 0
+ if (subscreen == 0):
+ if (Keypad.tastePressed == Keypad.taste1):
+ ShowSystemCheck("start")
+ elif (Keypad.tastePressed == Keypad.taste2):
+ TogglePartyMode()
+ elif (Keypad.tastePressed == Keypad.taste3):
+ subscreen = 2
+ Logger.LogMessage("Triggering Licht")
+ await TCSBusWriter(configs['light_trigger_message'])
+ elif (Keypad.tastePressed == Keypad.taste4):
+ subscreen = 1
+ Logger.LogMessage("Triggering Door")
+ await TCSBusWriter(configs['door_trigger_message'])
+ else:
+ Logger.LogMessage("Error: Invalid Button pressed!")
+ elif (subscreen == 1 or subscreen == 2 or subscreen == 3):
+ if (Keypad.tastePressed == Keypad.taste1):
+ subscreen = 0
+ elif (subscreen == 4 or subscreen == 5 or subscreen == 6 or subscreen == 7 or subscreen == 8 or subscreen == 9 or subscreen == 10 or subscreen == 11):
+ if (Keypad.tastePressed == Keypad.taste1):
+ subscreen = 0
+ elif (subscreen == 11 and Keypad.tastePressed == Keypad.taste2):
+ Reboot()
+ elif (Keypad.tastePressed == Keypad.taste3):
+ ShowSystemCheck("next")
+ elif (Keypad.tastePressed == Keypad.taste4):
+ ShowSystemCheck("prev")
+ else:
+ Logger.LogMessage("Error: Invalid Button pressed!")
+ else:
+ if (Keypad.tastePressed == Keypad.taste1):
+ subscreen = 0
+ await asyncio.sleep(0.5)
+
+# Main method for the API
+html = """
+
+ TCS<->FHEM
+ TCS<->FHEM
+ %s
+
+"""
+json = """{ "TCS<->FHEM API":"%s" }"""
+async def APIHandling(reader, writer):
+ request_line = await reader.readline()
+ while await reader.readline() != b"\r\n":
+ pass
+ request = str(request_line)
+ try:
+ request = request.split()[1]
+ except IndexError:
+ pass
+ client_ip = writer.get_extra_info('peername')[0]
+ Logger.LogMessage("API request: " + request + " - from client IP: " + client_ip)
+ if (configs['api_client_ip'] != "") and (configs['api_client_ip'] != client_ip):
+ Logger.LogMessage("Unauthorized client! Aborting API Handling now.")
+ stateis = "Error 401: Client '" + client_ip + "' is not authorized to use the API!
Set authorized client IP in configs.py!"
+ response = html % stateis
+ writer.write('HTTP/1.0 401 Unauthorized\r\nContent-type: text/html\r\n\r\n')
+ else:
+ req = request.split('/')
+ stateis = ""
+ if (len(req) == 3 or len(req) == 4 or len(req) == 5):
+ if (req[1] == secrets['api']):
+ if (req[2] == "triggerdoor"):
+ Logger.LogMessage("Triggering Door")
+ await TCSBusWriter(configs['door_trigger_message'])
+ stateis = "Triggered front door opener"
+ elif (req[2] == "triggerlight"):
+ Logger.LogMessage("Triggering Licht")
+ await TCSBusWriter(configs['light_trigger_message'])
+ stateis = "Triggered light"
+ elif (req[2] == "togglepartymode"):
+ TogglePartyMode()
+ stateis = "Toggled Party-Mode"
+ elif (req[2] == "partymodeon"):
+ SetPartyMode(True)
+ stateis = "Enabled Party-Mode"
+ elif (req[2] == "partymodeoff"):
+ SetPartyMode(False)
+ stateis = "Disabled Party-Mode"
+ elif (req[2] == "partymodestate"):
+ stateis = "Party-Mode is " + PartyModeState()
+ elif (req[2] == "ping"):
+ stateis = "OK"
+ elif (req[2] == "stats"):
+ stateis = "IP address: " + Networking.GetIPAddress() + "
MAC address: " + Networking.GetMACAddress() + "
Hostname: " + configs['hostname'] + "
API Port: " + str(configs['api_port']) + "
Uptime (h:m): " + Uptime() + "
Date/Time: " + TimeUtils.DateTimeNow() + "
Version: " + version + "
GMT Timezone Offset (hours): " + str(configs['gmt_offset']) + "
Auto summertime: " + str(configs['auto_summertime']) + "
Display off time (mins): " + str(configs['displayoff']) + "
Log incoming bus messages: " + str(configs['log_incoming_bus_messages']) + "
Housekeep logfiles after days: " + str(configs['log_housekeeping_days']) + "
Message 'Front door ringing': " + str(configs['frontdoor_ringing_message']) + "
Message 'Door ringing': " + str(configs['door_ringing_message']) + "
Message 'Door opener triggered': " + str(configs['door_trigger_message']) + "
Message 'Light triggered': " + str(configs['light_trigger_message']) + "
CPU frequency (MHz): " + str(machine.freq()/1000000) + "
Party-Mode is: " + PartyModeState()
+ elif (req[2] == "reboot"):
+ stateis = "Rebooting device now..."
+ Reboot()
+ elif (req[2] == "ringdoor"):
+ stateis = "Ringing doorbell"
+ await TCSBusWriter(configs['door_ringing_message'])
+ elif (req[2] == "ringfrontdoor"):
+ stateis = "Ringing front doorbell"
+ await TCSBusWriter(configs['frontdoor_ringing_message'])
+ elif (req[2] == "logs"):
+ if (len(req) >= 4 and req[3] != "json"):
+ if (IsInt(req[3])):
+ stateis = Logger.LastLogs(int(req[3]))
+ else:
+ stateis = "Error: Parameter for log length not an integer!"
+ else:
+ stateis = Logger.LastLogs(12)
+ else:
+ stateis = "Error: Unknown command!"
+ else:
+ stateis = "Error: API key is invalid!"
+ if ((len(req) == 4 and req[3] == "json") or (len(req) == 5 and req[4] == "json")):
+ if (req[2] != "logs"):
+ stateis = stateis.replace(": ", "\":\"")
+ stateis = stateis.replace("
", "\", \"")
+ stateis = stateis.replace("°", "°")
+ else:
+ stateis = stateis.replace(";", "\":\"")
+ stateis = stateis.replace("
", "\", \"")
+ stateis = stateis.replace("\n", "").replace("\r", "")
+ response = json % stateis
+ writer.write('HTTP/1.0 200 OK\r\nContent-type: text/json\r\n\r\n')
+ else:
+ response = html % stateis
+ writer.write('HTTP/1.0 200 OK\r\nContent-type: text/html\r\n\r\n')
+ else:
+ stateis = "Error 400: Invalid usage of API!
Usage: http://servername/api_key/command[/json]
Commands:- triggerdoor
- triggerlight
- togglepartymode
- partymodeon
- partymodeoff
- partymodestate
- ping
- stats
- reboot
- ringdoor
- ringfrontdoor
API Key: set 'api' in secrets.py file."
+ response = html % stateis
+ writer.write('HTTP/1.0 400 Bad Request\r\nContent-type: text/html\r\n\r\n')
+ writer.write(response)
+ await writer.drain()
+ await writer.wait_closed()
+
+microsFlanke = 0
+def microsSeitLetzterFlanke():
+ global microsFlanke
+ return time.ticks_us() - microsFlanke
+
+# Main method for the TCS:Bus reader
+async def TCSBusReader():
+ global busline, microsFlanke, partyMode, writerActive
+ zustand = False
+ Logger.LogMessage("TCS Busreader started")
+ message = []
+ while True:
+ if not writerActive:
+ busValue = busline.read_u16()
+ val = 1
+ if (busValue >= 30000): #voltage on TCS:Bus 0...65535
+ val = 1
+ else:
+ val = 0
+ #measure voltage changes and time in between
+ dauer = microsSeitLetzterFlanke()
+ if (dauer > 10000) and (message): #handle recieved message, and reset message
+ message.pop(0) #remove first timing, because we do not need it
+ for i in range(len(message)): #encode bus message (lazy, because the TCS:Bus is not as precise as we are ;) )
+ if (message[i] >= 1000 and message[i] <= 2999):
+ message[i] = 0
+ elif (message[i] >= 3000 and message[i] <= 4999):
+ message[i] = 1
+ elif (message[i] >= 5000 and message[i] <= 6999):
+ message[i] = 2
+ elif (message[i] >= 7000): #this may be an invalid message bit, but for not having numbers like '7543', we encode all this to '3'
+ message[i] = 3
+ if (message == configs['light_trigger_message']):
+ if (configs['log_incoming_bus_messages']):
+ Logger.LogMessage("Incoming TCS:Bus message for triggering light: " + str(message))
+ #nothing else to do
+ pass
+ elif (message == configs['door_trigger_message']):
+ if (configs['log_incoming_bus_messages']):
+ Logger.LogMessage("Incoming TCS:Bus message for door trigger: " + str(message))
+ #nothing else to do
+ pass
+ elif (message == configs['tcs_message']):
+ if (configs['log_incoming_bus_messages']):
+ Logger.LogMessage("Incoming TCS:Bus message from device: " + str(message))
+ #nothing else to do
+ pass
+ elif (message == configs['door_ringing_message']):
+ if (configs['log_incoming_bus_messages']):
+ Logger.LogMessage("Incoming TCS:Bus message for door ringing: " + str(message))
+ await ExternalAPI("doorbell")
+ elif (message == configs['frontdoor_ringing_message']):
+ if (configs['log_incoming_bus_messages']):
+ Logger.LogMessage("Incoming TCS:Bus message for frontdoor ringing: " + str(message))
+ if (partyMode):
+ asyncio.sleep(1)
+ Logger.LogMessage("Triggering Door and Light for Party-Mode")
+ await TCSBusWriter(configs['door_trigger_message'])
+ asyncio.sleep(1)
+ await TCSBusWriter(configs['light_trigger_message'])
+ await ExternalAPI("frontdoorbell")
+ else:
+ if (configs['log_incoming_bus_messages']):
+ Logger.LogMessage("Unknown TCS:Bus message: " + str(message))
+ message = []
+ else:
+ if (val == 0 and zustand == False):
+ message.append(dauer)
+ zustand = True
+ microsFlanke = time.ticks_us()
+ if (val == 1 and zustand == True):
+ message.append(dauer)
+ zustand = False
+ microsFlanke = time.ticks_us()
+ await asyncio.sleep(0)
+
+# Main method for the TCS:Bus writer
+async def TCSBusWriter(message):
+ global writerActive
+ if (message):
+ busMessage = list(message)
+ writerActive = True
+ for i in range(len(busMessage)): #decode message
+ busMessage[i] = int((busMessage[i] + 1) * 2000)
+ #start sending message
+ sendZero = True
+ triggerline.on()
+ for i in range(len(busMessage)):
+ time.sleep_us(busMessage[i])
+ sendZero = not sendZero
+ if sendZero:
+ triggerline.on()
+ else:
+ triggerline.off()
+ #finally end sending message
+ triggerline.off()
+ writerActive = False
+
+# Main method for external API (=FHEM)
+async def ExternalAPI(action):
+ Logger.LogMessage("External API action: " + action)
+ if (action == "doorbell"):
+ if (configs['fhem_door_ringed'] is not ""):
+ response = requests.get(configs['fhem_door_ringed'])
+ response.close()
+ else:
+ Logger.LogMessage("Warning: No fhem_door_ringed setting!")
+ elif (action == "frontdoorbell"):
+ if (configs['fhem_frontdoor_ringed'] is not ""):
+ response = requests.get(configs['fhem_frontdoor_ringed'])
+ response.close()
+ else:
+ Logger.LogMessage("Warning: No fhem_frontdoor_ringed setting!")
+ else:
+ Logger.LogMessage("Error: Unknown ExternalAPI action!")
+ pass
+
+# Main method for daily housekeeping
+async def Housekeeper():
+ Logger.LogMessage("Housekeeper started")
+ while True:
+ Logger.LogMessage("Housekeeper is performing actions")
+ Logger.Housekeeping()
+ Logger.LogMessage("Housekeeper is performing NTP sync")
+ NTP.SetRTCTimeFromNTP(configs['ntp_host'], configs['gmt_offset'], configs['auto_summertime'])
+ Logger.LogMessage("Housekeeper has finished its jobs")
+ await asyncio.sleep(86400)
+
+# Main entry point after booting
+async def Main():
+ set_global_exception()
+ Logger.LogMessage("Entering MainLoop")
+ boottime = time.time()
+ loop = asyncio.get_event_loop()
+ ShowText("Booting [3/3]", "API Setup: key", secrets['api'])
+ Logger.LogMessage("Setting up API on port " + str(configs['api_port']) + " with key " + secrets['api'])
+ loop.create_task(asyncio.start_server(APIHandling, Networking.GetIPAddress(), configs['api_port']))
+ Logger.LogMessage("API started")
+ ShowText("TCS<->FHEM", "Firmware:", version)
+ Logger.LogMessage("Booting complete with Firmware " + version)
+ loop.create_task(UiHandling())
+ loop.create_task(TCSBusReader())
+ loop.create_task(Housekeeper())
+ loop.run_forever()
+
+# Booting the device
+def Boot():
+ ShowText("Booting [1/3]", "Conn. Wifi:", secrets['ssid'] + "...")
+ ShowText("Booting [1/3]", "Conn. Wifi: MAC", Networking.GetMACAddress())
+ Networking.Connect(configs['disable_wifi_powersavingmode'])
+ if (Networking.Status()):
+ ShowText("Booting [1/3]", "Conn. Wifi: IP", Networking.GetIPAddress())
+ ShowText("Booting [2/3]", "NTP time:", configs['ntp_host'])
+ if (NTP.SetRTCTimeFromNTP(configs['ntp_host'], configs['gmt_offset'], configs['auto_summertime'])):
+ Logger.DisableTempLogfile()
+ ShowText("Booting [2/3]", "NTP time:", TimeUtils.DateTimeNow())
+ else:
+ ShowText("Booting [1/3]", "Conn. Wifi:", "ERROR!")
+ time.sleep(3)
+ Reboot()
+
+#####################################################################
+
+Boot()
+try:
+ asyncio.run(Main())
+except KeyboardInterrupt:
+ Logger.LogMessage("Shutdown.")
+ Oled.fill(0x0000)
+ Oled.show()
+finally:
asyncio.new_event_loop()
\ No newline at end of file