"""
Wrapper for ``bdinfo`` command
"""
import functools
import os
import re
import tempfile
from .. import errors, utils
from . import base
import logging # isort:skip
_log = logging.getLogger(__name__)
[docs]
class BdinfoJob(base.JobBase):
"""
Provide (partial) BDInfo report
This job adds the following signals to :attr:`~.JobBase.signal`:
``progress``
Emitted in short intervals after :meth:`playlists_selected` was called at least once
until all BDInfo reports are available. Registered callbacks get a
:class:`BdinfoProgress` instance.
``bdinfo_report``
Emitted when a BDInfo report is available. Registered callbacks get a
:class:`~.BdinfoReport` instance.
"""
name = 'bdinfo'
label = 'BDInfo'
# Caching is done by the daemon process.
cache_id = None
_DEFAULT_FORMAT = '{BDINFO}'
[docs]
def initialize(self, *, summary=None, format=_DEFAULT_FORMAT):
"""
Set internal state
:param summary: Only provide a shorter summary as output
One of ``full``, ``quick`` or `None`.
Note that this only affects the :attr:`output`. :attr:`bdinfo_reports`,
:attr:`full_summaries` and :attr:`quick_summaries` are always available.
:param format: String that contains the placeholder ``"{BDINFO}"``, which is replaced by the
actual BDInfo report
"""
self._summary = summary
self._format = str(format)
self._bdinfo_reports = []
self._reports_by_file = {}
self._bdinfo_process = None
self.signal.add('bdinfo_progress')
self.signal.add('bdinfo_report')
self.signal.register('bdinfo_report', self._store_bdinfo_report)
self.signal.register('finished', self._hide_job)
def _hide_job(self, job):
# Do not display the BDInfo report as output. It can be very long.
self.hidden = True
[docs]
async def run(self):
self._bdinfo_process = utils.daemon.DaemonProcess(
target=_bdinfo_process,
kwargs={
'cache_directory': self.cache_directory,
'ignore_cache': self.ignore_cache,
},
info_callback=self._handle_info,
error_callback=self._handle_error,
)
self._bdinfo_process.start()
# Receive selected playlists from PlaylistsJob and pass them on to _bdinfo_process().
async for discpath, playlists in self.receive_all('playlists', 'playlists_selected', only_posargs=True):
_log.debug('Sending playlists to _bdinfo_process(): %r', playlists)
self._bdinfo_process.send(utils.daemon.MsgType.info, {'playlists_selected': (discpath, playlists)})
_log.debug('All playlists received')
# Tell _bdinfo_process() that there won't be any more playlists.
self._bdinfo_process.send(utils.daemon.MsgType.info, {'playlists_selected': _ALL_PLAYLIST_SELECTIONS_MADE})
await self._bdinfo_process.join()
self._add_bdinfo_reports()
def _add_bdinfo_reports(self):
if not self._summary:
reports = self.bdinfo_reports
elif self._summary == 'full':
reports = self.full_summaries
elif self._summary == 'quick':
reports = self.quick_summaries
else:
raise RuntimeError(f'Unexpected summary value: {self._summary!r}')
for report in reports:
self.add_output(self._format.format(BDINFO=report))
[docs]
def terminate(self, reason=None):
if self._bdinfo_process:
self._bdinfo_process.stop()
super().terminate(reason=reason)
@base.unless_job_is_finished
def _handle_info(self, info):
if 'bdinfo_progress' in info:
self.signal.emit('bdinfo_progress', info['bdinfo_progress'])
elif 'bdinfo_report' in info:
self.signal.emit('bdinfo_report', info['bdinfo_report'])
else:
raise RuntimeError(f'Unexpected info: {info!r}')
@base.unless_job_is_finished
def _handle_error(self, error):
if isinstance(error, errors.DependencyError):
self.error(error)
else:
self.exception(error)
def _store_bdinfo_report(self, bdinfo_report):
self._bdinfo_reports.append(bdinfo_report)
self._reports_by_file[bdinfo_report.playlist.filepath] = bdinfo_report
@property
def bdinfo_reports(self):
"""Sequence of :class:`~.BdinfoReport` instances"""
return tuple(self._bdinfo_reports)
@property
def reports_by_file(self):
"""Map playlist filepaths (.mpls) to BDInfo reports gathered so far"""
return self._reports_by_file.copy()
@property
def full_summaries(self):
"""Sequence of :attr:`~.BdinfoReport.full_summary` values from :attr:`bdinfo_reports`"""
return tuple(report.full_summary for report in self.bdinfo_reports)
@property
def quick_summaries(self):
"""Sequence of :attr:`~.BdinfoReport.quick_summary` values from :attr:`bdinfo_reports`"""
return tuple(report.quick_summary for report in self.bdinfo_reports)
[docs]
class BdinfoProgress(dict):
"""
Simple :class:`dict` subclass with the following keys:
- ``playlist`` (:class:`~.disc.Playlist`)
- ``percent`` (:class:`int`)
- ``time_elapsed`` (:class:`~.types.Timestamp`)
- ``time_remaining`` (:class:`~.types.Timestamp`)
Keys are also conveniently available as attributes.
"""
def __init__(self, *, playlist, percent=0, time_elapsed=0, time_remaining=0):
self['playlist'] = playlist
self['percent'] = int(percent)
self['time_elapsed'] = utils.types.Timestamp(time_elapsed)
self['time_remaining'] = utils.types.Timestamp(time_remaining)
def __getattr__(self, name):
return self[name]
[docs]
class BdinfoReport(str):
"""
BDInfo report as special string with the additional properties :attr:`full_summary`,
:attr:`quick_summary` and :attr:`playlist`
"""
def __new__(cls, report, playlist):
self = super().__new__(cls, report)
self._playlist = playlist
return self
def __getnewargs__(self):
# Make instances picklable by providing positional arguments.
return (str(self), self.playlist)
@functools.cached_property
def full_summary(self):
"""Full summary of the BDInfo report"""
regex = re.compile(r'^.*(DISC\s+INFO:$.*?)^FILES:$.*', flags=re.DOTALL | re.MULTILINE)
match = regex.search(self)
if match:
return match.group(1).strip()
@functools.cached_property
def quick_summary(self):
"""Quick summary of the BDInfo report"""
regex = re.compile(r'^.*^QUICK\s+SUMMARY:(.*)$', flags=re.DOTALL | re.MULTILINE)
match = regex.search(self)
if match:
return match.group(1).strip()
@functools.cached_property
def playlist(self):
"""Source of the BDInfo report as a :class:`~.Playlist` instance"""
return self._playlist
def __repr__(self):
return f'<{type(self).__name__} {self.playlist!r}>'
# NOTE: This cannot be the typical `object()` constant because its identity differs between the main
# process and _bdinfo_process() and `... is _ALL_PLAYLIST_SELECTIONS_MADE` doesn't work as expected.
_ALL_PLAYLIST_SELECTIONS_MADE = '_ALL_PLAYLIST_SELECTIONS_MADE'
def _bdinfo_process(output_queue, input_queue, *, cache_directory, ignore_cache):
# This process reads user-selected playlists from the main process and runs `bdinfo` on them.
while True:
playlists_selected = utils.daemon.read_input_queue_key(input_queue, 'playlists_selected')
if playlists_selected == _ALL_PLAYLIST_SELECTIONS_MADE:
# No more values will be sent on `input_queue`.
break
else:
_discpath, playlists = playlists_selected
for playlist in playlists:
bdinfo = _get_bdinfo(
output_queue, input_queue,
playlist=playlist,
cache_directory=cache_directory,
ignore_cache=ignore_cache,
)
bdinfo_report = BdinfoReport(bdinfo, playlist)
output_queue.put((utils.daemon.MsgType.info, {'bdinfo_report': bdinfo_report}))
utils.daemon.maybe_terminate(input_queue)
def _get_bdinfo(output_queue, input_queue, *, playlist, cache_directory, ignore_cache):
# Make sure this is a BDMV path or we get weird exceptions.
if not utils.disc.is_bluray(playlist.discpath):
output_queue.put((utils.daemon.MsgType.error, errors.ContentError(f'Not a Blu-ray disc path: {playlist.discpath}')))
else:
cache_filepath = _get_cache_filepath(playlist, cache_directory)
if (
not ignore_cache
and (bdinfo := _get_bdinfo_from_cache(cache_filepath))
):
return bdinfo
else:
bdinfo = _get_bdinfo_from_bdinfo(output_queue, input_queue, playlist)
_write_bdinfo_to_cache(bdinfo, cache_filepath)
return bdinfo
def _get_bdinfo_from_bdinfo(output_queue, input_queue, playlist):
# Write BDInfo report to temporary directory that will be deleted automatically.
with tempfile.TemporaryDirectory() as bdinfo_directory:
# Because BDInfo may be running inside a docker container with a different user ID, we
# can't restrict permissions or the report cannot be written. This should not be an
# issue because the BDInfo report isn't private information.
os.chmod(bdinfo_directory, 0o777)
for progress in _generate_bdinfo(
playlist=playlist,
bdinfo_directory=bdinfo_directory,
):
utils.daemon.maybe_terminate(input_queue)
output_queue.put((utils.daemon.MsgType.info, {'bdinfo_progress': progress}))
return _find_bdinfo_in_directory(bdinfo_directory)
def _get_cache_filepath(playlist, cache_directory):
return os.path.join(
cache_directory,
f'{playlist.label}.{int(playlist.size)}.bdinfo',
)
def _get_bdinfo_from_cache(cache_filepath):
try:
with open(cache_filepath, 'r') as f:
return f.read()
except OSError as e:
msg = e.strerror or str(e)
_log.debug(f'Failed to read cache file {cache_filepath}: {msg}')
def _write_bdinfo_to_cache(bdinfo, cache_filepath):
try:
with open(cache_filepath, 'w') as f:
return f.write(bdinfo)
except OSError as e:
msg = e.strerror or str(e)
_log.debug(f'Failed to write cache file {cache_filepath}: {msg}')
def _generate_bdinfo(*, playlist, bdinfo_directory):
# Initial zero progress report while `bdinfo` executable is starting up, which can take a few
# seconds. This allows the UI to switch to a "report is being generated" display instead of just
# doing nothing.
yield BdinfoProgress(playlist=playlist)
# Execute bdinfo with `communicate=True` so we can yield progress reports while it is running.
argv = (
'bdinfo',
'--mpls=' + playlist.filename,
playlist.discpath,
bdinfo_directory,
)
process = utils.subproc.run(argv=argv, communicate=True)
# Monitor bdinfo scanning progress.
regex = re.compile(r'\D*?\b(?P<percent>\d+)(?:\.\d+|)\s?%.*?(?P<elapsed>\d+:\d+:\d+).*?(?P<remaining>\d+:\d+:\d+)')
time_elapsed = utils.types.Timestamp(0)
try:
for line in process.stdout:
match = regex.search(line)
if match:
time_elapsed = match.group('elapsed')
yield BdinfoProgress(
playlist=playlist,
percent=match.group('percent'),
time_elapsed=time_elapsed,
time_remaining=match.group('remaining'),
)
stderr = '\n'.join(process.stderr)
if stderr:
raise RuntimeError(f'Command failed: {argv}:\n{stderr}')
# Final progress report so the UI can reliably tell if a report is fully generated. This may
# be redundant, but we can't rely on upstream bdinfo executable to always print "100%".
yield BdinfoProgress(
playlist=playlist,
percent=100,
time_elapsed=time_elapsed,
time_remaining=0,
)
finally:
# Always make sure the bdinfo process is terminated. Otherwise, it may continue to run even
# after our main Python process terminates.
process.terminate()
def _find_bdinfo_in_directory(bdinfo_directory):
# Find report file. It should be the only file in `bdinfo_directory`.
files = os.listdir(bdinfo_directory)
if len(files) < 1:
raise RuntimeError(f'No BDInfo report found in temporary BDInfo report directory: {bdinfo_directory}')
elif len(files) > 1:
raise RuntimeError(f'Unexpected files in temporary BDInfo report directory: {bdinfo_directory}: {sorted(files)}')
else:
bdinfo_file_path = os.path.join(bdinfo_directory, files[0])
with open(bdinfo_file_path, 'r') as f:
return f.read()