diff --git a/plugins/outputs/influxdb_v2/http_test.go b/plugins/outputs/influxdb_v2/http_test.go index aac87817f108c..536310011265f 100644 --- a/plugins/outputs/influxdb_v2/http_test.go +++ b/plugins/outputs/influxdb_v2/http_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" influxdb "github.com/influxdata/telegraf/plugins/outputs/influxdb_v2" "github.com/influxdata/telegraf/testutil" ) @@ -40,6 +41,13 @@ func TestNewHTTPClient(t *testing.T) { URL: genURL("unix://var/run/influxd.sock"), }, }, + { + cfg: &influxdb.HTTPConfig{ + URL: genURL("unix://var/run/influxd.sock"), + PingTimeout: config.Duration(15 * time.Second), + ReadIdleTimeout: config.Duration(30 * time.Second), + }, + }, } for i := range tests { @@ -56,6 +64,66 @@ func TestNewHTTPClient(t *testing.T) { } } +func TestWrite(t *testing.T) { + ts := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/api/v2/write": + err := r.ParseForm() + require.NoError(t, err) + require.Equal(t, r.Form["bucket"], []string{"foobar"}) + + body, err := io.ReadAll(r.Body) + require.NoError(t, err) + require.Contains(t, string(body), "cpu value=42.123") + + w.WriteHeader(http.StatusNoContent) + return + default: + w.WriteHeader(http.StatusNotFound) + return + } + }), + ) + defer ts.Close() + + addr := &url.URL{ + Scheme: "http", + Host: ts.Listener.Addr().String(), + } + + cfg := &influxdb.HTTPConfig{ + URL: addr, + Bucket: "telegraf", + BucketTag: "bucket", + ExcludeBucketTag: true, + PingTimeout: config.Duration(15 * time.Second), + ReadIdleTimeout: config.Duration(30 * time.Second), + } + + client, err := influxdb.NewHTTPClient(cfg) + require.NoError(t, err) + + metrics := []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{ + "bucket": "foobar", + }, + map[string]interface{}{ + "value": 42.123, + }, + time.Unix(0, 0), + ), + } + + ctx := context.Background() + err = client.Write(ctx, metrics) + require.NoError(t, err) + err = client.Write(ctx, metrics) + require.NoError(t, err) +} + func TestWriteBucketTagWorksOnRetry(t *testing.T) { ts := httptest.NewServer( http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -84,14 +152,14 @@ func TestWriteBucketTagWorksOnRetry(t *testing.T) { Host: ts.Listener.Addr().String(), } - config := &influxdb.HTTPConfig{ + cfg := &influxdb.HTTPConfig{ URL: addr, Bucket: "telegraf", BucketTag: "bucket", ExcludeBucketTag: true, } - client, err := influxdb.NewHTTPClient(config) + client, err := influxdb.NewHTTPClient(cfg) require.NoError(t, err) metrics := []telegraf.Metric{