Skip to content

Commit

Permalink
Abort request if data is missing (not all Thanos stores are available) (
Browse files Browse the repository at this point in the history
#89)

* Abort request if not all Thanos stores are available

We got missing data and did not see it since the default is just returning data that can be queried

https://thanos.io/v0.28/components/query.md/#partial-response-strategy

* Make partial response behaviour configurable
  • Loading branch information
bastjan authored Sep 13, 2022
1 parent c7df5fe commit 1c95f61
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 2 deletions.
20 changes: 20 additions & 0 deletions pkg/thanos/thanos.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package thanos

import (
"net/http"
"strconv"
)

// PartialResponseRoundTripper adds a new RoundTripper to the chain that sets the partial_response query parameter to the value of Allow.
type PartialResponseRoundTripper struct {
http.RoundTripper
Allow bool
}

// RoundTrip implements the RoundTripper interface.
func (t *PartialResponseRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
q := req.URL.Query()
q.Set("partial_response", strconv.FormatBool(t.Allow))
req.URL.RawQuery = q.Encode()
return t.RoundTripper.RoundTrip(req)
}
60 changes: 60 additions & 0 deletions pkg/thanos/thanos_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package thanos

import (
"errors"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"strconv"
"testing"

"github.com/stretchr/testify/require"
)

func TestPartialResponseRoundTripper_X(t *testing.T) {
testCases := []struct {
url string
allow bool
}{
{
url: "https://thanos.io",
allow: false,
},
{
url: "https://thanos.io?testly=blub",
allow: false,
},
{
url: "https://thanos.io",
allow: true,
},
{
url: "https://thanos.io?testly=blub",
allow: true,
},
}
for _, tC := range testCases {
t.Run(fmt.Sprintf("allow %v, url %s", tC.allow, tC.url), func(t *testing.T) {
origUrl, err := url.Parse(tC.url)
require.NoError(t, err)

rt := PartialResponseRoundTripper{
RoundTripper: roundTripFunc(func(r *http.Request) (*http.Response, error) {
require.Contains(t, r.URL.RawQuery, `partial_response=`+strconv.FormatBool(tC.allow))
require.Contains(t, r.URL.RawQuery, origUrl.RawQuery)
return nil, errors.New("not implemented")
}),
Allow: tC.allow,
}

_, _ = rt.RoundTrip(httptest.NewRequest("GET", tC.url, nil))
})
}
}

type roundTripFunc func(r *http.Request) (*http.Response, error)

func (s roundTripFunc) RoundTrip(r *http.Request) (*http.Response, error) {
return s(r)
}
13 changes: 11 additions & 2 deletions report_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/appuio/appuio-cloud-reporting/pkg/db"
"github.com/appuio/appuio-cloud-reporting/pkg/report"
"github.com/appuio/appuio-cloud-reporting/pkg/thanos"
"github.com/jmoiron/sqlx"
"github.com/prometheus/client_golang/api"
apiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
Expand All @@ -21,6 +22,8 @@ type reportCommand struct {
Begin *time.Time
RepeatUntil *time.Time
PromQueryTimeout time.Duration

ThanosAllowPartialResponses bool
}

var reportCommandName = "report"
Expand All @@ -43,6 +46,8 @@ func newReportCommand() *cli.Command {
EnvVars: envVars("REPEAT_UNTIL"), Layout: time.RFC3339, Required: false},
&cli.DurationFlag{Name: "prom-query-timeout", Usage: "Timeout when querying prometheus (example: 1m)",
EnvVars: envVars("PROM_QUERY_TIMEOUT"), Destination: &command.PromQueryTimeout, Required: false},
&cli.BoolFlag{Name: "thanos-allow-partial-responses", Usage: "Allows partial responses from Thanos. Can be helpful when querying a Thanos cluster with lost data.",
EnvVars: envVars("THANOS_ALLOW_PARTIAL_RESPONSES"), Destination: &command.ThanosAllowPartialResponses, Required: false, DefaultText: "false"},
},
}
}
Expand All @@ -57,7 +62,7 @@ func (cmd *reportCommand) execute(cliCtx *cli.Context) error {
ctx := cliCtx.Context
log := AppLogger(ctx).WithName(reportCommandName)

promClient, err := newPrometheusAPIClient(cmd.PrometheusURL)
promClient, err := newPrometheusAPIClient(cmd.PrometheusURL, cmd.ThanosAllowPartialResponses)
if err != nil {
return fmt.Errorf("could not create prometheus client: %w", err)
}
Expand Down Expand Up @@ -123,9 +128,13 @@ func (cmd *reportCommand) runReport(ctx context.Context, db *sqlx.DB, promClient
return tx.Commit()
}

func newPrometheusAPIClient(promURL string) (apiv1.API, error) {
func newPrometheusAPIClient(promURL string, thanosAllowPartialResponses bool) (apiv1.API, error) {
client, err := api.NewClient(api.Config{
Address: promURL,
RoundTripper: &thanos.PartialResponseRoundTripper{
RoundTripper: api.DefaultRoundTripper,
Allow: thanosAllowPartialResponses,
},
})
return apiv1.NewAPI(client), err
}

0 comments on commit 1c95f61

Please sign in to comment.