Source code for upsies.jobs.screenshots

"""
Create screenshots from video file(s)
"""

import collections
import inspect
import os

from .. import errors, utils
from . import base

import logging  # isort:skip
_log = logging.getLogger(__name__)

shutil = utils.LazyModule(module='shutil', namespace=globals())


DEFAULT_NUMBER_OF_SCREENSHOTS = 2


[docs] class ScreenshotsJob(base.JobBase): r""" Create screenshots from video file(s) or :class:`~.Playlist` instances To get :class:`~.Playlist`\ s, this job relies on a :class:`~.PlaylistsJob` instance to be available via a job :attr:`~.JobBase.siblings` named "playlists". This job adds the following signals to :attr:`~.JobBase.signal`: ``screenshots_total`` Emitted before screenshots are created. Registered callbacks get the total number of screenshots as a positional argument. """ name = 'screenshots' label = 'Screenshots' # Caching of original/optimized screenshots is handled in the _screenshots/optimize_process() # functions based on file names, which we generate and should be unique. cache_id = None
[docs] def initialize(self, *, content_path, precreated=(), exclude_files=(), timestamps=(), count=0, from_all_videos=False, optimize='default', tonemap=False): r""" Set internal state :param str content_path: Path to file or directory or sequence of paths :param precreated: Sequence of paths of already existing screenshots These do not count towards the wanted number of screenshots. `count` screenshots are created in addition to any precreated screenshots. :param exclude_files: Sequence of glob patterns (:class:`str`) and :class:`re.Pattern` objects (return value from :func:`re.compile`) that are matched against the relative path beneath each `source` Glob patterns are matched case-insensitively. .. note:: Non-video files and stuff like `Sample.mkv` are always excluded by :func:`.fs.find_main_videos`. :param timestamps: Screenshot positions in the video :type timestamps: sequence of "[[H+:]M+:]S+" strings or seconds :param count: Number of screenshots to make or :func:`callable` that returns how many screenshots to make If this is a callable (synchronous or asynchronous), it is called when this job is :meth:`~.JobBase.start`\ ed, and :attr:`count` is ``0`` until then. :param bool from_all_videos: Whether to take `count` screenshots from each video file or only from the first video See :func:`.fs.find_main_videos` for more information. :param optimize: `level` argument for :func:`~image.optimize` If this is ``"default"``, missing optimization dependencies are silently ignored. :param bool tonemap: Whether to apply tonemap algorithm for HDR screenshots If `timestamps` and `count` are not given, screenshot positions are picked at even intervals. If `count` is larger than the number of `timestamps`, more timestamps are added. """ self._content_path = content_path self._precreated = precreated self._exclude_files = exclude_files self._timestamps = timestamps if callable(count): self._count = 0 self._count_callable = count else: self._count = count self._count_callable = None self._from_all_videos = from_all_videos self._optimize = optimize self._tonemap = tonemap self._screenshots_created = 0 self._screenshots_total = -1 self._screenshots_process = None self._optimize_process = None self._screenshots_by_file = collections.defaultdict(list) self.signal.add('screenshots_total', record=True)
[docs] async def run(self): """Execute subprocesses for screenshot creation and optimization""" # Execute subprocesses await self._execute_screenshots_process() if self._optimize not in ('none', None): self._execute_optimize_process() if utils.disc.is_disc(self._content_path, multidisc=True): await self._process_playlists() else: await self._process_file_or_directory() # Wait for subprocesses await self._screenshots_process.join() if self._optimize_process: await self._optimize_process.join()
async def _execute_screenshots_process(self): if inspect.iscoroutinefunction(self._count_callable): self._count = await self._count_callable() elif callable(self._count_callable): self._count = self._count_callable() _log.debug('Screenshots per video file: %s', self._count) self._screenshots_process = utils.daemon.DaemonProcess( name='_screenshots_process', target=_screenshots_process, kwargs={ 'precreated': self._precreated, 'exclude_files': self._exclude_files, 'timestamps': self._timestamps, 'count': self._count, 'from_all_videos': self._from_all_videos, 'output_dir': self.cache_directory, 'overwrite': self.ignore_cache, 'tonemap': self._tonemap, }, info_callback=self._handle_info, error_callback=self._handle_error, ) self._screenshots_process.start() def _execute_optimize_process(self): self._optimize_process = utils.daemon.DaemonProcess( name='_optimize_process', target=_optimize_process, kwargs={ 'level': self._optimize, 'overwrite': self.ignore_cache, # Ignore missing dependecy if we do "default" optimization 'ignore_dependency_error': self._optimize == 'default', 'cache_directory': self.cache_directory, }, info_callback=self._handle_info, error_callback=self._handle_error, ) self._optimize_process.start() async def _process_playlists(self): async for _discpath, playlists in self.receive_all('playlists', 'playlists_selected', only_posargs=True): _log.debug('Sending playlists to _screenshots_process(): %r', playlists) for playlist in playlists: self._screenshots_process.send(utils.daemon.MsgType.info, {'source': playlist}) self._screenshots_process.send(utils.daemon.MsgType.info, {'source': _ALL_SOURCES_SELECTED}) async def _process_file_or_directory(self): self._screenshots_process.send(utils.daemon.MsgType.info, {'source': self._content_path}) self._screenshots_process.send(utils.daemon.MsgType.info, {'source': _ALL_SOURCES_SELECTED}) @base.unless_job_is_finished def _handle_info(self, info): if 'screenshots_total' in info: self._screenshots_total = info['screenshots_total'] self.signal.emit('screenshots_total', self._screenshots_total) elif 'screenshot_filepath' in info: if self._optimize_process: _log.debug('Screenshot: %s: %.2f KiB', info['screenshot_filepath'], (utils.fs.file_size(info['screenshot_filepath']) or 0) / 1024) self._optimize_process.send(utils.daemon.MsgType.info, info) else: _log.debug('Screenshot: %s: %.2f KiB', info['screenshot_filepath'], (utils.fs.file_size(info['screenshot_filepath']) or 0) / 1024) self.add_output(info['screenshot_filepath'], info['video_filepath'], info['source']) elif 'optimized_screenshot_filepath' in info: _log.debug('Optimized %s: %.2f KiB', info['optimized_screenshot_filepath'], (utils.fs.file_size(info['optimized_screenshot_filepath']) or 0) / 1024) self.add_output(info['optimized_screenshot_filepath'], info['video_filepath'], info['source']) else: raise RuntimeError(f'Unexpected info: {info!r}') if self._optimize_process and self.screenshots_created == self.screenshots_total: self._optimize_process.stop() @base.unless_job_is_finished def _handle_error(self, error): if ( isinstance(error, ( errors.ScreenshotError, errors.ImageOptimizeError, errors.DependencyError, )) or not isinstance(error, BaseException) ): self.error(error) else: raise error
[docs] def terminate(self, reason=None): """ Stop screenshot creation and optimization subprocesses before terminating the job """ if self._screenshots_process: self._screenshots_process.stop() if self._optimize_process: self._optimize_process.stop() super().terminate(reason=reason)
@property def exit_code(self): """`0` if all screenshots were made, `1` otherwise, `None` if unfinished""" if self.is_finished: if self.screenshots_total < 0: # Job is finished but _screenshots_process() never told us how many screenshots # should be created. That means we're either using previously cached output or the # job was cancelled while _screenshots_process() was still initializing. if self.output: # If we have cached output, assume the cached number of screenshots is what the # user wanted because the output of failed jobs is not cached (see # JobBase._write_cache()). return 0 else: return 1 elif len(self.output) == self.screenshots_total: return 0 else: return 1 @property def exclude_files(self): """ Sequence of glob and :class:`regex <re.Pattern>` patterns to exclude See :meth:`initialize` for more information. Setting this property when this job :attr:`~.JobBase.is_started` raises :class:`RuntimeError`. """ return self._exclude_files @exclude_files.setter @base.raise_if_started def exclude_files(self, exclude_files): self._exclude_files = exclude_files @property def from_all_videos(self): """ Whether to make screenshots from all video files or only the first Setting this property when this job :attr:`~.JobBase.is_started` raises :class:`RuntimeError`. """ return self._from_all_videos @from_all_videos.setter @base.raise_if_started def from_all_videos(self, from_all_videos): self._from_all_videos = from_all_videos @property def count(self): """ How many screenshots to make per video file Setting this property when this job :attr:`~.JobBase.is_started` raises :class:`RuntimeError`. """ return self._count @count.setter @base.raise_if_started def count(self, count): self._count = count @property def timestamps(self): """ Specific list of timestamps to make Setting this property when this job :attr:`~.JobBase.is_started` raises :class:`RuntimeError`. """ return self._timestamps @timestamps.setter @base.raise_if_started def timestamps(self, timestamps): self._timestamps = timestamps @property def screenshots_total(self): """ Total number of screenshots to make .. note:: This is ``-1`` until the subprocess that creates the screenshots is executed and determined the number of screenshots. """ return self._screenshots_total @property def screenshots_created(self): """Total number of screenshots made so far""" return self._screenshots_created @property def screenshots_by_file(self): """Map video file paths to sequences of generated screenshot file paths so far""" return { video_filepath: tuple(screenshot_paths) for video_filepath, screenshot_paths in self._screenshots_by_file.items() }
[docs] def add_output(self, screenshot_filepath, video_filepath, source): """ Populate :attr:`~.JobBase.output` and :attr:`screenshots_by_file` and bump :attr:`screenshots_created` :param screenshots_filepath: Path to screenshot file :param video_filepath: Path to video file `screenshot_filepath` is from :param source: Path to release directory, :class:`~.Playlist` instance or the same as `video_filepath` """ def normalize_path(path): # Resolve symbolic links and make path absolute. return os.path.abspath(os.path.realpath(path)) # Copy screenshot to home_directory if it is different from cache_directory. home_directory = normalize_path(self.home_directory) cache_directory = normalize_path(self.cache_directory) if home_directory != cache_directory: destination_path = self.home_directory or '.' try: screenshot_filepath = shutil.copy2(screenshot_filepath, destination_path) except OSError as e: msg = e.strerror or str(e) self.error(f'{msg}: {destination_path}') return # Register successfully created screenshot internally. self._screenshots_created += 1 if isinstance(source, utils.disc.Playlist): self._screenshots_by_file[source.filepath].append(screenshot_filepath) else: self._screenshots_by_file[video_filepath].append(screenshot_filepath) return super().add_output(screenshot_filepath)
def _screenshots_process( output_queue, input_queue, *, precreated, exclude_files, timestamps, count, from_all_videos, output_dir, overwrite, tonemap, ): # First we have to collect all video files we want to create screenshots from. This allows us to # report the total number of screenshots. screenshot_infos = _get_screenshot_infos( input_queue=input_queue, custom_timestamps=timestamps, count=count, exclude_files=exclude_files, from_all_videos=from_all_videos, ) # How many screenshots we are going to make. This is used for displaying progress. screenshots_total = len(precreated) + len(screenshot_infos) output_queue.put((utils.daemon.MsgType.info, {'screenshots_total': screenshots_total})) # Feed user-provided, precreated screenshots back into the normal processing pipeline. for screenshot_filepath in precreated: output_queue.put((utils.daemon.MsgType.info, { 'screenshot_filepath': screenshot_filepath, 'video_filepath': '', 'source': '', })) try: for source, video_filepath, screenshot_filename_base, timestamp in screenshot_infos: screenshot_filepath = _make_screenshot( video_filepath=video_filepath, screenshot_filename_base=screenshot_filename_base, timestamp=timestamp, output_dir=output_dir, overwrite=overwrite, tonemap=tonemap, ) output_queue.put((utils.daemon.MsgType.info, { 'screenshot_filepath': screenshot_filepath, 'video_filepath': video_filepath, 'source': source, })) except errors.ScreenshotError as e: output_queue.put((utils.daemon.MsgType.error, e)) _ALL_SOURCES_SELECTED = '_ALL_SOURCES_SELECTED' def _get_screenshot_infos(input_queue, *, custom_timestamps, count, exclude_files, from_all_videos): screenshot_infos = [] while True: source = utils.daemon.read_input_queue_key(input_queue, 'source') if source == '_ALL_SOURCES_SELECTED': break else: # Return sequence of tuples where each contains all the information to create one screenshot. video_infos = _get_video_infos(source, exclude_files, from_all_videos) # We want to create `len(validated_timestamps)` screenshots for each item in `video_infos`. for source, video_filepath, screenshot_filename_base in video_infos: validated_timestamps = _validate_timestamps( video_filepath=video_filepath, timestamps=custom_timestamps, count=count, ) screenshot_infos.extend( (source, video_filepath, screenshot_filename_base, ts) for ts in validated_timestamps ) return tuple(screenshot_infos) def _get_video_infos(source, exclude_files, from_all_videos): if isinstance(source, utils.disc.Playlist): return _get_video_infos_from_playlist(source) else: return _get_video_infos_from_file_or_directory(source, exclude_files, from_all_videos) def _get_video_infos_from_playlist(playlist): # Include parent directory name. screenshot_filename_base = [playlist.discname] # Only include the playlist filename for BDMVs. For DVDs, it is implied by the file name of the # playlist item. if not playlist.filename.startswith('VTS_'): screenshot_filename_base.append(utils.fs.strip_extension(playlist.filename)) # Include file name of playlist item (e.g. 00003.m2ts or VTS_12_1.VOB) without file extension. screenshot_filename_base.append(utils.fs.strip_extension(utils.fs.basename(playlist.largest_item))) return ( # (<source>, <video_filepath>, <screenshot_filename_base>) (playlist, playlist.largest_item, '.'.join(screenshot_filename_base)), ) def _get_video_infos_from_file_or_directory(content_path, exclude_files, from_all_videos): video_filepaths = utils.fs.find_main_videos(content_path, exclude_files) video_infos = [] for video_filepath in video_filepaths: screenshot_filename_base = utils.fs.strip_extension(utils.fs.basename(video_filepath)) video_infos.append((content_path, video_filepath, screenshot_filename_base)) if not from_all_videos: break return tuple(video_infos) def _make_screenshot(*, video_filepath, screenshot_filename_base, timestamp, output_dir, overwrite, tonemap): screenshot_filepath = os.path.join( output_dir, screenshot_filename_base + f'.{timestamp}.png', ) if not overwrite and os.path.exists(screenshot_filepath): return screenshot_filepath else: return utils.image.screenshot( video_file=video_filepath, screenshot_file=screenshot_filepath, timestamp=timestamp, tonemap=tonemap, ) def _validate_timestamps(*, video_filepath, timestamps, count): # Validate, normalize, deduplicate and sort timestamps duration = utils.mediainfo.get_duration(video_filepath) if duration < 1: raise errors.ContentError(f'Video duration is too short: {duration}s') # Convert timestamp int/float/str to Timestamp and limit its value. validated_timestamps = [] min_ts = utils.types.Timestamp(0) max_ts = utils.types.Timestamp(duration) for ts in timestamps: try: ts = max(min_ts, min(max_ts, utils.types.Timestamp(ts))) except ValueError as e: raise errors.ContentError(e) from e else: validated_timestamps.append(ts) # Deduplicated validated_timestamps. validated_timestamps = sorted(set(validated_timestamps)) if not timestamps and not count: count = DEFAULT_NUMBER_OF_SCREENSHOTS # Add more timestamps if the user didn't specify less than `count`. if count > 0 and len(validated_timestamps) < count: # Get position as fraction of video duration for each timestamp: # 0.0 = Timestamp(0) # 1.0 = Timestamp(duration) positions = [ts / duration for ts in sorted(validated_timestamps)] # Include start and end of video. They are required for the algorithm below. if 0.0 not in positions: positions.insert(0, 0.0) if 1.0 not in positions: positions.append(1.0) # Sort positions so they can be paired to find the largest gap. positions.sort() # Add new positions between the two positions with the largest gap until we have the desired # number of screenshots. while len(validated_timestamps) < count: pairs = zip(positions, positions[1:]) _max_distance, pos1, pos2 = max((b - a, a, b) for a, b in pairs) new_position = ((pos2 - pos1) / 2) + pos1 validated_timestamps.append(utils.types.Timestamp(int(duration * new_position))) positions.append(new_position) positions.sort() # Return deduplicated, sorted, immutable validated_timestamps. return tuple(sorted(set(validated_timestamps))) def _optimize_process( output_queue, input_queue, *, level, overwrite, ignore_dependency_error, cache_directory, ): # Keep reading queued screenshots forever. read_input_queue_until_empty() raises # DaemonProcessTerminated if there's a `MsgType.terminate` queued up. msgs = [] while True: new_msgs = utils.daemon.read_input_queue_until_empty(input_queue) msgs.extend(new_msgs) if msgs: _typ, info = msgs.pop(0) _optimize_screenshot( output_queue=output_queue, screenshot_filepath=info['screenshot_filepath'], video_filepath=info['video_filepath'], source=info['source'], level=level, overwrite=overwrite, ignore_dependency_error=ignore_dependency_error, cache_directory=cache_directory, ) def _optimize_screenshot( output_queue, *, screenshot_filepath, video_filepath, source, level, overwrite, ignore_dependency_error, cache_directory, ): output_file = utils.fs.ensure_path_in_cache( os.path.join( utils.fs.dirname(screenshot_filepath), ( utils.fs.basename(utils.fs.strip_extension(screenshot_filepath)) + '.' + f'optimized={level}' + '.' + utils.fs.file_extension(screenshot_filepath) ) ), cache_directory, ) if not overwrite and os.path.exists(output_file): output_queue.put((utils.daemon.MsgType.info, { 'optimized_screenshot_filepath': output_file, 'video_filepath': video_filepath, 'source': source, })) else: try: optimized_screenshot_filepath = utils.image.optimize( screenshot_filepath, level=level, output_file=output_file, ) except errors.ImageOptimizeError as e: output_queue.put((utils.daemon.MsgType.error, e)) except errors.DependencyError as e: if ignore_dependency_error: # Act like we optimized `screenshot_filepath` output_queue.put((utils.daemon.MsgType.info, { 'optimized_screenshot_filepath': screenshot_filepath, 'video_filepath': video_filepath, 'source': source, })) else: raise e else: output_queue.put((utils.daemon.MsgType.info, { 'optimized_screenshot_filepath': optimized_screenshot_filepath, 'video_filepath': video_filepath, 'source': source, }))