94 lines
2.9 KiB
Python
94 lines
2.9 KiB
Python
# Copyright 2022 (c) Anna Schumaker
|
|
"""Idle queues to assist with large database operations."""
|
|
import typing
|
|
from gi.repository import GObject
|
|
from gi.repository import GLib
|
|
|
|
|
|
class Queue(GObject.GObject):
    """A base class Idle Queue.

    Tasks are stored as ``(func, *args)`` tuples and are run one at a time
    from a GLib idle source.  A task that returns a truthy value is
    finished and removed from the queue; a task that returns a falsy value
    stays at the head of the queue and is called again on the next run.
    """

    # Number of tasks queued since the queue was last reset by cancel().
    total = GObject.Property(type=int)
    # Share of the queued work that is done: 1 - (pending / total).
    progress = GObject.Property(type=float)
    # Set when the idle source is scheduled, cleared again by cancel().
    running = GObject.Property(type=bool, default=False)
    # When False, push() and push_many() call their tasks immediately.
    enabled = GObject.Property(type=bool, default=True)

    def __init__(self, **kwargs):
        """Initialize an Idle Queue."""
        super().__init__(**kwargs)
        self._tasks = []
        self._idle_id = None

    def __getitem__(self, n: int) -> tuple | None:
        """Get the n-th task in the queue, or None if n is out of range."""
        # Check both bounds so an out-of-range negative index returns None
        # just like an out-of-range positive one does.
        if -len(self._tasks) <= n < len(self._tasks):
            return self._tasks[n]
        return None

    def __run_next_task(self) -> None:
        """Run the task at the head of the queue, removing it when done."""
        task = self._tasks[0]
        func, *args = task
        if func(*args):
            # The task may have changed the queue itself (through cancel()
            # or cancel_task()), so only pop it if it is still the head.
            if self._tasks and self._tasks[0] is task:
                self._tasks.pop(0)

    def __start(self) -> None:
        """Schedule the idle source if needed and refresh the progress."""
        if not self.running:
            self.running = True
            self._idle_id = GLib.idle_add(self.run_task)
        self.__update_counters()

    def __update_counters(self) -> bool:
        """Update the progress property, or reset the queue when empty.

        Returns GLib.SOURCE_CONTINUE while tasks are still pending and
        GLib.SOURCE_REMOVE once the queue has been emptied and reset.
        """
        pending = len(self._tasks)
        if not pending:
            self.cancel()
            return GLib.SOURCE_REMOVE
        self.progress = 1 - (pending / self.total)
        return GLib.SOURCE_CONTINUE

    def cancel(self) -> None:
        """Cancel all pending tasks."""
        if self._idle_id is not None:
            GLib.source_remove(self._idle_id)

        self._tasks.clear()
        self.progress = 0.0
        self.total = 0
        self.running = False
        self._idle_id = None

    def cancel_task(self, func: typing.Callable) -> None:
        """Remove all instances of a specific task from the Idle Queue."""
        self._tasks = [t for t in self._tasks if t[0] != func]
        # The total is left alone, so removed tasks count as finished work.
        self.__update_counters()

    def complete(self) -> None:
        """Complete all pending tasks."""
        if self.running:
            # Tasks that return a falsy value are called again until they
            # report that they are done.
            while self._tasks:
                self.__run_next_task()
            self.cancel()

    def push(self, func: typing.Callable, *args,
             now: bool = False) -> bool | None:
        """Add a task to the Idle Queue.

        The task is called immediately, and its result returned, when the
        queue is disabled or `now` is True.  Otherwise it is queued and
        None is returned.
        """
        if not self.enabled or now:
            return func(*args)

        self._tasks.append((func, *args))
        self.total += 1
        self.__start()

    def push_many(self, func: typing.Callable,
                  args: list[tuple[typing.Any, ...]],
                  now: bool = False) -> None:
        """Add several tasks to the Idle Queue.

        Each entry in `args` is the argument tuple for one call to `func`.
        The calls are made immediately when the queue is disabled or `now`
        is True, and are queued otherwise.
        """
        if not self.enabled or now:
            for arg in args:
                func(*arg)
        else:
            self._tasks.extend((func, *arg) for arg in args)
            self.total += len(args)
            self.__start()

    def run_task(self) -> bool:
        """Manually run the next task.

        This is also the callback handed to GLib.idle_add().  Returns
        GLib.SOURCE_CONTINUE while tasks remain after this run, and
        GLib.SOURCE_REMOVE otherwise.
        """
        if not self._tasks:
            return GLib.SOURCE_REMOVE
        self.__run_next_task()
        return self.__update_counters()
|