diff --git a/linkedin_mcp_server/scraping/extractor.py b/linkedin_mcp_server/scraping/extractor.py index 42c2e774..436e632c 100644 --- a/linkedin_mcp_server/scraping/extractor.py +++ b/linkedin_mcp_server/scraping/extractor.py @@ -4,7 +4,7 @@ from dataclasses import dataclass import logging import re -from typing import Any, Literal +from typing import Any, Callable, Awaitable, Literal from urllib.parse import quote_plus from patchright.async_api import Page, TimeoutError as PlaywrightTimeoutError @@ -17,6 +17,7 @@ from linkedin_mcp_server.core.exceptions import ( AuthenticationError, LinkedInScraperException, + RateLimitError, ) from linkedin_mcp_server.debug_trace import record_page_trace from linkedin_mcp_server.debug_utils import stabilize_navigation @@ -155,6 +156,87 @@ def _truncate_linkedin_noise(text: str) -> str: return text[:earliest].strip() +def _parse_contact_record( + profile_text: str, contact_text: str +) -> dict[str, str | None]: + """Parse raw innerText blobs into structured contact fields. + + Profile text layout (first lines): + Name\\n\\n· 1st\\n\\nHeadline\\n\\nLocation\\n\\n·\\n\\nContact info\\n\\nCompany + + Contact info overlay layout: + Email\\n\\nuser@example.com\\n\\nPhone\\n\\n+123...\\n\\n... + """ + result: dict[str, str | None] = { + "first_name": None, + "last_name": None, + "headline": None, + "location": None, + "company": None, + "email": None, + "phone": None, + "website": None, + "birthday": None, + } + + # --- Parse profile text --- + if profile_text: + lines = [ln.strip() for ln in profile_text.split("\n")] + non_empty = [ln for ln in lines if ln] + + if non_empty: + # Line 1 → full name + full_name = non_empty[0] + parts = full_name.split(None, 1) + result["first_name"] = parts[0] if parts else full_name + result["last_name"] = parts[1] if len(parts) > 1 else None + + # Find connection degree marker (· 1st, · 2nd, · 3rd, · 3rd+) + degree_idx: int | None = None + for i, ln in enumerate(non_empty): + if re.match(r"^·\s*\d+(st|nd|rd|th)\+?$", ln): + degree_idx = i + break + + if degree_idx is not None and degree_idx + 1 < len(non_empty): + result["headline"] = non_empty[degree_idx + 1] + + # Location is the next non-empty line after headline + if degree_idx + 2 < len(non_empty): + candidate = non_empty[degree_idx + 2] + # Skip if it's just the "·" separator or "Contact info" + if candidate not in ("·", "Contact info"): + result["location"] = candidate + + # Company: line after "Contact info" + for i, ln in enumerate(non_empty): + if ln == "Contact info" and i + 1 < len(non_empty): + result["company"] = non_empty[i + 1] + break + + # --- Parse contact info overlay --- + if contact_text: + # Extract labeled fields: "Label\n\nvalue" + for field, label in [ + ("email", "Email"), + ("phone", "Phone"), + ("birthday", "Birthday"), + ]: + match = re.search( + rf"(?:^|\n){re.escape(label)}\s*\n\s*\n\s*(.+)", + contact_text, + ) + if match: + result[field] = match.group(1).strip() + + # Website may include a type annotation like "(Blog)" or "(Portfolio)" + match = re.search(r"(?:^|\n)Website\s*\n\s*\n\s*(.+)", contact_text) + if match: + result["website"] = match.group(1).strip() + + return result + + class LinkedInExtractor: """Extracts LinkedIn page content via navigate-scroll-innerText pattern.""" @@ -1015,15 +1097,25 @@ async def search_people( self, keywords: str, location: str | None = None, + network: str | None = None, ) -> dict[str, Any]: """Search for people and extract the results page. + Args: + keywords: Search keywords. + location: Optional location filter. + network: Optional connection degree filter. + "F" = 1st degree, "S" = 2nd degree, "O" = 3rd+. + Returns: {url, sections: {name: text}} """ params = f"keywords={quote_plus(keywords)}" if location: params += f"&location={quote_plus(location)}" + if network: + # LinkedIn expects network=%5B%22F%22%5D (URL-encoded ["F"]) + params += f"&network=%5B%22{quote_plus(network)}%22%5D" url = f"https://www.linkedin.com/search/results/people/?{params}" extracted = await self.extract_page(url, section_name="search_results") @@ -1156,3 +1248,232 @@ async def _extract_root_content( {"selectors": selectors}, ) return result + + # ------------------------------------------------------------------ + # Connections bulk export + # ------------------------------------------------------------------ + + async def scrape_connections_list( + self, + limit: int = 0, + max_scrolls: int = 50, + ) -> dict[str, Any]: + """Scrape the authenticated user's connections list via infinite scroll. + + Args: + limit: Maximum connections to return (0 = unlimited). + max_scrolls: Maximum scroll iterations (~1s pause each). + + Returns: + {connections: [{username, name, headline}, ...], total, url, pages_visited} + """ + url = "https://www.linkedin.com/mynetwork/invite-connect/connections/" + + # Navigate — handle ERR_ABORTED (page already loaded / redirect race) + try: + await self._page.goto(url, wait_until="domcontentloaded", timeout=30000) + except Exception as nav_err: + if "ERR_ABORTED" in str(nav_err): + logger.info("Navigation aborted (page may already be loaded), retrying") + await asyncio.sleep(2.0) + await self._page.goto(url, wait_until="domcontentloaded", timeout=30000) + else: + raise + + await detect_rate_limit(self._page) + + try: + await self._page.wait_for_selector("main", timeout=10000) + except PlaywrightTimeoutError: + logger.debug("No
element on connections page") + + await handle_modal_close(self._page) + + # Deep scroll to load all connections (infinite scroll) + await scroll_to_bottom(self._page, pause_time=1.0, max_scrolls=max_scrolls) + + # Stabilize — LinkedIn may trigger lazy navigations during scroll + await asyncio.sleep(1.0) + + # Ensure we're still on the connections page; re-navigate if needed + current_url = self._page.url + if "/connections" not in current_url: + logger.warning( + "Page navigated away to %s during scroll, re-navigating", + current_url, + ) + await self._page.goto(url, wait_until="domcontentloaded", timeout=30000) + await asyncio.sleep(2.0) + + # Extract connection data from profile link elements + raw_connections: list[dict[str, str]] = await self._page.evaluate( + """() => { + const results = []; + const seen = new Set(); + const links = document.querySelectorAll('main a[href*="/in/"]'); + for (const a of links) { + const href = a.getAttribute('href') || ''; + const match = href.match(/\\/in\\/([^/?#]+)/); + if (!match) continue; + const username = match[1]; + if (seen.has(username)) continue; + seen.add(username); + + // Walk up to the connection card container + const card = a.closest('li') || a.parentElement; + + // Name: try known selectors, then the link's own visible text + let name = ''; + if (card) { + const nameEl = card.querySelector( + '.mn-connection-card__name, .entity-result__title-text, span[dir="ltr"], span.t-bold' + ); + if (nameEl) name = nameEl.innerText.trim(); + } + if (!name) { + // The profile link itself often contains the person's name + const linkText = a.innerText.trim(); + if (linkText && linkText.length < 80) name = linkText; + } + + // Headline: try known selectors, then parse card text + let headline = ''; + if (card) { + const headlineEl = card.querySelector( + '.mn-connection-card__occupation, .entity-result__primary-subtitle, span.t-normal' + ); + if (headlineEl) headline = headlineEl.innerText.trim(); + } + if (!headline && card) { + // Fallback: split card text by newlines, second non-empty line is usually headline + const lines = card.innerText.split('\\n').map(l => l.trim()).filter(Boolean); + if (lines.length >= 2) headline = lines[1]; + } + + results.push({ username, name, headline }); + } + return results; + }""" + ) + + # Apply limit + if limit > 0: + raw_connections = raw_connections[:limit] + + return { + "connections": raw_connections, + "total": len(raw_connections), + "url": url, + "pages_visited": [url], + } + + async def scrape_contact_batch( + self, + usernames: list[str], + chunk_size: int = 5, + chunk_delay: float = 30.0, + progress_cb: Callable[[int, int], Awaitable[None]] | None = None, + ) -> dict[str, Any]: + """Enrich a list of profiles with contact details in chunked batches. + + For each username: scrapes main profile + contact_info overlay. + + Args: + usernames: List of LinkedIn usernames to enrich. + chunk_size: Profiles per chunk before a long pause. + chunk_delay: Seconds to pause between chunks. + progress_cb: Optional async callback(completed, total) for progress. + + Returns: + {contacts: [{username, first_name, last_name, email, phone, + headline, location, company, website, birthday, + profile_raw, contact_info_raw}], + total, failed, rate_limited, pages_visited} + """ + if chunk_size <= 0: + raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}") + + contacts: list[dict[str, Any]] = [] + failed: list[str] = [] + pages_visited: list[str] = [] + total = len(usernames) + rate_limited = False + + for chunk_idx in range(0, total, chunk_size): + chunk = usernames[chunk_idx : chunk_idx + chunk_size] + + for username in chunk: + profile_url = f"https://www.linkedin.com/in/{username}/" + contact_url = ( + f"https://www.linkedin.com/in/{username}/overlay/contact-info/" + ) + + try: + # Scrape main profile page + profile_text = await self.extract_page(profile_url) + pages_visited.append(profile_url) + + if profile_text == _RATE_LIMITED_MSG: + logger.warning( + "Soft rate limit on profile %s, skipping", username + ) + failed.append(username) + await asyncio.sleep(_NAV_DELAY) + continue + + # Scrape contact info overlay + contact_text = await self._extract_overlay(contact_url) + pages_visited.append(contact_url) + + if contact_text == _RATE_LIMITED_MSG: + contact_text = ( + "" # fall back to empty; parsed fields will be None + ) + + parsed = _parse_contact_record(profile_text, contact_text) + contacts.append( + { + "username": username, + **parsed, + "profile_raw": profile_text, + "contact_info_raw": contact_text, + } + ) + + except RateLimitError: + logger.warning("Rate limited during contact batch at %s", username) + failed.append(username) + rate_limited = True + break + except Exception as e: + logger.warning("Failed to scrape %s: %s", username, e) + failed.append(username) + + # Brief delay between individual profiles + await asyncio.sleep(_NAV_DELAY) + + if rate_limited: + break + + # Report progress after each chunk + completed = min(chunk_idx + len(chunk), total) + if progress_cb: + await progress_cb(completed, total) + + # Pause between chunks (skip after last chunk) + if chunk_idx + chunk_size < total: + logger.info( + "Chunk complete (%d/%d). Pausing %.0fs...", + completed, + total, + chunk_delay, + ) + await asyncio.sleep(chunk_delay) + + return { + "contacts": contacts, + "total": len(contacts), + "failed": failed, + "rate_limited": rate_limited, + "pages_visited": pages_visited, + } diff --git a/linkedin_mcp_server/server.py b/linkedin_mcp_server/server.py index bd6a6e4e..fa8a6f9a 100644 --- a/linkedin_mcp_server/server.py +++ b/linkedin_mcp_server/server.py @@ -23,6 +23,7 @@ SequentialToolExecutionMiddleware, ) from linkedin_mcp_server.tools.company import register_company_tools +from linkedin_mcp_server.tools.connections import register_connections_tools from linkedin_mcp_server.tools.job import register_job_tools from linkedin_mcp_server.tools.person import register_person_tools @@ -58,6 +59,7 @@ def create_mcp_server() -> FastMCP: register_person_tools(mcp) register_company_tools(mcp) register_job_tools(mcp) + register_connections_tools(mcp) # Register session management tool @mcp.tool( diff --git a/linkedin_mcp_server/tools/connections.py b/linkedin_mcp_server/tools/connections.py new file mode 100644 index 00000000..bc73a920 --- /dev/null +++ b/linkedin_mcp_server/tools/connections.py @@ -0,0 +1,172 @@ +""" +LinkedIn connections bulk export tools. + +Provides tools for collecting connection usernames via infinite scroll +and enriching profiles with contact details in chunked batches. +""" + +import logging +from typing import Any + +from fastmcp import Context, FastMCP +from mcp.types import ToolAnnotations + +from linkedin_mcp_server.drivers.browser import ( + ensure_authenticated, + get_or_create_browser, +) +from linkedin_mcp_server.error_handler import handle_tool_error +from linkedin_mcp_server.scraping import LinkedInExtractor + +logger = logging.getLogger(__name__) + + +def register_connections_tools(mcp: FastMCP) -> None: + """Register all connections-related tools with the MCP server.""" + + @mcp.tool( + annotations=ToolAnnotations( + title="Get My Connections", + readOnlyHint=True, + destructiveHint=False, + openWorldHint=True, + ) + ) + async def get_my_connections( + ctx: Context, + limit: int = 0, + max_scrolls: int = 50, + ) -> dict[str, Any]: + """ + Collect the authenticated user's LinkedIn connections via infinite scroll. + + Navigates to the connections page and scrolls to load all connection cards, + then extracts username, name, and headline from each. + + Args: + ctx: FastMCP context for progress reporting + limit: Maximum connections to return (0 = unlimited, default 0) + max_scrolls: Maximum scroll iterations, ~1s pause each (default 50) + + Returns: + Dict with connections (list of {username, name, headline}), total count, + url visited, and pages_visited list. + """ + try: + await ensure_authenticated() + + logger.info( + "Collecting connections (limit=%d, max_scrolls=%d)", limit, max_scrolls + ) + + browser = await get_or_create_browser() + extractor = LinkedInExtractor(browser.page) + + await ctx.report_progress( + progress=0, total=100, message="Loading connections page" + ) + + result = await extractor.scrape_connections_list( + limit=limit, max_scrolls=max_scrolls + ) + + await ctx.report_progress(progress=100, total=100, message="Complete") + + return result + + except Exception as e: + return handle_tool_error(e, "get_my_connections") + + @mcp.tool( + annotations=ToolAnnotations( + title="Extract Contact Details", + readOnlyHint=True, + destructiveHint=False, + openWorldHint=True, + ) + ) + async def extract_contact_details( + usernames: str, + ctx: Context, + chunk_size: int = 5, + chunk_delay: float = 30.0, + ) -> dict[str, Any]: + """ + Enrich LinkedIn profiles with contact details (email, phone, etc.) in chunked batches. + + For each username, scrapes the main profile page and the contact info overlay. + Processes profiles in chunks with configurable delays to avoid rate limiting. + + Args: + usernames: Comma-separated LinkedIn usernames (e.g. "johndoe,janedoe,bobsmith") + ctx: FastMCP context for progress reporting + chunk_size: Number of profiles per chunk before pausing (default 5) + chunk_delay: Seconds to pause between chunks (default 30) + + Returns: + Dict with contacts (list of structured records), total enriched, + failed usernames, rate_limited flag, and pages_visited. + + Each contact record contains: + - username, first_name, last_name: Identity fields + - email, phone, website, birthday: From the contact info overlay + - headline, location, company: From the main profile page + - profile_raw, contact_info_raw: Original innerText as fallback + """ + try: + await ensure_authenticated() + + username_list = list( + dict.fromkeys(u.strip() for u in usernames.split(",") if u.strip()) + ) + + if not username_list: + return { + "error": "invalid_input", + "message": "No valid usernames provided. Pass comma-separated usernames.", + } + + logger.info( + "Enriching %d profiles (chunk_size=%d, chunk_delay=%.0fs)", + len(username_list), + chunk_size, + chunk_delay, + ) + + browser = await get_or_create_browser() + extractor = LinkedInExtractor(browser.page) + + total = len(username_list) + + await ctx.report_progress( + progress=0, + total=total, + message=f"Starting enrichment of {total} profiles", + ) + + async def on_progress(completed: int, total: int) -> None: + await ctx.report_progress( + progress=completed, + total=total, + message=f"Enriched {completed}/{total} profiles", + ) + + result = await extractor.scrape_contact_batch( + usernames=username_list, + chunk_size=chunk_size, + chunk_delay=chunk_delay, + progress_cb=on_progress, + ) + + completed = result["total"] + msg = ( + "Complete" + if not result.get("rate_limited") + else f"Stopped early due to rate limit ({completed}/{total} processed)" + ) + await ctx.report_progress(progress=completed, total=total, message=msg) + + return result + + except Exception as e: + return handle_tool_error(e, "extract_contact_details") diff --git a/linkedin_mcp_server/tools/person.py b/linkedin_mcp_server/tools/person.py index 79053c24..40a8a5c2 100644 --- a/linkedin_mcp_server/tools/person.py +++ b/linkedin_mcp_server/tools/person.py @@ -91,6 +91,7 @@ async def search_people( keywords: str, ctx: Context, location: str | None = None, + network: str | None = None, extractor: Any | None = None, ) -> dict[str, Any]: """ @@ -100,6 +101,8 @@ async def search_people( keywords: Search keywords (e.g., "software engineer", "recruiter at Google") ctx: FastMCP context for progress reporting location: Optional location filter (e.g., "New York", "Remote") + network: Optional connection degree filter. + "F" = 1st degree, "S" = 2nd degree, "O" = 3rd+. Returns: Dict with url, sections (name -> raw text), and optional references. @@ -110,16 +113,17 @@ async def search_people( ctx, tool_name="search_people" ) logger.info( - "Searching people: keywords='%s', location='%s'", + "Searching people: keywords='%s', location='%s', network='%s'", keywords, location, + network, ) await ctx.report_progress( progress=0, total=100, message="Starting people search" ) - result = await extractor.search_people(keywords, location) + result = await extractor.search_people(keywords, location, network) await ctx.report_progress(progress=100, total=100, message="Complete")