curds: Push some notifications onto a queue
Gtk really doesn't like updates coming from outside of the main thread, so let's add a way to register callbacks that can be queued up and run later. Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
This commit is contained in:
parent
5c267c8a9d
commit
e38e6ae1b4
|
@ -2,23 +2,36 @@
|
|||
|
||||
class Notify:
|
||||
notifications = {}
|
||||
queue = [ ]
|
||||
|
||||
def __init__(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def clear():
|
||||
Notify.notifications.clear()
|
||||
Notify.queue.clear()
|
||||
|
||||
def notify(name, *args):
|
||||
for func in Notify.notifications.get(name, []):
|
||||
func(*args)
|
||||
for (func, queue) in Notify.notifications.get(name, []):
|
||||
if queue == True:
|
||||
Notify.queue.append((func, args))
|
||||
else:
|
||||
func(*args)
|
||||
|
||||
def notify_cancel(name, func):
|
||||
def notify_cancel(name, func, queue=False):
|
||||
cb = Notify.notifications.get(name, [])
|
||||
if func in cb:
|
||||
cb.remove(func)
|
||||
if (func, queue) in cb:
|
||||
cb.remove((func, queue))
|
||||
|
||||
def notify_me(name, func):
|
||||
def notify_me(name, func, queue=False):
|
||||
cb = Notify.notifications.setdefault(name, [])
|
||||
if not func in cb:
|
||||
cb.append(func)
|
||||
if func in [ n[0] for n in cb ]:
|
||||
return
|
||||
cb.append((func, queue))
|
||||
|
||||
def run_queued():
|
||||
if len(Notify.queue) > 0:
|
||||
(func, args) = Notify.queue.pop(0)
|
||||
func(*args)
|
||||
return True
|
||||
return False
|
||||
|
|
|
@ -18,25 +18,35 @@ class TestNotify(unittest.TestCase):
|
|||
def test_notify(self):
|
||||
self.assertRaises(NotImplementedError, notify.Notify)
|
||||
self.assertEqual(notify.Notify.notifications, {})
|
||||
self.assertEqual(notify.Notify.queue, [])
|
||||
|
||||
notify.Notify.notify_me("on-test", self.on_test1)
|
||||
self.assertEqual(notify.Notify.notifications, {"on-test" : [ self.on_test1 ]})
|
||||
notify.Notify.notify_me("on-test", self.on_test1)
|
||||
self.assertEqual(notify.Notify.notifications, {"on-test" : [ (self.on_test1, False) ]})
|
||||
notify.Notify.notify_me("on-test", self.on_test1, True)
|
||||
notify.Notify.notify_me("on-test", self.on_test2)
|
||||
notify.Notify.notify_me("on-test", self.on_test3)
|
||||
self.assertEqual(notify.Notify.notifications, {"on-test" : [ self.on_test1, self.on_test2, self.on_test3 ]})
|
||||
notify.Notify.notify_me("on-test", self.on_test3, True)
|
||||
self.assertEqual(notify.Notify.notifications, {"on-test" : [ (self.on_test1, False),
|
||||
(self.on_test2, False),
|
||||
(self.on_test3, True ) ]})
|
||||
|
||||
notify.Notify.notify("on-test", "It Worked", "CoolCoolCool")
|
||||
self.assertEqual(self.test_count, 1)
|
||||
self.assertEqual(self.test_arg1, "It Worked")
|
||||
self.assertEqual(self.test_arg2, None)
|
||||
|
||||
self.assertEqual(notify.Notify.queue, [ (self.on_test3, ("It Worked", "CoolCoolCool")) ])
|
||||
self.assertTrue(notify.Notify.run_queued())
|
||||
self.assertEqual(self.test_arg2, "CoolCoolCool")
|
||||
|
||||
notify.Notify.notify("no-cb", "Please", "Don't", "Crash")
|
||||
self.assertFalse(notify.Notify.run_queued())
|
||||
|
||||
notify.Notify.notify_cancel("on-test", self.on_test2)
|
||||
self.assertEqual(notify.Notify.notifications, {"on-test" : [ self.on_test1, self.on_test3 ]})
|
||||
self.assertEqual(notify.Notify.notifications, {"on-test" : [ (self.on_test1, False),
|
||||
(self.on_test3, True) ]})
|
||||
notify.Notify.notify_cancel("on-test", self.on_test1)
|
||||
self.assertEqual(notify.Notify.notifications, {"on-test" : [ self.on_test3 ]})
|
||||
notify.Notify.notify_cancel("on-test", self.on_test3)
|
||||
self.assertEqual(notify.Notify.notifications, {"on-test" : [ (self.on_test3, True) ]})
|
||||
notify.Notify.notify_cancel("on-test", self.on_test3, True)
|
||||
self.assertEqual(notify.Notify.notifications, {"on-test" : [ ]})
|
||||
notify.Notify.notify_cancel("on-test", self.on_test1)
|
||||
self.assertEqual(notify.Notify.notifications, {"on-test" : [ ]})
|
||||
|
|
Loading…
Reference in New Issue