Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 82 additions & 71 deletions src/Osintgram.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,26 @@ def __init__(self, target, is_file, is_json, is_cli, output_dir, clear_cookies):
self.writeFile = is_file
self.jsonDump = is_json

def clear_cookies(self,clear_cookies):
def clear_cookies(self, clear_cookies):
if clear_cookies:
self.clear_cache()

def setTarget(self, target):
if not self.api:
print("API client is not initialized.")
return
self.target = target
user = self.get_user(target)
self.target_id = user['id']
self.is_private = user['is_private']
self.following = self.check_following()
self.__printTargetBanner__()
self.output_dir = self.output_dir + "/" + str(self.target)
Path(self.output_dir).mkdir(parents=True, exist_ok=True)
if user:
self.target_id = user['id']
self.is_private = user['is_private']
self.following = self.check_following()
self.__printTargetBanner__()
self.output_dir = self.output_dir + "/" + str(self.target)
Path(self.output_dir).mkdir(parents=True, exist_ok=True)
else:
print(f"Failed to get user info for target: {target}")


def __get_feed__(self):
data = []
Expand Down Expand Up @@ -1047,29 +1054,17 @@ def get_people_tagged_by_user(self):

def get_user(self, username):
try:
content = self.api.username_info(username)
if self.writeFile:
file_name = self.output_dir + "/" + self.target + "_user_id.txt"
file = open(file_name, "w")
file.write(str(content['user']['pk']))
file.close()

user = dict()
user['id'] = content['user']['pk']
user['is_private'] = content['user']['is_private']

return user
if not self.api:
print("API client is not initialized.")
return None
user_info = self.api.username_info(username)
return user_info['user']
except ClientError as e:
pc.printout('ClientError {0!s} (Code: {1:d}, Response: {2!s})'.format(e.msg, e.code, e.error_response), pc.RED)
error = json.loads(e.error_response)
if 'message' in error:
print(error['message'])
if 'error_title' in error:
print(error['error_title'])
if 'challenge' in error:
print("Please follow this link to complete the challenge: " + error['challenge']['url'])
sys.exit(2)

print(f"Client error: {e}")
return None
except Exception as e:
print(f"Unexpected error: {e}")
return None

def set_write_file(self, flag):
if flag:
Expand Down Expand Up @@ -1101,63 +1096,79 @@ def login(self, u, p):
if not os.path.isfile(settings_file):
# settings file does not exist
print(f'Unable to find file: {settings_file!s}')

# login new
self.api = AppClient(auto_patch=True, authenticate=True, username=u, password=p,
on_login=lambda x: self.onlogin_callback(x, settings_file))

on_login=lambda x: self.onlogin_callback(x, settings_file))
else:
with open(settings_file) as file_data:
cached_settings = json.load(file_data, object_hook=self.from_json)
# print('Reusing settings: {0!s}'.format(settings_file))

# reuse auth settings
self.api = AppClient(
username=u, password=p,
settings=cached_settings,
on_login=lambda x: self.onlogin_callback(x, settings_file))

with open(settings_file, 'rb') as file_data:
try:
content = file_data.read()
if not content.strip().startswith(b'{'):
raise json.JSONDecodeError("Invalid JSON format", content, 0)

cached_settings = json.loads(content.decode('utf-8'), object_hook=self.from_json)
print(f'Reusing settings: {settings_file}')
except (json.JSONDecodeError, Exception) as e:
print(f'Error parsing settings file: {e}')
cached_settings = None

if cached_settings:
try:
# reuse auth settings
self.api = AppClient(
username=u, password=p,
settings=cached_settings,
on_login=lambda x: self.onlogin_callback(x, settings_file))
except Exception as e:
print(f"Error using cached settings, logging in again: {e}")
self.api = AppClient(auto_patch=True, authenticate=True, username=u, password=p,
on_login=lambda x: self.onlogin_callback(x, settings_file))
else:
# if settings are corrupted, proceed with fresh login
self.api = AppClient(auto_patch=True, authenticate=True, username=u, password=p,
on_login=lambda x: self.onlogin_callback(x, settings_file))
except (ClientCookieExpiredError, ClientLoginRequiredError) as e:
print(f'ClientCookieExpiredError/ClientLoginRequiredError: {e!s}')

# Login expired
# Do relogin but use default ua, keys and such
self.api = AppClient(auto_patch=True, authenticate=True, username=u, password=p,
on_login=lambda x: self.onlogin_callback(x, settings_file))

on_login=lambda x: self.onlogin_callback(x, settings_file))
except ClientError as e:
pc.printout('ClientError {0!s} (Code: {1:d}, Response: {2!s})'.format(e.msg, e.code, e.error_response), pc.RED)
error = json.loads(e.error_response)
pc.printout(error['message'], pc.RED)
pc.printout(": ", pc.RED)
pc.printout(e.msg, pc.RED)
pc.printout("\n")
if 'challenge' in error:
print("Please follow this link to complete the challenge: " + error['challenge']['url'])
exit(9)
print(f"Client error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")

def to_json(self, python_object):
if isinstance(python_object, bytes):
return {'__class__': 'bytes',
'__value__': codecs.encode(python_object, 'base64').decode()}
raise TypeError(repr(python_object) + ' is not JSON serializable')
def onlogin_callback(self, api, settings_file):
try:
with open(settings_file, 'w', encoding='utf-8') as outfile:
json.dump(api.settings, outfile, default=self.to_json, indent=4)
print(f'Login settings saved to: {settings_file}')
except Exception as e:
print(f'Error saving settings to file: {e}')

def to_json(self, obj):
if isinstance(obj, (datetime.datetime, datetime.date)):
return obj.isoformat()
if isinstance(obj, bytes):
return obj.decode('utf-8', errors='ignore')
return obj.__dict__ if hasattr(obj, '__dict__') else str(obj)

def from_json(self, json_object):
if '__class__' in json_object and json_object['__class__'] == 'bytes':
return codecs.decode(json_object['__value__'].encode(), 'base64')
if 'password' in json_object:
del json_object['password']
return json_object

def onlogin_callback(self, api, new_settings_file):
cache_settings = api.settings
with open(new_settings_file, 'w') as outfile:
json.dump(cache_settings, outfile, default=self.to_json)
# print('SAVED: {0!s}'.format(new_settings_file))

def check_following(self):
if str(self.target_id) == self.api.authenticated_user_id:
return True
endpoint = 'users/{user_id!s}/full_detail_info/'.format(**{'user_id': self.target_id})
return self.api._call_api(endpoint)['user_detail']['user']['friendship_status']['following']
try:
return self.api.friendships_show(self.target_id)['following']
except ClientError as e:
print(f"Client error: {e}")
return False
except Exception as e:
print(f"Unexpected error: {e}")
return False

def check_private_profile(self):
if self.is_private and not self.following:
Expand Down