501 lines
21 KiB
Python
501 lines
21 KiB
Python
import rp2
|
|
import TimeUtils
|
|
import Logger
|
|
import Oled
|
|
import Networking
|
|
import NTP
|
|
import uasyncio as asyncio
|
|
import utime as time
|
|
import Keypad
|
|
import usocket as socket
|
|
import urequests as requests
|
|
from machine import ADC, Pin
|
|
|
|
from secrets import secrets
|
|
from configs import configs
|
|
|
|
rp2.country(configs['country'])
|
|
version = "0.11-beta"
|
|
|
|
Oled = Oled.Oled()
|
|
TimeUtils = TimeUtils.TimeUtils()
|
|
Logger = Logger.Logger(configs['log_housekeeping_days'])
|
|
Networking = Networking.Networking(Logger, secrets['ssid'], secrets['pw'])
|
|
NTP = NTP.NTP(Logger)
|
|
Keypad = Keypad.Keypad()
|
|
|
|
boottime = time.time()
|
|
lastActionTicks = 0
|
|
subscreen = 0
|
|
sc = 0
|
|
partyMode = False
|
|
displayOff = False
|
|
busline = ADC(28)
|
|
triggerline = Pin(15, Pin.OUT)
|
|
writerActive = False
|
|
|
|
# Checks if string is integer
|
|
def IsInt(possibleint):
|
|
try:
|
|
int(possibleint)
|
|
except:
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
# Reboots the Pico W (f.e. in case of an error)
|
|
def Reboot():
|
|
Logger.LogMessage("Performing Reboot")
|
|
machine.reset()
|
|
|
|
# Calculates the uptime in hours and minutes
|
|
def Uptime():
|
|
global boottime
|
|
seconds = (time.time() - boottime) % (24 * 3600)
|
|
hours = seconds // 3600
|
|
seconds %= 3600
|
|
minutes = seconds // 60
|
|
seconds %= 60
|
|
return "%d:%02d" % (hours, minutes)
|
|
|
|
# Shows text on the display
|
|
def ShowText(line1, line2, line3):
|
|
Oled.fill(0x0000)
|
|
Oled.text(line1,1,2,Oled.white)
|
|
Oled.text(line2,1,12,Oled.white)
|
|
Oled.text(line3,1,22,Oled.white)
|
|
Oled.show()
|
|
|
|
# Main method for display output
|
|
heartbeat = "H"
|
|
def BuildScreen():
|
|
global subscreen, partyMode, lastActionTicks, displayOff, heartbeat
|
|
if heartbeat == "H":
|
|
heartbeat = " "
|
|
else:
|
|
heartbeat = "H"
|
|
lastActionTicks += 1
|
|
# after 10 Seconds of no activity, go back to main screen
|
|
if (lastActionTicks >= 20):
|
|
subscreen = 0
|
|
# after X Minutes turn off the display
|
|
if (lastActionTicks >= (configs['displayoff'] * 60 * 2)):
|
|
displayOff = True
|
|
ShowText("","","")
|
|
else:
|
|
if (subscreen == 0):
|
|
ShowText("TCS<->FHEM " + PartyModeActive() + " " + Networking.IsWifiConnected() + " " + heartbeat + "", TimeUtils.DateTimeNow(), "Auf LiG PaM Chk")
|
|
elif (subscreen == 1):
|
|
ShowText("Eingangstuer", "getriggert", " Ext")
|
|
elif (subscreen == 2):
|
|
ShowText("Licht am Gang", "getriggert", " Ext")
|
|
elif (subscreen == 3):
|
|
ShowText("Party-Mode", ("aktiviert" if partyMode else "deaktiviert"), " Ext")
|
|
elif (subscreen == 4):
|
|
ShowText("Hostname:", configs['hostname'], "Up Dwn Ext")
|
|
elif (subscreen == 5):
|
|
ShowText("MAC Address:", Networking.GetMACAddress(), "Up Dwn Ext")
|
|
elif (subscreen == 6):
|
|
ShowText("IP Address:", Networking.GetIPAddress(), "Up Dwn Ext")
|
|
elif (subscreen == 7):
|
|
ShowText("API key:", secrets['api'], "Up Dwn Ext")
|
|
elif (subscreen == 8):
|
|
ShowText("CPU Frequency:", str(machine.freq()/1000000) + " MHz", "Up Dwn Ext")
|
|
elif (subscreen == 9):
|
|
ShowText("Firmware vers.:", version, "Up Dwn Ext")
|
|
elif (subscreen == 10):
|
|
ShowText("Uptime (H:m):", Uptime(), "Up Dwn Ext")
|
|
elif (subscreen == 11):
|
|
ShowText("Perform", "reboot now", "Up Dwn OK Ext")
|
|
else:
|
|
ShowText("Error","Invalid Screen", " Ext")
|
|
|
|
# Helper-method for the systemcheck-output on the display
|
|
def ShowSystemCheck(screen):
|
|
global subscreen, sc
|
|
if (screen == "start"):
|
|
Logger.LogMessage("Showing Systemcheck")
|
|
sc = 0
|
|
elif (screen == "next"):
|
|
sc = sc + 1
|
|
else:
|
|
sc = sc - 1
|
|
if (sc > 7):
|
|
sc = 0
|
|
elif (sc < 0):
|
|
sc = 7
|
|
subscreen = sc + 4
|
|
|
|
# Activates/deactivates the party-mode
|
|
def TogglePartyMode():
|
|
global subscreen, partyMode
|
|
if (partyMode):
|
|
partyMode = False
|
|
ExternalAPI("partymode" + PartyModeState())
|
|
Logger.LogMessage("Party-Mode off")
|
|
else:
|
|
partyMode = True
|
|
ExternalAPI("partymode" + PartyModeState())
|
|
Logger.LogMessage("Party-Mode on")
|
|
subscreen = 3
|
|
|
|
# Sets the party-mode active/inactive
|
|
def SetPartyMode(newValue):
|
|
global partyMode
|
|
partyMode = newValue
|
|
ExternalAPI("partymode" + PartyModeState())
|
|
Logger.LogMessage("Setting Party-Mode via API to " + PartyModeState())
|
|
|
|
# Returns status of party-mode
|
|
def PartyModeState():
|
|
global partyMode
|
|
if (partyMode):
|
|
return "enabled"
|
|
else:
|
|
return "disabled"
|
|
|
|
# Returns status symbol if party-mode is active
|
|
def PartyModeActive():
|
|
global PartyMode
|
|
if (partyMode):
|
|
return "P"
|
|
else:
|
|
return " "
|
|
|
|
#####################################################################
|
|
|
|
# Helper-method to allow error handling and output in asyncio
|
|
def set_global_exception():
|
|
def handle_exception(loop, context):
|
|
Logger.LogMessage("Fatal error: " + str(context["exception"]))
|
|
import sys
|
|
sys.print_exception(context["exception"])
|
|
sys.exit()
|
|
Reboot()
|
|
loop = asyncio.get_event_loop()
|
|
loop.set_exception_handler(handle_exception)
|
|
|
|
# Main method for all the UI-handling
|
|
async def UiHandling():
|
|
global interrupt_flag, lastActionTicks, displayOff, subscreen
|
|
Logger.LogMessage("UI handling started")
|
|
while True:
|
|
BuildScreen()
|
|
if Keypad.interrupt_flag is 1:
|
|
Keypad.interrupt_flag = 0
|
|
lastActionTicks = 0
|
|
if (displayOff):
|
|
displayOff = False
|
|
else:
|
|
Keypad.interrupt_flag = 0
|
|
if (subscreen == 0):
|
|
if (Keypad.tastePressed == Keypad.taste1):
|
|
ShowSystemCheck("start")
|
|
elif (Keypad.tastePressed == Keypad.taste2):
|
|
TogglePartyMode()
|
|
elif (Keypad.tastePressed == Keypad.taste3):
|
|
subscreen = 2
|
|
Logger.LogMessage("Triggering Licht")
|
|
await TCSBusWriter(configs['light_trigger_message'])
|
|
elif (Keypad.tastePressed == Keypad.taste4):
|
|
subscreen = 1
|
|
Logger.LogMessage("Triggering Door")
|
|
await TCSBusWriter(configs['door_trigger_message'])
|
|
else:
|
|
Logger.LogMessage("Error: Invalid Button pressed!")
|
|
elif (subscreen == 1 or subscreen == 2 or subscreen == 3):
|
|
if (Keypad.tastePressed == Keypad.taste1):
|
|
subscreen = 0
|
|
elif (subscreen == 4 or subscreen == 5 or subscreen == 6 or subscreen == 7 or subscreen == 8 or subscreen == 9 or subscreen == 10 or subscreen == 11):
|
|
if (Keypad.tastePressed == Keypad.taste1):
|
|
subscreen = 0
|
|
elif (subscreen == 11 and Keypad.tastePressed == Keypad.taste2):
|
|
Reboot()
|
|
elif (Keypad.tastePressed == Keypad.taste3):
|
|
ShowSystemCheck("next")
|
|
elif (Keypad.tastePressed == Keypad.taste4):
|
|
ShowSystemCheck("prev")
|
|
else:
|
|
Logger.LogMessage("Error: Invalid Button pressed!")
|
|
else:
|
|
if (Keypad.tastePressed == Keypad.taste1):
|
|
subscreen = 0
|
|
await asyncio.sleep(0.5)
|
|
|
|
# Main method for the API
|
|
html = """<!DOCTYPE html>
|
|
<html>
|
|
<head> <title>TCS<->FHEM</title> </head>
|
|
<body> <h1>TCS<->FHEM</h1>
|
|
<p>%s</p>
|
|
</body>
|
|
</html>"""
|
|
json = """{ "TCS<->FHEM API": { "%s" } }"""
|
|
async def APIHandling(reader, writer):
|
|
request_line = await reader.readline()
|
|
while await reader.readline() != b"\r\n":
|
|
pass
|
|
request = str(request_line)
|
|
try:
|
|
request = request.split()[1]
|
|
except IndexError:
|
|
pass
|
|
client_ip = writer.get_extra_info('peername')[0]
|
|
Logger.LogMessage("API request: " + request + " - from client IP: " + client_ip)
|
|
if (configs['api_client_ip'] != "") and (configs['api_client_ip'] != client_ip):
|
|
Logger.LogMessage("Unauthorized client! Aborting API Handling now.")
|
|
stateis = "<b>Error 401:</b> Client '" + client_ip + "' is not authorized to use the API!<br><br>Set authorized client IP in configs.py!"
|
|
response = html % stateis
|
|
writer.write('HTTP/1.0 401 Unauthorized\r\nContent-type: text/html\r\n\r\n')
|
|
else:
|
|
req = request.split('/')
|
|
stateis = ""
|
|
if (len(req) == 3 or len(req) == 4 or len(req) == 5):
|
|
if (req[1] == secrets['api']):
|
|
if (req[2] == "triggerdoor"):
|
|
Logger.LogMessage("Triggering Door")
|
|
await TCSBusWriter(configs['door_trigger_message'])
|
|
stateis = "Triggered: front door opener"
|
|
elif (req[2] == "triggerlight"):
|
|
Logger.LogMessage("Triggering Licht")
|
|
await TCSBusWriter(configs['light_trigger_message'])
|
|
stateis = "Triggered: light"
|
|
elif (req[2] == "togglepartymode"):
|
|
TogglePartyMode()
|
|
stateis = "Toggled: Party-Mode"
|
|
elif (req[2] == "partymodeon"):
|
|
SetPartyMode(True)
|
|
stateis = "Enabled: Party-Mode"
|
|
elif (req[2] == "partymodeoff"):
|
|
SetPartyMode(False)
|
|
stateis = "Disabled: Party-Mode"
|
|
elif (req[2] == "partymodestate"):
|
|
stateis = "Party-Mode is " + PartyModeState()
|
|
elif (req[2] == "ping"):
|
|
stateis = "Ping: OK"
|
|
elif (req[2] == "specialcommand"):
|
|
ExternalAPI("special")
|
|
stateis = "Special command: executed"
|
|
elif (req[2] == "stats"):
|
|
stateis = "IP address: " + Networking.GetIPAddress() + "<br>MAC address: " + Networking.GetMACAddress() + "<br>Hostname: " + configs['hostname'] + "<br>API Port: " + str(configs['api_port']) + "<br>Uptime (h:m): " + Uptime() + "<br>Date/Time: " + TimeUtils.DateTimeNow() + "<br>Version: " + version + "<br>GMT Timezone Offset (hours): " + str(configs['gmt_offset']) + "<br>Auto summertime: " + str(configs['auto_summertime']) + "<br>Display off time (mins): " + str(configs['displayoff']) + "<br>Log incoming bus messages: " + str(configs['log_incoming_bus_messages']) + "<br>Housekeep logfiles after days: " + str(configs['log_housekeeping_days']) + "<br>Message 'Front door ringing': " + str(configs['frontdoor_ringing_message']) + "<br>Message 'Door ringing': " + str(configs['door_ringing_message']) + "<br>Message 'Door opener triggered': " + str(configs['door_trigger_message']) + "<br>Message 'Light triggered': " + str(configs['light_trigger_message']) + "<br>CPU frequency (MHz): " + str(machine.freq()/1000000) + "<br>Party-Mode is: " + PartyModeState()
|
|
elif (req[2] == "reboot"):
|
|
stateis = "Rebooting device: now..."
|
|
Reboot()
|
|
elif (req[2] == "ringdoor"):
|
|
stateis = "Ringing: doorbell"
|
|
await TCSBusWriter(configs['door_ringing_message'])
|
|
elif (req[2] == "ringfrontdoor"):
|
|
stateis = "Ringing: front doorbell"
|
|
await TCSBusWriter(configs['frontdoor_ringing_message'])
|
|
elif (req[2] == "logs"):
|
|
if (len(req) >= 4 and req[3] != "json"):
|
|
if (IsInt(req[3])):
|
|
stateis = Logger.LastLogs(int(req[3]))
|
|
else:
|
|
stateis = "<b>Error:</b> Parameter for log length not an integer!"
|
|
else:
|
|
stateis = Logger.LastLogs(12)
|
|
else:
|
|
stateis = "<b>Error:</b> Unknown command!"
|
|
else:
|
|
stateis = "<b>Error:</b> API key is invalid!"
|
|
if ((len(req) == 4 and req[3] == "json") or (len(req) == 5 and req[4] == "json")):
|
|
if (req[2] != "logs"):
|
|
stateis = stateis.replace(": ", "\":\"")
|
|
stateis = stateis.replace("<br>", "\", \"")
|
|
stateis = stateis.replace("°", "°")
|
|
else:
|
|
stateis = stateis.replace(";", "\":\"")
|
|
stateis = stateis.replace("<br>", "\", \"")
|
|
stateis = stateis.replace("\n", "").replace("\r", "")
|
|
response = json % stateis
|
|
writer.write('HTTP/1.0 200 OK\r\nContent-type: text/json\r\n\r\n')
|
|
else:
|
|
response = html % stateis
|
|
writer.write('HTTP/1.0 200 OK\r\nContent-type: text/html\r\n\r\n')
|
|
else:
|
|
stateis = "<b>Error 400:</b> Invalid usage of API!<br><br><u>Usage:</u> http://servername/api_key/command[/json]<br><br><u>Commands:</u><ul><li>triggerdoor</li><li>triggerlight</li><li>togglepartymode</li><li>partymodeon</li><li>partymodeoff</li><li>partymodestate</li><li>ping</li><li>specialcommand</li><li>stats</li><li>reboot</li><li>ringdoor</li><li>ringfrontdoor</li><li>logs</li></ul><br><u>API Key:</u> set 'api' in secrets.py file."
|
|
response = html % stateis
|
|
writer.write('HTTP/1.0 400 Bad Request\r\nContent-type: text/html\r\n\r\n')
|
|
writer.write(response)
|
|
await writer.drain()
|
|
await writer.wait_closed()
|
|
|
|
microsFlanke = 0
|
|
def microsSeitLetzterFlanke():
|
|
global microsFlanke
|
|
return time.ticks_us() - microsFlanke
|
|
|
|
# Main method for the TCS:Bus reader
|
|
async def TCSBusReader():
|
|
global busline, microsFlanke, partyMode, writerActive
|
|
zustand = False
|
|
Logger.LogMessage("TCS Busreader started")
|
|
message = []
|
|
while True:
|
|
if not writerActive:
|
|
busValue = busline.read_u16()
|
|
val = 1
|
|
if (busValue >= 30000): #voltage on TCS:Bus 0...65535
|
|
val = 1
|
|
else:
|
|
val = 0
|
|
#measure voltage changes and time in between
|
|
dauer = microsSeitLetzterFlanke()
|
|
if (dauer > 10000) and (message): #handle recieved message, and reset message
|
|
message.pop(0) #remove first timing, because we do not need it
|
|
for i in range(len(message)): #encode bus message (lazy, because the TCS:Bus is not as precise as we are ;) )
|
|
if (message[i] >= 1000 and message[i] <= 2999):
|
|
message[i] = 0
|
|
elif (message[i] >= 3000 and message[i] <= 4999):
|
|
message[i] = 1
|
|
elif (message[i] >= 5000 and message[i] <= 6999):
|
|
message[i] = 2
|
|
elif (message[i] >= 7000): #this may be an invalid message bit, but for not having numbers like '7543', we encode all this to '3'
|
|
message[i] = 3
|
|
if (message == configs['light_trigger_message']):
|
|
if (configs['log_incoming_bus_messages']):
|
|
Logger.LogMessage("Incoming TCS:Bus message for triggering light: " + str(message))
|
|
#nothing else to do
|
|
pass
|
|
elif (message == configs['door_trigger_message']):
|
|
if (configs['log_incoming_bus_messages']):
|
|
Logger.LogMessage("Incoming TCS:Bus message for door trigger: " + str(message))
|
|
#nothing else to do
|
|
pass
|
|
elif (message == configs['tcs_message']):
|
|
if (configs['log_incoming_bus_messages']):
|
|
Logger.LogMessage("Incoming TCS:Bus message from device: " + str(message))
|
|
#nothing else to do
|
|
pass
|
|
elif (message == configs['door_ringing_message']):
|
|
if (configs['log_incoming_bus_messages']):
|
|
Logger.LogMessage("Incoming TCS:Bus message for door ringing: " + str(message))
|
|
await ExternalAPI("doorbell")
|
|
elif (message == configs['frontdoor_ringing_message']):
|
|
if (configs['log_incoming_bus_messages']):
|
|
Logger.LogMessage("Incoming TCS:Bus message for frontdoor ringing: " + str(message))
|
|
if (partyMode):
|
|
asyncio.sleep(1)
|
|
Logger.LogMessage("Triggering Door and Light for Party-Mode")
|
|
await TCSBusWriter(configs['door_trigger_message'])
|
|
asyncio.sleep(1)
|
|
await TCSBusWriter(configs['light_trigger_message'])
|
|
await ExternalAPI("frontdoorbell")
|
|
else:
|
|
if (configs['log_incoming_bus_messages']):
|
|
Logger.LogMessage("Unknown TCS:Bus message: " + str(message))
|
|
message = []
|
|
else:
|
|
if (val == 0 and zustand == False):
|
|
message.append(dauer)
|
|
zustand = True
|
|
microsFlanke = time.ticks_us()
|
|
if (val == 1 and zustand == True):
|
|
message.append(dauer)
|
|
zustand = False
|
|
microsFlanke = time.ticks_us()
|
|
await asyncio.sleep(0)
|
|
|
|
# Main method for the TCS:Bus writer
|
|
async def TCSBusWriter(message):
|
|
global writerActive
|
|
if (message):
|
|
busMessage = list(message)
|
|
writerActive = True
|
|
for i in range(len(busMessage)): #decode message
|
|
busMessage[i] = int((busMessage[i] + 1) * 2000)
|
|
#start sending message
|
|
sendZero = True
|
|
triggerline.on()
|
|
for i in range(len(busMessage)):
|
|
time.sleep_us(busMessage[i])
|
|
sendZero = not sendZero
|
|
if sendZero:
|
|
triggerline.on()
|
|
else:
|
|
triggerline.off()
|
|
#finally end sending message
|
|
triggerline.off()
|
|
writerActive = False
|
|
|
|
# Main method for external API (=FHEM)
|
|
async def ExternalAPI(action):
|
|
Logger.LogMessage("External API action: " + action)
|
|
if (action == "doorbell"):
|
|
if (configs['fhem_door_ringed'] is not ""):
|
|
response = requests.get(configs['fhem_door_ringed'])
|
|
response.close()
|
|
else:
|
|
Logger.LogMessage("Warning: No fhem_door_ringed setting!")
|
|
elif (action == "frontdoorbell"):
|
|
if (configs['fhem_frontdoor_ringed'] is not ""):
|
|
response = requests.get(configs['fhem_frontdoor_ringed'])
|
|
response.close()
|
|
else:
|
|
Logger.LogMessage("Warning: No fhem_frontdoor_ringed setting!")
|
|
elif (action == "special"):
|
|
if (configs['fhem_special_command'] is not ""):
|
|
response = requests.get(configs['fhem_special_command'])
|
|
response.close()
|
|
else:
|
|
Logger.LogMessage("Warning: No fhem_special_command setting!")
|
|
else:
|
|
Logger.LogMessage("Error: Unknown ExternalAPI action!")
|
|
pass
|
|
|
|
# Main method for daily housekeeping
|
|
async def Housekeeper():
|
|
Logger.LogMessage("Housekeeper started")
|
|
while True:
|
|
Logger.LogMessage("Housekeeper is performing actions")
|
|
Logger.Housekeeping()
|
|
Logger.LogMessage("Housekeeper is performing NTP sync")
|
|
NTP.SetRTCTimeFromNTP(configs['ntp_host'], configs['gmt_offset'], configs['auto_summertime'])
|
|
Logger.LogMessage("Housekeeper has finished its jobs")
|
|
await asyncio.sleep(86400)
|
|
|
|
# Main entry point after booting
|
|
async def Main():
|
|
set_global_exception()
|
|
Logger.LogMessage("Entering MainLoop")
|
|
boottime = time.time()
|
|
loop = asyncio.get_event_loop()
|
|
ShowText("Booting [3/3]", "API Setup: key", secrets['api'])
|
|
Logger.LogMessage("Setting up API on port " + str(configs['api_port']) + " with key " + secrets['api'])
|
|
loop.create_task(asyncio.start_server(APIHandling, Networking.GetIPAddress(), configs['api_port']))
|
|
Logger.LogMessage("API started")
|
|
ShowText("TCS<->FHEM", "Firmware:", version)
|
|
Logger.LogMessage("Booting complete with Firmware " + version)
|
|
loop.create_task(UiHandling())
|
|
loop.create_task(TCSBusReader())
|
|
loop.create_task(Housekeeper())
|
|
loop.run_forever()
|
|
|
|
# Booting the device
|
|
def Boot():
|
|
ShowText("Booting [1/3]", "Conn. Wifi:", secrets['ssid'] + "...")
|
|
ShowText("Booting [1/3]", "Conn. Wifi: MAC", Networking.GetMACAddress())
|
|
Networking.Connect(configs['disable_wifi_powersavingmode'])
|
|
if (Networking.Status()):
|
|
ShowText("Booting [1/3]", "Conn. Wifi: IP", Networking.GetIPAddress())
|
|
ShowText("Booting [2/3]", "NTP time:", configs['ntp_host'])
|
|
if (NTP.SetRTCTimeFromNTP(configs['ntp_host'], configs['gmt_offset'], configs['auto_summertime'])):
|
|
Logger.DisableTempLogfile()
|
|
ShowText("Booting [2/3]", "NTP time:", TimeUtils.DateTimeNow())
|
|
else:
|
|
ShowText("Booting [1/3]", "Conn. Wifi:", "ERROR!")
|
|
time.sleep(3)
|
|
Reboot()
|
|
|
|
#####################################################################
|
|
|
|
Boot()
|
|
try:
|
|
asyncio.run(Main())
|
|
except KeyboardInterrupt:
|
|
Logger.LogMessage("Shutdown.")
|
|
Oled.fill(0x0000)
|
|
Oled.show()
|
|
finally:
|
|
asyncio.new_event_loop() |