Skip to content

Commit

Permalink
Better yt channel-links command
Browse files Browse the repository at this point in the history
  • Loading branch information
Yomguithereal committed Dec 20, 2023
1 parent 01e19bd commit 7741328
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 13 deletions.
6 changes: 3 additions & 3 deletions minet/cli/youtube/channel_links.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@


@with_enricher_and_loading_bar(
headers=["url"],
headers=["title", "url"],
title="Retrieving channel links",
unit="channels",
sub_unit="links",
Expand All @@ -19,5 +19,5 @@ def action(cli_args, enricher, loading_bar):
if links is None:
continue

for link in links:
enricher.writerow(row, [link])
for title, link in links:
enricher.writerow(row, [title, link])
43 changes: 34 additions & 9 deletions minet/youtube/scraper.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from typing import List, Set, Tuple, Optional
from typing import List, Tuple, Optional, Iterator

import re
import json
from html import unescape
from urllib.parse import unquote
from ural import infer_redirection
from ebbe import getpath

from minet.scrape import WonderfulSoup
from minet.web import (
Expand All @@ -22,26 +23,42 @@

CAPTION_TRACKS_RE = re.compile(r'"captionTracks":(\[.*?\])')
INITIAL_DATA_RE = re.compile(
rb"(?:const|let|var)\s+ytInitialData\s*=\s*({.+});</script>"
rb"(?:const|let|var)\s+ytInitialData\s*=\s*({.+})\s*;</script>"
)


def gather_external_links(data) -> Iterator[Tuple[str, str]]:
    """Recursively yield ``(title, url)`` pairs for external channel links.

    Walks an arbitrarily nested structure of dicts and lists (typically a
    decoded JSON payload) and emits one pair for each
    ``channelExternalLinkViewModel`` entry it finds.

    Args:
        data: Any decoded JSON value. Values that are neither dict nor list
            produce nothing.

    Yields:
        Tuples of ``(title, url)``. The title is read from
        ``title.content`` and the url from the nested
        ``link.commandRuns[0].onTap.innertubeCommand.urlEndpoint.url`` path,
        passed through ``infer_redirection``.
        NOTE(review): the title is whatever ``getpath`` returns, so it is
        presumably ``None`` when the entry has no ``title.content``.
    """
    if isinstance(data, dict):
        for k, v in data.items():
            if k == "channelExternalLinkViewModel":
                if not isinstance(v, dict):
                    return

                url = getpath(
                    v,
                    (
                        "link",
                        "commandRuns",
                        0,
                        "onTap",
                        "innertubeCommand",
                        "urlEndpoint",
                        "url",
                    ),
                )

                # The nested path is not guaranteed to exist; only hand an
                # actual string to `infer_redirection` so one malformed entry
                # cannot abort the whole traversal.
                if isinstance(url, str):
                    yield (
                        getpath(v, ("title", "content")),
                        infer_redirection(url),
                    )

                # Sibling keys of a link view model are not explored further.
                return

            yield from gather_external_links(v)

    elif isinstance(data, list):
        for v in data:
            yield from gather_external_links(v)
Expand Down Expand Up @@ -152,7 +169,12 @@ def get_channel_id(self, channel_url: str) -> Optional[str]:

return None

def get_channel_links(self, channel_url: str) -> Optional[Set[str]]:
def get_channel_links(self, channel_url: str) -> Optional[List[Tuple[str, str]]]:
# NOTE: for some weird reason, the /about page has more info in
# the ytInitialData global variable even if visual content is
# strictly identical.
channel_url = channel_url.split("?", 1)[0].split("#")[0].rstrip("/") + "/about"

response = self.request(channel_url, spoof_ua=True)

match = INITIAL_DATA_RE.search(response.body)
Expand All @@ -165,4 +187,7 @@ def get_channel_links(self, channel_url: str) -> Optional[Set[str]]:
except json.JSONDecodeError:
return None

return set(gather_url_endpoints(data))
# with open("./dump.json", "w") as f:
# json.dump(data, f, ensure_ascii=False, indent=2)

return list(gather_external_links(data))
4 changes: 3 additions & 1 deletion test/scraper_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1085,4 +1085,6 @@ def basic_optional_scalar() -> Optional[str]:
assert infer_fieldnames_from_function_return_type(basic_float) == ["value"]
assert infer_fieldnames_from_function_return_type(basic_bool) == ["value"]
assert infer_fieldnames_from_function_return_type(basic_void) == ["value"]
assert infer_fieldnames_from_function_return_type(basic_optional_scalar) == ["value"]
assert infer_fieldnames_from_function_return_type(basic_optional_scalar) == [
"value"
]

0 comments on commit 7741328

Please sign in to comment.