Source code for upsies.uis.tui.commands.base

"""
Abstract base class for commands
"""

import abc
import argparse
import collections
import re
import sys
import textwrap

from .... import __description__, __project_name__, __version__, constants, defaults, errors, utils
from .. import utils as tuiutils

import logging  # isort:skip
_log = logging.getLogger(__name__)


class _MyHelpFormatter(argparse.HelpFormatter):
    """Use "\n" to separate and preserve paragraphs and limit line width"""

    MAX_WIDTH = 90

    def __init__(self, *args, **kwargs):
        import shutil
        width_available = shutil.get_terminal_size().columns
        super().__init__(*args,
                         width=min(width_available, self.MAX_WIDTH),
                         **kwargs)

    def _fill_text(self, text, width, indent):
        return '\n'.join(self.__wrap_paragraphs(text, width))

    def _split_lines(self, text, width):
        return self.__wrap_paragraphs(text, width)

    def __wrap_paragraphs(self, text, width, indent=''):
        def wrap(text):
            if not text:
                return ['']
            else:
                return textwrap.wrap(
                    text=text,
                    width=min(width, self.MAX_WIDTH),
                    replace_whitespace=False,
                    initial_indent=indent,
                    subsequent_indent=indent,
                )

        if tuiutils.is_tty():
            text = re.sub(r'``(.*?)``', '\x1b[3m\\1\x1b[23m', text)
        else:
            text = re.sub(r'``(.*?)``', r'"\1"', text)

        return [line
                for paragraph in text.split('\n')
                for line in wrap(paragraph)]


[docs] def PrintText(text_getter): """ Print text returned by callable and exit with exit code 0 :param text_getter: Callable that takes no arguments and returns the text to print """ class PrintText(argparse.Action): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._text_getter = text_getter def __call__(self, *args, **kwargs): sys.stdout.write(str(self._text_getter()).strip() + '\n') sys.exit(0) def __repr__(self): return f'{type(self).__name__}({self._text_getter!r})' return PrintText
[docs] class CommandBase(abc.ABC): """ Base class for all commands :meth:`register` must be called on all subclasses. Instead of instantiating subclasses normally, the class method :meth:`run` should be used to get instances. :param args: Command line arguments :param config: :class:`~.config.base.ConfigBase` object """ _keybindings_description = ( "You can cancel any command with Escape, Ctrl+c or Ctrl+g.\n" "\n" "In the TUI, the usual readline/Emacs keybindings are supported. Here is an incomplete list:\n" "\n" " Ctrl+f Move forward character-wise\n" " Ctrl+b Move backward character-wise\n" " Alt+f Move forward word-wise\n" " Alt+b Move backward word-wise\n" " Ctrl+a Move leftmost position\n" " Ctrl+e Move rightmost position\n" " Ctrl+d Delete character under cursor\n" " Alt+d Delete from cursor to rightmost end of word\n" " Alt+Backspace Delete from cursor to leftmost end of word\n" " Ctrl+k Delete from cursor to end of line\n" " Ctrl+Space Start marking region\n" " Ctrl+w Cut region between mark (Ctrl+Space) and cursor\n" " Alt+w Copy region between mark (Ctrl+Space) and cursor\n" " Ctrl+y Paste region\n" " Ctrl+/ Undo most recent change\n" ) _argparser = argparse.ArgumentParser( description=__description__, epilog='keybindings:\n' + '\n'.join( '\n'.join(textwrap.wrap( text=paragraph, width=90, replace_whitespace=False, initial_indent=' ', subsequent_indent=' ', )) for paragraph in _keybindings_description.strip().split('\n') ), formatter_class=_MyHelpFormatter, ) _argparser.add_argument('--version', action='version', version=f'{__project_name__} {__version__}') _argparser.add_argument('--debug', '-d', metavar='FILE', help='Write debugging messages to FILE') _argparser.add_argument('--config-file', '-f', help='General configuration file path', default=constants.CONFIG_FILEPATH) _argparser.add_argument('--trackers-file', '-t', help='Trackers configuration file path', default=constants.TRACKERS_FILEPATH) _argparser.add_argument('--imghosts-file', '-i', help='Image hosting services configuration file path', default=constants.IMGHOSTS_FILEPATH) _argparser.add_argument('--clients-file', '-c', help='BitTorrent clients configuration file path', default=constants.CLIENTS_FILEPATH) _argparser.add_argument('--ignore-cache', '-C', help='Ignore results from previous calls', action='store_true') _argparser.add_argument('--clear-cache', help='Remove cache directory (config.main.cache_directory)', action='store_true') # Commands _subparsers = _argparser.add_subparsers(title='commands') # Mutually exclusive arguments _mutex_groups = collections.defaultdict(dict) names = NotImplemented """ Sequence of command names The first name is the full name and the rest are short aliases. """ description = '' """ Extended description The class docstring is the main description. Use this class attribute to generate text programmatically. """ cli_arguments = NotImplemented """ CLI argument definitions for this command This is a :class:`dict` in which keys are option names or flags (e.g. ``"PATH"`` or ``("--path", "-p")`` and each value is a :class:`dict` with keyword arguments for :meth:`argparse.ArgumentParser.add_argument`. Additionally, you may specify a ``group`` in each keyword argument dictionary. Arguments with the same ``group`` value are mutually exclusive, meaning the user can only specify one of them. """ subcommands = {} """ Subcommands of commands (or subsubcommands from a CLI point of view) This is a :class:`dict` where keys are subcommand names and values are :attr:`cli_arguments`. """ subcommand_descriptions = {} """ Descriptions of subcommands For each key in :attr:`subcommands`, this :class:`dict` may provide a longer multiline (or even multiparagraph) description. """ subcommand_name = 'PLEASE_GIVE_THIS_SUBCOMMAND_A_PROPER_METAVAR' """Reference to expected subcommand in help texts"""
[docs] @classmethod def register(cls): """ Add the command and its arguments to the internal :class:`argparse.ArgumentParser` instance This classmethod must be called on every subclass. """ # Get long text in output of "upsies <subcommand> --help" if cls.description: description = textwrap.dedent(cls.__doc__.strip('\n')) + '\n\n' + cls.description else: description = textwrap.dedent(cls.__doc__.strip('\n')) if cls.subcommands: description += f'\n\nRun ``upsies {cls.names[0]} {cls.subcommand_name} --help`` for more information.' # Get short text in output of "upsies --help" help = description.split('\n', 1)[0] parser = cls._subparsers.add_parser( cls.names[0], aliases=cls.names[1:], help=help, description=description, formatter_class=_MyHelpFormatter, ) # Map the subclass we are registering to a "command" attribute of the parser. This allows us # to find the proper class based on the subcommand name later. parser.set_defaults(subclass=cls) # Add options and flags for this command cls._add_args(parser, cls.cli_arguments, cls._mutex_groups[None]) # Add subcommands if cls.subcommands: subparsers = parser.add_subparsers( metavar=cls.subcommand_name, help='Valid values: ' + ', '.join(sorted(cls.subcommands)), # Generate the expected error message instead of the cryptic # AttributeError: 'Namespace' object has no attribute 'subcommand' required=True, ) for subcmd_name, subcmd_info in cls.subcommands.items(): description = subcmd_info.get('description', '') args = subcmd_info.get('cli', {}) subparser = subparsers.add_parser( subcmd_name, description=description, formatter_class=_MyHelpFormatter, ) subparser.set_defaults(subcommand=subcmd_name) cls._add_args(subparser, args, cls._mutex_groups[subcmd_name])
@staticmethod def _add_args(parser, cli_arguments, mutex_groups): for argname, argopts in cli_arguments.items(): names = (argname,) if isinstance(argname, str) else argname group_name = argopts.pop('group', None) if group_name: # Put arguments with the same argopts["group"] in a mutually # exclusive group. # FIXME: https://bugs.python.org/issue41854 if not names[0].startswith('-') and argopts.get('default') is None: raise RuntimeError('Default values of mutually exclusive positional arguments ' 'must not be None. See: https://bugs.python.org/issue41854') if group_name not in mutex_groups: mutex_groups[group_name] = parser.add_mutually_exclusive_group() mutex_groups[group_name].add_argument(*names, **argopts) else: # Allow using REMAINDER flag without importing argparse # everywhere. This also introduces an abstraction layer in case # we ever use something else than argparse. if argopts.get('nargs') == 'REMAINDER': argopts['nargs'] = argparse.REMAINDER parser.add_argument(*names, **argopts)
[docs] @classmethod def run(cls, args): """ Execute command :param args: Sequence of CLI arguments or `None` to use `sys.argv` """ try: if args is None: main_args, _remaining_args = cls._argparser.parse_known_args() else: main_args, _remaining_args = cls._argparser.parse_known_args(args) except SystemExit as e: # argparse has sys.exit(2) hardcoded for CLI errors. raise SystemExit(e.code if e.code in (0, 1) else 1) from e # Debugging if main_args.debug: logging.basicConfig( format='%(asctime)s: %(name)s: %(message)s', filename=main_args.debug, ) logging.getLogger(__project_name__).setLevel(level=logging.DEBUG) _log.debug('Started logging with %s version %s', __project_name__, __version__) # Read config files try: cfg = defaults.Config() _log.debug('Loaded defaults:\n%s', cfg.dump_defaults()) # Ideally, we would ignore a missing default config file and error if the user gave us a # non-existing custom file path. Something like this: # # ignore_missing=bool(main_args.config_file == constants.CONFIG_FILEPATH) # # But if the user wants to dump a fresh config to a non-existing file, we don't want to # throw an error. So for now, we simply ignore all missing config files, even if the # user provided one. cfg.read('config', main_args.config_file, ignore_missing=True) cfg.read('clients', main_args.clients_file, ignore_missing=True) cfg.read('imghosts', main_args.imghosts_file, ignore_missing=True) cfg.read('trackers', main_args.trackers_file, ignore_missing=True) _log.debug('Loaded configuration:\n%s', cfg.dump()) except errors.ConfigError as e: raise errors.UiError(e) from e if main_args.clear_cache: utils.fs.clear_cache(cfg['config']['main']['cache_directory']) if hasattr(main_args, 'subclass'): # `main_args.subclass` is set by `CommandBase.register()` to a subclass that implements # a concrete subcommand, e.g. `submit`. return main_args.subclass(args=args, config=cfg) elif not hasattr(main_args, 'subclass') and not main_args.clear_cache: cls._argparser.print_help()
def __init__(self, args, config): self._args = self._argparser.parse_args(args) self._config = config @property @abc.abstractmethod def jobs(self): """ Iterable of :class:`~.jobs.base.JobBase` instances Each :class:`~.jobs.base.JobBase` instance may also be `None` for inactive jobs. """ @property def jobs_active(self): """Same as :attr:`jobs` but without `None` values""" return tuple(job for job in self.jobs if job is not None) @property def main_job(self): """ The main job in :attr:`jobs_active` This job's output is the output of the command. In the default implementation, this is the last job in :attr:`jobs_active` that :attr:`~.JobBase.is_enabled`. """ for j in reversed(self.jobs_active): if j.is_enabled: return j @property def args(self): """Parsed CLI arguments as :class:`argparse.Namespace` object""" return self._args @property def config(self): """Config file options as :class:`~.config.base.ConfigBase` object""" return self._config
[docs] def get_options(self, section_name, subsection_name): """ Combine section in config file with CLI arguments CLI arguments take precedence unless their value is `None`. :param str section_name: Config file name with out the "INI" extension :param str subsection_name: Name of a section in the config file :return: :class:`dict` """ config = self.config[section_name][subsection_name] args = vars(self.args) options = {} options.update(config) options.update( (k, v) for k, v in args.items() if v is not None ) return options
@property def home_directory(self): """ Passed as `home_directory` argument to :class:`.jobs.base.JobBase` instances that are instantiated by this class The default implementation passes the ``CONTENT`` or ``RELEASE`` argument and ``config.main.cache_directory`` to :func:`.fs.projectdir`. If no ``CONTENT`` or ``RELEASE`` argument exists, return ``config.main.cache_directory`` directly. """ if hasattr(self.args, 'CONTENT'): content_path = self.args.CONTENT elif hasattr(self.args, 'RELEASE'): content_path = self.args.RELEASE else: return self.config['config']['main']['cache_directory'] return utils.fs.projectdir( content_path=content_path, base=self.config['config']['main']['cache_directory'], ) @property def cache_directory(self): """ Passed as `cache_directory` argument to :class:`.jobs.base.JobBase` instances that are instantiated by this class The default implementation returns :attr:`home_directory` (e.g. ``~/.cache/upsies/Foo.2012-ASDF.upsies/``) if the ``CONTENT`` or ``RELEASE`` argument exists and returns ``config.main.cache_directory`` otherwise. """ if hasattr(self.args, 'CONTENT') or hasattr(self.args, 'RELEASE'): return self.home_directory else: return self.config['config']['main']['cache_directory']