curds: Create a ThreadQueue class

We'll use this for scanning tracks, searching musicbrainz, and fetching
album art. The web services are ratelimited, so it doesn't make sense to
use more than one thread in this case. Additionally, scanning tracks
works best as a single thread since we end up holding a lock the entire
function anyway to prevent duplicates.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
This commit is contained in:
Anna Schumaker 2019-03-03 19:53:40 -05:00
parent 29608caddb
commit a3fd248c94
2 changed files with 75 additions and 0 deletions

49
curds/test_threadqueue.py Normal file
View File

@ -0,0 +1,49 @@
# Copyright 2019 (c) Anna Schumaker.
import queue
import unittest
import threading
import threadqueue
test_a = 0
test_b = 0
test_c = 0
test_l = threading.Lock()
def set_abc(a, b, c):
global test_a, test_b, test_c
with test_l:
test_a = a
test_b = b
test_c = c
class TestThreadQueue(unittest.TestCase):
def setUp(self):
set_abc(0, 0, 0)
def test_threadqueue_init(self):
tq = threadqueue.ThreadQueue()
self.assertIsInstance(tq, queue.Queue)
self.assertIsInstance(tq, threading.Thread)
self.assertIsInstance(tq.stop_event, threading.Event)
self.assertTrue(tq.is_alive())
tq.stop()
self.assertFalse(tq.is_alive())
def test_threadqueue_push(self):
tq = threadqueue.ThreadQueue()
with test_l:
tq.push(set_abc, 1, 2, 3)
tq.join()
self.assertEqual(test_a, 1)
self.assertEqual(test_b, 2)
self.assertEqual(test_c, 3)
for i in range(1000):
tq.push(set_abc, i, i+1, i+2)
tq.join()
self.assertEqual(test_a, 999)
self.assertEqual(test_b, 1000)
self.assertEqual(test_c, 1001)
tq.stop()

26
curds/threadqueue.py Normal file
View File

@ -0,0 +1,26 @@
# Copyright 2019 (c) Anna Schumaker.
import queue
import threading
class ThreadQueue(queue.Queue, threading.Thread):
def __init__(self):
queue.Queue.__init__(self)
threading.Thread.__init__(self)
self.stop_event = threading.Event()
self.start()
def push(self, func, *args):
self.put((func, args))
def run(self):
while not self.stop_event.is_set():
try:
(func, args) = self.get(block=True, timeout=0.1)
func(*args)
self.task_done()
except queue.Empty:
pass
def stop(self):
self.stop_event.set()
threading.Thread.join(self)