common.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611
  1. import re
  2. import urllib.parse
  3. import collections
  4. import collections.abc
  5. def get(object, key, default=None, types=()):
  6. '''Like dict.get(), but returns default if the result doesn't match one of the types.
  7. Also works for indexing lists.'''
  8. try:
  9. result = object[key]
  10. except (TypeError, IndexError, KeyError):
  11. return default
  12. if not types or isinstance(result, types):
  13. return result
  14. else:
  15. return default
  16. def multi_get(object, *keys, default=None, types=()):
  17. '''Like get, but try other keys if the first fails'''
  18. for key in keys:
  19. try:
  20. result = object[key]
  21. except (TypeError, IndexError, KeyError):
  22. pass
  23. else:
  24. if not types or isinstance(result, types):
  25. return result
  26. else:
  27. continue
  28. return default
  29. def deep_get(object, *keys, default=None, types=()):
  30. '''Like dict.get(), but for nested dictionaries/sequences, supporting keys or indices.
  31. Last argument is the default value to use in case of any IndexErrors or KeyErrors.
  32. If types is given and the result doesn't match one of those types, default is returned'''
  33. try:
  34. for key in keys:
  35. object = object[key]
  36. except (TypeError, IndexError, KeyError):
  37. return default
  38. else:
  39. if not types or isinstance(object, types):
  40. return object
  41. else:
  42. return default
  43. def multi_deep_get(object, *key_sequences, default=None, types=()):
  44. '''Like deep_get, but can try different key sequences in case one fails.
  45. Return default if all of them fail. key_sequences is a list of lists'''
  46. for key_sequence in key_sequences:
  47. _object = object
  48. try:
  49. for key in key_sequence:
  50. _object = _object[key]
  51. except (TypeError, IndexError, KeyError):
  52. pass
  53. else:
  54. if not types or isinstance(_object, types):
  55. return _object
  56. else:
  57. continue
  58. return default
  59. def _is_empty(value):
  60. '''Determines if value is None or an empty iterable, such as '' and []'''
  61. if value is None:
  62. return True
  63. elif isinstance(value, collections.abc.Iterable) and not value:
  64. return True
  65. return False
  66. def liberal_update(obj, key, value):
  67. '''Updates obj[key] with value as long as value is not None or empty.
  68. Ensures obj[key] will at least get an empty value, however'''
  69. if (not _is_empty(value)) or (key not in obj):
  70. obj[key] = value
  71. def conservative_update(obj, key, value):
  72. '''Only updates obj if it doesn't have key or obj[key] is None/empty'''
  73. if _is_empty(obj.get(key)):
  74. obj[key] = value
  75. def liberal_dict_update(dict1, dict2):
  76. '''Update dict1 with keys from dict2 using liberal_update'''
  77. for key, value in dict2.items():
  78. liberal_update(dict1, key, value)
  79. def conservative_dict_update(dict1, dict2):
  80. '''Update dict1 with keys from dict2 using conservative_update'''
  81. for key, value in dict2.items():
  82. conservative_update(dict1, key, value)
  83. def concat_or_none(*strings):
  84. '''Concatenates strings. Returns None if any of the arguments are None'''
  85. result = ''
  86. for string in strings:
  87. if string is None:
  88. return None
  89. result += string
  90. return result
  91. def remove_redirect(url):
  92. if url is None:
  93. return None
  94. if re.fullmatch(r'(((https?:)?//)?(www.)?youtube.com)?/redirect\?.*', url) is not None: # YouTube puts these on external links to do tracking
  95. query_string = url[url.find('?')+1: ]
  96. return urllib.parse.parse_qs(query_string)['q'][0]
  97. return url
  98. norm_url_re = re.compile(r'^(?:(?:https?:)?//)?((?:[\w-]+\.)+[\w-]+)?(/.*)$')
  99. def normalize_url(url):
  100. '''Insert https, resolve relative paths for youtube.com, and put www. infront of youtube.com'''
  101. if url is None:
  102. return None
  103. match = norm_url_re.fullmatch(url)
  104. if match is None:
  105. raise Exception(url)
  106. domain = match.group(1) or 'www.youtube.com'
  107. if domain == 'youtube.com':
  108. domain = 'www.youtube.com'
  109. return 'https://' + domain + match.group(2)
  110. def _recover_urls(runs):
  111. for run in runs:
  112. url = deep_get(run, 'navigationEndpoint', 'urlEndpoint', 'url')
  113. text = run.get('text', '')
  114. # second condition is necessary because YouTube makes other things into urls, such as hashtags, which we want to keep as text
  115. if url is not None and (text.startswith('http://') or text.startswith('https://')):
  116. url = remove_redirect(url)
  117. run['url'] = url
  118. run['text'] = url # YouTube truncates the url text, use actual url instead
  119. def extract_str(node, default=None, recover_urls=False):
  120. '''default is the value returned if the extraction fails. If recover_urls is true, will attempt to fix YouTube's truncation of url text (most prominently seen in descriptions)'''
  121. if isinstance(node, str):
  122. return node
  123. try:
  124. return node['simpleText']
  125. except (KeyError, TypeError):
  126. pass
  127. if isinstance(node, dict) and 'runs' in node:
  128. if recover_urls:
  129. _recover_urls(node['runs'])
  130. return ''.join(text_run.get('text', '') for text_run in node['runs'])
  131. return default
  132. def extract_formatted_text(node):
  133. if not node:
  134. return []
  135. if 'runs' in node:
  136. _recover_urls(node['runs'])
  137. return node['runs']
  138. elif 'simpleText' in node:
  139. return [{'text': node['simpleText']}]
  140. return []
  141. def extract_int(string, default=None, whole_word=True):
  142. if isinstance(string, int):
  143. return string
  144. if not isinstance(string, str):
  145. string = extract_str(string)
  146. if not string:
  147. return default
  148. if whole_word:
  149. match = re.search(r'\b(\d+)\b', string.replace(',', ''))
  150. else:
  151. match = re.search(r'(\d+)', string.replace(',', ''))
  152. if match is None:
  153. return default
  154. try:
  155. return int(match.group(1))
  156. except ValueError:
  157. return default
  158. def extract_approx_int(string):
  159. '''e.g. "15.1M" from "15.1M subscribers" or '4,353' from 4353'''
  160. if not isinstance(string, str):
  161. string = extract_str(string)
  162. if not string:
  163. return None
  164. match = re.search(r'\b(\d+(?:\.\d+)?[KMBTkmbt]?)\b', string.replace(',', ''))
  165. if match is None:
  166. return None
  167. result = match.group(1)
  168. if re.fullmatch(r'\d+', result):
  169. result = '{:,}'.format(int(result))
  170. return result
  171. MONTH_ABBREVIATIONS = {'jan':'1', 'feb':'2', 'mar':'3', 'apr':'4', 'may':'5', 'jun':'6', 'jul':'7', 'aug':'8', 'sep':'9', 'oct':'10', 'nov':'11', 'dec':'12'}
  172. def extract_date(date_text):
  173. '''Input: "Mar 9, 2019". Output: "2019-3-9"'''
  174. if not isinstance(date_text, str):
  175. date_text = extract_str(date_text)
  176. if date_text is None:
  177. return None
  178. date_text = date_text.replace(',', '').lower()
  179. parts = date_text.split()
  180. if len(parts) >= 3:
  181. month, day, year = parts[-3:]
  182. month = MONTH_ABBREVIATIONS.get(month[0:3]) # slicing in case they start writing out the full month name
  183. if month and (re.fullmatch(r'\d\d?', day) is not None) and (re.fullmatch(r'\d{4}', year) is not None):
  184. return year + '-' + month + '-' + day
  185. return None
  186. def check_missing_keys(object, *key_sequences):
  187. for key_sequence in key_sequences:
  188. _object = object
  189. try:
  190. for key in key_sequence:
  191. _object = _object[key]
  192. except (KeyError, IndexError, TypeError):
  193. return 'Could not find ' + key
  194. return None
  195. def extract_item_info(item, additional_info={}):
  196. if not item:
  197. return {'error': 'No item given'}
  198. type = get(list(item.keys()), 0)
  199. if not type:
  200. return {'error': 'Could not find type'}
  201. item = item[type]
  202. info = {'error': None}
  203. if type in ('itemSectionRenderer', 'compactAutoplayRenderer'):
  204. return extract_item_info(deep_get(item, 'contents', 0), additional_info)
  205. if type in ('movieRenderer', 'clarificationRenderer'):
  206. info['type'] = 'unsupported'
  207. return info
  208. # type looks like e.g. 'compactVideoRenderer' or 'gridVideoRenderer'
  209. # camelCase split, https://stackoverflow.com/a/37697078
  210. type_parts = [s.lower() for s in re.sub(r'([A-Z][a-z]+)', r' \1', type).split()]
  211. if len(type_parts) < 2:
  212. info['type'] = 'unsupported'
  213. return
  214. primary_type = type_parts[-2]
  215. if primary_type == 'video':
  216. info['type'] = 'video'
  217. elif type_parts[0] == 'reel': # shorts
  218. info['type'] = 'video'
  219. primary_type = 'video'
  220. elif primary_type in ('playlist', 'radio', 'show'):
  221. info['type'] = 'playlist'
  222. info['playlist_type'] = primary_type
  223. elif primary_type == 'channel':
  224. info['type'] = 'channel'
  225. elif type == 'videoWithContextRenderer': # stupid exception
  226. info['type'] = 'video'
  227. primary_type = 'video'
  228. else:
  229. info['type'] = 'unsupported'
  230. # videoWithContextRenderer changes it to 'headline' just to be annoying
  231. info['title'] = extract_str(multi_get(item, 'title', 'headline'))
  232. if primary_type != 'channel':
  233. info['author'] = extract_str(multi_get(item, 'longBylineText', 'shortBylineText', 'ownerText'))
  234. info['author_id'] = extract_str(multi_deep_get(item,
  235. ['longBylineText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'],
  236. ['shortBylineText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'],
  237. ['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId']
  238. ))
  239. info['author_url'] = ('https://www.youtube.com/channel/' + info['author_id']) if info['author_id'] else None
  240. info['description'] = extract_formatted_text(multi_deep_get(
  241. item,
  242. ['descriptionText'], ['descriptionSnippet'],
  243. ['detailedMetadataSnippets', 0, 'snippetText'],
  244. ))
  245. info['thumbnail'] = normalize_url(multi_deep_get(item,
  246. ['thumbnail', 'thumbnails', 0, 'url'], # videos
  247. ['thumbnails', 0, 'thumbnails', 0, 'url'], # playlists
  248. ['thumbnailRenderer', 'showCustomThumbnailRenderer', 'thumbnail', 'thumbnails', 0, 'url'], # shows
  249. ))
  250. info['badges'] = []
  251. for badge_node in multi_get(item, 'badges', 'ownerBadges', default=()):
  252. badge = deep_get(badge_node, 'metadataBadgeRenderer', 'label')
  253. if badge:
  254. info['badges'].append(badge)
  255. if primary_type in ('video', 'playlist'):
  256. info['time_published'] = None
  257. timestamp = re.search(r'(\d+ \w+ ago)',
  258. extract_str(item.get('publishedTimeText'), default=''))
  259. if timestamp:
  260. info['time_published'] = timestamp.group(1)
  261. if primary_type == 'video':
  262. info['id'] = multi_deep_get(item,
  263. ['videoId'],
  264. ['navigationEndpoint', 'watchEndpoint', 'videoId'],
  265. ['navigationEndpoint', 'reelWatchEndpoint', 'videoId'] # shorts
  266. )
  267. info['view_count'] = extract_int(item.get('viewCountText'))
  268. # dig into accessibility data to get view_count for videos marked as recommended, and to get time_published
  269. accessibility_label = multi_deep_get(item,
  270. ['title', 'accessibility', 'accessibilityData', 'label'],
  271. ['headline', 'accessibility', 'accessibilityData', 'label'],
  272. default='')
  273. timestamp = re.search(r'(\d+ \w+ ago)', accessibility_label)
  274. if timestamp:
  275. conservative_update(info, 'time_published', timestamp.group(1))
  276. view_count = re.search(r'(\d+) views', accessibility_label.replace(',', ''))
  277. if view_count:
  278. conservative_update(info, 'view_count', int(view_count.group(1)))
  279. if info['view_count']:
  280. info['approx_view_count'] = '{:,}'.format(info['view_count'])
  281. else:
  282. info['approx_view_count'] = extract_approx_int(multi_get(item,
  283. 'shortViewCountText',
  284. 'viewCountText' # shorts
  285. ))
  286. # handle case where it is "No views"
  287. if not info['approx_view_count']:
  288. if ('No views' in item.get('shortViewCountText', '')
  289. or 'no views' in accessibility_label.lower()
  290. or 'No views' in extract_str(item.get('viewCountText', '')) # shorts
  291. ):
  292. info['view_count'] = 0
  293. info['approx_view_count'] = '0'
  294. info['duration'] = extract_str(item.get('lengthText'))
  295. # dig into accessibility data to get duration for shorts
  296. accessibility_label = deep_get(item,
  297. 'accessibility', 'accessibilityData', 'label',
  298. default='')
  299. duration = re.search(r'(\d+) (second|seconds|minute) - play video$',
  300. accessibility_label)
  301. if duration:
  302. if duration.group(2) == 'minute':
  303. conservative_update(info, 'duration', '1:00')
  304. else:
  305. conservative_update(info,
  306. 'duration', '0:' + duration.group(1).zfill(2))
  307. # if it's an item in a playlist, get its index
  308. if 'index' in item: # url has wrong index on playlist page
  309. info['index'] = extract_int(item.get('index'))
  310. elif 'indexText' in item:
  311. # Current item in playlist has ▶ instead of the actual index, must
  312. # dig into url
  313. match = re.search(r'index=(\d+)', deep_get(item,
  314. 'navigationEndpoint', 'commandMetadata', 'webCommandMetadata',
  315. 'url', default=''))
  316. if match is None: # worth a try then
  317. info['index'] = extract_int(item.get('indexText'))
  318. else:
  319. info['index'] = int(match.group(1))
  320. else:
  321. info['index'] = None
  322. elif primary_type in ('playlist', 'radio'):
  323. info['id'] = item.get('playlistId')
  324. info['video_count'] = extract_int(item.get('videoCount'))
  325. info['first_video_id'] = deep_get(item, 'navigationEndpoint',
  326. 'watchEndpoint', 'videoId')
  327. elif primary_type == 'channel':
  328. info['id'] = item.get('channelId')
  329. info['approx_subscriber_count'] = extract_approx_int(item.get('subscriberCountText'))
  330. elif primary_type == 'show':
  331. info['id'] = deep_get(item, 'navigationEndpoint', 'watchEndpoint', 'playlistId')
  332. info['first_video_id'] = deep_get(item, 'navigationEndpoint',
  333. 'watchEndpoint', 'videoId')
  334. if primary_type in ('playlist', 'channel'):
  335. conservative_update(info, 'video_count', extract_int(item.get('videoCountText')))
  336. for overlay in item.get('thumbnailOverlays', []):
  337. conservative_update(info, 'duration', extract_str(deep_get(
  338. overlay, 'thumbnailOverlayTimeStatusRenderer', 'text'
  339. )))
  340. # show renderers don't have videoCountText
  341. conservative_update(info, 'video_count', extract_int(deep_get(
  342. overlay, 'thumbnailOverlayBottomPanelRenderer', 'text'
  343. )))
  344. info.update(additional_info)
  345. return info
  346. def extract_response(polymer_json):
  347. '''return response, error'''
  348. # /youtubei/v1/browse endpoint returns response directly
  349. if isinstance(polymer_json, dict) and 'responseContext' in polymer_json:
  350. # this is the response
  351. return polymer_json, None
  352. response = multi_deep_get(polymer_json, [1, 'response'], ['response'])
  353. if response is None:
  354. return None, 'Failed to extract response'
  355. else:
  356. return response, None
# Renderer keys treated as leaf "items" when walking a response tree:
# videos, playlists, radios, shows and channels in their various
# compact/grid/list presentations, plus search-notice renderers
# (didYouMean / showingResultsFor) and movieRenderer.
_item_types = {
    'movieRenderer',
    'didYouMeanRenderer',
    'showingResultsForRenderer',

    'videoRenderer',
    'compactVideoRenderer',
    'compactAutoplayRenderer',
    'videoWithContextRenderer',
    'gridVideoRenderer',
    'playlistVideoRenderer',

    'reelItemRenderer',

    'playlistRenderer',
    'compactPlaylistRenderer',
    'gridPlaylistRenderer',

    'radioRenderer',
    'compactRadioRenderer',
    'gridRadioRenderer',

    'showRenderer',
    'compactShowRenderer',
    'gridShowRenderer',

    'channelRenderer',
    'compactChannelRenderer',
    'gridChannelRenderer',
}
  381. def _traverse_browse_renderer(renderer):
  382. for tab in get(renderer, 'tabs', ()):
  383. tab_renderer = multi_get(tab, 'tabRenderer', 'expandableTabRenderer')
  384. if tab_renderer is None:
  385. continue
  386. if tab_renderer.get('selected', False):
  387. return get(tab_renderer, 'content', {})
  388. print('Could not find tab with content')
  389. return {}
  390. def _traverse_standard_list(renderer):
  391. renderer_list = multi_get(renderer, 'contents', 'items', default=())
  392. continuation = deep_get(renderer, 'continuations', 0, 'nextContinuationData', 'continuation')
  393. return renderer_list, continuation
# these renderers contain one inside them
# Maps renderer key -> function(value) returning the single nested
# renderer ({} when absent).
nested_renderer_dispatch = {
    'singleColumnBrowseResultsRenderer': _traverse_browse_renderer,
    'twoColumnBrowseResultsRenderer': _traverse_browse_renderer,
    'twoColumnSearchResultsRenderer': lambda r: get(r, 'primaryContents', {}),
    'richItemRenderer': lambda r: get(r, 'content', {}),
    'engagementPanelSectionListRenderer': lambda r: get(r, 'content', {}),
}
# these renderers contain a list of renderers inside them
# Maps renderer key -> function(value) returning
# (list_of_renderers, continuation_token_or_None).
nested_renderer_list_dispatch = {
    'sectionListRenderer': _traverse_standard_list,
    'itemSectionRenderer': _traverse_standard_list,
    'gridRenderer': _traverse_standard_list,
    'richGridRenderer': _traverse_standard_list,
    'playlistVideoListRenderer': _traverse_standard_list,
    'structuredDescriptionContentRenderer': _traverse_standard_list,
    'slimVideoMetadataSectionRenderer': _traverse_standard_list,
    # watch-next results keep their list deeper inside and carry no continuation
    'singleColumnWatchNextResults': lambda r: (deep_get(r, 'results', 'results', 'contents', default=[]), None),
}
  413. def get_nested_renderer_list_function(key):
  414. if key in nested_renderer_list_dispatch:
  415. return nested_renderer_list_dispatch[key]
  416. elif key.endswith('Continuation'):
  417. return _traverse_standard_list
  418. return None
  419. def extract_items_from_renderer(renderer, item_types=_item_types):
  420. ctoken = None
  421. items = []
  422. iter_stack = collections.deque()
  423. current_iter = iter(())
  424. while True:
  425. # mode 1: get a new renderer by iterating.
  426. # goes down the stack for an iterator if one has been exhausted
  427. if not renderer:
  428. try:
  429. renderer = current_iter.__next__()
  430. except StopIteration:
  431. try:
  432. current_iter = iter_stack.pop()
  433. except IndexError:
  434. return items, ctoken
  435. # Get new renderer or check that the one we got is good before
  436. # proceeding to mode 2
  437. continue
  438. # mode 2: dig into the current renderer
  439. key, value = list(renderer.items())[0]
  440. # the renderer is an item
  441. if key in item_types:
  442. items.append(renderer)
  443. # ctoken sometimes placed in these renderers, e.g. channel playlists
  444. elif key == 'continuationItemRenderer':
  445. cont = deep_get(
  446. value, 'continuationEndpoint', 'continuationCommand', 'token'
  447. )
  448. if cont:
  449. ctoken = cont
  450. # has a list in it, add it to the iter stack
  451. elif get_nested_renderer_list_function(key):
  452. renderer_list, cont = get_nested_renderer_list_function(key)(value)
  453. if renderer_list:
  454. iter_stack.append(current_iter)
  455. current_iter = iter(renderer_list)
  456. if cont:
  457. ctoken = cont
  458. # new renderer nested inside this one
  459. elif key in nested_renderer_dispatch:
  460. renderer = nested_renderer_dispatch[key](value)
  461. continue # don't reset renderer to None
  462. renderer = None
  463. def extract_items_from_renderer_list(renderers, item_types=_item_types):
  464. '''Same as extract_items_from_renderer, but provide a list of renderers'''
  465. items = []
  466. ctoken = None
  467. for renderer in renderers:
  468. new_items, new_ctoken = extract_items_from_renderer(
  469. renderer,
  470. item_types=item_types)
  471. items += new_items
  472. # prioritize ctoken associated with items
  473. if (not ctoken) or (new_ctoken and new_items):
  474. ctoken = new_ctoken
  475. return items, ctoken
  476. def extract_items(response, item_types=_item_types,
  477. search_engagement_panels=False):
  478. '''return items, ctoken'''
  479. items = []
  480. ctoken = None
  481. if 'continuationContents' in response:
  482. # sometimes there's another, empty, junk [something]Continuation key
  483. # find real one
  484. for key, renderer_cont in get(response,
  485. 'continuationContents', {}).items():
  486. # e.g. commentSectionContinuation, playlistVideoListContinuation
  487. if key.endswith('Continuation'):
  488. items, ctoken = extract_items_from_renderer(
  489. {key: renderer_cont},
  490. item_types=item_types)
  491. if items:
  492. break
  493. if ('onResponseReceivedEndpoints' in response
  494. or 'onResponseReceivedActions' in response):
  495. for endpoint in multi_get(response,
  496. 'onResponseReceivedEndpoints',
  497. 'onResponseReceivedActions',
  498. []):
  499. new_items, new_ctoken = extract_items_from_renderer_list(
  500. multi_deep_get(
  501. endpoint,
  502. ['reloadContinuationItemsCommand', 'continuationItems'],
  503. ['appendContinuationItemsAction', 'continuationItems'],
  504. default=[]
  505. ),
  506. item_types=item_types,
  507. )
  508. items += new_items
  509. if (not ctoken) or (new_ctoken and new_items):
  510. ctoken = new_ctoken
  511. if 'contents' in response:
  512. renderer = get(response, 'contents', {})
  513. new_items, new_ctoken = extract_items_from_renderer(
  514. renderer,
  515. item_types=item_types)
  516. items += new_items
  517. if (not ctoken) or (new_ctoken and new_items):
  518. ctoken = new_ctoken
  519. if search_engagement_panels and 'engagementPanels' in response:
  520. new_items, new_ctoken = extract_items_from_renderer_list(
  521. response['engagementPanels'], item_types=item_types
  522. )
  523. items += new_items
  524. if (not ctoken) or (new_ctoken and new_items):
  525. ctoken = new_ctoken
  526. return items, ctoken