"""
Concrete :class:`~.TrackerBase` subclass for SC
"""
import re
import urllib
import urllib.parse

import async_lru

from ... import __project_name__, errors, utils
from ..base import TrackerBase
from . import config, metadata, rules
from .jobs import ScTrackerJobs

import logging  # isort:skip
_log = logging.getLogger(__name__)
class ScTracker(TrackerBase):
    """
    :class:`~.TrackerBase` implementation for the SC tracker

    All website URLs are derived from the ``base_url`` option. The ``auth``
    token that is needed for logging out and for uploading is scraped from the
    website by :meth:`confirm_logged_in` and stored in ``self._auth``.
    """

    name = 'sc'
    label = 'SC'
    torrent_source_field = 'SC'

    setup_howto_template = (
        '{howto.introduction}\n'
        '\n'
        '{howto.next_section}. Login Credentials\n'
        '\n'
        ' {howto.current_section}.1 $ upsies set trackers.{tracker.name}.username USERNAME\n'
        ' {howto.current_section}.2 $ upsies set trackers.{tracker.name}.password PASSWORD\n'
        ' {howto.current_section}.3 Store the login session cookie. (optional)\n'
        ' $ upsies set trackers.{tracker.name}.cookies_filepath "~/.cache/upsies/{tracker.name}.cookies"\n'
        ' WARNING: Anyone with access to that file has full control over your {tracker.label} account.\n'
        '\n'
        '{howto.next_section}. Announce URL (Optional)\n'
        '\n'
        ' The announce URL is fetched from the website on demand, but you can\n'
        ' also configure it explicitly.\n'
        '\n'
        ' {howto.current_section}.1 $ upsies set trackers.{tracker.name}.announce_url ANNOUNCE_URL\n'
        '\n'
        '{howto.screenshots}\n'
        '\n'
        '{howto.autoseed}\n'
        '\n'
        '{howto.reuse_torrents}\n'
        '\n'
        '{howto.upload}\n'
    )

    TrackerJobs = ScTrackerJobs
    TrackerConfig = config.ScTrackerConfig
    cli_arguments = config.cli_arguments
    # Expose the module-level `rules` import as a class attribute.
    rules = rules

    @property
    def _base_url(self):
        """Website base URL as configured in the ``base_url`` option"""
        return self.options['base_url']

    @property
    def _login_url(self):
        """URL of ``/login.php`` below :attr:`_base_url`"""
        return urllib.parse.urljoin(self._base_url, '/login.php')

    @property
    def _logout_url(self):
        """URL of ``/logout.php`` below :attr:`_base_url`"""
        return urllib.parse.urljoin(self._base_url, '/logout.php')

    @property
    def _upload_url(self):
        """URL of ``/upload.php`` below :attr:`_base_url`"""
        return urllib.parse.urljoin(self._base_url, '/upload.php')

    @property
    def _torrents_url(self):
        """URL of ``/torrents.php`` below :attr:`_base_url`"""
        return urllib.parse.urljoin(self._base_url, '/torrents.php')

    async def _request(self, method, *args, error_prefix='', **kwargs):
        """
        Send HTTP request with the settings every request to this tracker needs

        The request is made with ``user_agent=True``, the cookies from
        ``self.cookies_filepath`` and ``follow_redirects=False``.

        :param str method: ``"GET"`` or ``"POST"`` (case-insensitive); selects
            the function of the same (lower-case) name from ``utils.http``
        :param args: Positional arguments for the ``utils.http`` function
        :param str error_prefix: If non-empty, any
            :class:`~.errors.RequestError` is re-raised with this text and
            ``": "`` prepended to its message
        :param kwargs: Keyword arguments for the ``utils.http`` function

        :return: Whatever the ``utils.http`` function returns

        :raise RequestError: if the request fails
        """
        # `method` is "GET" or "POST"
        send = getattr(utils.http, method.lower())
        try:
            return await send(
                *args,
                user_agent=True,
                cookies=self.cookies_filepath,
                follow_redirects=False,
                **kwargs,
            )
        except errors.RequestError as e:
            if error_prefix:
                raise errors.RequestError(f'{error_prefix}: {e}') from e
            # No prefix wanted: Propagate the original exception untouched.
            raise

    async def _login(self):
        """
        POST the configured credentials to :attr:`_login_url`

        :raise RequestError: if no username or no password is configured, if
            the request fails or if the response contains an error message
            (see :meth:`_maybe_raise_login_error`)
        """
        if not self.options.get('username'):
            raise errors.RequestError('Login failed: No username configured')
        elif not self.options.get('password'):
            raise errors.RequestError('Login failed: No password configured')

        post_data = {
            'username': self.options['username'],
            'password': self.options['password'],
            # Tell the server to remember our user session if the user wants to store it.
            'keeplogged': '1' if self.cookies_filepath else None,
            'login': 'Log in',
        }
        response = await self._request(
            'POST',
            self._login_url,
            cache=False,
            data=post_data,
            error_prefix='Login failed',
        )
        self._maybe_raise_login_error(response)

    def _maybe_raise_login_error(self, html):
        """
        Raise if the response to the login request contains the login form

        Any text that remains in the ``<form action="login.php">`` tag after
        its ``<table>`` was removed is treated as the error message.

        :param html: Response to the login request (anything
            ``utils.html.parse`` accepts)

        :raise RequestError: if an error message is found
        """
        doc = utils.html.parse(html)
        # Rendering the whole document is wasted work unless it is actually
        # going to be logged.
        if _log.isEnabledFor(logging.DEBUG):
            _log.debug('FINDING ERROR:\n%s', doc.prettify())

        # Find: "Your username or password was incorrect."
        form_tag = doc.find('form', action='login.php')
        if not form_tag:
            return

        # Remove the actual form fields to leave us with only the error
        # message. A form without a <table> is left as it is instead of
        # failing with AttributeError.
        if form_tag.table is not None:
            form_tag.table.extract()

        msg = utils.html.as_text(form_tag)
        if msg:
            raise errors.RequestError(f'Login failed: {msg}')

    async def confirm_logged_in(self):
        """
        Find the ``auth`` token in the logout link on the main page

        The token is stored in ``self._auth``.

        :raise RequestError: if the request fails or if no logout link with
            an ``auth`` parameter is found
        """
        response = await self._request('GET', self._base_url, cache=False)
        doc = utils.html.parse(response)
        auth_regex = re.compile(r'logout\.php\?.*\bauth=([0-9a-zA-Z]+)')
        logout_link_tag = doc.find('a', href=auth_regex)
        if not logout_link_tag:
            raise errors.RequestError('Login failed for unknown reason')

        # The tag was found via `auth_regex`, so searching its href again
        # always produces a match.
        match = auth_regex.search(logout_link_tag['href'])
        self._auth = match.group(1)

    async def _logout(self):
        """
        Send logout request and forget the ``auth`` token

        ``self._auth`` is removed even if the request fails.

        :raise RequestError: if the request fails
        """
        try:
            await self._request(
                method='GET',
                url=self._logout_url,
                cache=False,
                params={'auth': self._auth},
                error_prefix='Logout failed',
            )
        finally:
            # Only remove the token if it exists so that a missing token
            # does not raise a second exception from the cleanup code.
            if hasattr(self, '_auth'):
                del self._auth

    async def get_announce_url(self):
        """
        Return the announce URL from the configuration or from the website

        If the ``announce_url`` option is set, its secret value is returned.
        Otherwise the URL is read from the first ``<input>`` tag on
        :attr:`_upload_url` whose value looks like an announce URL.

        :raise RuntimeError: if the URL must be fetched from the website
            while not logged in
        :raise RequestError: if the request fails or if no announce URL is
            found
        """
        if self.options.get('announce_url'):
            # The option is wrapped to keep it secret, so the unwrapped value
            # is deliberately not written to the log.
            _log.debug('Getting announce URL from config file')
            return self.options['announce_url'].get_secret_value()

        if not self.is_logged_in:
            raise RuntimeError('Cannot get announce URL from website if not logged in')

        _log.debug('Getting announce URL from upload.php: %r', self._upload_url)
        response = await self._request(
            method='GET',
            url=self._upload_url,
            cache=False,
        )
        doc = utils.html.parse(response)
        announce_url_tag = doc.find('input', value=re.compile(r'^https?://.*/announce\b'))
        if announce_url_tag:
            return announce_url_tag['value']

        cmd = f'{__project_name__} set trackers.{self.name}.announce_url YOUR_URL'
        raise errors.RequestError(f'Failed to find announce URL - set it manually: {cmd}')

    async def get_group_id(self, *, imdb_id):
        """
        Search the website for `imdb_id` and return the ID of the first result

        :param imdb_id: IMDb ID that is sent as the ``cataloguenumber``
            parameter to :attr:`_torrents_url`

        :return: Group ID (:class:`str` of digits) from the first torrent
            link in the first ``<div class="torrent_card">``, or `None` if
            there is no such link

        :raise RequestError: if the request fails
        """
        assert self.is_logged_in, 'Not logged in'

        # Find group via IMDb ID.
        response = await self._request(
            method='GET',
            url=self._torrents_url,
            params={'cataloguenumber': imdb_id},
            # Cache must be disabled because group can be created at any time.
            cache=False,
        )

        # Find group ID in search results.
        doc = utils.html.parse(response)
        if torrent_card_tag := doc.find('div', class_='torrent_card'):
            # "\b" makes sure we get the "id" parameter and not a parameter
            # that merely ends with "id" (e.g. "torrentid").
            torrent_link_regex = re.compile(r'torrents\.php\?.*?\bid=(\d+)')
            if a_tag := torrent_card_tag.find('a', href=torrent_link_regex):
                group_id = torrent_link_regex.search(a_tag['href']).group(1)
                _log.debug('Group ID for %s: %r', imdb_id, group_id)
                return group_id

        _log.debug('No group ID for %s', imdb_id)
        return None

    @async_lru.alru_cache
    async def get_sc_info(self, group_id, name=None):
        """
        Return metadata that the upload form is pre-filled with for a group

        The upload form is requested from :attr:`_upload_url` with `group_id`
        as the ``groupid`` parameter.

        :param group_id: Group ID (see :meth:`get_group_id`)
        :param name: Key of a single value to return or `None` to return a
            :class:`dict` with all values

            Available keys: ``title_original``, ``title_english``, ``year``,
            ``countries``, ``languages``, ``runtimes``, ``genres``,
            ``poster_url``, ``summary``, ``directors``, ``cast``,
            ``trailer_service``, ``trailer_id``

        :return: :class:`dict` or the value of `name`; an empty :class:`dict`
            if the response contains no upload form (even if `name` is given)

        .. note:: Return values are cached per combination of arguments, so
            a returned :class:`dict` is shared between callers and should not
            be modified.

        :raise RequestError: if the request fails
        :raise KeyError: if `name` is not one of the available keys
        """
        assert self.is_logged_in, 'Not logged in'
        _log.debug('Getting SC %s for group: %r', name or 'info', group_id)

        response = await self._request(
            method='GET',
            url=self._upload_url,
            params={'groupid': group_id},
            cache=True,
        )
        doc = utils.html.parse(response)
        upload_form_tag = doc.find('form', id="upload_table")
        if not upload_form_tag:
            _log.debug('no upload form found: %r', doc)
            return {}

        def get_value(tag_name, **identifiers):
            # "value" attribute of the first matching tag or empty string.
            if tag := upload_form_tag.find(tag_name, identifiers):
                return utils.html.as_text(tag.get('value', ''))
            return ''

        def get_text(tag_name, **identifiers):
            # Text content of the first matching tag or empty string.
            if tag := upload_form_tag.find(tag_name, identifiers):
                return utils.html.as_text(tag)
            return ''

        def get_list(**identifiers):
            # Comma-separated "value" attribute of an <input> as a tuple of
            # stripped items; empty tuple if there is no value.
            if value := get_value('input', **identifiers):
                return tuple(item.strip() for item in value.split(','))
            return ()

        def get_option(**identifiers):
            # Value of the selected <option> of the first matching <select>,
            # falling back to its first <option>, falling back to an empty
            # string.
            if select_tag := upload_form_tag.find('select', identifiers):
                if selected_option_tag := select_tag.find('option', selected=True):
                    return selected_option_tag.get('value', '')
                if default_option_tag := select_tag.find('option'):
                    return default_option_tag.get('value', '')
            return ''

        def get_artists(importance):
            # Names from all <input id="artist"> tags whose following
            # <select id="importance"> sibling has `importance` selected.
            artist_names = []
            for artist_tag in upload_form_tag.find_all('input', id='artist'):
                importance_tag = artist_tag.find_next_sibling('select', id='importance')
                if not importance_tag:
                    continue

                selected_tag = importance_tag.find('option', selected=True)
                if not selected_tag:
                    continue

                # A missing "value" attribute counts as "no importance".
                selected_value = int(selected_tag.get('value', '-1'))
                if selected_value < 0:
                    continue

                artist_importance = metadata.ArtistImportance(selected_value)
                if not artist_importance or artist_importance is not importance:
                    continue

                if artist_name := artist_tag.get('value'):
                    artist_names.append(artist_name)
            return tuple(artist_names)

        runtime = get_value('input', id='runtime')
        info = {
            'title_original': get_value('input', id='title'),
            'title_english': get_value('input', id='alternate_title'),
            'year': get_value('input', id='year'),
            'countries': get_list(id='country'),
            'languages': get_list(id='language'),
            'runtimes': {'default': runtime} if runtime else {},
            'genres': get_list(id='tags'),
            'poster_url': get_value('input', id='image'),
            'summary': get_text('textarea', id='album_desc'),
            'directors': get_artists(metadata.ArtistImportance.DIRECTOR),
            'cast': get_artists(metadata.ArtistImportance.ACTOR),
            'trailer_service': get_option(name='embedded_content[Provider]'),
            'trailer_id': get_value('input', name='embedded_content[Id]'),
        }
        return info if name is None else info[name]

    async def upload(self, tracker_jobs):
        """
        POST the upload form and return the URL of the uploaded torrent

        :param tracker_jobs: Object that provides the form data as
            ``post_data`` and the files to upload as ``post_files``

        :return: URL of the torrent page, taken from the ``location`` header
            of the response

        :raise RequestError: if the request fails or if the response
            contains an error message
        :raise RuntimeError: if the upload failed and no error message is
            found in the response (see :meth:`_raise_upload_error`)
        """
        post_data = utils.merge_dicts(tracker_jobs.post_data, {'auth': self._auth})
        _log.debug('Upload POST data:\n')
        for k, v in post_data.items():
            _log.debug(' * %s = %s', k, v)

        post_files = tracker_jobs.post_files
        _log.debug('POSTing files: %r', post_files)

        response = await self._request(
            'POST',
            url=self._upload_url,
            cache=False,
            data=post_data,
            files=post_files,
        )

        # Get URL to uploaded torrent from HTTP 302 redirect location.
        redirect_path = response.headers.get('location', '')
        _log.debug('HTTP redirect: %r', redirect_path)
        if 'torrents.php' in redirect_path:
            return urllib.parse.urljoin(self._base_url, redirect_path)

        # Not redirected to a torrent page: This always raises.
        self._raise_upload_error(response)

    def _raise_upload_error(self, html):
        """
        Raise an exception for a failed upload

        :param html: Response to the upload request (anything
            ``utils.html.parse`` accepts)

        :raise RequestError: with the text of the red, centered ``<p>`` tag
            if the response contains one
        :raise RuntimeError: otherwise; the parsed response is dumped to
            ``upload_failed.<tracker name>.html`` first
        """
        doc = utils.html.parse(html)
        if error_tag := doc.find('p', style='color: red; text-align: center;'):
            msg = utils.html.as_text(error_tag)
            raise errors.RequestError(f'Upload failed: {msg}')

        # Keep the unexpected response around for debugging.
        filepath = f'upload_failed.{self.name}.html'
        utils.html.dump(doc, filepath)
        raise RuntimeError(f'Upload failed: No error message found (dumped HTML response to {filepath})')