#!/usr/bin/python import socket ################## # # # Default values # # # ################## CONTROLLER = socket.gethostname() SERVER = "localhost" RFID_PATH = "/dev/ttyUSB0" YELLOW_LED = 7 GREEN_LED = 11 RED_LED = 13 DOOR_STRIKE = 15 ZOHO_CLIENT_ID = "Ask George" ZOHO_CLIENT_SECRET = "Or Check" ZOHO_REFRESH_TOKEN = "The Wiki" import os #################### # # # Read doorrc file # # # #################### for conf in [ "/etc/doorsrc", "./doorsrc" ]: if os.path.exists(conf): with open(conf) as f: exec(compile(f.read(), "doorsrc", 'exec')) import atexit import RPi.GPIO as GPIO #################### # # # Set up GPIO Pins # # # #################### GPIO.setmode(GPIO.BOARD) atexit.register(GPIO.cleanup) GPIO.setup(DOOR_STRIKE, GPIO.OUT) GPIO.setup(RED_LED, GPIO.OUT) GPIO.setup(GREEN_LED, GPIO.OUT) GPIO.setup(YELLOW_LED, GPIO.OUT) GPIO.output(YELLOW_LED, GPIO.OUT) OFF = GPIO.LOW ON = GPIO.HIGH def unlock_door(): print("Door is unlocked"); GPIO.output(DOOR_STRIKE, ON) GPIO.output(RED_LED, OFF) GPIO.output(GREEN_LED, ON) def lock_door(): print("Door is locked"); GPIO.output(DOOR_STRIKE, OFF) GPIO.output(RED_LED, ON) GPIO.output(GREEN_LED, OFF) lock_door() # Turn all LEDs on def leds_on(): GPIO.output(RED_LED, ON) GPIO.output(GREEN_LED, ON) # Turn all LEDs off def leds_off(): GPIO.output(RED_LED, OFF) GPIO.output(GREEN_LED, OFF) def blink_leds(): print("Blinking LEDs") for i in range(5): if (i % 2) == 0: leds_on() else: leds_off() time.sleep(1) lock_door() import datetime import json from http import HTTPStatus from urllib import request from urllib.parse import urlencode from urllib.error import HTTPError class ZohoToken: """ Provides fresh API access tokens from Zoho """ def __init__(self, client_id, client_secret, refresh_token): self.client_id = client_id self.client_secret = client_secret self.refresh_token = refresh_token self._access_token = None self.expires_at = None def update_token(self): url = "https://accounts.zoho.com/oauth/v2/token?%s" % urlencode({ "refresh_token": self.refresh_token, "client_id": self.client_id, "client_secret": self.client_secret, "grant_type": "refresh_token", }) req = request.Request(url, method="POST") with request.urlopen(req) as f: data = json.loads(f.read().decode()) self._access_token = data["access_token"] self.expires_at = datetime.datetime.now() + datetime.timedelta(seconds=int(data["expires_in"])) def get_token(self): if self._access_token is None or self.expires_at <= datetime.datetime.now(): self.update_token() return self._access_token def authenticate(token, key): """ Authenticate a key against the ZOHO CRM API """ url = "https://www.zohoapis.com/crm/v2/Contacts/search?%s" % urlencode({ "criteria": "(Key_ID:equals:%s)" % key, }) req = request.Request(url, method="GET", headers={ "Authorization": "Zoho-oauthtoken %s" % token.get_token(), }) print("Pinging server with key: %s (%s)" % (key, url)) try: with request.urlopen(req) as response: return response.status == HTTPStatus.OK except HTTPError as e: print("Error pinging server: %s" % e) return False import time from slacker import Slacker ############################################# # # # Verify a key with openings.workantile.com # # # ############################################# CACHED_KEYS = set() TODAYS_KEYS = set() NEED_FLUSH = [] THIS_MONTH = datetime.date.today().month THIS_DAY = datetime.date.today().day zoho_token = ZohoToken(ZOHO_CLIENT_ID, ZOHO_CLIENT_SECRET, ZOHO_REFRESH_TOKEN) def ping_server(key): GPIO.output(YELLOW_LED, ON) authorized = authenticate(zoho_token, key) GPIO.output(YELLOW_LED, OFF) if authorized: CACHED_KEYS.add(key) elif key in CACHED_KEYS: CACHED_KEYS.remove(key) return authorized def notify_slack(count): slack = Slacker(SLACK_TOKEN) message = f"{count} people have entered today." if count == 1: message = f"{count} person has entered today." slack.chat.post_message(SLACK_CHANNEL, message) def clear_cache(): global THIS_MONTH today = datetime.date.today() if today.month != THIS_MONTH: print("Clearing key cache") CACHED_KEYS.clear() THIS_MONTH = today.month def clear_member_count(): global THIS_DAY today = datetime.date.today() if today.day != THIS_DAY: print("Clearing member count") TODAYS_KEYS.clear() THIS_DAY = today.day def verify_key(key): print("Verifying key: %s" % key) clear_cache() if key in CACHED_KEYS: print("Using cached key") NEED_FLUSH.append(key) return True return ping_server(key) def check_today_count(key): clear_member_count() if key not in TODAYS_KEYS: print("New key:", key) TODAYS_KEYS.add(key) notify_slack(len(TODAYS_KEYS)) def flush_keys(): if len(NEED_FLUSH) > 0: ping_server(NEED_FLUSH[0]) NEED_FLUSH.pop(0) #python-pyserial package. Not sure we need this. Grabbed based on #http://allenmlabs.blogspot.se/2013/01/raspberry-pi-parallax-rfid-reader.html import serial ################### # # # Run RFID Reader # # # ################### RFID_SERIAL = serial.Serial(RFID_PATH, 2400, timeout=1) def read_key(): RFID_SERIAL.reset_input_buffer() # Clear input buffer before reading string = RFID_SERIAL.read(12) if len(string) == 12 and string[0] == 0xA and string[-1] == 0xD: key = string[1:11].decode() #exclude start 0xA and stop 0xD bytes if key.isalnum(): return key; return None def read_rfid(): try: key = read_key() if key and verify_key(key): unlock_door() time.sleep(5) # block for 5 seconds before resetting door lock_door() check_today_count(key) flush_keys() except Exception as e: print(e) lock_door() blink_leds() def loop(): while True: read_rfid() if __name__ == "__main__": loop()