Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 37 additions & 5 deletions linkedin_mcp_server/scraping/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,7 @@ async def extract_page(
self,
url: str,
section_name: str,
max_scrolls: int | None = None,
) -> ExtractedSection:
"""Navigate to a URL, scroll to load lazy content, and extract innerText.

Expand All @@ -661,14 +662,14 @@ async def extract_page(
Returns empty string for unexpected non-domain failures (error isolation).
"""
try:
result = await self._extract_page_once(url, section_name)
result = await self._extract_page_once(url, section_name, max_scrolls)
if result.text != _RATE_LIMITED_MSG:
return result

# Retry once after backoff
logger.info("Retrying %s after %.0fs backoff", url, _RATE_LIMIT_RETRY_DELAY)
await asyncio.sleep(_RATE_LIMIT_RETRY_DELAY)
return await self._extract_page_once(url, section_name)
return await self._extract_page_once(url, section_name, max_scrolls)

except LinkedInScraperException:
raise
Expand All @@ -689,6 +690,7 @@ async def _extract_page_once(
self,
url: str,
section_name: str,
max_scrolls: int | None = None,
) -> ExtractedSection:
"""Single attempt to navigate, scroll, and extract innerText."""
await self._navigate_to_page(url)
Expand Down Expand Up @@ -755,11 +757,38 @@ async def _extract_page_once(
except PlaywrightTimeoutError:
logger.debug("Detail section content did not appear on %s", url)

# Detail pages paginate with a "Show more" button inside <main>, not scroll.
# Click it until it disappears or the budget runs out.
if is_details:
max_clicks = max_scrolls if max_scrolls is not None else 5
for i in range(max_clicks):
button = self._page.locator("main button").filter(
has_text=re.compile(r"^Show (more|all)\b", re.IGNORECASE)
)
try:
if await button.count() == 0:
logger.debug("No 'Show more' button after %d clicks", i)
break
target = button.first
if not await target.is_visible():
break
await target.scroll_into_view_if_needed(timeout=2000)
await target.click(timeout=2000)
await asyncio.sleep(1.0)
except PlaywrightTimeoutError:
logger.debug("Show more click timed out after %d clicks", i)
break
except Exception as e:
logger.debug("Show more click failed: %s", e)
break

# Scroll to trigger lazy loading
if is_activity:
await scroll_to_bottom(self._page, pause_time=1.0, max_scrolls=10)
scrolls = max_scrolls if max_scrolls is not None else 10
await scroll_to_bottom(self._page, pause_time=1.0, max_scrolls=scrolls)
else:
await scroll_to_bottom(self._page, pause_time=0.5, max_scrolls=5)
scrolls = max_scrolls if max_scrolls is not None else 5
await scroll_to_bottom(self._page, pause_time=0.5, max_scrolls=scrolls)

# Extract text from main content area
raw_result = await self._extract_root_content(["main"])
Expand Down Expand Up @@ -864,6 +893,7 @@ async def scrape_person(
username: str,
requested: set[str],
callbacks: ProgressCallback | None = None,
max_scrolls: int | None = None,
) -> dict[str, Any]:
"""Scrape a person profile with configurable sections.

Expand Down Expand Up @@ -900,7 +930,9 @@ async def scrape_person(
)
else:
extracted = await self.extract_page(
url, section_name=section_name
url,
section_name=section_name,
max_scrolls=max_scrolls,
)

if extracted.text and extracted.text != _RATE_LIMITED_MSG:
Expand Down
17 changes: 15 additions & 2 deletions linkedin_mcp_server/tools/person.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
"""

import logging
from typing import Any
from typing import Annotated, Any

from fastmcp import Context, FastMCP
from pydantic import Field

from linkedin_mcp_server.callbacks import MCPContextProgressCallback
from linkedin_mcp_server.constants import TOOL_TIMEOUT_SECONDS
Expand All @@ -34,6 +35,7 @@ async def get_person_profile(
linkedin_username: str,
ctx: Context,
sections: str | None = None,
max_scrolls: Annotated[int, Field(ge=1, le=50)] | None = None,
extractor: Any | None = None,
) -> dict[str, Any]:
"""
Expand All @@ -47,6 +49,14 @@ async def get_person_profile(
Available sections: experience, education, interests, honors, languages, certifications, skills, projects, contact_info, posts
Examples: "experience,education", "contact_info", "skills,projects", "honors,languages", "posts"
Default (None) scrapes only the main profile page.
max_scrolls: Maximum pagination attempts per section to load more content.
On detail sections (experience, certifications, skills, etc.) this
is the max number of "Show more" button clicks. On activity/posts
it is the max scroll-to-bottom iterations. Applies to all sections
in this call. Default (None) uses 5 for detail sections and 10 for
posts. Increase when a profile has many items in a section
(e.g., 30+ certifications, max_scrolls=20). To avoid slowing down
other sections, request heavy sections in a separate call.

Returns:
Dict with url, sections (name -> raw text), and optional references.
Expand All @@ -68,7 +78,10 @@ async def get_person_profile(

cb = MCPContextProgressCallback(ctx)
result = await extractor.scrape_person(
linkedin_username, requested, callbacks=cb
linkedin_username,
requested,
callbacks=cb,
max_scrolls=max_scrolls,
)

if unknown:
Expand Down
241 changes: 241 additions & 0 deletions tests/test_scraping.py
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,33 @@ async def test_projects_visits_details_page(self, mock_page):
assert any("/details/projects/" in url for url in urls)
assert "projects" in result["sections"]

async def test_scrape_person_passes_max_scrolls(self, mock_page):
    """scrape_person forwards max_scrolls to every extract_page call."""
    extractor = LinkedInExtractor(mock_page)
    with (
        patch.object(
            extractor,
            "extract_page",
            new_callable=AsyncMock,
            return_value=extracted("text"),
        ) as mock_extract,
        patch.object(
            extractor,
            "_extract_overlay",
            new_callable=AsyncMock,
            return_value=extracted(""),
        ),
        # Avoid real inter-page delays during the scrape loop.
        patch(
            "linkedin_mcp_server.scraping.extractor.asyncio.sleep",
            new_callable=AsyncMock,
        ),
    ):
        await extractor.scrape_person(
            "test-user", {"certifications"}, max_scrolls=15
        )

    # Guard against a vacuous pass: the loop below asserts nothing at all
    # if extract_page was never invoked.
    assert mock_extract.await_count > 0
    for call in mock_extract.call_args_list:
        assert call.kwargs.get("max_scrolls") == 15


class TestDetectConnectionState:
"""Tests for connection state detection from profile text."""
Expand Down Expand Up @@ -1948,6 +1975,220 @@ async def test_details_page_waits_for_panel_content(self, mock_page):
assert kwargs["pause_time"] == 0.5
assert kwargs["max_scrolls"] == 5

async def test_max_scrolls_override_passed_to_scroll_to_bottom(self, mock_page):
    """A caller-supplied max_scrolls on a detail page replaces the default of 5."""
    payload = {
        "source": "root",
        "text": "Experience\nSoftware Engineer",
        "references": [],
    }
    mock_page.evaluate = AsyncMock(return_value=payload)
    mock_page.wait_for_function = AsyncMock()
    extractor = LinkedInExtractor(mock_page)

    scroll_patch = patch(
        "linkedin_mcp_server.scraping.extractor.scroll_to_bottom",
        new_callable=AsyncMock,
    )
    rate_patch = patch(
        "linkedin_mcp_server.scraping.extractor.detect_rate_limit",
        new_callable=AsyncMock,
    )
    modal_patch = patch(
        "linkedin_mcp_server.scraping.extractor.handle_modal_close",
        new_callable=AsyncMock,
        return_value=False,
    )
    with scroll_patch as mock_scroll, rate_patch, modal_patch:
        await extractor._extract_page_once(
            "https://www.linkedin.com/in/billgates/details/certifications/",
            section_name="certifications",
            max_scrolls=20,
        )

    mock_scroll.assert_awaited_once()
    assert mock_scroll.call_args.kwargs["max_scrolls"] == 20

async def test_default_scrolls_without_max_scrolls_override(self, mock_page):
    """Omitting max_scrolls leaves detail pages on the default of 5 scrolls."""
    mock_page.wait_for_function = AsyncMock()
    mock_page.evaluate = AsyncMock(
        return_value={
            "source": "root",
            "text": "Experience\nSoftware Engineer",
            "references": [],
        }
    )
    extractor = LinkedInExtractor(mock_page)

    with (
        patch(
            "linkedin_mcp_server.scraping.extractor.handle_modal_close",
            new_callable=AsyncMock,
            return_value=False,
        ),
        patch(
            "linkedin_mcp_server.scraping.extractor.detect_rate_limit",
            new_callable=AsyncMock,
        ),
        patch(
            "linkedin_mcp_server.scraping.extractor.scroll_to_bottom",
            new_callable=AsyncMock,
        ) as mock_scroll,
    ):
        await extractor._extract_page_once(
            "https://www.linkedin.com/in/billgates/details/certifications/",
            section_name="certifications",
        )

    mock_scroll.assert_awaited_once()
    assert mock_scroll.call_args.kwargs["max_scrolls"] == 5

async def test_details_page_clicks_show_more_until_gone(self, mock_page):
    """Detail pages click 'Show more' in a loop until the button disappears."""
    # Minimal successful extraction payload so _extract_page_once runs to completion.
    mock_page.evaluate = AsyncMock(
        return_value={"source": "root", "text": "text", "references": []}
    )
    mock_page.wait_for_function = AsyncMock()

    # Self-referential mock standing in for a Playwright Locator:
    # .filter(...) and .first both return the mock itself so the production
    # chain locator(...).filter(...).first resolves back to this object.
    show_more = MagicMock()
    # count() returns 1, 1, 0 across iterations — button disappears on 3rd check
    show_more.count = AsyncMock(side_effect=[1, 1, 0])
    show_more.is_visible = AsyncMock(return_value=True)
    show_more.scroll_into_view_if_needed = AsyncMock()
    show_more.click = AsyncMock()
    show_more.first = show_more
    show_more.filter = MagicMock(return_value=show_more)

    # Only the "main button" selector yields the button; any other selector
    # (e.g. rate-limit probes) resolves to an empty locator.
    def locator_side_effect(selector):
        if selector == "main button":
            return show_more
        return MagicMock(count=AsyncMock(return_value=0))

    mock_page.locator = MagicMock(side_effect=locator_side_effect)
    extractor = LinkedInExtractor(mock_page)

    with (
        patch(
            "linkedin_mcp_server.scraping.extractor.scroll_to_bottom",
            new_callable=AsyncMock,
        ),
        patch(
            "linkedin_mcp_server.scraping.extractor.detect_rate_limit",
            new_callable=AsyncMock,
        ),
        patch(
            "linkedin_mcp_server.scraping.extractor.handle_modal_close",
            new_callable=AsyncMock,
            return_value=False,
        ),
        # Skip the 1s post-click pause so the test stays fast.
        patch(
            "linkedin_mcp_server.scraping.extractor.asyncio.sleep",
            new_callable=AsyncMock,
        ),
    ):
        await extractor._extract_page_once(
            "https://www.linkedin.com/in/billgates/details/certifications/",
            section_name="certifications",
        )

    # Two clicks happened before the third count() returned 0 and broke the loop.
    assert show_more.click.await_count == 2

async def test_details_page_show_more_respects_max_scrolls_budget(self, mock_page):
    """When 'Show more' never disappears, loop exits after max_scrolls clicks."""
    # Minimal successful extraction payload so _extract_page_once runs to completion.
    mock_page.evaluate = AsyncMock(
        return_value={"source": "root", "text": "text", "references": []}
    )
    mock_page.wait_for_function = AsyncMock()

    # Locator-style mock: .filter(...) and .first return the mock itself so
    # the production chain locator(...).filter(...).first resolves here.
    show_more = MagicMock()
    show_more.count = AsyncMock(return_value=1)  # always present
    show_more.is_visible = AsyncMock(return_value=True)
    show_more.scroll_into_view_if_needed = AsyncMock()
    show_more.click = AsyncMock()
    show_more.first = show_more
    show_more.filter = MagicMock(return_value=show_more)

    # Only the "main button" selector yields the button; any other selector
    # resolves to an empty locator.
    def locator_side_effect(selector):
        if selector == "main button":
            return show_more
        return MagicMock(count=AsyncMock(return_value=0))

    mock_page.locator = MagicMock(side_effect=locator_side_effect)
    extractor = LinkedInExtractor(mock_page)

    with (
        patch(
            "linkedin_mcp_server.scraping.extractor.scroll_to_bottom",
            new_callable=AsyncMock,
        ),
        patch(
            "linkedin_mcp_server.scraping.extractor.detect_rate_limit",
            new_callable=AsyncMock,
        ),
        patch(
            "linkedin_mcp_server.scraping.extractor.handle_modal_close",
            new_callable=AsyncMock,
            return_value=False,
        ),
        # Skip the 1s post-click pause so the test stays fast.
        patch(
            "linkedin_mcp_server.scraping.extractor.asyncio.sleep",
            new_callable=AsyncMock,
        ),
    ):
        await extractor._extract_page_once(
            "https://www.linkedin.com/in/billgates/details/experience/",
            section_name="experience",
            max_scrolls=3,
        )

    # Button never vanished, so the click budget (max_scrolls=3) is exhausted.
    assert show_more.click.await_count == 3

async def test_non_details_page_does_not_click_show_more(self, mock_page):
    """Non-details URLs (main profile, activity) skip the Show more loop."""
    mock_page.evaluate = AsyncMock(
        return_value={"source": "root", "text": "text", "references": []}
    )
    mock_page.wait_for_function = AsyncMock()

    # A 'Show more' button that would be found if the click loop ever ran.
    show_more = MagicMock()
    show_more.count = AsyncMock(return_value=1)
    show_more.click = AsyncMock()
    show_more.first = show_more
    show_more.filter = MagicMock(return_value=show_more)

    def fake_locator(selector):
        if selector == "main button":
            return show_more
        return MagicMock(count=AsyncMock(return_value=0))

    mock_page.locator = MagicMock(side_effect=fake_locator)
    extractor = LinkedInExtractor(mock_page)

    scroll_patch = patch(
        "linkedin_mcp_server.scraping.extractor.scroll_to_bottom",
        new_callable=AsyncMock,
    )
    rate_patch = patch(
        "linkedin_mcp_server.scraping.extractor.detect_rate_limit",
        new_callable=AsyncMock,
    )
    modal_patch = patch(
        "linkedin_mcp_server.scraping.extractor.handle_modal_close",
        new_callable=AsyncMock,
        return_value=False,
    )
    with scroll_patch, rate_patch, modal_patch:
        await extractor._extract_page_once(
            "https://www.linkedin.com/in/billgates/",
            section_name="main_profile",
        )

    show_more.click.assert_not_awaited()

async def test_activity_page_timeout_proceeds_gracefully(self, mock_page):
"""When activity feed content never loads, extraction proceeds with available text."""
from patchright.async_api import TimeoutError as PlaywrightTimeoutError
Expand Down
Loading
Loading