Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion logfire/_internal/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from ..client import LogfireClient
from ..config import REGIONS, LogfireCredentials, get_base_url_from_token
from ..config_params import ParamManager
from ..server_response import install_logfire_response_hook
from ..tracer import SDKTracerProvider
from .auth import parse_auth, parse_logout
from .prompt import parse_prompt
Expand Down Expand Up @@ -434,8 +435,9 @@ def log_trace_id(response: requests.Response, context: ContextCarrier, *args: An
else:
with tracer.start_as_current_span('logfire._internal.cli'), requests.Session() as session:
context = get_context()
session.hooks = {'response': functools.partial(log_trace_id, context=context)}
session.hooks = {'response': [functools.partial(log_trace_id, context=context)]}
session.headers.update(context)
install_logfire_response_hook(session)
namespace._session = session
namespace.func(namespace)

Expand Down
23 changes: 20 additions & 3 deletions logfire/_internal/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from logfire.version import VERSION

from .auth import UserToken, UserTokenCollection
from .server_response import ServerResponseCallback, install_logfire_response_hook
from .utils import UnexpectedResponse

UA_HEADER = f'logfire/{VERSION}'
Expand All @@ -29,27 +30,43 @@ class LogfireClient:

Args:
user_token: The user token to use when authenticating against the API.
server_response_hook: Optional override for the API response hook (see
`AdvancedOptions.server_response_hook`).
"""

def __init__(self, user_token: UserToken) -> None:
def __init__(
self,
user_token: UserToken,
server_response_hook: ServerResponseCallback | None = None,
) -> None:
if user_token.is_expired:
raise RuntimeError('The provided user token is expired')
self.base_url = user_token.base_url
self._token = user_token.token
self._session = Session()
self._session.headers.update({'Authorization': self._token, 'User-Agent': UA_HEADER})
install_logfire_response_hook(self._session, server_response_hook)

@classmethod
def from_url(cls, base_url: str | None) -> Self:
def from_url(
cls,
base_url: str | None,
server_response_hook: ServerResponseCallback | None = None,
) -> Self:
"""Create a client from the provided base URL.

Args:
base_url: The base URL to use when looking for a user token. If `None`, will prompt
the user into selecting a token from the token collection (or, if only one available,
use it directly). The token collection will be created from the `~/.logfire/default.toml`
file (or an empty one if no such file exists).
server_response_hook: Optional override for the API response hook (see
`AdvancedOptions.server_response_hook`).
"""
return cls(user_token=UserTokenCollection().get_token(base_url))
return cls(
user_token=UserTokenCollection().get_token(base_url),
server_response_hook=server_response_hook,
)

def _get_raw(self, endpoint: str, params: dict[str, Any] | None = None) -> Response:
response = self._session.get(urljoin(self.base_url, endpoint), params=params)
Expand Down
36 changes: 34 additions & 2 deletions logfire/_internal/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
from .logs import ProxyLoggerProvider
from .metrics import ProxyMeterProvider
from .scrubbing import NOOP_SCRUBBER, BaseScrubber, Scrubber, ScrubbingOptions
from .server_response import ServerResponseCallback, install_logfire_response_hook
from .stack_info import warn_at_user_stacklevel
from .tracer import OPEN_SPANS, PendingSpanProcessor, ProxyTracerProvider
from .utils import (
Expand Down Expand Up @@ -201,6 +202,32 @@ class AdvancedOptions:
serialized configuration sent to child processes. See the [distributed tracing guide](https://logfire.pydantic.dev/docs/how-to-guides/distributed-tracing/#thread-and-pool-executors) for more details.
"""

server_response_hook: ServerResponseCallback | None = None
"""Optional callback invoked for every HTTP response received from the Logfire API.

This is experimental and may be modified or removed.

This applies to OTLP exports, credential / project initialisation, and the remote
variables provider. The default surfaces the `X-Logfire-Warning` header as a
`LogfireServerWarning`.

Setting this replaces the default; pass `lambda response: None` to opt out entirely.

Example usage:

```python skip-run="true" skip-reason="needs metric/logfire setup"
from logfire.types import ServerResponseCallbackHelper

def hook(helper: ServerResponseCallbackHelper):
my_metric.inc(helper.response.status_code)
helper.default_hook() # call this to keep the default warning behavior

logfire.configure(advanced=AdvancedOptions(server_response_hook=hook))
```

Raise from the hook to abort the calling code path.
"""

def generate_base_url(self, token: str) -> str:
if self.base_url is not None:
return self.base_url
Expand Down Expand Up @@ -1078,7 +1105,7 @@ def add_span_processor(span_processor: SpanProcessor) -> None:
# If we don't have tokens or credentials from a file,
# try initializing a new project and writing a new creds file.
# note, we only do this if `send_to_logfire` is explicitly `True`, not 'if-token-present'
client = LogfireClient.from_url(self.advanced.base_url)
client = LogfireClient.from_url(self.advanced.base_url, self.advanced.server_response_hook)
credentials = LogfireCredentials.initialize_project(client=client)
credentials.write_creds_file(self.data_dir)

Expand Down Expand Up @@ -1129,6 +1156,7 @@ def check_tokens():
base_url = self.advanced.generate_base_url(token)
headers = {'User-Agent': f'logfire/{VERSION}', 'Authorization': token}
session = OTLPExporterHttpSession()
install_logfire_response_hook(session, self.advanced.server_response_hook)
span_exporter = BodySizeCheckingOTLPSpanExporter(
endpoint=urljoin(base_url, '/v1/traces'),
session=session,
Expand Down Expand Up @@ -1305,6 +1333,7 @@ def fix_pid(): # pragma: no cover
base_url=base_url,
token=self.api_key,
options=self.variables,
server_response_hook=self.advanced.server_response_hook,
)
multi_log_processor = SynchronousMultiLogRecordProcessor()
for processor in log_record_processors:
Expand Down Expand Up @@ -1437,6 +1466,7 @@ def _lazy_init_variable_provider(self) -> VariableProvider:
base_url=base_url,
token=api_key,
options=options,
server_response_hook=self.advanced.server_response_hook,
)
self._variable_provider = provider
provider.start(Logfire(config=self))
Expand All @@ -1453,7 +1483,9 @@ def warn_if_not_initialized(self, message: str):
)

def _initialize_credentials_from_token(self, token: str) -> LogfireCredentials | None:
return LogfireCredentials.from_token(token, requests.Session(), self.advanced.generate_base_url(token))
session = requests.Session()
install_logfire_response_hook(session, self.advanced.server_response_hook)
return LogfireCredentials.from_token(token, session, self.advanced.generate_base_url(token))

def _ensure_flush_after_aws_lambda(self):
"""Ensure that `force_flush` is called after an AWS Lambda invocation.
Expand Down
44 changes: 44 additions & 0 deletions logfire/_internal/server_response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""Surface out-of-band signals the Logfire backend wants every SDK request to know about.

The server attaches the `X-Logfire-Warning` header to API responses to signal an
out-of-band warning the server wants the user to see. It is surfaced via
`warnings.warn(..., LogfireServerWarning)`. Python's standard "default" filter
dedupes identical messages, so a chatty server only warns once.

`install_logfire_response_hook(session)` wires this into a `requests.Session` as
a response hook so every Logfire-bound HTTP response is inspected. Callers can
pass a custom `hook` to replace the default behavior (see
`AdvancedOptions.server_response_hook`).
"""

from __future__ import annotations

from typing import Any

import requests

from logfire.types import ServerResponseCallback, ServerResponseCallbackHelper


def install_logfire_response_hook(
session: requests.Session,
hook: ServerResponseCallback | None = None,
) -> None:
"""Install a `requests` response hook on `session` for every Logfire API response.

By default, calls `ServerResponseCallbackHelper.default_hook()`, which emits a warning
if the `X-Logfire-Warning` response header is present.

Pass a custom callable to replace the default behavior (e.g. opt out by passing `lambda _: None`).
"""

def _hook(response: requests.Response, *args: Any, **kwargs: Any) -> requests.Response:
helper = ServerResponseCallbackHelper(response, args, kwargs)
if hook is not None:
hook(helper)
else:
helper.default_hook()
return response

response_hooks: list[Any] = session.hooks.setdefault('response', [])
response_hooks.append(_hook)
4 changes: 4 additions & 0 deletions logfire/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,7 @@

class LogfireConfigError(ValueError):
"""Error raised when there is a problem with the Logfire configuration."""


class LogfireServerWarning(UserWarning):
"""Warning emitted when the Logfire server returns an `X-Logfire-Warning` header on a response."""
43 changes: 42 additions & 1 deletion logfire/types.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING, Callable
from typing import TYPE_CHECKING, Any, Callable

import requests

from logfire._internal.constants import (
ATTRIBUTES_LOG_LEVEL_NUM_KEY,
Expand All @@ -10,8 +12,10 @@
LevelName,
log_level_attributes,
)
from logfire._internal.stack_info import warn_at_user_stacklevel
from logfire._internal.tracer import get_parent_span
from logfire._internal.utils import canonicalize_exception_traceback
from logfire.exceptions import LogfireServerWarning

if TYPE_CHECKING:
from opentelemetry.sdk.trace import ReadableSpan, Span
Expand Down Expand Up @@ -259,3 +263,40 @@ def my_callback(helper: logfire.types.ExceptionCallbackHelper):

helper.no_record_exception()
"""


@dataclass
class ServerResponseCallbackHelper:
"""Helper object passed to the server response callback.

This is experimental and may change significantly in future releases.
"""

response: requests.Response
"""The raw HTTP response from the Logfire API."""

args: tuple[Any, ...]
"""Positional arguments passed to the response hook by `requests`."""

kwargs: dict[str, Any]
"""Keyword arguments passed to the response hook by `requests`."""

WARNING_HEADER_NAME = 'X-Logfire-Warning'

@property
def warning_header(self) -> str | None:
"""Value of the Logfire warning header, or `None` if not present."""
return self.response.headers.get(self.WARNING_HEADER_NAME)

def default_hook(self) -> None:
"""The default hook behavior: emit a `LogfireServerWarning` if the warning header is present."""
warning_message = self.warning_header
if warning_message:
warn_at_user_stacklevel(warning_message, LogfireServerWarning)
Comment on lines +291 to +295
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: default_hook() no longer processes X-Logfire-Error, so server-signaled hard failures are ignored.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At logfire/types.py, line 291:

<comment>`default_hook()` no longer processes `X-Logfire-Error`, so server-signaled hard failures are ignored.</comment>

<file context>
@@ -282,32 +282,17 @@ class ServerResponseCallbackHelper:
-            error_message = self.error_header
-            if error_message:
-                raise LogfireServerError(error_message)
+    def default_hook(self) -> None:
+        """The default hook behavior: emit a `LogfireServerWarning` if the warning header is present."""
+        warning_message = self.warning_header
</file context>
Suggested change
def default_hook(self) -> None:
"""The default hook behavior: emit a `LogfireServerWarning` if the warning header is present."""
warning_message = self.warning_header
if warning_message:
warn_at_user_stacklevel(warning_message, LogfireServerWarning)
def default_hook(self) -> None:
"""The default hook behavior: emit a warning and raise on server error headers if present."""
warning_message = self.warning_header
if warning_message:
warn_at_user_stacklevel(warning_message, LogfireServerWarning)
error_message = self.response.headers.get('X-Logfire-Error')
if error_message:
from logfire.exceptions import LogfireServerError
raise LogfireServerError(error_message)

Tip: Review your code locally with the cubic CLI to iterate faster.



ServerResponseCallback = Callable[[ServerResponseCallbackHelper], None]
"""Callable invoked for every Logfire API response received by the SDK.

This is experimental and may change significantly in future releases.
"""
14 changes: 13 additions & 1 deletion logfire/variables/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from logfire._internal.client import UA_HEADER
from logfire._internal.config import VariablesOptions
from logfire._internal.server_response import ServerResponseCallback, install_logfire_response_hook
from logfire._internal.utils import UnexpectedResponse
from logfire.variables.abstract import (
ResolvedVariable,
Expand Down Expand Up @@ -54,21 +55,31 @@ class LogfireRemoteVariableProvider(VariableProvider):
The threading implementation draws heavily from opentelemetry.sdk._shared_internal.BatchProcessor.
"""

def __init__(self, base_url: str, token: str, options: VariablesOptions):
def __init__(
self,
base_url: str,
token: str,
options: VariablesOptions,
server_response_hook: ServerResponseCallback | None = None,
):
"""Create a new remote variable provider.

Args:
base_url: The base URL of the Logfire API.
token: Authentication token for the Logfire API.
options: Options for retrieving remote variables.
server_response_hook: Optional override for the API response hook
(see `AdvancedOptions.server_response_hook`).
"""
block_before_first_resolve = options.block_before_first_resolve
polling_interval = options.polling_interval

self._base_url = base_url
self._token = token
self._server_response_hook = server_response_hook
self._session = Session()
self._session.headers.update({'Authorization': f'bearer {token}', 'User-Agent': UA_HEADER})
install_logfire_response_hook(self._session, server_response_hook)
self._timeout = options.timeout
self._block_before_first_fetch = block_before_first_resolve
self._polling_interval: timedelta = (
Expand Down Expand Up @@ -197,6 +208,7 @@ def _sse_listener(self): # pragma: no cover
'Cache-Control': 'no-cache',
}
)
install_logfire_response_hook(sse_session, self._server_response_hook)

# Open streaming connection
response = sse_session.get(sse_url, stream=True, timeout=(10, None))
Expand Down
Loading
Loading