doors/doors.py

268 lines
6.5 KiB
Python
Executable File

#!/usr/bin/python
import socket
##################
# #
# Default values #
# #
##################
# Host name of the machine this script is running on.
CONTROLLER = socket.gethostname()
SERVER = "localhost"

# Serial device node the RFID reader is opened from.
RFID_PATH = "/dev/ttyUSB0"

# Output pins. The GPIO section of this file selects GPIO.BOARD numbering,
# so these are physical header positions.
DOOR_STRIKE = 15
RED_LED = 13
GREEN_LED = 11
YELLOW_LED = 7

# Placeholder Zoho OAuth credentials; presumably replaced by a doorsrc file.
ZOHO_CLIENT_ID = "Ask George"
ZOHO_CLIENT_SECRET = "Or Check"
ZOHO_REFRESH_TOKEN = "The Wiki"
import os
#####################
#                   #
# Read doorsrc file #
#                   #
#####################
# Every doorsrc file that exists is executed as Python in this module's
# namespace, so it can rebind any name assigned earlier in the file. Both
# locations are tried in order, so a setting in ./doorsrc overrides the same
# setting in /etc/doorsrc.
# NOTE(review): exec() runs whatever these files contain with the privileges
# of this process -- only trusted users should be able to write to them.
for conf in ("/etc/doorsrc", "./doorsrc"):
    if not os.path.exists(conf):
        continue
    with open(conf) as f:
        exec(compile(f.read(), "doorsrc", "exec"))
import atexit
import RPi.GPIO as GPIO
####################
# #
# Set up GPIO Pins #
# #
####################
# Logic levels used for every output written by this script.
OFF = GPIO.LOW
ON = GPIO.HIGH

# Pin constants are interpreted using the BOARD numbering scheme.
GPIO.setmode(GPIO.BOARD)
# Release the pins again when the interpreter exits.
atexit.register(GPIO.cleanup)

GPIO.setup(DOOR_STRIKE, GPIO.OUT)
GPIO.setup(RED_LED, GPIO.OUT)
GPIO.setup(GREEN_LED, GPIO.OUT)
GPIO.setup(YELLOW_LED, GPIO.OUT)

# Start with the yellow LED off. The level must be a LOW/HIGH value:
# GPIO.OUT is a pin-direction constant and is not meant to be used as a level.
GPIO.output(YELLOW_LED, OFF)
def unlock_door():
    """Drive the door-strike pin high and switch the LEDs from red to green."""
    print("Door is unlocked")
    for pin, level in ((DOOR_STRIKE, ON), (RED_LED, OFF), (GREEN_LED, ON)):
        GPIO.output(pin, level)
def lock_door():
    """Drive the door-strike pin low and switch the LEDs from green to red."""
    print("Door is locked")
    for pin, level in ((DOOR_STRIKE, OFF), (RED_LED, ON), (GREEN_LED, OFF)):
        GPIO.output(pin, level)


# Put the outputs into the locked state as soon as the module is loaded.
lock_door()
def leds_on():
    """Switch the red and green LEDs on; the yellow LED is not touched."""
    for pin in (RED_LED, GREEN_LED):
        GPIO.output(pin, ON)
def leds_off():
    """Switch the red and green LEDs off; the yellow LED is not touched."""
    for pin in (RED_LED, GREEN_LED):
        GPIO.output(pin, OFF)
def blink_leds():
    """
    Flash the red and green LEDs for five seconds, then re-lock the door.

    The LEDs alternate on/off once per second, starting and ending "on";
    lock_door() afterwards restores the normal locked indication.
    """
    print("Blinking LEDs")
    for step in range(5):
        # Even steps light the LEDs, odd steps darken them.
        switch = leds_off if step % 2 else leds_on
        switch()
        time.sleep(1)
    lock_door()
import datetime
import json
from http import HTTPStatus
from urllib import request
from urllib.parse import urlencode
from urllib.error import HTTPError
class ZohoToken:
    """
    Provides fresh API access tokens from Zoho.

    The access token is fetched lazily with the stored refresh token and is
    reused until the expiry time reported by Zoho has been reached.
    """

    def __init__(self, client_id, client_secret, refresh_token, timeout=10):
        """
        Store the OAuth credentials; no request is made until get_token().

        timeout is the number of seconds to wait on the token endpoint.
        Without one, a stalled connection would block the caller forever.
        """
        self.client_id = client_id
        self.client_secret = client_secret
        self.refresh_token = refresh_token
        self.timeout = timeout
        self._access_token = None
        self.expires_at = None

    def update_token(self):
        """
        Request a new access token and record when it expires.

        Errors raised by urlopen (including a timeout) and by the JSON
        parsing propagate to the caller; a response without the expected
        "access_token" / "expires_in" fields raises KeyError.
        """
        query = urlencode({
            "refresh_token": self.refresh_token,
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "grant_type": "refresh_token",
        })
        url = "https://accounts.zoho.com/oauth/v2/token?%s" % query
        req = request.Request(url, method="POST")
        with request.urlopen(req, timeout=self.timeout) as f:
            data = json.loads(f.read().decode())
        self._access_token = data["access_token"]
        # "expires_in" is applied as a number of seconds from now.
        lifetime = datetime.timedelta(seconds=int(data["expires_in"]))
        self.expires_at = datetime.datetime.now() + lifetime

    def get_token(self):
        """Return the cached access token, refreshing it first if needed."""
        if self._access_token is None or self.expires_at <= datetime.datetime.now():
            self.update_token()
        return self._access_token
def authenticate(token, key, timeout=10):
    """
    Authenticate a key against the ZOHO CRM API

    Searches Contacts for a record whose Key_ID equals key.

    token   -- object whose get_token() returns a Zoho OAuth access token
    key     -- key identifier to look up
    timeout -- seconds to wait for the API; without one, a stalled
               connection would block the door loop indefinitely

    Returns True only when the search answers with HTTP 200. Any other
    successful status, or an HTTPError, yields False. Other failures
    (for example a network outage or a timeout) propagate as exceptions
    so that callers can tell "rejected" apart from "could not ask".
    """
    query = urlencode({
        "criteria": "(Key_ID:equals:%s)" % key,
    })
    url = "https://www.zohoapis.com/crm/v2/Contacts/search?%s" % query
    req = request.Request(url, method="GET", headers={
        "Authorization": "Zoho-oauthtoken %s" % token.get_token(),
    })
    print("Pinging server with key: %s (%s)" % (key, url))
    try:
        with request.urlopen(req, timeout=timeout) as response:
            return response.status == HTTPStatus.OK
    except HTTPError as e:
        print("Error pinging server: %s" % e)
        return False
import time
from slacker import Slacker
######################################
#                                    #
# Verify a key with the Zoho CRM API #
#                                    #
######################################
# Source of access tokens for every Zoho request made by this script.
zoho_token = ZohoToken(
    client_id=ZOHO_CLIENT_ID,
    client_secret=ZOHO_CLIENT_SECRET,
    refresh_token=ZOHO_REFRESH_TOKEN,
)

# Keys the server has accepted; emptied by clear_cache() on a month change.
CACHED_KEYS = set()
# Distinct keys seen today; emptied by clear_member_count() on a day change.
TODAYS_KEYS = set()
# Keys admitted from the cache that still have to be re-checked by flush_keys().
NEED_FLUSH = []

# Day and month against which the two clear_* functions compare today's date.
THIS_DAY = datetime.date.today().day
THIS_MONTH = datetime.date.today().month
def ping_server(key):
    """
    Check key against the server and update the key cache with the answer.

    The yellow LED is lit for the duration of the request. An accepted key
    is added to CACHED_KEYS and a rejected key is removed from it.

    Returns the result of authenticate(). Exceptions raised by
    authenticate() propagate, and leave the cache untouched.
    """
    GPIO.output(YELLOW_LED, ON)
    try:
        authorized = authenticate(zoho_token, key)
    finally:
        # Switch the LED off even when the request raised; otherwise it
        # would stay lit until the next successful request.
        GPIO.output(YELLOW_LED, OFF)
    if authorized:
        CACHED_KEYS.add(key)
    else:
        CACHED_KEYS.discard(key)
    return authorized
def notify_slack(count):
    """
    Post the number of people who have entered today to Slack.

    SLACK_TOKEN and SLACK_CHANNEL are not assigned anywhere in the visible
    part of this file; presumably a doorsrc file supplies them.
    """
    slack = Slacker(SLACK_TOKEN)
    if count == 1:
        message = f"{count} person has entered today."
    else:
        message = f"{count} people have entered today."
    slack.chat.post_message(SLACK_CHANNEL, message)
def clear_cache():
    """Empty CACHED_KEYS when the month differs from the recorded THIS_MONTH."""
    global THIS_MONTH
    current_month = datetime.date.today().month
    if current_month == THIS_MONTH:
        return
    print("Clearing key cache")
    CACHED_KEYS.clear()
    THIS_MONTH = current_month
def clear_member_count():
    """Empty TODAYS_KEYS when the day of month differs from THIS_DAY."""
    global THIS_DAY
    current_day = datetime.date.today().day
    if current_day == THIS_DAY:
        return
    print("Clearing member count")
    TODAYS_KEYS.clear()
    THIS_DAY = current_day
def verify_key(key):
    """
    Decide whether key may open the door.

    A key found in CACHED_KEYS is accepted immediately and queued in
    NEED_FLUSH for a later re-check; any other key is checked with
    ping_server() right away and that result is returned.
    """
    print("Verifying key: %s" % key)
    clear_cache()
    if key not in CACHED_KEYS:
        return ping_server(key)
    print("Using cached key")
    NEED_FLUSH.append(key)
    return True
def check_today_count(key):
    """
    Record key as seen today and report the new total the first time.

    Keys already in TODAYS_KEYS are ignored, so each key triggers at most
    one Slack notification per day.
    """
    clear_member_count()
    if key in TODAYS_KEYS:
        return
    print("New key:", key)
    TODAYS_KEYS.add(key)
    notify_slack(len(TODAYS_KEYS))
def flush_keys():
    """
    Re-check the oldest queued key against the server, one key per call.

    The key is dropped from NEED_FLUSH only after ping_server() returns;
    if that call raises, the key stays queued.
    """
    if not NEED_FLUSH:
        return
    ping_server(NEED_FLUSH[0])
    del NEED_FLUSH[0]
#python-pyserial package. Not sure we need this. Grabbed based on
#http://allenmlabs.blogspot.se/2013/01/raspberry-pi-parallax-rfid-reader.html
import serial
###################
# #
# Run RFID Reader #
# #
###################
# Reader port: 2400 baud, and reads give up after one second.
RFID_SERIAL = serial.Serial(port=RFID_PATH, baudrate=2400, timeout=1)
def read_key():
    """
    Read one 12-byte frame from the RFID reader and return its key.

    A valid frame is a 0x0A start byte, ten alphanumeric ASCII characters
    and a 0x0D stop byte. Returns the ten-character key as a str, or None
    when no complete, well-formed frame arrived before the serial timeout.
    """
    RFID_SERIAL.reset_input_buffer()  # Clear input buffer before reading
    frame = RFID_SERIAL.read(12)
    if len(frame) != 12 or frame[0] != 0x0A or frame[-1] != 0x0D:
        return None
    try:
        # Exclude the start and stop bytes. Line noise can put bytes >= 0x80
        # in the payload; treat that as an invalid frame like any other,
        # instead of letting UnicodeDecodeError escape to the caller.
        key = frame[1:11].decode("ascii")
    except UnicodeDecodeError:
        return None
    return key if key.isalnum() else None
def _admit(key):
    """Hold the door unlocked for five seconds, re-lock it, then count key."""
    unlock_door()
    time.sleep(5)  # block for 5 seconds before resetting door
    lock_door()
    check_today_count(key)


def read_rfid():
    """
    Run one pass of the reader: read a key, admit its holder, flush the cache.

    Any exception is printed, after which the door is locked and the LEDs
    are blinked.
    """
    try:
        key = read_key()
        if key and verify_key(key):
            _admit(key)
        # NOTE(review): indentation was lost in the copy this was reviewed
        # from; flush_keys() is assumed to run on every pass rather than only
        # after a successful scan -- confirm against the deployed file.
        flush_keys()
    except Exception as err:
        print(err)
        lock_door()
        blink_leds()
def loop():
    """Call read_rfid() forever; this function never returns normally."""
    while True:
        read_rfid()


# Start polling the reader only when executed as a script.
if __name__ == "__main__":
    loop()