53 lines
1.8 KiB
Python
53 lines
1.8 KiB
Python
import datetime
|
|
import json
|
|
from urllib import request
|
|
from urllib.parse import urlencode
|
|
from urllib.error import HTTPError
|
|
|
|
class ZohoToken:
|
|
def __init__(self, client_id, client_secret, refresh_token):
|
|
self.client_id = client_id
|
|
self.client_secret = client_secret
|
|
self.refresh_token = refresh_token
|
|
self._access_token = None
|
|
self.expires_at = None
|
|
self.update_token()
|
|
|
|
def update_token(self):
|
|
url = "https://accounts.zoho.com/oauth/v2/token?%s" % urlencode({
|
|
"refresh_token": self.refresh_token,
|
|
"client_id": self.client_id,
|
|
"client_secret": self.client_secret,
|
|
"grant_type": "refresh_token",
|
|
})
|
|
req = request.Request(url, method="POST")
|
|
with request.urlopen(req) as f:
|
|
data = json.loads(f.read().decode())
|
|
self._access_token = data["access_token"]
|
|
self.expires_at = datetime.datetime.now() + datetime.timedelta(seconds=int(data["expires_in"]))
|
|
|
|
def get_token(self):
|
|
if self._access_token is None or self.expires_at <= datetime.datetime.now():
|
|
self.update_token()
|
|
return self._access_token
|
|
|
|
def authenticate(token, key):
|
|
ret = False
|
|
|
|
url = "https://www.zohoapis.com/crm/v2/Contacts/search?%s" % urlencode({
|
|
"criteria": "(Key_ID:equals:%s)" % key,
|
|
})
|
|
req = request.Request(url, method="GET", headers={
|
|
"Authorization": "Zoho-oauthtoken %s" % token.get_token(),
|
|
})
|
|
print("Pinging server with key: %s (%s)" % (key, url))
|
|
try:
|
|
with request.urlopen(req) as response:
|
|
response_body = response.read()
|
|
if len(response_body) > 0:
|
|
ret = True
|
|
except HTTPError as e:
|
|
print("Error pinging server: %s" % e)
|
|
|
|
return ret
|