Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion logfire/_internal/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from ..client import LogfireClient
from ..config import REGIONS, LogfireCredentials, get_base_url_from_token
from ..config_params import ParamManager
from ..telemetry_header import TELEMETRY_HEADER_NAME, build_telemetry_header, install_logfire_response_hook
from ..tracer import SDKTracerProvider
from .auth import parse_auth, parse_logout
from .prompt import parse_prompt
Expand Down Expand Up @@ -434,8 +435,10 @@ def log_trace_id(response: requests.Response, context: ContextCarrier, *args: An
else:
with tracer.start_as_current_span('logfire._internal.cli'), requests.Session() as session:
context = get_context()
session.hooks = {'response': functools.partial(log_trace_id, context=context)}
session.hooks = {'response': [functools.partial(log_trace_id, context=context)]}
session.headers.update(context)
session.headers[TELEMETRY_HEADER_NAME] = build_telemetry_header()
install_logfire_response_hook(session)
namespace._session = session
namespace.func(namespace)

Expand Down
14 changes: 13 additions & 1 deletion logfire/_internal/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
from logfire.version import VERSION

from .auth import UserToken, UserTokenCollection
from .telemetry_header import (
TELEMETRY_HEADER_NAME,
build_telemetry_header,
install_logfire_response_hook,
)
from .utils import UnexpectedResponse

UA_HEADER = f'logfire/{VERSION}'
Expand Down Expand Up @@ -37,7 +42,14 @@ def __init__(self, user_token: UserToken) -> None:
self.base_url = user_token.base_url
self._token = user_token.token
self._session = Session()
self._session.headers.update({'Authorization': self._token, 'User-Agent': UA_HEADER})
self._session.headers.update(
{
'Authorization': self._token,
'User-Agent': UA_HEADER,
TELEMETRY_HEADER_NAME: build_telemetry_header(),
}
)
install_logfire_response_hook(self._session)

@classmethod
def from_url(cls, base_url: str | None) -> Self:
Expand Down
39 changes: 35 additions & 4 deletions logfire/_internal/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@
from .metrics import ProxyMeterProvider
from .scrubbing import NOOP_SCRUBBER, BaseScrubber, Scrubber, ScrubbingOptions
from .stack_info import warn_at_user_stacklevel
from .telemetry_header import (
TELEMETRY_HEADER_NAME,
build_telemetry_header,
install_logfire_response_hook,
)
from .tracer import OPEN_SPANS, PendingSpanProcessor, ProxyTracerProvider
from .utils import (
SeededRandomIdGenerator,
Expand Down Expand Up @@ -887,6 +892,9 @@ def __init__(
# This ensures that we only call OTEL's global set_tracer_provider once to avoid warnings.
self._has_set_providers = False
self._initialized = False
# Resolved in `_initialize` once the resource (and therefore its `service.instance.id`)
# exists; until then there is no value to advertise to the backend.
self._service_instance_id: str = ''
self._lock = RLock()

def configure(
Expand Down Expand Up @@ -996,6 +1004,9 @@ def _initialize(self) -> None:
# https://github.com/open-telemetry/semantic-conventions/blob/e44693245eef815071402b88c3a44a8f7f8f24c8/docs/resource/README.md#service-experimental
# Both recommend generating a UUID.
resource = Resource({'service.instance.id': uuid4().hex}).merge(resource)
# Cache the resolved service.instance.id so the X-Logfire-Telemetry header
# advertises the same UUID the OTLP resource attributes carry.
self._service_instance_id = str(resource.attributes.get('service.instance.id', ''))

head = self.sampling.head
sampler: Sampler | None = None
Expand Down Expand Up @@ -1125,10 +1136,16 @@ def check_tokens():
thread.start()

# Create exporters for each token
telemetry_header_value = build_telemetry_header(self)
for token in token_list:
base_url = self.advanced.generate_base_url(token)
headers = {'User-Agent': f'logfire/{VERSION}', 'Authorization': token}
headers = {
'User-Agent': f'logfire/{VERSION}',
'Authorization': token,
TELEMETRY_HEADER_NAME: telemetry_header_value,
}
session = OTLPExporterHttpSession()
install_logfire_response_hook(session)
span_exporter = BodySizeCheckingOTLPSpanExporter(
endpoint=urljoin(base_url, '/v1/traces'),
session=session,
Expand Down Expand Up @@ -1305,6 +1322,7 @@ def fix_pid(): # pragma: no cover
base_url=base_url,
token=self.api_key,
options=self.variables,
telemetry_header=build_telemetry_header(self),
)
multi_log_processor = SynchronousMultiLogRecordProcessor()
for processor in log_record_processors:
Expand Down Expand Up @@ -1437,6 +1455,7 @@ def _lazy_init_variable_provider(self) -> VariableProvider:
base_url=base_url,
token=api_key,
options=options,
telemetry_header=build_telemetry_header(self),
)
self._variable_provider = provider
provider.start(Logfire(config=self))
Expand All @@ -1453,7 +1472,14 @@ def warn_if_not_initialized(self, message: str):
)

def _initialize_credentials_from_token(self, token: str) -> LogfireCredentials | None:
    """Validate `token` via `LogfireCredentials.from_token` and return its result.

    The request is made on a freshly created `requests.Session` that has the
    Logfire response hook installed, and it carries the `X-Logfire-Telemetry`
    header built from this config.

    Args:
        token: The write token to check.

    Returns:
        Whatever `LogfireCredentials.from_token` returns for this token
        (credentials, or `None`).
    """
    # The hook must be installed before the request is issued, otherwise
    # `X-Logfire-Warning` / `X-Logfire-Error` on the response would be ignored.
    session = requests.Session()
    install_logfire_response_hook(session)
    return LogfireCredentials.from_token(
        token,
        session,
        self.advanced.generate_base_url(token),
        telemetry_header=build_telemetry_header(self),
    )

def _ensure_flush_after_aws_lambda(self):
"""Ensure that `force_flush` is called after an AWS Lambda invocation.
Expand Down Expand Up @@ -1601,7 +1627,9 @@ def load_creds_file(cls, creds_dir: Path) -> Self | None:
raise LogfireConfigError(f'Invalid credentials file: {path} - {e}') from e

@classmethod
def from_token(cls, token: str, session: requests.Session, base_url: str) -> Self | None:
def from_token(
cls, token: str, session: requests.Session, base_url: str, telemetry_header: str | None = None
) -> Self | None:
"""Check that the token is valid.

Issue a warning if the Logfire API is unreachable, or we get a response other than 200 or 401.
Expand All @@ -1611,11 +1639,14 @@ def from_token(cls, token: str, session: requests.Session, base_url: str) -> Sel
Raises:
LogfireConfigError: If the token is invalid.
"""
headers: dict[str, str] = {**COMMON_REQUEST_HEADERS, 'Authorization': token}
if telemetry_header is not None:
headers[TELEMETRY_HEADER_NAME] = telemetry_header
try:
response = session.get(
urljoin(base_url, '/v1/info'),
timeout=10,
headers={**COMMON_REQUEST_HEADERS, 'Authorization': token},
headers=headers,
)
except requests.RequestException as e:
warnings.warn(f'Logfire API is unreachable, you may have trouble sending data. Error: {e}')
Expand Down
129 changes: 129 additions & 0 deletions logfire/_internal/telemetry_header.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
"""SDK <-> server out-of-band metadata exchanged via custom HTTP headers.

* `X-Logfire-Telemetry` (request): non-sensitive information about the SDK and how
it is configured, encoded as a compact JSON object. Used by the backend to
answer questions like which SDK versions are still in active use, which Python
versions we can drop, and which configuration options users actually enable.
Sensitive values (`token`, `api_key`, `service_name`, etc.) are never included.
* `X-Logfire-Warning` (response): an out-of-band warning the server wants the
user to see. Surfaced via `warnings.warn(...)`; the standard "default" filter
deduplicates identical messages so a chatty server only warns once.
* `X-Logfire-Error` (response): an out-of-band error the server wants the SDK
to raise. Always raised — callers that want to keep working past it (the OTLP
pipeline, the variables provider) already swallow exceptions from their HTTP
calls.
"""

from __future__ import annotations

import functools
import json
import sys
import warnings
from typing import TYPE_CHECKING, Any

import requests

from logfire.exceptions import LogfireServerError, LogfireServerWarning
from logfire.version import VERSION

if TYPE_CHECKING:
from .config import LogfireConfig


TELEMETRY_HEADER_NAME = 'X-Logfire-Telemetry'
WARNING_HEADER_NAME = 'X-Logfire-Warning'
ERROR_HEADER_NAME = 'X-Logfire-Error'


@functools.cache
def _base_telemetry_pairs() -> dict[str, Any]:
    """Collect the telemetry fields that do not depend on any `LogfireConfig`.

    The result is cached: it is computed once and the same dict is handed back
    on every later call, so callers must copy it before modifying it.

    Every field has a stated justification; only add a new one with a
    comparable reason.
    """
    # Python version as `major.minor.micro`: tells us when support for an
    # older Python can be dropped.
    python_version = '.'.join(str(part) for part in sys.version_info[:3])

    pairs: dict[str, Any] = {}
    # SDK version: the main input for deprecation planning, i.e. which
    # releases are still in active use and when one can safely be retired.
    pairs['sdk_version'] = VERSION
    # SDK language: lets backend ingestion tell python apart from future SDKs
    # (JS, Rust) without parsing the User-Agent.
    pairs['sdk_language'] = 'python'
    pairs['python_version'] = python_version
    # Implementation: surfaces non-CPython users (pypy, graalpy) before
    # anything relying on CPython-specific behaviour is changed.
    pairs['implementation'] = sys.implementation.name
    # OS: likewise, confirms Windows / Linux / macOS coverage before
    # platform-sensitive code paths are touched.
    pairs['os'] = sys.platform
    return pairs


def _config_telemetry_pairs(config: LogfireConfig) -> dict[str, Any]:
"""Pick fields of `LogfireConfig` that are useful for product analytics.

Each field below has an explicit rationale; do not add a field unless you have
one. Everything else either duplicates information the server already knows,
isn't actionable, or risks leaking sensitive data (token, api_key,
service_name, environment, etc.).
"""
# Multi-project usage: how many users configure more than one write token in
# a single SDK instance. Drives auth/routing roadmap decisions.
token = config.token
if isinstance(token, list):
token_count = len(token)
elif token:
token_count = 1
else:
token_count = 0

pairs: dict[str, Any] = {
# Adoption signal for the `code_source=` option (newer feature): tells us
# whether the integration with the source-code link UI is worth investing in.
'code_source_set': config.code_source is not None,
# Adoption signal for the variables / feature-flag feature (newer feature):
# informs whether to keep building on it.
'variables_set': config.variables is not None,
'token_count': token_count,
}

if config._service_instance_id: # pyright: ignore[reportPrivateUsage]
# Mirrors the OTLP resource attribute of the same name
# (https://opentelemetry.io/docs/specs/semconv/registry/attributes/service/#service-instance-id).
# Carrying it on the header lets the backend correlate metadata with the spans
# this SDK instance is exporting, even on requests that don't carry an OTLP body
# (token validation, variables fetch, CRUD endpoints).
pairs['service_instance_id'] = config._service_instance_id # pyright: ignore[reportPrivateUsage]

return pairs


def build_telemetry_header(config: LogfireConfig | None = None) -> str:
    """Build the compact JSON string sent as the `X-Logfire-Telemetry` header.

    Args:
        config: When given, its analytics fields are added to the base fields
            (and win on any key collision). When `None`, only the base fields
            are included.
    """
    base_fields = _base_telemetry_pairs()
    config_fields: dict[str, Any] = {} if config is None else _config_telemetry_pairs(config)
    # Merge into a new dict so the cached base dict is never modified.
    return json.dumps({**base_fields, **config_fields}, separators=(',', ':'))


def process_logfire_response_headers(response: requests.Response, *_args: Any, **_kwargs: Any) -> requests.Response:
    """React to `X-Logfire-Warning` / `X-Logfire-Error` headers on a response.

    Intended for use as a `requests` response hook
    (`session.hooks['response'].append(...)`); extra positional and keyword
    arguments are accepted and ignored.

    Returns:
        The same `response`, unchanged.

    Raises:
        LogfireServerError: If the response has a non-empty `X-Logfire-Error`
            header. The warning header, if present, is emitted first.
    """
    headers = response.headers
    if warning_text := headers.get(WARNING_HEADER_NAME):
        warnings.warn(warning_text, LogfireServerWarning, stacklevel=2)
    if error_text := headers.get(ERROR_HEADER_NAME):
        raise LogfireServerError(error_text)
    return response


def install_logfire_response_hook(session: requests.Session) -> None:
    """Install `process_logfire_response_headers` as a response hook on `session`.

    `requests.Session()` initialises `hooks['response']` to a list, but
    `session.hooks` is a plain dict that callers may overwrite, and `requests`
    also accepts a single bare callable as a hook entry. The existing entry is
    therefore normalised to a list before the hook is added, so hooks that are
    already registered keep running.

    Installing is idempotent: calling this again on the same session does not
    register the hook a second time.

    Args:
        session: The session whose responses should be checked for
            `X-Logfire-Warning` / `X-Logfire-Error` headers.
    """
    existing: Any = session.hooks.get('response')
    response_hooks: list[Any]
    if existing is None:
        response_hooks = []
    elif callable(existing):
        # A lone callable is a valid hook entry for `requests`; wrap it so we
        # can add ours alongside it instead of failing on `.append`.
        response_hooks = [existing]
    elif isinstance(existing, list):
        response_hooks = existing
    else:
        # Any other iterable of hooks (e.g. a tuple) is copied into a list.
        response_hooks = list(existing)

    if process_logfire_response_headers not in response_hooks:
        response_hooks.append(process_logfire_response_headers)
    session.hooks['response'] = response_hooks
8 changes: 8 additions & 0 deletions logfire/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,11 @@

class LogfireConfigError(ValueError):
    """Raised when the Logfire configuration is invalid or cannot be used."""


class LogfireServerError(Exception):
    """Raised when a Logfire server response carries an `X-Logfire-Error` header."""


class LogfireServerWarning(UserWarning):
    """Warning category used when a Logfire server response carries an `X-Logfire-Warning` header."""
6 changes: 2 additions & 4 deletions logfire/experimental/query_client.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
from __future__ import annotations

import platform
from datetime import datetime
from types import TracebackType
from typing import TYPE_CHECKING, Any, Generic, Literal, TypedDict, TypeVar

from typing_extensions import Self

from logfire import VERSION
from logfire._internal.config import get_base_url_from_token
from logfire._internal.telemetry_header import TELEMETRY_HEADER_NAME, build_telemetry_header

try:
from httpx import AsyncClient, Client, Response, Timeout
Expand Down Expand Up @@ -87,7 +86,6 @@ def _rows_to_columns(result: RowQueryResults) -> QueryResults:


_ACCEPT = Literal['application/json', 'application/vnd.apache.arrow.stream', 'text/csv']
_USER_AGENT = f'logfire-sdk-python/{VERSION} (Python {platform.python_version()}, os {platform.platform()}, arch {platform.machine()})'
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove the user agent? Why not do the opposite and put everything in there?



class _BaseLogfireQueryClient(Generic[T]):
Expand All @@ -97,7 +95,7 @@ def __init__(self, base_url: str, read_token: str, timeout: Timeout, client: typ
self.timeout = timeout
headers = client_kwargs.pop('headers', {})
headers['authorization'] = read_token
headers.setdefault('user-agent', _USER_AGENT)
headers[TELEMETRY_HEADER_NAME] = build_telemetry_header()
self.client: T = client(timeout=timeout, base_url=base_url, headers=headers, **client_kwargs)

def _build_query_params(
Expand Down
23 changes: 21 additions & 2 deletions logfire/variables/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@

from logfire._internal.client import UA_HEADER
from logfire._internal.config import VariablesOptions
from logfire._internal.telemetry_header import (
TELEMETRY_HEADER_NAME,
build_telemetry_header,
install_logfire_response_hook,
)
from logfire._internal.utils import UnexpectedResponse
from logfire.variables.abstract import (
ResolvedVariable,
Expand Down Expand Up @@ -54,21 +59,33 @@ class LogfireRemoteVariableProvider(VariableProvider):
The threading implementation draws heavily from opentelemetry.sdk._shared_internal.BatchProcessor.
"""

def __init__(self, base_url: str, token: str, options: VariablesOptions):
def __init__(self, base_url: str, token: str, options: VariablesOptions, telemetry_header: str | None = None):
"""Create a new remote variable provider.

Args:
base_url: The base URL of the Logfire API.
token: Authentication token for the Logfire API.
options: Options for retrieving remote variables.
telemetry_header: Pre-built `X-Logfire-Telemetry` header value carrying the
SDK's `service.instance.id` so it matches the OTLP resource attribute.
When None (e.g. lazily instantiated outside of `_initialize`), a base
header without config-derived fields is built here.
"""
block_before_first_resolve = options.block_before_first_resolve
polling_interval = options.polling_interval

self._base_url = base_url
self._token = token
self._telemetry_header = telemetry_header if telemetry_header is not None else build_telemetry_header()
self._session = Session()
self._session.headers.update({'Authorization': f'bearer {token}', 'User-Agent': UA_HEADER})
self._session.headers.update(
{
'Authorization': f'bearer {token}',
'User-Agent': UA_HEADER,
TELEMETRY_HEADER_NAME: self._telemetry_header,
}
)
install_logfire_response_hook(self._session)
self._timeout = options.timeout
self._block_before_first_fetch = block_before_first_resolve
self._polling_interval: timedelta = (
Expand Down Expand Up @@ -193,10 +210,12 @@ def _sse_listener(self): # pragma: no cover
{
'Authorization': f'bearer {self._token}',
'User-Agent': UA_HEADER,
TELEMETRY_HEADER_NAME: self._telemetry_header,
'Accept': 'text/event-stream',
'Cache-Control': 'no-cache',
}
)
install_logfire_response_hook(sse_session)

# Open streaming connection
response = sse_session.get(sse_url, stream=True, timeout=(10, None))
Expand Down
Loading
Loading