Source code for upsies.jobs.custom

"""
Custom job that provide the result of a coroutine
"""

import collections

from . import JobBase

import logging  # isort:skip
_log = logging.getLogger(__name__)


[docs] class CustomJob(JobBase): """ Wrapper around coroutine function This job adds the following signals to :attr:`~.JobBase.signal`: ``indicate_activity`` Emitted by :meth:`indicate_activity`. Registered callbacks get the `indicate_activity` and `text` arguments as positional arguments. """ @property def name(self): return self._name @property def label(self): return self._label
[docs] def initialize(self, *, name, label, worker, catch=()): """ Set internal state :param name: Name for internal use :param label: Name for user-facing use :param worker: Coroutine function that takes the job instance as a positional argument and returns this job's output If `worker` returns any iterable that isn't a string, each item is passed to :attr:`add_output`. If `worker` returns `None`, it is interpreted as "no output" and the job fails. An empty string is valid output and does not mean the job failed. :param catch: Sequence of :class:`Exception` classes to catch from `worker` and pass to :meth:`~.JobBase.error` This job finishes when `worker` returns. :class:`~.asyncio.CancelledError` from `worker` is ignored. Any other exceptions are passed to :meth:`~.JobBase.exception`. """ self._name = str(name) self._label = str(label) self._worker = worker self._expected_exceptions = tuple(catch) self.signal.add('indicate_activity')
[docs] def indicate_activity(self, indicate_activity=None, *, text=None): """ Emit the ``indicate_activity`` :attr:`~.JobBase.signal` :param indicate_activity: `True`, `False` or `None` (i.e. do not change the current state) :param text: :class:`str` or `None` (i.e. do not change the current state) """ self.signal.emit('indicate_activity', indicate_activity, text)
[docs] async def run(self): try: result = await self._worker(self) except self._expected_exceptions as e: self.error(e) else: self._handle_worker_result(result)
def _handle_worker_result(self, result): if result is not None: if ( not isinstance(result, str) and isinstance(result, collections.abc.Iterable) ): # Worker returned list/tuple/etc for r in result: self.add_output(r) else: self.add_output(result)