95 lines
2.8 KiB
Python
95 lines
2.8 KiB
Python
# Copyright 2024 (c) Anna Schumaker.
|
|
"""A Thread class designed to easily sync up with the main thread."""
|
|
import threading
|
|
|
|
|
|
class Data:
|
|
"""A class for holding generic fields inspired by SimpleNamespace."""
|
|
|
|
def __init__(self, values_dict: dict = {}, **kwargs):
|
|
"""Initialize our Data class."""
|
|
self.__dict__.update(values_dict | kwargs)
|
|
|
|
def __eq__(self, rhs: any) -> bool:
|
|
"""Compare two Data classes."""
|
|
if isinstance(rhs, Data):
|
|
return self.__dict__ == rhs.__dict__
|
|
elif isinstance(rhs, dict):
|
|
return self.__dict__ == rhs
|
|
return False
|
|
|
|
def __repr__(self) -> str:
|
|
"""Get a string representation of the Data."""
|
|
items = (f"{k}={v!r}" for k, v in self.__dict__.items())
|
|
return f"{type(self).__name__}({', '.join(items)})"
|
|
|
|
|
|
class Thread(threading.Thread):
    """A worker Thread class that is easy to sync up with the main thread.

    The worker starts itself from the constructor and then sleeps until a
    task is handed over with set_task(). Subclasses customize behavior by
    overriding the do_run_task(), do_get_result() and do_stop() hooks.

    The `ready` Event is cleared whenever a new task (or a stop request)
    is queued, and set again by set_result(). The worker holds the internal
    condition lock while do_run_task() executes, so get_result(), set_task()
    and stop() block until the running task has finished.
    """

    def __init__(self):
        """Initialize our worker Thread object and start it running."""
        super().__init__()
        self.ready = threading.Event()

        self._condition = threading.Condition()
        self._task = None
        self._result = None
        # True while a set_task() / stop() request has been queued but not
        # yet picked up by the worker. Lets run() notice requests that
        # were made before it started waiting on the condition.
        self._pending = False

        self.start()

    def do_get_result(self, result: Data, **kwargs) -> Data:
        """Get the result of the task.

        Hook called by get_result() with the stored result while the
        condition lock is held. The base implementation returns the
        stored result unchanged.
        """
        return self._result

    def do_run_task(self, task: Data) -> None:
        """Run the task.

        Hook called on the worker thread for every queued task. The base
        implementation just publishes an empty result.
        """
        self.set_result()

    def do_stop(self) -> None:
        """Extra work when stopping the thread."""

    def get_result(self, **kwargs) -> Data | None:
        """Get the result of the current task.

        Returns None if the worker is not ready or no result is stored.
        Otherwise returns whatever do_get_result() produces and clears
        the stored result, so each result is handed out only once.
        """
        with self._condition:
            if not self.ready.is_set() or self._result is None:
                return None

            res = self.do_get_result(self._result, **kwargs)
            self._result = None
            return res

    def run(self) -> None:
        """Wait for a task to run."""
        with self._condition:
            # A request may have been queued before we got the lock; only
            # advertise ourselves as ready when nothing is waiting for us.
            if not self._pending:
                self.ready.set()

            while True:
                # wait_for() checks the flag before sleeping, so a notify()
                # sent before we started waiting is never lost.
                self._condition.wait_for(lambda: self._pending)
                self._pending = False

                # A task of None is the stop request.
                if self._task is None:
                    self.do_stop()
                    break

                self.do_run_task(self._task)

    def set_result(self, **kwargs: object) -> None:
        """Set the result of the task.

        Stores the keyword arguments as a Data object and marks the
        thread as ready.
        """
        self._result = Data(kwargs)
        self.ready.set()

    def __set_task(self, task: Data | None) -> None:
        """Set the task to be run by the thread (None requests a stop)."""
        with self._condition:
            self.ready.clear()
            self._task = task
            self._result = None
            # Record the request so run() sees it even when it is not
            # currently waiting on the condition.
            self._pending = True
            self._condition.notify()

    def set_task(self, **kwargs: object) -> None:
        """Set the task to be run by the thread.

        The keyword arguments are wrapped in a Data object that is later
        passed to do_run_task() on the worker thread.
        """
        self.__set_task(Data(kwargs))

    def stop(self) -> None:
        """Stop the thread and wait for it to exit."""
        self.__set_task(None)
        self.join()
|