Source code for upsies.utils.predbs.base

"""
Abstract base class for scene release databases
"""

import abc
import asyncio
import collections
import copy

from ... import constants, errors, utils

import logging  # isort:skip
_log = logging.getLogger(__name__)

natsort = utils.LazyModule(module='natsort', namespace=globals())


[docs] class PredbApiBase(abc.ABC): """Base class for scene release database APIs""" def __init__(self, config=None): self._config = copy.deepcopy(self.default_config) if config is not None: self._config.update(config.items()) @property @abc.abstractmethod def name(self): """Unique name of the scene release database""" @property @abc.abstractmethod def label(self): """User-facing name of the scene release database""" @property def config(self): """ User configuration This is a deep copy of :attr:`default_config` that is updated with the `config` argument from initialization. """ return self._config @property @abc.abstractmethod def default_config(self): """Default user configuration as a dictionary""" @abc.abstractmethod async def _search(self, query): """ Perform search :param SceneQuery query: Search keywords """
[docs] async def search(self, query, *, only_existing_releases=True): """ Search for scene release If there are no results and `query` is a directory path that looks like a season pack, perform one search per video file in that directory or any subdirectory. This is necessary to find mixed season packs. :param query: :class:`~.SceneQuery` object or :class:`str` to pass to :meth:`~.SceneQuery.from_string` or :class:`collections.abc.Mapping` to pass to :meth:`~.SceneQuery.from_release` :param bool only_existing_releases: If this is truthy, imaginary season pack releases are created and added to the search results. :return: :class:`list` of release names as :class:`str` :raise RequestError: if the search request fails """ path = None if isinstance(query, str): path = query query = utils.predbs.SceneQuery.from_string(query) elif isinstance(query, collections.abc.Mapping): query = utils.predbs.SceneQuery.from_release(query) try: results = list(await self._search(query)) except NotImplementedError as e: raise errors.RequestError(f'{self.name} does not support searching') from e if not results and path: # Maybe `path` points to season pack? # Find episodes and search for them individually. return await self._search_for_episodes(path, only_existing_releases) else: return self._postprocess_search_results(results, query, only_existing_releases)
async def _search_for_episodes(self, path, only_existing_releases): combined_results = [] for episode_query in self._generate_episode_queries(path): results = await self.search(episode_query, only_existing_releases=only_existing_releases) combined_results.extend(results) return combined_results def _generate_episode_queries(self, path): info = utils.release.ReleaseInfo(path) if info['type'] is utils.release.ReleaseType.season: # Create SceneQuery from each episode path episode_paths = utils.fs.file_list(path, extensions=constants.VIDEO_FILE_EXTENSIONS) for episode_path in episode_paths: if not utils.predbs.is_abbreviated_filename(episode_path): _log.debug('Generating query for episode: %r', episode_path) # guessit prefers getting the group name from the parent # directory, but the group in the parent directory is likely # "MiXED", so we definitely want the group from the file. filename = utils.fs.basename(episode_path) yield utils.predbs.SceneQuery.from_string(filename) def _postprocess_search_results(self, results, query, only_existing_releases): def sorted_and_deduped(results): return natsort.natsorted(set(results), key=str.casefold) if not query.episodes: # _log.debug('No episodes queried: %r', query.episodes) return sorted_and_deduped(results) else: # _log.debug('Episodes queried: %r', query.episodes) def get_wanted_episodes(season): # Combine episodes from any season with episodes from given # season (season being empty string means "any season") eps = None if '' in query.episodes: eps = query.episodes[''] if season in query.episodes: eps = (eps or []) + query.episodes[season] return eps # Translate single episodes into season packs. matches = [] for result in results: for result_season, result_eps in utils.release.Episodes.from_string(result).items(): wanted_episodes = get_wanted_episodes(result_season) # [] means season pack if wanted_episodes == []: if only_existing_releases: # Add episode from wanted season pack _log.debug('Adding episode from season pack: %r', result) matches.append(result) else: season_pack = utils.predbs.common.get_season_pack_name(result) if season_pack not in matches: _log.debug('Adding season pack: %r', season_pack) matches.append(season_pack) elif wanted_episodes is not None: for ep in result_eps: if ep in wanted_episodes: matches.append(result) break return sorted_and_deduped(matches) @abc.abstractmethod async def _release_files(self, release_name): pass
[docs] async def release_files(self, release_name): """ Map release file names to file information If this is not implemented by the subclass, :class:`NotImplementedError` is raised. Each file information is a dictionary that contains at least the keys ``release_name``, ``file_name`` and ``size``. More keys may be available depending on the subclass implementation. If `release_name` is a season pack, information the relevant episode releases is returned. :param str release_name: Exact name of the release :raise RequestError: if request fails or `release_name` is not found """ try: files = await self._release_files(release_name) except NotImplementedError as e: raise errors.RequestError(f'{self.name} does not provide file information') from e if files: return files else: _log.debug('No such release: %r', release_name) files = {} # If scene released "Foo.S01E0{1,2,3,...}.720p.BluRay-BAR" and we're # searching for "Foo.S01.720p.BluRay-BAR", we most likely don't get any # results. But we can get release names of individual episodes by # searching for the season pack, and then we can call release_files() # for each episode. release_info = utils.release.ReleaseInfo(release_name) if release_info['type'] is utils.release.ReleaseType.season: results = await self.search(release_info, only_existing_releases=True) if results: files = await asyncio.gather( *(self._release_files(result) for result in results) ) # Flatten sequence of dictionaries into single dictionary files = { file_name: file_info for files_ in files for file_name, file_info in files_.items() } _log.debug('Season pack from multiple episode releases: %r', files) # If scene released season pack (e.g. Extras or Bonus content) and we're # searching for a single episode, we most likely don't get any results. # Search for the season pack to get all files. elif release_info['type'] is utils.release.ReleaseType.episode: # Remove single episodes from seasons release_info['episodes'].remove_specific_episodes() results = await self.search(release_info) if len(results) == 1: _log.debug('Getting files from single result: %r', results[0]) files = await self._release_files(results[0]) # Go through all files and find the exact release name we're looking for. # Don't do this exclusively for episodes because not all multi-file releases # are a list of episodes (e.g. extras may not contain any "Exx"). for file_name, file_info in files.items(): if utils.fs.strip_extension(release_name) == utils.fs.strip_extension(file_name): files = {file_name: file_info} _log.debug('Single file from season pack release: %r', files) break return files