From a3fd248c9405e6fcece8c929c6b15a855c83c705 Mon Sep 17 00:00:00 2001 From: Anna Schumaker Date: Sun, 3 Mar 2019 19:53:40 -0500 Subject: [PATCH] curds: Create a ThreadQueue class We'll use this for scanning tracks, searching musicbrainz, and fetching album art. The web services are ratelimited, so it doesn't make sense to use more than one thread in this case. Additionally, scanning tracks works best as a single thread since we end up holding a lock the entire function anyway to prevent duplicates. Signed-off-by: Anna Schumaker --- curds/test_threadqueue.py | 49 +++++++++++++++++++++++++++++++++++++++ curds/threadqueue.py | 26 +++++++++++++++++++++ 2 files changed, 75 insertions(+) create mode 100644 curds/test_threadqueue.py create mode 100644 curds/threadqueue.py diff --git a/curds/test_threadqueue.py b/curds/test_threadqueue.py new file mode 100644 index 0000000..215e04d --- /dev/null +++ b/curds/test_threadqueue.py @@ -0,0 +1,49 @@ +# Copyright 2019 (c) Anna Schumaker. +import queue +import unittest +import threading +import threadqueue + +test_a = 0 +test_b = 0 +test_c = 0 +test_l = threading.Lock() + +def set_abc(a, b, c): + global test_a, test_b, test_c + with test_l: + test_a = a + test_b = b + test_c = c + +class TestThreadQueue(unittest.TestCase): + def setUp(self): + set_abc(0, 0, 0) + + def test_threadqueue_init(self): + tq = threadqueue.ThreadQueue() + self.assertIsInstance(tq, queue.Queue) + self.assertIsInstance(tq, threading.Thread) + self.assertIsInstance(tq.stop_event, threading.Event) + self.assertTrue(tq.is_alive()) + tq.stop() + self.assertFalse(tq.is_alive()) + + def test_threadqueue_push(self): + tq = threadqueue.ThreadQueue() + with test_l: + tq.push(set_abc, 1, 2, 3) + + tq.join() + self.assertEqual(test_a, 1) + self.assertEqual(test_b, 2) + self.assertEqual(test_c, 3) + + for i in range(1000): + tq.push(set_abc, i, i+1, i+2) + + tq.join() + self.assertEqual(test_a, 999) + self.assertEqual(test_b, 1000) + self.assertEqual(test_c, 1001) + tq.stop() diff --git a/curds/threadqueue.py b/curds/threadqueue.py new file mode 100644 index 0000000..25c3463 --- /dev/null +++ b/curds/threadqueue.py @@ -0,0 +1,26 @@ +# Copyright 2019 (c) Anna Schumaker. +import queue +import threading + +class ThreadQueue(queue.Queue, threading.Thread): + def __init__(self): + queue.Queue.__init__(self) + threading.Thread.__init__(self) + self.stop_event = threading.Event() + self.start() + + def push(self, func, *args): + self.put((func, args)) + + def run(self): + while not self.stop_event.is_set(): + try: + (func, args) = self.get(block=True, timeout=0.1) + func(*args) + self.task_done() + except queue.Empty: + pass + + def stop(self): + self.stop_event.set() + threading.Thread.join(self)