Source code for upsies.utils.predbs.srrdb

import os
import re
from base64 import b64decode

from ... import errors, utils
from . import base

import logging  # isort:skip
_log = logging.getLogger(__name__)


[docs] class SrrdbApi(base.PredbApiBase): name = 'srrdb' label = 'srrDB' default_config = {} _url_base = b64decode('YXBpLnNycmRiLmNvbQ==').decode('ascii') _search_url = f'https://{_url_base}/v1/search' _details_url = f'https://{_url_base}/v1/details' _keyword_separators_regex = re.compile(r'[^a-zA-Z0-9]') # We can skip over 50_000 results and get 1000 results per request # https://www.srrdb.com/help _page_size = 1000 _max_skip = 50_000 @staticmethod def _make_release_name_from_path(path): return utils.fs.strip_extension(utils.fs.basename(path)) async def _search(self, query): # Because srrdb search is so slow, we first try to find the exact # release name with the "search/r:RELEASE_NAME" API call. If that yields # a result, we know it's a properly named scene release. If there are no # results, we have to do a slow keyword search because it might still be # a renamed scene release. if query.release_info: results = await self._search_for_exact_movie_or_episode(query) if results: return results # Because there are no scene season packs, we have to do one # "search/r:RELEASE_NAME" call per episode. results = await self._search_for_exact_season(query) if results: return results # Default to slow keyword search. Request as many results as possible. # They should get filtered somewhere upstream by our callers. combined_results = [] for skip in range(0, self._max_skip + 1, self._page_size): keywords_path = self._get_keywords_path(query.keywords, query.group, skip) results = await self._request_search_page(keywords_path) combined_results.extend(results) if len(results) < self._page_size: # We didn't get a full page, so we assume this is the last page break return combined_results async def _search_for_exact_movie_or_episode(self, query): if query.release_info['type'] in ( utils.release.ReleaseType.movie, utils.release.ReleaseType.episode, ): release_name = self._make_release_name_from_path(query.release_info.path) results = await self._search_for_exact_release_name(release_name) return results async def _search_for_exact_season(self, query): if os.path.isdir(query.release_info.path) and query.release_info['type'] is utils.release.ReleaseType.season: _log.debug('Searching for exact season pack: %r', query.release_info.path) combined_results = [] for video_file in utils.fs.find_main_videos(query.release_info.path): release_name = self._make_release_name_from_path(video_file) results = await self._search_for_exact_release_name(release_name) if not results: # Single non-scene episode makes the season pack non-scene. # Is there a better way to deal with this situation? _log.debug('Non-scene episode: %r', video_file) combined_results.clear() break else: combined_results.extend(results) return combined_results async def _search_for_exact_release_name(self, release_name): search_url = f'{self._search_url}/r:{release_name}' _log.debug('Quicksearch URL: %r', search_url) try: response = (await utils.http.get(search_url, cache=True)).json() except errors.RequestError as e: # SrrDB returns HTTP errors if there are illegal characters in the # release name or the request is otherwise unexpected. For example, # if there are spaces in `release_name`, we get "403 Forbidden". _log.debug('Quicksearch error: %r', e) return () else: results = response.get('results', []) return tuple(r['release'] for r in results) async def _request_search_page(self, keywords_path): search_url = f'{self._search_url}/{keywords_path}' _log.debug('Search URL: %s', search_url) response = (await utils.http.get(search_url, cache=True)).json() results = response.get('results', []) return tuple(r['release'] for r in results) def _get_keywords_path(self, keywords, group, skip): def sanitize_keyword(kw): return kw.lower() keywords_sanitized = [ sanitize_keyword(kw) for keyword in keywords for kw in self._keyword_separators_regex.split(keyword) ] if group: keywords_sanitized.append(sanitize_keyword(group)) # The docs say keywords are separated by "/", but then we don't find some releases. # Separating by "_" seems to do the trick. keywords_sanitized = ['_'.join(keywords_sanitized)] # Get most recent results keywords_sanitized.append('order:date-desc') # Skip over `skip` results assert (isinstance(skip, int) and 0 <= skip <= self._max_skip), skip keywords_sanitized.append(f'skipr:{skip}.{self._page_size}') return '/'.join(keywords_sanitized) async def _release_files(self, release_name): """ Map file names to dictionaries with the keys ``release_name``, ``file_name``, ``size`` and ``crc`` If no files for `release_name` are found, return an empty :class:`dict`. :param str release_name: Exact name of the release :raise RequestError: if request fails """ details_url = f'{self._details_url}/{release_name}' _log.debug('Details URL: %s', details_url) response = await utils.http.get(details_url, cache=True) # Response may be empty string if response: # Response may be empty list info = response.json() if info: # If info is not an empty list, it should be a dictionary files = info['archived-files'] release_name = info['name'] return { f['name']: { 'release_name': release_name, 'file_name': f['name'], 'size': f['size'], 'crc': f['crc'], } for f in sorted(files, key=lambda f: f['name'].casefold()) } return {}