diff --git a/README.md b/README.md
index eee5970..b00e46b 100644
--- a/README.md
+++ b/README.md
@@ -119,10 +119,6 @@ Optional
 | company identifier on linkedin, will search for that company if that company id does not exist
 | e.g. openai from https://www.linkedin.com/company/openai
 |
-├── user_id (str):
-| alternative to company_name, provide user identifier on linkedin, will scrape this user's company
-| e.g. dougmcmillon from https://www.linkedin.com/in/dougmcmillon
-|
 ├── search_term (str):
 | staff title to search for
 | e.g. software engineer
diff --git a/pyproject.toml b/pyproject.toml
index 8b7a777..cfb9d00 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "staffspy"
-version = "0.2.19"
+version = "0.2.20"
 description = "Staff scraper library for LinkedIn"
 authors = ["Cullen Watson "]
 readme = "README.md"
diff --git a/staffspy/__init__.py b/staffspy/__init__.py
index a3d4620..541a077 100644
--- a/staffspy/__init__.py
+++ b/staffspy/__init__.py
@@ -50,7 +50,6 @@ def login(self):
     def scrape_staff(
         self,
         company_name: str = None,
-        user_id: str = None,
         search_term: str = None,
         location: str = None,
         extra_profile_data: bool = False,
@@ -58,7 +57,6 @@ def scrape_staff(
     ) -> pd.DataFrame:
         """Scrape staff from Linkedin
         company_name - name of company to find staff frame
-        user_id - alternative to company_name, fetches the company_name from the user profile
         search_term - occupation / term to search for at the company
         location - filter for staff at a location
         extra_profile_data - fetches staff's experiences, schools, and mor
@@ -66,13 +64,6 @@ def scrape_staff(
         """
         li_scraper = LinkedInScraper(self.session)

-        if not company_name:
-            if not user_id:
-                raise ValueError("Either company_name or user_id must be provided")
-            company_name = li_scraper.fetch_user_profile_data_from_public_id(
-                "company_id"
-            )
-
         staff = li_scraper.scrape_staff(
             company_name=company_name,
             extra_profile_data=extra_profile_data,
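Reviewer note: with `user_id` removed, `company_name` becomes genuinely optional and `scrape_staff` no longer raises when no company is given; an empty company now simply means an unscoped people search. A minimal sketch of the resulting call surface, assuming the `LinkedInAccount` wrapper and `session_file` argument shown in the README:

```python
from staffspy import LinkedInAccount  # wrapper class name assumed from the README

account = LinkedInAccount(session_file="session.pkl", log_level=1)

# Company-scoped search, unchanged from 0.2.19.
staff = account.scrape_staff(
    company_name="openai",
    search_term="software engineer",
)

# New in 0.2.20: omit the company entirely and search across LinkedIn.
# The currentCompany filter is dropped from the query, and results are
# bounded by LinkedIn's 1,000-result search cap.
engineers = account.scrape_staff(search_term="software engineer", location="london")
```

Both calls return a `pd.DataFrame`, per the signature above.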
"https://www.linkedin.com/voyager/api/graphql?queryId=voyagerSearchDashReusableTypeahead.57a4fa1dd92d3266ed968fdbab2d7bf5&queryName=SearchReusableTypeaheadByType&variables=(query:(showFullLastNameForConnections:false,typeaheadFilterQuery:(geoSearchTypes:List(MARKET_AREA,COUNTRY_REGION,ADMIN_DIVISION_1,CITY))),keywords:{location},type:GEO,start:0)" - public_user_id_ep = "https://www.linkedin.com/voyager/api/identity/profiles/{user_id}/profileView" + public_user_id_ep = ( + "https://www.linkedin.com/voyager/api/identity/profiles/{user_id}/profileView" + ) def __init__(self, session: requests.Session): self.session = session @@ -54,9 +56,9 @@ def __init__(self, session: requests.Session): def search_companies(self, company_name): """Get the company id and staff count from the company name.""" company_search_ep = self.company_search_ep.format(company=quote(company_name)) - self.session.headers['x-li-graphql-pegasus-client'] = "true" + self.session.headers["x-li-graphql-pegasus-client"] = "true" res = self.session.get(company_search_ep) - self.session.headers.pop('x-li-graphql-pegasus-client', '') + self.session.headers.pop("x-li-graphql-pegasus-client", "") if res.status_code != 200: raise Exception( f"Failed to search for company {company_name}", @@ -64,21 +66,28 @@ def search_companies(self, company_name): res.text[:200], ) logger.debug(f"Searched companies {res.status_code}") - companies = res.json()['data']['searchDashClustersByAll']['elements'] + companies = res.json()["data"]["searchDashClustersByAll"]["elements"] if len(companies) < 2: - raise Exception(f'No companies found for name {company_name}, Response: {res.text[:200]}') + raise Exception( + f"No companies found for name {company_name}, Response: {res.text[:200]}" + ) metadata, first_company = companies[:2] try: - num_results = metadata['items'][0]['item']['simpleTextV2']['text']['text'] - first_company = companies[1]['items'][0]['item']['entityResult'] - company_link = first_company['navigationUrl'] - company_name_id = unquote(re.search(r'/company/([^/]+)', company_link).group(1)) - company_name_new = first_company['title']['text'] + num_results = metadata["items"][0]["item"]["simpleTextV2"]["text"]["text"] + first_company = companies[1]["items"][0]["item"]["entityResult"] + company_link = first_company["navigationUrl"] + company_name_id = unquote( + re.search(r"/company/([^/]+)", company_link).group(1) + ) + company_name_new = first_company["title"]["text"] except Exception as e: - raise Exception(f'Failed to load json in search_companies {str(e)}, Response: {res.text[:200]}') + raise Exception( + f"Failed to load json in search_companies {str(e)}, Response: {res.text[:200]}" + ) logger.info( - f"Searched company {company_name} on LinkedIn and were {num_results}, using first result with company name - '{company_name_new}' and company id - '{company_name_id}'") + f"Searched company {company_name} on LinkedIn and were {num_results}, using first result with company name - '{company_name_new}' and company id - '{company_name_id}'" + ) return company_name_id def fetch_or_search_company(self, company_name): @@ -92,7 +101,9 @@ def fetch_or_search_company(self, company_name): res.text[:200], ) elif res.status_code == 404: - logger.info(f"Failed to directly use company '{company_name}' as company id, now searching for the company") + logger.info( + f"Failed to directly use company '{company_name}' as company id, now searching for the company" + ) company_name = self.search_companies(company_name) res = 
self.session.get(f"{self.company_id_ep}{company_name}") if res.status_code != 200: @@ -113,10 +124,16 @@ def get_company_id_and_staff_count(self, company_name: str): response_json = res.json() except json.decoder.JSONDecodeError: logger.debug(res.text[:200]) - raise Exception(f'Failed to load json in get_company_id_and_staff_count {res.text[:200]}') + raise Exception( + f"Failed to load json in get_company_id_and_staff_count {res.text[:200]}" + ) company = response_json["elements"][0] - self.domain = utils.extract_base_domain(company["companyPageUrl"]) if company.get('companyPageUrl') else None + self.domain = ( + utils.extract_base_domain(company["companyPageUrl"]) + if company.get("companyPageUrl") + else None + ) staff_count = company["staffCount"] company_id = company["trackingInfo"]["objectUrn"].split(":")[-1] company_name = company["universalName"] @@ -162,11 +179,15 @@ def parse_staff(self, elements): ) return staff - def fetch_staff(self, offset, company_id): - """Fetch the staff at the company using LinkedIn search""" + def fetch_staff(self, offset: int): + """Fetch the staff using LinkedIn search""" ep = self.employees_ep.format( offset=offset, - company_id=company_id, + company_id=( + f"(key:currentCompany,value:List({self.company_id}))," + if self.company_id + else "" + ), count=min(50, self.max_results), search=f"keywords:{quote(self.search_term)}," if self.search_term else "", location=( @@ -205,9 +226,19 @@ def fetch_location_id(self): try: res_json = res.json() except json.decoder.JSONDecodeError: - if res.reason == 'INKApi Error': - raise Exception('Delete session file and log in again', res.status_code, res.text[:200], res.reason) - raise GeoUrnNotFound("Failed to send request to get geo id", res.status_code, res.text[:200], res.reason) + if res.reason == "INKApi Error": + raise Exception( + "Delete session file and log in again", + res.status_code, + res.text[:200], + res.reason, + ) + raise GeoUrnNotFound( + "Failed to send request to get geo id", + res.status_code, + res.text[:200], + res.reason, + ) try: elems = res_json["data"]["searchDashReusableTypeaheadByType"]["elements"] @@ -225,20 +256,27 @@ def fetch_location_id(self): self.location = geo_id def scrape_staff( - self, - company_name: str, - search_term: str, - location: str, - extra_profile_data: bool, - max_results: int, + self, + company_name: str | None, + search_term: str, + location: str, + extra_profile_data: bool, + max_results: int, ): """Main driver function""" self.search_term = search_term self.company_name = company_name self.max_results = max_results self.raw_location = location + self.company_id = None + + if self.company_name: + self.company_id, staff_count = self.get_company_id_and_staff_count( + company_name + ) + else: + staff_count = 1000 - company_id, staff_count = self.get_company_id_and_staff_count(company_name) staff_list: list[Staff] = [] self.num_staff = min(staff_count, max_results, 1000) @@ -251,7 +289,7 @@ def scrape_staff( try: for offset in range(0, self.num_staff, 50): - staff = self.fetch_staff(offset, company_id) + staff = self.fetch_staff(offset) if not staff: break staff_list += staff @@ -284,14 +322,20 @@ def fetch_all_info_for_employee(self, employee: Staff, index: int): ) with ThreadPoolExecutor(max_workers=6) as executor: - tasks = {executor.submit(self.employees.fetch_employee, employee, self.domain): "employee", - executor.submit(self.skills.fetch_skills, employee): "skills", - executor.submit(self.experiences.fetch_experiences, employee): ( - "experiences" - ), 
@@ -284,14 +322,20 @@ def fetch_all_info_for_employee(self, employee: Staff, index: int):
             )

         with ThreadPoolExecutor(max_workers=6) as executor:
-            tasks = {executor.submit(self.employees.fetch_employee, employee, self.domain): "employee",
-                     executor.submit(self.skills.fetch_skills, employee): "skills",
-                     executor.submit(self.experiences.fetch_experiences, employee): (
-                         "experiences"
-                     ), executor.submit(self.certs.fetch_certifications, employee): (
+            tasks = {
+                executor.submit(
+                    self.employees.fetch_employee, employee, self.domain
+                ): "employee",
+                executor.submit(self.skills.fetch_skills, employee): "skills",
+                executor.submit(self.experiences.fetch_experiences, employee): (
+                    "experiences"
+                ),
+                executor.submit(self.certs.fetch_certifications, employee): (
                     "certifications"
-                     ), executor.submit(self.schools.fetch_schools, employee): "schools",
-                     executor.submit(self.bio.fetch_employee_bio, employee): "bio"}
+                ),
+                executor.submit(self.schools.fetch_schools, employee): "schools",
+                executor.submit(self.bio.fetch_employee_bio, employee): "bio",
+            }

             for future in as_completed(tasks):
                 result = future.result()
@@ -305,11 +349,22 @@ def fetch_user_profile_data_from_public_id(self, user_id: str, key: str):
             response_json = response.json()
         except json.decoder.JSONDecodeError:
             logger.debug(response.text[:200])
-            raise Exception(f'Failed to load JSON from endpoint', response.status_code, response.reason)
+            raise Exception(
+                f"Failed to load JSON from endpoint",
+                response.status_code,
+                response.reason,
+            )

         keys = {
-            'user_id': ('positionView', 'profileId'),
-            'company_id': ('positionView', 'elements', 0, 'company', 'miniCompany', 'universalName')
+            "user_id": ("positionView", "profileId"),
+            "company_id": (
+                "positionView",
+                "elements",
+                0,
+                "company",
+                "miniCompany",
+                "universalName",
+            ),
         }

         try:
@@ -319,6 +374,6 @@
             return data
         except (KeyError, TypeError, IndexError) as e:
             logger.warning(f"Failed to find user_id {user_id}")
-            if key == 'user_id':
-                return ''
+            if key == "user_id":
+                return ""
             raise Exception(f"Failed to fetch '{key}' for user_id {user_id}: {e}")
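The traversal that consumes `keys` sits between these last two hunks and is unchanged by the diff. For review convenience, a minimal equivalent of the lookup; the response payload here is invented purely for illustration:

```python
from functools import reduce

keys = {
    "user_id": ("positionView", "profileId"),
    "company_id": (
        "positionView", "elements", 0, "company", "miniCompany", "universalName",
    ),
}

# Hypothetical, trimmed-down profileView payload -- for illustration only.
response_json = {
    "positionView": {
        "profileId": "ACoAA-example",
        "elements": [{"company": {"miniCompany": {"universalName": "openai"}}}],
    }
}

# Walk the path: str keys index dicts, the int indexes a list.
data = reduce(lambda node, k: node[k], keys["company_id"], response_json)
print(data)  # -> "openai"
```

A `KeyError`, `TypeError`, or `IndexError` anywhere along the path is what the final hunk's `except` clause absorbs, returning `""` for `user_id` lookups and re-raising for everything else.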