From 6ed0f49cce67d40306c133f46af142d04e6c7518 Mon Sep 17 00:00:00 2001 From: George Hotelling Date: Sat, 4 Feb 2023 16:29:33 -0500 Subject: [PATCH] Use Zoho for auth --- doors.py | 29 ++++++++++++----------- doorsrc | 4 ++++ simulator/__init__.py | 2 -- zoho_auth.py | 54 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 74 insertions(+), 15 deletions(-) create mode 100644 zoho_auth.py diff --git a/doors.py b/doors.py index f91ae2a..26bc4d1 100755 --- a/doors.py +++ b/doors.py @@ -15,6 +15,9 @@ YELLOW_LED = 7 GREEN_LED = 11 RED_LED = 13 DOOR_STRIKE = 15 +ZOHO_CLIENT_ID = "Ask George" +ZOHO_CLIENT_SECRET = "Or Check" +ZOHO_REFRESH_TOKEN = "The Wiki" @@ -92,7 +95,7 @@ def blink_leds(): import time import datetime -from urllib import request +from zoho_auth import ZohoToken, authenticate from slacker import Slacker ############################################# # # @@ -105,19 +108,19 @@ NEED_FLUSH = [] THIS_MONTH = datetime.date.today().month THIS_DAY = datetime.date.today().day -def ping_server(key): - ret = False +zoho_token = ZohoToken(ZOHO_CLIENT_ID, ZOHO_CLIENT_SECRET, ZOHO_REFRESH_TOKEN) - print("Pinging server with key: %s (%s/%s)" % (key, SERVER, key)) - GPIO.output(YELLOW_LED, ON); - with request.urlopen("%s/%s" % (SERVER, key)) as f: - ret = f.read().decode() == "OK" - if ret: - CACHED_KEYS.add(key) - else: - CACHED_KEYS.remove(key) - GPIO.output(YELLOW_LED, OFF); - return ret +def ping_server(key): + GPIO.output(YELLOW_LED, ON) + authorized = authenticate(zoho_token, key) + GPIO.output(YELLOW_LED, OFF) + + if authorized: + CACHED_KEYS.add(key) + elif key in CACHED_KEYS: + CACHED_KEYS.remove(key) + + return authorized def notify_slack(count): slack = Slacker(SLACK_TOKEN) diff --git a/doorsrc b/doorsrc index 083918a..a0f1a10 100644 --- a/doorsrc +++ b/doorsrc @@ -15,3 +15,7 @@ YELLOW_LED = 7 GREEN_LED = 11 RED_LED = 13 DOOR_STRIKE = 15 + +ZOHO_CLIENT_ID = "Ask George" +ZOHO_CLIENT_SECRET = "Ask George" +ZOHO_REFRESH_TOKEN = "Ask George" \ No newline at end of file diff --git a/simulator/__init__.py b/simulator/__init__.py index fe58bf7..3529614 100644 --- a/simulator/__init__.py +++ b/simulator/__init__.py @@ -2,7 +2,6 @@ import importlib import os import sys -import urllib ####################################################### # # # Add simulator/ directory to the module search path. # @@ -11,7 +10,6 @@ import urllib sys.path.insert(0, os.path.join(os.getcwd(), "simulator")) -import fake_urllib import doors from doors import * diff --git a/zoho_auth.py b/zoho_auth.py new file mode 100644 index 0000000..f9e09b6 --- /dev/null +++ b/zoho_auth.py @@ -0,0 +1,54 @@ +import datetime +import json +from urllib import request +from urllib.parse import urlencode +from urllib.error import HTTPError + +class ZohoToken: + def __init__(self, client_id, client_secret, refresh_token): + self.client_id = client_id + self.client_secret = client_secret + self.refresh_token = refresh_token + self.access_token = None + self.expires_at = None + self.update_token() + + def update_token(self): + url = "https://accounts.zoho.com/oauth/v2/token?%s" % urlencode({ + "refresh_token": self.refresh_token, + "client_id": self.client_id, + "client_secret": self.client_secret, + "grant_type": "refresh_token", + }) + req = request.Request(url, method="POST") + with request.urlopen(req) as f: + data = json.loads(f.read().decode()) + self.access_token = data["access_token"] + self.expires_at = datetime.datetime.now() + datetime.timedelta(seconds=int(data["expires_in"])) + + def get_token(self): + if self.access_token is None or self.expires_at <= datetime.datetime.now(): + self.update_token() + return self.access_token + +zoho_token = ZohoToken(ZOHO_CLIENT_ID, ZOHO_CLIENT_SECRET, ZOHO_REFRESH_TOKEN) + +def authenticate(token, key): + ret = False + + url = "https://www.zohoapis.com/crm/v2/Contacts/search?%s" % urlencode({ + "criteria": "(Key_ID:equals:%s)" % key, + }) + req = request.Request(url, method="GET", headers={ + "Authorization": "Zoho-oauthtoken %s" % token.get_token(), + }) + print("Pinging server with key: %s (%s)" % (key, url)) + try: + with request.urlopen(req) as response: + response_body = response.read() + if len(response_body) > 0: + ret = True + except HTTPError as e: + print("Error pinging server: %s" % e) + + return ret \ No newline at end of file