Use Zoho for auth

This commit is contained in:
George Hotelling 2023-02-04 16:29:33 -05:00
parent bc6740ca63
commit 6ed0f49cce
4 changed files with 74 additions and 15 deletions

View File

@ -15,6 +15,9 @@ YELLOW_LED = 7
GREEN_LED = 11
RED_LED = 13
DOOR_STRIKE = 15
ZOHO_CLIENT_ID = "Ask George"
ZOHO_CLIENT_SECRET = "Or Check"
ZOHO_REFRESH_TOKEN = "The Wiki"
@ -92,7 +95,7 @@ def blink_leds():
import time
import datetime
from urllib import request
from zoho_auth import ZohoToken, authenticate
from slacker import Slacker
#############################################
# #
@ -105,19 +108,19 @@ NEED_FLUSH = []
THIS_MONTH = datetime.date.today().month
THIS_DAY = datetime.date.today().day
def ping_server(key):
ret = False
zoho_token = ZohoToken(ZOHO_CLIENT_ID, ZOHO_CLIENT_SECRET, ZOHO_REFRESH_TOKEN)
print("Pinging server with key: %s (%s/%s)" % (key, SERVER, key))
GPIO.output(YELLOW_LED, ON);
with request.urlopen("%s/%s" % (SERVER, key)) as f:
ret = f.read().decode() == "OK"
if ret:
CACHED_KEYS.add(key)
else:
CACHED_KEYS.remove(key)
GPIO.output(YELLOW_LED, OFF);
return ret
def ping_server(key):
GPIO.output(YELLOW_LED, ON)
authorized = authenticate(zoho_token, key)
GPIO.output(YELLOW_LED, OFF)
if authorized:
CACHED_KEYS.add(key)
elif key in CACHED_KEYS:
CACHED_KEYS.remove(key)
return authorized
def notify_slack(count):
slack = Slacker(SLACK_TOKEN)

View File

@ -15,3 +15,7 @@ YELLOW_LED = 7
GREEN_LED = 11
RED_LED = 13
DOOR_STRIKE = 15
ZOHO_CLIENT_ID = "Ask George"
ZOHO_CLIENT_SECRET = "Ask George"
ZOHO_REFRESH_TOKEN = "Ask George"

View File

@ -2,7 +2,6 @@
import importlib
import os
import sys
import urllib
#######################################################
# #
# Add simulator/ directory to the module search path. #
@ -11,7 +10,6 @@ import urllib
sys.path.insert(0, os.path.join(os.getcwd(), "simulator"))
import fake_urllib
import doors
from doors import *

54
zoho_auth.py Normal file
View File

@ -0,0 +1,54 @@
import datetime
import json
from urllib import request
from urllib.parse import urlencode
from urllib.error import HTTPError
class ZohoToken:
def __init__(self, client_id, client_secret, refresh_token):
self.client_id = client_id
self.client_secret = client_secret
self.refresh_token = refresh_token
self.access_token = None
self.expires_at = None
self.update_token()
def update_token(self):
url = "https://accounts.zoho.com/oauth/v2/token?%s" % urlencode({
"refresh_token": self.refresh_token,
"client_id": self.client_id,
"client_secret": self.client_secret,
"grant_type": "refresh_token",
})
req = request.Request(url, method="POST")
with request.urlopen(req) as f:
data = json.loads(f.read().decode())
self.access_token = data["access_token"]
self.expires_at = datetime.datetime.now() + datetime.timedelta(seconds=int(data["expires_in"]))
def get_token(self):
if self.access_token is None or self.expires_at <= datetime.datetime.now():
self.update_token()
return self.access_token
zoho_token = ZohoToken(ZOHO_CLIENT_ID, ZOHO_CLIENT_SECRET, ZOHO_REFRESH_TOKEN)
def authenticate(token, key):
ret = False
url = "https://www.zohoapis.com/crm/v2/Contacts/search?%s" % urlencode({
"criteria": "(Key_ID:equals:%s)" % key,
})
req = request.Request(url, method="GET", headers={
"Authorization": "Zoho-oauthtoken %s" % token.get_token(),
})
print("Pinging server with key: %s (%s)" % (key, url))
try:
with request.urlopen(req) as response:
response_body = response.read()
if len(response_body) > 0:
ret = True
except HTTPError as e:
print("Error pinging server: %s" % e)
return ret