Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion logfire/_internal/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from ..client import LogfireClient
from ..config import REGIONS, LogfireCredentials, get_base_url_from_token
from ..config_params import ParamManager
from ..server_response import install_logfire_response_hook
from ..tracer import SDKTracerProvider
from .auth import parse_auth, parse_logout
from .prompt import parse_prompt
Expand Down Expand Up @@ -434,8 +435,9 @@ def log_trace_id(response: requests.Response, context: ContextCarrier, *args: An
else:
with tracer.start_as_current_span('logfire._internal.cli'), requests.Session() as session:
context = get_context()
session.hooks = {'response': functools.partial(log_trace_id, context=context)}
session.hooks = {'response': [functools.partial(log_trace_id, context=context)]}
session.headers.update(context)
install_logfire_response_hook(session)
namespace._session = session
namespace.func(namespace)

Expand Down
23 changes: 20 additions & 3 deletions logfire/_internal/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from logfire.version import VERSION

from .auth import UserToken, UserTokenCollection
from .server_response import ServerResponseCallback, install_logfire_response_hook
from .utils import UnexpectedResponse

UA_HEADER = f'logfire/{VERSION}'
Expand All @@ -29,27 +30,43 @@ class LogfireClient:

Args:
user_token: The user token to use when authenticating against the API.
server_response_hook: Optional override for the API response hook (see
`AdvancedOptions.server_response_hook`).
"""

def __init__(self, user_token: UserToken) -> None:
def __init__(
self,
user_token: UserToken,
server_response_hook: ServerResponseCallback | None = None,
) -> None:
if user_token.is_expired:
raise RuntimeError('The provided user token is expired')
self.base_url = user_token.base_url
self._token = user_token.token
self._session = Session()
self._session.headers.update({'Authorization': self._token, 'User-Agent': UA_HEADER})
install_logfire_response_hook(self._session, server_response_hook)

@classmethod
def from_url(cls, base_url: str | None) -> Self:
def from_url(
cls,
base_url: str | None,
server_response_hook: ServerResponseCallback | None = None,
) -> Self:
"""Create a client from the provided base URL.

Args:
base_url: The base URL to use when looking for a user token. If `None`, will prompt
the user into selecting a token from the token collection (or, if only one available,
use it directly). The token collection will be created from the `~/.logfire/default.toml`
file (or an empty one if no such file exists).
server_response_hook: Optional override for the API response hook (see
`AdvancedOptions.server_response_hook`).
"""
return cls(user_token=UserTokenCollection().get_token(base_url))
return cls(
user_token=UserTokenCollection().get_token(base_url),
server_response_hook=server_response_hook,
)

def _get_raw(self, endpoint: str, params: dict[str, Any] | None = None) -> Response:
response = self._session.get(urljoin(self.base_url, endpoint), params=params)
Expand Down
36 changes: 34 additions & 2 deletions logfire/_internal/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
from .logs import ProxyLoggerProvider
from .metrics import ProxyMeterProvider
from .scrubbing import NOOP_SCRUBBER, BaseScrubber, Scrubber, ScrubbingOptions
from .server_response import ServerResponseCallback, install_logfire_response_hook
from .stack_info import warn_at_user_stacklevel
from .tracer import OPEN_SPANS, PendingSpanProcessor, ProxyTracerProvider
from .utils import (
Expand Down Expand Up @@ -201,6 +202,32 @@ class AdvancedOptions:
serialized configuration sent to child processes. See the [distributed tracing guide](https://logfire.pydantic.dev/docs/how-to-guides/distributed-tracing/#thread-and-pool-executors) for more details.
"""

server_response_hook: ServerResponseCallback | None = None
"""Optional callback invoked for every HTTP response received from the Logfire API.

This is experimental and may be modified or removed.

This applies to OTLP exports, credential / project initialisation, and the remote
variables provider. The default surfaces `X-Logfire-Warning` and `X-Logfire-Error`
headers as `LogfireServerWarning` / `LogfireServerError`.

Setting this replaces the default; pass `lambda response: None` to opt out entirely.

Example usage:

```python skip-run="true" skip-reason="needs metric/logfire setup"
from logfire.types import ServerResponseCallbackHelper

def hook(helper: ServerResponseCallbackHelper):
my_metric.inc(response.response.status_code)
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.
Outdated
helper.default_hook() # call this to keep the default behavior of raising/logging based on headers

logfire.configure(advanced=AdvancedOptions(server_response_hook=hook))
```

Raise from the hook to abort the calling code path.
"""

def generate_base_url(self, token: str) -> str:
if self.base_url is not None:
return self.base_url
Expand Down Expand Up @@ -1078,7 +1105,7 @@ def add_span_processor(span_processor: SpanProcessor) -> None:
# If we don't have tokens or credentials from a file,
# try initializing a new project and writing a new creds file.
# note, we only do this if `send_to_logfire` is explicitly `True`, not 'if-token-present'
client = LogfireClient.from_url(self.advanced.base_url)
client = LogfireClient.from_url(self.advanced.base_url, self.advanced.server_response_hook)
credentials = LogfireCredentials.initialize_project(client=client)
credentials.write_creds_file(self.data_dir)

Expand Down Expand Up @@ -1129,6 +1156,7 @@ def check_tokens():
base_url = self.advanced.generate_base_url(token)
headers = {'User-Agent': f'logfire/{VERSION}', 'Authorization': token}
session = OTLPExporterHttpSession()
install_logfire_response_hook(session, self.advanced.server_response_hook)
span_exporter = BodySizeCheckingOTLPSpanExporter(
endpoint=urljoin(base_url, '/v1/traces'),
session=session,
Expand Down Expand Up @@ -1305,6 +1333,7 @@ def fix_pid(): # pragma: no cover
base_url=base_url,
token=self.api_key,
options=self.variables,
server_response_hook=self.advanced.server_response_hook,
)
multi_log_processor = SynchronousMultiLogRecordProcessor()
for processor in log_record_processors:
Expand Down Expand Up @@ -1437,6 +1466,7 @@ def _lazy_init_variable_provider(self) -> VariableProvider:
base_url=base_url,
token=api_key,
options=options,
server_response_hook=self.advanced.server_response_hook,
)
self._variable_provider = provider
provider.start(Logfire(config=self))
Expand All @@ -1453,7 +1483,9 @@ def warn_if_not_initialized(self, message: str):
)

def _initialize_credentials_from_token(self, token: str) -> LogfireCredentials | None:
return LogfireCredentials.from_token(token, requests.Session(), self.advanced.generate_base_url(token))
session = requests.Session()
install_logfire_response_hook(session, self.advanced.server_response_hook)
return LogfireCredentials.from_token(token, session, self.advanced.generate_base_url(token))

def _ensure_flush_after_aws_lambda(self):
"""Ensure that `force_flush` is called after an AWS Lambda invocation.
Expand Down
49 changes: 49 additions & 0 deletions logfire/_internal/server_response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""Surface out-of-band signals the Logfire backend wants every SDK request to know about.

The server attaches custom headers to API responses:

* `X-Logfire-Warning`: an out-of-band warning the server wants the user to see.
Surfaced via `warnings.warn(..., LogfireServerWarning)`. Python's standard
"default" filter dedupes identical messages, so a chatty server only warns once.
* `X-Logfire-Error`: an out-of-band error the server wants the SDK to raise.
Always raised as `LogfireServerError`. Callers that want to keep working past
it (the OTLP pipeline, the variables provider) already swallow exceptions from
their HTTP calls; CRUD/CLI propagate the error to the user.

`install_logfire_response_hook(session)` wires this into a `requests.Session` as
a response hook so every Logfire-bound HTTP response is inspected. Callers can
pass a custom `hook` to replace the default behavior (see
`AdvancedOptions.server_response_hook`).
"""

from __future__ import annotations

from typing import Any

import requests

from logfire.types import ServerResponseCallback, ServerResponseCallbackHelper


def install_logfire_response_hook(
session: requests.Session,
hook: ServerResponseCallback | None = None,
) -> None:
"""Install a `requests` response hook on `session` for every Logfire API response.

By default, calls ServerResponseCallbackHelper.default_hook(), which emits warnings and raises errors based
on the presence of `X-Logfire-Warning` and `X-Logfire-Error` response headers.

Pass a custom callable to replace the default behavior (e.g. opt out by passing `lambda _: None`).
"""

def _hook(response: requests.Response, *args: Any, **kwargs: Any) -> requests.Response:
helper = ServerResponseCallbackHelper(response, args, kwargs)
if hook:
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.
Outdated
hook(helper)
else:
helper.default_hook()
return response

response_hooks: list[Any] = session.hooks.setdefault('response', [])
response_hooks.append(_hook)
8 changes: 8 additions & 0 deletions logfire/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,11 @@

class LogfireConfigError(ValueError):
"""Error raised when there is a problem with the Logfire configuration."""


class LogfireServerError(Exception):
"""Error raised when the Logfire server returns an `X-Logfire-Error` header on a response."""


class LogfireServerWarning(UserWarning):
"""Warning emitted when the Logfire server returns an `X-Logfire-Warning` header on a response."""
58 changes: 57 additions & 1 deletion logfire/types.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING, Callable
from typing import TYPE_CHECKING, Any, Callable

import requests

from logfire._internal.constants import (
ATTRIBUTES_LOG_LEVEL_NUM_KEY,
Expand All @@ -10,8 +12,10 @@
LevelName,
log_level_attributes,
)
from logfire._internal.stack_info import warn_at_user_stacklevel
from logfire._internal.tracer import get_parent_span
from logfire._internal.utils import canonicalize_exception_traceback
from logfire.exceptions import LogfireServerError, LogfireServerWarning

if TYPE_CHECKING:
from opentelemetry.sdk.trace import ReadableSpan, Span
Expand Down Expand Up @@ -259,3 +263,55 @@ def my_callback(helper: logfire.types.ExceptionCallbackHelper):

helper.no_record_exception()
"""


@dataclass
class ServerResponseCallbackHelper:
"""Helper object passed to the server response callback.

This is experimental and may change significantly in future releases.
"""

response: requests.Response
"""The raw HTTP response from the Logfire API."""

args: tuple[Any, ...]
"""Positional arguments passed to the response hook by `requests`."""

kwargs: dict[str, Any]
"""Keyword arguments passed to the response hook by `requests`."""

WARNING_HEADER_NAME = 'X-Logfire-Warning'
ERROR_HEADER_NAME = 'X-Logfire-Error'

@property
def warning_header(self) -> str | None:
"""Value of the Logfire warning header, or `None` if not present."""
return self.response.headers.get(self.WARNING_HEADER_NAME)

@property
def error_header(self) -> str | None:
"""Value of the Logfire error header, or `None` if not present."""
return self.response.headers.get(self.ERROR_HEADER_NAME)

def default_hook(self, *, check_warning: bool = True, check_error: bool = True) -> None:
"""The default hook behavior.

If check_warning is true, check for a warning header and raise it as a LogfireServerWarning if present.
If check_error is true, check for an error header and raise it as a LogfireServerError if present.
"""
if check_warning:
warning_message = self.warning_header
if warning_message:
warn_at_user_stacklevel(warning_message, LogfireServerWarning)
if check_error:
error_message = self.error_header
if error_message:
raise LogfireServerError(error_message)


ServerResponseCallback = Callable[[ServerResponseCallbackHelper], None]
"""Callable invoked for every Logfire API response received by the SDK.

This is experimental and may change significantly in future releases.
"""
14 changes: 13 additions & 1 deletion logfire/variables/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from logfire._internal.client import UA_HEADER
from logfire._internal.config import VariablesOptions
from logfire._internal.server_response import ServerResponseCallback, install_logfire_response_hook
from logfire._internal.utils import UnexpectedResponse
from logfire.variables.abstract import (
ResolvedVariable,
Expand Down Expand Up @@ -54,21 +55,31 @@ class LogfireRemoteVariableProvider(VariableProvider):
The threading implementation draws heavily from opentelemetry.sdk._shared_internal.BatchProcessor.
"""

def __init__(self, base_url: str, token: str, options: VariablesOptions):
def __init__(
self,
base_url: str,
token: str,
options: VariablesOptions,
server_response_hook: ServerResponseCallback | None = None,
):
"""Create a new remote variable provider.

Args:
base_url: The base URL of the Logfire API.
token: Authentication token for the Logfire API.
options: Options for retrieving remote variables.
server_response_hook: Optional override for the API response hook
(see `AdvancedOptions.server_response_hook`).
"""
block_before_first_resolve = options.block_before_first_resolve
polling_interval = options.polling_interval

self._base_url = base_url
self._token = token
self._server_response_hook = server_response_hook
self._session = Session()
self._session.headers.update({'Authorization': f'bearer {token}', 'User-Agent': UA_HEADER})
install_logfire_response_hook(self._session, server_response_hook)
self._timeout = options.timeout
self._block_before_first_fetch = block_before_first_resolve
self._polling_interval: timedelta = (
Expand Down Expand Up @@ -197,6 +208,7 @@ def _sse_listener(self): # pragma: no cover
'Cache-Control': 'no-cache',
}
)
install_logfire_response_hook(sse_session, self._server_response_hook)

# Open streaming connection
response = sse_session.get(sse_url, stream=True, timeout=(10, None))
Expand Down
Loading
Loading