-
Notifications
You must be signed in to change notification settings - Fork 40
/
Copy pathpod_association.go
150 lines (134 loc) · 4.81 KB
/
pod_association.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
// Copyright 2020 OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package k8sprocessor
import (
"context"
"errors"
"fmt"
"net"
"strings"
"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/pdata/pcommon"
conventions "go.opentelemetry.io/collector/semconv/v1.18.0"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sprocessor/kube"
)
// extractPodID extracts a pod identifier (an IP address, a pod UID or a
// `pod_name.namespace_name` pair) from resource attributes or the request context.
//
// It returns the key the identifier is associated with together with the
// identifier value. A non-nil error is returned when no identifier could be
// derived; in that case the key and the value are empty.
//
// When associations is empty, the following sources are tried in order, always
// reported under k8sIPLabelName: the k8sIPLabelName attribute, the
// clientIPLabelName attribute, the connection IP and finally the host.name
// attribute if it parses as an IP address.
//
// Otherwise the associations are evaluated in the given order and the first one
// that yields a value wins. Note that a "build_hostname" association returns an
// error immediately when the pod name or namespace attribute is missing, without
// trying the remaining associations.
func extractPodID(
	ctx context.Context,
	attrs pcommon.Map,
	associations []kube.Association,
) (podIdentifierKey string, podIdentifierValue kube.PodIdentifier, returnErr error) {
	connectionIP := getConnectionIP(ctx)
	hostname := stringAttributeFromMap(attrs, conventions.AttributeHostName)

	// If pod association is not set, fall back to the default lookup order.
	if len(associations) == 0 {
		podIP := kube.PodIdentifier(stringAttributeFromMap(attrs, k8sIPLabelName))
		labelIP := kube.PodIdentifier(stringAttributeFromMap(attrs, clientIPLabelName))

		switch {
		case podIP != "":
			return k8sIPLabelName, podIP, nil
		case labelIP != "":
			return k8sIPLabelName, labelIP, nil
		case connectionIP != "":
			return k8sIPLabelName, connectionIP, nil
		case net.ParseIP(hostname) != nil:
			return k8sIPLabelName, kube.PodIdentifier(hostname), nil
		}
		return "", kube.PodIdentifier(""), errors.New("pod association not set, could not assign other pod id")
	}

	for _, asso := range associations {
		switch {
		// If association configured to take IP address from connection
		case asso.From == "connection" && connectionIP != "":
			return k8sIPLabelName, connectionIP, nil

		case asso.From == "resource_attribute": // If association configured by resource_attribute
			// In k8s environment, host.name label set to a pod IP address.
			// If the value doesn't represent an IP address, we skip it.
			if asso.Name == conventions.AttributeHostName {
				if net.ParseIP(hostname) != nil {
					return k8sIPLabelName, kube.PodIdentifier(hostname), nil
				}
				continue
			}

			// Extract values based on configured resource_attribute.
			// Value should be a pod ip, pod uid or `pod_name.namespace_name`
			if attributeValue := stringAttributeFromMap(attrs, asso.Name); attributeValue != "" {
				return asso.Name, kube.PodIdentifier(attributeValue), nil
			}

		case asso.From == "build_hostname":
			// Build hostname from pod k8s.pod.name and k8s.namespace.name attributes
			pod, ok := attrs.Get(conventions.AttributeK8SPodName)
			if !ok {
				return "", kube.PodIdentifier(""), errors.New("pod name not found in attributes")
			}
			namespace, ok := attrs.Get(conventions.AttributeK8SNamespaceName)
			if !ok {
				return "", kube.PodIdentifier(""), errors.New("namespace name not found in attributes")
			}

			// Both parts must be non-empty; otherwise the result would be a
			// malformed identifier such as ".namespace" or "pod.".
			if pod.Str() != "" && namespace.Str() != "" {
				return asso.Name, kube.PodIdentifier(fmt.Sprintf("%s.%s", pod.Str(), namespace.Str())), nil
			}
		}
	}

	return "", kube.PodIdentifier(""), errors.New("could not assign pod id basing on associations")
}
// getConnectionIP returns the IP address of the client connection stored in ctx,
// as obtained from client.FromContext.
//
// It returns an empty identifier when the client info carries no address. For
// UDP, TCP and IP addresses the IP part is returned. For any other address type
// the string form is inspected: if it is a `host:port` pair (including the
// bracketed IPv6 form `[::1]:port`) whose host is a valid IP, that IP is
// returned; otherwise the raw address string is returned unchanged.
func getConnectionIP(ctx context.Context) kube.PodIdentifier {
	c := client.FromContext(ctx)
	if c.Addr == nil {
		return ""
	}

	switch addr := c.Addr.(type) {
	case *net.UDPAddr:
		return kube.PodIdentifier(addr.IP.String())
	case *net.TCPAddr:
		return kube.PodIdentifier(addr.IP.String())
	case *net.IPAddr:
		return kube.PodIdentifier(addr.IP.String())
	}

	// If this is not a known address type, check for known "untyped" formats.
	raw := c.Addr.String()

	// 1.1.1.1:<port> or [::1]:<port>
	// net.SplitHostPort strips the brackets around IPv6 literals, which the
	// plain last-colon split below cannot do.
	if host, _, err := net.SplitHostPort(raw); err == nil {
		if ip := net.ParseIP(host); ip != nil {
			return kube.PodIdentifier(ip.String())
		}
	}

	// Fallback kept for inputs that net.SplitHostPort rejects: treat everything
	// before the last colon as the candidate IP.
	if lastColonIndex := strings.LastIndex(raw, ":"); lastColonIndex != -1 {
		if ip := net.ParseIP(raw[:lastColonIndex]); ip != nil {
			return kube.PodIdentifier(ip.String())
		}
	}

	return kube.PodIdentifier(raw)
}
// stringAttributeFromMap looks up key in attrs and returns its value when the
// attribute exists and is of string type. In every other case (missing key or
// a non-string value) it returns the empty string.
func stringAttributeFromMap(attrs pcommon.Map, key string) string {
	value, found := attrs.Get(key)
	if !found || value.Type() != pcommon.ValueTypeStr {
		return ""
	}
	return value.Str()
}