Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions plugins/processors/awsentity/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ type Config struct {
// EntityType determines the type of entity processing done for
// telemetry. Possible values are Service and Resource
EntityType string `mapstructure:"entity_type,omitempty"`
// AttributeAllowList is a list of entity resource attributes to append to the metric
// If this is not specified, the processor will append all entity attributes
AttributeAllowList []string `mapstructure:"attribute_allow_list,omitempty"`
}

// Verify Config implements Processor interface.
Expand Down
50 changes: 25 additions & 25 deletions plugins/processors/awsentity/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,9 @@ func (p *awsEntityProcessor) processMetrics(ctx context.Context, md pmetric.Metr
if p.config.KubernetesMode != "" {
switch p.config.KubernetesMode {
case config.ModeEKS:
resourceAttrs.PutStr(entityattributes.AttributeEntityPlatformType, entityattributes.AttributeEntityEKSPlatform)
p.PutAttribute(resourceAttrs, entityattributes.AttributeEntityPlatformType, entityattributes.AttributeEntityEKSPlatform)
default:
resourceAttrs.PutStr(entityattributes.AttributeEntityPlatformType, entityattributes.AttributeEntityK8sPlatform)
p.PutAttribute(resourceAttrs, entityattributes.AttributeEntityPlatformType, entityattributes.AttributeEntityK8sPlatform)
}
} else if p.config.Platform == config.ModeEC2 {
// ec2tagger processor may have picked up the ASG name from an ec2:DescribeTags call
Expand All @@ -166,11 +166,11 @@ func (p *awsEntityProcessor) processMetrics(ctx context.Context, md pmetric.Metr
}
ec2Info = getEC2InfoFromEntityStore()
if ec2Info.GetInstanceID() != EMPTY {
resourceAttrs.PutStr(entityattributes.AttributeEntityType, entityattributes.AttributeEntityAWSResource)
resourceAttrs.PutStr(entityattributes.AttributeEntityResourceType, entityattributes.AttributeEntityEC2InstanceResource)
resourceAttrs.PutStr(entityattributes.AttributeEntityIdentifier, ec2Info.GetInstanceID())
p.PutAttribute(resourceAttrs, entityattributes.AttributeEntityType, entityattributes.AttributeEntityAWSResource)
p.PutAttribute(resourceAttrs, entityattributes.AttributeEntityResourceType, entityattributes.AttributeEntityEC2InstanceResource)
p.PutAttribute(resourceAttrs, entityattributes.AttributeEntityIdentifier, ec2Info.GetInstanceID())
}
AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityAwsAccountId, ec2Info.GetAccountID())
p.AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityAwsAccountId, ec2Info.GetAccountID())
}
case entityattributes.Service:
if logGroupNamesAttr, ok := resourceAttrs.Get(attributeAwsLogGroupNames); ok {
Expand Down Expand Up @@ -249,22 +249,22 @@ func (p *awsEntityProcessor) processMetrics(ctx context.Context, md pmetric.Metr
InstanceId: ec2Info.GetInstanceID(),
ServiceNameSource: entityServiceNameSource,
}
AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityType, entityattributes.Service)
AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityServiceName, entityServiceName)
AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityDeploymentEnvironment, entityEnvironmentName)
p.AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityType, entityattributes.Service)
p.AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityServiceName, entityServiceName)
p.AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityDeploymentEnvironment, entityEnvironmentName)

if err := validate.Struct(eksAttributes); err == nil {
resourceAttrs.PutStr(entityattributes.AttributeEntityPlatformType, entityPlatformType)
resourceAttrs.PutStr(entityattributes.AttributeEntityCluster, eksAttributes.Cluster)
resourceAttrs.PutStr(entityattributes.AttributeEntityNamespace, eksAttributes.Namespace)
resourceAttrs.PutStr(entityattributes.AttributeEntityWorkload, eksAttributes.Workload)
resourceAttrs.PutStr(entityattributes.AttributeEntityNode, eksAttributes.Node)
p.PutAttribute(resourceAttrs, entityattributes.AttributeEntityPlatformType, entityPlatformType)
p.PutAttribute(resourceAttrs, entityattributes.AttributeEntityCluster, eksAttributes.Cluster)
p.PutAttribute(resourceAttrs, entityattributes.AttributeEntityNamespace, eksAttributes.Namespace)
p.PutAttribute(resourceAttrs, entityattributes.AttributeEntityWorkload, eksAttributes.Workload)
p.PutAttribute(resourceAttrs, entityattributes.AttributeEntityNode, eksAttributes.Node)
//Add Instance id attribute only if the application node is same as agent node
if eksAttributes.Node == os.Getenv("K8S_NODE_NAME") {
AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityInstanceID, eksAttributes.InstanceId)
p.AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityInstanceID, eksAttributes.InstanceId)
}
AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityAwsAccountId, ec2Info.GetAccountID())
AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityServiceNameSource, entityServiceNameSource)
p.AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityAwsAccountId, ec2Info.GetAccountID())
p.AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityServiceNameSource, entityServiceNameSource)
}
p.k8sscraper.Reset()
} else if p.config.Platform == config.ModeEC2 {
Expand All @@ -290,21 +290,21 @@ func (p *awsEntityProcessor) processMetrics(ctx context.Context, md pmetric.Metr
}
}

AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityType, entityattributes.Service)
AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityServiceName, entityServiceName)
AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityDeploymentEnvironment, entityEnvironmentName)
AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityAwsAccountId, ec2Info.GetAccountID())
p.AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityType, entityattributes.Service)
p.AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityServiceName, entityServiceName)
p.AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityDeploymentEnvironment, entityEnvironmentName)
p.AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityAwsAccountId, ec2Info.GetAccountID())

ec2Attributes := EC2ServiceAttributes{
InstanceId: ec2Info.GetInstanceID(),
AutoScalingGroup: getAutoScalingGroupFromEntityStore(),
ServiceNameSource: entityServiceNameSource,
}
if err := validate.Struct(ec2Attributes); err == nil {
resourceAttrs.PutStr(entityattributes.AttributeEntityPlatformType, entityPlatformType)
AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityInstanceID, ec2Attributes.InstanceId)
AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityAutoScalingGroup, ec2Attributes.AutoScalingGroup)
AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityServiceNameSource, ec2Attributes.ServiceNameSource)
p.PutAttribute(resourceAttrs, entityattributes.AttributeEntityPlatformType, entityPlatformType)
p.AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityInstanceID, ec2Attributes.InstanceId)
p.AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityAutoScalingGroup, ec2Attributes.AutoScalingGroup)
p.AddAttributeIfNonEmpty(resourceAttrs, entityattributes.AttributeEntityServiceNameSource, ec2Attributes.ServiceNameSource)
}
}
if logGroupNames == EMPTY || (serviceName == EMPTY && environmentName == EMPTY) {
Expand Down
16 changes: 13 additions & 3 deletions plugins/processors/awsentity/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,20 @@

package awsentity

import "go.opentelemetry.io/collector/pdata/pcommon"
import (
"slices"

func AddAttributeIfNonEmpty(p pcommon.Map, key string, value string) {
"go.opentelemetry.io/collector/pdata/pcommon"
)

func (p *awsEntityProcessor) PutAttribute(resourceAttributes pcommon.Map, k string, v string) {
if len(p.config.AttributeAllowList) == 0 || slices.Contains(p.config.AttributeAllowList, k) {
resourceAttributes.PutStr(k, v)
}
}

func (p *awsEntityProcessor) AddAttributeIfNonEmpty(resourceAttributes pcommon.Map, key string, value string) {
if value != "" {
p.PutStr(key, value)
p.PutAttribute(resourceAttributes, key, value)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -334,10 +334,11 @@ processors:
kubernetes_mode: EKS
platform: ec2
awsentity/resource/containerinsights:
cluster_name: TestCluster
entity_type: Resource
kubernetes_mode: EKS
platform: ec2
cluster_name: TestCluster
entity_type: Resource
kubernetes_mode: EKS
platform: ec2
attribute_allow_list: ["com.amazonaws.cloudwatch.entity.internal.platform.type"]
batch/containerinsights:
metadata_cardinality_limit: 1000
send_batch_max_size: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ processors:
entity_type: Resource
kubernetes_mode: K8sEC2
platform: ec2
attribute_allow_list: ["com.amazonaws.cloudwatch.entity.internal.platform.type"]
batch/containerinsights:
metadata_cardinality_limit: 1000
send_batch_max_size: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ processors:
entity_type: Resource
kubernetes_mode: EKS
platform: ec2
attribute_allow_list: ["com.amazonaws.cloudwatch.entity.internal.platform.type"]
batch/containerinsights:
metadata_cardinality_limit: 1000
send_batch_max_size: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ processors:
entity_type: Resource
kubernetes_mode: EKS
platform: ec2
attribute_allow_list: ["com.amazonaws.cloudwatch.entity.internal.platform.type"]
batch/containerinsights:
metadata_cardinality_limit: 1000
send_batch_max_size: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ processors:
awsentity/resource/containerinsights:
entity_type: Resource
platform: ec2
attribute_allow_list: [ "com.amazonaws.cloudwatch.entity.internal.platform.type" ]
batch/containerinsights:
metadata_cardinality_limit: 1000
send_batch_max_size: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ processors:
awsentity/resource/containerinsights:
entity_type: Resource
platform: ec2
attribute_allow_list: [ "com.amazonaws.cloudwatch.entity.internal.platform.type" ]
batch/containerinsights:
metadata_cardinality_limit: 1000
send_batch_max_size: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ processors:
awsentity/resource/containerinsights:
entity_type: Resource
platform: onPremise
attribute_allow_list: [ "com.amazonaws.cloudwatch.entity.internal.platform.type" ]
batch/containerinsights:
metadata_cardinality_limit: 1000
send_batch_max_size: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,7 @@ processors:
awsentity/resource/containerinsights:
entity_type: Resource
platform: onPremise
attribute_allow_list: [ "com.amazonaws.cloudwatch.entity.internal.platform.type" ]
batch/containerinsights:
metadata_cardinality_limit: 1000
send_batch_max_size: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,7 @@ processors:
awsentity/resource/containerinsights:
entity_type: Resource
platform: onPremise
attribute_allow_list: [ "com.amazonaws.cloudwatch.entity.internal.platform.type" ]
batch/containerinsights:
metadata_cardinality_limit: 1000
send_batch_max_size: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ processors:
awsentity/resource/containerinsights:
entity_type: Resource
platform: onPremise
attribute_allow_list: [ "com.amazonaws.cloudwatch.entity.internal.platform.type" ]
batch/containerinsights:
metadata_cardinality_limit: 1000
send_batch_max_size: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ processors:
awsentity/resource/containerinsights:
entity_type: Resource
platform: ec2
attribute_allow_list: [ "com.amazonaws.cloudwatch.entity.internal.platform.type" ]
batch/containerinsights:
metadata_cardinality_limit: 1000
send_batch_max_size: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ processors:
awsentity/resource/containerinsights:
entity_type: Resource
platform: ec2
attribute_allow_list: [ "com.amazonaws.cloudwatch.entity.internal.platform.type" ]
batch/containerinsights:
metadata_cardinality_limit: 1000
send_batch_max_size: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/pipeline"

"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsentity/entityattributes"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/exporter/awsemf"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/extension/agenthealth"
Expand All @@ -31,6 +32,10 @@ var (
baseKey = common.ConfigKey(common.LogsKey, common.MetricsCollectedKey)
eksKey = common.ConfigKey(baseKey, common.KubernetesKey)
ecsKey = common.ConfigKey(baseKey, common.ECSKey)

entityAllowList = []string{
entityattributes.AttributeEntityPlatformType,
}
)

type translator struct {
Expand Down Expand Up @@ -78,7 +83,7 @@ func (t *translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators
switch t.pipelineName {
case ciPipelineName:
if conf.IsSet(eksKey) {
processors.Set(awsentity.NewTranslatorWithEntityType(awsentity.Resource, common.PipelineNameContainerInsights, false))
processors.Set(awsentity.NewTranslatorWithEntityTypeAndAllowList(awsentity.Resource, common.PipelineNameContainerInsights, false, entityAllowList))
}
// add aws container insights receiver
receivers = common.NewTranslatorMap(awscontainerinsight.NewTranslator())
Expand Down
19 changes: 19 additions & 0 deletions translator/translate/otel/processor/awsentity/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type translator struct {
entityType string
name string
scrapeDatapointAttribute bool
attributeAllowList []string
}

func NewTranslator() common.ComponentTranslator {
Expand All @@ -49,6 +50,21 @@ func NewTranslatorWithEntityType(entityType string, name string, scrapeDatapoint
}
}

func NewTranslatorWithEntityTypeAndAllowList(entityType string, name string, scrapeDatapointAttribute bool, allowList []string) common.ComponentTranslator {
pipelineName := strings.ToLower(entityType)
if name != "" {
pipelineName = pipelineName + "/" + name
}

return &translator{
factory: awsentity.NewFactory(),
entityType: entityType,
name: pipelineName,
scrapeDatapointAttribute: scrapeDatapointAttribute,
attributeAllowList: allowList,
}
}

func (t *translator) ID() component.ID {
return component.NewIDWithName(t.factory.Type(), t.name)
}
Expand Down Expand Up @@ -88,5 +104,8 @@ func (t *translator) Translate(conf *confmap.Conf) (component.Config, error) {
// processor can perform different logics for EKS
// in EC2 or Non-EC2
cfg.Platform = ctx.Mode()

cfg.AttributeAllowList = t.attributeAllowList

return cfg, nil
}
Loading