diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 24eb6d4..9d2785c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -19,8 +19,8 @@ jobs: - uses: actions/checkout@v4 - uses: astral-sh/setup-uv@v6 - run: uv python install ${{ matrix.python }} - - run: uv run ruff check src/obelisk/ - - run: uv run ruff format --check src/obelisk/ + - run: uv run ruff check + - run: uv run ruff format - run: uv run mypy test: name: Run tests diff --git a/hooks/pre-commit b/hooks/pre-commit index 5fcd963..aea2099 100755 --- a/hooks/pre-commit +++ b/hooks/pre-commit @@ -10,5 +10,6 @@ # Redirect output to stderr. exec 1>&2 -uv run ruff format --check src/obelisk/ +uv run ruff format --check +uv run ruff check uv run mypy diff --git a/pyproject.toml b/pyproject.toml index 41d965b..b97d87c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,3 +54,11 @@ packages = ["src/obelisk"] [tool.mypy] files = "src/obelisk" strict = true + +[tool.ruff] +include = ["src/obelisk/**/*.py"] + +[tool.ruff.lint] +select = ["E4", "E7", "E9", "F", "ASYNC", "S", "B", "FIX", "SIM", "C90", "N", "PERF", "UP"] +# Ignore N815, camelcase field names are usually for serialisation reasons +ignore = ["N815"] diff --git a/src/obelisk/asynchronous/base.py b/src/obelisk/asynchronous/base.py index 131d995..bb1856f 100644 --- a/src/obelisk/asynchronous/base.py +++ b/src/obelisk/asynchronous/base.py @@ -1,7 +1,7 @@ from datetime import datetime, timedelta import logging import base64 -from typing import Any, Optional +from typing import Any import httpx @@ -19,9 +19,9 @@ class BaseClient: _client: str = "" _secret: str = "" - _token: Optional[str] = None + _token: str | None = None """Current authentication token""" - _token_expires: Optional[datetime] = None + _token_expires: datetime | None = None """Deadline after which token is no longer useable""" grace_period: timedelta = timedelta(seconds=10) @@ -35,7 +35,7 @@ def __init__( self, client: str, secret: str, - retry_strategy: RetryStrategy = NoRetryStrategy(), + retry_strategy: RetryStrategy = NoRetryStrategy(), # noqa: B008 # This is fine to bew shared kind: ObeliskKind = ObeliskKind.CLASSIC, ) -> None: self._client = client @@ -47,7 +47,7 @@ def __init__( async def _get_token(self) -> None: auth_string = str( - base64.b64encode(f"{self._client}:{self._secret}".encode("utf-8")), "utf-8" + base64.b64encode(f"{self._client}:{self._secret}".encode()), "utf-8" ) headers = { "Authorization": f"Basic {auth_string}", @@ -74,7 +74,7 @@ async def _get_token(self) -> None: ) response = request.json() - except Exception as e: + except Exception as e: # noqa: PERF203 # retry strategy should add delay last_error = e self.log.error(e) continue @@ -88,7 +88,7 @@ async def _get_token(self) -> None: if request.status_code != 200: if "error" in response: self.log.warning(f"Could not authenticate, {response['error']}") - raise AuthenticationError + raise AuthenticationError self._token = response["access_token"] self._token_expires = datetime.now() + timedelta( @@ -113,7 +113,7 @@ async def _verify_token(self) -> None: continue async def http_post( - self, url: str, data: Any = None, params: Optional[dict[str, str]] = None + self, url: str, data: Any = None, params: dict[str, str] | None = None ) -> httpx.Response: """ Send an HTTP POST request to Obelisk, @@ -162,7 +162,7 @@ async def http_post( return response async def http_get( - self, url: str, params: Optional[dict[str, str]] = None + self, url: str, params: dict[str, str] | None = None ) -> httpx.Response: """ Send an HTTP GET request to Obelisk, diff --git a/src/obelisk/asynchronous/client.py b/src/obelisk/asynchronous/client.py index c900673..99a5b7c 100644 --- a/src/obelisk/asynchronous/client.py +++ b/src/obelisk/asynchronous/client.py @@ -1,7 +1,8 @@ import json from datetime import datetime, timedelta from math import floor -from typing import Any, AsyncGenerator, List, Literal, Optional +from typing import Any, Literal +from collections.abc import AsyncGenerator import httpx from pydantic import ValidationError @@ -23,16 +24,16 @@ class Obelisk(BaseClient): async def fetch_single_chunk( self, - datasets: List[str], - metrics: Optional[List[str]] = None, - fields: Optional[List[str]] = None, - from_timestamp: Optional[int] = None, - to_timestamp: Optional[int] = None, - order_by: Optional[dict[str, Any]] = None, - filter_: Optional[dict[str, Any]] = None, - limit: Optional[int] = None, - limit_by: Optional[dict[str, Any]] = None, - cursor: Optional[str] = None, + datasets: list[str], + metrics: list[str] | None = None, + fields: list[str] | None = None, + from_timestamp: int | None = None, + to_timestamp: int | None = None, + order_by: dict[str, Any] | None = None, + filter_: dict[str, Any] | None = None, + limit: int | None = None, + limit_by: dict[str, Any] | None = None, + cursor: str | None = None, ) -> QueryResult: """ Queries one chunk of events from Obelisk for given parameters, @@ -103,24 +104,24 @@ async def fetch_single_chunk( except json.JSONDecodeError as e: msg = f"Obelisk response is not a JSON object: {e}" self.log.warning(msg) - raise ObeliskError(msg) + raise ObeliskError(msg) from e except ValidationError as e: msg = f"Response cannot be validated: {e}" self.log.warning(msg) - raise ObeliskError(msg) + raise ObeliskError(msg) from e async def query( self, - datasets: List[str], - metrics: Optional[List[str]] = None, - fields: Optional[List[str]] = None, - from_timestamp: Optional[int] = None, - to_timestamp: Optional[int] = None, - order_by: Optional[dict[str, Any]] = None, - filter_: Optional[dict[str, Any]] = None, - limit: Optional[int] = None, - limit_by: Optional[dict[str, Any]] = None, - ) -> List[Datapoint]: + datasets: list[str], + metrics: list[str] | None = None, + fields: list[str] | None = None, + from_timestamp: int | None = None, + to_timestamp: int | None = None, + order_by: dict[str, Any] | None = None, + filter_: dict[str, Any] | None = None, + limit: int | None = None, + limit_by: dict[str, Any] | None = None, + ) -> list[Datapoint]: """ Queries data from obelisk, automatically iterating when a cursor is returned. @@ -157,8 +158,8 @@ async def query( to a specified maximum number. """ - cursor: Optional[str] | Literal[True] = True - result_set: List[Datapoint] = [] + cursor: str | None | Literal[True] = True + result_set: list[Datapoint] = [] while cursor: actual_cursor = cursor if cursor is not True else None @@ -191,14 +192,14 @@ async def query( async def query_time_chunked( self, - datasets: List[str], - metrics: List[str], + datasets: list[str], + metrics: list[str], from_time: datetime, to_time: datetime, jump: timedelta, - filter_: Optional[dict[str, Any]] = None, + filter_: dict[str, Any] | None = None, direction: Literal["asc", "desc"] = "asc", - ) -> AsyncGenerator[List[Datapoint], None]: + ) -> AsyncGenerator[list[Datapoint], None]: """ Fetches all data matching the provided filters, yielding one chunk at a time. @@ -239,7 +240,7 @@ async def query_time_chunked( async def send( self, dataset: str, - data: List[dict[str, Any]], + data: list[dict[str, Any]], precision: TimestampPrecision = TimestampPrecision.MILLISECONDS, mode: IngestMode = IngestMode.DEFAULT, ) -> httpx.Response: diff --git a/src/obelisk/asynchronous/core.py b/src/obelisk/asynchronous/core.py index b342120..6846f34 100644 --- a/src/obelisk/asynchronous/core.py +++ b/src/obelisk/asynchronous/core.py @@ -25,16 +25,12 @@ ) from typing import ( Annotated, - AsyncIterator, - Dict, - Iterator, - List, Literal, - Optional, Any, cast, get_args, ) +from collections.abc import AsyncIterator, Iterator from typing_extensions import Self from numbers import Number @@ -71,7 +67,7 @@ def type_suffix(metric: str) -> DataType: """Type of aggregation Obelisk can process""" -Datapoint = Dict[str, Any] +Datapoint = dict[str, Any] """Datapoints resulting from queries are modeled as simple dicts, as fields can come and go depending on query.""" @@ -92,11 +88,11 @@ class IncomingDatapoint(BaseModel): .. automethod:: check_metric_type(self) """ - timestamp: Optional[AwareDatetime] = None + timestamp: AwareDatetime | None = None metric: str value: Any - labels: Optional[Dict[str, str]] = None - location: Optional[ObeliskPosition] = None + labels: dict[str, str] | None = None + location: ObeliskPosition | None = None @model_validator(mode="after") def check_metric_type(self) -> Self: @@ -107,11 +103,11 @@ def check_metric_type(self) -> Self: f"Type suffix mismatch, expected number, got {type(self.value)}" ) - if suffix == "number[]": - if type(self.value) is not list or any( - [not isinstance(x, Number) for x in self.value] - ): - raise ValueError("Type suffix mismatch, expected value of number[]") + if suffix == "number[]" and ( + type(self.value) is not list + or any([not isinstance(x, Number) for x in self.value]) + ): + raise ValueError("Type suffix mismatch, expected value of number[]") # Do not check json, most things should be serialisable @@ -130,32 +126,29 @@ def check_metric_type(self) -> Self: class QueryParams(BaseModel): dataset: str - groupBy: Optional[List[FieldName]] = None - aggregator: Optional[Aggregator] = None - fields: Optional[List[FieldName]] = None - orderBy: Optional[List[str]] = ( + groupBy: list[FieldName] | None = None + aggregator: Aggregator | None = None + fields: list[FieldName] | None = None + orderBy: list[str] | None = ( None # More complex than just FieldName, can be prefixed with - to invert sort ) - dataType: Optional[DataType] = None - filter_: Annotated[Optional[str | Filter], Field(serialization_alias="filter")] = ( - None - ) + dataType: DataType | None = None + filter_: Annotated[str | Filter | None, Field(serialization_alias="filter")] = None """ Obelisk CORE handles filtering in `RSQL format `__ , to make it easier to also programatically write these filters, we provide the :class:`Filter` option as well. Suffix to avoid collisions. """ - cursor: Optional[str] = None + cursor: str | None = None limit: int = 1000 model_config = ConfigDict(arbitrary_types_allowed=True) @model_validator(mode="after") def check_datatype_needed(self) -> Self: - if self.fields is None or "value" in self.fields: - if self.dataType is None: - raise ValueError("Value field requested, must specify datatype") + if (self.fields is None or "value" in self.fields) and self.dataType is None: + raise ValueError("Value field requested, must specify datatype") return self @@ -165,14 +158,14 @@ def to_dict(self) -> dict[str, Any]: class ChunkedParams(BaseModel): dataset: str - groupBy: Optional[List[FieldName]] = None - aggregator: Optional[Aggregator] = None - fields: Optional[List[FieldName]] = None - orderBy: Optional[List[str]] = ( + groupBy: list[FieldName] | None = None + aggregator: Aggregator | None = None + fields: list[FieldName] | None = None + orderBy: list[str] | None = ( None # More complex than just FieldName, can be prefixed with - to invert sort ) - dataType: Optional[DataType] = None - filter_: Optional[str | Filter] = None + dataType: DataType | None = None + filter_: str | Filter | None = None """Underscore suffix to avoid name collisions""" start: datetime end: datetime @@ -182,9 +175,8 @@ class ChunkedParams(BaseModel): @model_validator(mode="after") def check_datatype_needed(self) -> Self: - if self.fields is None or "value" in self.fields: - if self.dataType is None: - raise ValueError("Value field requested, must specify datatype") + if (self.fields is None or "value" in self.fields) and self.dataType is None: + raise ValueError("Value field requested, must specify datatype") return self @@ -210,8 +202,8 @@ def chunks(self) -> Iterator[QueryParams]: class QueryResult(BaseModel): - cursor: Optional[str] = None - items: List[Datapoint] + cursor: str | None = None + items: list[Datapoint] class Client(BaseClient): @@ -222,7 +214,7 @@ def __init__( self, client: str, secret: str, - retry_strategy: RetryStrategy = NoRetryStrategy(), + retry_strategy: RetryStrategy = NoRetryStrategy(), # noqa: B008 # This is fine to bew shared ) -> None: BaseClient.__init__( self, @@ -235,7 +227,7 @@ def __init__( async def send( self, dataset: str, - data: List[IncomingDatapoint], + data: list[IncomingDatapoint], ) -> httpx.Response: """ Publishes data to Obelisk @@ -281,15 +273,15 @@ async def fetch_single_chunk(self, params: QueryParams) -> QueryResult: except json.JSONDecodeError as e: msg = f"Obelisk response is not a JSON object: {e}" self.log.warning(msg) - raise ObeliskError(msg) + raise ObeliskError(msg) from e except ValidationError as e: msg = f"Response cannot be validated: {e}" self.log.warning(msg) - raise ObeliskError(msg) + raise ObeliskError(msg) from e - async def query(self, params: QueryParams) -> List[Datapoint]: + async def query(self, params: QueryParams) -> list[Datapoint]: params.cursor = None - result_set: List[Datapoint] = [] + result_set: list[Datapoint] = [] result_limit = params.limit # Obelisk CORE does not actually stop emitting a cursor when done, limit serves as page limit @@ -307,6 +299,6 @@ async def query(self, params: QueryParams) -> List[Datapoint]: async def query_time_chunked( self, params: ChunkedParams - ) -> AsyncIterator[List[Datapoint]]: + ) -> AsyncIterator[list[Datapoint]]: for chunk in params.chunks(): yield await self.query(chunk) diff --git a/src/obelisk/sync/client.py b/src/obelisk/sync/client.py index 90d7bf0..f955f1c 100644 --- a/src/obelisk/sync/client.py +++ b/src/obelisk/sync/client.py @@ -1,7 +1,8 @@ import asyncio from datetime import datetime, timedelta from math import floor -from typing import Any, Generator, List, Literal, Optional +from typing import Any, Literal +from collections.abc import Generator import httpx @@ -36,7 +37,7 @@ def __init__( self, client: str, secret: str, - retry_strategy: RetryStrategy = NoRetryStrategy(), + retry_strategy: RetryStrategy = NoRetryStrategy(), # noqa: B008 # This is fine to bew shared kind: ObeliskKind = ObeliskKind.CLASSIC, ): self.async_obelisk = AsyncObelisk(client, secret, retry_strategy, kind) @@ -44,16 +45,16 @@ def __init__( def fetch_single_chunk( self, - datasets: List[str], - metrics: Optional[List[str]] = None, - fields: Optional[List[str]] = None, - from_timestamp: Optional[int] = None, - to_timestamp: Optional[int] = None, - order_by: Optional[dict[str, Any]] = None, - filter_: Optional[dict[str, Any]] = None, - limit: Optional[int] = None, - limit_by: Optional[dict[str, Any]] = None, - cursor: Optional[str] = None, + datasets: list[str], + metrics: list[str] | None = None, + fields: list[str] | None = None, + from_timestamp: int | None = None, + to_timestamp: int | None = None, + order_by: dict[str, Any] | None = None, + filter_: dict[str, Any] | None = None, + limit: int | None = None, + limit_by: dict[str, Any] | None = None, + cursor: str | None = None, ) -> QueryResult: """ Queries one chunk of events from Obelisk for given parameters, @@ -114,16 +115,16 @@ def fetch_single_chunk( def query( self, - datasets: List[str], - metrics: Optional[List[str]] = None, - fields: Optional[List[str]] = None, - from_timestamp: Optional[int] = None, - to_timestamp: Optional[int] = None, - order_by: Optional[dict[str, Any]] = None, - filter_: Optional[dict[str, Any]] = None, - limit: Optional[int] = None, - limit_by: Optional[dict[str, Any]] = None, - ) -> List[Datapoint]: + datasets: list[str], + metrics: list[str] | None = None, + fields: list[str] | None = None, + from_timestamp: int | None = None, + to_timestamp: int | None = None, + order_by: dict[str, Any] | None = None, + filter_: dict[str, Any] | None = None, + limit: int | None = None, + limit_by: dict[str, Any] | None = None, + ) -> list[Datapoint]: """ Queries data from obelisk, automatically iterating when a cursor is returned. @@ -177,14 +178,14 @@ def query( def query_time_chunked( self, - datasets: List[str], - metrics: List[str], + datasets: list[str], + metrics: list[str], from_time: datetime, to_time: datetime, jump: timedelta, - filter_: Optional[dict[str, Any]] = None, + filter_: dict[str, Any] | None = None, direction: Literal["asc", "desc"] = "asc", - ) -> Generator[List[Datapoint], None, None]: + ) -> Generator[list[Datapoint], None, None]: """ Fetches all data matching the provided filters, yielding one chunk at a time. @@ -225,7 +226,7 @@ def query_time_chunked( def send( self, dataset: str, - data: List[dict[str, Any]], + data: list[dict[str, Any]], precision: TimestampPrecision = TimestampPrecision.MILLISECONDS, mode: IngestMode = IngestMode.DEFAULT, ) -> httpx.Response: diff --git a/src/obelisk/types/__init__.py b/src/obelisk/types/__init__.py index d23628e..72893f4 100644 --- a/src/obelisk/types/__init__.py +++ b/src/obelisk/types/__init__.py @@ -1,5 +1,5 @@ from enum import Enum -from typing import List, Any, Optional +from typing import Any from pydantic import BaseModel @@ -39,15 +39,15 @@ class TimestampPrecision(str, Enum): class Datapoint(BaseModel, extra="allow"): timestamp: int value: Any - dataset: Optional[str] = None - metric: Optional[str] = None - source: Optional[str] = None - userId: Optional[int] = None # Only if HFS and no other name for field + dataset: str | None = None + metric: str | None = None + source: str | None = None + userId: int | None = None # Only if HFS and no other name for field class QueryResult(BaseModel): - items: List[Datapoint] - cursor: Optional[str] = None + items: list[Datapoint] + cursor: str | None = None class ObeliskKind(str, Enum): diff --git a/src/obelisk/types/core.py b/src/obelisk/types/core.py index d00ce55..f5807b7 100644 --- a/src/obelisk/types/core.py +++ b/src/obelisk/types/core.py @@ -18,7 +18,8 @@ from __future__ import annotations from abc import ABC from datetime import datetime -from typing import Any, Iterable, List +from typing import Any +from collections.abc import Iterable FieldName = str @@ -27,7 +28,7 @@ """ -class Constraint(ABC): +class Constraint(ABC): # noqa: B024 # This is just a marker class """ Constraints are simply groups of :class:`Comparison`, such as :class:`And`, or :class:`Or`. @@ -125,7 +126,7 @@ def not_null(cls, left: FieldName) -> Comparison: class And(Constraint): - content: List[Item] + content: list[Item] def __init__(self, *args: Item): self.content = list(args) @@ -135,7 +136,7 @@ def __str__(self) -> str: class Or(Constraint): - content: List[Item] + content: list[Item] def __init__(self, *args: Item): self.content = list(args) diff --git a/src/tests/asynchronous/client_test.py b/src/tests/asynchronous/client_test.py index f6ff4a7..c906c84 100644 --- a/src/tests/asynchronous/client_test.py +++ b/src/tests/asynchronous/client_test.py @@ -5,7 +5,8 @@ client_id = "682c6c46604b3b3be35429df" client_secret = "7136832d-01be-456a-a1fe-25c7f9e130c5" -pytest_plugins = ('pytest_asyncio',) +pytest_plugins = ("pytest_asyncio",) + @pytest.mark.asyncio async def test_fetch_demo_igent(): @@ -15,7 +16,7 @@ async def test_fetch_demo_igent(): metrics=["org.dyamand.types.common.Temperature::number"], from_timestamp=1740924034000, to_timestamp=1741100614258, - limit=2 + limit=2, ) assert len(result.items) == 2 diff --git a/src/tests/asynchronous/core_test.py b/src/tests/asynchronous/core_test.py index 8a19112..0bd15cf 100644 --- a/src/tests/asynchronous/core_test.py +++ b/src/tests/asynchronous/core_test.py @@ -1,6 +1,11 @@ from obelisk.asynchronous.core import QueryParams + def test_query_param_serialize(): - q = QueryParams(dataset="83989232", filter_="(metric=='smartphone.application::string')", dataType='string') + q = QueryParams( + dataset="83989232", + filter_="(metric=='smartphone.application::string')", + dataType="string", + ) dump = q.to_dict() assert "filter" in dump diff --git a/src/tests/sync/client_test.py b/src/tests/sync/client_test.py index cecc4ed..edd44dc 100644 --- a/src/tests/sync/client_test.py +++ b/src/tests/sync/client_test.py @@ -3,6 +3,7 @@ client_id = "682c6c46604b3b3be35429df" client_secret = "7136832d-01be-456a-a1fe-25c7f9e130c5" + def test_demo_igent_fetch(): consumer = Obelisk(client=client_id, secret=client_secret) result = consumer.fetch_single_chunk( @@ -10,11 +11,12 @@ def test_demo_igent_fetch(): metrics=["org.dyamand.types.common.Temperature::number"], from_timestamp=1740924034000, to_timestamp=1741100614258, - limit=2 + limit=2, ) assert len(result.items) == 2 + def test_two_instances(): consumer_one = Obelisk(client=client_id, secret=client_secret) consumer_two = Obelisk(client=client_id, secret=client_secret) @@ -23,14 +25,14 @@ def test_two_instances(): metrics=["org.dyamand.types.common.Temperature::number"], from_timestamp=1740924034000, to_timestamp=1741100614258, - limit=2 + limit=2, ) result_two = consumer_two.fetch_single_chunk( datasets=["612f6c39cbceda0ea9753d95"], metrics=["org.dyamand.types.common.Temperature::number"], from_timestamp=1740924034000, to_timestamp=1741100614258, - limit=2 + limit=2, ) assert len(result_one.items) == 2 assert len(result_two.items) == 2 diff --git a/src/tests/typetest/filter_test.py b/src/tests/typetest/filter_test.py index 46450de..9d547e3 100644 --- a/src/tests/typetest/filter_test.py +++ b/src/tests/typetest/filter_test.py @@ -4,16 +4,16 @@ def test_basic_filter(): test_dt = datetime.now() - f = Filter() \ + f = ( + Filter() .add_and( - Comparison.equal('source', 'test source'), - )\ - .add_or( - Comparison.less('timestamp', test_dt) - )\ + Comparison.equal("source", "test source"), + ) + .add_or(Comparison.less("timestamp", test_dt)) .add_or( - Comparison.is_in('metricType', ['number', 'number[]']), + Comparison.is_in("metricType", ["number", "number[]"]), ) + ) expected = f"((('source'=='test source'),'timestamp'<'{test_dt.isoformat()}'),'metricType'=in=('number', 'number[]'))" assert str(f) == expected