123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605 |
- import base64
- from youtube import (util, yt_data_extract, local_playlist, subscriptions,
- playlist)
- from youtube import yt_app
- import settings
- import urllib
- import json
- from string import Template
- import youtube.proto as proto
- import html
- import math
- import gevent
- import re
- import cachetools.func
- import traceback
- import flask
- from flask import request
- headers_desktop = (
- ('Accept', '*/*'),
- ('Accept-Language', 'en-US,en;q=0.5'),
- ('X-YouTube-Client-Name', '1'),
- ('X-YouTube-Client-Version', '2.20180830'),
- ) + util.desktop_ua
- headers_mobile = (
- ('Accept', '*/*'),
- ('Accept-Language', 'en-US,en;q=0.5'),
- ('X-YouTube-Client-Name', '2'),
- ('X-YouTube-Client-Version', '2.20180830'),
- ) + util.mobile_ua
- real_cookie = (('Cookie', 'VISITOR_INFO1_LIVE=8XihrAcN1l4'),)
- generic_cookie = (('Cookie', 'VISITOR_INFO1_LIVE=ST1Ti53r4fU'),)
- # added an extra nesting under the 2nd base64 compared to v4
- # added tab support
- # changed offset field to uint id 1
- def channel_ctoken_v5(channel_id, page, sort, tab, view=1):
- new_sort = (2 if int(sort) == 1 else 1)
- offset = 30*(int(page) - 1)
- if tab == 'videos':
- tab = 15
- elif tab == 'shorts':
- tab = 10
- elif tab == 'streams':
- tab = 14
- pointless_nest = proto.string(80226972,
- proto.string(2, channel_id)
- + proto.string(3,
- proto.percent_b64encode(
- proto.string(110,
- proto.string(3,
- proto.string(tab,
- proto.string(1,
- proto.string(1,
- proto.unpadded_b64encode(
- proto.string(1,
- proto.string(1,
- proto.unpadded_b64encode(
- proto.string(2,
- b"ST:"
- + proto.unpadded_b64encode(
- proto.uint(1, offset)
- )
- )
- )
- )
- )
- )
- )
- # targetId, just needs to be present but
- # doesn't need to be correct
- + proto.string(2, "63faaff0-0000-23fe-80f0-582429d11c38")
- )
- # 1 - newest, 2 - popular
- + proto.uint(3, new_sort)
- )
- )
- )
- )
- )
- )
- return base64.urlsafe_b64encode(pointless_nest).decode('ascii')
- def channel_about_ctoken(channel_id):
- return proto.make_protobuf(
- ('base64p',
- [
- [2, 80226972,
- [
- [2, 2, channel_id],
- [2, 3,
- ('base64p',
- [
- [2, 110,
- [
- [2, 3,
- [
- [2, 19,
- [
- [2, 1, b'66b0e9e9-0000-2820-9589-582429a83980'],
- ]
- ],
- ]
- ],
- ]
- ],
- ]
- )
- ],
- ]
- ],
- ]
- )
- )
- # https://github.com/user234683/youtube-local/issues/151
- def channel_ctoken_v4(channel_id, page, sort, tab, view=1):
- new_sort = (2 if int(sort) == 1 else 1)
- offset = str(30*(int(page) - 1))
- pointless_nest = proto.string(80226972,
- proto.string(2, channel_id)
- + proto.string(3,
- proto.percent_b64encode(
- proto.string(110,
- proto.string(3,
- proto.string(15,
- proto.string(1,
- proto.string(1,
- proto.unpadded_b64encode(
- proto.string(1,
- proto.unpadded_b64encode(
- proto.string(2,
- b"ST:"
- + proto.unpadded_b64encode(
- proto.string(2, offset)
- )
- )
- )
- )
- )
- )
- # targetId, just needs to be present but
- # doesn't need to be correct
- + proto.string(2, "63faaff0-0000-23fe-80f0-582429d11c38")
- )
- # 1 - newest, 2 - popular
- + proto.uint(3, new_sort)
- )
- )
- )
- )
- )
- )
- return base64.urlsafe_b64encode(pointless_nest).decode('ascii')
- # SORT:
- # videos:
- # Popular - 1
- # Oldest - 2
- # Newest - 3
- # playlists:
- # Oldest - 2
- # Newest - 3
- # Last video added - 4
- # view:
- # grid: 0 or 1
- # list: 2
- def channel_ctoken_v3(channel_id, page, sort, tab, view=1):
- # page > 1 doesn't work when sorting by oldest
- offset = 30*(int(page) - 1)
- page_token = proto.string(61, proto.unpadded_b64encode(
- proto.string(1, proto.unpadded_b64encode(proto.uint(1,offset)))
- ))
- tab = proto.string(2, tab)
- sort = proto.uint(3, int(sort))
- shelf_view = proto.uint(4, 0)
- view = proto.uint(6, int(view))
- continuation_info = proto.string(3,
- proto.percent_b64encode(tab + sort + shelf_view + view + page_token)
- )
- channel_id = proto.string(2, channel_id)
- pointless_nest = proto.string(80226972, channel_id + continuation_info)
- return base64.urlsafe_b64encode(pointless_nest).decode('ascii')
- def channel_ctoken_v2(channel_id, page, sort, tab, view=1):
- # see https://github.com/iv-org/invidious/issues/1319#issuecomment-671732646
- # page > 1 doesn't work when sorting by oldest
- offset = 30*(int(page) - 1)
- schema_number = {
- 3: 6307666885028338688,
- 2: 17254859483345278706,
- 1: 16570086088270825023,
- }[int(sort)]
- page_token = proto.string(61, proto.unpadded_b64encode(proto.string(1,
- proto.uint(1, schema_number) + proto.string(2,
- proto.string(1, proto.unpadded_b64encode(proto.uint(1,offset)))
- )
- )))
- tab = proto.string(2, tab)
- sort = proto.uint(3, int(sort))
- #page = proto.string(15, str(page))
- shelf_view = proto.uint(4, 0)
- view = proto.uint(6, int(view))
- continuation_info = proto.string(
- 3,
- proto.percent_b64encode(tab + sort + shelf_view + view + page_token)
- )
- channel_id = proto.string(2, channel_id)
- pointless_nest = proto.string(80226972, channel_id + continuation_info)
- return base64.urlsafe_b64encode(pointless_nest).decode('ascii')
- def channel_ctoken_v1(channel_id, page, sort, tab, view=1):
- tab = proto.string(2, tab)
- sort = proto.uint(3, int(sort))
- page = proto.string(15, str(page))
- # example with shelves in videos tab: https://www.youtube.com/channel/UCNL1ZadSjHpjm4q9j2sVtOA/videos
- shelf_view = proto.uint(4, 0)
- view = proto.uint(6, int(view))
- continuation_info = proto.string(3, proto.percent_b64encode(tab + view + sort + shelf_view + page + proto.uint(23, 0)) )
- channel_id = proto.string(2, channel_id)
- pointless_nest = proto.string(80226972, channel_id + continuation_info)
- return base64.urlsafe_b64encode(pointless_nest).decode('ascii')
- def get_channel_tab(channel_id, page="1", sort=3, tab='videos', view=1,
- ctoken=None, print_status=True):
- message = 'Got channel tab' if print_status else None
- if not ctoken:
- if tab in ('videos', 'shorts', 'streams'):
- ctoken = channel_ctoken_v5(channel_id, page, sort, tab, view)
- else:
- ctoken = channel_ctoken_v3(channel_id, page, sort, tab, view)
- ctoken = ctoken.replace('=', '%3D')
- # Not sure what the purpose of the key is or whether it will change
- # For now it seems to be constant for the API endpoint, not dependent
- # on the browsing session or channel
- key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
- url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key
- data = {
- 'context': {
- 'client': {
- 'hl': 'en',
- 'gl': 'US',
- 'clientName': 'WEB',
- 'clientVersion': '2.20240327.00.00',
- },
- },
- 'continuation': ctoken,
- }
- content_type_header = (('Content-Type', 'application/json'),)
- content = util.fetch_url(
- url, headers_desktop + content_type_header,
- data=json.dumps(data), debug_name='channel_tab', report_text=message)
- return content
- # cache entries expire after 30 minutes
- number_of_videos_cache = cachetools.TTLCache(128, 30*60)
- @cachetools.cached(number_of_videos_cache)
- def get_number_of_videos_channel(channel_id):
- if channel_id is None:
- return 1000
- # Uploads playlist
- playlist_id = 'UU' + channel_id[2:]
- url = 'https://m.youtube.com/playlist?list=' + playlist_id + '&pbj=1'
- try:
- response = util.fetch_url(url, headers_mobile,
- debug_name='number_of_videos', report_text='Got number of videos')
- except urllib.error.HTTPError as e:
- traceback.print_exc()
- print("Couldn't retrieve number of videos")
- return 1000
- response = response.decode('utf-8')
- # match = re.search(r'"numVideosText":\s*{\s*"runs":\s*\[{"text":\s*"([\d,]*) videos"', response)
- match = re.search(r'"numVideosText".*?([,\d]+)', response)
- if match:
- return int(match.group(1).replace(',',''))
- else:
- return 0
- def set_cached_number_of_videos(channel_id, num_videos):
- @cachetools.cached(number_of_videos_cache)
- def dummy_func_using_same_cache(channel_id):
- return num_videos
- dummy_func_using_same_cache(channel_id)
- channel_id_re = re.compile(r'videos\.xml\?channel_id=([a-zA-Z0-9_-]{24})"')
- @cachetools.func.lru_cache(maxsize=128)
- def get_channel_id(base_url):
- # method that gives the smallest possible response at ~4 kb
- # needs to be as fast as possible
- base_url = base_url.replace('https://www', 'https://m') # avoid redirect
- response = util.fetch_url(base_url + '/about?pbj=1', headers_mobile,
- debug_name='get_channel_id', report_text='Got channel id').decode('utf-8')
- match = channel_id_re.search(response)
- if match:
- return match.group(1)
- return None
- metadata_cache = cachetools.LRUCache(128)
- @cachetools.cached(metadata_cache)
- def get_metadata(channel_id):
- base_url = 'https://www.youtube.com/channel/' + channel_id
- polymer_json = util.fetch_url(base_url + '/about?pbj=1',
- headers_desktop,
- debug_name='gen_channel_about',
- report_text='Retrieved channel metadata')
- info = yt_data_extract.extract_channel_info(json.loads(polymer_json),
- 'about',
- continuation=False)
- return extract_metadata_for_caching(info)
- def set_cached_metadata(channel_id, metadata):
- @cachetools.cached(metadata_cache)
- def dummy_func_using_same_cache(channel_id):
- return metadata
- dummy_func_using_same_cache(channel_id)
- def extract_metadata_for_caching(channel_info):
- metadata = {}
- for key in ('approx_subscriber_count', 'short_description', 'channel_name',
- 'avatar'):
- metadata[key] = channel_info[key]
- return metadata
- def get_number_of_videos_general(base_url):
- return get_number_of_videos_channel(get_channel_id(base_url))
- def get_channel_search_json(channel_id, query, page):
- offset = proto.unpadded_b64encode(proto.uint(3, (page-1)*30))
- params = proto.string(2, 'search') + proto.string(15, offset)
- params = proto.percent_b64encode(params)
- ctoken = proto.string(2, channel_id) + proto.string(3, params) + proto.string(11, query)
- ctoken = base64.urlsafe_b64encode(proto.nested(80226972, ctoken)).decode('ascii')
- key = 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
- url = 'https://www.youtube.com/youtubei/v1/browse?key=' + key
- data = {
- 'context': {
- 'client': {
- 'hl': 'en',
- 'gl': 'US',
- 'clientName': 'WEB',
- 'clientVersion': '2.20240327.00.00',
- },
- },
- 'continuation': ctoken,
- }
- content_type_header = (('Content-Type', 'application/json'),)
- polymer_json = util.fetch_url(
- url, headers_desktop + content_type_header,
- data=json.dumps(data), debug_name='channel_search')
- return polymer_json
- def post_process_channel_info(info):
- info['avatar'] = util.prefix_url(info['avatar'])
- info['channel_url'] = util.prefix_url(info['channel_url'])
- for item in info['items']:
- item['thumbnail'] = "https://i.ytimg.com/vi/{}/hqdefault.jpg".format(item['id'])
- util.prefix_urls(item)
- util.add_extra_html_info(item)
- if info['current_tab'] == 'about':
- for i, (text, url) in enumerate(info['links']):
- if isinstance(url, str) and util.YOUTUBE_URL_RE.fullmatch(url):
- info['links'][i] = (text, util.prefix_url(url))
- def get_channel_first_page(base_url=None, tab='videos', channel_id=None):
- if channel_id:
- base_url = 'https://www.youtube.com/channel/' + channel_id
- return util.fetch_url(base_url + '/' + tab + '?pbj=1&view=0',
- headers_desktop, debug_name='gen_channel_' + tab)
- playlist_sort_codes = {'2': "da", '3': "dd", '4': "lad"}
- # youtube.com/[channel_id]/[tab]
- # youtube.com/user/[username]/[tab]
- # youtube.com/c/[custom]/[tab]
- # youtube.com/[custom]/[tab]
- def get_channel_page_general_url(base_url, tab, request, channel_id=None):
- page_number = int(request.args.get('page', 1))
- # sort 1: views
- # sort 2: oldest
- # sort 3: newest
- # sort 4: newest - no shorts (Just a kludge on our end, not internal to yt)
- default_sort = '3' if settings.include_shorts_in_channel else '4'
- sort = request.args.get('sort', default_sort)
- view = request.args.get('view', '1')
- query = request.args.get('query', '')
- ctoken = request.args.get('ctoken', '')
- include_shorts = (sort != '4')
- default_params = (page_number == 1 and sort in ('3', '4') and view == '1')
- continuation = bool(ctoken) # whether or not we're using a continuation
- page_size = 30
- try_channel_api = True
- polymer_json = None
- # Use the special UU playlist which contains all the channel's uploads
- if tab == 'videos' and sort in ('3', '4'):
- if not channel_id:
- channel_id = get_channel_id(base_url)
- if page_number == 1 and include_shorts:
- tasks = (
- gevent.spawn(playlist.playlist_first_page,
- 'UU' + channel_id[2:],
- report_text='Retrieved channel videos'),
- gevent.spawn(get_metadata, channel_id),
- )
- gevent.joinall(tasks)
- util.check_gevent_exceptions(*tasks)
- # Ignore the metadata for now, it is cached and will be
- # recalled later
- pl_json = tasks[0].value
- pl_info = yt_data_extract.extract_playlist_info(pl_json)
- number_of_videos = pl_info['metadata']['video_count']
- if number_of_videos is None:
- number_of_videos = 1000
- else:
- set_cached_number_of_videos(channel_id, number_of_videos)
- else:
- tasks = (
- gevent.spawn(playlist.get_videos, 'UU' + channel_id[2:],
- page_number, include_shorts=include_shorts),
- gevent.spawn(get_metadata, channel_id),
- gevent.spawn(get_number_of_videos_channel, channel_id),
- )
- gevent.joinall(tasks)
- util.check_gevent_exceptions(*tasks)
- pl_json = tasks[0].value
- pl_info = yt_data_extract.extract_playlist_info(pl_json)
- number_of_videos = tasks[2].value
- info = pl_info
- info['channel_id'] = channel_id
- info['current_tab'] = 'videos'
- if info['items']: # Success
- page_size = 100
- try_channel_api = False
- else: # Try the first-page method next
- try_channel_api = True
- # Use the regular channel API
- if tab in ('shorts', 'streams') or (tab=='videos' and try_channel_api):
- if channel_id:
- num_videos_call = (get_number_of_videos_channel, channel_id)
- else:
- num_videos_call = (get_number_of_videos_general, base_url)
- # Use ctoken method, which YouTube changes all the time
- if channel_id and not default_params:
- if sort == 4:
- _sort = 3
- else:
- _sort = sort
- page_call = (get_channel_tab, channel_id, page_number, _sort,
- tab, view, ctoken)
- # Use the first-page method, which won't break
- else:
- page_call = (get_channel_first_page, base_url, tab)
- tasks = (
- gevent.spawn(*num_videos_call),
- gevent.spawn(*page_call),
- )
- gevent.joinall(tasks)
- util.check_gevent_exceptions(*tasks)
- number_of_videos, polymer_json = tasks[0].value, tasks[1].value
- elif tab == 'about':
- # polymer_json = util.fetch_url(base_url + '/about?pbj=1', headers_desktop, debug_name='gen_channel_about')
- channel_id = get_channel_id(base_url)
- ctoken = channel_about_ctoken(channel_id)
- polymer_json = util.call_youtube_api('web', 'browse', {
- 'continuation': ctoken,
- })
- continuation=True
- elif tab == 'playlists' and page_number == 1:
- polymer_json = util.fetch_url(base_url+ '/playlists?pbj=1&view=1&sort=' + playlist_sort_codes[sort], headers_desktop, debug_name='gen_channel_playlists')
- elif tab == 'playlists':
- polymer_json = get_channel_tab(channel_id, page_number, sort,
- 'playlists', view)
- continuation = True
- elif tab == 'search' and channel_id:
- polymer_json = get_channel_search_json(channel_id, query, page_number)
- elif tab == 'search':
- url = base_url + '/search?pbj=1&query=' + urllib.parse.quote(query, safe='')
- polymer_json = util.fetch_url(url, headers_desktop, debug_name='gen_channel_search')
- elif tab == 'videos':
- pass
- else:
- flask.abort(404, 'Unknown channel tab: ' + tab)
- if polymer_json is not None:
- info = yt_data_extract.extract_channel_info(
- json.loads(polymer_json), tab, continuation=continuation
- )
- if info['error'] is not None:
- return flask.render_template('error.html', error_message=info['error'])
- if channel_id:
- info['channel_url'] = 'https://www.youtube.com/channel/' + channel_id
- info['channel_id'] = channel_id
- else:
- channel_id = info['channel_id']
- # Will have microformat present, cache metadata while we have it
- if channel_id and default_params and tab not in ('videos', 'about'):
- metadata = extract_metadata_for_caching(info)
- set_cached_metadata(channel_id, metadata)
- # Otherwise, populate with our (hopefully cached) metadata
- elif channel_id and info.get('channel_name') is None:
- metadata = get_metadata(channel_id)
- for key, value in metadata.items():
- yt_data_extract.conservative_update(info, key, value)
- # need to add this metadata to the videos/playlists
- additional_info = {
- 'author': info['channel_name'],
- 'author_id': info['channel_id'],
- 'author_url': info['channel_url'],
- }
- for item in info['items']:
- item.update(additional_info)
- if tab in ('videos', 'shorts', 'streams'):
- info['number_of_videos'] = number_of_videos
- info['number_of_pages'] = math.ceil(number_of_videos/page_size)
- info['header_playlist_names'] = local_playlist.get_playlist_names()
- if tab in ('videos', 'shorts', 'streams', 'playlists'):
- info['current_sort'] = sort
- elif tab == 'search':
- info['search_box_value'] = query
- info['header_playlist_names'] = local_playlist.get_playlist_names()
- if tab in ('search', 'playlists'):
- info['page_number'] = page_number
- info['subscribed'] = subscriptions.is_subscribed(info['channel_id'])
- post_process_channel_info(info)
- return flask.render_template('channel.html',
- parameters_dictionary = request.args,
- **info
- )
- @yt_app.route('/channel/<channel_id>/')
- @yt_app.route('/channel/<channel_id>/<tab>')
- def get_channel_page(channel_id, tab='videos'):
- return get_channel_page_general_url('https://www.youtube.com/channel/' + channel_id, tab, request, channel_id)
- @yt_app.route('/user/<username>/')
- @yt_app.route('/user/<username>/<tab>')
- def get_user_page(username, tab='videos'):
- return get_channel_page_general_url('https://www.youtube.com/user/' + username, tab, request)
- @yt_app.route('/c/<custom>/')
- @yt_app.route('/c/<custom>/<tab>')
- def get_custom_c_page(custom, tab='videos'):
- return get_channel_page_general_url('https://www.youtube.com/c/' + custom, tab, request)
- @yt_app.route('/<custom>')
- @yt_app.route('/<custom>/<tab>')
- def get_toplevel_custom_page(custom, tab='videos'):
- return get_channel_page_general_url('https://www.youtube.com/' + custom, tab, request)
|