emmental/curds/test_notify.py

97 lines
3.8 KiB
Python

# Copyright 2019 (c) Anna Schumaker.
from . import notify
import gi
import threading
import unittest
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
class TestNotify(unittest.TestCase):
    """Unit tests for the event registry exposed by the ``notify`` module.

    The tests drive ``notify.register``, ``notify.cancel``, ``notify.clear``
    and ``notify.notify`` and inspect the module-level ``events`` dict and
    ``idle_queue`` list directly.  Callbacks registered with a true third
    argument are expected to be deferred until the Gtk main loop iterates.
    """

    def setUp(self):
        """Reset the notify module and the values recorded by callbacks."""
        notify.clear()
        self.test_count = 0
        self.test_arg1 = None
        self.test_arg2 = None
        self.task_queued = False

    def tearDown(self):
        """Leave the notify module empty for the next test."""
        notify.clear()

    def notify_thread(self, event, *args):
        """Call ``notify.notify(event, *args)`` on a new thread and join it."""
        worker = threading.Thread(target=notify.notify, args=(event, *args))
        worker.start()
        worker.join()

    def _flush_main_loop(self):
        """Iterate the Gtk main loop until no events are pending."""
        while Gtk.events_pending():
            Gtk.main_iteration_do(True)

    # Callbacks handed to notify; each one records a single observation.
    def on_test1(self, arg1, arg2):
        """Count how many times the callback has fired."""
        self.test_count += 1

    def on_test2(self, arg1, arg2):
        """Remember the first argument that was delivered."""
        self.test_arg1 = arg1

    def on_test3(self, arg1, arg2):
        """Remember the second argument that was delivered."""
        self.test_arg2 = arg2

    def on_task_queued(self):
        """Record that the callback was invoked."""
        self.task_queued = True

    def on_test_queue(self, arg1):
        """Count the call and remember its single argument."""
        self.test_count += 1
        self.test_arg1 = arg1

    def test_notify_init(self):
        """A cleared module has no events, no queued work and a free lock."""
        self.assertEqual(notify.events, {})
        self.assertEqual(notify.idle_queue, [])
        self.assertFalse(notify.event_lock.locked())

    def test_notify_register(self):
        """Callbacks are stored in order; re-registering one is a no-op."""
        notify.register("on-test", self.on_test1)
        self.assertEqual(notify.events,
                         {"on-test": [(self.on_test1, False)]})

        # Registering on_test1 again, even with a different flag, must not
        # add a second entry or change the stored flag.
        notify.register("on-test", self.on_test1, True)
        notify.register("on-test", self.on_test2)
        notify.register("on-test", self.on_test3, True)
        self.assertEqual(notify.events,
                         {"on-test": [(self.on_test1, False),
                                      (self.on_test2, False),
                                      (self.on_test3, True)]})

    def test_notify_clear(self):
        """clear() empties both the event table and the idle queue."""
        notify.events = {"abc": [(self.on_test1, False)]}
        # Seed the queue that is asserted on below, using the same
        # (callback, args) entry shape that test_notify_bg_thread expects,
        # so the final check really proves that clear() emptied it.
        notify.idle_queue = [(self.on_test1, ("It Worked", "CoolCoolCool"))]

        notify.clear()

        self.assertEqual(notify.events, {})
        self.assertEqual(notify.idle_queue, [])

    def test_notify_cancel(self):
        """cancel() removes one callback; an emptied event disappears."""
        notify.events = {"on-test": [(self.on_test1, False),
                                     (self.on_test2, False),
                                     (self.on_test3, True)]}

        notify.cancel("on-test", self.on_test2)
        self.assertEqual(notify.events,
                         {"on-test": [(self.on_test1, False),
                                      (self.on_test3, True)]})

        notify.cancel("on-test", self.on_test1)
        self.assertEqual(notify.events,
                         {"on-test": [(self.on_test3, True)]})

        # Removing the last callback drops the event key altogether.
        notify.cancel("on-test", self.on_test3)
        self.assertEqual(notify.events, {})

    def test_notify_main_thread(self):
        """From the main thread, flagged callbacks wait for the main loop."""
        notify.register("on-test", self.on_test1)
        notify.register("on-test", self.on_test2)
        notify.register("on-test", self.on_test3, True)

        notify.notify("on-test", "It Worked", "CoolCoolCool")
        self.assertEqual(self.test_count, 1)
        self.assertEqual(self.test_arg1, "It Worked")
        # on_test3 was registered with the flag set, so it has not run yet.
        self.assertIsNone(self.test_arg2)

        self._flush_main_loop()
        self.assertEqual(self.test_arg2, "CoolCoolCool")

    def test_notify_bg_thread(self):
        """From a worker thread, flagged callbacks are queued exactly once."""
        queued = [(self.on_test1, ("It Worked", "CoolCoolCool"))]
        notify.register("on-test", self.on_test1, True)
        notify.register("on-test", self.on_test2)

        self.notify_thread("on-test", "It Worked", "CoolCoolCool")
        self.assertEqual(self.test_count, 0)
        self.assertEqual(self.test_arg1, "It Worked")
        self.assertEqual(notify.idle_queue, queued)

        # An identical notification must not add a duplicate queue entry.
        self.notify_thread("on-test", "It Worked", "CoolCoolCool")
        self.assertEqual(self.test_count, 0)
        self.assertEqual(notify.idle_queue, queued)

        self._flush_main_loop()
        self.assertEqual(self.test_count, 1)
        self.assertEqual(notify.idle_queue, [])
        # With nothing left to run, notify_idle() returns a false value.
        self.assertFalse(notify.notify_idle())