diff --git a/dlt/extract/items.py b/dlt/extract/items.py index 13002a05a3..cc4e366744 100644 --- a/dlt/extract/items.py +++ b/dlt/extract/items.py @@ -1,4 +1,6 @@ import inspect +import time + from abc import ABC, abstractmethod from typing import ( Any, @@ -243,23 +245,44 @@ def bind(self, pipe: SupportsPipe) -> ItemTransform[TDataItem]: class LimitItem(ItemTransform[TDataItem]): placement_affinity: ClassVar[float] = 1.1 # stick to end right behind incremental - def __init__(self, max_items: int) -> None: + def __init__( + self, max_items: Optional[int], max_time: Optional[float], min_wait: Optional[float] + ) -> None: self.max_items = max_items if max_items is not None else -1 + self.max_time = max_time + self.min_wait = min_wait def bind(self, pipe: SupportsPipe) -> "LimitItem": self.gen = pipe.gen self.count = 0 self.exhausted = False + self.start_time = time.time() + self.last_call_time = 0.0 return self def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: - # detect when the limit is reached - if self.count == self.max_items: + # detect when the limit is reached, time or yield count + if (self.count == self.max_items) or ( + self.max_time and time.time() - self.start_time > self.max_time + ): self.exhausted = True if inspect.isgenerator(self.gen): self.gen.close() + # do not return any late arriving items if self.exhausted: return None self.count += 1 + + # if we have a min wait and the last iteration was less than min wait ago, + # we sleep on this thread a bit + if self.min_wait and (time.time() - self.last_call_time) < self.min_wait: + # NOTE: this should be interruptable? + # NOTE: this is sleeping on the main thread, we should carefully document this! + time.sleep(self.min_wait - (time.time() - self.last_call_time)) + + # remember last iteration time + if self.min_wait: + self.last_call_time = time.time() + return item diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py index 6e47e77aa3..cd40ccb87b 100644 --- a/dlt/extract/resource.py +++ b/dlt/extract/resource.py @@ -341,20 +341,27 @@ def add_filter( self._pipe.insert_step(FilterItem(item_filter), insert_at) return self - def add_limit(self: TDltResourceImpl, max_items: int) -> TDltResourceImpl: # noqa: A003 + def add_limit( + self: TDltResourceImpl, + max_items: Optional[int] = None, + max_time: Optional[float] = None, + min_wait: Optional[float] = None, + ) -> TDltResourceImpl: # noqa: A003 """Adds a limit `max_items` to the resource pipe. - This mutates the encapsulated generator to stop after `max_items` items are yielded. This is useful for testing and debugging. + This mutates the encapsulated generator to stop after `max_items` items are yielded. This is useful for testing and debugging. - Notes: - 1. Transformers won't be limited. They should process all the data they receive fully to avoid inconsistencies in generated datasets. - 2. Each yielded item may contain several records. `add_limit` only limits the "number of yields", not the total number of records. - 3. Async resources with a limit added may occasionally produce one item more than the limit on some runs. This behavior is not deterministic. + Notes: + 1. Transformers won't be limited. They should process all the data they receive fully to avoid inconsistencies in generated datasets. + 2. Each yielded item may contain several records. `add_limit` only limits the "number of yields", not the total number of records. + 3. Async resources with a limit added may occasionally produce one item more than the limit on some runs. This behavior is not deterministic. Args: - max_items (int): The maximum number of items to yield - Returns: - "DltResource": returns self + max_items (int): The maximum number of items to yield, set to None for no limit + max_time (float): The maximum number of seconds for this generator to run after it was opened, set to None for no limit + min_wait (float): The minimum number of seconds to wait between iterations (useful for rate limiting) + Returns: + "DltResource": returns self """ if self.is_transformer: @@ -365,7 +372,7 @@ def add_limit(self: TDltResourceImpl, max_items: int) -> TDltResourceImpl: # no else: # remove existing limit if any self._pipe.remove_by_type(LimitItem) - self.add_step(LimitItem(max_items)) + self.add_step(LimitItem(max_items=max_items, max_time=max_time, min_wait=min_wait)) return self diff --git a/tests/extract/test_sources.py b/tests/extract/test_sources.py index d3d5016392..946b0ee2b8 100644 --- a/tests/extract/test_sources.py +++ b/tests/extract/test_sources.py @@ -1,4 +1,6 @@ import itertools +import time + from typing import Iterator import pytest @@ -910,6 +912,61 @@ def infinite_source(): assert list(infinite_source().add_limit(2)) == ["A", "A", 0, "A", "A", "A", 1] * 3 +def test_limit_max_time() -> None: + @dlt.resource() + def r(): + for i in range(100): + time.sleep(0.1) + yield i + + @dlt.resource() + async def r_async(): + for i in range(100): + await asyncio.sleep(0.1) + yield i + + sync_list = list(r().add_limit(max_time=1)) + async_list = list(r_async().add_limit(max_time=1)) + + # we should have extracted 10 items within 1 second, sleep is included in the resource + # we allow for some variance in the number of items, as the sleep is not super precise + allowed_results = [ + list(range(12)), + list(range(11)), + list(range(10)), + list(range(9)), + list(range(8)), + ] + assert sync_list in allowed_results + assert async_list in allowed_results + + +def test_limit_min_wait() -> None: + @dlt.resource() + def r(): + for i in range(100): + yield i + + @dlt.resource() + async def r_async(): + for i in range(100): + yield i + + sync_list = list(r().add_limit(max_time=1, min_wait=0.2)) + async_list = list(r_async().add_limit(max_time=1, min_wait=0.2)) + + # we should have extracted about 5 items within 1 second, sleep is done via min_wait + allowed_results = [ + list(range(3)), + list(range(4)), + list(range(5)), + list(range(6)), + list(range(7)), + ] + assert sync_list in allowed_results + assert async_list in allowed_results + + def test_source_state() -> None: @dlt.source def test_source(expected_state):