From 8e59cee9534c5a47815bf5dd78519cf172038a53 Mon Sep 17 00:00:00 2001 From: Cullen Watson Date: Fri, 13 Sep 2024 18:42:40 -0500 Subject: [PATCH] enh:email variations --- staffspy/solvers/capsolver.py | 22 +++-- staffspy/utils/models.py | 69 +++++++++----- staffspy/utils/utils.py | 165 ++++++++++++++++++++-------------- 3 files changed, 158 insertions(+), 98 deletions(-) diff --git a/staffspy/solvers/capsolver.py b/staffspy/solvers/capsolver.py index 0d17af1..effd4ab 100644 --- a/staffspy/solvers/capsolver.py +++ b/staffspy/solvers/capsolver.py @@ -12,27 +12,31 @@ def is_none(value): class CapSolver(Solver): - """ https://www.capsolver.com/ """ + """https://www.capsolver.com/""" @retry(stop=stop_after_attempt(10), retry=retry_if_result(is_none)) - def solve(self, blob_data: str, page_url: str=None): + def solve(self, blob_data: str, page_url: str = None): from staffspy.utils.utils import logger - logger.info(f'Waiting on CapSolver to solve captcha...') + + logger.info(f"Waiting on CapSolver to solve captcha...") payload = { "clientKey": self.solver_api_key, "task": { - "type": 'FunCaptchaTaskProxyLess', + "type": "FunCaptchaTaskProxyLess", "websitePublicKey": self.public_key, "websiteURL": self.page_url, - "data": json.dumps({"blob": blob_data}) if blob_data else '' - } + "data": json.dumps({"blob": blob_data}) if blob_data else "", + }, } res = requests.post("https://api.capsolver.com/createTask", json=payload) resp = res.json() task_id = resp.get("taskId") if not task_id: - raise Exception("CapSolver failed to create task, try another captcha solver like 2Captcha if this persists or use browser sign in `pip install staffspy[browser]` and then remove the username/password params to the scrape_staff()",res.text) + raise Exception( + "CapSolver failed to create task, try another captcha solver like 2Captcha if this persists or use browser sign in `pip install staffspy[browser]` and then remove the username/password params to the LinkedInAccount()", + res.text, + ) logger.info(f"Received captcha solver taskId: {task_id} / Getting result...") while True: @@ -42,8 +46,8 @@ def solve(self, blob_data: str, page_url: str=None): resp = res.json() status = resp.get("status") if status == "ready": - logger.info(f'CapSolver finished solving captcha') - return resp.get("solution", {}).get('token') + logger.info(f"CapSolver finished solving captcha") + return resp.get("solution", {}).get("token") if status == "failed" or resp.get("errorId"): logger.info(f"Captcha solve failed! response: {res.text}") return None diff --git a/staffspy/utils/models.py b/staffspy/utils/models.py index 4d14722..8482ea8 100644 --- a/staffspy/utils/models.py +++ b/staffspy/utils/models.py @@ -29,7 +29,7 @@ def to_dict(self): return { "name": self.name, "endorsements": self.endorsements if self.endorsements else 0, - "passed_assessment": self.passed_assessment + "passed_assessment": self.passed_assessment, } @@ -107,25 +107,37 @@ class Staff(BaseModel): def get_top_skills(self): top_three_skills = [] if self.skills: - sorted_skills = sorted(self.skills, key=lambda x: x.endorsements, reverse=True) + sorted_skills = sorted( + self.skills, key=lambda x: x.endorsements, reverse=True + ) top_three_skills = [skill.name for skill in sorted_skills[:3]] top_three_skills += [None] * (3 - len(top_three_skills)) return top_three_skills def to_dict(self): - sorted_schools = sorted( - self.schools, key=lambda x: (x.end_date is None, x.end_date), reverse=True - ) if self.schools else [] + sorted_schools = ( + sorted( + self.schools, + key=lambda x: (x.end_date is None, x.end_date), + reverse=True, + ) + if self.schools + else [] + ) top_three_school_names = [school.school for school in sorted_schools[:3]] top_three_school_names += [None] * (3 - len(top_three_school_names)) estimated_age = self.estimate_age_based_on_education() - sorted_experiences = sorted( - self.experiences, - key=lambda x: (x.end_date is None, x.end_date), - reverse=True - ) if self.experiences else [] + sorted_experiences = ( + sorted( + self.experiences, + key=lambda x: (x.end_date is None, x.end_date), + reverse=True, + ) + if self.experiences + else [] + ) top_three_companies = [] seen_companies = set() @@ -137,16 +149,20 @@ def to_dict(self): break top_three_companies += [None] * (3 - len(top_three_companies)) - top_three_skills=self.get_top_skills() + top_three_skills = self.get_top_skills() name = filter(None, [self.first_name, self.last_name]) - self.emails_in_bio=extract_emails_from_text(self.bio) if self.bio else None - self.current_position = sorted_experiences[0].title if len(sorted_experiences) > 0 and sorted_experiences[0].end_date is None else None + self.emails_in_bio = extract_emails_from_text(self.bio) if self.bio else None + self.current_position = ( + sorted_experiences[0].title + if len(sorted_experiences) > 0 and sorted_experiences[0].end_date is None + else None + ) return { "search_term": self.search_term, "id": self.id, "profile_id": self.profile_id, - "name": self.name if self.name else ' '.join(name) if name else None, + "name": self.name if self.name else " ".join(name) if name else None, "first_name": self.first_name, "last_name": self.last_name, "location": self.location, @@ -161,7 +177,7 @@ def to_dict(self): "influencer": self.influencer, "open_to_work": self.open_to_work, "is_hiring": self.is_hiring, - "current_position":self.current_position, + "current_position": self.current_position, "current_company": top_three_companies[0], "past_company_1": top_three_companies[1], "past_company_2": top_three_companies[2], @@ -187,8 +203,10 @@ def to_dict(self): if self.certifications else None ), - "emails_in_bio": ', '.join(self.emails_in_bio) if self.emails_in_bio else None, - "potential_emails": ', '.join(self.potential_emails) if self.potential_emails else None, + "emails_in_bio": ( + ", ".join(self.emails_in_bio) if self.emails_in_bio else None + ), + "potential_emails": self.potential_emails, "profile_link": self.profile_link, "profile_photo": self.profile_photo, "banner_photo": self.banner_photo, @@ -198,14 +216,21 @@ def estimate_age_based_on_education(self): """Adds 18 to their first college start date""" college_words = ["uni", "college"] - sorted_schools = sorted( - [school for school in self.schools if school.start_date], - key=lambda x: x.start_date, - ) if self.schools else [] + sorted_schools = ( + sorted( + [school for school in self.schools if school.start_date], + key=lambda x: x.start_date, + ) + if self.schools + else [] + ) current_date = datetime.now().date() for school in sorted_schools: - if any(word in school.school.lower() for word in college_words) or school.degree: + if ( + any(word in school.school.lower() for word in college_words) + or school.degree + ): if school.start_date: years_in_education = (current_date - school.start_date).days // 365 return int(18 + years_in_education) diff --git a/staffspy/utils/utils.py b/staffspy/utils/utils.py index 22071b0..9a8d101 100644 --- a/staffspy/utils/utils.py +++ b/staffspy/utils/utils.py @@ -40,7 +40,13 @@ def extract_base_domain(url: str): def create_emails(first, last, domain): first = "".join(filter(str.isalpha, first)).lower() last = "".join(filter(str.isalpha, last)).lower() - emails = [f"{first}.{last}@{domain}", f"{first[0]}{last}@{domain}"] + emails = [ + f"{first}.{last}@{domain}", + f"{first[:1]}{last}@{domain}", + f"{first[:2]}{last}@{domain}", + f"{first}{last[:1]}@{domain}", + f"{first}{last[:2]}@{domain}", + ] return emails @@ -49,7 +55,9 @@ def get_webdriver(): from selenium import webdriver from selenium.common.exceptions import WebDriverException except ImportError as e: - raise Exception('install package `pip install staffspy[browser]` to login with browser') + raise Exception( + "install package `pip install staffspy[browser]` to login with browser" + ) for browser in [webdriver.Chrome, webdriver.Firefox]: try: @@ -61,76 +69,97 @@ def get_webdriver(): class Login: - def __init__(self, username: str,password: str, solver: Solver, session_file: str): - self.username,self.password,self.solver,self.session_file=username,password,solver,session_file + def __init__(self, username: str, password: str, solver: Solver, session_file: str): + self.username, self.password, self.solver, self.session_file = ( + username, + password, + solver, + session_file, + ) - def solve_captcha(self, session,data,payload): - url=data['challenge_url'] - r=session.post(url, data=payload) + def solve_captcha(self, session, data, payload): + url = data["challenge_url"] + r = session.post(url, data=payload) - soup = BeautifulSoup(r.text, 'html.parser') + soup = BeautifulSoup(r.text, "html.parser") - code_tag = soup.find('code', id='securedDataExchange') + code_tag = soup.find("code", id="securedDataExchange") - logger.info('Searching for captcha blob in linkedin to begin captcha solving') + logger.info("Searching for captcha blob in linkedin to begin captcha solving") if code_tag: comment = code_tag.contents[0] - extracted_code = str(comment).strip("").strip() + extracted_code = str(comment).strip('').strip() logger.debug("Extracted captcha blob:", extracted_code) - elif 'Please choose a more secure password.' in r.text: - raise Exception('linkedin is requiring a more secure password. reset pw and try again') + elif "Please choose a more secure password." in r.text: + raise Exception( + "linkedin is requiring a more secure password. reset pw and try again" + ) else: - raise BlobException('blob to solve captcha not found - rerunning the program usually solves this') + raise BlobException( + "blob to solve captcha not found - rerunning the program usually solves this" + ) if not self.solver: - raise Exception('captcha hit - provide solver_api_key and solver_service name to solve or switch to the browser-based login with `pip install staffspy[browser]`') - token = self.solver.solve(extracted_code,url) + raise Exception( + "captcha hit - provide solver_api_key and solver_service name to solve or switch to the browser-based login with `pip install staffspy[browser]`" + ) + token = self.solver.solve(extracted_code, url) if not token: - raise Exception('failed to solve captcha after 10 attempts') - - captcha_site_key = soup.find('input', {'name': 'captchaSiteKey'})['value'] - challenge_id = soup.find('input', {'name': 'challengeId'})['value'] - challenge_data = soup.find('input', {'name': 'challengeData'})['value'] - challenge_details = soup.find('input', {'name': 'challengeDetails'})['value'] - challenge_type = soup.find('input', {'name': 'challengeType'})['value'] - challenge_source = soup.find('input', {'name': 'challengeSource'})['value'] - request_submission_id = soup.find('input', {'name': 'requestSubmissionId'})['value'] - display_time = soup.find('input', {'name': 'displayTime'})['value'] - page_instance = soup.find('input', {'name': 'pageInstance'})['value'] - failure_redirect_uri = soup.find('input', {'name': 'failureRedirectUri'})['value'] - sign_in_link = soup.find('input', {'name': 'signInLink'})['value'] - join_now_link = soup.find('input', {'name': 'joinNowLink'})['value'] + raise Exception("failed to solve captcha after 10 attempts") + + captcha_site_key = soup.find("input", {"name": "captchaSiteKey"})["value"] + challenge_id = soup.find("input", {"name": "challengeId"})["value"] + challenge_data = soup.find("input", {"name": "challengeData"})["value"] + challenge_details = soup.find("input", {"name": "challengeDetails"})["value"] + challenge_type = soup.find("input", {"name": "challengeType"})["value"] + challenge_source = soup.find("input", {"name": "challengeSource"})["value"] + request_submission_id = soup.find("input", {"name": "requestSubmissionId"})[ + "value" + ] + display_time = soup.find("input", {"name": "displayTime"})["value"] + page_instance = soup.find("input", {"name": "pageInstance"})["value"] + failure_redirect_uri = soup.find("input", {"name": "failureRedirectUri"})[ + "value" + ] + sign_in_link = soup.find("input", {"name": "signInLink"})["value"] + join_now_link = soup.find("input", {"name": "joinNowLink"})["value"] for cookie in session.cookies: - if cookie.name == 'JSESSIONID': - jsession_value = cookie.value.split('ajax:')[1].strip('"') + if cookie.name == "JSESSIONID": + jsession_value = cookie.value.split("ajax:")[1].strip('"') break else: - raise Exception('jsessionid not found, raise issue on GitHub') - csrf_token=f"ajax:{jsession_value}" + raise Exception("jsessionid not found, raise issue on GitHub") + csrf_token = f"ajax:{jsession_value}" payload = { - "csrfToken":csrf_token, - "captchaSiteKey":captcha_site_key, - "challengeId":challenge_id, - "language":"en-US", - "displayTime":display_time, - "challengeType":challenge_type, - "challengeSource":challenge_source, - "requestSubmissionId":request_submission_id, - "captchaUserResponseToken":token, - "challengeData":challenge_data, - "pageInstance":page_instance, - "challengeDetails":challenge_details, - "failureRedirectUri":failure_redirect_uri, - "signInLink":sign_in_link, - "joinNowLink":join_now_link, - "_s":"CONSUMER_LOGIN" + "csrfToken": csrf_token, + "captchaSiteKey": captcha_site_key, + "challengeId": challenge_id, + "language": "en-US", + "displayTime": display_time, + "challengeType": challenge_type, + "challengeSource": challenge_source, + "requestSubmissionId": request_submission_id, + "captchaUserResponseToken": token, + "challengeData": challenge_data, + "pageInstance": page_instance, + "challengeDetails": challenge_details, + "failureRedirectUri": failure_redirect_uri, + "signInLink": sign_in_link, + "joinNowLink": join_now_link, + "_s": "CONSUMER_LOGIN", + } + encoded_payload = { + key: f'{quote(str(value), "")}' for key, value in payload.items() } - encoded_payload = {key: f'{quote(str(value), "")}' for key, value in payload.items()} - query_string = '&'.join([f'{key}={value}' for key, value in encoded_payload.items()]) - response=session.post("https://www.linkedin.com/checkpoint/challenge/verify", data=query_string) + query_string = "&".join( + [f"{key}={value}" for key, value in encoded_payload.items()] + ) + response = session.post( + "https://www.linkedin.com/checkpoint/challenge/verify", data=query_string + ) if not response.ok: - raise Exception(f'verify captcha failed {response.text[:200]}') + raise Exception(f"verify captcha failed {response.text[:200]}") @retry(stop=stop_after_attempt(5), retry=retry_if_exception_type(BlobException)) def login_requests(self): @@ -150,23 +179,25 @@ def login_requests(self): response = session.get(url) if response.status_code != 200: - raise Exception(f"failed to begin auth process: {response.status_code} {response.text}") + raise Exception( + f"failed to begin auth process: {response.status_code} {response.text}" + ) for cookie in session.cookies: - if cookie.name == 'JSESSIONID': - jsession_value = cookie.value.split('ajax:')[1].strip('"') + if cookie.name == "JSESSIONID": + jsession_value = cookie.value.split("ajax:")[1].strip('"') break else: - raise Exception('jsessionid not found, raise issue on GitHub') - session.headers['content-type'] = "application/x-www-form-urlencoded" - csrf_token=f"ajax%3A{jsession_value}" + raise Exception("jsessionid not found, raise issue on GitHub") + session.headers["content-type"] = "application/x-www-form-urlencoded" + csrf_token = f"ajax%3A{jsession_value}" payload = f"session_key={encoded_username}&session_password={encoded_password}&JSESSIONID=%22{csrf_token}%22" response = session.post(url, data=payload) - data=response.json() + data = response.json() - if data['login_result'] == 'BAD_USERNAME_OR_PASSWORD': - raise Exception('incorrect username or password') - elif data['login_result']=='CHALLENGE': - self.solve_captcha(session,data,payload) + if data["login_result"] == "BAD_USERNAME_OR_PASSWORD": + raise Exception("incorrect username or password") + elif data["login_result"] == "CHALLENGE": + self.solve_captcha(session, data, payload) session = set_csrf_token(session) return session @@ -177,7 +208,7 @@ def login_browser(self): if driver is None: logger.debug("No browser found for selenium") - raise Exception('driver not found for selenium') + raise Exception("driver not found for selenium") driver.get("https://linkedin.com/login") input("Press enter after logged in") @@ -199,7 +230,7 @@ def save_session(self, session, session_file: str): def load_session(self): """Load session from session file, otherwise login""" - session=None + session = None if not self.session_file or not os.path.exists(self.session_file): if self.username and self.password: try: