Source code for upsies.trackers.sc.jobs

"""
Concrete :class:`~.TrackerJobsBase` subclass for SC
"""

import functools
import re

from ... import errors, jobs, uis, utils
from ..base import TrackerJobsBase
from . import metadata

import logging  # isort:skip
# Module-level logger, named after this module's import path.
_log = logging.getLogger(__name__)


class ScTrackerJobs(TrackerJobsBase):
    """
    Jobs that gather metadata for an SC upload and assemble the POST payload

    Most metadata jobs first try to autodetect their value (see :meth:`get_info`). If
    autodetection yields `None`, the user has to enter the value manually.
    """

    @functools.cached_property
    def jobs_before_upload(self):
        """Tuple of jobs, interactive jobs listed before background jobs"""
        return (
            self.login_job,

            # Interactive jobs
            self.playlists_job,
            self.imdb_job,
            self.tmdb_job,
            self.group_id_job,
            self.title_original_job,
            self.title_english_job,
            self.year_job,
            self.medium_job,
            self.countries_job,
            self.languages_job,
            self.runtime_job,
            self.tags_job,
            self.artists_job,
            self.trailer_job,
            self.plot_job,
            self.scene_check_job,

            # Background jobs
            self.create_torrent_job,
            self.mediainfo_job,
            self.bdinfo_job,
            self.screenshots_job,
            self.upload_screenshots_job,
            self.poster_job,
            self.description_job,
            self.edition_job,
            self.rules_job,
            self.confirm_submission_job,
        )

    @property
    def isolated_jobs(self):
        """
        :attr:`description_job` and its dependencies if the `only_description` option is set,
        empty tuple otherwise
        """
        if self.options.get('only_description', False):
            return self.get_job_and_dependencies(self.description_job)
        else:
            # Activate all jobs
            return ()

    @functools.cached_property
    def imdb_job(self):
        """Parent class' IMDb job with `no_id_ok` enabled"""
        imdb_job = super().imdb_job
        imdb_job.no_id_ok = True
        return imdb_job

    @functools.cached_property
    def tmdb_job(self):
        """
        Parent class' TMDb job with `no_id_ok` enabled, :attr:`imdb_job` as an additional prejob
        and :meth:`no_imdb_id_available` as precondition
        """
        tmdb_job = super().tmdb_job
        tmdb_job.no_id_ok = True
        tmdb_job.prejobs += (self.imdb_job,)
        tmdb_job.precondition = self.make_precondition('tmdb_job', precondition=self.no_imdb_id_available)
        return tmdb_job

    def no_imdb_id_available(self):
        """Return `True` if :attr:`imdb_id` is falsy, `False` otherwise"""
        return not bool(self.imdb_id)

    @functools.cached_property
    def group_id_job(self):
        """Text field job for the SC group ID"""
        return jobs.dialog.TextFieldJob(
            name=self.get_job_name('group_id'),
            label='Group ID',
            precondition=self.make_precondition('group_id_job'),
            prejobs=self.get_job_and_dependencies(
                self.imdb_job,
                self.login_job,
            ),
            text=self.autodetect_group_id,
            warn_exceptions=(
                errors.RequestError,
            ),
            normalizer=self.normalize_group_id,
            validator=self.validate_group_id,
            finish_on_success=True,
            # We must ignore the cache because a group ID can be created or maybe even change.
            **self.common_job_args(ignore_cache=True),
        )

    async def autodetect_group_id(self):
        """
        Translate :attr:`imdb_id` to an SC group ID

        Return the group ID, an empty string if SC has no group for the IMDb ID, or `None` if
        there is no IMDb ID.
        """
        if self.imdb_id:
            if group_id := await self.tracker.get_group_id(imdb_id=self.imdb_id):
                # Successfully translated IMDb ID to SC group ID.
                return group_id
            else:
                # SC does not have a group for this IMDb ID.
                return ''
        else:
            # Without an IMDb ID, we can't autodetect the SC group ID, but we need it so we don't
            # create a duplicate group. Returning `None` indicates that the user is prompted.
            self.group_id_job.info = 'Enter group ID or nothing if this movie does not exist on SC.'
            return None

    def normalize_group_id(self, text):
        """Return `text` without surrounding whitespace"""
        return text.strip()

    def validate_group_id(self, text):
        """
        Accept an empty string or a non-negative integer

        :raise ValueError: if `text` is non-empty and not a number or a negative number
        """
        if text:
            try:
                group_id = int(text)
            except ValueError as e:
                raise ValueError('Group ID is not a number.') from e
            else:
                if group_id < 0:
                    raise ValueError('Invalid group ID.')

    @property
    def group_id(self):
        """First output of :attr:`group_id_job` or `None` if that is falsy"""
        # Must return `None` if group_id_job is not finished or group ID is an empty string.
        if group_id := self.get_job_output(self.group_id_job, slice=0, default=None):
            return group_id

    async def get_info(self, name, *, empty_ok=False):
        """
        Get metadata from SC, IMDb or TMDb

        If no information is found, return `None`. This means the user has to enter something
        manually.

        :param str name: Name of the wanted information

            This is passed to :meth:`tracker.get_sc_info` (if there is a :attr:`group_id`) and
            used as the name of a :class:`~.WebDbApiBase` method.
        :param bool empty_ok: Whether an empty string/sequence is a valid value

            For example, the English title should be empty for native English movies, but the
            original title must not be empty.
        """
        await self.login_job.wait_finished()
        await self.group_id_job.wait_finished()

        # Getters are tried in order: SC first (only if we know the group), then IMDb/TMDb.
        info_getters = []
        if self.group_id:
            info_getters.append((self.tracker.get_sc_info, self.group_id, name))
        info_getters.append((self._get_webdb_info, name))

        for info_getter, *args in info_getters:
            if (value := await info_getter(*args)) or empty_ok:
                _log.debug('Got %s from %r: %r', name, info_getter, value)
                # An empty string/list is not always valid (e.g. original title: not ok, English title: ok).
                return value

        # Return `None` to indicate the user has to enter something manually.
        return None

    async def _get_webdb_info(self, method_name):
        """
        Call `method_name` on the first web database (IMDb, then TMDb) we have an ID for

        A :class:`~.errors.RequestError` is logged and the next database is tried. `None` is
        returned implicitly if no database provided a value.
        """
        for job, get_id, webdb in (
                (self.imdb_job, lambda: self.imdb_id, self.imdb),
                (self.tmdb_job, lambda: self.tmdb_id, self.tmdb),
        ):
            await job.wait_finished()
            # The ID is looked up lazily because it is only available after `job` finished.
            if webdb_id := get_id():
                method = getattr(webdb, method_name)
                try:
                    return await method(webdb_id)
                except errors.RequestError as e:
                    _log.debug('Failed to get info from %s: %r: %r', webdb.name, method_name, e)

    @functools.cached_property
    def title_original_job(self):
        """Text field job for the original title"""
        return jobs.dialog.TextFieldJob(
            name=self.get_job_name('title_original'),
            label='Original Title',
            precondition=self.make_precondition('title_original_job'),
            text=self.autodetect_title_original,
            warn_exceptions=(
                errors.RequestError,  # From get_sc_info() or _get_webdb_info().
            ),
            normalizer=self.normalize_title_original,
            validator=self.validate_title_original,
            default=self.release_name.title,
            finish_on_success=True,
            **self.common_job_args(),
        )

    async def autodetect_title_original(self):
        """Return ``title_original`` from :meth:`get_info` (empty value is not accepted)"""
        return await self.get_info('title_original', empty_ok=False)

    def normalize_title_original(self, text):
        """Return `text` without surrounding whitespace"""
        return text.strip()

    def validate_title_original(self, text):
        """:raise ValueError: if `text` is empty"""
        if not text:
            raise ValueError('Original Title must not be empty.')

    @functools.cached_property
    def title_english_job(self):
        """Text field job for the English title"""
        return jobs.dialog.TextFieldJob(
            name=self.get_job_name('title_english'),
            label='English Title',
            precondition=self.make_precondition('title_english_job'),
            text=self.autodetect_title_english,
            warn_exceptions=(
                errors.RequestError,  # From get_sc_info() or _get_webdb_info().
            ),
            normalizer=self.normalize_title_english,
            default=self.release_name.title_aka,
            finish_on_success=True,
            **self.common_job_args(),
        )

    async def autodetect_title_english(self):
        """Return ``title_english`` from :meth:`get_info` (empty value is accepted)"""
        return await self.get_info('title_english', empty_ok=True)

    def normalize_title_english(self, text):
        """Return `text` without surrounding whitespace"""
        return text.strip()

    @functools.cached_property
    def year_job(self):
        """Text field job for the release year"""
        return jobs.dialog.TextFieldJob(
            name=self.get_job_name('year'),
            label='Year',
            precondition=self.make_precondition('year_job'),
            text=self.autodetect_year,
            warn_exceptions=(
                errors.RequestError,  # From get_sc_info() or _get_webdb_info().
            ),
            normalizer=self.normalize_year,
            validator=self.validate_year,
            default=self.release_name.year,
            finish_on_success=True,
            **self.common_job_args(),
        )

    async def autodetect_year(self):
        """Return ``year`` from :meth:`get_info` (empty value is not accepted)"""
        return await self.get_info('year', empty_ok=False)

    def normalize_year(self, text):
        """Return `text` without surrounding whitespace"""
        return text.strip()

    def validate_year(self, text):
        """
        Reject empty `text` and pass anything else to :class:`~.utils.types.ReleaseYear`

        :raise ValueError: if `text` is empty
        """
        if not text:
            raise ValueError('Year must not be empty.')
        else:
            # NOTE(review): presumably raises ValueError for invalid years; confirm in utils.types.
            utils.types.ReleaseYear(text)

    @functools.cached_property
    def medium_job(self):
        """Choice job for the medium with the keys of `metadata.media` as options"""
        return jobs.dialog.ChoiceJob(
            name=self.get_job_name('medium'),
            label='Medium',
            precondition=self.make_precondition('medium_job'),
            autodetect=self.autodetect_medium,
            autofinish=True,
            options=tuple(metadata.media),
            **self.common_job_args(),
        )

    async def autodetect_medium(self, job):
        """
        Return the first key in `metadata.media` whose value returns something truthy when called
        with :attr:`release_name`, or `None` if there is no such key
        """
        for medium, is_medium in metadata.media.items():
            if is_medium(self.release_name):
                return medium

    @functools.cached_property
    def countries_job(self):
        """Text field job for the comma-separated countries"""
        return jobs.dialog.TextFieldJob(
            name=self.get_job_name('countries'),
            label='Countries',
            precondition=self.make_precondition('countries_job'),
            text=self.autodetect_countries,
            warn_exceptions=(
                errors.RequestError,  # From get_sc_info() or _get_webdb_info().
            ),
            normalizer=self.normalize_countries,
            default=self.release_name.country,
            finish_on_success=True,
            **self.common_job_args(),
        )

    async def autodetect_countries(self):
        """Return ``countries`` from :meth:`get_info` joined by ``", "`` or `None`"""
        if countries := await self.get_info('countries', empty_ok=False):
            return ', '.join(countries)

    def normalize_countries(self, text):
        """Normalize comma-separated list without lower-casing or sorting"""
        return utils.string.normalize_comma_separated_list(text, lower=False, sort=False)

    @functools.cached_property
    def languages_job(self):
        """Text field job for the comma-separated languages"""
        return jobs.dialog.TextFieldJob(
            name=self.get_job_name('languages'),
            label='Languages',
            precondition=self.make_precondition('languages_job'),
            text=self.autodetect_languages,
            warn_exceptions=(
                errors.RequestError,  # From get_sc_info() or _get_webdb_info().
            ),
            normalizer=self.normalize_languages,
            finish_on_success=True,
            **self.common_job_args(),
        )

    async def autodetect_languages(self):
        """Return ``languages`` from :meth:`get_info` joined by ``", "`` or `None`"""
        if languages := await self.get_info('languages', empty_ok=False):
            return ', '.join(languages)

    def normalize_languages(self, text):
        """Normalize comma-separated list without lower-casing or sorting"""
        return utils.string.normalize_comma_separated_list(text, lower=False, sort=False)

    @functools.cached_property
    def runtime_job(self):
        """Text field job for the runtime"""
        return jobs.dialog.TextFieldJob(
            name=self.get_job_name('runtime'),
            label='Runtime',
            precondition=self.make_precondition('runtime_job'),
            text=self.autodetect_runtime,
            warn_exceptions=(
                errors.RequestError,  # From get_sc_info() or _get_webdb_info().
            ),
            normalizer=self.normalize_runtime,
            validator=self.validate_runtime,
            finish_on_success=True,
            **self.common_job_args(),
        )

    async def autodetect_runtime(self):
        """Return the ``default`` runtime from :meth:`get_info` as a string or `None`"""
        # Runtime of the "main" cut, not the Director's Cut or whatever.
        if (
                (runtimes := await self.get_info('runtimes', empty_ok=False))
                and 'default' in runtimes
        ):
            return str(runtimes['default'])

    def normalize_runtime(self, text):
        """Return `text` without surrounding whitespace"""
        return text.strip()

    def validate_runtime(self, text):
        """:raise ValueError: if `text` is empty, not an integer or a negative integer"""
        if not text:
            raise ValueError('Runtime must not be empty.')
        try:
            runtime = int(text)
        except ValueError as e:
            raise ValueError('Runtime is not a number.') from e
        else:
            if runtime < 0:
                raise ValueError('Runtime is not reasonable.')

    @functools.cached_property
    def tags_job(self):
        """Text field job for the comma-separated tags"""
        return jobs.dialog.TextFieldJob(
            name=self.get_job_name('tags'),
            label='Tags',
            precondition=self.make_precondition('tags_job'),
            text=self.autodetect_tags,
            warn_exceptions=(
                errors.RequestError,  # From get_sc_info() or _get_webdb_info().
            ),
            normalizer=self.normalize_tags,
            validator=self.validate_tags,
            finish_on_success=True,
            **self.common_job_args(),
        )

    async def autodetect_tags(self):
        """Return ``genres`` from :meth:`get_info` that are in `metadata.tags`, joined by ``", "``"""
        if genres := await self.get_info('genres', empty_ok=False):
            # Only autodetect tags that are listed in the upload form.
            return ', '.join(
                genre
                for genre in genres
                if genre in metadata.tags
            )

    def normalize_tags(self, text):
        """Normalize comma-separated list with lower-casing but without sorting"""
        return utils.string.normalize_comma_separated_list(text, lower=True, sort=False)

    def validate_tags(self, text):
        """:raise ValueError: if `text` is empty or contains a tag that is not in `metadata.tags`"""
        if not text:
            raise ValueError('You must provide at least one tag.')
        for tag in (tag.strip() for tag in text.split(',')):
            if tag not in metadata.tags:
                raise ValueError(f'Invalid tag: {tag}')

    @functools.cached_property
    def plot_job(self):
        """Read-only text field job for the plot summary"""
        return jobs.dialog.TextFieldJob(
            name=self.get_job_name('plot'),
            label='Plot',
            precondition=self.make_precondition('plot_job'),
            prejobs=self.get_job_and_dependencies(
                self.imdb_job,
            ),
            text=self.autodetect_plot,
            warn_exceptions=(
                errors.RequestError,  # From get_sc_info() or _get_webdb_info().
            ),
            finish_on_success=True,
            read_only=True,
            **self.common_job_args(),
        )

    async def autodetect_plot(self):
        """Return ``summary`` from :meth:`get_info` (empty value is not accepted)"""
        return await self.get_info('summary', empty_ok=False)

    @functools.cached_property
    def poster_job(self):
        """Parent class' poster job with the IMDb and TMDb jobs (and dependencies) as prejobs"""
        job = super().poster_job
        job.prejobs += self.get_job_and_dependencies(self.imdb_job, self.tmdb_job)
        return job

    async def get_poster_from_tracker(self):
        """
        Return poster :class:`dict` built from the ``poster_url`` of the SC group

        `None` is returned implicitly if there is no :attr:`group_id` or the group has no
        ``poster_url``.
        """
        await self.group_id_job.wait_finished()
        if self.group_id:
            info = await self.tracker.get_sc_info(self.group_id)
            if info.get('poster_url'):
                poster = {
                    'poster': info['poster_url'],
                    'width': None,
                    'height': None,
                    'imagehosts': (),
                    'write_to': None,
                }
                _log.debug('Poster from SC: %r', poster)
                return poster

    @functools.cached_property
    def artists_job(self):
        """Custom job that collects artists; each output line is ``<importance>: <name>``"""
        return jobs.custom.CustomJob(
            name=self.get_job_name('artists'),
            label='Artists',
            precondition=self.make_precondition('artists_job'),
            prejobs=self.get_job_and_dependencies(self.imdb_job),
            worker=self.get_artists,
            catch=(
                errors.RequestError,  # From get_sc_info() or _get_webdb_info().
            ),
            **self.common_job_args(),
        )

    async def get_artists(self, job):
        """Worker of :attr:`artists_job`: autodetect artists and prompt the user if none are found"""
        await self._autodetect_artists()

        # If we couldn't find any artists, ask the user to enter some manually.
        if not job.output:
            await self._get_artists_from_user()

    def _add_artist(self, artist):
        """Add `artist` (:class:`dict` with ``importance`` and ``name``) to :attr:`artists_job` output"""
        self.artists_job.add_output(f'{artist["importance"]}: {artist["name"]}')

    async def _autodetect_artists(self):
        """Add up to 10 directors and up to 10 cast members from :meth:`get_info`"""
        limit = 10
        for importance, method_name in (
                (metadata.ArtistImportance.DIRECTOR, 'directors'),
                (metadata.ArtistImportance.ACTOR, 'cast'),
        ):
            if names := await self.get_info(method_name, empty_ok=True):
                for name in names[:limit]:
                    self._add_artist({'importance': importance, 'name': name})

    async def _get_artists_from_user(self):
        """Prompt the user for artists until :meth:`_get_one_artist_from_user` returns nothing"""
        # Loop over artist prompts until the user stops the loop.
        while True:
            if artist := await self._get_one_artist_from_user():
                self._add_artist(artist)
            else:
                break

    async def _get_one_artist_from_user(self):
        """
        Prompt for importance and name of one artist

        Return :class:`dict` with the keys ``importance`` and ``name``. `None` is returned
        implicitly if the user provides a falsy importance or name. The name is not asked for if
        the importance is falsy.
        """
        if (
                (importance := await self._get_artist_importance())
                and (name := await self._get_artist_name())
        ):
            return {'importance': importance, 'name': name}

    async def _get_artist_importance(self):
        """Prompt for one `metadata.ArtistImportance` and return it or `None` to stop"""
        _label, importance = await self.artists_job.add_prompt(uis.prompts.RadioListPrompt(
            options=(
                *(
                    (f'Add {str(importance).lower()}', importance)
                    for importance in metadata.ArtistImportance
                ),
                ('Stop adding artists', None),
            ),
        ))
        return importance

    async def _get_artist_name(self):
        """Prompt for an artist name and return it without surrounding whitespace"""
        return (await self.artists_job.add_prompt(uis.prompts.TextPrompt(
            question='Enter name or nothing to stop adding artists:',
        ))).strip()

    @property
    def _post_data_artists(self):
        """
        POST fields ``artists[]`` and ``importance[]`` parsed from :attr:`artists_job` output

        :raise RuntimeError: if an output line is not formatted as ``<importance>: <name>``
        """
        # Sequence of artist names.
        artists = []
        # Sequence of artist type (actor, director, etc)..
        importances = []

        line_regex = re.compile(
            '^'
            r'(?P<importance>.+?):\s*'
            r'(?P<name>.+)'
            '$'
        )

        lines = self.artists_job.output
        for line in lines:
            if match := line_regex.search(line):
                name = match.group('name')
                importance = metadata.ArtistImportance.from_string(match.group('importance'))
                artists.append(name)
                importances.append(importance.value)
            else:
                raise RuntimeError(f'Unexpected line: {line!r}')

        # Sanity check: Make sure all lists have the same length.
        artists_count = len(lines)
        for lst in (artists, importances):
            assert len(lst) == artists_count, (lst, artists_count)

        return {
            'artists[]': artists,
            'importance[]': importances,
        }

    @functools.cached_property
    def trailer_job(self):
        """Text field job for the trailer, formatted as ``<service>/<ID>``"""
        return jobs.dialog.TextFieldJob(
            name=self.get_job_name('trailer'),
            label='Trailer',
            precondition=self.make_precondition('trailer_job'),
            prejobs=self.get_job_and_dependencies(
                self.group_id_job,
            ),
            text=self.autodetect_trailer,
            warn_exceptions=(
                errors.RequestError,  # From get_sc_info().
            ),
            finish_on_success=True,
            **self.common_job_args(ignore_cache=True),
        )

    async def autodetect_trailer(self):
        """
        Return trailer as ``<service>/<ID>`` from the `trailer` option or from the SC group

        Return an empty string if no trailer is found and `None` if the `trailer` option is
        invalid so the user can fix it.
        """
        # Get trailer from user option (e.g. --trailer).
        if trailer_url := self.options.get('trailer'):
            try:
                trailer_service, trailer_id = self._get_trailer_info(trailer_url)
                _log.debug('Trailer from user: %r', (trailer_service, trailer_id))
            except errors.ValueError as e:
                # If user provided an invalid trailer, allow user to fix it.
                self.trailer_job.set_text(trailer_url)
                self.trailer_job.warn(e)
                return None
            else:
                if trailer_service and trailer_id:
                    return f'{trailer_service}/{trailer_id}'

        # Get trailer from SC.
        assert self.group_id_job.is_finished
        # Like get_info() and get_poster_from_tracker(), only ask SC if the group exists.
        if self.group_id:
            sc_info = await self.tracker.get_sc_info(self.group_id)
            if (
                    (trailer_service := sc_info.get('trailer_service'))
                    and (trailer_id := sc_info.get('trailer_id'))
            ):
                _log.debug('Trailer from SC group: %r', (trailer_service, trailer_id))
                return f'{trailer_service}/{trailer_id}'

        # Do not prompt the user to provide a trailer.
        return ''

    def _get_trailer_info(self, string):
        """
        Return ``(service, ID)`` tuple for `string`

        ``(None, None)`` is returned if `string` is empty.

        :raise errors.ValueError: if `string` is non-empty and no Vimeo or YouTube ID is found
        """
        if vimeo_id := utils.string.get_vimeo_id(string):
            return 'Vimeo', vimeo_id
        elif youtube_id := utils.string.get_youtube_id(string):
            return 'YouTube', youtube_id
        elif string:
            raise errors.ValueError('Trailer must be URL or "vimeo/<ID>" or "youtube/<ID>".')
        else:
            return None, None

    @functools.cached_property
    def _post_data_trailer(self):
        """POST fields ``embedded_content[Provider]`` and ``embedded_content[Id]``"""
        assert self.trailer_job.is_finished
        if output := self.get_job_output(self.trailer_job, slice=0):
            trailer_service, trailer_id = self._get_trailer_info(output)
        else:
            trailer_service = trailer_id = None
        return {
            'embedded_content[Provider]': trailer_service,
            'embedded_content[Id]': trailer_id,
        }

    @functools.cached_property
    def edition_job(self):
        """Read-only text field job for edition information, one ``<Name>: <value>`` per line"""
        return jobs.dialog.TextFieldJob(
            name=self.get_job_name('edition'),
            label='Edition',
            precondition=self.make_precondition('edition_job'),
            text=self.autodetect_edition,
            finish_on_success=True,
            read_only=True,
            # Don't cache job output because the edition options can be changed by the user
            # between runs.
            **self.common_job_args(ignore_cache=True),
        )

    def autodetect_edition(self):
        """
        Return edition information from the `edition` and `edition_*` options

        Truthy `edition_*` options take precedence over the corresponding part of the `edition`
        option. An empty string is returned if none of the options are set.
        """
        edition = {}

        if edition_string := self.options.get('edition'):
            edition.update(self._get_edition_info(edition_string))

        for name in ('year', 'title', 'distributor', 'catalogue_number'):
            if value := self.options.get(f'edition_{name}'):
                edition[name] = value

        if edition:
            return '\n'.join(
                f'{name.capitalize().replace("_", " ")}: {value}'
                for name, value in edition.items()
            )
        else:
            # Do not prompt the user for edition information.
            return ''

    def _get_edition_info(self, string):
        """
        Split comma-separated `string` into year, title, distributor and catalogue number

        Missing parts are empty strings. Any commas after the third one are part of the catalogue
        number.
        """
        parts = string.split(',', maxsplit=3)
        return {
            name: (parts.pop(0).strip() if parts else '')
            for name in ('year', 'title', 'distributor', 'catalogue_number')
        }

    @functools.cached_property
    def _post_data_edition(self):
        """
        ``remaster*`` POST fields parsed from :attr:`edition_job` output

        An empty :class:`dict` is returned if there is no edition information.
        """
        assert self.edition_job.is_finished
        if output := self.get_job_output(self.edition_job, slice=0):
            edition = {
                parts[0]: parts[1].strip() or None
                for line in output.split('\n')
                # Skip lines without ":" separator. (`str.split` never returns an empty list, so
                # testing the truthiness of `parts` does not filter anything.)
                if len(parts := line.split(':', maxsplit=1)) == 2
            }
            return {
                'remaster': 'on',
                'remaster_year': edition.get('Year'),
                'remaster_title': edition.get('Title'),
                'remaster_record_label': edition.get('Distributor'),
                'remaster_catalogue_number': edition.get('Catalogue number'),
            }
        else:
            return {}

    @functools.cached_property
    def description_job(self):
        """Hidden, read-only text field job for the BBcode release description"""
        return jobs.dialog.TextFieldJob(
            name=self.get_job_name('description'),
            label='Description',
            precondition=self.make_precondition('description_job'),
            prejobs=self.get_job_and_dependencies(
                self.playlists_job,
                self.mediainfo_job,
                self.bdinfo_job,
                self.screenshots_job,
                self.upload_screenshots_job,
            ),
            text=self.generate_description,
            finish_on_success=True,
            read_only=True,
            hidden=True,
            # Don't cache job output because the number of screenshots can be changed by the user
            # between runs.
            **self.common_job_args(ignore_cache=True),
        )

    # NOTE(review): These flags are not read in this class; presumably they are interpreted by
    # TrackerJobsBase. Confirm there.
    document_all_videos = False
    mediainfo_required_for_bdmv = False

    async def generate_description(self):
        """
        Return BBcode description

        For each item in :attr:`video_info`, one section is generated that contains screenshots,
        mediainfo reports and BDInfo quick summaries. The return value of
        :meth:`generate_promotion_bbcode` is appended if it is truthy.
        """
        def get_mediainfo_title(mediainfo):
            # The "Complete name" field tells us what kind of file the report is about.
            if re.search(r'^Complete name\s*:\s.*\.IFO$', mediainfo, flags=re.MULTILINE):
                return 'MediaInfo (IFO)'
            elif re.search(r'^Complete name\s*:\s.*\.VOB$', mediainfo, flags=re.MULTILINE):
                return 'MediaInfo (VOB)'
            else:
                return 'MediaInfo'

        sections = []
        section = []
        for info in self.video_info.values():
            if info['screenshot_urls']:
                section.append('\n\n'.join(
                    f'[img]{screenshot_url}[/img]'
                    for screenshot_url in info['screenshot_urls']
                ))
            section.extend(
                f'[hide={get_mediainfo_title(mediainfo)}][plain]{mediainfo.strip()}[/plain][/hide]'
                for mediainfo in info['mediainfos']
            )
            section.extend(
                f'[hide=BDInfo][plain]{bdinfo.quick_summary.strip()}[/plain][/hide]'
                for bdinfo in info['bdinfos']
            )
            sections.append('\n\n'.join(section))
            # `section` is reused for the next video.
            section.clear()

        description = '\n\n'.join(sections)
        return (
            description
            + (f'\n{promo}' if (promo := self.generate_promotion_bbcode()) else '')
        )

    @property
    def post_data(self):
        """:class:`dict` of upload form fields collected from the outputs of the jobs"""
        return {
            'submit': 'true',
            'groupid': self.group_id,  # Will be removed if it is `None`.
            'type': metadata.types['Movies'],
            'releasetype': '21',  # Not sure what this means, but it seems hardcoded.
            'catalogue_number': self.imdb_job.selected.get('id', None),
            'title': self.get_job_output(self.title_original_job, slice=0),
            'alternate_title': self.get_job_output(self.title_english_job, slice=0),
            'year': self.get_job_output(self.year_job, slice=0),
            'country': self.get_job_output(self.countries_job, slice=0),
            'language': self.get_job_output(self.languages_job, slice=0),
            'runtime': self.get_job_output(self.runtime_job, slice=0),
            'media': self.get_job_attribute(self.medium_job, 'choice'),
            'tags': self.get_job_output(self.tags_job, slice=0),
            'image': self.get_job_output(self.poster_job, slice=0),
            'release_desc': self.get_job_output(self.description_job, slice=0),
            'album_desc': self.get_job_output(self.plot_job, slice=0),
            **self._post_data_artists,
            **self._post_data_trailer,
            **self._post_data_edition,
        }

    @property
    def post_files(self):
        """:class:`dict` that describes the torrent file for the ``file_input`` form field"""
        return {
            'file_input': {
                'file': self.torrent_filepath,
                'mimetype': 'application/x-bittorrent',
            },
        }