"""
Concrete :class:`~.TrackerBase` subclass for MTV
"""
import re
import urllib
import urllib.parse

from ... import __project_name__, errors, utils
from ..base import TrackerBase
from . import config, rules
from .jobs import MtvTrackerJobs

import logging  # isort:skip
_log = logging.getLogger(__name__)
class MtvTracker(TrackerBase):
    """
    :class:`~.TrackerBase` implementation for MTV

    Covers login (with an optional second 2FA request), logout, announce URL
    lookup and the two-request torrent upload. All URLs are derived from
    ``options['base_url']``.
    """

    name = 'mtv'
    label = 'MTV'
    torrent_source_field = 'MTV'

    # Setup instructions shown to the user. The ``{howto...}`` and
    # ``{tracker...}`` placeholders are filled in elsewhere (presumably by the
    # base class -- confirm against TrackerBase).
    setup_howto_template = (
        '{howto.introduction}\n'
        '\n'
        '{howto.next_section}. Login Credentials\n'
        '\n'
        ' {howto.current_section}.1 $ upsies set trackers.{tracker.name}.username USERNAME\n'
        ' {howto.current_section}.2 $ upsies set trackers.{tracker.name}.password PASSWORD\n'
        ' {howto.current_section}.3 Store the login session cookie. (optional)\n'
        ' $ upsies set trackers.{tracker.name}.cookies_filepath "~/.cache/upsies/{tracker.name}.cookies"\n'
        ' WARNING: Anyone with access to that file has full control over your {tracker.label} account.\n'
        '\n'
        '{howto.next_section}. Announce URL (Optional)\n'
        '\n'
        ' The announce URL is fetched from the website on demand, but you can\n'
        ' also configure it explicitly.\n'
        '\n'
        ' {howto.current_section}.1 $ upsies set trackers.{tracker.name}.announce_url ANNOUNCE_URL\n'
        '\n'
        '{howto.screenshots}\n'
        '\n'
        '{howto.autoseed}\n'
        '\n'
        '{howto.reuse_torrents}\n'
        '\n'
        '{howto.upload}\n'
    )

    TrackerJobs = MtvTrackerJobs
    TrackerConfig = config.MtvTrackerConfig
    cli_arguments = config.cli_arguments
    rules = rules

    @property
    def _base_url(self):
        """Website base URL as configured in ``options['base_url']``"""
        return self.options['base_url']

    @property
    def _login_url(self):
        """URL of the username/password login form"""
        return urllib.parse.urljoin(self._base_url, '/login')

    @property
    def _login_tfa_url(self):
        """URL of the two-factor authentication login form"""
        return urllib.parse.urljoin(self._base_url, '/twofactor/login')

    @property
    def _logout_url(self):
        """URL that receives the logout POST request"""
        return urllib.parse.urljoin(self._base_url, '/logout')

    @property
    def _upload_url(self):
        """URL of the upload form"""
        return urllib.parse.urljoin(self._base_url, '/upload.php')

    @property
    def _torrents_url(self):
        """URL of the torrents page"""
        return urllib.parse.urljoin(self._base_url, '/torrents.php')

    async def _request(self, method, *args, **kwargs):
        """
        Send a request with the function from :mod:`utils.http` that is named
        after the lower-cased `method`

        Every request is sent with ``user_agent=True``,
        ``cookies=self.cookies_filepath`` and ``follow_redirects=False``.
        Redirects are not followed because callers inspect the ``location``
        header of the response themselves.

        :param method: HTTP method name, e.g. ``"GET"`` or ``"POST"``

        Any other arguments are passed on to the request function.
        """
        request = getattr(utils.http, method.lower())
        return await request(
            *args,
            user_agent=True,
            cookies=self.cookies_filepath,
            follow_redirects=False,
            **kwargs,
        )

    async def _login(self, *, tfa_otp=None):
        """
        Send login credentials or, if `tfa_otp` is truthy, the 2FA one-time
        password

        :param tfa_otp: One-time password for the second login request

        :raise errors.RequestError: if username or password is not configured
        :raise errors.TfaRequired: if the response redirects to the 2FA login
            page
        """
        if not self.options.get('username'):
            raise errors.RequestError('Login failed: No username configured')
        elif not self.options.get('password'):
            raise errors.RequestError('Login failed: No password configured')

        if not tfa_otp:
            # First request sends username and password.
            request_kwargs = {
                'url': self._login_url,
                'data': {
                    'token': await self._get_token(self._login_url),
                    'username': self.options['username'],
                    'password': self.options['password'],
                    # Tell the server to remember our user session if the user wants to store it.
                    'keeploggedin': '1' if self.cookies_filepath else None,
                    # Lock session to our IP address unless we have a long-running user session
                    # during which our IP address may change.
                    'iplocked': None if self.cookies_filepath else '1',
                    'cinfo': '1280|720|24|0',
                    'submit': 'login',
                },
            }
        else:
            # Second request sends 2FA OTP.
            request_kwargs = {
                'url': self._login_tfa_url,
                'data': {
                    'token': await self._get_token(self._login_tfa_url),
                    'code': tfa_otp,
                    'submit': 'login',
                },
            }

        response = await self._request('POST', **request_kwargs)
        # A redirect to the 2FA form means the caller must call again with `tfa_otp`.
        if response.headers.get('location', '').endswith('twofactor/login'):
            raise errors.TfaRequired('2FA OTP required')

    async def confirm_logged_in(self):
        """
        Request the base URL and look for a link to a user page

        :raise errors.RequestError: if no such link is found
        """
        response = await self._request('GET', self._base_url)
        doc = utils.html.parse(response)
        # The dot is escaped so that only a literal "user.php" matches.
        user_page_link_regex = re.compile(r'/user\.php(?:\?|.*?&)id=(\d+)')
        user_page_link_tag = doc.find('a', href=user_page_link_regex)
        if not user_page_link_tag:
            raise errors.RequestError('Login failed')

    async def _logout(self):
        """POST the token found on the base URL page to the logout URL"""
        token = await self._get_token(self._base_url)
        await self._request(
            'POST',
            url=self._logout_url,
            data={'token': token},
        )

    async def get_announce_url(self):
        """
        Return the announce URL

        The configured ``announce_url`` option takes precedence. Otherwise
        the URL is read from an ``<input>`` tag on the upload page.

        :raise RuntimeError: if no announce URL is configured and there is no
            active user session
        :raise errors.RequestError: if the upload page contains no announce
            URL
        """
        if self.options.get('announce_url'):
            return self.options['announce_url'].get_secret_value()
        elif not self.is_logged_in:
            raise RuntimeError('Cannot get announce URL from website if not logged in')
        else:
            response = await self._request('GET', self._upload_url)
            doc = utils.html.parse(response)
            announce_url_tag = doc.find('input', value=re.compile(r'^https?://.*/announce\b'))
            if announce_url_tag:
                return announce_url_tag['value']
            else:
                cmd = f'{__project_name__} set trackers.{self.name}.announce_url YOUR_URL'
                raise errors.RequestError(f'Failed to find announce URL - set it manually: {cmd}')

    @staticmethod
    def calculate_piece_size_min_max(_bytes):
        """Anything from 32 KiB to 8 MiB is fine, regardless of content size"""
        return (
            32 * 1024,  # 32 KiB
            8 * (1024 ** 2),  # 8 MiB
        )

    async def upload(self, tracker_jobs):
        """
        Upload the torrent in two requests and return the torrent page URL

        :param tracker_jobs: Object that provides ``post_data_autofill``,
            ``post_data_upload`` and ``torrent_filepath`` (presumably a
            :class:`MtvTrackerJobs` instance -- confirm against the caller)
        """
        _log.debug('Initial POST data:\n')
        for k, v in tracker_jobs.post_data_upload.items():
            _log.debug(' * %s = %s', k, v)

        autofill_post_data = await self._make_autofill_request(tracker_jobs)
        _log.debug('Autofill data: %r', autofill_post_data)

        torrent_page_url = await self._make_upload_request(tracker_jobs, autofill_post_data)
        _log.debug('Torrent page URL: %r', torrent_page_url)
        return torrent_page_url

    async def _make_autofill_request(self, tracker_jobs):
        """
        Upload the torrent file and return the server-generated form values

        :return: :class:`dict` with the keys ``tempfileid``, ``tempfilename``
            and ``taglist``

        If any of these values is missing from the response, the error is
        reported via :meth:`_raise_error`.
        """
        # First request uploads the torrent file. We extract "tempfileid" and
        # "tempfilename" from the returned HTML form that must be returned in
        # the second request. We can also extract the autogenerated "taglist".
        post_data = await self._prepare_post_data(tracker_jobs.post_data_autofill)
        _log.debug('Autofill POST data: %r', post_data)
        post_files = {
            'file_input': {
                'file': tracker_jobs.torrent_filepath,
                'mimetype': 'application/x-bittorrent',
            },
        }
        response = await self._request(
            'POST',
            url=self._upload_url,
            cache=False,
            data=post_data,
            files=post_files,
        )
        doc = utils.html.parse(response)
        try:
            return {
                'tempfileid': self._get_form_value(doc, 'input', attrs={'name': 'tempfileid'}),
                'tempfilename': self._get_form_value(doc, 'input', attrs={'name': 'tempfilename'}),
                'taglist': self._get_form_value(doc, 'textarea', attrs={'name': 'taglist'}),
            }
        except ValueError as e:
            # If we can't find required values, look for error message
            _log.debug('Failed to extract values from autofill response: %s', e)
            self._raise_error(doc, msg_prefix='Upload failed', tracker_jobs=tracker_jobs)

    async def _make_upload_request(self, tracker_jobs, autofill_post_data):
        """
        Submit the final upload form and return the torrent page URL

        :param autofill_post_data: Return value of
            :meth:`_make_autofill_request`

        The URL is taken from the ``location`` header of the response. If it
        does not contain ``torrents.php``, the response is searched for a
        dupes warning and then for an error message.

        :raise RuntimeError: if no error message is found; the response
            headers are written to a ``.headers`` file first
        """
        # Second request combines our metadata with server-generated data from
        # _make_autofill_request().
        post_data = await self._prepare_post_data(tracker_jobs.post_data_upload)
        post_data.update(autofill_post_data)
        _log.debug('Final POST data:\n')
        for k, v in post_data.items():
            _log.debug(' * %s = %s', k, v)

        response = await self._request(
            'POST',
            url=self._upload_url,
            cache=False,
            data=post_data,
        )

        # Get URL to uploaded torrent from HTTP 302 redirect location
        redirect_path = response.headers.get('location', '')
        _log.debug('HTTP 302 redirect location: %r', redirect_path)
        if 'torrents.php' in redirect_path:
            torrent_page_url = urllib.parse.urljoin(self._base_url, redirect_path)
            return torrent_page_url

        # Find error message in HTML
        doc = utils.html.parse(response)
        self._raise_error_dupes(doc)
        try:
            self._raise_error(doc, msg_prefix='Upload failed', tracker_jobs=tracker_jobs)
        except RuntimeError:
            # _raise_error() can't find an error message.
            # Dump response headers for debugging.
            # HTML was already dumped by _raise_error()
            headers_filepath = f'{utils.fs.basename(tracker_jobs.content_path)}.headers'
            # Explicit encoding keeps the dump independent of the locale.
            with open(headers_filepath, 'w', encoding='utf-8') as f:
                for k, v in response.headers.items():
                    f.write(f'{k}: {v}\n')
            raise

    async def _prepare_post_data(self, post_data):
        """
        Return a copy of `post_data` with the ``auth`` value added

        The mapping passed by the caller is left untouched.
        """
        return {**post_data, 'auth': await self._get_auth()}

    async def _get_token(self, url):
        """Request `url` and return the value of its ``token`` input tag"""
        # Return hidden <input name="token"> tag's value.
        response = await self._request('GET', url)
        doc = utils.html.parse(response)
        return self._get_form_value(doc, 'input', attrs={'name': 'token'})

    async def _get_auth(self):
        """
        Return the ``auth`` form value from the upload page

        The value is requested once and then stored as ``self._auth``.

        :raise RuntimeError: if there is no active user session
        :raise errors.RequestError: if the upload page reports torrents
            awaiting staff approval
        """
        # "auth" seems to be a random string that never changes, and it must be
        # returned in upload requests.
        if not hasattr(self, '_auth'):
            # Explicit check instead of `assert`, which is removed by `python -O`.
            if not self.is_logged_in:
                raise RuntimeError('Cannot get auth from website if not logged in')

            response = await self._request('GET', self._upload_url)
            doc = utils.html.parse(response)

            # There is a limit for unchecked torrents. Of course, it is not presented like any other
            # error message and there is no straightforward way to find in the HTML soup.
            tags = doc.select('#upload #content > div > div > h2')
            if tags:
                text = utils.html.as_text(tags[0])
                if re.search(r'\d+ torrents awaiting staff approval', text):
                    raise errors.RequestError(text)

            self._auth = self._get_form_value(doc, 'input', attrs={'name': 'auth'})
        return self._auth

    def _get_form_value(self, doc, tag_name, **kwargs):
        """
        Return the value of the first `tag_name` tag in `doc`

        The ``value`` attribute is preferred (``<input value="...">``). If it
        is missing or empty, the text of the tag is used
        (``<textarea>value</textarea>``).

        :param doc: Parsed HTML with a ``find()`` method
        :param tag_name: Name of the wanted tag

        Any other keyword arguments are passed on to ``doc.find()``.

        :raise ValueError: if the tag is not found or has no value
        """
        try:
            tag = doc.find(tag_name, **kwargs)
            value = tag.get('value') or utils.html.as_text(tag)
        except AttributeError:
            # `tag` is None if `doc` contains no matching tag.
            raise ValueError(f'Could not find tag: {tag_name}') from None

        if value:
            return value
        raise ValueError(f'Tag has no value: {tag}')

    def _raise_error(self, doc, msg_prefix, tracker_jobs=None):
        """
        Always raise an exception that describes the failed request

        :param doc: Parsed HTML response
        :param msg_prefix: Text that is prepended to the error message
        :param tracker_jobs: Object that provides ``release_name`` and
            ``imdb_id`` for the "GroupID cannot be null" bug report

        :raise errors.RequestError: if `doc` contains a ``<div>`` of class
            ``error`` or ``alert``, or if ``doc.string`` is falsy and
            `tracker_jobs` is provided
        :raise RuntimeError: if no error message is found; `doc` is dumped to
            an HTML file first
        """
        error_tag = doc.find('div', attrs={'class': 'error'})
        if error_tag:
            msg = utils.html.as_text(error_tag)
            raise errors.RequestError(f'{msg_prefix}: {msg}')

        # Example error: "The exact same torrent file already exists on the site!"
        alert_tag = doc.find('div', attrs={'class': 'alert'})
        if alert_tag:
            msg = utils.html.as_text(alert_tag)
            raise errors.RequestError(f'{msg_prefix}: {msg}')

        # TODO: Remove this if the "zero bytes response / GroupID cannot be null" bug is fixed.
        # Also remember to remove the then unnecessary `tracker_jobs` argument.
        if not doc.string and tracker_jobs:
            _log.debug('Empty server response: %r', doc.prettify())
            bug_report = ' '.join((
                tracker_jobs.release_name.title_with_aka_and_year,
                f'https://imdb.com/title/{tracker_jobs.imdb_id}',
            ))
            forum_link = f'{self._base_url}/forum/thread/3338'
            raise errors.RequestError(
                f'{msg_prefix}: "GroupID cannot be null" bug encountered.\n'
                '\n'
                f'Please post the following information here: {forum_link}\n'
                '\n'
                f' {bug_report}\n'
                '\n'
                'You will get a reply from staff when the issue is fixed and you can try again.\n'
                '\n'
                'Here is an owl for your inconvenience:\n'
                '\n'
                ' ^ ^\n'
                ' (O,O)\n'
                '\\ ( ) _,~´\n'
                ' `~"-"~~~´\n'
            )

        filepath = f'{msg_prefix}.{self.name}.html'
        utils.html.dump(doc, filepath)
        raise RuntimeError(f'{msg_prefix}: No error message found (dumped HTML response to {filepath})')

    def _raise_error_dupes(self, doc):
        """
        Raise :class:`errors.FoundDupeError` if `doc` reports duplicates

        Nothing is raised unless `doc` contains a ``messagebar`` div that
        mentions dupes and a ``torrent_table`` table with at least one entry.
        """
        dupes_warning = doc.find('div', id="messagebar", string=re.compile(r'(?i:dupes?|duplicates?)'))
        if dupes_warning:
            _log.debug('Found dupes warning: %s', dupes_warning)
            table = doc.find('table', id='torrent_table')
            if table:
                dupe_files = tuple(
                    utils.html.as_text(cell)
                    for row in table.find_all('tr', attrs={'class': 'torrent'})
                    # find() returns None for rows without a matching cell; skip those.
                    for cell in (row.find('td', attrs={'class': 'torrent'}) or ())
                )
                if dupe_files:
                    raise errors.FoundDupeError(dupe_files)