Source code for upsies.utils.image

"""
Dump frames from video file
"""

import os

from .. import errors, utils

import logging  # isort:skip
_log = logging.getLogger(__name__)


def _ffmpeg_executable():
    if utils.os_family() == 'windows':
        return 'ffmpeg.exe'
    else:
        return 'ffmpeg'


def _is_tonemappable(video_file):
    return any(
        hdr_format.startswith('HDR')
        for hdr_format in utils.mediainfo.video.get_hdr_formats(video_file)
    )


def _make_screenshot_cmd(video_file, timestamp, screenshot_file, tonemap):
    # ffmpeg's "image2" image file muxer uses "%" for string formatting, so we
    # must escape "%" in `video_file`
    screenshot_file = screenshot_file.replace('%', '%%')

    # -vf argument from:
    # https://rendezvois.github.io/video/screenshots/programs-choices/#ffmpeg

    vf = {}

    # Some magic to prevent grey screenshots for some BDMV releases. (Maybe related to VC-1?)
    if (
            utils.disc.is_bluray(video_file)
            or utils.fs.file_extension(video_file).lower() == 'm2ts'
    ):
        vf['fps'] = ('1/60',)

    # Always fix aspect ratio in case video is anamorphic. Doesn't matter if it isn't.
    vf['scale'] = ["'max(sar,1)*iw'", "'max(1/sar,1)*ih'"]

    if utils.mediainfo.video.is_bt2020(video_file):
        vf['scale'].extend((
            'in_h_chr_pos=0',
            'in_v_chr_pos=0',
            'in_color_matrix=bt2020',
        ))

    elif utils.mediainfo.video.is_bt709(video_file):
        vf['scale'].extend((
            'in_h_chr_pos=0',
            'in_v_chr_pos=128',
            'in_color_matrix=bt709',
        ))

    elif utils.mediainfo.video.is_bt601(video_file):
        vf['scale'].extend((
            'in_h_chr_pos=0',
            'in_v_chr_pos=128',
            'in_color_matrix=bt601',
        ))

    vf['scale'].append('flags=' + '+'.join((
        'full_chroma_int',
        'full_chroma_inp',
        'accurate_rnd',
        'spline',
    )))

    vf = ','.join(
        f'{filtername}=' + ':'.join(filtervalue)
        for filtername, filtervalue in vf.items()
    )

    if tonemap and _is_tonemappable(video_file):
        # https://ffmpeg.org/ffmpeg-filters.html#tonemap-1
        vf += ',zscale=t=linear,tonemap=hable,zscale=t=bt709,format=rgb24'

    return (
        _ffmpeg_executable(),
        '-y',
        '-hide_banner',
        '-loglevel', 'error',
        '-ss', str(timestamp),
        '-i', f'file:{video_file}',
        '-vf', vf,
        '-pix_fmt', 'rgb24',
        '-vframes', '1',
        f'file:{screenshot_file}',
    )


[docs] def screenshot(*, video_file, timestamp, screenshot_file, tonemap): """ Create single screenshot from video file :param str video_file: Path to video file :param timestamp: Time location in the video :type timestamp: :class:`~.Timestamp` (or any :class:`int` or :class:`float`) :param str screenshot_file: Path to screenshot file .. note:: It is important to use the returned file path because it is passed through :func:`~.fs.sanitize_path` to make sure it can exist. :raise ScreenshotError: if something goes wrong :return: Path to screenshot file """ # See if `video_file` is readable before we do further checks and launch ffmpeg, which will # produce a long and unexpected error message. try: utils.fs.assert_file_readable(video_file) except errors.ContentError as e: raise errors.ScreenshotError(e) from e # Validate timestamp. if not isinstance(timestamp, utils.types.Timestamp): try: timestamp = utils.types.Timestamp(timestamp) except (TypeError, ValueError) as e: raise errors.ScreenshotError(f'Invalid timestamp: {timestamp!r}') from e # Make `screenshot_file` compatible to the file system. screenshot_file = utils.fs.sanitize_path(screenshot_file) # Add "tonemapped" flag in file name. if tonemap and _is_tonemappable(video_file): screenshot_file = ( utils.fs.strip_extension(screenshot_file) + '.tonemapped.' + utils.fs.file_extension(screenshot_file) ) # Ensure timestamp is within range. duration = utils.types.Timestamp(utils.mediainfo.get_duration(video_file)) if timestamp > duration: raise errors.ScreenshotError(f'Timestamp is too close to or after end of video ({duration}): {timestamp}') # Make screenshot. cmd = _make_screenshot_cmd(video_file, timestamp, screenshot_file, tonemap) output = utils.subproc.run(cmd, ignore_errors=True, join_stderr=True) if not os.path.exists(screenshot_file): import shlex raise errors.ScreenshotError( f'{video_file}: Failed to create screenshot at {timestamp}: {output}\n' + ' '.join(shlex.quote(arg) for arg in cmd) ) else: return screenshot_file
def _make_resize_cmd(image_file, dimensions, resized_file): # ffmpeg's "image2" image file muxer uses "%" for string formatting resized_file = resized_file.replace('%', '%%') return ( _ffmpeg_executable(), '-y', '-hide_banner', '-loglevel', 'error', '-i', f'file:{image_file}', '-vf', f'scale={dimensions}:force_original_aspect_ratio=decrease', f'file:{resized_file}', )
[docs] def resize(image_file, *, width=0, height=0, target_directory=None, target_filename=None, overwrite=False): """ Resize image, preserve aspect ratio :param image_file: Path to source image :param width: Desired image width in pixels or `0` :param height: Desired image height in pixels or `0` :param target_directory: Where to put the resized image or `None` to use the parent directory of `image_file` :param target_filename: File name of resized image or `None` to generate a name from `image_file`, `width` and `height` :param bool overwrite: Whether to overwrite the resized image file if it already exists If `width` and `height` are falsy (the default) return `image_file` if `target_directory` and `target_filename` are falsy or copy `image_file` to the target path. .. note:: It is important to use the returned file path because it is passed through :func:`~.fs.sanitize_path` to make sure it can exist. :raise ImageResizeError: if resizing fails :return: Path to resized or copied image """ try: utils.fs.assert_file_readable(image_file) except errors.ContentError as e: raise errors.ImageResizeError(e) from e if width and width < 1: raise errors.ImageResizeError(f'Width must be greater than zero: {width}') elif height and height < 1: raise errors.ImageResizeError(f'Height must be greater than zero: {height}') dimensions_map = {'width': int(width), 'height': int(height)} ext_args = {'minlen': 3, 'maxlen': 4} def get_target_filename(): if target_filename: filename = utils.fs.strip_extension(target_filename, **ext_args) extension = utils.fs.file_extension(target_filename, **ext_args) if not extension: extension = utils.fs.file_extension(image_file, **ext_args) else: filename = utils.fs.basename(utils.fs.strip_extension(image_file, **ext_args)) dimensions = ','.join(f'{k}={v}' for k, v in dimensions_map.items() if v) if dimensions: filename += f'.{dimensions}' extension = utils.fs.file_extension(image_file, **ext_args) if extension: filename += f'.{extension}' else: filename += '.jpg' return filename def get_target_directory(): if target_directory: return str(target_directory) else: return utils.fs.dirname(image_file) # Assemble full target filepath and make sure it can exist target_filepath = utils.fs.sanitize_path( os.path.join(get_target_directory(), get_target_filename()), ) if not overwrite and os.path.exists(target_filepath): _log.debug('Already resized: %r', target_filepath) return target_filepath if not width and not height: # Nothing to resize if target_filepath != str(image_file): # Copy image_file to target_filepath try: utils.fs.mkdir(utils.fs.dirname(target_filepath)) except errors.ContentError as e: raise errors.ImageResizeError(e) from e import shutil try: return str(shutil.copy2(image_file, target_filepath)) except OSError as e: msg = e.strerror or str(e) raise errors.ImageResizeError( f'Failed to copy {image_file} to {target_filepath}: {msg}' ) from e else: # Nothing to copy return str(image_file) ffmpeg_params = ':'.join( f'{k[0]}={v or -1}' for k, v in dimensions_map.items() ) cmd = _make_resize_cmd(image_file, ffmpeg_params, target_filepath) output = utils.subproc.run(cmd, ignore_errors=True, join_stderr=True) if not os.path.exists(target_filepath): error = output or 'Unknown reason' raise errors.ImageResizeError(f'Failed to resize: {error}') else: return str(target_filepath)
# NOTE: Most of the optimization is achieved at level 1 with ~40 % smaller # files. Anything higher seems to only reduce by tens of kB or less. _optimization_levels = { 'low': '1', 'medium': '2', 'high': '4', 'placebo': 'max', } optimization_levels = (*tuple(_optimization_levels), 'none', 'default') """Valid `level` arguments for :func:`optimize`"""
[docs] def optimize(image_file, output_file=None, level=None): """ Optimize PNG image size :path image_file: Path to PNG File :path output_file: Path to optimized `image_file` or any falsy value to overwrite `image_file` :path str,int level: Optimiziation level (``"low"``, ``"medium"``, ``"high"``) or ``"default"`` to use recommended level or ``"none"`` / `None` to not do any optimization If the optimization fails and `image_file` does not end with ".png", it is assumed that it is not a PNG and the original file is returned unoptimized. :return: path to optimized PNG file :raise ImageOptimizeError: if the optimization fails """ if level not in ('none', None): if level == 'default': level = 'medium' try: opt = _optimization_levels[str(level)] except KeyError as e: raise errors.ImageOptimizeError(f'Invalid optimization level: {level}') from e cmd = [ 'oxipng', '--preserve', '--opt', opt, '--interlace', '0', # Remove any interlacing '--strip', 'safe', # Remove irrelevant metadata str(image_file), ] if output_file: sanitized_output_file = utils.fs.sanitize_path(output_file) cmd.extend(('--out', sanitized_output_file)) return_value = sanitized_output_file else: return_value = image_file # oxipng prints errors AND info messages to stderr, so we can't use output on stderr as an # indicator of failure. Instead, we check if the output file exists error_message, exitcode = utils.subproc.run(cmd, join_stderr=True, return_exitcode=True) error_message = error_message.strip() if exitcode == 0: return return_value elif utils.fs.file_extension(image_file).lower() == 'png': if error_message: raise errors.ImageOptimizeError(f'Failed to optimize: {error_message}') else: raise errors.ImageOptimizeError('Failed to optimize for unknown reason') else: # This is probably not a PNG and we can just fail silently and return the # unoptimized original. return image_file
[docs] def get_mime_type(image_file): """Return MIME type of `image_file` or `None` if it cannot be determined""" tracks = utils.mediainfo.get_tracks(image_file, default={}) try: return tracks['General'][0]['InternetMediaType'] except (KeyError, IndexError): return None
[docs] def convert(image_file, *, mime_type, output_file): """ Convert image :param str image_file: Path to image :param str mime_type: Target MIME type to convert `image_file` to (e.g. "image/png") :param str output_file: Where to write the converted image to :raise ImageConvertError: if the conversion failed :return: Sanitized `output_file` (via :meth:`~.utils.fs.sanitize_path`) """ sanitized_output_file = utils.fs.sanitize_path(output_file) cmd = ('ffmpeg', '-y', '-i', image_file, sanitized_output_file) error_message, exit_code = utils.subproc.run(cmd, join_stderr=True, return_exitcode=True) if exit_code != 0: raise errors.ImageConvertError(f'Failed to convert to {mime_type}: {image_file}: {error_message}') elif not os.path.exists(sanitized_output_file): raise errors.ImageConvertError(f'Failed to convert to {mime_type}: {image_file}') else: return sanitized_output_file