124 lines
3.4 KiB
Python
124 lines
3.4 KiB
Python
from time import sleep
|
|
from flask import Flask, render_template, request
|
|
from blinker import Namespace
|
|
import threading
|
|
import RPi.GPIO as GPIO
|
|
import copy
|
|
import os
|
|
from pprint import pprint
|
|
from datetime import datetime
|
|
from tinydb import TinyDB, Query
|
|
|
|
#RPI_LGPIO_REVISION="800032" entr python app.p
|
|
#ledpin = 4 # set output LED pin
|
|
#pushpin = 17 # set input push button pin
|
|
pushpins = [17, 27, 22, 10, 9, 11, 0, 5]
|
|
|
|
app = Flask(__name__)
|
|
app.jinja_options['variable_start_string'] = '[['
|
|
app.jinja_options['variable_end_string'] = ']]'
|
|
|
|
my_signals = Namespace()
|
|
value_toggled = my_signals.signal('value-toggled')
|
|
|
|
|
|
db = TinyDB(os.path.dirname(__file__) + '/db.json')
|
|
DbState = Query()
|
|
|
|
class DoorState:
|
|
door1 = True
|
|
states = {}
|
|
heartbeat = 0
|
|
|
|
def __init__(self):
|
|
for i in range(0, 8):
|
|
self.states[i] = {
|
|
'closed': False,
|
|
'date': False,
|
|
'index': i,
|
|
}
|
|
db_item = db.get(DbState.index == i)
|
|
if db_item:
|
|
self.states[i] = db_item
|
|
else:
|
|
db.insert(self.states[i])
|
|
|
|
door_state = DoorState()
|
|
|
|
|
|
@value_toggled.connect
|
|
def on_value_toggled(sender, pin_states):
|
|
#door_state.door1 = not door_state.door1
|
|
#door_state.door1 = pin_state
|
|
#pprint(pin_states)
|
|
for i in pin_states:
|
|
#print(i)
|
|
#print(pin_states[i])
|
|
if door_state.states[i]['closed'] == pin_states[i]:
|
|
door_state.states[i]['closed'] = not pin_states[i]
|
|
door_state.states[i]['date'] = datetime.now().isoformat()
|
|
db.update(door_state.states[i], DbState.index == i)
|
|
|
|
#pprint(door_state.states)
|
|
|
|
thread_event = threading.Event()
|
|
|
|
def backgroundTask(signal):
|
|
while thread_event.is_set():
|
|
#pin_state = GPIO.input(pushpin)
|
|
pin_states = {}
|
|
for i, val in enumerate(pushpins):
|
|
pin_states[i] = GPIO.input(val)
|
|
signal.send(app, pin_states=pin_states)
|
|
sleep(1)
|
|
|
|
@app.route('/')
|
|
def index():
|
|
return render_template('index.html')
|
|
|
|
@app.route('/details')
|
|
def details():
|
|
return render_template('details.html')
|
|
|
|
@app.route('/status', methods=['GET', 'POST'])
|
|
def status():
|
|
data = request.get_json()
|
|
heartbeat = not data['heartbeat']
|
|
states = copy.deepcopy(door_state.states)
|
|
for i in states:
|
|
if not states[i]['date']:
|
|
continue
|
|
date = datetime.fromisoformat(
|
|
states[i]['date']
|
|
)
|
|
date = date.strftime('%d.%m.%Y %H:%M:%S')
|
|
states[i]['date'] = date
|
|
return {
|
|
'states': states,
|
|
'heartbeat': heartbeat,
|
|
}
|
|
|
|
@app.route('/toggle')
|
|
def toggle():
|
|
value_toggled.send(app)
|
|
return 'OK'
|
|
|
|
if __name__ == '__main__':
|
|
GPIO.setmode(GPIO.BCM)
|
|
#GPIO.setup(ledpin, GPIO.OUT) # set ledpin as an output
|
|
#GPIO.setup(pushpin, GPIO.IN, pull_up_down=GPIO.PUD_UP) # with pull up resistor
|
|
#GPIO.setup(pushpin, GPIO.IN) # with pull up resistor
|
|
for i, val in enumerate(pushpins):
|
|
GPIO.setup(val, GPIO.IN, pull_up_down=GPIO.PUD_UP)
|
|
thread_event.set()
|
|
thread = threading.Thread(target=backgroundTask, args =(value_toggled, ))
|
|
thread.daemon = True
|
|
thread.start()
|
|
app.run(use_reloader=False, debug=True, host='0.0.0.0')
|
|
thread_event.clear()
|
|
|
|
#while True:
|
|
# #GPIO.output(ledpin, GPIO.input(pushpin))
|
|
# print(GPIO.input(pushpin))
|
|
# sleep(1)
|