"""
API for tvmaze.com
"""
import collections
import functools
import json
import re
from ... import errors, utils
from .. import html, http
from ..types import ReleaseType
from . import common
from .base import WebDbApiBase
import logging # isort:skip
_log = logging.getLogger(__name__)
[docs]
class TvmazeApi(WebDbApiBase):
"""API for tvmaze.com"""
name = 'tvmaze'
label = 'TVmaze'
default_config = {}
_url_base = 'http://api.tvmaze.com'
[docs]
def sanitize_query(self, query):
"""Set :attr:`~.common.Query.type` to :attr:`~.types.ReleaseType.unknown`"""
query = super().sanitize_query(query)
query.type = ReleaseType.unknown
return query
[docs]
def get_id_from_text(self, text):
match = re.search(r'^(\d+)$', text)
if match:
return match.group(1)
# Example URL: https://www.tvmaze.com/shows/36906/gary-and-his-demons
match = re.search(r'\b(?i:shows)/(\d+)\b', text)
if match:
return match.group(1)
[docs]
async def search(self, query):
_log.debug('Searching TVmaze for %s', query)
if query.id:
show = await self._get_show(query.id)
return [_TvmazeSearchResult(show=show, tvmaze_api=self)]
elif not query.title or query.type is ReleaseType.movie:
return []
else:
url = f'{self._url_base}/search/shows'
params = {'q': query.title_normalized}
results_str = await http.get(url, params=params, cache=True)
try:
items = json.loads(results_str)
assert isinstance(items, list)
except (ValueError, TypeError, AssertionError) as e:
raise errors.RequestError(f'Unexpected search response: {results_str}') from e
else:
results = tuple(
_TvmazeSearchResult(show=item['show'], tvmaze_api=self)
for item in items
)
# The API doesn't allow us to search for a specific year
if query.year:
return tuple(
result
for result in results
if str(result.year) == query.year
)
else:
return results
async def _get_json(self, url, params={}):
response = await http.get(url, params=params, cache=True)
try:
info = json.loads(response)
assert isinstance(info, (collections.abc.Mapping, collections.abc.Sequence))
except (ValueError, TypeError, AssertionError) as e:
raise errors.RequestError(f'Unexpected search response: {response}') from e
else:
return info
async def _get_show(self, id):
return await self._get_json(
url=f'{self._url_base}/shows/{id}',
params={'embed[]': ('cast', 'crew', 'akas')},
)
[docs]
async def cast(self, id):
if id:
show = await self._get_show(id)
cast = show.get('_embedded', {}).get('cast', ())
return tuple(
common.Person(
item['person']['name'],
url=item['person'].get('url', ''),
role=item['character']['name'],
)
for item in utils.deduplicate(cast, key=lambda item: item['person']['name'])
)
return ()
async def _countries(self, id):
if id:
show = await self._get_show(id)
return _get_countries(show)
return ()
[docs]
async def languages(self, id):
if id:
show = await self._get_show(id)
if language := show.get('language'):
return (language,)
return ()
[docs]
async def creators(self, id):
if id:
show = await self._get_show(id)
crew = show.get('_embedded', {}).get('crew', ())
return tuple(
common.Person(
item['person']['name'],
url=item['person'].get('url', ''),
)
for item in crew
if item.get('type') == 'Creator' and item.get('person', {}).get('name')
)
return ()
[docs]
async def directors(self, id):
return ()
[docs]
async def genres(self, id):
if id:
show = await self._get_show(id)
return _get_genres(show)
return ()
[docs]
async def poster_url(self, id, season=None):
if id:
info = {}
if season:
url = f'{self._url_base}/shows/{id}/seasons'
seasons = await self._get_json(url)
for s in seasons:
if str(s.get('number')) == str(season) and s.get('image'):
info = s
break
if not info:
info = await self._get_show(id)
return (info.get('image') or {}).get('original', None)
return ''
rating_min = 0.0
rating_max = 10.0
[docs]
async def rating(self, id):
if id:
show = await self._get_show(id)
return show.get('rating', {}).get('average')
return None
async def _runtimes(self, id):
runtimes = {}
if id:
show = await self._get_show(id)
runtime = show.get('runtime', 0)
if runtime:
runtimes['default'] = round(int(runtime))
return runtimes
[docs]
async def summary(self, id):
if id:
show = await self._get_show(id)
return _get_summary(show)
return ''
async def _title_original(self, id):
akas = await self._get_akas(id)
return akas['ORIGIN']
async def _titles_english(self, id):
akas = await self._get_akas(id)
return akas.values()
async def _get_akas(self, id):
show = await self._get_show(id)
akas = {}
for aka in show.get('_embedded', {}).get('akas', ()):
# `aka['country']` may also be `None`, which indicates the
# original country
country = aka.get('country')
if country is None:
akas['ORIGIN'] = aka['name']
else:
code = country.get('code', '')
if code in self._english_country_codes:
akas['code'] = aka['name']
if 'ORIGIN' not in akas:
akas['ORIGIN'] = show['name']
return akas
_english_country_codes = (
'US',
'UK',
'AU',
'NZ',
)
[docs]
async def type(self, id):
# TVmaze does not support movies and we can't distinguish between season
# and episode by ID.
return ReleaseType.unknown
[docs]
async def url(self, id):
if id:
show = await self._get_show(id)
return show.get('url', '')
return ''
[docs]
async def year(self, id):
if id:
show = await self._get_show(id)
return _get_year(show)
return ''
[docs]
async def imdb_id(self, id):
"""Return IMDb ID for TVmaze ID `id` or `None`"""
if id:
show = await self._get_show(id)
imdb_id = show.get('externals', {}).get('imdb')
if imdb_id:
return imdb_id
return ''
[docs]
async def episode(self, id, season, episode):
"""
Get episode information
:param id: Show ID
:param season: Season number
:param episode: Episode number
:return: :class:`dict` with these keys:
- ``date`` (:class:`str` as "YYYY-MM-DD")
- ``episode`` (:class:`str`)
- ``season`` (:class:`str`)
- ``summary`` (:class:`str`)
- ``title`` (:class:`str`)
- ``url`` (:class:`str`)
"""
if id:
episode = await self._get_json(
url=f'{self._url_base}/shows/{id}/episodebynumber',
params={'season': season, 'number': episode},
)
else:
episode = {}
return {
'date': episode.get('airdate', ''),
'episode': str(episode.get('number', '')) or '',
'season': str(episode.get('season', '')) or '',
'summary': _get_summary(episode),
'title': episode.get('name', ''),
'url': episode.get('url', ''),
}
[docs]
async def status(self, id):
"""Return something like "Running", "Ended" or empty string"""
if id:
show = await self._get_show(id)
return show.get('status')
return ''
class _TvmazeSearchResult(common.SearchResult):
def __init__(self, *, show, tvmaze_api):
super().__init__(
cast=functools.partial(tvmaze_api.cast, show['id']),
countries=_get_countries(show),
directors=(),
id=show['id'],
genres=_get_genres(show),
poster=functools.partial(tvmaze_api.poster, show['id']),
summary=_get_summary(show),
title=show['name'],
title_english=functools.partial(tvmaze_api.title_english, show['id']),
title_original=functools.partial(tvmaze_api.title_original, show['id']),
type=ReleaseType.series,
url=show['url'],
year=_get_year(show),
)
def _get_summary(show):
summary = show.get('summary', None)
if summary:
soup = html.parse(summary)
return '\n'.join(paragraph.text for paragraph in soup.find_all('p'))
else:
return ''
def _get_year(show):
premiered = show.get('premiered', None)
if premiered:
year = str(premiered).split('-')[0]
if year.isdigit() and len(year) == 4:
return year
else:
return ''
def _get_genres(show):
genres = show.get('genres', None)
if genres:
return tuple(str(g).lower() for g in genres)
else:
return ()
def _get_countries(show):
info = show.get('network', None) or show.get('webChannel', None)
if info:
country = info.get('country', None)
if country:
name = country.get('name', None)
if name:
return (name,)
return ''