Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-tmonk committed Mar 6, 2024
1 parent 4601e16 commit 2b68691
Show file tree
Hide file tree
Showing 11 changed files with 294 additions and 170 deletions.
8 changes: 6 additions & 2 deletions src/snowflake/telemetry/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@ def add_event(
name: str,
attributes: types.Attributes = None,
) -> None:
"""Add an event name and associated attributes to the current span."""
"""
Add an event name and associated attributes to the current span.
"""
trace.get_current_span().add_event(name, attributes)

def set_span_attribute(
key: str,
value: types.AttributeValue
) -> None:
"""Set an attribute key, value pair on the current span."""
"""
Set an attribute key, value pair on the current span.
"""
trace.get_current_span().set_attribute(key, value)
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,21 @@
# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved.
#

"""
This module is a temporary bridge from opentelemetry 1.12.0 our current
dependency, which does not have common encoder functions to the later versions
of opentelemetry, which do have common encoder functions in the
opentelemetry-exporter-otlp-proto-common package.
"""

from typing import Sequence

from opentelemetry.exporter.otlp.proto.http._log_exporter import encoder
from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import ExportLogsServiceRequest
from opentelemetry.proto.logs.v1.logs_pb2 import LogsData
from opentelemetry.sdk._logs import LogData


def _encode_logs(batch: Sequence[LogData]) -> ExportLogsServiceRequest:
# Will no longer rely on _encode_resource_logs after we upgrade to v1.19.0 or later
resource_logs = encoder._encode_resource_logs(batch)
resource_logs = encoder._encode_resource_logs(batch) # pylint: disable=protected-access
return ExportLogsServiceRequest(resource_logs=resource_logs)

def serialize_logs_data(batch: Sequence[LogData]) -> bytes:
return LogsData(
resource_logs=_encode_logs(batch).resource_logs
).SerializeToString()


__all__ = [
"serialize_logs_data",
]
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,20 @@
# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved.
#

"""
This module is a temporary bridge from opentelemetry 1.12.0 our current
dependency, which does not have common encoder functions to the later versions
of opentelemetry, which do have common encoder functions in the
opentelemetry-exporter-otlp-proto-common package.
"""

from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ExportMetricsServiceRequest
from opentelemetry.proto.metrics.v1.metrics_pb2 import MetricsData as PB2MetricsData
from opentelemetry.sdk.metrics.export import MetricsData


_exporter = OTLPMetricExporter()

def _encode_metrics(data: MetricsData) -> ExportMetricsServiceRequest:
# Will no longer rely on _translate_data after we upgrade to v1.19.0 or later
return _exporter._translate_data(data)

def serialize_metrics_data(data: MetricsData) -> bytes:
return PB2MetricsData(
resource_metrics=_encode_metrics(data).resource_metrics
).SerializeToString()


__all__ = [
"serialize_metrics_data",
]
return _exporter._translate_data(data) # pylint: disable=protected-access
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@
# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved.
#

"""
This module is a temporary bridge from opentelemetry 1.12.0 our current
dependency, which does not have common encoder functions to the later versions
of opentelemetry, which do have common encoder functions in the
opentelemetry-exporter-otlp-proto-common package.
"""

from typing import Sequence

from opentelemetry.exporter.otlp.proto.http.trace_exporter.encoder import _ProtobufEncoder
from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ExportTraceServiceRequest
from opentelemetry.proto.trace.v1.trace_pb2 import TracesData
from opentelemetry.sdk.trace import ReadableSpan


Expand All @@ -15,15 +21,3 @@ def _encode_spans(
) -> ExportTraceServiceRequest:
# Will no longer rely on _ProtobufEncoder after we upgrade to v1.19.0 or later
return _ProtobufEncoder.encode(sdk_spans)

def serialize_traces_data(
sdk_spans: Sequence[ReadableSpan],
) -> bytes:
return TracesData(
resource_spans=_encode_spans(sdk_spans).resource_spans
).SerializeToString()


__all__ = [
"serialize_traces_data",
]
Original file line number Diff line number Diff line change
Expand Up @@ -2,58 +2,89 @@
# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved.
#

"""
This module allows the user to write logs serialized as protobuf messages to
the preferred location by implementing the write_logs() abstract method. The
only classes that should be accessed outside of this module are:
- LogWriter
- SnowflakeLoggingHandler
Please see the class documentation for those classes to learn more.
"""

import abc
import enum
import logging
import logging.config
import threading
import typing
import opentelemetry.sdk.util.instrumentation as otel_instrumentation

from opentelemetry.proto.logs.v1.logs_pb2 import LogsData
from opentelemetry.sdk import resources
from opentelemetry.sdk._logs import export
from opentelemetry.sdk import _logs
from opentelemetry.util import types
from snowflake.telemetry._internal.encoder.otlp.proto.common.log_encoder import (
serialize_logs_data
_encode_logs,
)

LOGGER_NAME_TEMP_ATTRIBUTE = "__snow.logging.temp.logger_name"
FILEPATH_ATTRIBUTE = "code.filepath"
FUNCTION_NAME_ATTRIBUTE = "code.function"


class LogWriterResult(enum.Enum):
SUCCESS = 0
FAILURE = 1


# LogWriter abstract class with one abstract method that can be overwritten:
# pylint: disable=too-few-public-methods
class LogWriter(abc.ABC):

"""
LogWriter abstract base class with one abstract method that must be
implemented by the user.
"""
@abc.abstractmethod
def write_logs(self, serialized_logs: bytes) -> LogWriterResult:
"""quick"""
def write_logs(self, serialized_logs: bytes) -> None:
"""
Implement this method to write the serialized protobuf message to your
preferred location. For an example implementation, see
InMemoryLogWriter in the tests folder.
"""


class _ProtoLogExporter(export.LogExporter):
"""
_ProtoLogExporter is an internal implementing class that should not be used
or accessed outside this module.
"""

def __init__(self, log_writer: LogWriter):
super().__init__()
self.log_writer = log_writer

def export(self, batch: typing.Sequence[_logs.LogData]):
result = self.log_writer.write_logs(serialize_logs_data(batch))
if result == LogWriterResult.FAILURE:
def export(self, batch: typing.Sequence[_logs.LogData]) -> export.LogExportResult:
try:
self.log_writer.write_logs(
_ProtoLogExporter._serialize_logs_data(batch)
)
return export.LogExportResult.SUCCESS
except Exception:
return export.LogExportResult.FAILURE
return export.LogExportResult.SUCCESS

@staticmethod
def _serialize_logs_data(batch: typing.Sequence[_logs.LogData]) -> bytes:
# pylint gets confused by protobuf-generated code, that's why we must
# disable the no-member check below.
return LogsData(
resource_logs=_encode_logs(batch).resource_logs # pylint: disable=no-member
).SerializeToString()

def shutdown(self):
pass


class SnowflakeLoggingHandler(_logs.LoggingHandler):
"""A subclass of OpenTelemetry's LoggingHandler that preserves attributes discarded by the original implementation."""
"""
A subclass of OpenTelemetry's LoggingHandler that preserves attributes
discarded by the original implementation.
"""

_FILEPATH_ATTRIBUTE = "code.filepath"
_FUNCTION_NAME_ATTRIBUTE = "code.function"
LOGGER_NAME_TEMP_ATTRIBUTE = "__snow.logging.temp.logger_name"

def __init__(
self,
Expand All @@ -66,41 +97,50 @@ def __init__(
super().__init__()

@staticmethod
def _getSnowflakeLogLevelName(pyLevelName):
"""Adapted from the getSnowflakeLogLevelName in XP's logger.py"""
level = pyLevelName.upper()
def _get_snowflake_log_level_name(py_level_name):
"""
Adapted from the getSnowflakeLogLevelName method in XP's logger.py
"""
level = py_level_name.upper()
if level == "WARNING":
return "WARN"
elif level == "CRITICAL":
if level == "CRITICAL":
return "FATAL"
elif level == "NOTSET":
if level == "NOTSET":
return "TRACE"
else:
return level
return level

def _get_attributes(self, record: logging.LogRecord) -> types.Attributes:
attributes = super(SnowflakeLoggingHandler, self)._get_attributes(record)
@staticmethod
def _get_attributes(record: logging.LogRecord) -> types.Attributes:
attributes = _logs.LoggingHandler._get_attributes(record) # pylint: disable=protected-access

# Adding attributes that were discarded by the base class
attributes[FILEPATH_ATTRIBUTE] = record.pathname
attributes[FUNCTION_NAME_ATTRIBUTE] = record.funcName
# Adding attributes that were discarded by the base class's
# _get_attributes() method
# TODO (SNOW-1210317) Remove these when upgrading to opentelemetry-python 1.23
attributes[SnowflakeLoggingHandler._FILEPATH_ATTRIBUTE] = record.pathname
attributes[SnowflakeLoggingHandler._FUNCTION_NAME_ATTRIBUTE] = record.funcName

# Temporarily storing logger's name in record's attributes.
# This attribute will be removed by the emitter.
#
# TODO(SNOW-975220): Upgrade to OpenTelemetry 1.20.0 or later
# TODO(SNOW-1210317): Upgrade to OpenTelemetry 1.20.0 or later
# and use OpenTelemetry's LoggerProvider.
attributes[LOGGER_NAME_TEMP_ATTRIBUTE] = record.name
attributes[SnowflakeLoggingHandler.LOGGER_NAME_TEMP_ATTRIBUTE] = record.name
return attributes

def _translate(self, record: logging.LogRecord) -> _logs.LogRecord:
otel_record = super(SnowflakeLoggingHandler, self)._translate(record)
otel_record.severity_text = SnowflakeLoggingHandler._getSnowflakeLogLevelName(record.levelname)
otel_record = super()._translate(record)
otel_record.severity_text = SnowflakeLoggingHandler._get_snowflake_log_level_name(
record.levelname
)
return otel_record


class _SnowflakeTelemetryLogEmitter(_logs.LogEmitter):
"""A log emitter which creates an InstrumentationScope for each logger name it encounters."""
"""
A log emitter which creates an InstrumentationScope for each logger name it
encounters.
"""

def __init__(
self,
Expand All @@ -110,29 +150,29 @@ def __init__(
],
instrumentation_scope: otel_instrumentation.InstrumentationScope,
):
super(_SnowflakeTelemetryLogEmitter, self).__init__(resource, multi_log_processor, instrumentation_scope)
super().__init__(resource, multi_log_processor, instrumentation_scope)
self._lock = threading.Lock()
self.cached_scopes = {}

def emit(self, record: _logs.LogRecord):
if LOGGER_NAME_TEMP_ATTRIBUTE not in record.attributes:
if SnowflakeLoggingHandler.LOGGER_NAME_TEMP_ATTRIBUTE not in record.attributes:
# The record doesn't contain our custom attribute with a logger name,
# so we can call the superclass's `emit` method. It will emit a log
# record with the default instrumentation scope.
super(_SnowflakeTelemetryLogEmitter, self).emit(record)
super().emit(record)
return

# Creating an InstrumentationScope for each logger name,
# and caching those scopes.
logger_name = record.attributes[LOGGER_NAME_TEMP_ATTRIBUTE]
del record.attributes[LOGGER_NAME_TEMP_ATTRIBUTE]
logger_name = record.attributes[SnowflakeLoggingHandler.LOGGER_NAME_TEMP_ATTRIBUTE]
del record.attributes[SnowflakeLoggingHandler.LOGGER_NAME_TEMP_ATTRIBUTE]
with self._lock:
if logger_name in self.cached_scopes:
current_scope = self.cached_scopes[logger_name]
else:
current_scope = otel_instrumentation.InstrumentationScope(logger_name)
self.cached_scopes[logger_name] = current_scope

# Emitting a record with a scope that corresponds to the logger
# that logged it. NOT calling the superclass here for two reasons:
# 1. LogEmitter.emit takes a LogRecord, not LogData.
Expand All @@ -141,13 +181,11 @@ def emit(self, record: _logs.LogRecord):
log_data = _logs.LogData(record, current_scope)
self._multi_log_processor.emit(log_data)

# Remembering the fact that we emitted a log record.
global _log_records_emitted
_log_records_emitted = True


class _SnowflakeTelemetryLogEmitterProvider(_logs.LogEmitterProvider):
"""A log emitter provider that creates SnowflakeTelemetryLogEmitters"""
"""
A log emitter provider that creates SnowflakeTelemetryLogEmitters
"""

def get_log_emitter(
self,
Expand All @@ -165,6 +203,5 @@ def get_log_emitter(

__all__ = [
"LogWriter",
"LogWriterResult",
"SnowflakeLoggingHandler",
]
Loading

0 comments on commit 2b68691

Please sign in to comment.