# everything_else.py
  1. from .common import (get, multi_get, deep_get, multi_deep_get,
  2. liberal_update, conservative_update, remove_redirect, normalize_url,
  3. extract_str, extract_formatted_text, extract_int, extract_approx_int,
  4. extract_date, check_missing_keys, extract_item_info, extract_items,
  5. extract_response)
  6. from youtube import proto
  7. import re
  8. import urllib
  9. from math import ceil
  10. def extract_channel_info(polymer_json, tab, continuation=False):
  11. response, err = extract_response(polymer_json)
  12. if err:
  13. return {'error': err}
  14. metadata = deep_get(response, 'metadata', 'channelMetadataRenderer',
  15. default={})
  16. if not metadata:
  17. metadata = deep_get(response, 'microformat', 'microformatDataRenderer',
  18. default={})
  19. # channel doesn't exist or was terminated
  20. # example terminated channel: https://www.youtube.com/channel/UCnKJeK_r90jDdIuzHXC0Org
  21. # metadata and microformat are not present for continuation requests
  22. if not metadata and not continuation:
  23. if response.get('alerts'):
  24. error_string = ' '.join(
  25. extract_str(deep_get(alert, 'alertRenderer', 'text'), default='')
  26. for alert in response['alerts']
  27. )
  28. if not error_string:
  29. error_string = 'Failed to extract error'
  30. return {'error': error_string}
  31. elif deep_get(response, 'responseContext', 'errors'):
  32. for error in response['responseContext']['errors'].get('error', []):
  33. if error.get('code') == 'INVALID_VALUE' and error.get('location') == 'browse_id':
  34. return {'error': 'This channel does not exist'}
  35. return {'error': 'Failure getting metadata'}
  36. info = {'error': None}
  37. info['current_tab'] = tab
  38. info['approx_subscriber_count'] = extract_approx_int(deep_get(response,
  39. 'header', 'c4TabbedHeaderRenderer', 'subscriberCountText'))
  40. # stuff from microformat (info given by youtube for first page on channel)
  41. info['short_description'] = metadata.get('description')
  42. if info['short_description'] and len(info['short_description']) > 730:
  43. info['short_description'] = info['short_description'][0:730] + '...'
  44. info['channel_name'] = metadata.get('title')
  45. info['avatar'] = normalize_url(multi_deep_get(metadata,
  46. ['avatar', 'thumbnails', 0, 'url'],
  47. ['thumbnail', 'thumbnails', 0, 'url'],
  48. ))
  49. channel_url = multi_get(metadata, 'urlCanonical', 'channelUrl')
  50. if channel_url:
  51. channel_id = get(channel_url.rstrip('/').split('/'), -1)
  52. info['channel_id'] = channel_id
  53. else:
  54. info['channel_id'] = metadata.get('externalId')
  55. if info['channel_id']:
  56. info['channel_url'] = 'https://www.youtube.com/channel/' + channel_id
  57. else:
  58. info['channel_url'] = None
  59. # get items
  60. info['items'] = []
  61. info['ctoken'] = None
  62. # empty channel
  63. #if 'contents' not in response and 'continuationContents' not in response:
  64. # return info
  65. if tab in ('videos', 'shorts', 'streams', 'playlists', 'search'):
  66. items, ctoken = extract_items(response)
  67. additional_info = {
  68. 'author': info['channel_name'],
  69. 'author_id': info['channel_id'],
  70. 'author_url': info['channel_url'],
  71. }
  72. info['items'] = [extract_item_info(renderer, additional_info) for renderer in items]
  73. info['ctoken'] = ctoken
  74. if tab in ('search', 'playlists'):
  75. info['is_last_page'] = (ctoken is None)
  76. elif tab == 'about':
  77. # Latest type
  78. items, _ = extract_items(response, item_types={'aboutChannelRenderer'})
  79. if items:
  80. a_metadata = deep_get(items, 0, 'aboutChannelRenderer',
  81. 'metadata', 'aboutChannelViewModel')
  82. if not a_metadata:
  83. info['error'] = 'Could not find aboutChannelViewModel'
  84. return info
  85. info['links'] = []
  86. for link_outer in a_metadata.get('links', ()):
  87. link = link_outer.get('channelExternalLinkViewModel') or {}
  88. link_content = extract_str(deep_get(link, 'link', 'content'))
  89. for run in deep_get(link, 'link', 'commandRuns') or ():
  90. url = remove_redirect(deep_get(run, 'onTap',
  91. 'innertubeCommand', 'urlEndpoint', 'url'))
  92. if url and not (url.startswith('http://')
  93. or url.startswith('https://')):
  94. url = 'https://' + url
  95. if link_content is None or (link_content in url):
  96. break
  97. else: # didn't break
  98. url = link_content
  99. if url and not (url.startswith('http://')
  100. or url.startswith('https://')):
  101. url = 'https://' + url
  102. text = extract_str(deep_get(link, 'title', 'content'))
  103. info['links'].append( (text, url) )
  104. info['date_joined'] = extract_date(
  105. a_metadata.get('joinedDateText')
  106. )
  107. info['view_count'] = extract_int(a_metadata.get('viewCountText'))
  108. info['approx_view_count'] = extract_approx_int(
  109. a_metadata.get('viewCountText')
  110. )
  111. info['description'] = extract_str(
  112. a_metadata.get('description'), default=''
  113. )
  114. info['approx_video_count'] = extract_approx_int(
  115. a_metadata.get('videoCountText')
  116. )
  117. info['approx_subscriber_count'] = extract_approx_int(
  118. a_metadata.get('subscriberCountText')
  119. )
  120. info['country'] = extract_str(a_metadata.get('country'))
  121. info['canonical_url'] = extract_str(
  122. a_metadata.get('canonicalChannelUrl')
  123. )
  124. # Old type
  125. else:
  126. items, _ = extract_items(response,
  127. item_types={'channelAboutFullMetadataRenderer'})
  128. if not items:
  129. info['error'] = 'Could not find aboutChannelRenderer or channelAboutFullMetadataRenderer'
  130. return info
  131. a_metadata = items[0]['channelAboutFullMetadataRenderer']
  132. info['links'] = []
  133. for link_json in a_metadata.get('primaryLinks', ()):
  134. url = remove_redirect(deep_get(link_json, 'navigationEndpoint',
  135. 'urlEndpoint', 'url'))
  136. if url and not (url.startswith('http://')
  137. or url.startswith('https://')):
  138. url = 'https://' + url
  139. text = extract_str(link_json.get('title'))
  140. info['links'].append( (text, url) )
  141. info['date_joined'] = extract_date(a_metadata.get('joinedDateText'))
  142. info['view_count'] = extract_int(a_metadata.get('viewCountText'))
  143. info['description'] = extract_str(a_metadata.get(
  144. 'description'), default='')
  145. info['approx_video_count'] = None
  146. info['approx_subscriber_count'] = None
  147. info['country'] = None
  148. info['canonical_url'] = None
  149. else:
  150. raise NotImplementedError('Unknown or unsupported channel tab: ' + tab)
  151. return info
  152. def extract_search_info(polymer_json):
  153. response, err = extract_response(polymer_json)
  154. if err:
  155. return {'error': err}
  156. info = {'error': None}
  157. info['estimated_results'] = int(response['estimatedResults'])
  158. info['estimated_pages'] = ceil(info['estimated_results']/20)
  159. results, _ = extract_items(response)
  160. info['items'] = []
  161. info['corrections'] = {'type': None}
  162. for renderer in results:
  163. type = list(renderer.keys())[0]
  164. if type == 'shelfRenderer':
  165. continue
  166. if type == 'didYouMeanRenderer':
  167. renderer = renderer[type]
  168. info['corrections'] = {
  169. 'type': 'did_you_mean',
  170. 'corrected_query': renderer['correctedQueryEndpoint']['searchEndpoint']['query'],
  171. 'corrected_query_text': renderer['correctedQuery']['runs'],
  172. }
  173. continue
  174. if type == 'showingResultsForRenderer':
  175. renderer = renderer[type]
  176. info['corrections'] = {
  177. 'type': 'showing_results_for',
  178. 'corrected_query_text': renderer['correctedQuery']['runs'],
  179. 'original_query_text': renderer['originalQuery']['simpleText'],
  180. }
  181. continue
  182. i_info = extract_item_info(renderer)
  183. if i_info.get('type') != 'unsupported':
  184. info['items'].append(i_info)
  185. return info
  186. def extract_playlist_metadata(polymer_json):
  187. response, err = extract_response(polymer_json)
  188. if err:
  189. return {'error': err}
  190. metadata = {'error': None}
  191. header = deep_get(response, 'header', 'playlistHeaderRenderer', default={})
  192. metadata['title'] = extract_str(header.get('title'))
  193. metadata['first_video_id'] = deep_get(header, 'playEndpoint', 'watchEndpoint', 'videoId')
  194. first_id = re.search(r'([a-z_\-]{11})', deep_get(header,
  195. 'thumbnail', 'thumbnails', 0, 'url', default=''))
  196. if first_id:
  197. conservative_update(metadata, 'first_video_id', first_id.group(1))
  198. if metadata['first_video_id'] is None:
  199. metadata['thumbnail'] = None
  200. else:
  201. metadata['thumbnail'] = f"https://i.ytimg.com/vi/{metadata['first_video_id']}/hqdefault.jpg"
  202. metadata['video_count'] = extract_int(header.get('numVideosText'))
  203. metadata['description'] = extract_str(header.get('descriptionText'), default='')
  204. metadata['author'] = extract_str(header.get('ownerText'))
  205. metadata['author_id'] = multi_deep_get(header,
  206. ['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'],
  207. ['ownerEndpoint', 'browseEndpoint', 'browseId'])
  208. if metadata['author_id']:
  209. metadata['author_url'] = 'https://www.youtube.com/channel/' + metadata['author_id']
  210. else:
  211. metadata['author_url'] = None
  212. metadata['view_count'] = extract_int(header.get('viewCountText'))
  213. metadata['like_count'] = extract_int(header.get('likesCountWithoutLikeText'))
  214. for stat in header.get('stats', ()):
  215. text = extract_str(stat)
  216. if 'videos' in text:
  217. conservative_update(metadata, 'video_count', extract_int(text))
  218. elif 'views' in text:
  219. conservative_update(metadata, 'view_count', extract_int(text))
  220. elif 'updated' in text:
  221. metadata['time_published'] = extract_date(text)
  222. microformat = deep_get(response, 'microformat', 'microformatDataRenderer',
  223. default={})
  224. conservative_update(
  225. metadata, 'title', extract_str(microformat.get('title'))
  226. )
  227. conservative_update(
  228. metadata, 'description', extract_str(microformat.get('description'))
  229. )
  230. conservative_update(
  231. metadata, 'thumbnail', deep_get(microformat, 'thumbnail',
  232. 'thumbnails', -1, 'url')
  233. )
  234. return metadata
  235. def extract_playlist_info(polymer_json):
  236. response, err = extract_response(polymer_json)
  237. if err:
  238. return {'error': err}
  239. info = {'error': None}
  240. video_list, _ = extract_items(response)
  241. info['items'] = [extract_item_info(renderer) for renderer in video_list]
  242. info['metadata'] = extract_playlist_metadata(polymer_json)
  243. return info
  244. def _ctoken_metadata(ctoken):
  245. result = dict()
  246. params = proto.parse(proto.b64_to_bytes(ctoken))
  247. result['video_id'] = proto.parse(params[2])[2].decode('ascii')
  248. offset_information = proto.parse(params[6])
  249. result['offset'] = offset_information.get(5, 0)
  250. result['is_replies'] = False
  251. if (3 in offset_information) and (2 in proto.parse(offset_information[3])):
  252. result['is_replies'] = True
  253. result['sort'] = None
  254. else:
  255. try:
  256. result['sort'] = proto.parse(offset_information[4])[6]
  257. except KeyError:
  258. result['sort'] = 0
  259. return result
def extract_comments_info(polymer_json, ctoken=None):
    """Extract a page of comments (or replies) from a comments response.

    polymer_json: raw innertube/polymer comments response
    ctoken: the continuation token used to request this page; when given,
        video_id/offset/is_replies/sort are decoded from it

    Returns a dict with 'error' None on success, the decoded token fields,
    'video_title', 'ctoken' (for the next page), and a 'comments' list.
    """
    response, err = extract_response(polymer_json)
    if err:
        return {'error': err}
    info = {'error': None}

    if ctoken:
        metadata = _ctoken_metadata(ctoken)
    else:
        metadata = {}
    info['video_id'] = metadata.get('video_id')
    info['offset'] = metadata.get('offset')
    info['is_replies'] = metadata.get('is_replies')
    info['sort'] = metadata.get('sort')
    info['video_title'] = None

    # NOTE: `ctoken` is deliberately rebound here to the NEXT page's token.
    comments, ctoken = extract_items(response,
        item_types={'commentThreadRenderer', 'commentRenderer'})

    info['comments'] = []
    info['ctoken'] = ctoken
    for comment in comments:
        comment_info = {}

        if 'commentThreadRenderer' in comment:  # top level comments
            conservative_update(info, 'is_replies', False)
            comment_thread = comment['commentThreadRenderer']
            info['video_title'] = extract_str(comment_thread.get(
                'commentTargetTitle'))
            if 'replies' not in comment_thread:
                comment_info['reply_count'] = 0
                comment_info['reply_ctoken'] = None
            else:
                comment_info['reply_count'] = extract_int(deep_get(comment_thread,
                    'replies', 'commentRepliesRenderer', 'moreText'
                ), default=1)  # With 1 reply, the text reads "View reply"
                # Token for fetching the replies; two layouts are tried.
                comment_info['reply_ctoken'] = multi_deep_get(
                    comment_thread,
                    ['replies', 'commentRepliesRenderer', 'contents', 0,
                     'continuationItemRenderer', 'button', 'buttonRenderer',
                     'command', 'continuationCommand', 'token'],
                    ['replies', 'commentRepliesRenderer', 'continuations', 0,
                     'nextContinuationData', 'continuation']
                )
            comment_renderer = deep_get(comment_thread, 'comment',
                'commentRenderer', default={})
        elif 'commentRenderer' in comment:  # replies
            comment_info['reply_count'] = 0  # replyCount, below, not present for replies even if the reply has further replies to it
            comment_info['reply_ctoken'] = None
            conservative_update(info, 'is_replies', True)
            comment_renderer = comment['commentRenderer']
        else:
            comment_renderer = {}

        # These 3 are sometimes absent, likely because the channel was deleted
        comment_info['author'] = extract_str(comment_renderer.get('authorText'))
        comment_info['author_url'] = normalize_url(deep_get(comment_renderer,
            'authorEndpoint', 'commandMetadata', 'webCommandMetadata', 'url'))
        comment_info['author_id'] = deep_get(comment_renderer,
            'authorEndpoint', 'browseEndpoint', 'browseId')

        comment_info['author_avatar'] = normalize_url(deep_get(
            comment_renderer, 'authorThumbnail', 'thumbnails', 0, 'url'))
        comment_info['id'] = comment_renderer.get('commentId')
        comment_info['text'] = extract_formatted_text(comment_renderer.get(
            'contentText'))
        comment_info['time_published'] = extract_str(comment_renderer.get(
            'publishedTimeText'))
        comment_info['like_count'] = comment_renderer.get('likeCount')
        comment_info['approx_like_count'] = extract_approx_int(
            comment_renderer.get('voteCount'))
        # Prefer the explicit replyCount when present (overrides the
        # moreText-derived estimate set above).
        liberal_update(comment_info, 'reply_count',
            comment_renderer.get('replyCount'))

        info['comments'].append(comment_info)

    return info