emmental/emmental/thread.py

95 lines
2.8 KiB
Python

# Copyright 2024 (c) Anna Schumaker.
"""A Thread class designed to easily sync up with the main thread."""
import threading
class Data:
"""A class for holding generic fields inspired by SimpleNamespace."""
def __init__(self, values_dict: dict = {}, **kwargs):
"""Initialize our Data class."""
self.__dict__.update(values_dict | kwargs)
def __eq__(self, rhs: any) -> bool:
"""Compare two Data classes."""
if isinstance(rhs, Data):
return self.__dict__ == rhs.__dict__
elif isinstance(rhs, dict):
return self.__dict__ == rhs
return False
def __repr__(self) -> str:
"""Get a string representation of the Data."""
items = (f"{k}={v!r}" for k, v in self.__dict__.items())
return f"{type(self).__name__}({', '.join(items)})"
class Thread(threading.Thread):
    """A worker Thread class that is easy to sync up with the main thread.

    The worker sleeps on a condition variable until a task is queued with
    set_task(), then hands it to do_run_task(). The `ready` event is set
    once the worker has started and again each time set_result() is called;
    it is cleared whenever a new task (or a stop request) is queued.
    Subclasses customize behavior by overriding the do_*() hooks.
    """

    def __init__(self):
        """Initialize our worker Thread object and start it running."""
        super().__init__()
        self.ready = threading.Event()
        self._condition = threading.Condition()
        self._task = None
        self._result = None
        # Set (under self._condition) whenever a task or stop request is
        # queued, so run() notices requests made before it started waiting.
        self._task_pending = False
        self.start()

    def do_get_result(self, result: Data, **kwargs) -> Data:
        """Get the result of the task.

        Hook called by get_result() with the stored result and any extra
        keyword arguments; the default returns the stored result unchanged.
        """
        return self._result

    def do_run_task(self, task: Data) -> None:
        """Run the task.

        Hook called on the worker thread; the default publishes an empty
        result.
        """
        self.set_result()

    def do_stop(self) -> None:
        """Extra work when stopping the thread."""

    def get_result(self, **kwargs) -> Data | None:
        """Get the result of the current task.

        Returns None when the thread is not ready or has no stored result.
        Otherwise the stored result is passed through do_get_result() and
        then cleared, so each result is handed out only once.
        """
        with self._condition:
            if not self.ready.is_set() or self._result is None:
                return None
            res = self.do_get_result(self._result, **kwargs)
            self._result = None
            return res

    def run(self) -> None:
        """Wait for a task to run."""
        # The condition's lock is held for the whole loop except while
        # waiting, so tasks run with the lock held.
        with self._condition:
            # A request may already have been queued between start() and
            # this point; in that case `ready` must stay cleared.
            if not self._task_pending:
                self.ready.set()
            while True:
                # Waiting on a predicate (not a bare wait()) means a
                # notify() sent before we got here is not lost.
                self._condition.wait_for(lambda: self._task_pending)
                self._task_pending = False
                if self._task is None:
                    # A None task is the stop request.
                    self.do_stop()
                    break
                self.do_run_task(self._task)

    def set_result(self, **kwargs: object) -> None:
        """Set the result of the task and mark the thread as ready."""
        self._result = Data(kwargs)
        self.ready.set()

    def __set_task(self, task: Data | None) -> None:
        """Set the task to be run by the thread (None requests a stop)."""
        with self._condition:
            self.ready.clear()
            self._task = task
            self._result = None
            self._task_pending = True
            self._condition.notify()

    def set_task(self, **kwargs: object) -> None:
        """Set the task to be run by the thread.

        The keyword arguments are wrapped in a Data object and passed to
        do_run_task() on the worker thread.
        """
        self.__set_task(Data(kwargs))

    def stop(self) -> None:
        """Stop the thread and wait for it to exit."""
        self.__set_task(None)
        self.join()