Skip to content

Commit

Permalink
fix: remove company name requirement
Browse files Browse the repository at this point in the history
  • Loading branch information
cullenwatson committed Nov 11, 2024
1 parent c882cbd commit 5eddfb3
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 56 deletions.
4 changes: 0 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,6 @@ Optional
| company identifier on linkedin, will search for that company if that company id does not exist
| e.g. openai from https://www.linkedin.com/company/openai
|
├── user_id (str):
| alternative to company_name, provide user identifier on linkedin, will scrape this user's company
| e.g. dougmcmillon from https://www.linkedin.com/in/dougmcmillon
|
├── search_term (str):
| staff title to search for
| e.g. software engineer
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "staffspy"
version = "0.2.19"
version = "0.2.20"
description = "Staff scraper library for LinkedIn"
authors = ["Cullen Watson <[email protected]>"]
readme = "README.md"
Expand Down
9 changes: 0 additions & 9 deletions staffspy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,29 +50,20 @@ def login(self):
def scrape_staff(
self,
company_name: str = None,
user_id: str = None,
search_term: str = None,
location: str = None,
extra_profile_data: bool = False,
max_results: int = 1000,
) -> pd.DataFrame:
"""Scrape staff from Linkedin
company_name - name of company to find staff frame
user_id - alternative to company_name, fetches the company_name from the user profile
search_term - occupation / term to search for at the company
location - filter for staff at a location
extra_profile_data - fetches staff's experiences, schools, and more
max_results - amount of results you desire
"""
li_scraper = LinkedInScraper(self.session)

if not company_name:
if not user_id:
raise ValueError("Either company_name or user_id must be provided")
company_name = li_scraper.fetch_user_profile_data_from_public_id(
"company_id"
)

staff = li_scraper.scrape_staff(
company_name=company_name,
extra_profile_data=extra_profile_data,
Expand Down
139 changes: 97 additions & 42 deletions staffspy/linkedin/linkedin.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@


class LinkedInScraper:
employees_ep = "https://www.linkedin.com/voyager/api/graphql?variables=(start:{offset},query:(flagshipSearchIntent:SEARCH_SRP,{search}queryParameters:List((key:currentCompany,value:List({company_id})),{location}(key:resultType,value:List(PEOPLE))),includeFiltersInResponse:false),count:{count})&queryId=voyagerSearchDashClusters.66adc6056cf4138949ca5dcb31bb1749"
employees_ep = "https://www.linkedin.com/voyager/api/graphql?variables=(start:{offset},query:(flagshipSearchIntent:SEARCH_SRP,{search}queryParameters:List({company_id}{location}(key:resultType,value:List(PEOPLE))),includeFiltersInResponse:false),count:{count})&queryId=voyagerSearchDashClusters.66adc6056cf4138949ca5dcb31bb1749"
company_id_ep = "https://www.linkedin.com/voyager/api/organization/companies?q=universalName&universalName="
company_search_ep = "https://www.linkedin.com/voyager/api/graphql?queryId=voyagerSearchDashClusters.02af3bc8bc85a169bb76bb4805d05759&queryName=SearchClusterCollection&variables=(query:(flagshipSearchIntent:SEARCH_SRP,keywords:{company},includeFiltersInResponse:false,queryParameters:(keywords:List({company}),resultType:List(COMPANIES))),count:10,origin:GLOBAL_SEARCH_HEADER,start:0)"
location_id_ep = "https://www.linkedin.com/voyager/api/graphql?queryId=voyagerSearchDashReusableTypeahead.57a4fa1dd92d3266ed968fdbab2d7bf5&queryName=SearchReusableTypeaheadByType&variables=(query:(showFullLastNameForConnections:false,typeaheadFilterQuery:(geoSearchTypes:List(MARKET_AREA,COUNTRY_REGION,ADMIN_DIVISION_1,CITY))),keywords:{location},type:GEO,start:0)"
public_user_id_ep = "https://www.linkedin.com/voyager/api/identity/profiles/{user_id}/profileView"
public_user_id_ep = (
"https://www.linkedin.com/voyager/api/identity/profiles/{user_id}/profileView"
)

def __init__(self, session: requests.Session):
self.session = session
Expand All @@ -54,31 +56,38 @@ def __init__(self, session: requests.Session):
def search_companies(self, company_name):
"""Get the company id and staff count from the company name."""
company_search_ep = self.company_search_ep.format(company=quote(company_name))
self.session.headers['x-li-graphql-pegasus-client'] = "true"
self.session.headers["x-li-graphql-pegasus-client"] = "true"
res = self.session.get(company_search_ep)
self.session.headers.pop('x-li-graphql-pegasus-client', '')
self.session.headers.pop("x-li-graphql-pegasus-client", "")
if res.status_code != 200:
raise Exception(
f"Failed to search for company {company_name}",
res.status_code,
res.text[:200],
)
logger.debug(f"Searched companies {res.status_code}")
companies = res.json()['data']['searchDashClustersByAll']['elements']
companies = res.json()["data"]["searchDashClustersByAll"]["elements"]
if len(companies) < 2:
raise Exception(f'No companies found for name {company_name}, Response: {res.text[:200]}')
raise Exception(
f"No companies found for name {company_name}, Response: {res.text[:200]}"
)
metadata, first_company = companies[:2]
try:
num_results = metadata['items'][0]['item']['simpleTextV2']['text']['text']
first_company = companies[1]['items'][0]['item']['entityResult']
company_link = first_company['navigationUrl']
company_name_id = unquote(re.search(r'/company/([^/]+)', company_link).group(1))
company_name_new = first_company['title']['text']
num_results = metadata["items"][0]["item"]["simpleTextV2"]["text"]["text"]
first_company = companies[1]["items"][0]["item"]["entityResult"]
company_link = first_company["navigationUrl"]
company_name_id = unquote(
re.search(r"/company/([^/]+)", company_link).group(1)
)
company_name_new = first_company["title"]["text"]
except Exception as e:
raise Exception(f'Failed to load json in search_companies {str(e)}, Response: {res.text[:200]}')
raise Exception(
f"Failed to load json in search_companies {str(e)}, Response: {res.text[:200]}"
)

logger.info(
f"Searched company {company_name} on LinkedIn and were {num_results}, using first result with company name - '{company_name_new}' and company id - '{company_name_id}'")
f"Searched company {company_name} on LinkedIn and were {num_results}, using first result with company name - '{company_name_new}' and company id - '{company_name_id}'"
)
return company_name_id

def fetch_or_search_company(self, company_name):
Expand All @@ -92,7 +101,9 @@ def fetch_or_search_company(self, company_name):
res.text[:200],
)
elif res.status_code == 404:
logger.info(f"Failed to directly use company '{company_name}' as company id, now searching for the company")
logger.info(
f"Failed to directly use company '{company_name}' as company id, now searching for the company"
)
company_name = self.search_companies(company_name)
res = self.session.get(f"{self.company_id_ep}{company_name}")
if res.status_code != 200:
Expand All @@ -113,10 +124,16 @@ def get_company_id_and_staff_count(self, company_name: str):
response_json = res.json()
except json.decoder.JSONDecodeError:
logger.debug(res.text[:200])
raise Exception(f'Failed to load json in get_company_id_and_staff_count {res.text[:200]}')
raise Exception(
f"Failed to load json in get_company_id_and_staff_count {res.text[:200]}"
)

company = response_json["elements"][0]
self.domain = utils.extract_base_domain(company["companyPageUrl"]) if company.get('companyPageUrl') else None
self.domain = (
utils.extract_base_domain(company["companyPageUrl"])
if company.get("companyPageUrl")
else None
)
staff_count = company["staffCount"]
company_id = company["trackingInfo"]["objectUrn"].split(":")[-1]
company_name = company["universalName"]
Expand Down Expand Up @@ -162,11 +179,15 @@ def parse_staff(self, elements):
)
return staff

def fetch_staff(self, offset, company_id):
"""Fetch the staff at the company using LinkedIn search"""
def fetch_staff(self, offset: int):
"""Fetch the staff using LinkedIn search"""
ep = self.employees_ep.format(
offset=offset,
company_id=company_id,
company_id=(
f"(key:currentCompany,value:List({self.company_id})),"
if self.company_id
else ""
),
count=min(50, self.max_results),
search=f"keywords:{quote(self.search_term)}," if self.search_term else "",
location=(
Expand Down Expand Up @@ -205,9 +226,19 @@ def fetch_location_id(self):
try:
res_json = res.json()
except json.decoder.JSONDecodeError:
if res.reason == 'INKApi Error':
raise Exception('Delete session file and log in again', res.status_code, res.text[:200], res.reason)
raise GeoUrnNotFound("Failed to send request to get geo id", res.status_code, res.text[:200], res.reason)
if res.reason == "INKApi Error":
raise Exception(
"Delete session file and log in again",
res.status_code,
res.text[:200],
res.reason,
)
raise GeoUrnNotFound(
"Failed to send request to get geo id",
res.status_code,
res.text[:200],
res.reason,
)

try:
elems = res_json["data"]["searchDashReusableTypeaheadByType"]["elements"]
Expand All @@ -225,20 +256,27 @@ def fetch_location_id(self):
self.location = geo_id

def scrape_staff(
self,
company_name: str,
search_term: str,
location: str,
extra_profile_data: bool,
max_results: int,
self,
company_name: str | None,
search_term: str,
location: str,
extra_profile_data: bool,
max_results: int,
):
"""Main driver function"""
self.search_term = search_term
self.company_name = company_name
self.max_results = max_results
self.raw_location = location
self.company_id = None

if self.company_name:
self.company_id, staff_count = self.get_company_id_and_staff_count(
company_name
)
else:
staff_count = 1000

company_id, staff_count = self.get_company_id_and_staff_count(company_name)
staff_list: list[Staff] = []
self.num_staff = min(staff_count, max_results, 1000)

Expand All @@ -251,7 +289,7 @@ def scrape_staff(

try:
for offset in range(0, self.num_staff, 50):
staff = self.fetch_staff(offset, company_id)
staff = self.fetch_staff(offset)
if not staff:
break
staff_list += staff
Expand Down Expand Up @@ -284,14 +322,20 @@ def fetch_all_info_for_employee(self, employee: Staff, index: int):
)

with ThreadPoolExecutor(max_workers=6) as executor:
tasks = {executor.submit(self.employees.fetch_employee, employee, self.domain): "employee",
executor.submit(self.skills.fetch_skills, employee): "skills",
executor.submit(self.experiences.fetch_experiences, employee): (
"experiences"
), executor.submit(self.certs.fetch_certifications, employee): (
tasks = {
executor.submit(
self.employees.fetch_employee, employee, self.domain
): "employee",
executor.submit(self.skills.fetch_skills, employee): "skills",
executor.submit(self.experiences.fetch_experiences, employee): (
"experiences"
),
executor.submit(self.certs.fetch_certifications, employee): (
"certifications"
), executor.submit(self.schools.fetch_schools, employee): "schools",
executor.submit(self.bio.fetch_employee_bio, employee): "bio"}
),
executor.submit(self.schools.fetch_schools, employee): "schools",
executor.submit(self.bio.fetch_employee_bio, employee): "bio",
}

for future in as_completed(tasks):
result = future.result()
Expand All @@ -305,11 +349,22 @@ def fetch_user_profile_data_from_public_id(self, user_id: str, key: str):
response_json = response.json()
except json.decoder.JSONDecodeError:
logger.debug(response.text[:200])
raise Exception(f'Failed to load JSON from endpoint', response.status_code, response.reason)
raise Exception(
f"Failed to load JSON from endpoint",
response.status_code,
response.reason,
)

keys = {
'user_id': ('positionView', 'profileId'),
'company_id': ('positionView', 'elements', 0, 'company', 'miniCompany', 'universalName')
"user_id": ("positionView", "profileId"),
"company_id": (
"positionView",
"elements",
0,
"company",
"miniCompany",
"universalName",
),
}

try:
Expand All @@ -319,6 +374,6 @@ def fetch_user_profile_data_from_public_id(self, user_id: str, key: str):
return data
except (KeyError, TypeError, IndexError) as e:
logger.warning(f"Failed to find user_id {user_id}")
if key == 'user_id':
return ''
if key == "user_id":
return ""
raise Exception(f"Failed to fetch '{key}' for user_id {user_id}: {e}")

0 comments on commit 5eddfb3

Please sign in to comment.