Skip to content

Commit

Permalink
Merge pull request #1677 from dlt-hub/feat/1637_stop-pagination-after…
Browse files Browse the repository at this point in the history
…-empty-page

RESTClient: stops pagination after empty page (Feat/1637)
  • Loading branch information
willi-mueller authored Aug 19, 2024
2 parents 2b9a422 + 83bab15 commit d448122
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 68 deletions.
2 changes: 1 addition & 1 deletion dlt/sources/helpers/rest_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ def raise_for_status(response: Response, *args: Any, **kwargs: Any) -> None:

if paginator is None:
paginator = self.detect_paginator(response, data)
paginator.update_state(response)
paginator.update_state(response, data)
paginator.update_request(request)

# yield data with context
Expand Down
86 changes: 55 additions & 31 deletions dlt/sources/helpers/rest_client/paginators.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import warnings
from abc import ABC, abstractmethod
from typing import Optional, Dict, Any
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse, urljoin

from requests import Response, Request
Expand Down Expand Up @@ -39,7 +39,7 @@ def init_request(self, request: Request) -> None: # noqa: B027, optional overri
pass

@abstractmethod
def update_state(self, response: Response) -> None:
def update_state(self, response: Response, data: Optional[List[Any]] = None) -> None:
"""Updates the paginator's state based on the response from the API.
This method should extract necessary pagination details (like next page
Expand Down Expand Up @@ -73,7 +73,7 @@ def __str__(self) -> str:
class SinglePagePaginator(BasePaginator):
"""A paginator for single-page API responses."""

def update_state(self, response: Response) -> None:
def update_state(self, response: Response, data: Optional[List[Any]] = None) -> None:
self._has_next_page = False

def update_request(self, request: Request) -> None:
Expand All @@ -96,6 +96,7 @@ def __init__(
maximum_value: Optional[int] = None,
total_path: Optional[jsonpath.TJsonPath] = None,
error_message_items: str = "items",
stop_after_empty_page: Optional[bool] = True,
):
"""
Args:
Expand All @@ -116,44 +117,55 @@ def __init__(
If not provided, `maximum_value` must be specified.
error_message_items (str): The name of the items in the error message.
Defaults to 'items'.
stop_after_empty_page (bool): Whether pagination should stop when
a page contains no result items. Defaults to `True`.
"""
super().__init__()
if total_path is None and maximum_value is None:
raise ValueError("Either `total_path` or `maximum_value` must be provided.")
if total_path is None and maximum_value is None and not stop_after_empty_page:
raise ValueError(
"Either `total_path` or `maximum_value` or `stop_after_empty_page` must be provided."
)
self.param_name = param_name
self.current_value = initial_value
self.value_step = value_step
self.base_index = base_index
self.maximum_value = maximum_value
self.total_path = jsonpath.compile_path(total_path) if total_path else None
self.error_message_items = error_message_items
self.stop_after_empty_page = stop_after_empty_page

def init_request(self, request: Request) -> None:
if request.params is None:
request.params = {}

request.params[self.param_name] = self.current_value

def update_state(self, response: Response) -> None:
total = None
if self.total_path:
response_json = response.json()
values = jsonpath.find_values(self.total_path, response_json)
total = values[0] if values else None
if total is None:
self._handle_missing_total(response_json)

try:
total = int(total)
except ValueError:
self._handle_invalid_total(total)

self.current_value += self.value_step

if (total is not None and self.current_value >= total + self.base_index) or (
self.maximum_value is not None and self.current_value >= self.maximum_value
):
def update_state(self, response: Response, data: Optional[List[Any]] = None) -> None:
    """Advance the paginator after a page has been fetched.

    Pagination stops without advancing `current_value` when
    `_stop_after_this_page(data)` is true. Otherwise `current_value` is
    increased by `value_step`, and pagination stops once the new value
    reaches the total read from the response (offset by `base_index`) or
    reaches `maximum_value`.

    Args:
        response (Response): The response of the page that was just fetched.
            Its JSON body is only read when `total_path` is configured.
        data (Optional[List[Any]]): The items extracted from that page.

    Raises:
        Whatever `_handle_missing_total` / `_handle_invalid_total` raise
        when the total is absent or cannot be converted to an integer.
    """
    if self._stop_after_this_page(data):
        self._has_next_page = False
        return

    total = None
    if self.total_path:
        response_json = response.json()
        values = jsonpath.find_values(self.total_path, response_json)
        total = values[0] if values else None
        if total is None:
            self._handle_missing_total(response_json)

        try:
            total = int(total)
        except (TypeError, ValueError):
            # int() raises TypeError (not ValueError) for non-scalar values
            # such as lists or dicts; route both through the same handler so
            # the caller gets the dedicated "invalid total" error.
            self._handle_invalid_total(total)

    self.current_value += self.value_step

    reached_total = total is not None and self.current_value >= total + self.base_index
    reached_maximum = (
        self.maximum_value is not None and self.current_value >= self.maximum_value
    )
    if reached_total or reached_maximum:
        self._has_next_page = False

def _stop_after_this_page(self, data: Optional[List[Any]]=None) -> bool:
return self.stop_after_empty_page and not data

def _handle_missing_total(self, response_json: Dict[str, Any]) -> None:
raise ValueError(
Expand Down Expand Up @@ -229,6 +241,7 @@ def __init__(
page_param: str = "page",
total_path: jsonpath.TJsonPath = "total",
maximum_page: Optional[int] = None,
stop_after_empty_page: Optional[bool] = True,
):
"""
Args:
Expand All @@ -246,9 +259,13 @@ def __init__(
will stop once this page is reached or exceeded, even if more
data is available. This allows you to limit the maximum number
of pages for pagination. Defaults to None.
stop_after_empty_page (bool): Whether pagination should stop when
a page contains no result items. Defaults to `True`.
"""
if total_path is None and maximum_page is None:
raise ValueError("Either `total_path` or `maximum_page` must be provided.")
if total_path is None and maximum_page is None and not stop_after_empty_page:
raise ValueError(
"Either `total_path` or `maximum_page` or `stop_after_empty_page` must be provided."
)

page = page if page is not None else base_page

Expand All @@ -260,6 +277,7 @@ def __init__(
value_step=1,
maximum_value=maximum_page,
error_message_items="pages",
stop_after_empty_page=stop_after_empty_page,
)

def __str__(self) -> str:
Expand Down Expand Up @@ -330,6 +348,7 @@ def __init__(
limit_param: str = "limit",
total_path: jsonpath.TJsonPath = "total",
maximum_offset: Optional[int] = None,
stop_after_empty_page: Optional[bool] = True,
) -> None:
"""
Args:
Expand All @@ -347,15 +366,20 @@ def __init__(
pagination will stop once this offset is reached or exceeded,
even if more data is available. This allows you to limit the
maximum range for pagination. Defaults to None.
stop_after_empty_page (bool): Whether pagination should stop when
a page contains no result items. Defaults to `True`.
"""
if total_path is None and maximum_offset is None:
raise ValueError("Either `total_path` or `maximum_offset` must be provided.")
if total_path is None and maximum_offset is None and not stop_after_empty_page:
raise ValueError(
"Either `total_path` or `maximum_offset` or `stop_after_empty_page` must be provided."
)
super().__init__(
param_name=offset_param,
initial_value=offset,
total_path=total_path,
value_step=limit,
maximum_value=maximum_offset,
stop_after_empty_page=stop_after_empty_page,
)
self.limit_param = limit_param
self.limit = limit
Expand Down Expand Up @@ -484,7 +508,7 @@ def __init__(self, links_next_key: str = "next") -> None:
super().__init__()
self.links_next_key = links_next_key

def update_state(self, response: Response) -> None:
def update_state(self, response: Response, data: Optional[List[Any]] = None) -> None:
"""Extracts the next page URL from the 'Link' header in the response."""
self._next_reference = response.links.get(self.links_next_key, {}).get("url")

Expand Down Expand Up @@ -539,7 +563,7 @@ def __init__(
super().__init__()
self.next_url_path = jsonpath.compile_path(next_url_path)

def update_state(self, response: Response) -> None:
def update_state(self, response: Response, data: Optional[List[Any]] = None) -> None:
"""Extracts the next page URL from the JSON response."""
values = jsonpath.find_values(self.next_url_path, response.json())
self._next_reference = values[0] if values else None
Expand Down Expand Up @@ -618,7 +642,7 @@ def __init__(
self.cursor_path = jsonpath.compile_path(cursor_path)
self.cursor_param = cursor_param

def update_state(self, response: Response) -> None:
def update_state(self, response: Response, data: Optional[List[Any]] = None) -> None:
"""Extracts the cursor value from the JSON response."""
values = jsonpath.find_values(self.cursor_path, response.json())
self._next_reference = values[0] if values else None
Expand Down
8 changes: 4 additions & 4 deletions docs/website/docs/dlt-ecosystem/verified-sources/rest_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ You can configure the pagination for the `posts` resource like this:
{
"path": "posts",
"paginator": {
"type": "json_response",
"type": "json_link",
"next_url_path": "pagination.next",
}
}
Expand All @@ -380,7 +380,7 @@ You can configure the pagination for the `posts` resource like this:
Alternatively, you can use the paginator instance directly:

```py
from dlt.sources.helpers.rest_client.paginators import JSONResponsePaginator
from dlt.sources.helpers.rest_client.paginators import JSONLinkPaginator

# ...

Expand All @@ -402,8 +402,8 @@ These are the available paginators:
| ------------ | -------------- | ----------- |
| `json_link` | [JSONLinkPaginator](../../general-usage/http/rest-client.md#jsonresponsepaginator) | The link to the next page is in the body (JSON) of the response.<br/>*Parameters:*<ul><li>`next_url_path` (str) - the JSONPath to the next page URL</li></ul> |
| `header_link` | [HeaderLinkPaginator](../../general-usage/http/rest-client.md#headerlinkpaginator) | The links to the next page are in the response headers.<br/>*Parameters:*<ul><li>`links_next_key` (str) - the key of the next-page link in the response's `Link` header. Default is "next".</li></ul> |
| `offset` | [OffsetPaginator](../../general-usage/http/rest-client.md#offsetpaginator) | The pagination is based on an offset parameter. With total items count either in the response body or explicitly provided.<br/>*Parameters:*<ul><li>`limit` (int) - the maximum number of items to retrieve in each request</li><li>`offset` (int) - the initial offset for the first request. Defaults to `0`</li><li>`offset_param` (str) - the name of the query parameter used to specify the offset. Defaults to "offset"</li><li>`limit_param` (str) - the name of the query parameter used to specify the limit. Defaults to "limit"</li><li>`total_path` (str) - a JSONPath expression for the total number of items. If not provided, pagination is controlled by `maximum_offset`</li><li>`maximum_offset` (int) - optional maximum offset value. Limits pagination even without total count</li></ul> |
| `page_number` | [PageNumberPaginator](../../general-usage/http/rest-client.md#pagenumberpaginator) | The pagination is based on a page number parameter. With total pages count either in the response body or explicitly provided.<br/>*Parameters:*<ul><li>`base_page` (int) - the starting page number. Defaults to `0`</li><li>`page_param` (str) - the query parameter name for the page number. Defaults to "page"</li><li>`total_path` (str) - a JSONPath expression for the total number of pages. If not provided, pagination is controlled by `maximum_page`</li><li>`maximum_page` (int) - optional maximum page number. Stops pagination once this page is reached</li></ul> |
| `offset` | [OffsetPaginator](../../general-usage/http/rest-client.md#offsetpaginator) | The pagination is based on an offset parameter, with the total item count either read from the response body or provided explicitly.<br/>*Parameters:*<ul><li>`limit` (int) - the maximum number of items to retrieve in each request</li><li>`offset` (int) - the initial offset for the first request. Defaults to `0`</li><li>`offset_param` (str) - the name of the query parameter used to specify the offset. Defaults to "offset"</li><li>`limit_param` (str) - the name of the query parameter used to specify the limit. Defaults to "limit"</li><li>`total_path` (str) - a JSONPath expression for the total number of items. If not provided, pagination is controlled by `maximum_offset` and `stop_after_empty_page`</li><li>`maximum_offset` (int) - optional maximum offset value. Limits pagination even without total count</li><li>`stop_after_empty_page` (bool) - whether pagination should stop when a page contains no result items. Defaults to `True`</li></ul> |
| `page_number` | [PageNumberPaginator](../../general-usage/http/rest-client.md#pagenumberpaginator) | The pagination is based on a page number parameter, with the total page count either read from the response body or provided explicitly.<br/>*Parameters:*<ul><li>`base_page` (int) - the starting page number. Defaults to `0`</li><li>`page_param` (str) - the query parameter name for the page number. Defaults to "page"</li><li>`total_path` (str) - a JSONPath expression for the total number of pages. If not provided, pagination is controlled by `maximum_page` and `stop_after_empty_page`</li><li>`maximum_page` (int) - optional maximum page number. Stops pagination once this page is reached</li><li>`stop_after_empty_page` (bool) - whether pagination should stop when a page contains no result items. Defaults to `True`</li></ul> |
| `cursor` | [JSONResponseCursorPaginator](../../general-usage/http/rest-client.md#jsonresponsecursorpaginator) | The pagination is based on a cursor parameter. The value of the cursor is in the response body (JSON).<br/>*Parameters:*<ul><li>`cursor_path` (str) - the JSONPath to the cursor value. Defaults to "cursors.next"</li><li>`cursor_param` (str) - the query parameter name for the cursor. Defaults to "after"</li></ul> |
| `single_page` | SinglePagePaginator | The response will be interpreted as a single-page response, ignoring possible pagination metadata. |
| `auto` | `None` | Explicitly specify that the source should automatically detect the pagination method. |
Expand Down
Loading

0 comments on commit d448122

Please sign in to comment.