Source code for upsies.jobs.mediainfo

"""
Wrapper for ``mediainfo`` command
"""

import os

from .. import errors, utils
from . import base

import logging  # isort:skip
_log = logging.getLogger(__name__)


[docs] class MediainfoJob(base.JobBase): """ Get output from ``mediainfo`` command This job adds the following signals to :attr:`~.JobBase.signal`: ``generating_report`` Emitted before mediainfo report generation begins. Registered callbacks get the video file path ``mediainfo`` was called with. ``generated_report`` Emitted after mediainfo report generation has finished. Registered callbacks get the video file path ``mediainfo`` was called with and the report text. """ name = 'mediainfo' label = 'Mediainfo' _DEFAULT_FORMAT = '{MEDIAINFO}' # Caching is done on a per-file basis in the daemon process. cache_id = None @property def content_path(self): """Instantiation argument of the same name""" return self._content_path
[docs] def initialize( self, *, content_path, from_all_videos=False, exclude_files=(), format='{MEDIAINFO}', ): """ Set internal state :param content_path: Path to video file or directory that contains a video file :param bool from_all_videos: Whether to get ``mediainfo`` output from each video file or only from the first video :param exclude_files: Sequence of glob patterns (:class:`str`) and :class:`re.Pattern` objects (return value from :func:`re.compile`) that are matched against each relative path beneath `content_path` Glob patterns are matched case-insensitively. .. note:: Non-video files and stuff like `Sample.mkv` are always excluded (see :func:`.fs.find_main_videos`). :param format: String that contains the placeholder ``"{MEDIAINFO}"``, which is replaced by the actual mediainfo Any other placeholders are ignored. """ self._content_path = content_path self._from_all_videos = from_all_videos self._exclude_files = exclude_files self._format = format self._reports_by_file = {} self._mediainfo_process = None self.signal.add('generating_report') self.signal.add('generated_report') self.signal.register('generated_report', self._store_report_by_file) self.signal.register('finished', self._hide_job)
def _hide_job(self, job): # Do not display the mediainfo report as output. It can be very long. self.hidden = True
[docs] async def run(self): self._mediainfo_process = utils.daemon.DaemonProcess( target=_mediainfo_process, kwargs={ 'from_all_videos': self._from_all_videos, 'exclude_files': self._exclude_files, 'cache_directory': self.cache_directory, 'ignore_cache': self.ignore_cache, }, info_callback=self._handle_info, error_callback=self._handle_error, ) self._mediainfo_process.start() if utils.disc.is_disc(self._content_path, multidisc=True): await self._process_playlists() else: await self._process_file_or_directory() await self._mediainfo_process.join() self._add_mediainfo_reports()
[docs] def terminate(self, reason=None): if self._mediainfo_process: self._mediainfo_process.stop() super().terminate(reason=reason)
async def _process_playlists(self): # Receive selected playlists from PlaylistsJob and pass them on to _mediainfo_process(). async for _discpath, playlists in self.receive_all('playlists', 'playlists_selected', only_posargs=True): _log.debug('Sending playlists to _mediainfo_process(): %r', playlists) for playlist in playlists: self._mediainfo_process.send(utils.daemon.MsgType.info, {'source': playlist}) _log.debug('All playlists received') # Tell _mediainfo_process() that there won't be any more playlists. self._mediainfo_process.send(utils.daemon.MsgType.info, {'source': _ALL_SOURCES_SELECTED}) async def _process_file_or_directory(self): self._mediainfo_process.send(utils.daemon.MsgType.info, {'source': self._content_path}) self._mediainfo_process.send(utils.daemon.MsgType.info, {'source': _ALL_SOURCES_SELECTED}) @base.unless_job_is_finished def _handle_info(self, info): if 'generating_report' in info: self.signal.emit('generating_report', info['generating_report']) elif 'generated_report' in info: self.signal.emit('generated_report', info['video_filepath'], info['generated_report']) else: raise RuntimeError(f'Unexpected info: {info!r}') @base.unless_job_is_finished def _handle_error(self, error): if isinstance(error, errors.ContentError): self.error(error) else: self.exception(error) def _add_mediainfo_reports(self): for report in self.reports_by_file.values(): # We are not using self._format.format(MEDIAINFO=report) because that turns "{" and "}" # into special characters that must be escaped, which the user should not expect. self.add_output(self._format.replace('{MEDIAINFO}', report)) def _store_report_by_file(self, video_filepath, mediainfo): _log.debug(f'Storing mediainfo report for {video_filepath}') self._reports_by_file[video_filepath] = mediainfo @property def reports_by_file(self): """ Map video file paths to ``mediainfo`` outputs gathered so far .. note:: For VIDEO_TS releases, one mediainfo is made for an ``.IFO`` file and a second mediainfo is made for a ``.VOB`` file. """ return self._reports_by_file.copy()
# NOTE: This cannot be the typical `object()` constant because its identity differs between the main # process and _mediainfo_process() and `... is _ALL_SOURCES_SELECTED` doesn't work as # expected. _ALL_SOURCES_SELECTED = '_ALL_SOURCES_SELECTED' def _mediainfo_process(output_queue, input_queue, *, from_all_videos, exclude_files, cache_directory, ignore_cache): # This process reads user-selected video sources from the main process and runs `mediainfo` on # them. while True: source = utils.daemon.read_input_queue_key(input_queue, 'source') utils.daemon.maybe_terminate(input_queue) if source == _ALL_SOURCES_SELECTED: # No more values will be sent on `input_queue`. break else: _send_mediainfo_reports_for_source( output_queue=output_queue, input_queue=input_queue, source=source, from_all_videos=from_all_videos, exclude_files=exclude_files, cache_directory=cache_directory, ignore_cache=ignore_cache, ) def _send_mediainfo_reports_for_source( output_queue, input_queue, *, source, from_all_videos, exclude_files, cache_directory, ignore_cache, ): for video_filepath in _get_filepaths_for_source(source, from_all_videos, exclude_files): utils.daemon.maybe_terminate(input_queue) output_queue.put((utils.daemon.MsgType.info, {'generating_report': video_filepath})) try: mediainfo_report = _get_mediainfo_report(video_filepath, cache_directory, ignore_cache) except errors.ContentError as e: output_queue.put((utils.daemon.MsgType.error, e)) else: output_queue.put((utils.daemon.MsgType.info, { 'generated_report': mediainfo_report, 'video_filepath': video_filepath, })) def _get_mediainfo_report(video_filepath, cache_directory, ignore_cache): cache_filepath = _get_cache_filepath(video_filepath, cache_directory) if not ignore_cache: # Try to read mediainfo from cache file or fail silently. try: with open(cache_filepath, 'r') as f: return f.read() except OSError as e: msg = e.strerror or str(e) _log.debug(f'Failed to read cache file {cache_filepath}: {msg}') # Create mediainfo report. mediainfo_report = utils.mediainfo.get_mediainfo_report(video_filepath) # Write mediainfo report to cache or fail silently. try: with open(cache_filepath, 'w') as f: f.write(mediainfo_report) except OSError as e: msg = e.strerror or str(e) _log.debug(f'Failed to write cache file {cache_filepath}: {msg}') finally: return mediainfo_report def _get_cache_filepath(video_filepath, cache_directory): return os.path.join( cache_directory, f'{utils.fs.basename(video_filepath)}.{utils.fs.file_size(video_filepath) or 0}.mediainfo', ) def _get_filepaths_for_source(source, from_all_videos, exclude_files): if isinstance(source, utils.disc.Playlist): if source.type == 'dvd': return ( source.filepath, # IFO source.largest_item, # VOB ) else: return (source.largest_item,) elif os.path.isdir(source): video_files = utils.fs.find_main_videos(source, exclude=exclude_files) if from_all_videos: return video_files elif video_files: return (video_files[0],) else: return () else: # Source is file. return (source,)