# Source code for upsies.uis.tui.tui

"""
Interactive text user interface and job manager
"""

import asyncio

from prompt_toolkit.application import Application
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.layout import Layout
from prompt_toolkit.layout.containers import HSplit, Window, to_container
from prompt_toolkit.output import create_output

from ...jobs import JobRunner
from . import jobwidgets, style

import logging  # isort:skip
_log = logging.getLogger(__name__)


class TUI:
    """
    Interactive text user interface that runs jobs and displays their widgets

    Jobs are handed to a :class:`~.jobs.JobRunner`. Every job gets a widget that
    is wrapped in a prompt_toolkit container. At most one interactive job is
    focused at a time; finished jobs and non-interactive (background) jobs are
    always displayed.
    """

    def __init__(self):
        self._jobrunner = JobRunner(id='TUI')
        self._widgets = {}  # Map job names to JobWidgetBase instances.
        self._containers = {}  # Map job names to prompt_toolkit container instances.
        self._focused_job_name = None
        self._app = self._make_app()
        self._loop = asyncio.get_event_loop()
        # First exception that reached the event loop's exception handler. It is
        # raised by run() after the application has stopped.
        self._unhandled_exception = None
        self._loop.set_exception_handler(self._handle_exception)

    def _handle_exception(self, loop, context):
        """
        Event loop exception handler (see :meth:`asyncio.loop.set_exception_handler`)

        Store the first exception found in `context` and ask the job runner to
        terminate. Contexts that carry no ``exception`` are ignored.
        """
        exception = context.get('exception')
        if exception:
            _log.debug('Caught unhandled exception: %r', exception)
            _log.debug('Unhandled exception context: %r', context)
            if not self._unhandled_exception:
                # Only the first exception is kept and only the first one
                # requests termination.
                self._unhandled_exception = exception
                self._jobrunner.terminate(reason=f'Unhandled exception: {exception!r}')

    def _make_app(self):
        """Create layout and key bindings and return the :class:`Application`"""
        self._jobs_container = HSplit(
            # FIXME: Layout does not accept an empty list of children, so we add an initial empty
            #        Window that doesn't display anything.
            #        https://github.com/prompt-toolkit/python-prompt-toolkit/issues/1257
            children=[Window()],
            style='class:default',
        )
        self._layout = Layout(self._jobs_container)

        kb = KeyBindings()

        @kb.add('escape')
        @kb.add('c-g')
        @kb.add('c-q')
        @kb.add('c-c')
        def _terminate(_event):
            self._jobrunner.terminate(reason='User terminated application')

        @kb.add('escape', 'I')
        def _dump_state(_event):
            # Debugging aid: Log all jobs with their tasks and the current focus state.
            _log.debug('=== CURRENT JOBS ===')
            for job in self._jobrunner.all_jobs:
                _log.debug(' %s (%d tasks):', job, len(job._tasks))
                for task in job._tasks:
                    _log.debug(' %r', task)
            _log.debug('Focused job: %r: %r', self._focused_job_name, self._focused_job)
            _log.debug('Focused widget [layout ]: %r', self._layout.current_control)
            _log.debug('Focused widget [focused]: %r', self._widgets.get(self._focused_job_name, None))
            _log.debug('Focused container: %r', self._containers.get(self._focused_job_name, None))
            _log.debug('Layout.has_focus(%r): %r',
                       self._layout.current_control,
                       self._layout.has_focus(self._layout.current_control))

        return Application(
            # Write TUI to stderr if stdout is redirected. This is useful for allowing the user to
            # make decisions in the TUI (e.g. selecting an item from search results) while
            # redirecting the final output (e.g. an IMDb ID).
            output=create_output(always_prefer_tty=True),
            layout=self._layout,
            key_bindings=kb,
            style=style.style,
            full_screen=False,
            erase_when_done=False,
            mouse_support=False,
            # Determine the currently active job *after* Application was invalidated. At the time of
            # this writing, this is important so we don't try to focus widgets from a job that has
            # just finished, which can result in RuntimeErrors because signals cannot be emitted on
            # finished jobs.
            before_render=self._update_jobs_container,
        )

    def _add_jobs(self, jobs):
        """Register `jobs` with the job runner, create their widgets and connect their signals"""
        # `jobs` is iterated more than once below. Materialize it so that one-shot iterables
        # (e.g. generators) don't end up with jobs that were added but never connected.
        jobs = tuple(jobs)

        for job in jobs:
            self._add_job(job)

        # Add job widgets to the main container widget.
        self._update_jobs_container()

        # Register signal callbacks. It's probably best to do this after all jobs were added so that
        # signals aren't emitted before all jobs are available.
        self._connect_jobs(jobs)

    def _add_job(self, job):
        """Add `job` to the job runner and create its widget and container"""
        self._jobrunner.add(job)
        self._widgets[job.name] = jobwidgets.JobWidget(job, self._app)
        _log.debug('Job widget: %r: widget=%r', job.name, self._widgets[job.name])
        self._containers[job.name] = to_container(self._widgets[job.name])
        _log.debug('Job widget: %r: container=%r', job.name, self._containers[job.name])

    def _connect_jobs(self, jobs):
        """Register our callbacks for the ``finished`` and ``refresh_ui`` signals of each job"""
        for job in jobs:
            # Every time a job finishes, other jobs can become enabled due to the dependencies on
            # other jobs or other conditions. We also want to display the next interactive job when
            # an interactive job is done.
            job.signal.register('finished', self._handle_job_finished)

            # A job can also signal explicitly that we should update the job widgets, e.g. to start
            # previously disabled jobs.
            job.signal.register('refresh_ui', self._refresh)

    def _handle_job_finished(self, finished_job):
        """Refresh the UI and terminate all jobs if `finished_job` failed"""
        assert finished_job.is_finished, f'{finished_job.name} is actually not finished'

        # Start enabled but not yet started jobs and display the next interactive job. This also
        # generates the regular output if output from all jobs was read from cache and the TUI exits
        # immediately.
        self._refresh()

        # Terminate all jobs and exit if job finished with non-zero exit code.
        if finished_job.exit_code != 0:
            self._jobrunner.terminate(reason=f'Job failed: {finished_job.name}: {finished_job.raised!r}')

    def _refresh(self):
        """Start more jobs, update the displayed widgets and request a redraw"""
        self._jobrunner.start_more_jobs()
        self._update_jobs_container()
        self._app.invalidate()

    @property
    def _focused_job(self):
        """Job runner entry for the focused job name or `None` if no job is focused"""
        if self._focused_job_name:
            return self._jobrunner[self._focused_job_name]
        return None

    # We accept one argument because `before_render` calls this method with the Application
    # instance.
    def _update_jobs_container(self, _=None):
        """Pick the focused job and update which job containers are displayed"""
        # Unfocus focused job if it is finished.
        if self._focused_job and self._focused_job.is_finished:
            self._focused_job_name = None

        enabled_jobs = self._jobrunner.enabled_jobs

        # Don't change focus if we already have a focused job. If another job becomes interactive
        # asynchronously (e.g. because a background job finished), it must not steal focus from the
        # currently focused job.
        if not self._focused_job_name:
            # Focus next interactive job.
            for job in enabled_jobs:
                if (
                    job.is_started
                    and not job.is_finished
                    and self._widgets[job.name].is_interactive
                ):
                    self._focused_job_name = job.name
                    break

        # Display focused job, finished jobs and all background jobs.
        self._jobs_container.children[:] = [
            self._containers[job.name]
            for job in enabled_jobs
            if (
                job.name == self._focused_job_name
                or job.is_finished
                or not self._widgets[job.name].is_interactive
            )
        ]

        if self._focused_job_name:
            # Actually focus the focused job.
            try:
                self._layout.focus(self._containers[self._focused_job_name])
            except ValueError:
                # A job may hardcode `is_interactive = True` even though it currently is not
                # focusable. This happens, for example, if the job is still autodetecting before the
                # user can fix or confirm the autodetected value. In that case, we can either wait
                # for the job to become focusable or focus another interactive job in the meantime.
                pass

    async def run(self, jobs):
        """
        Block while `jobs` are running and the user interface is up

        :param jobs: Iterable of :class:`~.jobs.base.JobBase` instances

        :raise: any :class:`Exception` from any job or the user interface

        :return: :attr:`~.JobBase.exit_code` from the first failed job or ``0``
            if all jobs succeeded
        """
        # Run the TUI in a background task that can be cancelled. This makes it easier to terminate
        # self._app.run_async() and get an exception from it.
        self._app_task = self._loop.create_task(
            self._app.run_async(set_exception_handler=False)
        )
        _log.debug('============ TUI is now running ============')

        try:
            # Add jobs to our JobRunner and create TUI widgets for them.
            self._add_jobs(jobs)

            # Start initially enabled jobs. This must be done asynchronously because we need a
            # running asyncio event loop for JobBase.add_task(), which is called by
            # JobBase.start().
            self._jobrunner.start_more_jobs()

            # Wait for jobs to finish.
            await self._jobrunner.wait()
            _log.debug('All jobs finished:')
            for job in self._jobrunner.all_jobs:
                _log.debug(' * %r', job)
        finally:
            # Always stop the application task, even if something above raised. Otherwise the
            # task would keep running in the background after we have returned.
            self._app_task.cancel()
            try:
                await self._app_task
            except asyncio.CancelledError:
                pass
            _log.debug('============ TUI has stopped running ============')

        # Raise any stored exception.
        self._maybe_raise_exception()

        # Return application exit code (e.g. 0=success, 1=failure).
        return self._get_exit_code()

    def _maybe_raise_exception(self):
        """Raise the stored unhandled exception or the first job exception, if there is one"""
        if self._unhandled_exception:
            # Some task raised an exception that wasn't handled properly.
            _log.debug('Raising unhandled exception: %r', self._unhandled_exception)
            raise self._unhandled_exception

        if self._jobrunner.exceptions:
            # One or more jobs raised an exception via JobBase.exception().
            _log.debug('Raising first job exception: %r', self._jobrunner.exceptions)
            raise self._jobrunner.exceptions[0]

    def _get_exit_code(self):
        """Return the first exit code that is neither ``0`` nor `None`, or ``0`` if there is none"""
        for job in self._jobrunner.all_jobs:
            _log.debug('Exit code of %r: %r', job.name, job.exit_code)

        # First non-zero exit_code is the application exit_code
        for job in self._jobrunner.all_jobs:
            if job.exit_code not in (0, None):
                _log.debug('Exiting with exit code from %s: %r', job.name, job.exit_code)
                return job.exit_code
        return 0