Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion logfire/_internal/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from ..client import LogfireClient
from ..config import REGIONS, LogfireCredentials, get_base_url_from_token
from ..config_params import ParamManager
from ..server_response import install_logfire_response_hook
from ..tracer import SDKTracerProvider
from .auth import parse_auth, parse_logout
from .prompt import parse_prompt
Expand Down Expand Up @@ -434,8 +435,9 @@ def log_trace_id(response: requests.Response, context: ContextCarrier, *args: An
else:
with tracer.start_as_current_span('logfire._internal.cli'), requests.Session() as session:
context = get_context()
session.hooks = {'response': functools.partial(log_trace_id, context=context)}
session.hooks = {'response': [functools.partial(log_trace_id, context=context)]}
session.headers.update(context)
install_logfire_response_hook(session)
namespace._session = session
namespace.func(namespace)

Expand Down
23 changes: 20 additions & 3 deletions logfire/_internal/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from logfire.version import VERSION

from .auth import UserToken, UserTokenCollection
from .server_response import TransportResponseHook, install_logfire_response_hook
from .utils import UnexpectedResponse

UA_HEADER = f'logfire/{VERSION}'
Expand All @@ -29,27 +30,43 @@ class LogfireClient:

Args:
user_token: The user token to use when authenticating against the API.
transport_response_hook: Optional override for the API response hook (see
`AdvancedOptions.transport_response_hook`).
"""

def __init__(self, user_token: UserToken) -> None:
def __init__(
self,
user_token: UserToken,
transport_response_hook: TransportResponseHook | None = None,
) -> None:
if user_token.is_expired:
raise RuntimeError('The provided user token is expired')
self.base_url = user_token.base_url
self._token = user_token.token
self._session = Session()
self._session.headers.update({'Authorization': self._token, 'User-Agent': UA_HEADER})
install_logfire_response_hook(self._session, transport_response_hook)

@classmethod
def from_url(cls, base_url: str | None) -> Self:
def from_url(
cls,
base_url: str | None,
transport_response_hook: TransportResponseHook | None = None,
) -> Self:
"""Create a client from the provided base URL.

Args:
base_url: The base URL to use when looking for a user token. If `None`, will prompt
the user into selecting a token from the token collection (or, if only one available,
use it directly). The token collection will be created from the `~/.logfire/default.toml`
file (or an empty one if no such file exists).
transport_response_hook: Optional override for the API response hook (see
`AdvancedOptions.transport_response_hook`).
"""
return cls(user_token=UserTokenCollection().get_token(base_url))
return cls(
user_token=UserTokenCollection().get_token(base_url),
transport_response_hook=transport_response_hook,
)

def _get_raw(self, endpoint: str, params: dict[str, Any] | None = None) -> Response:
response = self._session.get(urljoin(self.base_url, endpoint), params=params)
Expand Down
32 changes: 30 additions & 2 deletions logfire/_internal/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
from .logs import ProxyLoggerProvider
from .metrics import ProxyMeterProvider
from .scrubbing import NOOP_SCRUBBER, BaseScrubber, Scrubber, ScrubbingOptions
from .server_response import TransportResponseHook, install_logfire_response_hook
from .stack_info import warn_at_user_stacklevel
from .tracer import OPEN_SPANS, PendingSpanProcessor, ProxyTracerProvider
from .utils import (
Expand Down Expand Up @@ -201,6 +202,29 @@ class AdvancedOptions:
serialized configuration sent to child processes. See the [distributed tracing guide](https://logfire.pydantic.dev/docs/how-to-guides/distributed-tracing/#thread-and-pool-executors) for more details.
"""

transport_response_hook: TransportResponseHook | None = None
"""Optional callback invoked for every HTTP response received from the Logfire API.

This applies to OTLP exports, credential / project initialisation, and the remote
variables provider. The default surfaces `X-Logfire-Warning` and `X-Logfire-Error`
headers as `LogfireServerWarning` / `LogfireServerError`.

Setting this replaces the default; pass `lambda response: None` to opt out entirely,
or compose your own logic on top of `process_logfire_response_headers`:

```python
from logfire._internal.server_response import process_logfire_response_headers

def hook(response):
my_metric.inc(response.status_code)
process_logfire_response_headers(response)

logfire.configure(advanced=AdvancedOptions(transport_response_hook=hook))
```

Raise from the hook to abort the calling code path.
"""

def generate_base_url(self, token: str) -> str:
if self.base_url is not None:
return self.base_url
Expand Down Expand Up @@ -1078,7 +1102,7 @@ def add_span_processor(span_processor: SpanProcessor) -> None:
# If we don't have tokens or credentials from a file,
# try initializing a new project and writing a new creds file.
# note, we only do this if `send_to_logfire` is explicitly `True`, not 'if-token-present'
client = LogfireClient.from_url(self.advanced.base_url)
client = LogfireClient.from_url(self.advanced.base_url, self.advanced.transport_response_hook)
credentials = LogfireCredentials.initialize_project(client=client)
credentials.write_creds_file(self.data_dir)

Expand Down Expand Up @@ -1129,6 +1153,7 @@ def check_tokens():
base_url = self.advanced.generate_base_url(token)
headers = {'User-Agent': f'logfire/{VERSION}', 'Authorization': token}
session = OTLPExporterHttpSession()
install_logfire_response_hook(session, self.advanced.transport_response_hook)
span_exporter = BodySizeCheckingOTLPSpanExporter(
endpoint=urljoin(base_url, '/v1/traces'),
session=session,
Expand Down Expand Up @@ -1305,6 +1330,7 @@ def fix_pid(): # pragma: no cover
base_url=base_url,
token=self.api_key,
options=self.variables,
transport_response_hook=self.advanced.transport_response_hook,
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.
)
multi_log_processor = SynchronousMultiLogRecordProcessor()
for processor in log_record_processors:
Expand Down Expand Up @@ -1453,7 +1479,9 @@ def warn_if_not_initialized(self, message: str):
)

def _initialize_credentials_from_token(self, token: str) -> LogfireCredentials | None:
return LogfireCredentials.from_token(token, requests.Session(), self.advanced.generate_base_url(token))
session = requests.Session()
install_logfire_response_hook(session, self.advanced.transport_response_hook)
return LogfireCredentials.from_token(token, session, self.advanced.generate_base_url(token))

def _ensure_flush_after_aws_lambda(self):
"""Ensure that `force_flush` is called after an AWS Lambda invocation.
Expand Down
64 changes: 64 additions & 0 deletions logfire/_internal/server_response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""Surface out-of-band signals the Logfire backend wants every SDK request to know about.

The server attaches custom headers to API responses:

* `X-Logfire-Warning`: an out-of-band warning the server wants the user to see.
Surfaced via `warnings.warn(..., LogfireServerWarning)`. Python's standard
"default" filter dedupes identical messages, so a chatty server only warns once.
* `X-Logfire-Error`: an out-of-band error the server wants the SDK to raise.
Always raised as `LogfireServerError`. Callers that want to keep working past
it (the OTLP pipeline, the variables provider) already swallow exceptions from
their HTTP calls; CRUD/CLI propagate the error to the user.

`install_logfire_response_hook(session)` wires this into a `requests.Session` as
a response hook so every Logfire-bound HTTP response is inspected. Callers can
pass a custom `hook` to replace the default behaviour (see
`AdvancedOptions.transport_response_hook`).
"""

from __future__ import annotations

import warnings
from typing import Any, Callable

import requests

from logfire.exceptions import LogfireServerError, LogfireServerWarning

WARNING_HEADER_NAME = 'X-Logfire-Warning'
ERROR_HEADER_NAME = 'X-Logfire-Error'

TransportResponseHook = Callable[[requests.Response], object]
"""Callable invoked for every Logfire API response received by the SDK.

The return value is ignored; raise to abort the call.
"""


def process_logfire_response_headers(response: requests.Response) -> None:
"""Default transport response hook: surface `X-Logfire-Warning` / `X-Logfire-Error` headers."""
warning_message = response.headers.get(WARNING_HEADER_NAME)
if warning_message:
warnings.warn(warning_message, LogfireServerWarning, stacklevel=2)
error_message = response.headers.get(ERROR_HEADER_NAME)
if error_message:
raise LogfireServerError(error_message)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I worry that users might have a reason to opt out of this and won't be able to. Maybe that's paranoid.

We could theoretically make this hook configurable, defaulting to this one, but allowing users to take any action for all API responses. Probably overkill.

I'd like to know what concrete use cases we have in mind for both warnings and errors.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a way to opt out in 278a76c

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to know what concrete use cases we have in mind for both warnings and errors.

From the PR description:

to deprecate endpoints or signal hard failures to any client running this SDK

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we'd deprecate endpoints by changing code here though, right? or is the point to push users to upgrade their sdk?

hard failures like what? where would the server respond with the error header instead of a 4xx/5xx which would have a similar effect here?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes the point is that we can propagate deprecation to old versions of the SDK.

hard failures like what?

The error part was a bit more just "seems like it could be useful". But I imagine it might be more informative to raise a LogfireError: Some explanation Than hope the 4xx bubbles up correctly in a way that is usable by users.

I also wonder if we could emit an error / warn logfire log so it shows up in users projects 🤔



def install_logfire_response_hook(
session: requests.Session,
hook: TransportResponseHook | None = None,
) -> None:
"""Install a `requests` response hook on `session` for every Logfire API response.

`hook` defaults to `process_logfire_response_headers`. Pass a custom callable
to replace the default behaviour (e.g. opt out by passing `lambda response: None`).
"""
user_hook = hook if hook is not None else process_logfire_response_headers

def _hook(response: requests.Response, *_args: Any, **_kwargs: Any) -> requests.Response:
user_hook(response)
return response

response_hooks: list[Any] = session.hooks.setdefault('response', [])
response_hooks.append(_hook)
8 changes: 8 additions & 0 deletions logfire/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,11 @@

class LogfireConfigError(ValueError):
"""Error raised when there is a problem with the Logfire configuration."""


class LogfireServerError(Exception):
"""Error raised when the Logfire server returns an `X-Logfire-Error` header on a response."""


class LogfireServerWarning(UserWarning):
"""Warning emitted when the Logfire server returns an `X-Logfire-Warning` header on a response."""
14 changes: 13 additions & 1 deletion logfire/variables/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from logfire._internal.client import UA_HEADER
from logfire._internal.config import VariablesOptions
from logfire._internal.server_response import TransportResponseHook, install_logfire_response_hook
from logfire._internal.utils import UnexpectedResponse
from logfire.variables.abstract import (
ResolvedVariable,
Expand Down Expand Up @@ -54,21 +55,31 @@ class LogfireRemoteVariableProvider(VariableProvider):
The threading implementation draws heavily from opentelemetry.sdk._shared_internal.BatchProcessor.
"""

def __init__(self, base_url: str, token: str, options: VariablesOptions):
def __init__(
self,
base_url: str,
token: str,
options: VariablesOptions,
transport_response_hook: TransportResponseHook | None = None,
):
"""Create a new remote variable provider.

Args:
base_url: The base URL of the Logfire API.
token: Authentication token for the Logfire API.
options: Options for retrieving remote variables.
transport_response_hook: Optional override for the API response hook
(see `AdvancedOptions.transport_response_hook`).
"""
block_before_first_resolve = options.block_before_first_resolve
polling_interval = options.polling_interval

self._base_url = base_url
self._token = token
self._transport_response_hook = transport_response_hook
self._session = Session()
self._session.headers.update({'Authorization': f'bearer {token}', 'User-Agent': UA_HEADER})
install_logfire_response_hook(self._session, transport_response_hook)
self._timeout = options.timeout
self._block_before_first_fetch = block_before_first_resolve
self._polling_interval: timedelta = (
Expand Down Expand Up @@ -197,6 +208,7 @@ def _sse_listener(self): # pragma: no cover
'Cache-Control': 'no-cache',
}
)
install_logfire_response_hook(sse_session, self._transport_response_hook)

# Open streaming connection
response = sse_session.get(sse_url, stream=True, timeout=(10, None))
Expand Down
Loading
Loading