Skip to content

Commit 8407617

Browse files
Batch metrics sending to GCP (#3)
1 parent f957fd3 commit 8407617

File tree

1 file changed

+61
-17
lines changed

1 file changed

+61
-17
lines changed

server/middleware/metric_middleware.go

+61-17
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,19 @@ import (
1818
"google.golang.org/protobuf/types/known/timestamppb"
1919
)
2020

21-
const MetricTypePrefix = "custom.googleapis.com/comfy_api_frontend"
21+
const (
22+
MetricTypePrefix = "custom.googleapis.com/comfy_api_frontend"
23+
batchInterval = 30 * time.Second // Batch interval for sending metrics
24+
)
25+
26+
var (
27+
environment = os.Getenv("DRIP_ENV")
28+
metricsCh = make(chan *monitoringpb.TimeSeries, 1000)
29+
)
2230

23-
var environment = os.Getenv("DRIP_ENV")
31+
func init() {
32+
go processMetricsBatch()
33+
}
2434

2535
// MetricsMiddleware creates a middleware to capture and send metrics for HTTP requests.
2636
func MetricsMiddleware(client *monitoring.MetricClient, config *config.Config) echo.MiddlewareFunc {
@@ -32,7 +42,7 @@ func MetricsMiddleware(client *monitoring.MetricClient, config *config.Config) e
3242

3343
// Generate metrics for the request duration, count, and errors.
3444
if config.DripEnv != "localdev" {
35-
sendMetrics(c.Request().Context(), client, config,
45+
enqueueMetrics(
3646
createDurationMetric(c, startTime, endTime),
3747
createRequestMetric(c),
3848
createErrorMetric(c, err),
@@ -79,26 +89,60 @@ func (e EndpointMetricKey) toLabels() map[string]string {
7989
}
8090
}
8191

82-
// sendMetrics sends a batch of time series data to Cloud Monitoring.
83-
func sendMetrics(
84-
ctx context.Context,
85-
client *monitoring.MetricClient,
86-
config *config.Config,
87-
series ...*monitoringpb.TimeSeries,
88-
) {
89-
req := &monitoringpb.CreateTimeSeriesRequest{
90-
Name: "projects/" + config.ProjectID,
91-
TimeSeries: make([]*monitoringpb.TimeSeries, 0, len(series)),
92-
}
93-
92+
func enqueueMetrics(series ...*monitoringpb.TimeSeries) {
9493
for _, s := range series {
9594
if s != nil {
96-
req.TimeSeries = append(req.TimeSeries, s)
95+
metricsCh <- s
96+
}
97+
}
98+
}
99+
100+
func processMetricsBatch() {
101+
ticker := time.NewTicker(batchInterval)
102+
for range ticker.C {
103+
sendBatchedMetrics()
104+
}
105+
}
106+
107+
func sendBatchedMetrics() {
108+
var series []*monitoringpb.TimeSeries
109+
for {
110+
select {
111+
case s := <-metricsCh:
112+
series = append(series, s)
113+
if len(series) >= 1000 {
114+
sendMetrics(series)
115+
series = nil
116+
}
117+
default:
118+
if len(series) > 0 {
119+
sendMetrics(series)
120+
}
121+
return
97122
}
98123
}
124+
}
125+
126+
func sendMetrics(series []*monitoringpb.TimeSeries) {
127+
if len(series) == 0 {
128+
return
129+
}
130+
131+
ctx := context.Background()
132+
client, err := monitoring.NewMetricClient(ctx)
133+
if err != nil {
134+
log.Error().Err(err).Msg("Failed to create metric client")
135+
return
136+
}
137+
defer client.Close()
138+
139+
req := &monitoringpb.CreateTimeSeriesRequest{
140+
Name: "projects/" + os.Getenv("PROJECT_ID"),
141+
TimeSeries: series,
142+
}
99143

100144
if err := client.CreateTimeSeries(ctx, req); err != nil {
101-
log.Ctx(ctx).Error().Err(err).Msg("Failed to create time series")
145+
log.Error().Err(err).Msg("Failed to create time series")
102146
}
103147
}
104148

0 commit comments

Comments
 (0)