db: Add an idle Queue

This is intended to be used as a base class for other task-specific
Queues, and runs each task under a single transaction. Queue
implementations should override the do_run_task() function for their
implementation-specific work.

The push_many() function can be used to efficiently add several tasks to
the Queue at the same time.

Signed-off-by: Anna Schumaker <Anna@NoWheyCreamery.com>
This commit is contained in:
Anna Schumaker 2022-09-13 10:38:42 -04:00
parent 1832a56786
commit e502a7e8cb
2 changed files with 272 additions and 0 deletions

88
emmental/db/idle.py Normal file
View File

@ -0,0 +1,88 @@
# Copyright 2022 (c) Anna Schumaker
"""Idle queues to assid with large database operations."""
import typing
from gi.repository import GObject
from gi.repository import GLib
class Queue(GObject.GObject):
"""A base class Idle Queue."""
total = GObject.Property(type=int)
progress = GObject.Property(type=float)
running = GObject.Property(type=bool, default=False)
enabled = GObject.Property(type=bool, default=True)
def __init__(self, **kwargs):
"""Initialize an Idle Queue."""
super().__init__(**kwargs)
self._tasks = []
self._idle_id = None
def __getitem__(self, n: int) -> tuple:
"""Get the n-th task in the queue."""
return self._tasks[n] if n < len(self._tasks) else None
def __run_next_task(self) -> None:
task = self._tasks[0]
if task[0](*task[1:]):
self._tasks.pop(0)
def __start(self) -> None:
if not self.running:
self.running = True
self._idle_id = GLib.idle_add(self.run_task)
self.__update_counters()
def __update_counters(self) -> bool:
if (pending := len(self._tasks)) == 0:
self.cancel()
return GLib.SOURCE_REMOVE
self.progress = 1 - (pending / self.total)
return GLib.SOURCE_CONTINUE
def cancel(self) -> None:
"""Cancel all pending tasks."""
if self._idle_id is not None:
GLib.source_remove(self._idle_id)
self._tasks.clear()
self.progress = 0.0
self.total = 0
self.running = False
self._idle_id = None
def complete(self) -> None:
"""Complete all pending tasks."""
if self.running:
while len(self._tasks) > 0:
self.__run_next_task()
self.cancel()
def push(self, func: typing.Callable, *args,
now: bool = False) -> bool | None:
"""Add a task to the Idle Queue."""
if not self.enabled or now:
return func(*args)
self._tasks.append((func, *args))
self.total += 1
self.__start()
def push_many(self, func: typing.Callable, args: list[tuple[any]],
now: bool = False) -> None:
"""Add several tasks to the Idle Queue."""
if not self.enabled or now:
for arg in args:
func(*arg)
else:
self._tasks.extend([(func, *arg) for arg in args])
self.total += len(args)
self.__start()
def run_task(self) -> bool:
"""Manually run the next task."""
if len(self._tasks) > 0:
self.__run_next_task()
return self.__update_counters()
return GLib.SOURCE_REMOVE

184
tests/db/test_idle.py Normal file
View File

@ -0,0 +1,184 @@
# Copyright 2022 (c) Anna Schumaker
"""Tests our idle queues."""
import unittest
import unittest.mock
import emmental.db.idle
from gi.repository import GObject
from gi.repository import GLib
@unittest.mock.patch("gi.repository.GLib.source_remove")
@unittest.mock.patch("gi.repository.GLib.idle_add", return_value=42)
class TestIdleQueue(unittest.TestCase):
"""Test the base Idle Queue."""
def setUp(self):
"""Set up common variables."""
self.queue = emmental.db.idle.Queue()
def test_init(self, mock_idle_add: unittest.mock.Mock,
mock_source_removed: unittest.mock.Mock):
"""Test that the Idle Queue is initialized properly."""
self.assertIsInstance(self.queue, GObject.GObject)
self.assertEqual(self.queue.total, 0)
self.assertEqual(self.queue.progress, 0.0)
self.assertFalse(self.queue.running)
self.assertTrue(self.queue.enabled)
self.assertListEqual(self.queue._tasks, [])
self.assertIsNone(self.queue._idle_id)
def test_getitem(self, mock_idle_add: unittest.mock.Mock,
mock_source_removed: unittest.mock.Mock):
"""Test the __getitem__() function."""
self.queue._tasks = [1, 2, 3]
self.assertEqual(self.queue[0], 1)
self.assertEqual(self.queue[1], 2)
self.assertEqual(self.queue[2], 3)
self.assertIsNone(self.queue[3])
def test_cancel(self, mock_idle_add: unittest.mock.Mock,
mock_source_removed: unittest.mock.Mock):
"""Test canceling queued tasks."""
self.queue.push(1)
self.queue.progress = 0.42
self.queue.cancel()
mock_source_removed.assert_called_with(42)
self.assertListEqual(self.queue._tasks, [])
self.assertIsNone(self.queue._idle_id)
self.assertEqual(self.queue.total, 0)
self.assertEqual(self.queue.progress, 0.0)
def test_complete(self, mock_idle_add: unittest.mock.Mock,
mock_source_removed: unittest.mock.Mock):
"""Test completing queued tasks."""
func = unittest.mock.Mock(return_value=True)
self.queue.push(func, 1)
self.queue.push(func, 2)
self.queue.complete()
func.assert_has_calls([unittest.mock.call(1),
unittest.mock.call(2)])
self.assertFalse(self.queue.running)
self.assertListEqual(self.queue._tasks, [])
mock_source_removed.assert_called_with(42)
func.reset_mock()
self.queue.complete()
func.assert_not_called()
def test_push_enabled(self, mock_idle_add: unittest.mock.Mock,
mock_source_removed: unittest.mock.Mock):
"""Test adding an idle task to an enabled idle queue."""
func = unittest.mock.Mock(return_value=True)
self.assertIsNone(self.queue.push(func, 1))
mock_idle_add.assert_called_with(self.queue.run_task)
func.assert_not_called()
self.assertListEqual(self.queue._tasks, [(func, 1)])
self.assertEqual(self.queue.total, 1)
self.assertEqual(self.queue._idle_id, 42)
self.assertTrue(self.queue.running)
mock_idle_add.reset_mock()
self.assertIsNone(self.queue.push(func, 2))
self.assertListEqual(self.queue._tasks, [(func, 1), (func, 2)])
self.assertEqual(self.queue.total, 2)
mock_idle_add.assert_not_called()
def test_push_disabled(self, mock_idle_add: unittest.mock.Mock,
mock_source_removed: unittest.mock.Mock):
"""Test adding an idle task to a disabled idle queue."""
self.queue.enabled = False
func = unittest.mock.Mock(return_value=True)
self.assertTrue(self.queue.push(func, 1))
self.assertListEqual(self.queue._tasks, [])
self.assertEqual(self.queue.total, 0)
self.assertFalse(self.queue.running)
self.assertIsNone(self.queue._idle_id)
mock_idle_add.assert_not_called()
func.assert_called_with(1)
func = unittest.mock.Mock(return_value=False)
self.assertFalse(self.queue.push(func, 2))
mock_idle_add.assert_not_called()
func.assert_called_with(2)
def test_push_now(self, mock_idle_add: unittest.mock.Mock,
mock_source_removed: unittest.mock.Mock):
"""Test pushing an idle task with now=True."""
func = unittest.mock.Mock(return_value=True)
self.assertTrue(self.queue.push(func, 1, now=True))
self.assertListEqual(self.queue._tasks, [])
self.assertEqual(self.queue.total, 0)
self.assertFalse(self.queue.running)
self.assertIsNone(self.queue._idle_id)
mock_idle_add.assert_not_called()
func.assert_called_with(1)
def test_push_many_enabled(self, mock_idle_add: unittest.mock.Mock,
mock_source_removed: unittest.mock.Mock):
"""Test adding several calls to one function at one time."""
func = unittest.mock.Mock(return_value=True)
self.queue.push_many(func, [(1,), (2,), (3,)])
self.assertListEqual(self.queue._tasks, [(func, 1),
(func, 2),
(func, 3)])
self.assertEqual(self.queue.total, 3)
self.assertEqual(self.queue._idle_id, 42)
self.assertTrue(self.queue.running)
mock_idle_add.assert_called_with(self.queue.run_task)
func.assert_not_called()
def test_push_many_disabled(self, mock_idle_add: unittest.mock.Mock,
mock_source_removed: unittest.mock.Mock):
"""Test push_many() with the idle queue disabled."""
self.queue.enabled = False
func = unittest.mock.Mock(return_value=True)
self.queue.push_many(func, [(1,), (2,), (3,)])
func.assert_has_calls([unittest.mock.call(1),
unittest.mock.call(2),
unittest.mock.call(3)])
mock_idle_add.assert_not_called()
def test_push_many_now(self, mock_idle_add: unittest.mock.Mock,
mock_source_removed: unittest.mock.Mock):
"""Test push_many() with now=True."""
func = unittest.mock.Mock(return_value=True)
self.queue.push_many(func, [(1,), (2,), (3,)], now=True)
func.assert_has_calls([unittest.mock.call(1),
unittest.mock.call(2),
unittest.mock.call(3)])
mock_idle_add.assert_not_called()
def test_run_task(self, mock_idle_add: unittest.mock.Mock,
mock_source_removed: unittest.mock.Mock):
"""Test manually running idle tasks."""
func = unittest.mock.Mock(return_value=True)
self.queue.push_many(func, [(i,) for i in range(5)])
self.assertEqual(self.queue.total, 5)
for i in range(5):
expected = GLib.SOURCE_REMOVE if i == 4 else GLib.SOURCE_CONTINUE
with self.subTest(i=i, expected=expected):
self.assertEqual(self.queue.run_task(), expected)
func.assert_called_with(i)
self.assertAlmostEqual(self.queue.progress,
0 if i == 4 else (i + 1) / 5)
self.assertEqual(self.queue.running, i != 4)
self.assertIsNone(self.queue._idle_id)
self.queue.run_task()
def test_rerun_task(self, mock_idle_add: unittest.mock.Mock,
mock_source_removed: unittest.mock.Mock):
"""Test rerunning a task."""
func = unittest.mock.Mock(return_value=False)
self.queue.push(func, 1)
self.queue.push(func, 2)
self.queue.run_task()
self.assertListEqual(self.queue._tasks, [(func, 1), (func, 2)])