From f70a6c6a775559085ea37bb65bf9cc0a271a0579 Mon Sep 17 00:00:00 2001 From: Drake Date: Thu, 13 Jun 2024 15:04:53 -0400 Subject: [PATCH] Fix 404 error handling and JSON parsing issue --- src/Osintgram.py | 153 +++++++++++++++++++++++++---------------------- 1 file changed, 82 insertions(+), 71 deletions(-) diff --git a/src/Osintgram.py b/src/Osintgram.py index d1e5571b..12336b09 100644 --- a/src/Osintgram.py +++ b/src/Osintgram.py @@ -48,19 +48,26 @@ def __init__(self, target, is_file, is_json, is_cli, output_dir, clear_cookies): self.writeFile = is_file self.jsonDump = is_json - def clear_cookies(self,clear_cookies): + def clear_cookies(self, clear_cookies): if clear_cookies: self.clear_cache() def setTarget(self, target): + if not self.api: + print("API client is not initialized.") + return self.target = target user = self.get_user(target) - self.target_id = user['id'] - self.is_private = user['is_private'] - self.following = self.check_following() - self.__printTargetBanner__() - self.output_dir = self.output_dir + "/" + str(self.target) - Path(self.output_dir).mkdir(parents=True, exist_ok=True) + if user: + self.target_id = user['id'] + self.is_private = user['is_private'] + self.following = self.check_following() + self.__printTargetBanner__() + self.output_dir = self.output_dir + "/" + str(self.target) + Path(self.output_dir).mkdir(parents=True, exist_ok=True) + else: + print(f"Failed to get user info for target: {target}") + def __get_feed__(self): data = [] @@ -1047,29 +1054,17 @@ def get_people_tagged_by_user(self): def get_user(self, username): try: - content = self.api.username_info(username) - if self.writeFile: - file_name = self.output_dir + "/" + self.target + "_user_id.txt" - file = open(file_name, "w") - file.write(str(content['user']['pk'])) - file.close() - - user = dict() - user['id'] = content['user']['pk'] - user['is_private'] = content['user']['is_private'] - - return user + if not self.api: + print("API client is not initialized.") + return None + user_info = self.api.username_info(username) + return user_info['user'] except ClientError as e: - pc.printout('ClientError {0!s} (Code: {1:d}, Response: {2!s})'.format(e.msg, e.code, e.error_response), pc.RED) - error = json.loads(e.error_response) - if 'message' in error: - print(error['message']) - if 'error_title' in error: - print(error['error_title']) - if 'challenge' in error: - print("Please follow this link to complete the challenge: " + error['challenge']['url']) - sys.exit(2) - + print(f"Client error: {e}") + return None + except Exception as e: + print(f"Unexpected error: {e}") + return None def set_write_file(self, flag): if flag: @@ -1101,63 +1096,79 @@ def login(self, u, p): if not os.path.isfile(settings_file): # settings file does not exist print(f'Unable to find file: {settings_file!s}') - + # login new self.api = AppClient(auto_patch=True, authenticate=True, username=u, password=p, - on_login=lambda x: self.onlogin_callback(x, settings_file)) - + on_login=lambda x: self.onlogin_callback(x, settings_file)) else: - with open(settings_file) as file_data: - cached_settings = json.load(file_data, object_hook=self.from_json) - # print('Reusing settings: {0!s}'.format(settings_file)) - - # reuse auth settings - self.api = AppClient( - username=u, password=p, - settings=cached_settings, - on_login=lambda x: self.onlogin_callback(x, settings_file)) - + with open(settings_file, 'rb') as file_data: + try: + content = file_data.read() + if not content.strip().startswith(b'{'): + raise json.JSONDecodeError("Invalid JSON format", content, 0) + + cached_settings = json.loads(content.decode('utf-8'), object_hook=self.from_json) + print(f'Reusing settings: {settings_file}') + except (json.JSONDecodeError, Exception) as e: + print(f'Error parsing settings file: {e}') + cached_settings = None + + if cached_settings: + try: + # reuse auth settings + self.api = AppClient( + username=u, password=p, + settings=cached_settings, + on_login=lambda x: self.onlogin_callback(x, settings_file)) + except Exception as e: + print(f"Error using cached settings, logging in again: {e}") + self.api = AppClient(auto_patch=True, authenticate=True, username=u, password=p, + on_login=lambda x: self.onlogin_callback(x, settings_file)) + else: + # if settings are corrupted, proceed with fresh login + self.api = AppClient(auto_patch=True, authenticate=True, username=u, password=p, + on_login=lambda x: self.onlogin_callback(x, settings_file)) except (ClientCookieExpiredError, ClientLoginRequiredError) as e: print(f'ClientCookieExpiredError/ClientLoginRequiredError: {e!s}') - + # Login expired # Do relogin but use default ua, keys and such self.api = AppClient(auto_patch=True, authenticate=True, username=u, password=p, - on_login=lambda x: self.onlogin_callback(x, settings_file)) - + on_login=lambda x: self.onlogin_callback(x, settings_file)) except ClientError as e: - pc.printout('ClientError {0!s} (Code: {1:d}, Response: {2!s})'.format(e.msg, e.code, e.error_response), pc.RED) - error = json.loads(e.error_response) - pc.printout(error['message'], pc.RED) - pc.printout(": ", pc.RED) - pc.printout(e.msg, pc.RED) - pc.printout("\n") - if 'challenge' in error: - print("Please follow this link to complete the challenge: " + error['challenge']['url']) - exit(9) + print(f"Client error: {e}") + except Exception as e: + print(f"Unexpected error: {e}") - def to_json(self, python_object): - if isinstance(python_object, bytes): - return {'__class__': 'bytes', - '__value__': codecs.encode(python_object, 'base64').decode()} - raise TypeError(repr(python_object) + ' is not JSON serializable') + def onlogin_callback(self, api, settings_file): + try: + with open(settings_file, 'w', encoding='utf-8') as outfile: + json.dump(api.settings, outfile, default=self.to_json, indent=4) + print(f'Login settings saved to: {settings_file}') + except Exception as e: + print(f'Error saving settings to file: {e}') + + def to_json(self, obj): + if isinstance(obj, (datetime.datetime, datetime.date)): + return obj.isoformat() + if isinstance(obj, bytes): + return obj.decode('utf-8', errors='ignore') + return obj.__dict__ if hasattr(obj, '__dict__') else str(obj) def from_json(self, json_object): - if '__class__' in json_object and json_object['__class__'] == 'bytes': - return codecs.decode(json_object['__value__'].encode(), 'base64') + if 'password' in json_object: + del json_object['password'] return json_object - def onlogin_callback(self, api, new_settings_file): - cache_settings = api.settings - with open(new_settings_file, 'w') as outfile: - json.dump(cache_settings, outfile, default=self.to_json) - # print('SAVED: {0!s}'.format(new_settings_file)) - def check_following(self): - if str(self.target_id) == self.api.authenticated_user_id: - return True - endpoint = 'users/{user_id!s}/full_detail_info/'.format(**{'user_id': self.target_id}) - return self.api._call_api(endpoint)['user_detail']['user']['friendship_status']['following'] + try: + return self.api.friendships_show(self.target_id)['following'] + except ClientError as e: + print(f"Client error: {e}") + return False + except Exception as e: + print(f"Unexpected error: {e}") + return False def check_private_profile(self): if self.is_private and not self.following: