Source code for upsies.utils.predbs.multi

import asyncio
import collections
import difflib
import functools
import os
import re

import async_lru

from ... import constants, errors, utils
from . import SceneCheckResult

import logging  # isort:skip
_log = logging.getLogger(__name__)


[docs] class MultiPredbApi: """ Wrapper around multiple :class:`~.PredbApiBase` instances Each method loops over every provided predb and calls the same method on it. The first call that does not raise :class:`~.RequestError` or :class:`NotImplementedError` is returned. If no call succeeds and any exceptions were raised, they are combined into a single :class:`~.RequestError`, which is raised. If every call raised :class:`NotImplementedError`, a combined :class:`NotImplementedError` is raised. :param predbs: Sequence of :class:`~.PredbApiBase` instances """ DEFAULT_PREDB_NAMES = ( 'srrdb', # 'corruptnet', 'predbclub', 'predbnet', ) def __init__(self, predbs=None): if predbs: self._predbs = predbs else: self._predbs = [ utils.predbs.predb(name) for name in type(self).DEFAULT_PREDB_NAMES ] @functools.cached_property def predbs(self): """Sequence of :class:`~.PredbApiBase` that are used""" return tuple(self._predbs) async def _call(self, predb, method_name, *args, **kwargs): method = getattr(predb, method_name) return await method(*args, **kwargs) def _raise(self, exceptions, method_name): if not exceptions: raise NotImplementedError( '|'.join(predb.name for predb in self._predbs) + f'.{method_name}' ) elif len(exceptions) == 1: raise exceptions[0] else: raise errors.RequestError( 'All queries failed: ' + ', '.join(str(e) for e in exceptions) ) async def _for_each_predb(self, predbs, method_name, *args, **kwargs): # Try calling `method_name` on each predb sequentially and return the # first successful response. exceptions = [] for predb in predbs: _log.debug('Trying %s.%s', predb.name, method_name) try: return await self._call(predb, method_name, *args, **kwargs) except (NotImplementedError, errors.RequestError) as e: _log.debug('Collecting exception: %r', e) exceptions.append(e) # All predbs are unreachable or don't have `method_name` implemented. self._raise(exceptions, method_name) async def _for_all_predbs(self, predbs, method_name, *args, **kwargs): # Call `method_name` on all predbs concurrently and yield the return # value or RequestError/NotImplementedError as they occur. method_calls = [ asyncio.create_task( self._call(predb, method_name, *args, **kwargs) ) for predb in predbs ] try: for coro in asyncio.as_completed(method_calls): try: result = await coro except (NotImplementedError, errors.RequestError) as e: yield e else: yield result finally: # If our caller stops iterating over our yielded `result`s, we may # still have unfinished tasks that will continue to run in the # background. We cancel them here because # 1. nobody cares about their return values anymore # # 2. if one of the tasks raises an exception, asyncio will # eventually complain about an unhandled exception. Cancelling # the tasks seems to prevents that. # # NOTE: This is untested functionality because I couldn't reproduce # this issue in a test case. for task in method_calls: task.cancel()
[docs] @async_lru.alru_cache async def search(self, *args, **kwargs): """ Search multiple predbs concurrently and return first non-empty results See :meth:`~.PredbApiBase.search`. """ # Because srrdb always blocks search requests for a long time (~50 seconds), we don't use it # if we have alternatives. This is less accurate because some releases are only listed on # srrdb (see the commented-out tests in the same commit as this comment), but the long wait # times are too annoying. predbs = self._predbs if len(predbs) >= 2: predbs = tuple( predb for predb in predbs if predb.name != 'srrdb' ) _log.debug('Searching predbs: %r', tuple(p.name for p in predbs)) exceptions = [] async for results in self._for_all_predbs(predbs, 'search', *args, **kwargs): if isinstance(results, Exception): exceptions.append(results) # Return non-empty search results or wait for next response elif results: return results if exceptions: # All predbs are unreachable or don't have `method_name` implemented. # Raise combined RequestErrors. self._raise(exceptions, method_name='search') else: # No search results return []
[docs] @async_lru.alru_cache async def release_files(self, *args, **kwargs): """See :meth:`~.PredbApiBase.release_files`""" return await self._for_each_predb(self._predbs, 'release_files', *args, **kwargs)
_nogroup_regexs = ( re.compile(r'^$'), re.compile(r'^(?i:nogroup)$'), re.compile(r'^(?i:nogrp)$'), )
[docs] @async_lru.alru_cache async def is_scene_release(self, release): """ Return whether `release` is a scene release or not .. note:: A renamed or otherwise altered scene release is still considered a scene release. :param release: Release name, path to release :return: :class:`~.types.SceneCheckResult` """ if isinstance(release, str): release_info = utils.release.ReleaseInfo(release) else: release_info = release # Empty group or names like "NOGROUP" are non-scene if ( # Any NOGROUP-equivalent means it's not scene any(regex.search(release_info['group']) for regex in self._nogroup_regexs) # Abbreviated file names also have an empty group, but ReleaseInfo() # can pick up information from the parent directory and we can do a # successful search for that. and not utils.predbs.is_abbreviated_filename(release_info.path) ): return SceneCheckResult.false # We know at least `title` and `group`. If there are any search results, # it should be a scene release. results = await self.search(release) if results: return SceneCheckResult.true else: return SceneCheckResult.false
[docs] async def verify_release_name(self, content_path, release_name): """ Raise if release was renamed :param content_path: Path to release file or directory :param release_name: Known exact release name, e.g. from :meth:`search` results :raise SceneRenamedError: if release was renamed :raise SceneError: if `release_name` is not a scene release :raise NotImplementedError: if :meth:`release_files` raises it """ content_path = content_path.strip(os.sep) _log.debug('Verifying release name: %r =? %r', content_path, release_name) # TODO: Rewrite this comment: We MUST first check if release_files() is implemented. If we check # is_scene_release() first and an incomplete predb wrongly reports "not # a scene release", MultiPredbApi can move on the next predb. files = await self.release_files(release_name) # TODO: Rewrite this comment: We check is_scene_release() a) because we can fail early before we do # all the work below and b) because we don't want to raise # SceneRenamedError below if this is not a scene release. if not await self.is_scene_release(release_name): raise errors.SceneError(f'Not a scene release: {release_name}') # Figure out which file is the actual payload. Note that the release may not # contain any files, e.g. Wrecked.2011.DiRFiX.LIMITED.FRENCH.720p.BluRay.X264-LOST. main_release_file = None if files: content_file_extension = utils.fs.file_extension(content_path) if content_file_extension: # `content_path` points to a file, not a directory content_file = utils.fs.basename(content_path) # Find the closest match in the released files # (`content_file` may have been renamed) filename_matches = difflib.get_close_matches(content_file, files) if filename_matches: main_release_file = filename_matches[0] if not main_release_file and files: # Default to the largest file main_release_file = sorted( (info for info in files.values()), key=lambda info: info['size'], )[0]['file_name'] # No files in this release, default to `release_name` if not main_release_file: main_release_file = release_name _log.debug('Main release file: %r', main_release_file) # Generate set of paths that are valid for this release acceptable_paths = {release_name} # Properly named directory that contains the released file. This covers # abbreviated files and all other files. for file in files: acceptable_paths.add(os.path.join(release_name, file)) # Any non-abbreviated files may exist outside of a properly named parent # directory for file in files: if not utils.predbs.is_abbreviated_filename(file): acceptable_paths.add(file) # If `release_name` is an episode, it may be inside a season pack parent # directory. This only matters if we're dealing with an abbreviated file # name; normal file names are independent of their parent directory name. if utils.predbs.is_abbreviated_filename(content_path): season_pack_name = utils.predbs.common.get_season_pack_name(release_name) for file in files: acceptable_paths.add(f'{season_pack_name}/{file}') # Standalone file is also ok if it is named `release_name` with the same # file extension as the main file main_release_file_extension = utils.fs.file_extension(main_release_file) acceptable_paths.add(f'{release_name}.{main_release_file_extension}') # Release is correctly named if `content_path` ends with any acceptable path for path in (p.strip(os.sep) for p in acceptable_paths): if re.search(rf'(?:^|{re.escape(os.sep)}){re.escape(path)}$', content_path): return # All attempts to match `content_path` against `release_name` have failed. # Produce a useful error message. if utils.predbs.is_abbreviated_filename(content_path): # Abbreviated files should never be handled without a parent original_name = os.path.join(release_name, main_release_file) elif utils.fs.file_extension(content_path): # Assume `content_path` refers to a file, not a directory # NOTE: We can't use os.path.isdir(), `content_path` may not exist. original_name = main_release_file else: # Assume `content_path` refers to directory original_name = release_name # Use the same number of parent directories for original/existing path. If # original_name contains the parent directory, we also want the parent # directory in existing_name. original_name_parts_count = original_name.count(os.sep) content_path_parts = content_path.split(os.sep) existing_name = os.sep.join(content_path_parts[-original_name_parts_count - 1:]) raise errors.SceneRenamedError( original_name=original_name, existing_name=existing_name, )
[docs] async def verify_release_files(self, content_path, release_name): """ Check if existing files have the correct size :param content_path: Path to release file or directory :param release_name: Known exact release name, e.g. from :meth:`search` results The return value is a sequence of :class:`~.errors.SceneError` exceptions: * :class:`~.errors.SceneFileSizeError` if a file has the wrong size * :class:`~.errors.SceneMissingInfoError` if the correct file size of file cannot be found * :class:`~.errors.SceneError` if `release_name` is not a scene release """ exceptions = [] # TODO: Rewrite this comment: We MUST first check if release_files() is implemented. If we check # is_scene_release() first and an incomplete predb wrongly reports "not # a scene release", MultiPredbApi can move on the next predb. fileinfos = await self.release_files(release_name) # TODO: Rewrite this comment: We check is_scene_release() a) because we can fail early before we do # all the work below and b) because we don't want to raise # SceneRenamedError below if this is not a scene release. if not await self.is_scene_release(release_name): exceptions.append(errors.SceneError(f'Not a scene release: {release_name}')) if exceptions: return tuple(exceptions) def get_release_filesize(filename): return fileinfos.get(filename, {}).get('size', None) # Map file paths to expected file sizes if os.path.isdir(content_path): # Map each file in content_path to its correct size exp_filesizes = { filepath: get_release_filesize(utils.fs.basename(filepath)) for filepath in utils.fs.file_list(content_path) } # `content_path` is file elif len(fileinfos) == 1: # Original release is also a single file, but it may be in a directory filename = tuple(fileinfos)[0] exp_filesize = get_release_filesize(filename) exp_filesizes = { # Title.2015.720p.BluRay.x264-FOO.mkv content_path: exp_filesize, # Title.2015.720p.BluRay.x264-FOO/foo-title.mkv os.path.join(utils.fs.strip_extension(content_path), filename): exp_filesize, } else: # Original release is multiple files (e.g. Extras or Bonus) filename = utils.fs.basename(content_path) exp_filesizes = {content_path: get_release_filesize(filename)} # Compare expected file sizes to actual file sizes for filepath, exp_size in exp_filesizes.items(): filename = utils.fs.basename(filepath) actual_size = utils.fs.file_size(filepath) _log.debug('Checking file size: %s: %r ?= %r', filename, actual_size, exp_size) if exp_size is None: _log.debug('No info: %s', filename) exceptions.append(errors.SceneMissingInfoError(filename)) elif actual_size is not None: if actual_size != exp_size: _log.debug('Wrong size: %s', filename) exceptions.append( errors.SceneFileSizeError( filename=filename, original_size=exp_size, existing_size=actual_size, ) ) else: _log.debug('Correct size: %s', filepath) else: _log.debug('No such file: %s', filepath) return tuple(e for e in exceptions if e)
[docs] async def verify_release(self, content_path, release_name=None): """ Find matching scene releases and apply :meth:`verify_release_name` and :meth:`verify_release_files` :param content_path: Path to release file or directory :param release_name: Known exact release name or `None` to :meth:`search` for `content_path` :return: :class:`~.types.SceneCheckResult` enum from :meth:`is_scene_release` and sequence of :class:`~.errors.SceneError` exceptions from :meth:`verify_release_name` and :meth:`verify_release_files` """ # If we know the exact release name, this is easy. if release_name: return await self._verify_release(content_path, release_name) # Find possible `release_name` values. For season packs that were released # as single episodes, this will get us a sequence of episode release names. existing_release_names = await self.search(content_path) if not existing_release_names: return SceneCheckResult.false, () # Maybe `content_path` was released by scene as it is (as file or directory) for existing_release_name in existing_release_names: is_scene_release, exceptions = await self._verify_release(content_path, existing_release_name) if is_scene_release and not exceptions: return SceneCheckResult.true, () # Maybe `content_path` is a directory (season pack) and scene released # single files (episodes). return await self._verify_release_per_file(content_path)
async def _verify_release(self, content_path, release_name): _log.debug('Verifying %r against release: %r', content_path, release_name) # Stop other checks if this is not a scene release is_scene = await self.is_scene_release(release_name) if is_scene in (SceneCheckResult.false, SceneCheckResult.unknown): return is_scene, () # Combined exceptions from verify_release_name() and verify_release_files() exceptions = [] # verify_release_name() can only produce one exception, so it is raised try: await self.verify_release_name(content_path, release_name) except errors.SceneError as e: exceptions.append(e) # verify_release_files() can produce multiple exceptions, so it returns them exceptions.extend(await self.verify_release_files(content_path, release_name)) return is_scene, tuple(exceptions) async def _verify_release_per_file(self, content_path): _log.debug('Verifying each file beneath %r', content_path) is_scene_releases = [] combined_exceptions = collections.defaultdict(list) filepaths = utils.fs.file_list(content_path, extensions=constants.VIDEO_FILE_EXTENSIONS) for filepath in filepaths: existing_release_names = await self.search(filepath) _log.debug('Search results for %r: %r', filepath, existing_release_names) # If there are no search results, default to "not a scene release" is_scene_release = SceneCheckResult.false # Match each existing_release_name against filepath for existing_release_name in existing_release_names: is_scene_release, exceptions = await self._verify_release(filepath, existing_release_name) _log.debug('Verified %r against %r: %r, %r', filepath, existing_release_name, is_scene_release, exceptions) if is_scene_release and not exceptions: # Match found, don't check other existing_release_names break elif is_scene_release: # Remember exceptions per file (makes debugging easier) combined_exceptions[filepath].extend(exceptions) # Remember the SceneCheckResult when the for loop ended. True if we # found a scene release at any point, other it's the value of the last # existing_release_name. is_scene_releases.append(is_scene_release) # Collapse `is_scene_releases` into a single value if is_scene_releases and all(isr is SceneCheckResult.true for isr in is_scene_releases): _log.debug('All files are scene releases') is_scene_release = SceneCheckResult.true elif is_scene_releases and all(isr is SceneCheckResult.false for isr in is_scene_releases): _log.debug('All files are non-scene releases') is_scene_release = SceneCheckResult.false else: _log.debug('Uncertain scene status: %r', is_scene_releases) is_scene_release = SceneCheckResult.unknown return is_scene_release, tuple( exception for exceptions in combined_exceptions.values() for exception in exceptions )