"""
Concrete :class:`~.TrackerJobsBase` subclass for SC
"""
import functools
import re
from ... import errors, jobs, uis, utils
from ..base import TrackerJobsBase
from . import metadata
import logging # isort:skip
_log = logging.getLogger(__name__)
[docs]
class ScTrackerJobs(TrackerJobsBase):
@functools.cached_property
def jobs_before_upload(self):
    """
    Ordered tuple of all jobs for this tracker

    The login job comes first, followed by the interactive jobs and then the background jobs.
    """
    login_job = self.login_job

    interactive_jobs = (
        self.playlists_job,
        self.imdb_job,
        self.tmdb_job,
        self.group_id_job,
        self.title_original_job,
        self.title_english_job,
        self.year_job,
        self.medium_job,
        self.countries_job,
        self.languages_job,
        self.runtime_job,
        self.tags_job,
        self.artists_job,
        self.trailer_job,
        self.plot_job,
        self.scene_check_job,
    )

    background_jobs = (
        self.create_torrent_job,
        self.mediainfo_job,
        self.bdinfo_job,
        self.screenshots_job,
        self.upload_screenshots_job,
        self.poster_job,
        self.description_job,
        self.edition_job,
        self.rules_job,
        self.confirm_submission_job,
    )

    return (login_job, *interactive_jobs, *background_jobs)
@property
def isolated_jobs(self):
    """
    Jobs to run in isolation

    If the `only_description` option is set, this is the description job and its
    dependencies. Otherwise an empty tuple is returned.
    """
    if not self.options.get('only_description', False):
        # Activate all jobs
        return ()
    return self.get_job_and_dependencies(self.description_job)
@functools.cached_property
def imdb_job(self):
    """IMDb job inherited from the parent class with `no_id_ok` enabled"""
    job = super().imdb_job
    job.no_id_ok = True
    return job
@functools.cached_property
def tmdb_job(self):
    """
    TMDb job inherited from the parent class, adjusted for SC

    `no_id_ok` is enabled, the IMDb job is appended to the prejobs and
    :meth:`no_imdb_id_available` is passed as an extra precondition.
    """
    job = super().tmdb_job
    job.no_id_ok = True
    job.prejobs += (self.imdb_job,)
    job.precondition = self.make_precondition(
        'tmdb_job',
        precondition=self.no_imdb_id_available,
    )
    return job
def no_imdb_id_available(self):
    """Return `True` if :attr:`imdb_id` is falsy, `False` otherwise"""
    return not self.imdb_id
@functools.cached_property
def group_id_job(self):
    """
    Text field job for the SC group ID

    The text comes from :meth:`autodetect_group_id`, is normalized by
    :meth:`normalize_group_id` and validated by :meth:`validate_group_id`.
    """
    job_name = self.get_job_name('group_id')
    precondition = self.make_precondition('group_id_job')
    required_jobs = self.get_job_and_dependencies(
        self.imdb_job,
        self.login_job,
    )
    # We must ignore the cache because a group ID can be created or maybe even change.
    common_args = self.common_job_args(ignore_cache=True)
    return jobs.dialog.TextFieldJob(
        name=job_name,
        label='Group ID',
        precondition=precondition,
        prejobs=required_jobs,
        text=self.autodetect_group_id,
        warn_exceptions=(errors.RequestError,),
        normalizer=self.normalize_group_id,
        validator=self.validate_group_id,
        finish_on_success=True,
        **common_args,
    )
async def autodetect_group_id(self):
    """
    Translate the IMDb ID to an SC group ID

    :return: The group ID, an empty string if SC has no group for the IMDb ID, or `None` if
        there is no IMDb ID (which means the user is prompted)
    """
    if not self.imdb_id:
        # Without an IMDb ID, we can't autodetect the SC group ID, but we need it so we don't
        # create a duplicate group. Returning `None` indicates that the user is prompted.
        self.group_id_job.info = 'Enter group ID or nothing if this movie does not exist on SC.'
        return None

    group_id = await self.tracker.get_group_id(imdb_id=self.imdb_id)
    # A falsy result means SC does not have a group for this IMDb ID.
    return group_id if group_id else ''
def normalize_group_id(self, text):
    """Return `text` without leading and trailing whitespace"""
    stripped = text.strip()
    return stripped
def validate_group_id(self, text):
    """
    Check that `text` is empty or a non-negative integer

    :raise ValueError: if `text` is not a number or is negative
    """
    if not text:
        # An empty group ID is accepted.
        return

    try:
        group_id = int(text)
    except ValueError as e:
        raise ValueError('Group ID is not a number.') from e

    if group_id < 0:
        raise ValueError('Invalid group ID.')
@property
def group_id(self):
    """
    First output of :attr:`group_id_job` or `None`

    `None` is returned if the job output is not available or is an empty string.
    """
    output = self.get_job_output(self.group_id_job, slice=0, default=None)
    return output if output else None
[docs]
async def get_info(self, name, *, empty_ok=False):
    """
    Get metadata from SC, IMDb or TMDb

    The login job and the group ID job are awaited first. If a group ID is known, SC is asked
    first; IMDb/TMDb are asked afterwards via :meth:`_get_webdb_info`. The first value that is
    truthy is returned. If `empty_ok` is `True`, the value from the first source is returned
    even if it is falsy.

    If no information is found, return `None`. This means the user has to enter something
    manually.

    :param str name: Name of the requested piece of information; for IMDb/TMDb, this is the
        name of a :class:`~.WebDbApiBase` method
    :param bool empty_ok: Whether an empty string/sequence is a valid value

        For example, the English title should be empty for native English movies, but the
        original title must not be empty.
    """
    await self.login_job.wait_finished()
    await self.group_id_job.wait_finished()

    # Each source is a callable plus its positional arguments.
    sources = []
    if self.group_id:
        sources.append((self.tracker.get_sc_info, (self.group_id, name)))
    sources.append((self._get_webdb_info, (name,)))

    for getter, getter_args in sources:
        value = await getter(*getter_args)
        # An empty string/list is not always valid (e.g. original title: not ok, English title: ok).
        if value or empty_ok:
            _log.debug('Got %s from %r: %r', name, getter, value)
            return value

    # Return `None` to indicate the user has to enter something manually.
    return None
async def _get_webdb_info(self, method_name):
    """
    Call the method `method_name` of the IMDb or TMDb API object with the corresponding ID

    IMDb is tried first, then TMDb. A database is skipped if its ID is falsy after its job
    finished. A :class:`~.errors.RequestError` is logged and the next database is tried.

    :return: The return value of the first successful call or `None`
    """
    webdbs = (
        (self.imdb_job, 'imdb_id', self.imdb),
        (self.tmdb_job, 'tmdb_id', self.tmdb),
    )
    for job, id_attribute, webdb in webdbs:
        await job.wait_finished()
        # The ID is only looked up after the job finished.
        webdb_id = getattr(self, id_attribute)
        if not webdb_id:
            continue

        method = getattr(webdb, method_name)
        try:
            return await method(webdb_id)
        except errors.RequestError as e:
            _log.debug('Failed to get info from %s: %r: %r', webdb.name, method_name, e)
@functools.cached_property
def title_original_job(self):
    """
    Text field job for the original title

    The text comes from :meth:`autodetect_title_original` and the default is the title from
    :attr:`release_name`.
    """
    job_name = self.get_job_name('title_original')
    precondition = self.make_precondition('title_original_job')
    # RequestError may come from get_sc_info() or _get_webdb_info().
    warn_exceptions = (errors.RequestError,)
    default_title = self.release_name.title
    common_args = self.common_job_args()
    return jobs.dialog.TextFieldJob(
        name=job_name,
        label='Original Title',
        precondition=precondition,
        text=self.autodetect_title_original,
        warn_exceptions=warn_exceptions,
        normalizer=self.normalize_title_original,
        validator=self.validate_title_original,
        default=default_title,
        finish_on_success=True,
        **common_args,
    )
async def autodetect_title_original(self):
    """Return ``title_original`` from :meth:`get_info`; an empty value is not accepted"""
    title = await self.get_info('title_original', empty_ok=False)
    return title
def normalize_title_original(self, text):
    """Return `text` without leading and trailing whitespace"""
    stripped = text.strip()
    return stripped
def validate_title_original(self, text):
    """
    Make sure the original title is provided

    :raise ValueError: if `text` is empty
    """
    if text:
        return
    raise ValueError('Original Title must not be empty.')
@functools.cached_property
def title_english_job(self):
    """
    Text field job for the English title

    The text comes from :meth:`autodetect_title_english` and the default is the AKA title
    from :attr:`release_name`. No validator is set for this job.
    """
    job_name = self.get_job_name('title_english')
    precondition = self.make_precondition('title_english_job')
    # RequestError may come from get_sc_info() or _get_webdb_info().
    warn_exceptions = (errors.RequestError,)
    default_title = self.release_name.title_aka
    common_args = self.common_job_args()
    return jobs.dialog.TextFieldJob(
        name=job_name,
        label='English Title',
        precondition=precondition,
        text=self.autodetect_title_english,
        warn_exceptions=warn_exceptions,
        normalizer=self.normalize_title_english,
        default=default_title,
        finish_on_success=True,
        **common_args,
    )
async def autodetect_title_english(self):
    """Return ``title_english`` from :meth:`get_info`; an empty value is accepted"""
    title = await self.get_info('title_english', empty_ok=True)
    return title
def normalize_title_english(self, text):
    """Return `text` without leading and trailing whitespace"""
    stripped = text.strip()
    return stripped
@functools.cached_property
def year_job(self):
    """
    Text field job for the release year

    The text comes from :meth:`autodetect_year` and the default is the year from
    :attr:`release_name`.
    """
    job_name = self.get_job_name('year')
    precondition = self.make_precondition('year_job')
    # RequestError may come from get_sc_info() or _get_webdb_info().
    warn_exceptions = (errors.RequestError,)
    default_year = self.release_name.year
    common_args = self.common_job_args()
    return jobs.dialog.TextFieldJob(
        name=job_name,
        label='Year',
        precondition=precondition,
        text=self.autodetect_year,
        warn_exceptions=warn_exceptions,
        normalizer=self.normalize_year,
        validator=self.validate_year,
        default=default_year,
        finish_on_success=True,
        **common_args,
    )
async def autodetect_year(self):
    """Return ``year`` from :meth:`get_info`; an empty value is not accepted"""
    year = await self.get_info('year', empty_ok=False)
    return year
def normalize_year(self, text):
    """Return `text` without leading and trailing whitespace"""
    stripped = text.strip()
    return stripped
def validate_year(self, text):
    """
    Make sure `text` is a release year

    A non-empty `text` is passed to :class:`~.utils.types.ReleaseYear`, which is expected to
    reject invalid values.

    :raise ValueError: if `text` is empty
    """
    if text:
        utils.types.ReleaseYear(text)
    else:
        raise ValueError('Year must not be empty.')
@functools.cached_property
def medium_job(self):
    """
    Choice job for the medium

    The options are the keys of :attr:`metadata.media` and the choice is autodetected by
    :meth:`autodetect_medium`.
    """
    job_name = self.get_job_name('medium')
    precondition = self.make_precondition('medium_job')
    media_names = tuple(metadata.media)
    common_args = self.common_job_args()
    return jobs.dialog.ChoiceJob(
        name=job_name,
        label='Medium',
        precondition=precondition,
        autodetect=self.autodetect_medium,
        autofinish=True,
        options=media_names,
        **common_args,
    )
async def autodetect_medium(self, job):
    """
    Return the first medium in :attr:`metadata.media` whose check accepts :attr:`release_name`

    :param job: Unused

    :return: The medium or `None` if no check accepts the release name
    """
    matching_media = (
        medium
        for medium, is_medium in metadata.media.items()
        if is_medium(self.release_name)
    )
    return next(matching_media, None)
@functools.cached_property
def countries_job(self):
    """
    Text field job for the countries

    The text comes from :meth:`autodetect_countries` and the default is the country from
    :attr:`release_name`.
    """
    job_name = self.get_job_name('countries')
    precondition = self.make_precondition('countries_job')
    # RequestError may come from get_sc_info() or _get_webdb_info().
    warn_exceptions = (errors.RequestError,)
    default_country = self.release_name.country
    common_args = self.common_job_args()
    return jobs.dialog.TextFieldJob(
        name=job_name,
        label='Countries',
        precondition=precondition,
        text=self.autodetect_countries,
        warn_exceptions=warn_exceptions,
        normalizer=self.normalize_countries,
        default=default_country,
        finish_on_success=True,
        **common_args,
    )
async def autodetect_countries(self):
    """
    Return ``countries`` from :meth:`get_info` as a comma-separated string

    :return: The joined countries or `None` if no countries were found
    """
    countries = await self.get_info('countries', empty_ok=False)
    if not countries:
        return None
    return ', '.join(countries)
def normalize_countries(self, text):
    """
    Pass `text` through :func:`~.utils.string.normalize_comma_separated_list` with lowering
    and sorting disabled
    """
    normalize = utils.string.normalize_comma_separated_list
    return normalize(text, lower=False, sort=False)
@functools.cached_property
def languages_job(self):
    """
    Text field job for the languages

    The text comes from :meth:`autodetect_languages`. No default and no validator are set.
    """
    job_name = self.get_job_name('languages')
    precondition = self.make_precondition('languages_job')
    # RequestError may come from get_sc_info() or _get_webdb_info().
    warn_exceptions = (errors.RequestError,)
    common_args = self.common_job_args()
    return jobs.dialog.TextFieldJob(
        name=job_name,
        label='Languages',
        precondition=precondition,
        text=self.autodetect_languages,
        warn_exceptions=warn_exceptions,
        normalizer=self.normalize_languages,
        finish_on_success=True,
        **common_args,
    )
async def autodetect_languages(self):
    """
    Return ``languages`` from :meth:`get_info` as a comma-separated string

    :return: The joined languages or `None` if no languages were found
    """
    languages = await self.get_info('languages', empty_ok=False)
    if not languages:
        return None
    return ', '.join(languages)
def normalize_languages(self, text):
    """
    Pass `text` through :func:`~.utils.string.normalize_comma_separated_list` with lowering
    and sorting disabled
    """
    normalize = utils.string.normalize_comma_separated_list
    return normalize(text, lower=False, sort=False)
@functools.cached_property
def runtime_job(self):
    """
    Text field job for the runtime

    The text comes from :meth:`autodetect_runtime`, is normalized by
    :meth:`normalize_runtime` and validated by :meth:`validate_runtime`.
    """
    job_name = self.get_job_name('runtime')
    precondition = self.make_precondition('runtime_job')
    # RequestError may come from get_sc_info() or _get_webdb_info().
    warn_exceptions = (errors.RequestError,)
    common_args = self.common_job_args()
    return jobs.dialog.TextFieldJob(
        name=job_name,
        label='Runtime',
        precondition=precondition,
        text=self.autodetect_runtime,
        warn_exceptions=warn_exceptions,
        normalizer=self.normalize_runtime,
        validator=self.validate_runtime,
        finish_on_success=True,
        **common_args,
    )
async def autodetect_runtime(self):
    """
    Return the ``default`` runtime from :meth:`get_info` as a string

    :return: The runtime or `None` if no runtimes were found or there is no ``default``
        runtime
    """
    runtimes = await self.get_info('runtimes', empty_ok=False)
    # Runtime of the "main" cut, not the Director's Cut or whatever.
    if runtimes and 'default' in runtimes:
        return str(runtimes['default'])
    return None
def normalize_runtime(self, text):
    """Return `text` without leading and trailing whitespace"""
    stripped = text.strip()
    return stripped
def validate_runtime(self, text):
    """
    Check that `text` is a non-negative integer

    :raise ValueError: if `text` is empty, not a number or negative
    """
    if not text:
        raise ValueError('Runtime must not be empty.')

    try:
        runtime = int(text)
    except ValueError as e:
        raise ValueError('Runtime is not a number.') from e

    if runtime < 0:
        raise ValueError('Runtime is not reasonable.')
@functools.cached_property
def tags_job(self):
    """
    Text field job for the tags

    The text comes from :meth:`autodetect_tags`, is normalized by :meth:`normalize_tags` and
    validated by :meth:`validate_tags`.
    """
    job_name = self.get_job_name('tags')
    precondition = self.make_precondition('tags_job')
    # RequestError may come from get_sc_info() or _get_webdb_info().
    warn_exceptions = (errors.RequestError,)
    common_args = self.common_job_args()
    return jobs.dialog.TextFieldJob(
        name=job_name,
        label='Tags',
        precondition=precondition,
        text=self.autodetect_tags,
        warn_exceptions=warn_exceptions,
        normalizer=self.normalize_tags,
        validator=self.validate_tags,
        finish_on_success=True,
        **common_args,
    )
async def autodetect_tags(self):
    """
    Return the ``genres`` from :meth:`get_info` that are valid tags as a comma-separated string

    :return: The joined tags (may be empty if no genre is a known tag) or `None` if no genres
        were found
    """
    genres = await self.get_info('genres', empty_ok=False)
    if not genres:
        return None

    # Only autodetect tags that are listed in the upload form.
    known_tags = [genre for genre in genres if genre in metadata.tags]
    return ', '.join(known_tags)
def normalize_tags(self, text):
    """
    Pass `text` through :func:`~.utils.string.normalize_comma_separated_list` with lowering
    enabled and sorting disabled
    """
    normalize = utils.string.normalize_comma_separated_list
    return normalize(text, lower=True, sort=False)
def validate_tags(self, text):
    """
    Check that `text` is a comma-separated list of tags from :attr:`metadata.tags`

    :raise ValueError: if `text` is empty or contains an unknown tag
    """
    if not text:
        raise ValueError('You must provide at least one tag.')

    for item in text.split(','):
        tag = item.strip()
        if tag not in metadata.tags:
            raise ValueError(f'Invalid tag: {tag}')
@functools.cached_property
def plot_job(self):
    """
    Read-only text field job for the plot

    The text comes from :meth:`autodetect_plot`. The IMDb job and its dependencies are
    prejobs.
    """
    job_name = self.get_job_name('plot')
    precondition = self.make_precondition('plot_job')
    required_jobs = self.get_job_and_dependencies(
        self.imdb_job,
    )
    # RequestError may come from get_sc_info() or _get_webdb_info().
    warn_exceptions = (errors.RequestError,)
    common_args = self.common_job_args()
    return jobs.dialog.TextFieldJob(
        name=job_name,
        label='Plot',
        precondition=precondition,
        prejobs=required_jobs,
        text=self.autodetect_plot,
        warn_exceptions=warn_exceptions,
        finish_on_success=True,
        read_only=True,
        **common_args,
    )
async def autodetect_plot(self):
    """Return ``summary`` from :meth:`get_info`; an empty value is not accepted"""
    summary = await self.get_info('summary', empty_ok=False)
    return summary
@functools.cached_property
def poster_job(self):
    """
    Poster job inherited from the parent class with the IMDb and TMDb jobs (and their
    dependencies) appended to its prejobs
    """
    inherited_job = super().poster_job
    inherited_job.prejobs += self.get_job_and_dependencies(
        self.imdb_job,
        self.tmdb_job,
    )
    return inherited_job
[docs]
async def get_poster_from_tracker(self):
    """
    Get the poster from the SC group

    The group ID job is awaited first.

    :return: `dict` with the keys ``poster`` (the poster URL), ``width``, ``height``,
        ``imagehosts`` and ``write_to``, or `None` if there is no group ID or the group has
        no poster URL
    """
    await self.group_id_job.wait_finished()
    if not self.group_id:
        return None

    info = await self.tracker.get_sc_info(self.group_id)
    if not info.get('poster_url'):
        return None

    poster = {
        'poster': info['poster_url'],
        'width': None,
        'height': None,
        'imagehosts': (),
        'write_to': None,
    }
    _log.debug('Poster from SC: %r', poster)
    return poster
@functools.cached_property
def artists_job(self):
    """
    Custom job that collects artists

    The worker is :meth:`get_artists`. The IMDb job and its dependencies are prejobs.
    """
    job_name = self.get_job_name('artists')
    precondition = self.make_precondition('artists_job')
    required_jobs = self.get_job_and_dependencies(self.imdb_job)
    # RequestError may come from get_sc_info() or _get_webdb_info().
    caught_exceptions = (errors.RequestError,)
    common_args = self.common_job_args()
    return jobs.custom.CustomJob(
        name=job_name,
        label='Artists',
        precondition=precondition,
        prejobs=required_jobs,
        worker=self.get_artists,
        catch=caught_exceptions,
        **common_args,
    )
async def get_artists(self, job):
    """
    Autodetect artists and fall back to asking the user

    :param job: Job whose `output` is checked after autodetection
    """
    await self._autodetect_artists()
    if job.output:
        return
    # If we couldn't find any artists, ask the user to enter some manually.
    await self._get_artists_from_user()
def _add_artist(self, artist):
    """
    Add `artist` as one "<importance>: <name>" line to the output of :attr:`artists_job`

    :param artist: Mapping with the keys ``importance`` and ``name``
    """
    importance = artist["importance"]
    name = artist["name"]
    self.artists_job.add_output(f'{importance}: {name}')
async def _autodetect_artists(self):
    """
    Add directors and cast from :meth:`get_info` as artists

    At most 10 names are added per kind of artist.
    """
    limit = 10
    sources = (
        (metadata.ArtistImportance.DIRECTOR, 'directors'),
        (metadata.ArtistImportance.ACTOR, 'cast'),
    )
    for importance, method_name in sources:
        names = await self.get_info(method_name, empty_ok=True)
        if not names:
            continue
        for name in names[:limit]:
            self._add_artist({'importance': importance, 'name': name})
async def _get_artists_from_user(self):
    """
    Add artists from :meth:`_get_one_artist_from_user` until it returns a falsy value
    """
    # Loop over artist prompts until the user stops the loop.
    while artist := await self._get_one_artist_from_user():
        self._add_artist(artist)
async def _get_one_artist_from_user(self):
    """
    Ask the user once for the importance and then for the name of an artist

    The name is only requested if an importance was selected.

    :return: `dict` with the keys ``importance`` and ``name``, or `None` if either of them
        is falsy
    """
    importance = await self._get_artist_importance()
    if not importance:
        return None

    name = await self._get_artist_name()
    if not name:
        return None

    return {'importance': importance, 'name': name}
async def _get_artist_importance(self):
    """
    Prompt the user to pick an artist importance

    The options are one "Add <importance>" entry per member of
    :class:`metadata.ArtistImportance` plus a final "Stop adding artists" entry.

    :return: The selected importance or `None` if the user picked "Stop adding artists"
    """
    choices = [
        (f'Add {str(importance).lower()}', importance)
        for importance in metadata.ArtistImportance
    ]
    choices.append(('Stop adding artists', None))
    prompt = uis.prompts.RadioListPrompt(options=tuple(choices))
    _label, selected = await self.artists_job.add_prompt(prompt)
    return selected
async def _get_artist_name(self):
    """Prompt the user for an artist name and return it without surrounding whitespace"""
    prompt = uis.prompts.TextPrompt(
        question='Enter name or nothing to stop adding artists:',
    )
    answer = await self.artists_job.add_prompt(prompt)
    return answer.strip()
@property
def _post_data_artists(self):
    """
    Artist fields for the upload form

    Every output line of :attr:`artists_job` is expected to look like "<importance>: <name>".

    :return: `dict` with the keys ``artists[]`` (names) and ``importance[]`` (values of the
        corresponding :class:`metadata.ArtistImportance`)

    :raise RuntimeError: if an output line has an unexpected format
    """
    line_regex = re.compile(r'^(?P<importance>.+?):\s*(?P<name>.+)$')
    lines = self.artists_job.output

    # Parallel sequences: artist names and artist types (actor, director, etc).
    artists, importances = [], []
    for line in lines:
        match = line_regex.search(line)
        if match is None:
            raise RuntimeError(f'Unexpected line: {line!r}')
        artist_name = match.group('name')
        importance = metadata.ArtistImportance.from_string(match.group('importance'))
        artists.append(artist_name)
        importances.append(importance.value)

    # Sanity check: Make sure all lists have the same length.
    artists_count = len(lines)
    for lst in (artists, importances):
        assert len(lst) == artists_count, (lst, artists_count)

    return {
        'artists[]': artists,
        'importance[]': importances,
    }
@functools.cached_property
def trailer_job(self):
    """
    Text field job for the trailer

    The text comes from :meth:`autodetect_trailer`. The group ID job and its dependencies are
    prejobs. Cached output is ignored.
    """
    job_name = self.get_job_name('trailer')
    precondition = self.make_precondition('trailer_job')
    required_jobs = self.get_job_and_dependencies(
        self.group_id_job,
    )
    # RequestError may come from get_sc_info().
    warn_exceptions = (errors.RequestError,)
    common_args = self.common_job_args(ignore_cache=True)
    return jobs.dialog.TextFieldJob(
        name=job_name,
        label='Trailer',
        precondition=precondition,
        prejobs=required_jobs,
        text=self.autodetect_trailer,
        warn_exceptions=warn_exceptions,
        finish_on_success=True,
        **common_args,
    )
async def autodetect_trailer(self):
    """
    Find the trailer for this release

    The ``trailer`` option is used first. If it is not set, the SC group is asked, but only
    if a group ID is known.

    :return: "<service>/<id>" if a trailer was found, `None` if the user provided an invalid
        trailer (the invalid value is put into the text field of :attr:`trailer_job` together
        with a warning so the user can fix it), or an empty string if no trailer was found
    """
    # Get trailer from user option (e.g. --trailer).
    if trailer_url := self.options.get('trailer'):
        try:
            trailer_service, trailer_id = self._get_trailer_info(trailer_url)
            _log.debug('Trailer from user: %r', (trailer_service, trailer_id))
        except errors.ValueError as e:
            # If user provided an invalid trailer, allow user to fix it.
            self.trailer_job.set_text(trailer_url)
            self.trailer_job.warn(e)
            return None
        else:
            if trailer_service and trailer_id:
                return f'{trailer_service}/{trailer_id}'

    # Get trailer from SC.
    assert self.group_id_job.is_finished
    # `group_id` is `None` if the group ID job produced no group ID. Like get_info() and
    # get_poster_from_tracker(), only ask SC if there is a group to ask about.
    if group_id := self.group_id:
        sc_info = await self.tracker.get_sc_info(group_id)
        if (
            (trailer_service := sc_info.get('trailer_service'))
            and (trailer_id := sc_info.get('trailer_id'))
        ):
            _log.debug('Trailer from SC group: %r', (trailer_service, trailer_id))
            return f'{trailer_service}/{trailer_id}'

    # Do not prompt the user to provide a trailer.
    return ''
def _get_trailer_info(self, string):
    """
    Extract trailer service and trailer ID from `string`

    :return: ``("Vimeo", <ID>)`` or ``("YouTube", <ID>)``, or ``(None, None)`` if `string`
        is empty

    :raise errors.ValueError: if `string` is not empty and neither a Vimeo ID nor a YouTube
        ID is found in it
    """
    vimeo_id = utils.string.get_vimeo_id(string)
    if vimeo_id:
        return 'Vimeo', vimeo_id

    youtube_id = utils.string.get_youtube_id(string)
    if youtube_id:
        return 'YouTube', youtube_id

    if string:
        raise errors.ValueError('Trailer must be URL or "vimeo/<ID>" or "youtube/<ID>".')

    return None, None
@functools.cached_property
def _post_data_trailer(self):
    """
    Trailer fields for the upload form

    Both values are `None` if :attr:`trailer_job` has no output.

    :return: `dict` with the keys ``embedded_content[Provider]`` and
        ``embedded_content[Id]``
    """
    assert self.trailer_job.is_finished

    trailer_service = trailer_id = None
    output = self.get_job_output(self.trailer_job, slice=0)
    if output:
        trailer_service, trailer_id = self._get_trailer_info(output)

    return {
        'embedded_content[Provider]': trailer_service,
        'embedded_content[Id]': trailer_id,
    }
@functools.cached_property
def edition_job(self):
    """
    Read-only text field job for the edition information

    The text comes from :meth:`autodetect_edition`, which builds it from the options.
    """
    job_name = self.get_job_name('edition')
    precondition = self.make_precondition('edition_job')
    # Cached job output is ignored, i.e. the text is always generated by autodetect_edition().
    common_args = self.common_job_args(ignore_cache=True)
    return jobs.dialog.TextFieldJob(
        name=job_name,
        label='Edition',
        precondition=precondition,
        text=self.autodetect_edition,
        finish_on_success=True,
        read_only=True,
        **common_args,
    )
def autodetect_edition(self):
    """
    Build the edition text from the options

    The ``edition`` option is parsed by :meth:`_get_edition_info`. The options
    ``edition_year``, ``edition_title``, ``edition_distributor`` and
    ``edition_catalogue_number`` take precedence over the corresponding field from
    ``edition``.

    :return: One "<Field>: <value>" line per field or an empty string if no edition
        information is provided
    """
    field_names = ('year', 'title', 'distributor', 'catalogue_number')

    edition = {}
    edition_string = self.options.get('edition')
    if edition_string:
        edition.update(self._get_edition_info(edition_string))

    for name in field_names:
        value = self.options.get(f'edition_{name}')
        if value:
            edition[name] = value

    if not edition:
        # Do not prompt the user for edition information.
        return ''

    lines = [
        f'{name.capitalize().replace("_", " ")}: {value}'
        for name, value in edition.items()
    ]
    return '\n'.join(lines)
def _get_edition_info(self, string):
    """
    Split comma-separated edition information into named fields

    `string` is split at the first three commas. The resulting parts are stripped and
    assigned to ``year``, ``title``, ``distributor`` and ``catalogue_number`` in that order.
    Missing fields are empty strings.

    :return: `dict` that maps each field name to its value
    """
    field_names = ('year', 'title', 'distributor', 'catalogue_number')
    values = [part.strip() for part in string.split(',', maxsplit=3)]
    # Pad with empty strings so that every field gets a value.
    values.extend([''] * (len(field_names) - len(values)))
    return dict(zip(field_names, values))
@functools.cached_property
def _post_data_edition(self):
    """
    Edition fields for the upload form

    The output of :attr:`edition_job` is parsed as "<Field>: <value>" lines. Lines without a
    colon (e.g. empty lines) are ignored. Empty values are translated to `None`.

    :return: `dict` with the keys ``remaster``, ``remaster_year``, ``remaster_title``,
        ``remaster_record_label`` and ``remaster_catalogue_number``, or an empty `dict` if
        :attr:`edition_job` has no output
    """
    assert self.edition_job.is_finished
    output = self.get_job_output(self.edition_job, slice=0)
    if not output:
        return {}

    edition = {}
    for line in output.split('\n'):
        # Only the first colon separates the field name from the value. The separator is
        # empty if the line contains no colon at all.
        name, separator, value = line.partition(':')
        if separator:
            edition[name] = value.strip() or None

    return {
        'remaster': 'on',
        'remaster_year': edition.get('Year'),
        'remaster_title': edition.get('Title'),
        'remaster_record_label': edition.get('Distributor'),
        'remaster_catalogue_number': edition.get('Catalogue number'),
    }
@functools.cached_property
def description_job(self):
    """
    Hidden, read-only text field job for the release description

    The text comes from :meth:`generate_description`. The playlists, mediainfo, BDInfo,
    screenshots and screenshot upload jobs (and their dependencies) are prejobs.
    """
    job_name = self.get_job_name('description')
    precondition = self.make_precondition('description_job')
    required_jobs = self.get_job_and_dependencies(
        self.playlists_job,
        self.mediainfo_job,
        self.bdinfo_job,
        self.screenshots_job,
        self.upload_screenshots_job,
    )
    # Don't cache job output because the number of screenshots can be changed by the user
    # between runs.
    common_args = self.common_job_args(ignore_cache=True)
    return jobs.dialog.TextFieldJob(
        name=job_name,
        label='Description',
        precondition=precondition,
        prejobs=required_jobs,
        text=self.generate_description,
        finish_on_success=True,
        read_only=True,
        hidden=True,
        **common_args,
    )
# Class-level flags, both disabled for SC.
# NOTE(review): neither flag is read in this part of the file; presumably they are evaluated
# by the parent class (whether every video is documented, whether mediainfo is required for
# BDMV releases) -- confirm against TrackerJobsBase.
document_all_videos = False
mediainfo_required_for_bdmv = False
async def generate_description(self):
def get_mediainfo_title(mediainfo):
if re.search(r'^Complete name\s*:\s.*\.IFO$', mediainfo, flags=re.MULTILINE):
return 'MediaInfo (IFO)'
elif re.search(r'^Complete name\s*:\s.*\.VOB$', mediainfo, flags=re.MULTILINE):
return 'MediaInfo (VOB)'
else:
return 'MediaInfo'
sections = []
section = []
for info in self.video_info.values():
if info['screenshot_urls']:
section.append('\n\n'.join(
f'[img]{screenshot_url}[/img]'
for screenshot_url in info['screenshot_urls']
))
section.extend(
f'[hide={get_mediainfo_title(mediainfo)}][plain]{mediainfo.strip()}[/plain][/hide]'
for mediainfo in info['mediainfos']
)
section.extend(
f'[hide=BDInfo][plain]{bdinfo.quick_summary.strip()}[/plain][/hide]'
for bdinfo in info['bdinfos']
)
sections.append('\n\n'.join(section))
section.clear()
description = '\n\n'.join(sections)
return (
description
+ (f'\n{promo}' if (promo := self.generate_promotion_bbcode()) else '')
)
@property
def post_data(self):
    """
    Fields of the upload form

    Most values are the first output of the corresponding job. The artist, trailer and
    edition fields are merged in from :attr:`_post_data_artists`,
    :attr:`_post_data_trailer` and :attr:`_post_data_edition`.
    """
    def first_output(job):
        return self.get_job_output(job, slice=0)

    data = {
        'submit': 'true',
        'groupid': self.group_id,  # Will be removed if it is `None`.
        'type': metadata.types['Movies'],
        'releasetype': '21',  # Not sure what this means, but it seems hardcoded.
        'catalogue_number': self.imdb_job.selected.get('id', None),
        'title': first_output(self.title_original_job),
        'alternate_title': first_output(self.title_english_job),
        'year': first_output(self.year_job),
        'country': first_output(self.countries_job),
        'language': first_output(self.languages_job),
        'runtime': first_output(self.runtime_job),
        'media': self.get_job_attribute(self.medium_job, 'choice'),
        'tags': first_output(self.tags_job),
        'image': first_output(self.poster_job),
        'release_desc': first_output(self.description_job),
        'album_desc': first_output(self.plot_job),
    }
    data.update(self._post_data_artists)
    data.update(self._post_data_trailer)
    data.update(self._post_data_edition)
    return data
@property
def post_files(self):
    """
    Files of the upload form

    :return: `dict` with the key ``file_input`` that maps to a `dict` with the torrent file
        path (``file``) and its MIME type (``mimetype``)
    """
    torrent_file = {
        'file': self.torrent_filepath,
        'mimetype': 'application/x-bittorrent',
    }
    return {'file_input': torrent_file}