Skip to content
This repository was archived by the owner on Aug 15, 2022. It is now read-only.

Added optional parameters to STOP(), added a Logger, added a try/except around parse_args() to prevent a crash #52

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 75 additions & 93 deletions ItsAGramLive/ItsAGramLive.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
import tempfile
import pyperclip
import requests
import logging
# Turn off InsecureRequestWarning
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from PIL import Image

requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

Expand Down Expand Up @@ -60,14 +60,20 @@ class ItsAGramLive:
IG_SIG_KEY = '4f8732eb9ba7d1c8e8897a75d6474d4eb3f5279137431b2aafb71fafe2abe178'
SIG_KEY_VERSION = '4'

def __init__(self, username='', password=''):
def __init__(self, username='', password='', logging_level='INFO'):
logging.basicConfig(level=logging_level)
logging.getLogger("urllib3").setLevel(logging_level)

if bool(username) is False and bool(password) is False:
parser = argparse.ArgumentParser(add_help=True)
parser.add_argument("-u", "--username", type=str, help="username", required=True)
parser.add_argument("-p", "--password", type=str, help="password", required=True)
parser.add_argument("-proxy", type=str, help="Proxy format - user:password@ip:port", default=None)
args = parser.parse_args()
try:
args = parser.parse_args()
except SystemExit:
logging.fatal('Error while parsing arguments. Did you provide your Username & Password ?')
raise Exception('Credentials not provided')

username = args.username
password = args.password
Expand All @@ -88,6 +94,7 @@ def __init__(self, username='', password=''):
'User-Agent': self.USER_AGENT,
}


def set_user(self, username, password):
self.username = username
self.password = password
Expand Down Expand Up @@ -127,6 +134,7 @@ def get_code_challenge_required(self, path, choice=0):
self.send_request(path, self.generate_signature(json.dumps(data)), True)

def login(self, force=False):
logging.info('Logging in.')
if not self.isLoggedIn or force:
if self.send_request(endpoint='si/fetch_headers/?challenge_type=signup&guid=' + self.generate_UUID(False),
login=True):
Expand All @@ -142,24 +150,25 @@ def login(self, force=False):
if self.send_request('accounts/login/', post=self.generate_signature(json.dumps(data)), login=True):
if "error_type" in self.LastJson:
if self.LastJson['error_type'] == 'bad_password':
print(self.LastJson['message'])
logging.error(self.LastJson['message'])
return False

if "two_factor_required" not in self.LastJson:
self.isLoggedIn = True
self.username_id = self.LastJson["logged_in_user"]["pk"]
self.rank_token = "%s_%s" % (self.username_id, self.uuid)
self.token = self.LastResponse.cookies["csrftoken"]
logging.info('Logged in.')
return True
else:
if self.two_factor():
self.isLoggedIn = True
self.username_id = self.LastJson["logged_in_user"]["pk"]
self.rank_token = "%s_%s" % (self.username_id, self.uuid)
self.token = self.LastResponse.cookies["csrftoken"]
logging.info('Logged in.')
return True

return False
raise Exception("bad_password")

def two_factor(self):
# verification_method': 0 works for sms and TOTP. why? ¯\_ಠ_ಠ_/¯
Expand Down Expand Up @@ -201,29 +210,30 @@ def send_request(self, endpoint, post=None, login=False, headers: dict = {}):

break
except Exception as e:
print('* Except on SendRequest (wait 60 sec and resend): {}'.format(str(e)))
logging.warning('* Except on SendRequest (wait 60 sec and resend): {}'.format(str(e)))
time.sleep(60)

if self.LastResponse.status_code == 200:
return True
elif 'two_factor_required' in self.LastJson and self.LastResponse.status_code == 400:
# even the status code isn't 200 return True if the 2FA is required
if self.LastJson['two_factor_required']:
print("Two factor required")
logging.info("Two factor required")
return True
elif 'message' in self.LastJson and self.LastResponse.status_code == 400 and self.LastJson['message'] == 'challenge_required':
path = self.LastJson['challenge']['api_path'][1:]
choice = int(input('Choose a challenge mode (0 - SMS, 1 - Email): '))
self.get_code_challenge_required(path, choice)
code = input('Enter the code: ')
self.set_code_challenge_required(path, code)
# choice = int(input('Choose a challenge mode (0 - SMS, 1 - Email): '))
self.get_code_challenge_required(path, 1)
raise Exception("challenge_required")
# code = input('Enter the code: ')
# self.set_code_challenge_required(path, code)
# if message is 'Pre-allocated media not Found.'
else:
error_message = " - "
if "message" in self.LastJson:
error_message = self.LastJson['message']
print('* ERROR({}): {}'.format(self.LastResponse.status_code, error_message))
print(self.LastResponse)
logging.error('* ERROR({}): {}'.format(self.LastResponse.status_code, error_message))
logging.error(self.LastResponse)
return False

def set_proxy(self, proxy=None):
Expand All @@ -243,27 +253,27 @@ def generate_signature(self, data, skip_quote=False):
self.IG_SIG_KEY.encode('utf-8'), data.encode('utf-8'), hashlib.sha256).hexdigest() + '.' + parsed_data

def start(self):
print("Let's do it!")
logging.info("Starting, let's do it !")
if not self.login():
print("Error {}".format(self.LastResponse.status_code))
print(json.loads(self.LastResponse.text).get("message"))
logging.error("Error {}".format(self.LastResponse.status_code))
logging.error(json.loads(self.LastResponse.text).get("message"))
else:
print("You'r logged in")
logging.info("You're logged in.")

if self.create_broadcast():
print("Broadcast ID: {}")
print("* Broadcast ID: {}".format(self.broadcast_id))
print("* Server URL: {}".format(self.stream_server))
print("* Server Key: {}".format(self.stream_key))
logging.info("Broadcast ID: {}")
logging.info("* Broadcast ID: {}".format(self.broadcast_id))
logging.info("* Server URL: {}".format(self.stream_server))
logging.info("* Server Key: {}".format(self.stream_key))

try:
pyperclip.copy(self.stream_key)
print("The stream key was automatically copied to your clipboard")
logging.info("The stream key was automatically copied to your clipboard")
except pyperclip.PyperclipException as headless_error:
print("Could not find a copy/paste mechanism for your system")
logging.error("Could not find a copy/paste mechanism for your system")
pass

print("Press Enter after your setting your streaming software.")
logging.info("Press Enter after your setting your streaming software.")

if self.start_broadcast():
self.is_running = True
Expand All @@ -285,7 +295,7 @@ def start(self):

elif cmd == 'viewers':
users, ids = self.get_viewer_list()
print(users)
logging.info(users)

elif cmd == 'comments':
self.get_comments()
Expand All @@ -295,7 +305,7 @@ def start(self):
if to_send:
self.pin_comment(to_send)
else:
print('usage: chat <text to chat>')
logging.info('usage: chat <text to chat>')

elif cmd[:5] == 'unpin':
self.unpin_comment()
Expand All @@ -305,13 +315,13 @@ def start(self):
if to_send:
self.send_comment(to_send)
else:
print('usage: chat <text to chat>')
logging.info('usage: chat <text to chat>')

elif cmd == 'wave':
users, ids = self.get_viewer_list()
for i in range(len(users)):
print(f'{i + 1}. {users[i]}')
print('Type number according to user e.g 1.')
logging.info(f'{i + 1}. {users[i]}')
logging.info('Type number according to user e.g 1.')
while True:
cmd = input('number> ')

Expand All @@ -322,10 +332,10 @@ def start(self):
self.wave(ids[user_id])
break
except:
print('Please type number e.g 1')
logging.error('Please type number e.g 1')

else:
print(
logging.info(
'Available commands:\n\t '
'"stop"\n\t '
'"mute comments"\n\t '
Expand Down Expand Up @@ -360,17 +370,17 @@ def live_info(self):
if self.send_request("live/{}/info/".format(self.broadcast_id)):
viewer_count = self.LastJson['viewer_count']

print("[*]Broadcast ID: {}".format(self.broadcast_id))
print("[*]Server URL: {}".format(self.stream_server))
print("[*]Stream Key: {}".format(self.stream_key))
print("[*]Viewer Count: {}".format(viewer_count))
print("[*]Status: {}".format(self.LastJson['broadcast_status']))
logging.info("[*]Broadcast ID: {}".format(self.broadcast_id))
logging.info("[*]Server URL: {}".format(self.stream_server))
logging.info("[*]Stream Key: {}".format(self.stream_key))
logging.info("[*]Viewer Count: {}".format(viewer_count))
logging.info("[*]Status: {}".format(self.LastJson['broadcast_status']))

def mute_comments(self):
data = json.dumps({'_uuid': self.uuid, '_uid': self.username_id, '_csrftoken': self.token})
if self.send_request(endpoint='live/{}/mute_comment/'.format(self.broadcast_id),
post=self.generate_signature(data)):
print("Comments muted")
logging.info("Comments muted")
return True

return False
Expand All @@ -379,7 +389,7 @@ def unmute_comment(self):
data = json.dumps({'_uuid': self.uuid, '_uid': self.username_id, '_csrftoken': self.token})
if self.send_request(endpoint='live/{}/unmute_comment/'.format(self.broadcast_id),
post=self.generate_signature(data)):
print("Comments un-muted")
logging.info("Comments un-muted")
return True

return False
Expand All @@ -398,6 +408,7 @@ def send_comment(self, msg):
return False

def create_broadcast(self):
logging.info("Creating broadcast...")
data = json.dumps({'_uuid': self.uuid,
'_uid': self.username_id,
'preview_height': self.previewHeight,
Expand All @@ -416,21 +427,25 @@ def create_broadcast(self):
self.stream_server = upload_url[0]
self.stream_key = "{}{}".format(str(self.broadcast_id), upload_url[1])

return True
logging.info("Broadcast created.")
return self

else:

logging.error("Error while creating broadcast.")
return False

def start_broadcast(self):
logging.info("Starting broadcast...")
data = json.dumps({'_uuid': self.uuid,
'_uid': self.username_id,
'should_send_notifications': 1,
'_csrftoken': self.token})

if self.send_request(endpoint='live/' + str(self.broadcast_id) + '/start/', post=self.generate_signature(data)):
logging.info("Broadcast started.")
return True
else:
logging.error("Error while starting broadcast.")
return False

def end_broadcast(self):
Expand All @@ -444,45 +459,6 @@ def get_post_live_thumbnails(self):
if self.send_request(endpoint="live/{}/get_post_live_thumbnails/".format(self.broadcast_id)):
return self.LastJson.get("thumbnails")[int(len(self.LastJson.get("thumbnails")) / 2)]

def upload_live_thumbnails(self):
im1 = Image.open(requests.get(self.get_post_live_thumbnails(), stream=True).raw)
size = 1080, 1920
im1 = im1.resize(size, Image.ANTIALIAS)
upload_id = str(int(time.time() * 1000))
link = os.path.join(tempfile.gettempdir(), "{}.jpg".format(upload_id))
im1.save(link, "JPEG", quality=100)

upload_idforurl = "{}_0_{}".format(upload_id, str(hash(os.path.basename(link))))

rupload_params = {
"retry_context": '{"num_step_auto_retry":0,"num_reupload":0,"num_step_manual_retry":0}',
"media_type": "1",
"broadcast_id": self.broadcast_id,
"is_post_live_igtv":"1",
"xsharing_user_ids": "[]",
"upload_id": upload_id,
"image_compression": json.dumps(
{"lib_name": "moz", "lib_version": "3.1.m", "quality": "80"}
),
}

h = {
"Accept-Encoding": "gzip",
"X-Instagram-Rupload-Params": json.dumps(rupload_params),
"X-Entity-Type": "image/jpeg",
"Offset": "0",
"X-Entity-Name": upload_idforurl,
"X-Entity-Length": str(os.path.getsize(link)),
"Content-Type": "application/octet-stream",
"Content-Length": str(os.path.getsize(link)),
"Accept-Encoding": "gzip",
}

data = open(link, 'rb').read()

if self.send_request(endpoint="../../rupload_igphoto/{}".format(upload_idforurl), post=data, headers=h):
if self.LastJson.get('status') == 'ok':
return self.LastJson.get('upload_id')

def add_post_live_to_igtv(self, description, title):
self.end_broadcast()
Expand Down Expand Up @@ -541,30 +517,36 @@ def add_post_live_to_igtv(self, description, title):
}

if self.send_request(endpoint='media/configure_to_igtv/', post=self.generate_signature(data), headers=h):
print('Live Posted to Story!')
logging.info('Live Posted to Story!')
return True
return False

def stop(self):
print('Save Live replay to IGTV ? <y/n>')
save = input('command> ')
if save == 'y':
title = input("Title: ")
description = input("Description: ")
def stop(self, save_to_igtv: bool = None, title = 'Live video', description = 'Live Instagram video.'):
final_save = save_to_igtv

if final_save is None:
logging.info('Save Live replay to IGTV ? <y/n>')
save_to_igtv = input('command> ')
if save_to_igtv == 'y':
final_save = 1
title = input("Title: ")
description = input("Description: ")

if final_save:
self.add_post_live_to_igtv(description, title)

print('Exiting...')
logging.info('Ending broadcast...')
self.end_broadcast()
self.is_running = False
print('Bye bye')
logging.info('Bye bye')

def get_comments(self):
if self.send_request("live/{}/get_comment/".format(self.broadcast_id)):
if 'comments' in self.LastJson:
for comment in self.LastJson['comments']:
print(f"{comment['user']['username']} has posted a new comment: {comment['text']}")
logging.info(f"{comment['user']['username']} has posted a new comment: {comment['text']}")
else:
print("There is no comments.")
logging.info("There is no comments.")

def pin_comment(self, to_send):
if self.send_comment(msg=to_send):
Expand All @@ -581,7 +563,7 @@ def pin_comment(self, to_send):
})
if self.send_request(endpoint='live/{}/pin_comment/'.format(self.broadcast_id),
post=self.generate_signature(data)):
print('Comment pinned!')
logging.info('Comment pinned!')
return True

return False
Expand All @@ -595,6 +577,6 @@ def unpin_comment(self):
})
if self.send_request(endpoint='live/{}/unpin_comment/'.format(self.broadcast_id),
post=self.generate_signature(data)):
print('Comment unpinned!')
logging.info('Comment unpinned!')
return True
return False
1 change: 0 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
pyperclip>=1.8.0
requests>=2.23.0
Pillow=8.1.1