From afc4292f430b4465f05a4616640542a14566d4d5 Mon Sep 17 00:00:00 2001 From: Yomguithereal Date: Wed, 17 Jan 2024 17:30:24 +0100 Subject: [PATCH] Introducing TwitterGuestAPIScraper --- ftest/twitter_scraping.py | 17 +- minet/twitter/__init__.py | 2 +- minet/twitter/api_scraper.py | 293 ++++++++++++++++++++++++++--------- 3 files changed, 234 insertions(+), 78 deletions(-) diff --git a/ftest/twitter_scraping.py b/ftest/twitter_scraping.py index 36084b3d44..2aa3fd718d 100644 --- a/ftest/twitter_scraping.py +++ b/ftest/twitter_scraping.py @@ -1,6 +1,15 @@ -from minet.twitter import TwitterAPIScraper +from minet.cli.console import console -scraper = TwitterAPIScraper("firefox") +# from minet.twitter import TwitterAPIScraper -for user in scraper.followers_you_know("794083798912827393"): - print(user) +# scraper = TwitterAPIScraper("firefox") + +# for user in scraper.followers_you_know("794083798912827393"): +# print(user) + +from minet.twitter import TwitterGuestAPIScraper + +scraper = TwitterGuestAPIScraper() +tweet = scraper.tweet("1747650538865311986") + +console.print(tweet, highlight=True) diff --git a/minet/twitter/__init__.py b/minet/twitter/__init__.py index 6c15cd7860..d689b5e3b1 100644 --- a/minet/twitter/__init__.py +++ b/minet/twitter/__init__.py @@ -1,2 +1,2 @@ from minet.twitter.api_client import TwitterAPIClient -from minet.twitter.api_scraper import TwitterAPIScraper +from minet.twitter.api_scraper import TwitterAPIScraper, TwitterGuestAPIScraper diff --git a/minet/twitter/api_scraper.py b/minet/twitter/api_scraper.py index 5acbb232a3..cfb2987120 100644 --- a/minet/twitter/api_scraper.py +++ b/minet/twitter/api_scraper.py @@ -30,7 +30,7 @@ TWITTER_PUBLIC_API_AUTH_HEADER, ) from minet.twitter.exceptions import ( - # TwitterPublicAPIGuestTokenError, + TwitterPublicAPIGuestTokenError, TwitterPublicAPIBadRequest, TwitterPublicAPIRateLimitError, TwitterPublicAPIInvalidResponseError, @@ -72,22 +72,21 @@ def is_cookie_valid(cookie: str) -> bool: ) -# def ensure_guest_token(method): -# def wrapped(self: "TwitterAPIScraper", *args, **kwargs): +def ensure_guest_token(method): + def wrapped(self: "TwitterGuestAPIScraper", *args, **kwargs): + # NOTE: we refresh the guest token periodically to avoid + # losing time wrt failed request after expiration + if ( + self.guest_token is None + or self.guest_token_use_count >= MAX_GUEST_TOKEN_USE_COUNT + ): + self.acquire_guest_token() + self.guest_token_use_count = 0 -# # NOTE: we refresh the guest token periodically to avoid -# # losing time wrt failed request after expiration -# if ( -# self.guest_token is None -# or self.guest_token_use_count >= MAX_GUEST_TOKEN_USE_COUNT -# ): -# self.acquire_guest_token() -# self.guest_token_use_count = 0 + self.guest_token_use_count += 1 + return method(self, *args, **kwargs) -# self.guest_token_use_count += 1 -# return method(self, *args, **kwargs) - -# return wrapped + return wrapped def create_cookie_expiration(): @@ -336,14 +335,69 @@ def process_single_tweet(tweet_id, tweet_index, user_index): return tweet +def recombobulate_tweet(tweet_root, entry_id=None): + __typename = tweet_root["__typename"] + + if __typename == "TweetUnavailable": + return None + + if __typename == "TweetWithVisibilityResults": + tweet_root = tweet_root["tweet"] + + try: + tweet_meta = tweet_root["legacy"] + except KeyError: + raise TwitterPublicAPIParsingError(entry_id) + + tweet_meta["source"] = tweet_root["source"] + + # NOTE: tombstone not working anymore + # if tweet_meta is None: + # tweet_meta = getpath( + # entry, 
["content", "item", "content", "tombstone", "tweet"] + # ) + + # Skipping ads + if "promotedMetadata" in tweet_meta: + return None + + tweet_index = None + user_index = None + + # NOTE: with new API results we need to build this index each time + tweet_index = {tweet_meta["id_str"]: tweet_meta} + user_index = {} + + # Tweet user + user_index[tweet_meta["user_id_str"]] = tweet_root["core"]["user_results"][ + "result" + ]["legacy"] + + # Quote + quoted_root = getpath(tweet_root, ["quoted_status_result", "result"]) + + if quoted_root is not None: + if quoted_root["__typename"] == "TweetWithVisibilityResults": + quoted_root = quoted_root["tweet"] + + quoted_meta = quoted_root["legacy"] + quoted_meta["source"] = quoted_root["source"] + tweet_index[quoted_meta["id_str"]] = quoted_meta + user_index[quoted_meta["user_id_str"]] = quoted_root["core"]["user_results"][ + "result" + ]["legacy"] + + for user_str_id, user_meta in user_index.items(): + user_meta["id_str"] = user_str_id + + return process_single_tweet(tweet_meta["id_str"], tweet_index, user_index) + + def tweets_payload_iter(payload): instructions = payload["data"]["search_by_raw_query"]["search_timeline"][ "timeline" ]["instructions"] - tweet_index = None - user_index = None - # with open("dump.json", "w") as f: # import json @@ -374,59 +428,11 @@ def tweets_payload_iter(payload): continue # raise TwitterPublicAPIParsingError - __typename = tweet_root["__typename"] - - if __typename == "TweetUnavailable": - continue - - if __typename == "TweetWithVisibilityResults": - tweet_root = tweet_root["tweet"] + tweet = recombobulate_tweet(tweet_root, entry_id=entry_id) - try: - tweet_meta = tweet_root["legacy"] - except KeyError: - raise TwitterPublicAPIParsingError(entry_id) - - tweet_meta["source"] = tweet_root["source"] - - # NOTE: tombstone not working anymore - # if tweet_meta is None: - # tweet_meta = getpath( - # entry, ["content", "item", "content", "tombstone", "tweet"] - # ) - - # Skipping ads - if "promotedMetadata" in tweet_meta: + if tweet is None: continue - # NOTE: with new API results we need to build this index each time - tweet_index = {tweet_meta["id_str"]: tweet_meta} - user_index = {} - - # Tweet user - user_index[tweet_meta["user_id_str"]] = tweet_root["core"]["user_results"][ - "result" - ]["legacy"] - - # Quote - quoted_root = getpath(tweet_root, ["quoted_status_result", "result"]) - - if quoted_root is not None: - if quoted_root["__typename"] == "TweetWithVisibilityResults": - quoted_root = quoted_root["tweet"] - - quoted_meta = quoted_root["legacy"] - quoted_meta["source"] = quoted_root["source"] - tweet_index[quoted_meta["id_str"]] = quoted_meta - user_index[quoted_meta["user_id_str"]] = quoted_root["core"][ - "user_results" - ]["result"]["legacy"] - - for user_str_id, user_meta in user_index.items(): - user_meta["id_str"] = user_str_id - - tweet = process_single_tweet(tweet_meta["id_str"], tweet_index, user_index) - # Additional metadata # NOTE: not working anymore meta = None @@ -447,7 +453,7 @@ def tweets_payload_iter(payload): # ============================================================================= # Main class # ============================================================================= -class TwitterAPIScraper(object): +class TwitterAPIScraper: def __init__(self, cookie: str): self.pool_manager = create_pool_manager( timeout=TWITTER_PUBLIC_API_DEFAULT_TIMEOUT, spoof_tls_ciphers=True @@ -549,7 +555,7 @@ def request(self, url, headers=None, method="GET"): # ) # @ensure_guest_token - def request_search(self, 
url): + def api_call(self, url): headers = { "Authorization": TWITTER_PUBLIC_API_AUTH_HEADER, # "X-Guest-Token": self.guest_token, @@ -611,7 +617,7 @@ def request_tweet_search(self, query, locale, cursor=None, refs=None, dump=False # params = forge_search_params(query, cursor=cursor, target="tweets") # url = "%s?%s" % (TWITTER_PUBLIC_SEARCH_ENDPOINT, params) - data = self.request_search(url) + data = self.api_call(url) next_cursor = extract_cursor_from_tweets_payload(data) @@ -669,7 +675,7 @@ def request_user_search(self, query, locale, cursor=None, dump=False): # params = forge_search_params(query, cursor=cursor, target="users") # url = "%s?%s" % (TWITTER_PUBLIC_SEARCH_ENDPOINT, params) - # data = self.request_search(url) + # data = self.api_call(url) # next_cursor = extract_cursor_from_users_payload(data) @@ -694,7 +700,7 @@ def request_followers_you_know(self, user_id, cursor=None): quote(user_id), quote(cursor) ) - data = self.request_search(url) + data = self.api_call(url) next_cursor, user_entries = extract_data_from_followers_you_know_payload(data) @@ -791,3 +797,144 @@ def followers_you_know(self, user_id, locale=None): return cursor = next_cursor + + +class TwitterGuestAPIScraper: + def __init__(self): + self.pool_manager = create_pool_manager( + timeout=TWITTER_PUBLIC_API_DEFAULT_TIMEOUT, spoof_tls_ciphers=True + ) + + # NOTE: 10 calls per minute + self.rate_limiter_state = RateLimiterState(10, 60) + + self.reset() + + def epilog_builder(retry_state: RetryCallState): + exc = retry_state.outcome.exception() + + if hasattr(exc, "format_epilog") and callable(exc.format_epilog): + return exc.format_epilog() + + return None + + self.retryer = create_request_retryer( + min=1, + additional_exceptions=[ + TwitterPublicAPIRateLimitError, + TwitterPublicAPIInvalidResponseError, + TwitterPublicAPIOverCapacityError, + TwitterPublicAPIncompleteTweetIndexError, + TwitterPublicAPIncompleteUserIndexError, + TwitterPublicAPIHiccupError, # TODO: I might want to drop this at some point + ], + epilog=epilog_builder, + ) + + def reset(self): + self.guest_token = None + self.guest_token_use_count = 0 + self.cookie = None + + @rate_limited_method() + def request(self, url, headers=None, method="GET"): + return request( + url, + pool_manager=self.pool_manager, + spoof_ua=True, + method=method, + headers=headers, + ) + + def acquire_guest_token(self): + headers = { + "Authorization": TWITTER_PUBLIC_API_AUTH_HEADER, + "Accept-Language": "en-US,en;q=0.5", + } + + response = self.request(TWITTER_GUEST_ACTIVATE_ENDPOINT, headers, method="POST") + + if response.status >= 400: + raise TwitterPublicAPIInvalidResponseError( + status=response.status, response_text=response.text + ) + + try: + api_token_response = response.json() + except JSONDecodeError: + raise TwitterPublicAPIInvalidResponseError( + status=response.status, response_text=response.text + ) + + guest_token = api_token_response.get("guest_token") + + if guest_token is None: + raise TwitterPublicAPIGuestTokenError + + self.guest_token = guest_token + self.cookie = response.headers[ + "Set-Cookie" + ] + ", gt=%s; Domain=.twitter.com; Path=/; Secure ; Expires=%s" % ( + guest_token, + create_cookie_expiration(), + ) + + @ensure_guest_token + def api_call(self, url): + headers = { + "Authorization": TWITTER_PUBLIC_API_AUTH_HEADER, + "X-Guest-Token": self.guest_token, + "Cookie": self.cookie, + "Accept-Language": "en-US,en;q=0.5", + "X-Twitter-Active-User": "yes", + "X-Twitter-Client-Language": "en", + } + + response = self.request(url, headers=headers) 
+ + if response.status == 429: + self.reset() + raise TwitterPublicAPIRateLimitError + + if response.status in [401, 403]: + self.reset() + raise TwitterPublicAPIBadAuthError(response.status) + + try: + data = response.json() + except JSONDecodeError: + raise TwitterPublicAPIInvalidResponseError( + status=response.status, response_text=response.text + ) + + if response.status >= 400: + error = getpath(data, ["errors", 0]) + + if error is not None and response.status == 400 and error.get("code") == 47: + raise TwitterPublicAPIBadRequest + + if error is not None and error.get("code") == 130: + raise TwitterPublicAPIOverCapacityError + + raise TwitterPublicAPIInvalidResponseError( + status=response.status, response_text=response.text + ) + + return data + + def tweet(self, tweet_id, locale=None): + url = "https://api.twitter.com/graphql/8be30APApp7lds22UzVS7Q/TweetResultByRestId?variables=%7B%22tweetId%22%3A%22{}%22%2C%22withCommunity%22%3Afalse%2C%22includePromotedContent%22%3Afalse%2C%22withVoice%22%3Afalse%7D&features=%7B%22creator_subscriptions_tweet_preview_api_enabled%22%3Atrue%2C%22c9s_tweet_anatomy_moderator_badge_enabled%22%3Atrue%2C%22tweetypie_unmention_optimization_enabled%22%3Atrue%2C%22responsive_web_edit_tweet_api_enabled%22%3Atrue%2C%22graphql_is_translatable_rweb_tweet_is_translatable_enabled%22%3Atrue%2C%22view_counts_everywhere_api_enabled%22%3Atrue%2C%22longform_notetweets_consumption_enabled%22%3Atrue%2C%22responsive_web_twitter_article_tweet_consumption_enabled%22%3Afalse%2C%22tweet_awards_web_tipping_enabled%22%3Afalse%2C%22freedom_of_speech_not_reach_fetch_enabled%22%3Atrue%2C%22standardized_nudges_misinfo%22%3Atrue%2C%22tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled%22%3Atrue%2C%22rweb_video_timestamps_enabled%22%3Atrue%2C%22longform_notetweets_rich_text_read_enabled%22%3Atrue%2C%22longform_notetweets_inline_media_enabled%22%3Atrue%2C%22responsive_web_graphql_exclude_directive_enabled%22%3Atrue%2C%22verified_phone_label_enabled%22%3Afalse%2C%22responsive_web_media_download_video_enabled%22%3Afalse%2C%22responsive_web_graphql_skip_user_profile_image_extensions_enabled%22%3Afalse%2C%22responsive_web_graphql_timeline_navigation_enabled%22%3Atrue%2C%22responsive_web_enhance_cards_enabled%22%3Afalse%7D".format( + quote(tweet_id) + ) + + data = self.api_call(url) + + tweet_root = getpath(data, ("data", "tweetResult", "result")) + tweet = recombobulate_tweet(tweet_root) + tweet = normalize_tweet( + tweet, + locale=locale, + collection_source="scraping", + ) + + return tweet
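
Usage sketch (a minimal example mirroring the updated ftest/twitter_scraping.py above; the tweet id is the one exercised by the functional test):

    from minet.twitter import TwitterGuestAPIScraper

    # No cookie is needed: the guest scraper acquires and refreshes its own
    # guest token, and rate-limits itself to 10 calls per minute.
    scraper = TwitterGuestAPIScraper()

    # Fetches a single tweet through the guest GraphQL endpoint; the payload
    # is normalized with collection_source="scraping".
    tweet = scraper.tweet("1747650538865311986")
    print(tweet)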