# Source code for upsies.uis.tui.jobwidgets.webdb

import functools
import textwrap

from prompt_toolkit.filters import Condition
from prompt_toolkit.layout.containers import ConditionalContainer, DynamicContainer, HSplit, VSplit, Window
from prompt_toolkit.layout.controls import FormattedTextControl
from prompt_toolkit.utils import get_cwidth

from ....utils import browser, types, webdbs
from .. import widgets
from . import JobWidgetBase

import logging  # isort:skip
_log = logging.getLogger(__name__)


class WebDbSearchJobWidget(JobWidgetBase):
    """
    Interactive widget for searching a web database and picking a result

    The widget shows an editable query, a list of search results, details
    about the focused result and a poster. Search and selection are delegated
    to :attr:`job`; the widget only reacts to the job's signals and forwards
    user input to it.
    """

    is_interactive = True

    total_width = 80
    left_column_width = 36
    right_column_width = total_width - left_column_width - 1
    total_height = 20

    def setup(self):
        """Create all child widgets and subscribe to the job's signals"""
        self._query_exception = None
        self._at_least_one_search_was_done = False

        # NOTE: "right"/"left" below name the width attribute that is used.
        # `runtime_widget` places the results/titles group first in its
        # VSplit and the summary/genres/... group second.
        self._widgets = {
            # First row
            'id': widgets.TextField(width=15, style='class:dialog.search.info'),
            'query': widgets.InputField(
                style='class:dialog.search.query',
                text=str(self.job.query),
                on_accepted=self.handle_query_accepted,
                on_changed=self.handle_query_changed,
            ),
            'keybindings': widgets.TextField(style='class:info'),

            # Right column
            'search_results': _SearchResults(width=self.right_column_width),
            'title_original': widgets.TextField(
                style='class:dialog.search.info',
                width=self.right_column_width,
                height=1,
            ),
            'title_english': widgets.TextField(
                style='class:dialog.search.info',
                width=self.right_column_width,
                height=1,
            ),

            # Left column
            'summary': widgets.TextField(
                style='class:dialog.search.info',
                width=self.left_column_width,
                height=8,
            ),
            'genres': widgets.TextField(
                style='class:dialog.search.info',
                width=self.left_column_width,
                height=2,
            ),
            'directors': widgets.TextField(
                style='class:dialog.search.info',
                width=self.left_column_width,
                height=1,
            ),
            'cast': widgets.TextField(
                style='class:dialog.search.info',
                width=self.left_column_width,
                height=2,
            ),
            'countries': widgets.TextField(
                style='class:dialog.search.info',
                width=self.left_column_width,
                height=1,
            ),

            # Poster (rightmost column)
            'poster': widgets.Image(height=self.total_height),
        }

        # Also initializes `self._old_query_text`, which the query handlers
        # compare against.
        self.set_query_text(self.job.query)

        self.job.signal.register('search_results', self.handle_search_results)
        self.job.signal.register('searching_status', self.handle_searching_status)
        self.job.signal.register('info_updating', self.handle_info_updating)
        self.job.signal.register('info_updated', self.handle_info_updated)
        self.job.signal.register('query_updated', self.handle_query_updated)

    def invalidate(self, *, warnings=False):
        """
        Invalidate the widget via the parent class

        :param bool warnings: Whether to also drop the cached :attr:`warnings`
            so they are regenerated on next access
        """
        super().invalidate()
        if warnings:
            try:
                # Invalidate cached warnings
                del self.warnings
            except AttributeError:
                # No warnings have been generated yet
                pass

    def handle_query_changed(self, buffer):
        """Restyle the query field depending on whether it was edited"""
        query_widget = self._widgets['query']
        if query_widget.text != self._old_query_text:
            query_widget.style = 'class:dialog.search.query.changed'
        else:
            query_widget.style = 'class:dialog.search.query'

    def handle_query_accepted(self, buffer):
        """
        Handle the user accepting the query field

        A changed query starts a new search. An unchanged query selects the
        focused search result (which is `None` if there are no results).
        An unparsable query is stored so that :attr:`warnings` can report it.
        """
        query_widget = self._widgets['query']
        query_widget.style = 'class:dialog.search.query'
        query_text = query_widget.text

        if query_text != self._old_query_text:
            try:
                self.job.search(webdbs.Query.from_string(query_text))
            except ValueError as e:
                self._query_exception = e
            else:
                self._query_exception = None
                self.set_query_text(self.job.query)
        else:
            # The same query was accepted twice without changing it.
            # Select focused search result (None if nothing is focused).
            self.job.result_selected(self._widgets['search_results'].focused_result)
            # Query exception is no longer valid
            self._query_exception = None

        # Invalidate warnings to include new self._query_exception
        self.invalidate(warnings=True)

    def handle_searching_status(self, is_searching):
        """Forward the job's searching state to the search results widget"""
        self._widgets['search_results'].is_searching = is_searching

    def handle_search_results(self, results):
        """Display new search `results`"""
        self._widgets['search_results'].results = results
        self._at_least_one_search_was_done = True
        # Invalidate warnings to display/remove no-result-search-hints
        self.invalidate(warnings=True)

    def handle_info_updating(self, attr):
        """Mark the widget registered under `attr` as loading"""
        self._widgets[attr].is_loading = True
        self.invalidate()

    def handle_info_updated(self, attr, value):
        """
        Display `value` in the widget registered under `attr`

        Binary values are passed on unchanged, other truthy values are
        converted to `str` and falsy values clear the widget.
        """
        target = self._widgets[attr]
        target.is_loading = False
        if isinstance(value, (bytes, bytearray)):
            target.text = value
        elif value:
            target.text = str(value)
        else:
            target.text = ''
        self.invalidate()

    def handle_query_updated(self, query):
        """Display the new `query` and clear all info widgets"""
        self.set_query_text(query)
        for attr in (
            'id',
            'title_original',
            'title_english',
            'summary',
            'genres',
            'directors',
            'cast',
            'countries',
            'poster',
        ):
            self._widgets[attr].text = ''
        self.invalidate()

    def set_query_text(self, text):
        """
        Put `text` into the query field and remember it as the unedited query

        The remembered text is what :meth:`handle_query_changed` and
        :meth:`handle_query_accepted` compare the field's content against.
        """
        query_widget = self._widgets['query']
        query_widget.set_text(str(text), preserve_cursor_position=True)
        query_widget.style = 'class:dialog.search.query'
        self._old_query_text = query_widget.text

    @functools.cached_property
    def warnings(self):
        """
        Note for the user to find results

        Paragraphs are separated by empty lines. The string is empty if there
        is nothing to report. The value is cached; call
        ``invalidate(warnings=True)`` to regenerate it.
        """
        warnings = []

        # User entered invalid query
        if self._query_exception:
            warnings.append(str(self._query_exception))

        # No search results - provide some hints
        if (
            # Don't show hints if we are still starting up and the first
            # search is not yet happening.
            self._at_least_one_search_was_done
            # Don't show hints if we are currently searching.
            and not self.job.is_searching
            # Don't show hints if there are search results.
            and not self._widgets['search_results'].results
        ):
            # Generic hints
            warnings.append(
                textwrap.fill(
                    'Try removing or adjusting "year:…" or "type:…". '
                    'You can provide an exact ID by searching for "id:…".',
                    width=self.total_width,
                ),
            )

            # Search hints for this specific webdb
            if self.job.db.no_results_info:
                warnings.append(
                    textwrap.fill(
                        self.job.db.no_results_info,
                        width=self.total_width,
                    )
                )

            # Maybe provide no ID as a valid job result. This only applies
            # without results because accepting the unchanged query selects
            # the focused result, which is None only if there are no results.
            if self.job.no_id_ok:
                warnings.append(
                    textwrap.fill(
                        (
                            f'Press Enter to provide no {self.job.db.label} ID if you are sure '
                            f'"{self.job.query.title}" does not exist on {self.job.db.label}.'
                        ),
                        width=self.total_width,
                    )
                )

        return '\n\n'.join(warnings)

    def _move_focus(self, move):
        """
        Call `move` and report the focused result to the job if it changed

        :param move: Callable without arguments that moves the focus in the
            search results widget
        """
        search_results = self._widgets['search_results']
        prev_focused = search_results.focused_result
        move()
        now_focused = search_results.focused_result
        if prev_focused != now_focused:
            self.job.result_focused(now_focused)

    @functools.cached_property
    def runtime_widget(self):
        """
        Layout of all child widgets

        Building the layout also registers the local key bindings and fills
        in the key binding hints. The result is cached.
        """
        w = self._widgets
        label_style = 'class:dialog.search.label'

        # Everything except for the poster on the very right.
        textfields = HSplit(
            children=[
                VSplit([
                    w['query'],
                    widgets.hspacer,
                    widgets.HLabel(
                        text='ID',
                        content=w['id'],
                        style=label_style,
                    ),
                ]),
                w['keybindings'],
                ConditionalContainer(
                    # Only take up space if there is something to display
                    filter=Condition(lambda: bool(self.warnings)),
                    content=HSplit([
                        widgets.vspacer,
                        Window(
                            FormattedTextControl(
                                lambda: self.warnings,
                                style='class:warning',
                            ),
                        )
                    ]),
                ),
                VSplit([
                    HSplit([
                        widgets.VLabel('Results', w['search_results'], style=label_style),
                        widgets.VLabel('Original Title', w['title_original'], style=label_style),
                        widgets.VLabel('Also Known As', w['title_english'], style=label_style),
                    ]),
                    widgets.hspacer,
                    HSplit([
                        widgets.VLabel('Summary', w['summary'], style=label_style),
                        widgets.VLabel('Genres', w['genres'], style=label_style),
                        widgets.VLabel('Director', w['directors'], style=label_style),
                        widgets.VLabel('Cast', w['cast'], style=label_style),
                        widgets.VLabel('Country', w['countries'], style=label_style),
                    ]),
                ]),
            ],
            style='class:dialog.search',
        )

        # `textfields` and the poster.
        dialog = VSplit(
            children=[
                textfields,
                widgets.hspacer,
                w['poster'],
            ],
        )

        # Add a spacer below the whole form to get some breathing room.
        # NOTE(review): Other HSplits in this method use `widgets.vspacer`
        # and only VSplits use `widgets.hspacer` - confirm that `hspacer` is
        # intended here.
        layout = HSplit(
            children=[
                dialog,
                widgets.hspacer,
            ],
        )

        @self.keybindings_local.add('down')
        @self.keybindings_local.add('c-n')
        @self.keybindings_local.add('tab')
        def _(_event):
            self._move_focus(w['search_results'].focus_next)

        @self.keybindings_local.add('up')
        @self.keybindings_local.add('c-p')
        @self.keybindings_local.add('s-tab')
        def _(_event):
            self._move_focus(w['search_results'].focus_previous)

        # Alt+s
        @self.keybindings_local.add('escape', 's')
        def _(_event):
            self.job.query.type = types.ReleaseType.series

        # Alt+m
        @self.keybindings_local.add('escape', 'm')
        def _(_event):
            self.job.query.type = types.ReleaseType.movie

        # Alt+Enter
        @self.keybindings_local.add('escape', 'enter')
        def _(_event):
            focused = w['search_results'].focused_result
            # Nothing is focused if there are no search results
            if focused is not None:
                browser.open(focused.url)

        # Key binding hints are set once while the widget is built, not in
        # the handlers. Otherwise they would stay hidden until a key is
        # pressed and every Alt+m would append the same hint again.
        w['keybindings'].text = 'Alt+s: Search for series'
        w['keybindings'].text += ' | Alt+m: Search for movies'

        return layout
class _SearchResults(DynamicContainer):
    """
    List of search results with one focused item

    Each row shows title, year and type of one result. While
    :attr:`is_searching` is truthy, an activity indicator is displayed instead
    of the list.

    :param results: Iterable of result objects; `title`, `year` and `type`
        attributes are read from each of them
    :param int width: Width of the whole list
    """

    def __init__(self, results=(), width=40):
        self.results = results
        self._year_width = 4
        self._type_width = 6
        # Two cells are reserved for the column separators
        self._title_width = width - self._year_width - self._type_width - 2
        self._activity_indicator = widgets.ActivityIndicator()
        super().__init__(
            lambda: Window(
                content=FormattedTextControl(self._get_text_fragments, focusable=False),
                width=width,
                height=14,
                style='class:dialog.search.results',
            )
        )

    @property
    def is_searching(self):
        """Whether the activity indicator is active"""
        return self._activity_indicator.active

    @is_searching.setter
    def is_searching(self, value):
        self._activity_indicator.active = bool(value)

    @property
    def results(self):
        """
        Tuple of search results

        Setting this property moves the focus to the first result.
        """
        return self._results

    @results.setter
    def results(self, results):
        self._results = tuple(results)
        self._focused_index = 0

    @property
    def focused_result(self):
        """Currently focused result or `None` if there are no results"""
        if self._results:
            return self._results[self._focused_index]
        return None

    def focus_next(self):
        """Focus the next result unless the last one is already focused"""
        if self._focused_index < len(self._results) - 1:
            self._focused_index += 1

    def focus_previous(self):
        """Focus the previous result unless the first one is already focused"""
        if self._focused_index > 0:
            self._focused_index -= 1

    def focus_first(self):
        """Focus the first result"""
        self._focused_index = 0

    def focus_last(self):
        """Focus the last result"""
        # Keep the index at 0 instead of -1 if there are no results
        self._focused_index = max(0, len(self._results) - 1)

    def _fit_title(self, title):
        """
        Return `title` truncated and padded to exactly fill the title column

        Widths are measured in terminal cells, not characters, so that titles
        with double-width characters don't push the year and type columns out
        of alignment.
        """
        width = self._title_width
        if get_cwidth(title) > width:
            # Keep as many characters as fit while leaving room for the
            # ellipsis.
            kept = []
            used = 0
            for char in title:
                char_width = get_cwidth(char)
                if used + char_width > width - 1:
                    break
                kept.append(char)
                used += char_width
            title = ''.join(kept) + '…'
        return title + ' ' * max(0, width - get_cwidth(title))

    def _get_text_fragments(self):
        """Return the content of the list as formatted text"""
        if self.is_searching:
            return [('class:dialog.search.results', self._activity_indicator.text)]
        elif not self._results:
            return 'No results'

        frags = []
        for i, result in enumerate(self._results):
            if i > 0:
                # Rows are separated by newlines, without a trailing newline
                frags.append(('', '\n'))

            if i == self._focused_index:
                title_style = 'class:dialog.search.results.focused'
                frags.append(('[SetCursorPosition]', ''))
                # Not read anywhere in the visible code; kept so the
                # attribute stays available.
                self._focused_result = result
            else:
                title_style = 'class:dialog.search.results'

            frags.append((title_style, self._fit_title(result.title)))
            frags.append(('', (
                ' '
                f'{str(result.year or "").rjust(self._year_width)}'
                ' '
                f'{str(result.type).rjust(self._type_width)}'
            )))
        return frags