Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ module State
# The AsynchronousMetricStream class provides SDK internal functionality that is not a part of the
# public API. It extends MetricStream to support asynchronous instruments.
class AsynchronousMetricStream < MetricStream
DEFAULT_TIMEOUT = 30

def initialize(
name,
description,
Expand Down Expand Up @@ -48,23 +50,21 @@ def collect(start_time, end_time)
def invoke_callback(timeout, attributes)
if @registered_views.empty?
@mutex.synchronize do
Timeout.timeout(timeout || 30) do
@callback.each do |cb|
value = cb.call
@default_aggregation.update(value, attributes, @data_points)
end
@callback.each do |cb|
value = safe_guard_callback(cb, timeout: timeout)
@default_aggregation.update(value, attributes, @data_points) if value.is_a?(Numeric)
end
end
else
@registered_views.each do |view|
@registered_views.each do |view, data_points|
@mutex.synchronize do
Timeout.timeout(timeout || 30) do
@callback.each do |cb|
value = cb.call
merged_attributes = attributes || {}
merged_attributes.merge!(view.attribute_keys)
view.aggregation.update(value, merged_attributes, @data_points) if view.valid_aggregation?
end
@callback.each do |cb|
value = safe_guard_callback(cb, timeout: timeout)
next unless value.is_a?(Numeric) # ignore if value is not valid number

merged_attributes = attributes || {}
merged_attributes.merge!(view.attribute_keys)
view.aggregation.update(value, merged_attributes, data_points) if view.valid_aggregation?
end
end
end
Expand All @@ -74,6 +74,29 @@ def invoke_callback(timeout, attributes)
def now_in_nano
(Time.now.to_r * 1_000_000_000).to_i
end

private

def safe_guard_callback(callback, timeout: DEFAULT_TIMEOUT)
result = nil
thread = Thread.new do
result = callback.call
rescue StandardError => e
OpenTelemetry.logger.error("Error invoking callback: #{e.message}")
result = :error
end

unless thread.join(timeout)
thread.kill
OpenTelemetry.logger.error("Timeout while invoking callback after #{timeout} seconds")
return nil
end

result == :error ? nil : result
rescue StandardError => e
OpenTelemetry.logger.error("Unexpected error in callback execution: #{e.message}")
nil
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def initialize(
@instrumentation_scope = instrumentation_scope
@default_aggregation = aggregation
@data_points = {}
@registered_views = []
@registered_views = {}

find_registered_view
@mutex = Mutex.new
Expand All @@ -43,36 +43,40 @@ def collect(start_time, end_time)
metric_data = []

# data points are required to export over OTLP
return metric_data if @data_points.empty?
return metric_data if empty_data_point?

if @registered_views.empty?
metric_data << aggregate_metric_data(start_time, end_time)
else
@registered_views.each { |view| metric_data << aggregate_metric_data(start_time, end_time, aggregation: view.aggregation) }
@registered_views.each do |view, data_points|
metric_data << aggregate_metric_data(start_time, end_time, aggregation: view.aggregation, data_points: data_points)
end
end

metric_data
end
end

# view will modify the data_point that is not suitable when there are multiple views
def update(value, attributes)
if @registered_views.empty?
@mutex.synchronize { @default_aggregation.update(value, attributes, @data_points) }
else
@registered_views.each do |view|
@registered_views.each do |view, data_points|
@mutex.synchronize do
attributes ||= {}
attributes.merge!(view.attribute_keys)
view.aggregation.update(value, attributes, @data_points) if view.valid_aggregation?
view.aggregation.update(value, attributes, data_points) if view.valid_aggregation?
end
end
end
end

def aggregate_metric_data(start_time, end_time, aggregation: nil)
def aggregate_metric_data(start_time, end_time, aggregation: nil, data_points: nil)
aggregator = aggregation || @default_aggregation
is_monotonic = aggregator.respond_to?(:monotonic?) ? aggregator.monotonic? : nil
aggregation_temporality = aggregator.respond_to?(:aggregation_temporality) ? aggregator.aggregation_temporality : nil
data_point = data_points || @data_points

MetricData.new(
@name,
Expand All @@ -81,7 +85,7 @@ def aggregate_metric_data(start_time, end_time, aggregation: nil)
@instrument_kind,
@meter_provider.resource,
@instrumentation_scope,
aggregator.collect(start_time, end_time, @data_points),
aggregator.collect(start_time, end_time, data_point),
aggregation_temporality,
start_time,
end_time,
Expand All @@ -92,7 +96,17 @@ def aggregate_metric_data(start_time, end_time, aggregation: nil)
def find_registered_view
return if @meter_provider.nil?

@meter_provider.registered_views.each { |view| @registered_views << view if view.match_instrument?(self) }
@meter_provider.registered_views.each { |view| @registered_views[view] = {} if view.match_instrument?(self) }
end

def empty_data_point?
if @registered_views.empty?
@data_points.empty?
else
@registered_views.each_value do |data_points|
return false unless data_points.empty?
end
end
end

def to_s
Expand Down
Loading
Loading