Skip to content

Commit

Permalink
Merge branch 'main' into patch-1
Browse files Browse the repository at this point in the history
  • Loading branch information
ch-stark authored Jan 10, 2025
2 parents 2830441 + 38aff21 commit de8cd4b
Show file tree
Hide file tree
Showing 54 changed files with 11,769 additions and 9,121 deletions.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion cicd-scripts/customize-mco.sh
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ get_changed_components() {
get_ginkgo_focus() {
if [[ -n ${IS_KIND_ENV} ]]; then
# For KinD cluster, do not need to run all test cases
GINKGO_FOCUS=" --focus manifestwork/g0 --focus endpoint_preserve/g0 --focus grafana/g0 --focus metrics/g0 --focus addon/g0 --focus alert/g0 --focus dashboard/g0"
# and we skip those that explicitly require OCP
GINKGO_FOCUS=" --focus manifestwork/g0 --focus endpoint_preserve/g0 --focus grafana/g0 --focus metrics/g0 --focus addon/g0 --focus alert/g0 --focus dashboard/g0 --skip requires-ocp/g0"
else
GINKGO_FOCUS=""
fi
Expand Down
2 changes: 1 addition & 1 deletion collectors/metrics/Dockerfile.dev
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ COPY ./operators/pkg ./operators/pkg
COPY ./operators/multiclusterobservability/api ./operators/multiclusterobservability/api
RUN go build -v -o metrics-collector ./collectors/metrics/cmd/metrics-collector/main.go

FROM alpine:3.18 AS runner
FROM alpine:3.21 AS runner

USER 1001:1001

Expand Down
25 changes: 15 additions & 10 deletions collectors/metrics/cmd/metrics-collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/run"
hyperv1 "github.com/openshift/hypershift/api/hypershift/v1alpha1"
hyperv1 "github.com/openshift/hypershift/api/hypershift/v1beta1"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/spf13/cobra"
Expand All @@ -38,15 +38,16 @@ import (

func main() {
opt := &Options{
From: "http://localhost:9090",
Listen: "localhost:9002",
LimitBytes: 200 * 1024,
Matchers: []string{`{__name__="up"}`},
Interval: 4*time.Minute + 30*time.Second,
EvaluateInterval: 30 * time.Second,
WorkerNum: 1,
DisableHyperShift: false,
DisableStatusReporting: false,
From: "http://localhost:9090",
Listen: "localhost:9002",
LimitBytes: 200 * 1024,
Matchers: []string{`{__name__="up"}`},
Interval: 4*time.Minute + 30*time.Second,
EvaluateInterval: 30 * time.Second,
WorkerNum: 1,
DisableHyperShift: false,
DisableStatusReporting: false,
SimulatedTimeseriesFile: "",
}
cmd := &cobra.Command{
Short: "Remote write federated metrics from prometheus",
Expand Down Expand Up @@ -722,6 +723,10 @@ func initShardedConfigs(o *Options, agent Agent) ([]*forwarder.Config, error) {
}

func runMultiWorkers(o *Options, cfg *forwarder.Config) error {
if o.WorkerNum > 1 && o.SimulatedTimeseriesFile == "" {
return nil
}

for i := 1; i < int(o.WorkerNum); i++ {
opt := &Options{
From: o.From,
Expand Down
100 changes: 100 additions & 0 deletions collectors/metrics/cmd/metrics-collector/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package main

import (
"fmt"
stdlog "log"
"os"
"testing"
Expand Down Expand Up @@ -65,3 +66,102 @@ func TestMultiWorkers(t *testing.T) {
time.Sleep(1 * time.Second)

}

// TestSplitMatchersIntoShards checks how splitMatchersIntoShards distributes
// matchers over shards. The expectations below encode a round-robin
// assignment (matcher i goes to shard i % shardCount), empty trailing shards
// when there are more shards than matchers, and a single shard holding every
// matcher when the requested shard count is zero or negative.
func TestSplitMatchersIntoShards(t *testing.T) {
	tests := []struct {
		name       string
		matchers   []string
		shardCount int
		want       [][]string
	}{
		{
			name:       "single shard",
			matchers:   []string{"match1", "match2", "match3"},
			shardCount: 1,
			want:       [][]string{{"match1", "match2", "match3"}},
		},
		{
			name:       "two shards",
			matchers:   []string{"match1", "match2", "match3", "match4"},
			shardCount: 2,
			want: [][]string{
				{"match1", "match3"},
				{"match2", "match4"},
			},
		},
		// This case should not happen and is restricted by CLI option validation.
		// It has its own name so it does not collide with the "two shards" case
		// above (Go would otherwise report it as "two_shards#01").
		{
			name:       "six shards for four matchers",
			matchers:   []string{"match1", "match2", "match3", "match4"},
			shardCount: 6,
			want: [][]string{
				{"match1"},
				{"match2"},
				{"match3"},
				{"match4"},
				{},
				{},
			},
		},
		{
			name:       "three shards",
			matchers:   []string{"match1", "match2", "match3", "match4", "match5"},
			shardCount: 3,
			want: [][]string{
				{"match1", "match4"},
				{"match2", "match5"},
				{"match3"},
			},
		},
		{
			name:       "more shards than matchers",
			matchers:   []string{"match1", "match2"},
			shardCount: 3,
			want: [][]string{
				{"match1"},
				{"match2"},
				{},
			},
		},
		{
			name:       "zero shards",
			matchers:   []string{"match1", "match2", "match3"},
			shardCount: 0,
			want:       [][]string{{"match1", "match2", "match3"}},
		},
		{
			name:       "negative shards",
			matchers:   []string{"match1", "match2", "match3"},
			shardCount: -1,
			want:       [][]string{{"match1", "match2", "match3"}},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got := splitMatchersIntoShards(tt.matchers, tt.shardCount)
			// Print the computed shards so they show up in the test output.
			fmt.Println(got)

			// The shard count must match before the contents can be compared
			// index by index.
			if len(got) != len(tt.want) {
				t.Fatalf("splitMatchersIntoShards() got %d shards, want %d shards",
					len(got), len(tt.want))
			}

			// Compare each shard; a size mismatch is reported once and the
			// remaining shards are still checked.
			for i := range got {
				if len(got[i]) != len(tt.want[i]) {
					t.Errorf("shard %d: got %d matchers, want %d matchers",
						i, len(got[i]), len(tt.want[i]))
					continue
				}
				for j := range got[i] {
					if got[i][j] != tt.want[i][j] {
						t.Errorf("shard %d matcher %d: got %s, want %s",
							i, j, got[i][j], tt.want[i][j])
					}
				}
			}
		})
	}
}
5 changes: 5 additions & 0 deletions collectors/metrics/pkg/forwarder/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,11 @@ func (w *Worker) LastMetrics() []*clientmodel.MetricFamily {
}

func (w *Worker) Run(ctx context.Context) {
// Forward metrics immediately on startup.
if err := w.forward(ctx); err != nil {
rlogger.Log(w.logger, rlogger.Error, "msg", "unable to forward results", "err", err)
}

ticker := time.NewTicker(w.interval)
defer ticker.Stop()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"fmt"

"github.com/go-kit/log"
hyperv1 "github.com/openshift/hypershift/api/hypershift/v1alpha1"
hyperv1 "github.com/openshift/hypershift/api/hypershift/v1beta1"
prom "github.com/prometheus/client_model/go"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"testing"

"github.com/go-kit/log"
hyperv1 "github.com/openshift/hypershift/api/hypershift/v1alpha1"
hyperv1 "github.com/openshift/hypershift/api/hypershift/v1beta1"
prom "github.com/prometheus/client_model/go"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
Expand Down
62 changes: 45 additions & 17 deletions collectors/metrics/pkg/metricsclient/metricsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ import (
"io"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"time"

"github.com/cenkalti/backoff"
"github.com/cenkalti/backoff/v4"
"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
Expand Down Expand Up @@ -503,12 +504,13 @@ func (c *Client) RemoteWrite(ctx context.Context, req *http.Request,
}
b.MaxElapsedTime = interval / time.Duration(halfInterval)
retryable := func() error {
return c.sendRequest(req.URL.String(), compressed)
return c.sendRequest(ctx, req.URL.String(), compressed)
}
notify := func(err error, t time.Duration) {
msg := fmt.Sprintf("error: %v happened at time: %v", err, t)
logger.Log(c.logger, logger.Warn, "msg", msg)
}

err = backoff.RetryNotify(retryable, b, notify)
if err != nil {
return err
Expand All @@ -518,43 +520,69 @@ func (c *Client) RemoteWrite(ctx context.Context, req *http.Request,
return nil
}

func (c *Client) sendRequest(serverURL string, body []byte) error {
func (c *Client) sendRequest(ctx context.Context, serverURL string, body []byte) error {
req1, err := http.NewRequest(http.MethodPost, serverURL, bytes.NewBuffer(body))
if err != nil {
msg := "failed to create forwarding request"
logger.Log(c.logger, logger.Warn, "msg", msg, "err", err)
wrappedErr := fmt.Errorf("failed to create forwarding request: %w", err)
c.metrics.ForwardRemoteWriteRequests.WithLabelValues("0").Inc()
return errors.New(msg)
return backoff.Permanent(wrappedErr)
}

// req.Header.Add("THANOS-TENANT", tenantID)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()

req1 = req1.WithContext(ctx)

resp, err := c.client.Do(req1)
if err != nil {
msg := "failed to forward request"
logger.Log(c.logger, logger.Warn, "msg", msg, "err", err)
c.metrics.ForwardRemoteWriteRequests.WithLabelValues("0").Inc()
return errors.New(msg)

wrappedErr := fmt.Errorf("failed to forward request: %w", err)
if isTransientError(err) {
return wrappedErr
}

return backoff.Permanent(wrappedErr)
}

c.metrics.ForwardRemoteWriteRequests.WithLabelValues(strconv.Itoa(resp.StatusCode)).Inc()

if resp.StatusCode/100 != 2 {
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
// surfacing upstreams error to our users too
defer resp.Body.Close()
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
logger.Log(c.logger, logger.Warn, err)
}
bodyString := string(bodyBytes)
msg := fmt.Sprintf("response status code is %s, response body is %s", resp.Status, bodyString)
logger.Log(c.logger, logger.Warn, msg)
return errors.New(msg)

retErr := fmt.Errorf("response status code is %s, response body is %s", resp.Status, string(bodyBytes))

if isTransientResponseError(resp) {
return retErr
}

return backoff.Permanent(retErr)
}

return nil
}

// isTransientError reports whether err is a *url.Error whose Timeout method
// returns true. Any other error value, including nil, is reported as not
// transient. Only the outermost error is inspected; wrapped errors are not
// unwrapped.
func isTransientError(err error) bool {
	urlErr, isURLErr := err.(*url.Error)
	return isURLErr && urlErr.Timeout()
}

// isTransientResponseError reports whether the response's status code is one
// this package treats as retryable: 429 Too Many Requests, or any code of 500
// and above except 501 Not Implemented.
func isTransientResponseError(resp *http.Response) bool {
	switch code := resp.StatusCode; {
	case code == http.StatusTooManyRequests:
		return true
	case code == http.StatusNotImplemented:
		return false
	default:
		return code >= 500
	}
}
Loading

0 comments on commit de8cd4b

Please sign in to comment.