Source code for upsies.trackers.sc.tracker

"""
Concrete :class:`~.TrackerBase` subclass for SC
"""

import re
import urllib

import async_lru

from ... import __project_name__, errors, utils
from ..base import TrackerBase
from . import config, metadata, rules
from .jobs import ScTrackerJobs

import logging  # isort:skip
_log = logging.getLogger(__name__)


class ScTracker(TrackerBase):
    """
    :class:`~.TrackerBase` implementation for SC

    Provides session handling (login, login confirmation, logout), announce
    URL lookup, group lookup by IMDb ID, scraping of existing group metadata
    from the upload form and the upload itself.
    """

    name = 'sc'
    label = 'SC'

    torrent_source_field = 'SC'

    setup_howto_template = (
        '{howto.introduction}\n'
        '\n'
        '{howto.next_section}. Login Credentials\n'
        '\n'
        ' {howto.current_section}.1 $ upsies set trackers.{tracker.name}.username USERNAME\n'
        ' {howto.current_section}.2 $ upsies set trackers.{tracker.name}.password PASSWORD\n'
        ' {howto.current_section}.3 Store the login session cookie. (optional)\n'
        ' $ upsies set trackers.{tracker.name}.cookies_filepath "~/.cache/upsies/{tracker.name}.cookies"\n'
        ' WARNING: Anyone with access to that file has full control over your {tracker.label} account.\n'
        '\n'
        '{howto.next_section}. Announce URL (Optional)\n'
        '\n'
        ' The announce URL is fetched from the website on demand, but you can\n'
        ' also configure it explicitly.\n'
        '\n'
        ' {howto.current_section}.1 $ upsies set trackers.{tracker.name}.announce_url ANNOUNCE_URL\n'
        '\n'
        '{howto.screenshots}\n'
        '\n'
        '{howto.autoseed}\n'
        '\n'
        '{howto.reuse_torrents}\n'
        '\n'
        '{howto.upload}\n'
    )

    TrackerJobs = ScTrackerJobs
    TrackerConfig = config.ScTrackerConfig
    cli_arguments = config.cli_arguments
    # Expose the sibling `rules` module as a class attribute.
    rules = rules

    @property
    def _base_url(self):
        """Base URL of the website as configured in the ``base_url`` option"""
        return self.options['base_url']

    @property
    def _login_url(self):
        """URL of ``/login.php`` relative to :attr:`_base_url`"""
        return urllib.parse.urljoin(self._base_url, '/login.php')

    @property
    def _logout_url(self):
        """URL of ``/logout.php`` relative to :attr:`_base_url`"""
        return urllib.parse.urljoin(self._base_url, '/logout.php')

    @property
    def _upload_url(self):
        """URL of ``/upload.php`` relative to :attr:`_base_url`"""
        return urllib.parse.urljoin(self._base_url, '/upload.php')

    @property
    def _torrents_url(self):
        """URL of ``/torrents.php`` relative to :attr:`_base_url`"""
        return urllib.parse.urljoin(self._base_url, '/torrents.php')

    async def _request(self, method, *args, error_prefix='', **kwargs):
        """
        Send HTTP request with the session cookies of this tracker

        Redirects are not followed and ``user_agent=True`` is always passed.

        :param str method: ``"GET"`` or ``"POST"``; the lowercased name is
            looked up as a function in ``utils.http``
        :param str error_prefix: Text that is prepended to the message of any
            :class:`~.errors.RequestError`
        :param args: Positional arguments for the ``utils.http`` function
        :param kwargs: Keyword arguments for the ``utils.http`` function

        :return: Whatever the ``utils.http`` function returns

        :raise errors.RequestError: if the request fails
        """
        # `method` is "GET" or "POST"
        request = getattr(utils.http, method.lower())
        try:
            return await request(
                *args,
                user_agent=True,
                cookies=self.cookies_filepath,
                follow_redirects=False,
                **kwargs,
            )
        except errors.RequestError as e:
            if error_prefix:
                raise errors.RequestError(f'{error_prefix}: {e}') from e
            # No prefix wanted: Propagate the original exception untouched.
            raise

    async def _login(self):
        """
        Post the configured credentials to the login page

        :raise errors.RequestError: if username or password are not
            configured, if the request fails or if the response contains an
            error message
        """
        if not self.options.get('username'):
            raise errors.RequestError('Login failed: No username configured')
        elif not self.options.get('password'):
            raise errors.RequestError('Login failed: No password configured')

        post_data = {
            'username': self.options['username'],
            'password': self.options['password'],
            # Tell the server to remember our user session if the user wants to store it.
            'keeplogged': '1' if self.cookies_filepath else None,
            'login': 'Log in',
        }
        response = await self._request(
            'POST',
            self._login_url,
            cache=False,
            data=post_data,
            error_prefix='Login failed',
        )
        self._maybe_raise_login_error(response)

    def _maybe_raise_login_error(self, html):
        """
        Find error message in the response to a login request

        :param html: HTML of the page returned by the login request

        :raise errors.RequestError: if a login form with any text besides its
            ``<table>`` is found
        """
        doc = utils.html.parse(html)
        # Pretty-printing the whole page is only worth it if it is actually logged.
        if _log.isEnabledFor(logging.DEBUG):
            _log.debug('FINDING ERROR:\n%s', doc.prettify())

        msg = None
        # Find: "Your username or password was incorrect."
        if form_tag := doc.find('form', action='login.php'):
            # Remove the <table> from the <form> to leave us with only the
            # error message. A form without a table is left as it is instead
            # of failing with AttributeError.
            if form_tag.table is not None:
                form_tag.table.extract()
            msg = utils.html.as_text(form_tag)

        if msg:
            raise errors.RequestError(f'Login failed: {msg}')

    async def confirm_logged_in(self):
        """
        Find the logout link on the front page and remember its ``auth`` token

        The token is stored as ``self._auth``.

        :raise errors.RequestError: if no logout link with an ``auth``
            parameter is found or if the request fails
        """
        response = await self._request('GET', self._base_url, cache=False)
        doc = utils.html.parse(response)
        auth_regex = re.compile(r'logout\.php\?.*\bauth=([0-9a-zA-Z]+)')
        logout_link_tag = doc.find('a', href=auth_regex)
        if logout_link_tag:
            logout_link_href = logout_link_tag['href']
            match = auth_regex.search(logout_link_href)
            self._auth = match.group(1)
        else:
            raise errors.RequestError('Login failed for unknown reason')

    async def _logout(self):
        """
        Request the logout page with the stored ``auth`` token

        The stored token is always removed, even if the request fails.

        :raise errors.RequestError: if the request fails
        """
        try:
            await self._request(
                method='GET',
                url=self._logout_url,
                cache=False,
                params={'auth': self._auth},
                error_prefix='Logout failed',
            )
        finally:
            del self._auth

    async def get_announce_url(self):
        """
        Return the announce URL from the configuration or from the website

        The ``announce_url`` option takes precedence. Otherwise, the upload
        page is searched for an ``<input>`` tag with an announce URL as its
        value.

        :raise RuntimeError: if ``announce_url`` is not configured and we are
            not logged in
        :raise errors.RequestError: if the upload page cannot be fetched or
            does not contain an announce URL
        """
        configured_url = self.options.get('announce_url')
        if configured_url:
            # Log the option object and not the unwrapped secret: The announce
            # URL is a credential and should not end up in debug logs.
            # NOTE(review): presumably the option's repr masks the value - confirm.
            _log.debug('Getting announce URL from config file: %r', configured_url)
            return configured_url.get_secret_value()

        elif not self.is_logged_in:
            raise RuntimeError('Cannot get announce URL from website if not logged in')

        else:
            _log.debug('Getting announce URL from upload.php: %r', self._upload_url)
            response = await self._request(
                method='GET',
                url=self._upload_url,
                cache=False,
            )
            doc = utils.html.parse(response)
            announce_url_tag = doc.find('input', value=re.compile(r'^https?://.*/announce\b'))
            if announce_url_tag:
                return announce_url_tag['value']
            else:
                cmd = f'{__project_name__} set trackers.{self.name}.announce_url YOUR_URL'
                raise errors.RequestError(f'Failed to find announce URL - set it manually: {cmd}')

    async def get_group_id(self, *, imdb_id):
        """
        Return the group ID of the first search result for `imdb_id`

        :param imdb_id: IMDb ID that is sent as the ``cataloguenumber``
            search parameter

        :return: Group ID as a string of digits or `None` if no torrent link
            is found in the search results

        :raise errors.RequestError: if the request fails
        """
        assert self.is_logged_in, 'Not logged in'

        # Find group via IMDb ID.
        response = await self._request(
            method='GET',
            url=self._torrents_url,
            params={'cataloguenumber': imdb_id},
            # Cache must be disabled because group can be created at any time.
            cache=False,
        )

        # Find group ID in search results.
        doc = utils.html.parse(response)
        if torrent_card_tag := doc.find('div', class_='torrent_card'):
            torrent_link_regex = re.compile(r'torrents\.php\?.*?id=(\d+)')
            if a_tag := torrent_card_tag.find('a', href=torrent_link_regex):
                group_id = torrent_link_regex.search(a_tag['href']).group(1)
                _log.debug('Group ID for %s: %r', imdb_id, group_id)
                return group_id

        _log.debug('No group ID for %s', imdb_id)
        return None

    @async_lru.alru_cache
    async def get_sc_info(self, group_id, name=None):
        """
        Return metadata of an existing group, scraped from its upload form

        The upload page is requested with ``groupid`` set to `group_id` and
        the prefilled form fields are read.

        :param group_id: ID of the group, e.g. from :meth:`get_group_id`
        :param name: Key of a single value to return or `None` to return all
            values as a :class:`dict`

        :return: :class:`dict` with the keys ``title_original``,
            ``title_english``, ``year``, ``countries``, ``languages``,
            ``runtimes``, ``genres``, ``poster_url``, ``summary``,
            ``directors``, ``cast``, ``trailer_service`` and ``trailer_id``,
            or the value of the key `name`; if the upload form is not found,
            an empty :class:`dict` is returned regardless of `name`

        :raise KeyError: if `name` is not one of the keys listed above
        :raise errors.RequestError: if the request fails
        """
        assert self.is_logged_in, 'Not logged in'

        _log.debug('Getting SC %s for group: %r', name or 'info', group_id)
        response = await self._request(
            method='GET',
            url=self._upload_url,
            params={'groupid': group_id},
            cache=True,
        )
        doc = utils.html.parse(response)

        upload_form_tag = doc.find('form', id="upload_table")
        if not upload_form_tag:
            _log.debug('no upload form found: %r', doc)
            return {}

        def get_value(tag_name, **identifiers):
            # Return the "value" attribute of the first matching tag as text.
            if tag := upload_form_tag.find(tag_name, identifiers):
                return utils.html.as_text(tag.get('value', ''))
            return ''

        def get_text(tag_name, **identifiers):
            # Return the text content of the first matching tag.
            if tag := upload_form_tag.find(tag_name, identifiers):
                return utils.html.as_text(tag)
            return ''

        def get_option(**identifiers):
            # Return the value of the selected <option> of the first matching
            # <select>, falling back to its first <option>.
            if select_tag := upload_form_tag.find('select', identifiers):
                if selected_option_tag := select_tag.find('option', selected=True):
                    return selected_option_tag.get('value', '')
                if default_option_tag := select_tag.find('option'):
                    return default_option_tag.get('value', '')
            return ''

        def get_artists(importance):
            # Return the names of all artists whose neighboring "importance"
            # <select> has `importance` selected. Each condition only runs if
            # all previous conditions were truthy.
            return tuple(
                artist_name
                for artist_tag in upload_form_tag.find_all('input', id='artist')
                if (
                    (importance_tag := artist_tag.find_next_sibling('select', id='importance'))
                    and (selected_tag := importance_tag.find('option', selected=True))
                    and (selected_value := int(selected_tag.get('value', '-1'))) >= 0
                    and (artist_importance := metadata.ArtistImportance(selected_value))
                    and artist_importance is importance
                    and (artist_name := artist_tag.get('value'))
                )
            )

        info = {
            'title_original': get_value('input', id='title'),
            'title_english': get_value('input', id='alternate_title'),
            'year': get_value('input', id='year'),
            'countries': (
                tuple(country.strip() for country in countries.split(','))
                if (countries := get_value('input', id='country')) else
                ()
            ),
            'languages': (
                tuple(language.strip() for language in languages.split(','))
                if (languages := get_value('input', id='language')) else
                ()
            ),
            'runtimes': (
                {'default': runtime}
                if (runtime := get_value('input', id='runtime')) else
                {}
            ),
            'genres': (
                tuple(tag.strip() for tag in tags.split(','))
                if (tags := get_value('input', id='tags')) else
                ()
            ),
            'poster_url': get_value('input', id='image'),
            'summary': get_text('textarea', id='album_desc'),
            'directors': get_artists(metadata.ArtistImportance.DIRECTOR),
            'cast': get_artists(metadata.ArtistImportance.ACTOR),
            'trailer_service': get_option(name='embedded_content[Provider]'),
            'trailer_id': get_value('input', name='embedded_content[Id]'),
        }

        if name is None:
            return info
        else:
            return info[name]

    async def upload(self, tracker_jobs):
        """
        Post the upload form and return the URL of the uploaded torrent

        :param tracker_jobs: Object that provides the form data as
            ``post_data`` and the files as ``post_files``

        :return: URL of the torrent page, taken from the redirect location of
            the response

        :raise errors.RequestError: if the request fails or if the response
            contains an error message
        :raise RuntimeError: if the upload failed and no error message is
            found in the response
        """
        post_data = utils.merge_dicts(tracker_jobs.post_data, {'auth': self._auth})
        _log.debug('Upload POST data:\n')
        for k, v in post_data.items():
            _log.debug(' * %s = %s', k, v)

        post_files = tracker_jobs.post_files
        _log.debug('POSTing files: %r', post_files)

        response = await self._request(
            'POST',
            url=self._upload_url,
            cache=False,
            data=post_data,
            files=post_files,
        )

        # Get URL to uploaded torrent from HTTP 302 redirect location.
        redirect_path = response.headers.get('location', '')
        _log.debug('HTTP redirect: %r', redirect_path)
        if 'torrents.php' in redirect_path:
            torrent_page_url = urllib.parse.urljoin(self._base_url, redirect_path)
            return torrent_page_url
        else:
            self._raise_upload_error(response)

    def _raise_upload_error(self, html):
        """
        Raise an exception for a failed upload

        :param html: HTML of the page returned by the upload request

        :raise errors.RequestError: if the page contains an error message
        :raise RuntimeError: if no error message is found; the parsed page is
            dumped to ``upload_failed.<tracker name>.html`` first
        """
        doc = utils.html.parse(html)
        if error_tag := doc.find('p', style='color: red; text-align: center;'):
            msg = utils.html.as_text(error_tag)
            raise errors.RequestError(f'Upload failed: {msg}')
        else:
            filepath = f'upload_failed.{self.name}.html'
            utils.html.dump(doc, filepath)
            raise RuntimeError(f'Upload failed: No error message found (dumped HTML response to {filepath})')