TCS2FHEM/main.py

420 lines
15 KiB
Python

import machine
import rp2
import uasyncio as asyncio
import usocket as socket
import utime as time

import TimeUtils
import Logger
import Oled
import Networking
import NTP
import Keypad
from secrets import secrets
from configs import configs
# Set the country code from the project config (presumably used for the
# Wi-Fi regulatory settings of the rp2 port -- confirm against rp2 docs).
rp2.country(configs['country'])
# Firmware version string shown on the display and in the API "stats" output.
version = "0.4-alpha"
# NOTE: each assignment below rebinds the imported module name to an instance
# of the class with the same name; the module objects are no longer reachable
# under these names afterwards.
Oled = Oled.Oled()
TimeUtils = TimeUtils.TimeUtils()
Logger = Logger.Logger(configs['log_housekeeping_days'])
Networking = Networking.Networking(Logger, secrets['ssid'], secrets['pw'])
NTP = NTP.NTP(Logger)
Keypad = Keypad.Keypad()
# Timestamp taken while this module is loading; Uptime() measures against it.
boottime = time.time()
# UI state shared between BuildScreen(), UiHandling() and the trigger helpers.
lastActionTicks = 0  # number of BuildScreen() calls since the last key press
subscreen = 0  # id of the screen currently shown (0 = main screen)
sc = 0  # current system-check page, maintained by ShowSystemCheck()
partyMode = False
displayOff = False
# ADC object created with id 28; polled by TCSBusReader().
busline = machine.ADC(28)
def Reboot():
    """Reboot the Pico (e.g. after an error).

    Writes a log entry first, then calls machine.reset().
    """
    Logger.LogMessage("Performing Reboot")
    machine.reset()
def Uptime():
    """Return the time elapsed since `boottime`, formatted as "H:MM".

    The elapsed seconds are reduced modulo 24 hours before formatting, so the
    reported value wraps around once a full day has passed.
    """
    elapsed = (time.time() - boottime) % (24 * 3600)
    hours, remainder = divmod(elapsed, 3600)
    minutes = remainder // 60
    return "%d:%02d" % (hours, minutes)
def ShowText(line1, line2, line3):
    """Show three lines of text on the display.

    The display is cleared, the lines are drawn at x=1 and y=2, 12 and 22
    using Oled.white, and the result is pushed out with Oled.show().
    """
    Oled.fill(0x0000)
    for row, content in enumerate((line1, line2, line3)):
        Oled.text(content, 1, 2 + 10 * row, Oled.white)
    Oled.show()
def BuildScreen():
    """Advance the idle counter and redraw the display for the current state.

    Called once per UI tick (UiHandling() sleeps 0.5 s between calls, i.e.
    two ticks per second). Side effects on module state:

    * `lastActionTicks` is incremented on every call.
    * After 20 idle ticks `subscreen` is reset to the main screen (0).
    * After configs['displayoff'] minutes of idle ticks `displayOff` is set
      and the display is blanked instead of drawing a screen.
    """
    global subscreen, lastActionTicks, displayOff
    lastActionTicks += 1
    # After 10 seconds (20 ticks) without activity, go back to the main screen.
    if lastActionTicks >= 20:
        subscreen = 0
    # After the configured number of minutes, turn the display off.
    if lastActionTicks >= configs['displayoff'] * 60 * 2:
        displayOff = True
        ShowText("", "", "")
        return
    if subscreen == 0:
        ShowText("TCS<->FHEM", TimeUtils.DateTimeNow(), "Auf LiG PaM Chk")
    elif subscreen == 1:
        ShowText("Eingangstuer", "getriggert", " Ext")
    elif subscreen == 2:
        ShowText("Licht im Gang", "getriggert", " Ext")
    elif subscreen == 3:
        ShowText("Party-Mode", "aktiviert" if partyMode else "deaktiviert", " Ext")
    elif subscreen == 4:
        ShowText("Hostname:", configs['hostname'], "Up Dwn Ext")
    elif subscreen == 5:
        ShowText("MAC Address:", Networking.GetMACAddress(), "Up Dwn Ext")
    elif subscreen == 6:
        ShowText("IP Address:", Networking.GetIPAddress(), "Up Dwn Ext")
    elif subscreen == 7:
        ShowText("API key:", secrets['api'], "Up Dwn Ext")
    elif subscreen == 8:
        # machine.freq() is divided by 1,000,000 here, so the value shown is
        # in MHz (the previous " Hz" suffix mislabelled it).
        ShowText("CPU Frequency:", str(machine.freq() / 1000000) + " MHz", "Up Dwn Ext")
    elif subscreen == 9:
        ShowText("Firmware vers.:", version, "Up Dwn Ext")
    elif subscreen == 10:
        ShowText("Uptime (H:m):", Uptime(), "Up Dwn Ext")
    elif subscreen == 11:
        ShowText("Perform", "reboot now", "Up Dwn OK Ext")
    else:
        ShowText("Error", "Invalid Screen", " Ext")
def ShowSystemCheck(screen):
    """Select which system-check page is displayed.

    `screen` is "start" (jump to the first page and log it), "next" (one
    page forward) or anything else (one page back). The page index `sc`
    wraps within 0..7 and is mapped onto `subscreen` 4..11.
    """
    global subscreen, sc
    if screen == "start":
        Logger.LogMessage("Showing Systemcheck")
        page = 0
    else:
        page = sc + 1 if screen == "next" else sc - 1
    # Wrap around at both ends of the page list.
    if page > 7:
        page = 0
    elif page < 0:
        page = 7
    sc = page
    subscreen = page + 4
def TogglePartyMode():
    """Flip the party mode, log the new state and show the party-mode screen."""
    global subscreen, partyMode
    partyMode = not partyMode
    Logger.LogMessage("Party-Mode on" if partyMode else "Party-Mode off")
    subscreen = 3
def SetPartyMode(newValue):
    """Set the party mode to `newValue` and log the resulting state."""
    global partyMode
    partyMode = newValue
    state = PartyModeState()
    Logger.LogMessage("Setting Party-Mode via API to " + state)
def PartyModeState():
    """Return "enabled" when the party mode is on, otherwise "disabled"."""
    return "enabled" if partyMode else "disabled"
def TriggerLicht():
    """Trigger the hallway light.

    Currently this only switches the display to the "light triggered" screen
    (subscreen 2) and writes a log entry.
    """
    # TODO: the actual trigger is not implemented yet
    global subscreen
    subscreen = 2
    Logger.LogMessage("Triggering Licht")
def TriggerDoor():
    """Trigger the door opener.

    Currently this only switches the display to the "door triggered" screen
    (subscreen 1) and writes a log entry.
    """
    # TODO: the actual trigger is not implemented yet
    global subscreen
    subscreen = 1
    Logger.LogMessage("Triggering Door")
#####################################################################
def set_global_exception():
    """Install an event-loop exception handler so errors inside asyncio
    tasks get written to the log."""

    def _log_fatal(loop, context):
        Logger.LogMessage("Fatal error: " + str(context["exception"]))
        # TODO: enable the following lines for the production version
        #import sys
        #sys.print_exception(context["exception"])
        #sys.exit()
        #Reboot()

    asyncio.get_event_loop().set_exception_handler(_log_fatal)
def _HandleKeyPress(key):
    """Apply one key press to the UI, depending on the screen being shown."""
    global subscreen
    if subscreen == 0:
        # Main screen: each key starts one of the four actions.
        if key == Keypad.taste1:
            ShowSystemCheck("start")
        elif key == Keypad.taste2:
            TogglePartyMode()
        elif key == Keypad.taste3:
            TriggerLicht()
        elif key == Keypad.taste4:
            TriggerDoor()
        else:
            Logger.LogMessage("Error: Invalid Button pressed!")
    elif 4 <= subscreen <= 11:
        # System-check pages: exit, page forward/back, and reboot on page 11.
        if key == Keypad.taste1:
            subscreen = 0
        elif subscreen == 11 and key == Keypad.taste2:
            Reboot()
        elif key == Keypad.taste3:
            ShowSystemCheck("next")
        elif key == Keypad.taste4:
            ShowSystemCheck("prev")
        else:
            Logger.LogMessage("Error: Invalid Button pressed!")
    elif key == Keypad.taste1:
        # Confirmation screens 1-3 and any unknown screen only offer "exit".
        subscreen = 0


async def UiHandling():
    """Main coroutine for all UI handling.

    Runs forever: redraws the screen via BuildScreen(), processes a pending
    key press signalled through Keypad.interrupt_flag, then sleeps 0.5 s.
    A key press always resets the idle counter; if the display was off, the
    key press only wakes the display and is not acted upon.
    """
    global lastActionTicks, displayOff
    Logger.LogMessage("UI handling started")
    while True:
        BuildScreen()
        # Compare by value: `is 1` tested object identity, which only works
        # by accident of small-integer caching.
        if Keypad.interrupt_flag == 1:
            Keypad.interrupt_flag = 0
            lastActionTicks = 0
            if displayOff:
                displayOff = False
            else:
                _HandleKeyPress(Keypad.tastePressed)
        await asyncio.sleep(0.5)
# API handling: response templates used by APIHandling().
# HTML page template; the single %s placeholder receives the status message.
html = """<!DOCTYPE html>
<html>
<head> <title>TCS<->FHEM</title> </head>
<body> <h1>TCS<->FHEM</h1>
<p>%s</p>
</body>
</html>"""
# JSON template; the %s placeholder receives the status message as-is
# (no escaping is applied at this point).
json = """{ "TCS<->FHEM API":"%s" }"""
def _BuildStatsReport():
    """Return the "<br>"-separated status report for the "stats" command."""
    entries = (
        ("IP address", Networking.GetIPAddress()),
        ("MAC address", Networking.GetMACAddress()),
        ("Hostname", configs['hostname']),
        ("API Port", configs['api_port']),
        ("Uptime (h:m)", Uptime()),
        ("Date/Time", TimeUtils.DateTimeNow()),
        ("Version", version),
        ("GMT Timezone Offset (hours)", configs['gmt_offset']),
        ("Auto summertime", configs['auto_summertime']),
        ("Display off time (mins)", configs['displayoff']),
        ("Log incoming bus messages", configs['log_incoming_bus_messages']),
        ("Housekeep logfiles after days", configs['log_housekeeping_days']),
        ("Message 'Front door ringing'", configs['frontdoor_ringing_message']),
        ("Message 'Door ringing'", configs['door_ringing_message']),
        ("Message 'Door opener triggered'", configs['door_trigger_message']),
        ("Message 'Light triggered'", configs['light_trigger_message']),
        ("CPU frequency (MHz)", machine.freq() / 1000000),
    )
    return "<br>".join(label + ": " + str(value) for label, value in entries)


def _RunApiCommand(command):
    """Execute one API command and return the status message for the reply."""
    if command == "triggerdoor":
        TriggerDoor()
        return "Triggered front door opener"
    if command == "triggerlight":
        TriggerLicht()
        return "Triggered light"
    if command == "togglepartymode":
        TogglePartyMode()
        return "Toggled Party-Mode"
    if command == "partymodeon":
        SetPartyMode(True)
        return "Enabled Party-Mode"
    if command == "partymodeoff":
        SetPartyMode(False)
        return "Disabled Party-Mode"
    if command == "partymodestate":
        return "Party-Mode is " + PartyModeState()
    if command == "ping":
        return "OK"
    if command == "stats":
        return _BuildStatsReport()
    if command == "reboot":
        # TODO: actually perform the reboot; only the message is returned.
        return "Rebooting device now..."
    return "Error: Unknown command!"


async def APIHandling(reader, writer):
    """Handle one HTTP request to the API and send the reply.

    Expected request path: /<api_key>/<command>[/json]. The reply is the
    HTML template, or the JSON template when the path ends in "/json".
    Requests with a different number of path segments get the usage page.

    reader / writer: the stream pair passed in by asyncio.start_server().
    The stream is closed on every exit path, including when an exception
    propagates to the caller.
    """
    try:
        request_line = await reader.readline()
        # Skip the remaining request headers. An empty read means the client
        # closed the connection; without that check this loop never ends.
        while True:
            header_line = await reader.readline()
            if not header_line or header_line == b"\r\n":
                break
        try:
            request = request_line.decode().split()[1]
        except (IndexError, UnicodeError):
            # Malformed request line: keep its printable form for the log.
            request = str(request_line)
        Logger.LogMessage("API request: " + request + " - from client IP: " + writer.get_extra_info('peername')[0])
        req = request.split('/')
        if len(req) == 3 or len(req) == 4:
            if req[1] == secrets['api']:
                stateis = _RunApiCommand(req[2])
            else:
                stateis = "<b>Error:</b> API key is invalid!"
            if len(req) == 4 and req[3] == "json":
                response = json % stateis
                status_line = 'HTTP/1.0 200 OK\r\nContent-type: text/json\r\n\r\n'
            else:
                response = html % stateis
                status_line = 'HTTP/1.0 200 OK\r\nContent-type: text/html\r\n\r\n'
        else:
            response = html % (
                "<b>Error:</b> Invalid usage of API!<br><br>"
                "<u>Usage:</u> http://servername/api_key/command[/json]<br><br>"
                "<u>Commands:</u><ul><li>triggerdoor</li><li>triggerlight</li>"
                "<li>togglepartymode</li><li>partymodeon</li><li>partymodeoff</li>"
                "<li>partymodestate</li><li>ping</li><li>stats</li></ul><br>"
                "<u>API Key:</u> set 'api' in secrets.py file."
            )
            status_line = 'HTTP/1.0 500 Server Error\r\nContent-type: text/html\r\n\r\n'
        writer.write(status_line)
        writer.write(response)
        await writer.drain()
    finally:
        # Release the connection even if reading or writing failed.
        writer.close()
        await writer.wait_closed()
# Tick value (from time.ticks_us()) stored at the most recent bus edge.
microsFlanke = 0


def microsSeitLetzterFlanke():
    """Return the microseconds elapsed since the last recorded bus edge.

    Tick counter values wrap around, so the difference is taken with
    time.ticks_diff() instead of a plain subtraction, which would give a
    wrong result whenever the counter wrapped between the two readings.
    """
    return time.ticks_diff(time.ticks_us(), microsFlanke)
# Reference pattern of 43 edge-to-edge durations in microseconds.
# TCSBusReader() compares each measured interval against the next entry.
# NOTE(review): which bus message this pattern represents is not stated in
# this file -- confirm.
sequenz = [
    6000, 4000, 2000, 2000, 2000, 2000, 4000, 4000,
    4000, 2000, 2000, 2000, 4000, 2000, 2000, 2000,
    4000, 2000, 4000, 4000, 4000, 2000, 2000, 2000,
    4000, 2000, 4000, 2000, 2000, 2000, 2000, 2000,
    2000, 2000, 4000, 5000, 6000, 2000, 2000, 2000,
    2000, 4000, 2000,
]
async def TCSBusReader():
    """Main coroutine for the TCS bus reader.

    Polls `busline` in a tight loop (yielding to the scheduler after every
    sample), turns the ADC reading into a 0/1 level using a threshold of
    50000, and on each level change compares the time since the previous
    change with the next entry of `sequenz` (tolerance 400 us). When all 43
    entries matched in a row, "Sequenz fertig!" is printed.
    """
    global microsFlanke
    matched = 0  # index of the next `sequenz` entry to compare against
    running = False  # True while a candidate sequence is being tracked
    expected_edges = 43
    low_seen = False  # True after a 0 level was seen, until the next 1 level
    tolerance = 400
    Logger.LogMessage("TCS Busreader started")
    while True:
        level = 1 if busline.read_u16() >= 50000 else 0
        # NOTE(review): the source's indentation was not available; the two
        # edge prints are assumed to run on every detected edge -- confirm.
        if level == 0 and not low_seen:
            if not running:
                # The first edge only starts tracking; no interval to check.
                running = True
            elif abs(microsSeitLetzterFlanke() - sequenz[matched]) < tolerance:
                matched += 1
            else:
                running = False
                matched = 0
            print("Positive Flanke nach " + str(microsSeitLetzterFlanke()) + "us")
            low_seen = True
            microsFlanke = time.ticks_us()
        if level == 1 and low_seen:
            if running:
                if abs(microsSeitLetzterFlanke() - sequenz[matched]) < tolerance:
                    matched += 1
                else:
                    running = False
                    matched = 0
            print("Negative Flanke nach " + str(microsSeitLetzterFlanke()) + "us")
            low_seen = False
            microsFlanke = time.ticks_us()
        if running and matched >= expected_edges:
            print("Sequenz fertig!")
            # TODO: sendMessage()
            running = False
            matched = 0
        #############
        #print("tcs")
        # Planned behaviour (not implemented yet):
        # - if doorbell ringing is recognized, trigger external api
        # - if frontdoorbell ringing is recognized, trigger external api
        # - if frontdoorbell ringing is recognized and party-mode enabled,
        #   wait a short time and trigger DoorOpener and Licht
        # - if any other message and log_incoming_bus_messages is true, log
        #   the message (it may help identify useful messages, or show that
        #   someone rang at a neighbour's door)
        await asyncio.sleep(0)
async def Housekeeper():
    """Main coroutine for the recurring housekeeping.

    Logs its start, then calls Logger.Housekeeping() every 60 seconds,
    forever.
    """
    # TODO: run the housekeeping when it is 1:00 o'clock
    Logger.LogMessage("Housekeeper started")
    while True:
        Logger.Housekeeping()
        await asyncio.sleep(60)
async def Main():
    """Main routine: start the API server and the background tasks.

    Installs the asyncio exception handler, re-bases the uptime reference,
    starts the API server on configs['api_port'], schedules UiHandling(),
    TCSBusReader() and Housekeeper(), and then runs the event loop forever.
    """
    # Without this declaration the assignment below only created an unused
    # local, so Uptime() kept measuring from the value set at module load.
    global boottime
    set_global_exception()
    Logger.LogMessage("Entering MainLoop")
    boottime = time.time()
    loop = asyncio.get_event_loop()
    ShowText("Booting [3/3]", "API Setup: key", secrets['api'])
    # NOTE(review): the API key is written to the log in clear text here.
    Logger.LogMessage("Setting up API on port " + str(configs['api_port']) + " with key " + secrets['api'])
    loop.create_task(asyncio.start_server(APIHandling, Networking.GetIPAddress(), configs['api_port']))
    Logger.LogMessage("API started")
    ShowText("TCS<->FHEM", "Firmware:", version)
    Logger.LogMessage("Booting complete with Firmware " + version)
    loop.create_task(UiHandling())
    loop.create_task(TCSBusReader())
    loop.create_task(Housekeeper())
    # NOTE(review): run_forever() is called from inside a running coroutine;
    # this relies on the uasyncio implementation tolerating it -- confirm.
    loop.run_forever()
def Boot():
    """Boot the device: housekeeping, Wi-Fi connection and NTP time sync.

    Progress is shown on the display. When the Wi-Fi connection fails, an
    error is displayed and the NTP step is skipped.
    """
    Logger.Housekeeping()
    ShowText("Booting [1/3]", "Conn. Wifi:", secrets['ssid'] + "...")
    ShowText("Booting [1/3]", "Conn. Wifi: MAC", Networking.GetMACAddress())
    Networking.Connect(configs['disable_wifi_powersavingmode'])
    if not Networking.Status():
        ShowText("Booting [1/3]", "Conn. Wifi:", "ERROR!")
        return
    ShowText("Booting [1/3]", "Conn. Wifi: IP", Networking.GetIPAddress())
    ntp_host = configs['ntp_host']
    ShowText("Booting [2/3]", "NTP time:", ntp_host)
    NTP.SetRTCTimeFromNTP(ntp_host, configs['gmt_offset'], configs['auto_summertime'])
    ShowText("Booting [2/3]", "NTP time:", TimeUtils.DateTimeNow())
#####################################################################
# Script entry: boot the device, then run the asyncio main routine.
Boot()
try:
    asyncio.run(Main())
except KeyboardInterrupt:
    Logger.LogMessage("Shutdown.")
finally:
    # Create a fresh event loop so a later run starts from clean state.
    asyncio.new_event_loop()