forked from anna/doors
Post to slack when somebody enters the space
But only the first time they enter that day. Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
This commit is contained in:
parent
7296900ddf
commit
bc6740ca63
29
doors.py
29
doors.py
|
@ -93,14 +93,17 @@ def blink_leds():
|
|||
import time
|
||||
import datetime
|
||||
from urllib import request
|
||||
from slacker import Slacker
|
||||
#############################################
|
||||
# #
|
||||
# Verify a key with openings.workantile.com #
|
||||
# #
|
||||
#############################################
|
||||
CACHED_KEYS = set()
|
||||
TODAYS_KEYS = set()
|
||||
NEED_FLUSH = []
|
||||
THIS_MONTH = datetime.date.today().month
|
||||
THIS_DAY = datetime.date.today().day
|
||||
|
||||
def ping_server(key):
|
||||
ret = False
|
||||
|
@ -116,6 +119,13 @@ def ping_server(key):
|
|||
GPIO.output(YELLOW_LED, OFF);
|
||||
return ret
|
||||
|
||||
def notify_slack(count):
|
||||
slack = Slacker(SLACK_TOKEN)
|
||||
message = f"{count} people have entered today."
|
||||
if count == 1:
|
||||
message = f"{count} person has entered today."
|
||||
slack.chat.post_message(SLACK_CHANNEL, message)
|
||||
|
||||
def clear_cache():
|
||||
global THIS_MONTH
|
||||
|
||||
|
@ -125,6 +135,16 @@ def clear_cache():
|
|||
CACHED_KEYS.clear()
|
||||
THIS_MONTH = today.month
|
||||
|
||||
def clear_member_count():
|
||||
global THIS_DAY
|
||||
|
||||
today = datetime.date.today()
|
||||
if today.day != THIS_DAY:
|
||||
print("Clearing member count")
|
||||
TODAYS_KEYS.clear()
|
||||
THIS_DAY = today.day
|
||||
|
||||
|
||||
def verify_key(key):
|
||||
print("Verifying key: %s" % key)
|
||||
clear_cache()
|
||||
|
@ -134,6 +154,14 @@ def verify_key(key):
|
|||
return True
|
||||
return ping_server(key)
|
||||
|
||||
def check_today_count(key):
|
||||
clear_member_count()
|
||||
if key not in TODAYS_KEYS:
|
||||
print("New key:", key)
|
||||
TODAYS_KEYS.add(key)
|
||||
notify_slack(len(TODAYS_KEYS))
|
||||
|
||||
|
||||
def flush_keys():
|
||||
if len(NEED_FLUSH) > 0:
|
||||
ping_server(NEED_FLUSH[0])
|
||||
|
@ -167,6 +195,7 @@ def read_rfid():
|
|||
unlock_door()
|
||||
time.sleep(5) # block for 5 seconds before resetting door
|
||||
lock_door()
|
||||
check_today_count(key)
|
||||
flush_keys()
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
import importlib
|
||||
import os
|
||||
import sys
|
||||
import urllib
|
||||
#######################################################
|
||||
# #
|
||||
# Add simulator/ directory to the module search path. #
|
||||
|
@ -10,6 +11,7 @@ import sys
|
|||
sys.path.insert(0, os.path.join(os.getcwd(), "simulator"))
|
||||
|
||||
|
||||
import fake_urllib
|
||||
import doors
|
||||
from doors import *
|
||||
|
||||
|
|
|
@ -1,26 +0,0 @@
|
|||
print("Using fake urllib module.")
|
||||
|
||||
import random
|
||||
|
||||
class Request:
|
||||
def __init__(self, url):
|
||||
self.url = url
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, type, value, tb):
|
||||
pass
|
||||
|
||||
def read(self):
|
||||
val = random.randint(0, 5)
|
||||
if val == 0:
|
||||
return "ERROR".encode()
|
||||
elif val < 4:
|
||||
return "OK".encode()
|
||||
raise Exception("Test Network Exception")
|
||||
|
||||
|
||||
def urlopen(url):
|
||||
print("> URLLIB: opening url %s" % url)
|
||||
return Request(url)
|
Loading…
Reference in New Issue