Source code for upsies.utils.webdbs.imdb

"""
API for imdb.com
"""

import functools
import json
import re

from ... import utils
from ..types import ReleaseType
from . import common
from .base import WebDbApiBase

import logging  # isort:skip
_log = logging.getLogger(__name__)


GRAPHQL_QUERY_SEARCH = """
{
    advancedTitleSearch(
        first: 30,
        sort: { sortBy: POPULARITY, sortOrder: ASC }
        constraints: {%%%CONSTRAINTS%%%}
    ) {
        total
        edges {
            node {
                title {
                    id
                    titleText {
                        text
                    }
                    originalTitleText {
                        text
                    }
                    titleType {
                        id
                    }
                    releaseYear {
                        year
                    }
                    plot {
                        plotText {
                            plainText
                        }
                    }
                    countriesOfOrigin {
                        countries {
                            text
                        }
                    }
                }
            }
        }
    }
}
"""

GRAPHQL_QUERY_ID = """
query GetTitleInfo {
    title(id: "%%%ID%%%") {
        id
        titleText {
            text
            isOriginalTitle
            country {
                text
            }
        }
        originalTitleText {
            text
        }
        releaseYear {
            year
            endYear
        }
        titleType {
            id
        }
        plot {
            plotText {
                plainText
            }
        }
        ratingsSummary {
            aggregateRating
            voteCount
        }
        primaryImage {
            url
        }
        runtime {
            displayableProperty {
                value {
                    plainText
                }
            }
            seconds
        }
        titleGenres {
            genres {
                genre {
                    text
                }
            }
        }
        principalCredits {
            category {
                text
                id
            }
            credits {
                name {
                    id
                    nameText {
                        text
                    }
                }
            }
        }
        runtimes(first: 10) {
            edges {
                node {
                    id
                    seconds
                    displayableProperty {
                        value {
                            plainText
                        }
                    }
                    attributes {
                        text
                    }
                }
            }
        }
        countriesOfOrigin {
            countries {
                text
            }
        }
        spokenLanguages {
            spokenLanguages {
                text
            }
        }
    }
}
"""

TYPE_MAP = {
    ReleaseType.movie: ('movie', 'short', 'tvMovie', 'video', 'tvShort'),
    ReleaseType.season: ('tvSeries', 'tvMiniSeries'),
    # Searching for single episodes is currently not supported
    ReleaseType.episode: ('tvSeries', 'tvMiniSeries'),
}

TYPE_MAP_REVERSE = {
    'movie': ReleaseType.movie,
    'short': ReleaseType.movie,
    'tvMovie': ReleaseType.movie,
    'video': ReleaseType.movie,
    'tvShort': ReleaseType.movie,
    'tvSeries': ReleaseType.season,
    'tvMiniSeries': ReleaseType.season,
    'tvEpisode': ReleaseType.episode,
}

WEBSITE_BASE = 'https://www.imdb.com'


[docs] class ImdbApi(WebDbApiBase): """API for imdb.com""" name = 'imdb' label = 'IMDb' default_config = {}
[docs] def get_id_from_text(self, text): # Example: https://www.imdb.com/title/tt0048918/ match = re.search(r'\b(tt\d+)\b', text) if match: return match.group(1)
[docs] def sanitize_query(self, query): """ Deal with IMDb-specific quirks - Remove ``"and"`` from :attr:`.Query.title` because IMDb doesn't find ``"Foo & Bar"`` if we search for ``"Foo and Bar"``. It seems to work vice versa, i.e. the query ``"Foo and Bar"`` finds ``"Foo & Bar"``, so we keep any ``"&"``. - Replace ``"dont"`` with ``"don't"``, ``"cant"`` with ``"can't"``, etc. """ query = super().sanitize_query(query) query.title = re.sub(r'\s(?i:and)(\s)', r'\1', query.title) query.title = re.sub(r'\b(?i:dont)(\b)', "don't", query.title) query.title = re.sub(r'\b(?i:cant)(\b)', "can't", query.title) query.title = re.sub(r'\b(?i:wont)(\b)', "won't", query.title) return query
_url_base = 'https://caching.graphql.imdb.com' def _get_graphql_query(self, template_name, values): template = globals()[f'GRAPHQL_QUERY_{template_name.upper()}'] query = ' '.join(template.replace('\n', ' ').split()) for k, v in values.items(): query = query.replace(f'%%%{k}%%%', v) if '%%%' in query: raise RuntimeError(f'Unresolved template string in query: {query}') else: return json.dumps({'query': query}) async def _get_info(self, id): query = self._get_graphql_query('ID', {'ID': id}) response = (await utils.http.post( url=self._url_base, data=query, headers={'Content-Type': 'application/json'}, timeout=10, cache=True, user_agent='BROWSER', )).json() return response['data']['title'] or {}
[docs] async def search(self, query): _log.debug('Searching IMDb for %r', query) if query.id: title_english = await self.title_english(query.id) title_original = await self.title_original(query.id) return [_ImdbSearchResult( imdb_api=self, cast=functools.partial(self.cast, query.id), countries=functools.partial(self.countries, query.id), directors=functools.partial(self.directors, query.id), genres=functools.partial(self.genres, query.id), id=query.id, summary=functools.partial(self.summary, query.id), title=title_english or title_original, title_english=title_english, title_original=title_original, type=await self.type(query.id), url=await self.url(query.id), year=await self.year(query.id), )] elif not query.title: return [] else: constraints = [f'titleTextConstraint: {{searchTerm: "{query.title}"}}'] if query.type is not ReleaseType.unknown: types = TYPE_MAP[query.type] types_str = '[' + ', '.join(f'"{t}"' for t in types) + ']' constraints.append(f'titleTypeConstraint: {{anyTitleTypeIds: {types_str}}}') if query.year is not None: constraints.append( f'releaseDateConstraint: {{releaseDateRange: {{start: "{query.year}-01-01", end: "{query.year}-12-31"}}}}' ) constraints.append('explicitContentConstraint: {explicitContentFilter: INCLUDE_ADULT}') query = self._get_graphql_query('SEARCH', {'CONSTRAINTS': ', '.join(constraints)}) response = (await utils.http.post( url=self._url_base, data=query, headers={'Content-Type': 'application/json'}, timeout=10, cache=True, user_agent='BROWSER', )).json() return [ _ImdbSearchResult(info=result['node']['title'], imdb_api=self) for result in response['data']['advancedTitleSearch']['edges'] ]
[docs] async def cast(self, id): if id: return await self._get_persons(id, category='cast') return ()
[docs] async def creators(self, id): if id: return await self._get_persons(id, category='creator') return ()
[docs] async def directors(self, id): if id: return await self._get_persons(id, category='director') return ()
async def _get_persons(self, id, *, category): info = await self._get_info(id) principal_credits = info.get('principalCredits', ()) credits = () for credits_ in principal_credits: category_ = credits_.get('category') or {} if category_.get('id') == category: credits = credits_.get('credits', ()) break def get_person(credit): name = ((credit.get('name') or {}).get('nameText') or {}).get('text', '') id = (credit.get('name') or {}).get('id', '') if name and id: return common.Person(name, url=f'{WEBSITE_BASE}/name/{id}') return tuple( person for credit in credits if (person := get_person(credit)) ) async def _countries(self, id): countries = [] if id: info = await self._get_info(id) items = (info.get('countriesOfOrigin') or {}).get('countries', ()) countries.extend( country for item in items if (country := item.get('text')) ) return tuple(countries)
[docs] async def languages(self, id): languages = [] if id: info = await self._get_info(id) items = (info.get('spokenLanguages') or {}).get('spokenLanguages', ()) languages.extend( language for item in items if (language := item.get('text')) ) return tuple(languages)
[docs] async def genres(self, id): genres = [] if id: info = await self._get_info(id) items = (info.get('titleGenres') or {}).get('genres', ()) genres.extend( genre.casefold() for item in items if (genre := ((item.get('genre') or {}).get('text') or None)) ) return tuple(genres)
[docs] async def poster_url(self, id, season=None): if id: info = await self._get_info(id) poster_url = (info.get('primaryImage') or {}).get('url', '') # Request scaled down poster (300 pixels wide) poster_url = re.sub(r'._V1_*.jpg$', '._V1_SX300.jpg', poster_url) return poster_url return ''
rating_min = 0.0 rating_max = 10.0
[docs] async def rating(self, id): if id: info = await self._get_info(id) return (info.get('ratingsSummary') or {}).get('aggregateRating', None) return None
_ignored_runtimes_keys = ( re.compile(r'^(?i:approx)\w*$'), ) async def _runtimes(self, id): if id: info = await self._get_info(id) runtimes = tuple( node for edge in (info.get('runtimes') or {}).get('edges', ()) if (node := edge.get('node')) ) def get_cut_name(runtime): attributes = runtime.get('attributes', ()) name = 'default' if attributes: name = attributes[0].get('text') or 'default' if name != 'default': # Capitalize words. We can't use "\b" because that results in "Director'S Cut". name = re.sub(r'(?:^|\s)[a-z]', lambda match: match.group(0).upper(), name) return name def get_runtime_minutes(runtime): return round(runtime.get('seconds', 0) / 60) return { get_cut_name(runtime): get_runtime_minutes(runtime) for runtime in runtimes } return {}
[docs] async def summary(self, id): if id: info = await self._get_info(id) return ((info.get('plot') or {}).get('plotText') or {}).get('plainText', '') return ''
async def _title_original(self, id): if id: info = await self._get_info(id) return (info.get('originalTitleText') or {}).get('text', '') return '' async def _titles_english(self, id): if id: info = await self._get_info(id) return ((info.get('titleText') or {}).get('text', ''),) return ()
[docs] async def type(self, id): if id: info = await self._get_info(id) name = (info.get('titleType') or {}).get('id', '') return TYPE_MAP_REVERSE.get(name, ReleaseType.unknown) return ReleaseType.unknown
[docs] async def url(self, id): if id: return f'{WEBSITE_BASE}/title/{id}' return ''
[docs] async def year(self, id): if id: info = await self._get_info(id) return str((info.get('releaseYear') or {}).get('year', '')) return ''
class _ImdbSearchResult(common.SearchResult): def __init__(self, *, imdb_api, info=None, cast=None, countries=None, directors=None, genres=None, id=None, poster=None, summary=None, title=None, title_english=None, title_original=None, type=None, url=None, year=None): info = info or {} self._imdb_api = imdb_api id = id or self._get_id(info) super().__init__( cast=cast or functools.partial(imdb_api.cast, id), countries=countries or functools.partial(imdb_api.countries, id), directors=directors or functools.partial(imdb_api.directors, id), genres=genres or functools.partial(imdb_api.genres, id), id=id, poster=functools.partial(imdb_api.poster, id), summary=summary or self._get_summary(info), title=title or self._get_title(info), title_english=title_english or functools.partial(imdb_api.title_english, id), title_original=title_original or functools.partial(imdb_api.title_original, id), type=type or self._get_type(info), url=url or self._get_url(info), year=year or self._get_year(info), ) def _get_id(self, info): return info.get('id', '') def _get_summary(self, info): return ((info.get('plot') or {}).get('plotText') or {}).get('plainText', '') def _get_title(self, info): return (info.get('titleText') or {}).get('text', '') def _get_type(self, info): name = (info.get('titleType') or {}).get('id', '') return TYPE_MAP_REVERSE.get(name, ReleaseType.unknown) def _get_url(self, info): id = self._get_id(info) if id: return f'{WEBSITE_BASE}/title/{id}' return '' def _get_year(self, info): return (info.get('releaseYear') or {}).get('year', '')