diff --git a/youtube_transcript_api/__init__.py b/youtube_transcript_api/__init__.py index 3f22674..e2ed0aa 100644 --- a/youtube_transcript_api/__init__.py +++ b/youtube_transcript_api/__init__.py @@ -1 +1,3 @@ from ._api import YouTubeTranscriptApi +from ._transcripts import TranscriptDataFetcher, TranscriptData, Transcript +from ._errors import TranscriptsDisabled, NoTranscriptFound, CouldNotRetrieveTranscript, VideoUnavailable diff --git a/youtube_transcript_api/_api.py b/youtube_transcript_api/_api.py index df8d2e2..98ce16c 100644 --- a/youtube_transcript_api/_api.py +++ b/youtube_transcript_api/_api.py @@ -1,44 +1,9 @@ -import sys - -# This can only be tested by using different python versions, therefore it is not covered by coverage.py -if sys.version_info.major == 2: # pragma: no cover - reload(sys) - sys.setdefaultencoding('utf-8') - -from xml.etree import ElementTree - -import re - import requests -from ._html_unescaping import unescape +from ._transcripts import TranscriptDataFetcher class YouTubeTranscriptApi(): - class CouldNotRetrieveTranscript(Exception): - """ - Raised if a transcript could not be retrieved. - """ - - ERROR_MESSAGE = ( - 'Could not get the transcript for the video {video_url}! ' - 'This usually happens if one of the following things is the case:\n' - ' - subtitles have been disabled by the uploader\n' - ' - none of the language codes you provided are valid\n' - ' - none of the languages you provided are supported by the video\n' - ' - the video is no longer available.\n\n' - 'If none of these things is the case, please create an issue at ' - 'https://github.com/jdepoix/youtube-transcript-api/issues.' - 'Please add which version of youtube_transcript_api you are using and make sure that there ' - 'are no open issues which already describe your problem!' 
- ) - - def __init__(self, video_id): - super(YouTubeTranscriptApi.CouldNotRetrieveTranscript, self).__init__( - self.ERROR_MESSAGE.format(video_url=_TranscriptFetcher.WATCH_URL.format(video_id=video_id)) - ) - self.video_id = video_id - @classmethod def get_transcripts(cls, video_ids, languages=('en',), continue_after_error=False, proxies=None): """ @@ -47,7 +12,7 @@ class YouTubeTranscriptApi(): :param video_ids: a list of youtube video ids :type video_ids: [str] :param languages: A list of language codes in a descending priority. For example, if this is set to ['de', 'en'] - it will first try to fetch the german transcript (de) and then fetch the english transcipt (en) if it fails to + it will first try to fetch the german transcript (de) and then fetch the english transcript (en) if it fails to do so. As I can't provide a complete list of all working language codes with full certainty, you may have to play around with the language codes a bit, to find the one which is working for you! 
:type languages: [str] @@ -91,78 +56,6 @@ class YouTubeTranscriptApi(): :return: a list of dictionaries containing the 'text', 'start' and 'duration' keys :rtype: [{'text': str, 'start': float, 'end': float}] """ - try: - return _TranscriptParser(_TranscriptFetcher(video_id, languages, proxies).fetch()).parse() - except Exception: - raise YouTubeTranscriptApi.CouldNotRetrieveTranscript(video_id) - - -class _TranscriptFetcher(): - WATCH_URL = 'https://www.youtube.com/watch?v={video_id}' - API_BASE_URL = 'https://www.youtube.com/api/' - TIMEDTEXT_STRING = 'timedtext?v=' - NAME_REGEX = re.compile(r'&name=.*?(&)|&name=.*') - - def __init__(self, video_id, languages, proxies): - self.video_id = video_id - self.languages = languages - self.proxies = proxies - - def fetch(self): - if self.proxies: - fetched_site = requests.get(self.WATCH_URL.format(video_id=self.video_id), proxies=self.proxies).text - else: - fetched_site = requests.get(self.WATCH_URL.format(video_id=self.video_id)).text - timedtext_splits = [split[:split.find('"')] - .replace('\\u0026', '&') - .replace('\\', '') - for split in fetched_site.split(self.TIMEDTEXT_STRING)] - matched_splits = [] - for language in self.languages: - matched_splits = [split for split in timedtext_splits if '&lang={}'.format(language) in split] - if matched_splits: - break - if matched_splits: - timedtext_url = min(matched_splits, key=self._sort_splits) - response = self._execute_api_request(timedtext_url) - if response: - return response - - return None - - def _sort_splits(self, matched_split): - """Returns a value related to a given caption track url. - - This function is used to sort the matched splits by string - length because we want non-asr and non-dialect options returned first. - With this in mind, it is remove the 'name' arugument from the url as - it could possibly make the values inaccurate to what we desire. - - matched_split: The caption track url we want to return a value for. 
# URL template of a video's watch page; `video_id` is substituted via str.format().
WATCH_URL = 'https://www.youtube.com/watch?v={video_id}'


class CouldNotRetrieveTranscript(Exception):
    """
    Raised if a transcript could not be retrieved.

    The exception message always starts with ERROR_MESSAGE. If the `cause` property returns a non-empty string,
    CAUSE_MESSAGE_INTRO (containing that cause) and GITHUB_REFERRAL are appended. Subclasses describe their cause by
    overriding CAUSE_MESSAGE or the `cause` property.

    :param video_id: the id of the video the transcript was requested for
    :type video_id: str
    """
    ERROR_MESSAGE = '\nCould not retrieve a transcript for the video {video_url}!'
    CAUSE_MESSAGE_INTRO = ' This is most likely caused by:\n\n{cause}'
    CAUSE_MESSAGE = ''
    GITHUB_REFERRAL = (
        '\n\nIf you are sure that the described cause is not responsible for this error '
        'and that a transcript should be retrievable, please create an issue at '
        # The trailing space keeps the URL and the next sentence apart ("issues. Please", not "issues.Please").
        'https://github.com/jdepoix/youtube-transcript-api/issues. '
        'Please add which version of youtube_transcript_api you are using '
        'and provide the information needed to replicate the error. '
        'Also make sure that there are no open issues which already describe your problem!'
    )

    def __init__(self, video_id):
        # video_id has to be assigned first, because _build_error_message() reads it.
        self.video_id = video_id
        super(CouldNotRetrieveTranscript, self).__init__(self._build_error_message())

    def _build_error_message(self):
        """
        Assembles the message passed to Exception.__init__.

        :return: ERROR_MESSAGE with the watch url filled in, followed by the cause section and the GitHub referral
            if (and only if) `cause` is non-empty
        :rtype: str
        """
        cause = self.cause
        error_message = self.ERROR_MESSAGE.format(video_url=WATCH_URL.format(video_id=self.video_id))

        if cause:
            error_message += self.CAUSE_MESSAGE_INTRO.format(cause=cause) + self.GITHUB_REFERRAL

        return error_message

    @property
    def cause(self):
        """
        :return: the description of what most likely caused the error; an empty string means "no known cause"
        :rtype: str
        """
        return self.CAUSE_MESSAGE


class VideoUnavailable(CouldNotRetrieveTranscript):
    """
    Raised if the requested video is not available.
    """
    CAUSE_MESSAGE = 'The video is no longer available'


class TranscriptsDisabled(CouldNotRetrieveTranscript):
    """
    Raised if the video does not offer any subtitles.
    """
    CAUSE_MESSAGE = 'Subtitles are disabled for this video'


class NoTranscriptFound(CouldNotRetrieveTranscript):
    """
    Raised if none of the requested language codes matches a transcript of the video.

    :param video_id: the id of the video the transcript was requested for
    :type video_id: str
    :param requested_language_codes: the language codes which have been searched for
    :param transcript_data: object describing the transcripts which are available; str() of it is appended to the
        cause
    """
    CAUSE_MESSAGE = (
        'No transcripts were found for any of the requested language codes: {requested_language_codes}\n\n'
        '{transcript_data}'
    )

    def __init__(self, video_id, requested_language_codes, transcript_data):
        # Both attributes have to exist before the base initializer runs, as it builds the message from `cause`.
        self._requested_language_codes = requested_language_codes
        self._transcript_data = transcript_data
        super(NoTranscriptFound, self).__init__(video_id)

    @property
    def cause(self):
        return self.CAUSE_MESSAGE.format(
            requested_language_codes=self._requested_language_codes,
            transcript_data=str(self._transcript_data),
        )
import sys

# This can only be tested by using different python versions, therefore it is not covered by coverage.py
if sys.version_info.major == 2:  # pragma: no cover
    reload(sys)
    sys.setdefaultencoding('utf-8')

import json

from xml.etree import ElementTree

import re


class TranscriptDataFetcher(object):
    """
    Downloads the watch page of a video and turns the caption metadata embedded in it into a TranscriptData object.

    :param http_client: the client used for all HTTP requests; only `get(url)` and the `text` attribute of the
        returned response are used
    """
    def __init__(self, http_client):
        self._http_client = http_client

    def fetch(self, video_id):
        """
        :param video_id: the youtube video id
        :type video_id: str
        :return: the transcripts which are available for the video
        :rtype: TranscriptData
        :raises VideoUnavailable: if the page contains neither caption data nor a playability status
        :raises TranscriptsDisabled: if the page contains a playability status, but no caption data
        """
        return TranscriptData.build(
            self._http_client,
            video_id,
            self._extract_captions_json(self._fetch_html(video_id), video_id)
        )

    def _extract_captions_json(self, html, video_id):
        """
        Cuts the JSON object between '"captions":' and ',"videoDetails' out of the page and parses it.

        :return: the value of the 'playerCaptionsTracklistRenderer' key of that JSON object
        :rtype: dict
        """
        splitted_html = html.split('"captions":')

        if len(splitted_html) <= 1:
            # No caption data at all: the presence of a playability status decides which error is reported.
            if '"playabilityStatus":' not in html:
                raise VideoUnavailable(video_id)

            raise TranscriptsDisabled(video_id)

        return json.loads(splitted_html[1].split(',"videoDetails')[0].replace('\n', ''))[
            'playerCaptionsTracklistRenderer'
        ]

    def _fetch_html(self, video_id):
        """
        :return: the watch page with '\\u0026' turned into '&' and every remaining backslash removed
        :rtype: str
        """
        # NOTE(review): removing every backslash also drops JSON escapes such as \" inside the embedded data;
        # presumably fine for the caption payloads seen so far - confirm against a track name containing quotes.
        return self._http_client.get(WATCH_URL.format(video_id=video_id)).text.replace(
            '\\u0026', '&'
        ).replace(
            '\\', ''
        )


class TranscriptData(object):
    """
    Holds the transcripts available for one video, split into manually created and generated ones, and allows to
    look them up by language code.

    :param http_client: the client handed on to every Transcript which is returned
    :param video_id: the id of the video the transcripts belong to
    :type video_id: str
    :param manually_created_transcripts: dicts with the keys 'url', 'language', 'language_code', 'is_generated'
        and 'is_translatable'
    :type manually_created_transcripts: [dict]
    :param generated_transcripts: dicts of the same shape describing the generated transcripts
    :type generated_transcripts: [dict]
    :param translation_languages: dicts with the keys 'language' and 'language_code'
    :type translation_languages: [dict]
    """
    # TODO implement iterator

    def __init__(
        self, http_client, video_id, manually_created_transcripts, generated_transcripts, translation_languages
    ):
        self._http_client = http_client
        self.video_id = video_id
        self._manually_created_transcripts = manually_created_transcripts
        self._generated_transcripts = generated_transcripts
        self._translation_languages = translation_languages

    @staticmethod
    def build(http_client, video_id, captions_json):
        """
        Factory creating a TranscriptData object from the caption metadata of a video.

        :param http_client: the client handed on to the created object
        :param video_id: the id of the video the metadata belongs to
        :type video_id: str
        :param captions_json: dict with a 'captionTracks' list and, optionally, a 'translationLanguages' list
        :type captions_json: dict
        :return: the created object
        :rtype: TranscriptData
        """
        manually_created_transcripts = []
        generated_transcripts = []

        for caption in captions_json['captionTracks']:
            # Tracks of kind 'asr' are the generated ones; everything else counts as manually created.
            is_generated = caption.get('kind', '') == 'asr'
            target = generated_transcripts if is_generated else manually_created_transcripts
            target.append(
                {
                    'url': caption['baseUrl'],
                    'language': caption['name']['simpleText'],
                    'language_code': caption['languageCode'],
                    'is_generated': is_generated,
                    # A track without this flag is treated as not translatable instead of raising a KeyError.
                    'is_translatable': caption.get('isTranslatable', False),
                }
            )

        return TranscriptData(
            http_client,
            video_id,
            manually_created_transcripts,
            generated_transcripts,
            [
                {
                    'language': translation_language['languageName']['simpleText'],
                    'language_code': translation_language['languageCode'],
                # A missing translation list is treated as "no translations" instead of raising a KeyError.
                } for translation_language in captions_json.get('translationLanguages', [])
            ],
        )

    def find_transcript(self, language_codes):
        """
        Finds a transcript for the given language codes. Manually created transcripts are preferred; generated
        ones are only searched if no manually created transcript matches.

        :param language_codes: language codes in a descending priority
        :type language_codes: [str]
        :return: the first transcript which matches
        :rtype: Transcript
        :raises NoTranscriptFound: if neither a manually created nor a generated transcript matches
        """
        try:
            return self.find_manually_created_transcript(language_codes)
        except NoTranscriptFound:
            pass

        return self.find_generated_transcript(language_codes)

    def find_generated_transcript(self, language_codes):
        """
        Same as find_transcript(), but only generated transcripts are considered.
        """
        return self._find_transcript(language_codes, generated=True)

    def find_manually_created_transcript(self, language_codes):
        """
        Same as find_transcript(), but only manually created transcripts are considered.
        """
        return self._find_transcript(language_codes, generated=False)

    def _find_transcript(self, language_codes, generated):
        transcripts = self._generated_transcripts if generated else self._manually_created_transcripts

        # The language codes form the outer loop, so that their order decides which transcript wins.
        for language_code in language_codes:
            for transcript in transcripts:
                if transcript['language_code'] == language_code:
                    return Transcript(
                        self._http_client,
                        transcript['url'],
                        transcript['language'],
                        transcript['language_code'],
                        transcript['is_generated'],
                        self._translation_languages if transcript['is_translatable'] else []
                    )

        raise NoTranscriptFound(
            self.video_id,
            language_codes,
            self
        )

    def __str__(self):
        return (
            'For this video ({video_id}) transcripts are available in the following languages:\n\n'
            '(MANUALLY CREATED)\n'
            '{available_manually_created_transcript_languages}\n\n'
            '(GENERATED)\n'
            '{available_generated_transcripts}'
        ).format(
            video_id=self.video_id,
            available_manually_created_transcript_languages=self._get_language_description(
                self._manually_created_transcripts
            ),
            available_generated_transcripts=self._get_language_description(
                self._generated_transcripts
            ),
        )

    def _get_language_description(self, transcripts):
        """
        :return: one ' - code ("language")' line per transcript, or 'None' if the list is empty
        :rtype: str
        """
        return '\n'.join(
            ' - {language_code} ("{language}")'.format(
                language=transcript['language'],
                language_code=transcript['language_code'],
            ) for transcript in transcripts
        ) if transcripts else 'None'


class Transcript(object):
    """
    Describes a single transcript and allows to download its content.

    :param http_client: the client used to download the transcript
    :param url: the url the transcript is downloaded from
    :param language: the name of the language of the transcript
    :param language_code: the language code of the transcript
    :param is_generated: whether the transcript is a generated one
    :param translation_languages: dicts with the keys 'language' and 'language_code'; empty if the transcript is
        not translatable
    """
    def __init__(self, http_client, url, language, language_code, is_generated, translation_languages):
        self._http_client = http_client
        self.url = url
        self.language = language
        self.language_code = language_code
        self.is_generated = is_generated
        self.translation_languages = translation_languages

    def fetch(self):
        """
        Downloads and parses the transcript.

        :return: a list of dictionaries containing the 'text', 'start' and 'duration' keys
        :rtype: [{'text': str, 'start': float, 'duration': float}]
        """
        return _TranscriptParser().parse(
            self._http_client.get(self.url).text
        )

# TODO integrate translations in future release
#     @property
#     def is_translatable(self):
#         return len(self.translation_languages) > 0
#
#
# class TranslatableTranscript(Transcript):
#     def __init__(self, http_client, url, translation_languages):
#         super(TranslatableTranscript, self).__init__(http_client, url)
#         self._translation_languages = translation_languages
#         self._translation_language_codes = {language['language_code'] for language in translation_languages}
#
#
#     def translate(self, language_code):
#         if language_code not in self._translation_language_codes:
#             raise TranslatableTranscript.TranslationLanguageNotAvailable()
#
#         return Transcript(
#             self._http_client,
#             '{url}&tlang={language_code}'.format(url=self._url, language_code=language_code)
#         )


class _TranscriptParser(object):
    """
    Converts the XML of a transcript into a list of dictionaries.
    """
    HTML_TAG_REGEX = re.compile(r'<[^>]*>', re.IGNORECASE)

    def parse(self, plain_data):
        """
        :param plain_data: the transcript XML
        :type plain_data: str
        :return: one dictionary per XML element which has text; html entities are unescaped and tags removed from
            the text
        :rtype: [{'text': str, 'start': float, 'duration': float}]
        """
        # Imported at call time: `unescape` is the only name needed from outside this file, and deferring the
        # import keeps the classes above importable on their own.
        from ._html_unescaping import unescape

        return [
            {
                'text': re.sub(self.HTML_TAG_REGEX, '', unescape(xml_element.text)),
                'start': float(xml_element.attrib['start']),
                'duration': float(xml_element.attrib['dur']),
            }
            for xml_element in ElementTree.fromstring(plain_data)
            if xml_element.text is not None
        ]
--- /dev/null +++ b/youtube_transcript_api/test/assets/youtube_transcripts_disabled.html.static @@ -0,0 +1,2160 @@ + + + +
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + ++ + +
+ + ++ + +
+ + ++ + +
+ + ++ + +
+ + ++ + +
+ + ++ + +
+ + +