-
Notifications
You must be signed in to change notification settings - Fork 5
/
main.go
145 lines (128 loc) · 3.73 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package main
import (
"context"
"encoding/json"
"fmt"
"os"
"strings"
"time"
"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/jkerry/sensu-go-elasticsearch/lib/pkg/eventprocessing"
"github.com/spf13/cobra"
)
// Command-line options. Each one is bound to a flag in configureRootCommand
// and read by the handler once the flags have been parsed.
var (
	// index is the base Elasticsearch index name (flag --index / -i).
	index string

	// time_postfix is a Go time layout whose formatted value is appended to
	// index when dated_postfix is not set (flag --time_index / -t).
	time_postfix string

	// dated_postfix appends "-" plus the current date in the 2006.01.02
	// layout to index (flag --dated_index / -d).
	dated_postfix bool

	// full_event_logging makes run send the whole event body instead of the
	// individual metric points (flag --full_event_logging / -f).
	full_event_logging bool

	// point_name_as_metric_name is forwarded to
	// eventprocessing.GetMetricFromPoint (flag --point_name_as_metric_name / -p).
	point_name_as_metric_name bool
)
// main builds the root command and executes it. Any error returned by the
// command is printed to stderr and the process exits with status 1.
func main() {
	err := configureRootCommand().Execute()
	if err == nil {
		return
	}
	fmt.Fprintf(os.Stderr, "error: %v\n", err)
	os.Exit(1)
}
// configureRootCommand builds the handler's root cobra command, binds the
// command-line flags to the package-level option variables and marks --index
// as required. The returned command invokes run when executed.
func configureRootCommand() *cobra.Command {
	cmd := &cobra.Command{
		Use:   "sensu-go-elasticsearch",
		Short: "The Sensu Go handler for metric and event logging in elasticsearch\nRequired: Set the ELASTICSEARCH_URL env var with an appropriate connection url (https://user:pass@hostname:port)",
		RunE:  run,
	}

	flags := cmd.Flags()
	// The example mirrors the "2006.01.02" layout generateIndex actually uses.
	flags.BoolVarP(&dated_postfix,
		"dated_index",
		"d",
		false,
		"Should the index have the current date postfixed? ie: metric_data-2019.06.27")
	flags.StringVarP(&time_postfix,
		"time_index",
		"t",
		"",
		"uses go date format to generate index postfix: \"-2006.01.02\"")
	flags.BoolVarP(&full_event_logging,
		"full_event_logging",
		"f",
		false,
		"send the full event body instead of isolating event metrics")
	flags.BoolVarP(&point_name_as_metric_name,
		"point_name_as_metric_name",
		"p",
		false,
		"use the entire point name as the metric name")
	flags.StringVarP(&index,
		"index",
		"i",
		"",
		"the elasticsearch index to write to, ie: metric_data")

	// MarkFlagRequired only fails when the named flag does not exist; "index"
	// is registered just above, so the error is safe to discard.
	_ = cmd.MarkFlagRequired("index")
	return cmd
}
// generateIndex returns the name of the index to write to.
//
// When dated_postfix is set the current date is appended as "-2006.01.02";
// otherwise, when time_postfix is non-empty, the current time formatted with
// that layout is appended verbatim. With neither option the bare index name
// is returned. dated_postfix wins when both options are set.
func generateIndex() string {
	now := time.Now()
	switch {
	case dated_postfix:
		return fmt.Sprintf("%s-%s", index, now.Format("2006.01.02"))
	case time_postfix != "":
		suffix := now.Format(time_postfix)
		return fmt.Sprintf("%s%s", index, suffix)
	default:
		return index
	}
}
// run is the RunE callback of the root command. It rejects positional
// arguments, obtains the event through eventprocessing.GetPipedEvent and then
// either indexes the whole event (full_event_logging) or indexes one document
// per metric point. The first failure aborts processing and is returned.
func run(cmd *cobra.Command, args []string) error {
	// The handler accepts flags only; print usage when extra arguments appear.
	if len(args) > 0 {
		_ = cmd.Help()
		return fmt.Errorf("invalid argument(s) received")
	}

	event, err := eventprocessing.GetPipedEvent()
	if err != nil {
		return fmt.Errorf("Could not process or validate event data from stdin: %v", err)
	}

	// publish serializes one document to JSON and sends it to Elasticsearch.
	publish := func(doc interface{}) error {
		payload, err := json.Marshal(doc)
		if err != nil {
			return fmt.Errorf("error serializing metric data to json payload: %v", err)
		}
		if err := sendElasticSearchData(string(payload), index); err != nil {
			return fmt.Errorf("error sending metric data to elasticsearch: %v", err)
		}
		return nil
	}

	if full_event_logging {
		eventValue, err := eventprocessing.ParseEventTimestamp(event)
		if err != nil {
			return fmt.Errorf("error processing sensu event into eventValue: %v", err)
		}
		return publish(eventValue)
	}

	// Metric mode: every point becomes its own document.
	if !event.HasMetrics() {
		return fmt.Errorf("event does not contain metrics")
	}
	for _, point := range event.Metrics.Points {
		metric, err := eventprocessing.GetMetricFromPoint(point, event.Entity.Name, event.Entity.Namespace, event.Entity.Labels, point_name_as_metric_name)
		if err != nil {
			return fmt.Errorf("error processing sensu event MetricPoints into MetricValue: %v", err)
		}
		if err := publish(metric); err != nil {
			return err
		}
	}
	return nil
}
// sendElasticSearchData indexes metricBody as a single document.
//
// The client comes from elasticsearch.NewDefaultClient; per the command help
// text it is configured through the ELASTICSEARCH_URL environment variable.
// The target index is computed by generateIndex from the package-level
// options. The index parameter is kept for caller compatibility but is not
// consulted here.
//
// An error is returned when the client cannot be created, when the request
// cannot be performed, or when Elasticsearch answers with an error status.
func sendElasticSearchData(metricBody string, index string) error {
	es, err := elasticsearch.NewDefaultClient()
	if err != nil {
		// Surface configuration problems here rather than as an obscure
		// failure when the request is performed.
		return fmt.Errorf("Error creating client: %s", err)
	}

	req := esapi.IndexRequest{
		Index:   generateIndex(),
		Body:    strings.NewReader(metricBody),
		Refresh: "true",
	}

	// Perform the request with the client.
	res, err := req.Do(context.Background(), es)
	if err != nil {
		return fmt.Errorf("Error getting response: %s", err)
	}
	// Always release the response body so the underlying connection is freed.
	defer res.Body.Close()

	if res.IsError() {
		// No document ID is tracked by this handler, hence the constant 0.
		return fmt.Errorf("[%s] Error indexing document ID=%d", res.Status(), 0)
	}
	return nil
}