"""
Create torrent file
"""
import collections
import datetime
import errno
import fnmatch
import functools
import os
import re
import time
from .. import __project_name__, __version__, constants, errors, utils
from . import LazyModule, fs, types
torf = LazyModule(module='torf', namespace=globals())
SKIP_SEARCHING = 'skip_searching'
"""
Return value for the `progress_callback` to stop searching for a reusable
torrent (see :func:`create`)
"""
[docs]
def create(*, content_path, announce, source,
torrent_path, exclude=(), use_cache=True,
reuse_torrent_path=None,
piece_size_calculator=None, piece_size_min_max_calculator=None,
init_callback, progress_callback):
"""
Generate and write torrent file
:param str content_path: Path to the torrent's payload
:param str announce: Announce URL
:param str source: Value of the ``source`` field in the torrent. This makes
the torrent unique for each tracker to avoid cross-seeding issues, so it
is usually the tracker's abbreviated name.
:param str torrent_path: Path of the generated torrent file
:param exclude: Sequence of glob patterns (:class:`str`) and
:class:`re.Pattern` (return value from :func:`re.compile`) or
:class:`~.types.Regex` objects
Files beneath `content_path` are excluded from the torrent.
Glob patterns are matched case-insensitively. For case-insensitive
matching with regular expressions, use ``(?i:<pattern>)``.
:param bool use_cache: Whether to get piece hashes from previously created
torrents or from `reuse_torrent_path`
:param reuse_torrent_path: Path to existing torrent file to get hashed
pieces and piece size from. If the given torrent file doesn't match the
files in the torrent we want to create, hash the pieces normally.
If this is a directory, search it recursively for ``*.torrent`` files
and use the first one that matches.
Non-existing or otherwise unreadable paths as well as falsy values
(e.g. ``""`` or `None`) are silently ignored.
If this is a sequence, its items are expected to be directory or file
paths and handled as described above.
:param piece_size_calculator: Function that takes the torrent's content size
in bytes and returns the piece size
If this is `None`, the default implementation is used.
:param piece_size_min_max_calculator: Function that takes the torrent's
content size in bytes and returns the allowed minimum and maximum piece
sizes or `None` to use the default minimum or maximum piece size
If this is `None`, the default minimum and maximum piece sizes are used.
:param init_callback: Callable that is called once before torrent generation
commences with a :class:`Files` object
:param progress_callback: Callable that is called at regular intervals with
a :class:`CreateTorrentProgress` or :class:`FindTorrentProgress` object
as a positional argument
Callbacks can cancel the torrent creation by returning `True` or any other
truthy value. If `progress_callback` returns :data:`SKIP_SEARCHING`, the
search for a reusable torrent is cancelled and pieces are hashed normally.
:raise TorrentCreateError: if anything goes wrong
:return: `torrent_path` or `None` if cancelled
"""
if not announce:
raise errors.TorrentCreateError('Announce URL is empty')
if not source:
raise errors.TorrentCreateError('Source is empty')
# Create Torrent object
torrent = _get_torrent(
content_path=content_path,
exclude=_get_exclude_regexs(exclude),
announce=announce,
source=source,
)
# Custom piece_size management
if piece_size_min_max_calculator:
# Calculate custom piece_size boundaries based on torrent's content size
torrent.piece_size_min, torrent.piece_size_max = piece_size_min_max_calculator(torrent.size)
if piece_size_calculator:
# Calculate custom piece_size based on torrent's content size
torrent.piece_size = piece_size_calculator(torrent.size)
# Report files with `exclude` applied
cancelled = init_callback(Files(torrent))
if cancelled:
return None
if use_cache:
# Try to get piece hashes from existing torrent
cancelled = _find_hashes(
torrent=torrent,
reuse_torrent_path=reuse_torrent_path,
callback=progress_callback,
)
if cancelled and cancelled != SKIP_SEARCHING:
return None
if not torrent.is_ready:
# Hash pieces
cancelled = _generate_hashes(
torrent=torrent,
callback=progress_callback,
)
if cancelled:
return None
# Write generic torrent so we can reuse the hashes in the future
_store_generic_torrent(torrent)
# Write torrent to `torrent_path`
_write_torrent_path(torrent, torrent_path)
return torrent_path
def _get_exclude_regexs(exclude):
regexs = []
for pattern in exclude:
if isinstance(pattern, re.Pattern):
regexs.append(pattern)
elif isinstance(pattern, types.Regex):
regexs.append(re.compile(pattern.pattern))
elif isinstance(pattern, str):
regexs.append(re.compile(fnmatch.translate(str(pattern)), flags=re.IGNORECASE))
else:
raise TypeError(f'Unexpected exclude type: {pattern!r}')
return regexs
def _get_torrent(*, content_path, exclude, announce, source):
try:
return torf.Torrent(
path=content_path,
exclude_regexs=exclude,
trackers=((announce,),),
source=source,
private=True,
created_by=f'{__project_name__} {__version__}',
creation_date=time.time(),
)
except torf.TorfError as e:
raise errors.TorrentCreateError(str(e)) from e
def _generate_hashes(*, torrent, callback):
wrapped_callback = _CreateTorrentCallback(callback)
try:
torrent.generate(
callback=wrapped_callback,
interval=1.0,
)
except torf.TorfError as e:
raise errors.TorrentCreateError(str(e)) from e
else:
return wrapped_callback.return_value
def _find_hashes(*, torrent, reuse_torrent_path, callback):
if not torrent.files:
# All files are excluded (let someone deal with it)
return False
wrapped_callback = _FindTorrentCallback(callback)
try:
torrent.reuse(
_get_reuse_torrent_paths(torrent, reuse_torrent_path),
callback=wrapped_callback,
interval=1.0,
)
except torf.TorfError as e:
raise errors.TorrentCreateError(str(e)) from e
else:
return wrapped_callback.return_value
def _get_reuse_torrent_paths(torrent, reuse_torrent_path):
reuse_torrent_paths = []
if reuse_torrent_path:
if isinstance(reuse_torrent_path, str):
reuse_torrent_paths.append(reuse_torrent_path)
elif isinstance(reuse_torrent_path, collections.abc.Iterable):
reuse_torrent_paths.extend(p for p in reuse_torrent_path if p)
else:
raise ValueError(f'Invalid reuse_torrent_path: {reuse_torrent_path!r}')
generic_torrent_path = _get_generic_torrent_path(torrent=torrent, create_directory=False)
reuse_torrent_paths.insert(0, generic_torrent_path)
return tuple(
os.path.expanduser(path)
for path in reuse_torrent_paths
)
def _store_generic_torrent(torrent):
generic_torrent = torf.Torrent(
private=True,
created_by=f'{__project_name__} {__version__}',
creation_date=time.time(),
comment='This torrent is used to cache previously hashed pieces.',
)
_copy_torrent_info(torrent, generic_torrent)
generic_torrent_path = _get_generic_torrent_path(generic_torrent, create_directory=True)
generic_torrent.write(generic_torrent_path, overwrite=True)
def _write_torrent_path(torrent, torrent_path):
try:
torrent.write(torrent_path, overwrite=True)
except torf.TorfError as e:
raise errors.TorrentCreateError(str(e)) from e
def _get_generic_torrent_path(torrent, *, create_directory=True):
directory = constants.GENERIC_TORRENTS_DIRPATH
if create_directory:
try:
fs.mkdir(directory)
except errors.ContentError as e:
raise errors.TorrentCreateError(f'{directory}: {e}') from e
cache_id = _get_torrent_id(torrent)
filename = f'{torrent.name}.{cache_id}.torrent'
return os.path.join(directory, filename)
def _get_torrent_id_info(torrent):
return {
'name': torrent.name,
'files': tuple((str(f), f.size) for f in torrent.files),
}
def _get_torrent_id(torrent):
return utils.semantic_hash(_get_torrent_id_info(torrent))
def _copy_torrent_info(from_torrent, to_torrent):
from_info, to_info = from_torrent.metainfo['info'], to_torrent.metainfo['info']
to_info['pieces'] = from_info['pieces']
to_info['piece length'] = from_info['piece length']
to_info['name'] = from_info['name']
if 'length' in from_info:
to_info['length'] = from_info['length']
else:
to_info['files'] = from_info['files']
[docs]
class Files:
"""Structured torrent file content"""
def __init__(self, torrent):
self._torrent = torrent
@functools.cached_property
def list(self):
"""Sequence of existing file paths (:class:`str`)"""
return tuple(str(filepath) for filepath in self._torrent.filepaths)
@functools.cached_property
def tree(self):
"""
Nested files
This is a tree where each node is a tuple in which the first item is the
directory name and the second item is a sequence of `(file_name,
file_size)` or `(file_name, sub_tree)` tuples.
Example:
.. code::
('Parent',
('Foo', (
('Picture.jpg', 82489),
('Music.mp3', 5315672),
('More files', (
('This.txt', 57734),
('And that.txt', 184),
('Also some of this.txt', 88433),
)),
)),
('Bar', (
('Yee.mp4', 288489392),
('Yah.mkv', 3883247384),
)),
)
"""
return self._make_file_tree(
self._torrent.filetree,
parent_path=str(self._torrent.path.parent),
strip_leading_sep=False,
)
def _make_file_tree(self, filetree, *, parent_path='', strip_leading_sep=True):
files = []
for name, file in filetree.items():
path = os.sep.join((parent_path, name))
if strip_leading_sep:
path = path.strip(os.sep)
else:
path = path.rstrip(os.sep)
if isinstance(file, collections.abc.Mapping):
subtree = self._make_file_tree(file)
files.append((path, subtree))
else:
files.append((path, file.size))
return tuple(files)
@functools.cached_property
def excluded(self):
"""Sequence of file paths that exist but are not in the torrent for any reason"""
return tuple(
file_path
for file_path in utils.fs.file_list(self._torrent.path)
if file_path not in self.list
)
class _CallbackBase:
def __init__(self, callback):
self._callback = callback
self._progress_samples = []
self._time_started = time.time()
self.return_value = None
def __call__(self, progress):
self.return_value = return_value = self._callback(progress)
return return_value
def _calculate_info(self, items_done, items_total):
time_now = time.time()
percent_done = items_done / items_total * 100
seconds_elapsed = time_now - self._time_started
items_per_second = 0
items_remaining = items_total - items_done
seconds_remaining = 0
self._add_sample(time_now, items_done)
# Estimate how long torrent creation will take
samples = self._progress_samples
if len(samples) >= 2:
# Calculate the difference between each pair of samples
diffs = [
b[1] - a[1]
for a, b in zip(samples[:-1], samples[1:])
]
items_per_second = self._get_average(diffs, weight_factor=1.1)
if items_per_second > 0:
seconds_remaining = items_remaining / items_per_second
seconds_total = seconds_elapsed + seconds_remaining
time_finished = self._time_started + seconds_total
return {
'percent_done': percent_done,
'items_remaining': items_remaining,
'items_per_second': items_per_second,
'seconds_elapsed': datetime.timedelta(seconds=seconds_elapsed),
'seconds_remaining': datetime.timedelta(seconds=seconds_remaining),
'seconds_total': datetime.timedelta(seconds=seconds_total),
'time_finished': datetime.datetime.fromtimestamp(time_finished),
'time_started': datetime.datetime.fromtimestamp(self._time_started),
}
def _add_sample(self, time_now, items_done):
def get_sample_age(sample):
time_sample = sample[0]
return time_now - time_sample
samples = self._progress_samples
samples.append((time_now, items_done))
# Prune samples older than 10 seconds
while samples and get_sample_age(samples[0]) > 10:
del samples[0]
def _get_average(self, samples, weight_factor, get_value=lambda sample: sample):
# Give recent samples more weight than older samples
# https://en.wikipedia.org/wiki/Moving_average
weights = []
for _ in range(len(samples)):
try:
weight = weights[-1]
except IndexError:
weight = 1
weights.append(weight * weight_factor)
return sum(
get_value(sample) * weight
for sample, weight in zip(samples, weights)
) / sum(weights)
class _CreateTorrentCallback(_CallbackBase):
def __call__(self, torrent, filepath, pieces_done, pieces_total):
info = self._calculate_info(pieces_done, pieces_total)
piece_size = torrent.piece_size
bytes_per_second = types.Bytes(info['items_per_second'] * piece_size)
progress = CreateTorrentProgress(
pieces_done=pieces_done,
pieces_total=pieces_total,
percent_done=info['percent_done'],
bytes_per_second=bytes_per_second,
piece_size=piece_size,
total_size=torrent.size,
filepath=filepath,
seconds_elapsed=info['seconds_elapsed'],
seconds_remaining=info['seconds_remaining'],
seconds_total=info['seconds_total'],
time_finished=info['time_finished'],
time_started=info['time_started'],
)
return super().__call__(progress)
class _FindTorrentCallback(_CallbackBase):
def __call__(self, torrent, filepath, files_done, files_total, status, exception):
info = self._calculate_info(files_done, files_total)
# Ignore "No such file or directory". This should only happen if the
# generic torrent does not exist yet. For all other paths, torrent files
# are collected from traversing directories.
if isinstance(exception, torf.ReadError) and exception.errno == errno.ENOENT:
exception = None
if status is True:
status = 'hit'
elif status is False:
status = 'miss'
else:
status = 'verifying'
progress = FindTorrentProgress(
files_done=files_done,
files_total=files_total,
percent_done=info['percent_done'],
files_per_second=info['items_per_second'],
filepath=filepath,
status=status,
exception=errors.TorrentCreateError(str(exception)) if exception else None,
seconds_elapsed=info['seconds_elapsed'],
seconds_remaining=info['seconds_remaining'],
seconds_total=info['seconds_total'],
time_finished=info['time_finished'],
time_started=info['time_started'],
)
return super().__call__(progress)
[docs]
class CreateTorrentProgress(collections.namedtuple(
typename='CreateTorrentProgress',
field_names=(
'bytes_per_second',
'filepath',
'percent_done',
'piece_size',
'pieces_done',
'pieces_total',
'seconds_elapsed',
'seconds_remaining',
'seconds_total',
'time_finished',
'time_started',
'total_size',
),
)):
"""
:func:`~.collections.namedtuple` with these attributes:
- ``bytes_per_second`` (:class:`~.types.Bytes`)
- ``filepath`` (:class:`str`)
- ``percent_done`` (:class:`float`)
- ``piece_size`` (:class:`~.types.Bytes`)
- ``pieces_done`` (:class:`int`)
- ``pieces_total`` (:class:`int`)
- ``seconds_elapsed`` (:class:`~.datetime.datetime.timedelta`)
- ``seconds_remaining`` (:class:`~.datetime.datetime.timedelta`)
- ``seconds_total`` (:class:`~.datetime.datetime.timedelta`)
- ``time_finished`` (:class:`~.datetime.datetime.datetime`)
- ``time_started`` (:class:`~.datetime.datetime.datetime`)
- ``total_size`` (:class:`~.types.Bytes`)
"""
[docs]
class FindTorrentProgress(collections.namedtuple(
typename='CreateTorrentProgress',
field_names=(
'exception',
'filepath',
'files_done',
'files_per_second',
'files_total',
'percent_done',
'seconds_elapsed',
'seconds_remaining',
'seconds_total',
'status',
'time_finished',
'time_started',
),
)):
"""
:func:`~.collections.namedtuple` with these attributes:
- ``exception`` (:class:`~.errors.TorrentCreateError` or `None`)
- ``filepath`` (:class:`str`)
- ``files_done`` (:class:`int`)
- ``files_per_second`` (:class:`int`)
- ``files_total`` (:class:`int`)
- ``percent_done`` (:class:`float`)
- ``seconds_elapsed`` (:class:`~.datetime.datetime.timedelta`)
- ``seconds_remaining`` (:class:`~.datetime.datetime.timedelta`)
- ``seconds_total`` (:class:`~.datetime.datetime.timedelta`)
- ``status`` (``hit``, ``miss`` or ``verifying``)
- ``time_finished`` (:class:`~.datetime.datetime.datetime`)
- ``time_started`` (:class:`~.datetime.datetime.datetime`)
"""