# Source code for upsies.jobs.jobrunner

"""
Manage a bunch of jobs
"""

import collections
import collections.abc

import logging  # isort:skip
# Module-level logger named after this module; used for the debug messages
# emitted by the code below.
_log = logging.getLogger(__name__)


class JobRunner(collections.abc.Mapping):
    """
    Wrapper around a bunch of :class:`~.JobBase` subclasses to manage them
    collectively

    This class subclasses :class:`~.collections.abc.Mapping` for easy access to
    jobs. Keys are job :attr:`~.JobBase.name`\\ s and values are the
    :class:`~.JobBase` instances.

    :param jobs: :class:`collections.abc.Iterable` of :class:`~.JobBase`
        subclasses
    :param str id: Any identifier (only used for debugging)
    """

    def __init__(self, jobs=(), id='anonymous'):
        # `id` shadows the builtin, but it is part of the public signature
        # and must keep its name for keyword callers.
        self._id = str(id)
        # Maps job name -> job instance; dict ordering preserves the order in
        # which jobs were added.
        self._jobs = {}
        self.add(*jobs)

    def __getitem__(self, job_name):
        """Return the job registered under `job_name` (raises :class:`KeyError` if unknown)"""
        return self._jobs[job_name]

    def __iter__(self):
        """Iterate over job names in the order the jobs were added"""
        return iter(self._jobs)

    def __len__(self):
        """Return the number of added jobs"""
        return len(self._jobs)

    def add(self, *jobs):
        """
        Add `jobs`

        Every job must have a unique :attr:`~.JobBase.name`. Adding the very
        same job object again is silently ignored.

        :param jobs: :class:`~.JobBase` subclasses

        :raise RuntimeError: if a different job with the same name was already
            added
        """
        for job in jobs:
            if job.name in self._jobs:
                # Same name is only acceptable if it is the identical object.
                if job is not self._jobs[job.name]:
                    raise RuntimeError(f'Conflicting job name: {job.name!r}')
            else:
                _log.debug('%s: Adding job: %r', self._id, job)
                self._jobs[job.name] = job

    @property
    def all_jobs(self):
        """All :meth:`add`\\ ed jobs as a flat sequence"""
        return tuple(self._jobs.values())

    @property
    def enabled_jobs(self):
        """Same as :attr:`all_jobs`, but without any jobs that are not :attr:`~.JobBase.is_enabled`"""
        return tuple(
            job
            for job in self._jobs.values()
            if job.is_enabled
        )

    def start_more_jobs(self):
        """
        :meth:`~.JobBase.start` all jobs that are ready to start

        Jobs that have :attr:`~.JobBase.autostart` unset are not started.

        Jobs may also not be started if they are :attr:`disabled
        <upsies.jobs.base.JobBase.is_enabled>` or otherwise unstartable. See
        :meth:`~.JobBase.start`.

        .. note:: This method must be called while an asyncio event loop is
                  running because JobBase.start() calls JobBase.add_task(),
                  and a task must be added to a loop to run.
        """
        for job in self.all_jobs:
            if job.autostart:
                job.start()

    async def wait(self):
        """
        Wait for all jobs that are started and not finished

        .. note:: This does not raise exceptions from jobs. See
                  :attr:`exceptions` after calling this method.
        """
        def get_running_jobs():
            return tuple(
                job
                for job in self.all_jobs
                if job.is_started and not job.is_finished
            )

        # The set of running jobs is re-evaluated after every round, so jobs
        # that became running while we were awaiting are waited for as well.
        while running_jobs := get_running_jobs():
            _log.debug('%s: Waiting for running jobs:', self._id)
            for job in running_jobs:
                _log.debug('%s: * %r', self._id, job)

            for job in running_jobs:
                await job.wait_finished()
                _log.debug('%s: Job finished: %r', self._id, job)

        _log.debug('%s: All running jobs finished', self._id)

    def terminate(self, reason=None):
        """
        Terminate :attr:`all_jobs`

        :param reason: Passed on as `reason` to every job's
            :meth:`~.JobBase.terminate`
        """
        _log.debug('%s: Terminating all jobs', self._id)
        for job in self.all_jobs:
            job.terminate(reason=reason)

    @property
    def all_jobs_finished(self):
        """Whether all :attr:`enabled_jobs` are finished"""
        return all(job.is_finished for job in self.enabled_jobs)

    @property
    def exceptions(self):
        """Sequence of :class:`Exception` instances :attr:`~.JobBase.raised` by any job"""
        return tuple(
            job.raised
            for job in self.all_jobs
            if job.raised
        )