Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion logfire/_internal/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from ..client import LogfireClient
from ..config import REGIONS, LogfireCredentials, get_base_url_from_token
from ..config_params import ParamManager
from ..telemetry_header import TELEMETRY_HEADER_NAME, build_telemetry_header, install_logfire_response_hook
from ..tracer import SDKTracerProvider
from .auth import parse_auth, parse_logout
from .prompt import parse_prompt
Expand Down Expand Up @@ -434,8 +435,10 @@ def log_trace_id(response: requests.Response, context: ContextCarrier, *args: An
else:
with tracer.start_as_current_span('logfire._internal.cli'), requests.Session() as session:
context = get_context()
session.hooks = {'response': functools.partial(log_trace_id, context=context)}
session.hooks = {'response': [functools.partial(log_trace_id, context=context)]}
session.headers.update(context)
session.headers[TELEMETRY_HEADER_NAME] = build_telemetry_header()
install_logfire_response_hook(session)
namespace._session = session
namespace.func(namespace)

Expand Down
14 changes: 13 additions & 1 deletion logfire/_internal/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
from logfire.version import VERSION

from .auth import UserToken, UserTokenCollection
from .telemetry_header import (
TELEMETRY_HEADER_NAME,
build_telemetry_header,
install_logfire_response_hook,
)
from .utils import UnexpectedResponse

UA_HEADER = f'logfire/{VERSION}'
Expand Down Expand Up @@ -37,7 +42,14 @@ def __init__(self, user_token: UserToken) -> None:
self.base_url = user_token.base_url
self._token = user_token.token
self._session = Session()
self._session.headers.update({'Authorization': self._token, 'User-Agent': UA_HEADER})
self._session.headers.update(
{
'Authorization': self._token,
'User-Agent': UA_HEADER,
TELEMETRY_HEADER_NAME: build_telemetry_header(),
}
)
install_logfire_response_hook(self._session)

@classmethod
def from_url(cls, base_url: str | None) -> Self:
Expand Down
28 changes: 24 additions & 4 deletions logfire/_internal/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@
from .metrics import ProxyMeterProvider
from .scrubbing import NOOP_SCRUBBER, BaseScrubber, Scrubber, ScrubbingOptions
from .stack_info import warn_at_user_stacklevel
from .telemetry_header import (
TELEMETRY_HEADER_NAME,
build_telemetry_header,
install_logfire_response_hook,
)
from .tracer import OPEN_SPANS, PendingSpanProcessor, ProxyTracerProvider
from .utils import (
SeededRandomIdGenerator,
Expand Down Expand Up @@ -1125,10 +1130,16 @@ def check_tokens():
thread.start()

# Create exporters for each token
telemetry_header_value = build_telemetry_header(self)
for token in token_list:
base_url = self.advanced.generate_base_url(token)
headers = {'User-Agent': f'logfire/{VERSION}', 'Authorization': token}
headers = {
'User-Agent': f'logfire/{VERSION}',
'Authorization': token,
TELEMETRY_HEADER_NAME: telemetry_header_value,
}
session = OTLPExporterHttpSession()
install_logfire_response_hook(session)
span_exporter = BodySizeCheckingOTLPSpanExporter(
endpoint=urljoin(base_url, '/v1/traces'),
session=session,
Expand Down Expand Up @@ -1453,7 +1464,11 @@ def warn_if_not_initialized(self, message: str):
)

def _initialize_credentials_from_token(self, token: str) -> LogfireCredentials | None:
return LogfireCredentials.from_token(token, requests.Session(), self.advanced.generate_base_url(token))
session = requests.Session()
install_logfire_response_hook(session)
return LogfireCredentials.from_token(
token, session, self.advanced.generate_base_url(token), telemetry_header=build_telemetry_header(self)
)

def _ensure_flush_after_aws_lambda(self):
"""Ensure that `force_flush` is called after an AWS Lambda invocation.
Expand Down Expand Up @@ -1601,7 +1616,9 @@ def load_creds_file(cls, creds_dir: Path) -> Self | None:
raise LogfireConfigError(f'Invalid credentials file: {path} - {e}') from e

@classmethod
def from_token(cls, token: str, session: requests.Session, base_url: str) -> Self | None:
def from_token(
cls, token: str, session: requests.Session, base_url: str, telemetry_header: str | None = None
) -> Self | None:
"""Check that the token is valid.

Issue a warning if the Logfire API is unreachable, or we get a response other than 200 or 401.
Expand All @@ -1611,11 +1628,14 @@ def from_token(cls, token: str, session: requests.Session, base_url: str) -> Sel
Raises:
LogfireConfigError: If the token is invalid.
"""
headers: dict[str, str] = {**COMMON_REQUEST_HEADERS, 'Authorization': token}
if telemetry_header is not None:
headers[TELEMETRY_HEADER_NAME] = telemetry_header
try:
response = session.get(
urljoin(base_url, '/v1/info'),
timeout=10,
headers={**COMMON_REQUEST_HEADERS, 'Authorization': token},
headers=headers,
)
except requests.RequestException as e:
warnings.warn(f'Logfire API is unreachable, you may have trouble sending data. Error: {e}')
Expand Down
127 changes: 127 additions & 0 deletions logfire/_internal/telemetry_header.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
"""SDK <-> server out-of-band metadata exchanged via custom HTTP headers.

* `X-Logfire-Telemetry` (request): non-sensitive information about the SDK and how
it is configured. Used by the backend to answer questions like which SDK
versions are still in active use, which Python versions we can drop, and which
configuration options users actually enable. Secrets (`token`, `api_key`,
`service_name`, etc.) are never included.
* `X-Logfire-Warning` (response): an out-of-band warning the server wants the
user to see. Surfaced via `warnings.warn(...)`; the standard "default" filter
deduplicates identical messages so a chatty server only warns once.
* `X-Logfire-Error` (response): an out-of-band error the server wants the SDK
to raise. Always raised — callers that want to keep working past it (the OTLP
pipeline, the variables provider) already swallow exceptions from their HTTP
calls.
"""

from __future__ import annotations

import platform
import sys
import warnings
from typing import TYPE_CHECKING, Any

import requests

from logfire.exceptions import LogfireServerError, LogfireServerWarning
from logfire.version import VERSION

if TYPE_CHECKING:
from .config import _LogfireConfigData # pyright: ignore[reportPrivateUsage]


TELEMETRY_HEADER_NAME = 'X-Logfire-Telemetry'
WARNING_HEADER_NAME = 'X-Logfire-Warning'
ERROR_HEADER_NAME = 'X-Logfire-Error'


def _format_value(value: object) -> str:
if isinstance(value, bool):
return 'true' if value else 'false'
if value is None:
return 'none'
return str(value)


def _base_telemetry_pairs() -> dict[str, str]:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it doesn't hurt to:

Suggested change
def _base_telemetry_pairs() -> dict[str, str]:
@functools.cache
def _base_telemetry_pairs() -> dict[str, str]:

return {
'sdk_version': VERSION,
'sdk_language': 'python',
'python_version': platform.python_version(),
'runtime': sys.implementation.name,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like implementation better matches the Python documentation wording:

Suggested change
'runtime': sys.implementation.name,
'implementation': sys.implementation.name,

'os': sys.platform,
}


def _config_telemetry_pairs(config: _LogfireConfigData) -> dict[str, str]:
"""Pick fields of `_LogfireConfigData` that are useful for product analytics.

Only non-sensitive booleans / counts / numeric values are included; never the token,
api_key, service_name, environment, or anything else that could identify a user
or their deployment.
"""
pairs: dict[str, str] = {}
pairs['send_to_logfire'] = _format_value(config.send_to_logfire)
pairs['inspect_arguments'] = _format_value(config.inspect_arguments)
pairs['distributed_tracing'] = _format_value(config.distributed_tracing)
pairs['add_baggage_to_attributes'] = _format_value(config.add_baggage_to_attributes)
pairs['min_level'] = _format_value(config.min_level)
pairs['console_enabled'] = _format_value(config.console is not False)
pairs['scrubbing_enabled'] = _format_value(config.scrubbing is not False)
pairs['code_source_set'] = _format_value(config.code_source is not None)
pairs['variables_set'] = _format_value(config.variables is not None)
pairs['service_version_set'] = _format_value(config.service_version is not None)
pairs['environment_set'] = _format_value(config.environment is not None)
pairs['additional_span_processors'] = _format_value(len(config.additional_span_processors or ()))

token = config.token
if isinstance(token, list):
token_count = len(token)
elif token:
token_count = 1
else:
token_count = 0
pairs['token_count'] = _format_value(token_count)

sampling = getattr(config, 'sampling', None)
if sampling is not None:
head = sampling.head
if isinstance(head, (int, float)):
pairs['sampling_head'] = _format_value(head)
else:
pairs['sampling_head'] = 'custom'
pairs['sampling_tail'] = _format_value(sampling.tail is not None)

return pairs


def build_telemetry_header(config: _LogfireConfigData | None = None) -> str:
"""Return the `key=val,key2=val` value for the `X-Logfire-Telemetry` header."""
pairs = _base_telemetry_pairs()
if config is not None:
pairs.update(_config_telemetry_pairs(config))
return ','.join(f'{key}={value}' for key, value in pairs.items())


def process_logfire_response_headers(response: requests.Response, *_args: Any, **_kwargs: Any) -> requests.Response:
"""Handle `X-Logfire-Warning` / `X-Logfire-Error` headers on a Logfire API response.

Designed to be installed as a `requests` response hook
(`session.hooks['response'].append(...)`).
"""
warning_message = response.headers.get(WARNING_HEADER_NAME)
if warning_message:
warnings.warn(warning_message, LogfireServerWarning, stacklevel=2)
error_message = response.headers.get(ERROR_HEADER_NAME)
if error_message:
raise LogfireServerError(error_message)
return response


def install_logfire_response_hook(session: requests.Session) -> None:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems like it should be a separate PR

"""Install `process_logfire_response_headers` as a response hook on `session`."""
existing: Any = session.hooks.setdefault('response', [])
hooks: list[Any] = list(existing) if isinstance(existing, list) else [existing] # pyright: ignore[reportUnknownArgumentType]
if process_logfire_response_headers not in hooks:
hooks.append(process_logfire_response_headers)
session.hooks['response'] = hooks
8 changes: 8 additions & 0 deletions logfire/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,11 @@

class LogfireConfigError(ValueError):
"""Error raised when there is a problem with the Logfire configuration."""


class LogfireServerError(Exception):
"""Error raised when the Logfire server returns an `X-Logfire-Error` header on a response."""


class LogfireServerWarning(UserWarning):
"""Warning emitted when the Logfire server returns an `X-Logfire-Warning` header on a response."""
17 changes: 16 additions & 1 deletion logfire/variables/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@

from logfire._internal.client import UA_HEADER
from logfire._internal.config import VariablesOptions
from logfire._internal.telemetry_header import (
TELEMETRY_HEADER_NAME,
build_telemetry_header,
install_logfire_response_hook,
)
from logfire._internal.utils import UnexpectedResponse
from logfire.variables.abstract import (
ResolvedVariable,
Expand Down Expand Up @@ -67,8 +72,16 @@ def __init__(self, base_url: str, token: str, options: VariablesOptions):

self._base_url = base_url
self._token = token
self._telemetry_header = build_telemetry_header()
self._session = Session()
self._session.headers.update({'Authorization': f'bearer {token}', 'User-Agent': UA_HEADER})
self._session.headers.update(
{
'Authorization': f'bearer {token}',
'User-Agent': UA_HEADER,
TELEMETRY_HEADER_NAME: self._telemetry_header,
}
)
install_logfire_response_hook(self._session)
self._timeout = options.timeout
self._block_before_first_fetch = block_before_first_resolve
self._polling_interval: timedelta = (
Expand Down Expand Up @@ -193,10 +206,12 @@ def _sse_listener(self): # pragma: no cover
{
'Authorization': f'bearer {self._token}',
'User-Agent': UA_HEADER,
TELEMETRY_HEADER_NAME: self._telemetry_header,
'Accept': 'text/event-stream',
'Cache-Control': 'no-cache',
}
)
install_logfire_response_hook(sse_session)

# Open streaming connection
response = sse_session.get(sse_url, stream=True, timeout=(10, None))
Expand Down
Loading
Loading