Skip to content

Commit db7a4b2

Browse files
committed
Implement telegraf's own full metric type
main reasons behind this: - make adding/removing tags cheap - make adding/removing fields cheap - make parsing cheaper - make parse -> decorate -> write out bytes metric flow much faster Refactor serializer to use byte buffer
1 parent 332f678 commit db7a4b2

40 files changed

+1375
-397
lines changed

agent/accumulator_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/influxdata/telegraf"
12+
"github.com/influxdata/telegraf/metric"
1213

1314
"github.com/stretchr/testify/assert"
1415
"github.com/stretchr/testify/require"
@@ -323,7 +324,7 @@ func (tm *TestMetricMaker) MakeMetric(
323324
) telegraf.Metric {
324325
switch mType {
325326
case telegraf.Untyped:
326-
if m, err := telegraf.NewMetric(measurement, tags, fields, t); err == nil {
327+
if m, err := metric.New(measurement, tags, fields, t); err == nil {
327328
return m
328329
}
329330
case telegraf.Counter:

internal/models/makemetric.go

+2-10
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"time"
77

88
"github.com/influxdata/telegraf"
9+
"github.com/influxdata/telegraf/metric"
910
)
1011

1112
// makemetric is used by both RunningAggregator & RunningInput
@@ -135,16 +136,7 @@ func makemetric(
135136
}
136137
}
137138

138-
var m telegraf.Metric
139-
var err error
140-
switch mType {
141-
case telegraf.Counter:
142-
m, err = telegraf.NewCounterMetric(measurement, tags, fields, t)
143-
case telegraf.Gauge:
144-
m, err = telegraf.NewGaugeMetric(measurement, tags, fields, t)
145-
default:
146-
m, err = telegraf.NewMetric(measurement, tags, fields, t)
147-
}
139+
m, err := metric.New(measurement, tags, fields, t, mType)
148140
if err != nil {
149141
log.Printf("Error adding point [%s]: %s\n", measurement, err.Error())
150142
return nil

internal/models/running_aggregator.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"time"
55

66
"github.com/influxdata/telegraf"
7+
"github.com/influxdata/telegraf/metric"
78
)
89

910
type RunningAggregator struct {
@@ -90,7 +91,7 @@ func (r *RunningAggregator) Add(in telegraf.Metric) bool {
9091
return false
9192
}
9293

93-
in, _ = telegraf.NewMetric(name, tags, fields, t)
94+
in, _ = metric.New(name, tags, fields, t)
9495
}
9596

9697
r.metrics <- in

internal/models/running_output.go

+8-7
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/influxdata/telegraf"
88
"github.com/influxdata/telegraf/internal/buffer"
9+
"github.com/influxdata/telegraf/metric"
910
)
1011

1112
const (
@@ -56,23 +57,23 @@ func NewRunningOutput(
5657

5758
// AddMetric adds a metric to the output. This function can also write cached
5859
// points if FlushBufferWhenFull is true.
59-
func (ro *RunningOutput) AddMetric(metric telegraf.Metric) {
60+
func (ro *RunningOutput) AddMetric(m telegraf.Metric) {
6061
// Filter any tagexclude/taginclude parameters before adding metric
6162
if ro.Config.Filter.IsActive() {
6263
// In order to filter out tags, we need to create a new metric, since
6364
// metrics are immutable once created.
64-
name := metric.Name()
65-
tags := metric.Tags()
66-
fields := metric.Fields()
67-
t := metric.Time()
65+
name := m.Name()
66+
tags := m.Tags()
67+
fields := m.Fields()
68+
t := m.Time()
6869
if ok := ro.Config.Filter.Apply(name, fields, tags); !ok {
6970
return
7071
}
7172
// error is not possible if creating from another metric, so ignore.
72-
metric, _ = telegraf.NewMetric(name, tags, fields, t)
73+
m, _ = metric.New(name, tags, fields, t)
7374
}
7475

75-
ro.metrics.Add(metric)
76+
ro.metrics.Add(m)
7677
if ro.metrics.Len() == ro.MetricBatchSize {
7778
batch := ro.metrics.Batch(ro.MetricBatchSize)
7879
err := ro.write(batch)

metric.go

+27-165
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ package telegraf
33
import (
44
"time"
55

6+
// TODO remove
67
"github.com/influxdata/influxdb/client/v2"
7-
"github.com/influxdata/influxdb/models"
88
)
99

1010
// ValueType is an enumeration of metric types that represent a simple value.
@@ -19,178 +19,40 @@ const (
1919
)
2020

2121
type Metric interface {
22-
// Name returns the measurement name of the metric
23-
Name() string
22+
Serialize() []byte
23+
String() string // convenience function for string(Serialize())
24+
Copy() Metric
2425

25-
// Name returns the tags associated with the metric
26-
Tags() map[string]string
26+
// Tag functions
27+
HasTag(key string) bool
28+
AddTag(key, value string)
29+
RemoveTag(key string) bool
2730

28-
// Time return the timestamp for the metric
29-
Time() time.Time
31+
// Field functions
32+
HasField(key string) bool
33+
AddField(key string, value interface{})
34+
RemoveField(key string) bool
3035

31-
// Type returns the metric type. Can be either telegraf.Gauge or telegraf.Counter
32-
Type() ValueType
36+
// Name functions
37+
SetName(name string)
38+
SetPrefix(prefix string)
39+
SetSuffix(suffix string)
3340

34-
// UnixNano returns the unix nano time of the metric
41+
// Getting data structure functions
42+
Name() string
43+
Tags() map[string]string
44+
Fields() map[string]interface{}
45+
Time() time.Time
3546
UnixNano() int64
36-
37-
// HashID returns a non-cryptographic hash of the metric (name + tags)
38-
// NOTE: do not persist & depend on this value to disk.
47+
Type() ValueType
48+
Len() int // returns the length of the serialized metric, including newline
3949
HashID() uint64
4050

41-
// Fields returns the fields for the metric
42-
Fields() map[string]interface{}
43-
44-
// String returns a line-protocol string of the metric
45-
String() string
46-
47-
// PrecisionString returns a line-protocol string of the metric, at precision
48-
PrecisionString(precison string) string
49-
50-
// Point returns a influxdb client.Point object
51-
Point() *client.Point
52-
53-
// SetAggregate sets the metric's aggregate status
54-
// This is so that aggregate metrics don't get re-sent to aggregator plugins
51+
// aggregator things:
5552
SetAggregate(bool)
56-
// IsAggregate returns true if the metric is an aggregate
5753
IsAggregate() bool
5854

59-
// Copy copies the metric
60-
Copy() Metric
61-
}
62-
63-
// metric is a wrapper of the influxdb client.Point struct
64-
type metric struct {
65-
pt models.Point
66-
67-
mType ValueType
68-
69-
isaggregate bool
70-
}
71-
72-
func NewMetricFromPoint(pt models.Point) Metric {
73-
return &metric{
74-
pt: pt,
75-
mType: Untyped,
76-
}
77-
}
78-
79-
// NewMetric returns an untyped metric.
80-
func NewMetric(
81-
name string,
82-
tags map[string]string,
83-
fields map[string]interface{},
84-
t time.Time,
85-
) (Metric, error) {
86-
pt, err := models.NewPoint(name, models.NewTags(tags), fields, t)
87-
if err != nil {
88-
return nil, err
89-
}
90-
return &metric{
91-
pt: pt,
92-
mType: Untyped,
93-
}, nil
94-
}
95-
96-
// NewGaugeMetric returns a gauge metric.
97-
// Gauge metrics should be used when the metric is can arbitrarily go up and
98-
// down. ie, temperature, memory usage, cpu usage, etc.
99-
func NewGaugeMetric(
100-
name string,
101-
tags map[string]string,
102-
fields map[string]interface{},
103-
t time.Time,
104-
) (Metric, error) {
105-
pt, err := models.NewPoint(name, models.NewTags(tags), fields, t)
106-
if err != nil {
107-
return nil, err
108-
}
109-
return &metric{
110-
pt: pt,
111-
mType: Gauge,
112-
}, nil
113-
}
114-
115-
// NewCounterMetric returns a Counter metric.
116-
// Counter metrics should be used when the metric being created is an
117-
// always-increasing counter. ie, net bytes received, requests served, errors, etc.
118-
func NewCounterMetric(
119-
name string,
120-
tags map[string]string,
121-
fields map[string]interface{},
122-
t time.Time,
123-
) (Metric, error) {
124-
pt, err := models.NewPoint(name, models.NewTags(tags), fields, t)
125-
if err != nil {
126-
return nil, err
127-
}
128-
return &metric{
129-
pt: pt,
130-
mType: Counter,
131-
}, nil
132-
}
133-
134-
func (m *metric) Name() string {
135-
return m.pt.Name()
136-
}
137-
138-
func (m *metric) Tags() map[string]string {
139-
return m.pt.Tags().Map()
140-
}
141-
142-
func (m *metric) Time() time.Time {
143-
return m.pt.Time()
144-
}
145-
146-
func (m *metric) Type() ValueType {
147-
return m.mType
148-
}
149-
150-
func (m *metric) HashID() uint64 {
151-
return m.pt.HashID()
152-
}
153-
154-
func (m *metric) UnixNano() int64 {
155-
return m.pt.UnixNano()
156-
}
157-
158-
func (m *metric) Fields() map[string]interface{} {
159-
return m.pt.Fields()
160-
}
161-
162-
func (m *metric) String() string {
163-
return m.pt.String()
164-
}
165-
166-
func (m *metric) PrecisionString(precison string) string {
167-
return m.pt.PrecisionString(precison)
168-
}
169-
170-
func (m *metric) Point() *client.Point {
171-
return client.NewPointFrom(m.pt)
172-
}
173-
174-
func (m *metric) IsAggregate() bool {
175-
return m.isaggregate
176-
}
177-
178-
func (m *metric) SetAggregate(b bool) {
179-
m.isaggregate = b
180-
}
181-
182-
func (m *metric) Copy() Metric {
183-
t := time.Time(m.Time())
184-
185-
tags := make(map[string]string)
186-
fields := make(map[string]interface{})
187-
for k, v := range m.Tags() {
188-
tags[k] = v
189-
}
190-
for k, v := range m.Fields() {
191-
fields[k] = v
192-
}
193-
194-
out, _ := NewMetric(m.Name(), tags, fields, t)
195-
return out
55+
// Point returns a influxdb client.Point object
56+
// TODO remove this function
57+
Point() *client.Point
19658
}

metric/inline_strconv_parse.go

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package metric
2+
3+
import (
4+
"reflect"
5+
"strconv"
6+
"unsafe"
7+
)
8+
9+
// parseIntBytes is a zero-alloc wrapper around strconv.ParseInt.
10+
func parseIntBytes(b []byte, base int, bitSize int) (i int64, err error) {
11+
s := unsafeBytesToString(b)
12+
return strconv.ParseInt(s, base, bitSize)
13+
}
14+
15+
// parseFloatBytes is a zero-alloc wrapper around strconv.ParseFloat.
16+
func parseFloatBytes(b []byte, bitSize int) (float64, error) {
17+
s := unsafeBytesToString(b)
18+
return strconv.ParseFloat(s, bitSize)
19+
}
20+
21+
// parseBoolBytes is a zero-alloc wrapper around strconv.ParseBool.
22+
func parseBoolBytes(b []byte) (bool, error) {
23+
return strconv.ParseBool(unsafeBytesToString(b))
24+
}
25+
26+
// unsafeBytesToString converts a []byte to a string without a heap allocation.
27+
//
28+
// It is unsafe, and is intended to prepare input to short-lived functions
29+
// that require strings.
30+
func unsafeBytesToString(in []byte) string {
31+
src := *(*reflect.SliceHeader)(unsafe.Pointer(&in))
32+
dst := reflect.StringHeader{
33+
Data: src.Data,
34+
Len: src.Len,
35+
}
36+
s := *(*string)(unsafe.Pointer(&dst))
37+
return s
38+
}

0 commit comments

Comments
 (0)