Introducing TwitterGuestAPIScraper
Yomguithereal committed Jan 17, 2024
1 parent 4779366 commit afc4292
Showing 3 changed files with 234 additions and 78 deletions.
17 changes: 13 additions & 4 deletions ftest/twitter_scraping.py
@@ -1,6 +1,15 @@
-from minet.twitter import TwitterAPIScraper
+from minet.cli.console import console

-scraper = TwitterAPIScraper("firefox")
+# from minet.twitter import TwitterAPIScraper

-for user in scraper.followers_you_know("794083798912827393"):
-    print(user)
+# scraper = TwitterAPIScraper("firefox")

+# for user in scraper.followers_you_know("794083798912827393"):
+#     print(user)

+from minet.twitter import TwitterGuestAPIScraper

+scraper = TwitterGuestAPIScraper()
+tweet = scraper.tweet("1747650538865311986")

+console.print(tweet, highlight=True)
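Note: the guest scraper surfaces the public-API exceptions already imported further down in this commit (see minet.twitter.exceptions in the api_scraper.py diff below). For illustration only, assuming minet with this commit installed, the same call could be guarded like this; this sketch is not part of the commit:

# Illustrative sketch: guarding the call above with the exceptions the
# guest scraper can raise (an HTTP 429 raises TwitterPublicAPIRateLimitError,
# an unparsable response raises TwitterPublicAPIInvalidResponseError).
from minet.twitter import TwitterGuestAPIScraper
from minet.twitter.exceptions import (
    TwitterPublicAPIRateLimitError,
    TwitterPublicAPIInvalidResponseError,
)

scraper = TwitterGuestAPIScraper()

try:
    tweet = scraper.tweet("1747650538865311986")
    print(tweet)
except TwitterPublicAPIRateLimitError:
    # The scraper resets its guest token and cookie on 429; back off and retry later.
    print("rate limited by the guest endpoint")
except TwitterPublicAPIInvalidResponseError:
    print("could not parse the API response")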
2 changes: 1 addition & 1 deletion minet/twitter/__init__.py
@@ -1,2 +1,2 @@
from minet.twitter.api_client import TwitterAPIClient
-from minet.twitter.api_scraper import TwitterAPIScraper
+from minet.twitter.api_scraper import TwitterAPIScraper, TwitterGuestAPIScraper
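With this re-export, both scrapers are importable side by side from the package; a quick illustrative check (not part of the commit):

# Both scrapers are now exposed at the minet.twitter package level.
from minet.twitter import TwitterAPIScraper, TwitterGuestAPIScraper

print(TwitterAPIScraper.__name__, TwitterGuestAPIScraper.__name__)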
293 changes: 220 additions & 73 deletions minet/twitter/api_scraper.py
@@ -30,7 +30,7 @@
    TWITTER_PUBLIC_API_AUTH_HEADER,
)
from minet.twitter.exceptions import (
-    # TwitterPublicAPIGuestTokenError,
+    TwitterPublicAPIGuestTokenError,
    TwitterPublicAPIBadRequest,
    TwitterPublicAPIRateLimitError,
    TwitterPublicAPIInvalidResponseError,
@@ -72,22 +72,21 @@ def is_cookie_valid(cookie: str) -> bool:
)


-# def ensure_guest_token(method):
-#     def wrapped(self: "TwitterAPIScraper", *args, **kwargs):
+def ensure_guest_token(method):
+    def wrapped(self: "TwitterGuestAPIScraper", *args, **kwargs):
+        # NOTE: we refresh the guest token periodically to avoid
+        # losing time wrt failed request after expiration
+        if (
+            self.guest_token is None
+            or self.guest_token_use_count >= MAX_GUEST_TOKEN_USE_COUNT
+        ):
+            self.acquire_guest_token()
+            self.guest_token_use_count = 0

-#         # NOTE: we refresh the guest token periodically to avoid
-#         # losing time wrt failed request after expiration
-#         if (
-#             self.guest_token is None
-#             or self.guest_token_use_count >= MAX_GUEST_TOKEN_USE_COUNT
-#         ):
-#             self.acquire_guest_token()
-#             self.guest_token_use_count = 0
+        self.guest_token_use_count += 1
+        return method(self, *args, **kwargs)

-#         self.guest_token_use_count += 1
-#         return method(self, *args, **kwargs)

-#     return wrapped
+    return wrapped


def create_cookie_expiration():
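The hunk above uncomments ensure_guest_token, which lazily acquires a guest token before the first wrapped call and refreshes it once it has been used MAX_GUEST_TOKEN_USE_COUNT times. A self-contained toy sketch of the same lazy-refresh pattern, with hypothetical names (this is not minet code, only the control flow mirrors the decorator above):

# Toy illustration of the lazy token-refresh pattern used by ensure_guest_token.
MAX_USES = 3


def ensure_token(method):
    def wrapped(self, *args, **kwargs):
        # Acquire a fresh token on first use, or after MAX_USES calls.
        if self.token is None or self.uses >= MAX_USES:
            self.token = self.acquire_token()
            self.uses = 0
        self.uses += 1
        return method(self, *args, **kwargs)

    return wrapped


class FakeClient:
    def __init__(self):
        self.token = None
        self.uses = 0
        self.acquisitions = 0

    def acquire_token(self):
        self.acquisitions += 1
        return "token-%d" % self.acquisitions

    @ensure_token
    def call(self):
        return self.token


client = FakeClient()
tokens = [client.call() for _ in range(7)]
# 7 calls with MAX_USES = 3 -> the token is acquired 3 times in total
assert client.acquisitions == 3
print(tokens)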
@@ -336,14 +335,69 @@ def process_single_tweet(tweet_id, tweet_index, user_index):
    return tweet


def recombobulate_tweet(tweet_root, entry_id=None):
    __typename = tweet_root["__typename"]

    if __typename == "TweetUnavailable":
        return None

    if __typename == "TweetWithVisibilityResults":
        tweet_root = tweet_root["tweet"]

    try:
        tweet_meta = tweet_root["legacy"]
    except KeyError:
        raise TwitterPublicAPIParsingError(entry_id)

    tweet_meta["source"] = tweet_root["source"]

    # NOTE: tombstone not working anymore
    # if tweet_meta is None:
    #     tweet_meta = getpath(
    #         entry, ["content", "item", "content", "tombstone", "tweet"]
    #     )

    # Skipping ads
    if "promotedMetadata" in tweet_meta:
        return None

    tweet_index = None
    user_index = None

    # NOTE: with new API results we need to build this index each time
    tweet_index = {tweet_meta["id_str"]: tweet_meta}
    user_index = {}

    # Tweet user
    user_index[tweet_meta["user_id_str"]] = tweet_root["core"]["user_results"][
        "result"
    ]["legacy"]

    # Quote
    quoted_root = getpath(tweet_root, ["quoted_status_result", "result"])

    if quoted_root is not None:
        if quoted_root["__typename"] == "TweetWithVisibilityResults":
            quoted_root = quoted_root["tweet"]

        quoted_meta = quoted_root["legacy"]
        quoted_meta["source"] = quoted_root["source"]
        tweet_index[quoted_meta["id_str"]] = quoted_meta
        user_index[quoted_meta["user_id_str"]] = quoted_root["core"]["user_results"][
            "result"
        ]["legacy"]

    for user_str_id, user_meta in user_index.items():
        user_meta["id_str"] = user_str_id

    return process_single_tweet(tweet_meta["id_str"], tweet_index, user_index)


def tweets_payload_iter(payload):
    instructions = payload["data"]["search_by_raw_query"]["search_timeline"][
        "timeline"
    ]["instructions"]

-    tweet_index = None
-    user_index = None

    # with open("dump.json", "w") as f:
    #     import json
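recombobulate_tweet, added above, normalizes the GraphQL result object of a single tweet: it skips unavailable tweets, unwraps TweetWithVisibilityResults, drops promoted entries, and rebuilds the tweet/user indexes expected by process_single_tweet. Judging only from the keys the function reads, its input looks roughly like the sketch below; the values are placeholders, real payloads carry many more fields, and process_single_tweet (defined earlier in this file) expects the usual legacy tweet and user attributes:

# Rough shape of the tweet_root object consumed by recombobulate_tweet,
# reconstructed from the keys read in the diff above. Placeholder values only.
tweet_root = {
    "__typename": "Tweet",  # or "TweetUnavailable" / "TweetWithVisibilityResults"
    "source": "<a href=...>Twitter Web App</a>",
    "legacy": {
        "id_str": "1747650538865311986",
        "user_id_str": "12",
        # ... usual legacy tweet attributes (full_text, created_at, etc.)
    },
    "core": {
        "user_results": {
            "result": {
                "legacy": {
                    # usual legacy user attributes (screen_name, name, etc.)
                }
            }
        }
    },
    # Optional, only present for quote tweets:
    # "quoted_status_result": {"result": {...same nested shape...}},
}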

@@ -374,59 +428,11 @@ def tweets_payload_iter(payload):
            continue
            # raise TwitterPublicAPIParsingError

-        __typename = tweet_root["__typename"]

-        if __typename == "TweetUnavailable":
-            continue

-        if __typename == "TweetWithVisibilityResults":
-            tweet_root = tweet_root["tweet"]
+        tweet = recombobulate_tweet(tweet_root, entry_id=entry_id)

-        try:
-            tweet_meta = tweet_root["legacy"]
-        except KeyError:
-            raise TwitterPublicAPIParsingError(entry_id)

-        tweet_meta["source"] = tweet_root["source"]

-        # NOTE: tombstone not working anymore
-        # if tweet_meta is None:
-        #     tweet_meta = getpath(
-        #         entry, ["content", "item", "content", "tombstone", "tweet"]
-        #     )

-        # Skipping ads
-        if "promotedMetadata" in tweet_meta:
+        if tweet is None:
            continue

-        # NOTE: with new API results we need to build this index each time
-        tweet_index = {tweet_meta["id_str"]: tweet_meta}
-        user_index = {}

-        # Tweet user
-        user_index[tweet_meta["user_id_str"]] = tweet_root["core"]["user_results"][
-            "result"
-        ]["legacy"]

-        # Quote
-        quoted_root = getpath(tweet_root, ["quoted_status_result", "result"])

-        if quoted_root is not None:
-            if quoted_root["__typename"] == "TweetWithVisibilityResults":
-                quoted_root = quoted_root["tweet"]

-            quoted_meta = quoted_root["legacy"]
-            quoted_meta["source"] = quoted_root["source"]
-            tweet_index[quoted_meta["id_str"]] = quoted_meta
-            user_index[quoted_meta["user_id_str"]] = quoted_root["core"][
-                "user_results"
-            ]["result"]["legacy"]

-        for user_str_id, user_meta in user_index.items():
-            user_meta["id_str"] = user_str_id

-        tweet = process_single_tweet(tweet_meta["id_str"], tweet_index, user_index)

        # Additional metadata
        # NOTE: not working anymore
        meta = None
@@ -447,7 +453,7 @@ def tweets_payload_iter(payload):
# =============================================================================
# Main class
# =============================================================================
-class TwitterAPIScraper(object):
+class TwitterAPIScraper:
    def __init__(self, cookie: str):
        self.pool_manager = create_pool_manager(
            timeout=TWITTER_PUBLIC_API_DEFAULT_TIMEOUT, spoof_tls_ciphers=True
@@ -549,7 +555,7 @@ def request(self, url, headers=None, method="GET"):
    # )

    # @ensure_guest_token
-    def request_search(self, url):
+    def api_call(self, url):
        headers = {
            "Authorization": TWITTER_PUBLIC_API_AUTH_HEADER,
            # "X-Guest-Token": self.guest_token,
@@ -611,7 +617,7 @@ def request_tweet_search(self, query, locale, cursor=None, refs=None, dump=False
        # params = forge_search_params(query, cursor=cursor, target="tweets")
        # url = "%s?%s" % (TWITTER_PUBLIC_SEARCH_ENDPOINT, params)

-        data = self.request_search(url)
+        data = self.api_call(url)

        next_cursor = extract_cursor_from_tweets_payload(data)

@@ -669,7 +675,7 @@ def request_user_search(self, query, locale, cursor=None, dump=False):
        # params = forge_search_params(query, cursor=cursor, target="users")
        # url = "%s?%s" % (TWITTER_PUBLIC_SEARCH_ENDPOINT, params)

-        # data = self.request_search(url)
+        # data = self.api_call(url)

        # next_cursor = extract_cursor_from_users_payload(data)

@@ -694,7 +700,7 @@ def request_followers_you_know(self, user_id, cursor=None):
            quote(user_id), quote(cursor)
        )

-        data = self.request_search(url)
+        data = self.api_call(url)

        next_cursor, user_entries = extract_data_from_followers_you_know_payload(data)

@@ -791,3 +797,144 @@ def followers_you_know(self, user_id, locale=None):
                return

            cursor = next_cursor


class TwitterGuestAPIScraper:
    def __init__(self):
        self.pool_manager = create_pool_manager(
            timeout=TWITTER_PUBLIC_API_DEFAULT_TIMEOUT, spoof_tls_ciphers=True
        )

        # NOTE: 10 calls per minute
        self.rate_limiter_state = RateLimiterState(10, 60)

        self.reset()

        def epilog_builder(retry_state: RetryCallState):
            exc = retry_state.outcome.exception()

            if hasattr(exc, "format_epilog") and callable(exc.format_epilog):
                return exc.format_epilog()

            return None

        self.retryer = create_request_retryer(
            min=1,
            additional_exceptions=[
                TwitterPublicAPIRateLimitError,
                TwitterPublicAPIInvalidResponseError,
                TwitterPublicAPIOverCapacityError,
                TwitterPublicAPIncompleteTweetIndexError,
                TwitterPublicAPIncompleteUserIndexError,
                TwitterPublicAPIHiccupError,  # TODO: I might want to drop this at some point
            ],
            epilog=epilog_builder,
        )

    def reset(self):
        self.guest_token = None
        self.guest_token_use_count = 0
        self.cookie = None

    @rate_limited_method()
    def request(self, url, headers=None, method="GET"):
        return request(
            url,
            pool_manager=self.pool_manager,
            spoof_ua=True,
            method=method,
            headers=headers,
        )

    def acquire_guest_token(self):
        headers = {
            "Authorization": TWITTER_PUBLIC_API_AUTH_HEADER,
            "Accept-Language": "en-US,en;q=0.5",
        }

        response = self.request(TWITTER_GUEST_ACTIVATE_ENDPOINT, headers, method="POST")

        if response.status >= 400:
            raise TwitterPublicAPIInvalidResponseError(
                status=response.status, response_text=response.text
            )

        try:
            api_token_response = response.json()
        except JSONDecodeError:
            raise TwitterPublicAPIInvalidResponseError(
                status=response.status, response_text=response.text
            )

        guest_token = api_token_response.get("guest_token")

        if guest_token is None:
            raise TwitterPublicAPIGuestTokenError

        self.guest_token = guest_token
        self.cookie = response.headers[
            "Set-Cookie"
        ] + ", gt=%s; Domain=.twitter.com; Path=/; Secure ; Expires=%s" % (
            guest_token,
            create_cookie_expiration(),
        )

    @ensure_guest_token
    def api_call(self, url):
        headers = {
            "Authorization": TWITTER_PUBLIC_API_AUTH_HEADER,
            "X-Guest-Token": self.guest_token,
            "Cookie": self.cookie,
            "Accept-Language": "en-US,en;q=0.5",
            "X-Twitter-Active-User": "yes",
            "X-Twitter-Client-Language": "en",
        }

        response = self.request(url, headers=headers)

        if response.status == 429:
            self.reset()
            raise TwitterPublicAPIRateLimitError

        if response.status in [401, 403]:
            self.reset()
            raise TwitterPublicAPIBadAuthError(response.status)

        try:
            data = response.json()
        except JSONDecodeError:
            raise TwitterPublicAPIInvalidResponseError(
                status=response.status, response_text=response.text
            )

        if response.status >= 400:
            error = getpath(data, ["errors", 0])

            if error is not None and response.status == 400 and error.get("code") == 47:
                raise TwitterPublicAPIBadRequest

            if error is not None and error.get("code") == 130:
                raise TwitterPublicAPIOverCapacityError

            raise TwitterPublicAPIInvalidResponseError(
                status=response.status, response_text=response.text
            )

        return data

    def tweet(self, tweet_id, locale=None):
        url = "https://api.twitter.com/graphql/8be30APApp7lds22UzVS7Q/TweetResultByRestId?variables=%7B%22tweetId%22%3A%22{}%22%2C%22withCommunity%22%3Afalse%2C%22includePromotedContent%22%3Afalse%2C%22withVoice%22%3Afalse%7D&features=%7B%22creator_subscriptions_tweet_preview_api_enabled%22%3Atrue%2C%22c9s_tweet_anatomy_moderator_badge_enabled%22%3Atrue%2C%22tweetypie_unmention_optimization_enabled%22%3Atrue%2C%22responsive_web_edit_tweet_api_enabled%22%3Atrue%2C%22graphql_is_translatable_rweb_tweet_is_translatable_enabled%22%3Atrue%2C%22view_counts_everywhere_api_enabled%22%3Atrue%2C%22longform_notetweets_consumption_enabled%22%3Atrue%2C%22responsive_web_twitter_article_tweet_consumption_enabled%22%3Afalse%2C%22tweet_awards_web_tipping_enabled%22%3Afalse%2C%22freedom_of_speech_not_reach_fetch_enabled%22%3Atrue%2C%22standardized_nudges_misinfo%22%3Atrue%2C%22tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled%22%3Atrue%2C%22rweb_video_timestamps_enabled%22%3Atrue%2C%22longform_notetweets_rich_text_read_enabled%22%3Atrue%2C%22longform_notetweets_inline_media_enabled%22%3Atrue%2C%22responsive_web_graphql_exclude_directive_enabled%22%3Atrue%2C%22verified_phone_label_enabled%22%3Afalse%2C%22responsive_web_media_download_video_enabled%22%3Afalse%2C%22responsive_web_graphql_skip_user_profile_image_extensions_enabled%22%3Afalse%2C%22responsive_web_graphql_timeline_navigation_enabled%22%3Atrue%2C%22responsive_web_enhance_cards_enabled%22%3Afalse%7D".format(
            quote(tweet_id)
        )

        data = self.api_call(url)

        tweet_root = getpath(data, ("data", "tweetResult", "result"))
        tweet = recombobulate_tweet(tweet_root)
        tweet = normalize_tweet(
            tweet,
            locale=locale,
            collection_source="scraping",
        )

        return tweet
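The hardcoded TweetResultByRestId URL in tweet() is simply URL-encoded JSON passed through the variables and features query parameters. For illustration only (this is not how the commit builds the URL, and the feature flags below are a truncated subset of those visible in the hardcoded string above), an equivalent URL could be assembled like this:

# Illustration of what the hardcoded TweetResultByRestId URL encodes:
# two query parameters, "variables" and "features", each holding compact JSON.
import json
from urllib.parse import urlencode

GRAPHQL_ENDPOINT = "https://api.twitter.com/graphql/8be30APApp7lds22UzVS7Q/TweetResultByRestId"


def build_tweet_result_url(tweet_id: str) -> str:
    variables = {
        "tweetId": tweet_id,
        "withCommunity": False,
        "includePromotedContent": False,
        "withVoice": False,
    }
    # Truncated subset; the full flag set is the one visible in the hardcoded URL.
    features = {
        "creator_subscriptions_tweet_preview_api_enabled": True,
        "responsive_web_graphql_timeline_navigation_enabled": True,
        "responsive_web_enhance_cards_enabled": False,
    }
    query = urlencode(
        {
            "variables": json.dumps(variables, separators=(",", ":")),
            "features": json.dumps(features, separators=(",", ":")),
        }
    )
    return "%s?%s" % (GRAPHQL_ENDPOINT, query)


print(build_tweet_result_url("1747650538865311986"))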
