Commit 2a5e274

Stream chunks of measurements tasks instead of awaiting _all_ at once (#17)
The by-default day-wise requests to the OSeM API led to a problem: when querying large time spans, we created lots of requests which couldn't all be handled before an aiohttp-internal timeout. This was more a client-side than a server-side problem.
1 parent 96f48b1 commit 2a5e274
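
For readers unfamiliar with aiostream, the pattern this commit switches to is roughly the following. This is a minimal, self-contained sketch, not the library's code: fetch_page and its arguments are hypothetical stand-ins for _perform_measurements_request, and only stream.merge, pipe.chunks and the .stream() context manager mirror the actual diff below.

# Sketch of the merge-and-chunk pattern used in this commit (assumptions noted above).
import asyncio

from aiostream import pipe, stream


async def fetch_page(page: int):
    """Hypothetical stand-in: yields one batch of fake measurements per request."""
    await asyncio.sleep(0.01)  # placeholder for the HTTP round trip
    yield [f"measurement {page}-{i}" for i in range(3)]


async def main() -> None:
    generators = (fetch_page(page) for page in range(10))  # lazy; nothing runs yet
    # merge the async generators and group their yielded batches into chunks
    chunked = stream.merge(*generators) | pipe.chunks(4)
    async with chunked.stream() as chunk_stream:
        async for chunk in chunk_stream:  # each chunk holds up to 4 batches
            for batch in chunk:
                for measurement in batch:
                    print(measurement)


asyncio.run(main())

The point, per the commit message, is that the day-wise requests are no longer all awaited at once, which avoids the aiohttp-internal timeout on large time spans.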

File tree

1 file changed: +24, -15 lines

src/osemclient/client.py

Lines changed: 24 additions & 15 deletions
@@ -1,13 +1,12 @@
 """
 contains the OpenSenseMap client; the core of this library
 """
-import asyncio
 import logging
 from datetime import datetime, timedelta, timezone
 from typing import AsyncGenerator, Generator, Optional
 
 from aiohttp import ClientSession, TCPConnector
-from aiostream import stream
+from aiostream import pipe, stream
 from yarl import URL
 
 from osemclient.filtercriteria import BoundingBox, SensorFilterCriteria
@@ -65,12 +64,18 @@ class OpenSenseMapClient:
     An async HTTP client for OpenSenseMap REST API
     """
 
-    def __init__(self, limit_per_host: int = 10):
+    def __init__(self, limit_per_host: int = 10, number_of_concurrent_requests: int = 100):
         """
-        initializes the client and its session
+        initializes the client and its session.
+        The limit_per_host is a limit for the number of concurrent connections to the same host (aiohttp internal).
+        The number_of_concurrent_requests is the number of concurrent requests which this client sends to the OSeM API.
+        Change limit_per_host if the response time/internet connection is too slow.
+        Change number_of_concurrent_requests if you run into aiohttp timeouts because the pending requests cannot be
+        processed fast enough.
         """
         self._connector = TCPConnector(limit_per_host=limit_per_host)
         self._session = ClientSession(connector=self._connector, raise_for_status=True)
+        self._number_of_concurrent_requests = number_of_concurrent_requests  # started at once
         _logger.info("Initialized aiohttp session")
 
     async def get_sensebox(self, sensebox_id: str) -> Box:
@@ -100,13 +105,13 @@ async def get_senseboxes_from_area(self, bounding_box: BoundingBox) -> list[Box]
         _logger.debug("Retrieved %d senseboxes", len(result.root))
         return result.root
 
-    async def _perform_measurements_request(self, url: URL) -> list[Measurement]:
+    async def _perform_measurements_request(self, url: URL) -> AsyncGenerator[list[Measurement], None]:
         _logger.debug("Starting download of measurements from %s", url)
         try:
             async with self._session.get(url) as response:
                 result = _Measurements.model_validate(await response.json())
                 _logger.debug("Retrieved %d measurements", len(result.root))
-                return result.root
+                yield result.root
         finally:
             _logger.debug("Finished downloading measurements from %s", url)
 
@@ -143,7 +148,7 @@ async def get_sensor_measurements(
         # The OSeM API only allows retrieving 10k measurements at once.
         # We could try to derive the time span width from the measurement frequency, but it's easier to just always
         # use a width of 1 day and merge the results.
-        measurements_tasks = [
+        measurements_tasks = (
            self._perform_measurements_request(_url)
            for _url in _get_urls_in_date_range(
                from_date=_from_date,
@@ -152,15 +157,19 @@ async def get_sensor_measurements(
                sensebox_id=sensebox_id,
                sensor_id=sensor_id,
            )
-        ]
+        )
+        measurements_chunks_streamer = stream.merge(*measurements_tasks) | pipe.chunks(
+            self._number_of_concurrent_requests
+        )  # todo: I thought you could use flatten here to unpack the chunks again but didn't get it working
         number_of_measurements_yielded = 0
-        for measurements_task in asyncio.as_completed(measurements_tasks):
-            measurements = await measurements_task
-            for measurement in measurements:
-                yield measurement
-                number_of_measurements_yielded += 1
-                if number_of_measurements_yielded % 10_000 == 0:
-                    _logger.debug("Yielded %d measurements so far...", number_of_measurements_yielded)
+        async with measurements_chunks_streamer.stream() as measurements_chunk_stream:
+            async for measurements_chunks in measurements_chunk_stream:
+                for measurements_chunk in measurements_chunks:
+                    for measurement in measurements_chunk:
+                        yield measurement
+                        number_of_measurements_yielded += 1
+                        if number_of_measurements_yielded % 10_000 == 0:
+                            _logger.debug("Yielded %d measurements so far...", number_of_measurements_yielded)
         _logger.info(
             "Yielded %d measurements in total from box %s and sensor %s",
             number_of_measurements_yielded,
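
For completeness, a hedged usage sketch of the new constructor parameter: only OpenSenseMapClient, limit_per_host, number_of_concurrent_requests, get_sensor_measurements, sensebox_id and sensor_id are visible in this diff; the from_date/to_date keyword names and the placeholder IDs are assumptions, and whether the underlying aiohttp session needs explicit closing is not shown here either.

# Hedged usage sketch; keyword arguments not visible in the diff are assumptions.
import asyncio
from datetime import datetime, timedelta, timezone

from osemclient.client import OpenSenseMapClient


async def main() -> None:
    # number_of_concurrent_requests (new in this commit) is the number of concurrent
    # requests the client sends to the OSeM API; limit_per_host remains the
    # aiohttp-internal per-host connection limit.
    client = OpenSenseMapClient(limit_per_host=10, number_of_concurrent_requests=100)
    async for measurement in client.get_sensor_measurements(
        sensebox_id="<sensebox id>",
        sensor_id="<sensor id>",
        from_date=datetime.now(timezone.utc) - timedelta(days=30),  # assumed kwarg name
        to_date=datetime.now(timezone.utc),  # assumed kwarg name
    ):
        print(measurement)


asyncio.run(main())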
