From c7c7130aeef46ed521f843cd2dfe18d127aa1579 Mon Sep 17 00:00:00 2001 From: Michal-Leszczynski <74614433+Michal-Leszczynski@users.noreply.github.com> Date: Thu, 2 Jan 2025 13:03:34 +0100 Subject: [PATCH 01/16] fix(backup_test): add missing 'Integration' suffix to tests (#4176) * fix(backup_test): add missing 'Integration' suffix to tests Some tests were missing the Integration suffix in their names. This resulted in not including them in the 'make pkg-integration-test' command used when running tests on gh actions. * refactor(testutils): export CheckAnyConstraint It is also useful for backup svc tests. * fix(backup_test): skip TestBackupSkipSchemaIntegration for older Scylla versions --- .../backup/service_backup_integration_test.go | 9 ++++-- .../service_deduplicate_integration_test.go | 3 +- .../restore/helper_integration_test.go | 18 ----------- .../restore/restore_integration_test.go | 6 ++-- .../service_restore_integration_test.go | 14 ++++---- pkg/testutils/version.go | 32 +++++++++++++++++++ 6 files changed, 50 insertions(+), 32 deletions(-) create mode 100644 pkg/testutils/version.go diff --git a/pkg/service/backup/service_backup_integration_test.go b/pkg/service/backup/service_backup_integration_test.go index e9146dca0..8cd5d4485 100644 --- a/pkg/service/backup/service_backup_integration_test.go +++ b/pkg/service/backup/service_backup_integration_test.go @@ -2462,7 +2462,7 @@ func TestBackupAlternatorIntegration(t *testing.T) { } } -func TestBackupViews(t *testing.T) { +func TestBackupViewsIntegration(t *testing.T) { const ( testBucket = "backuptest-views" testKeyspace = "backuptest_views" @@ -2547,7 +2547,7 @@ func TestBackupViews(t *testing.T) { } } -func TestBackupSkipSchema(t *testing.T) { +func TestBackupSkipSchemaIntegration(t *testing.T) { const ( testBucket = "backuptest-skip-schema" testKeyspace = "backuptest_skip_schema" @@ -2562,6 +2562,11 @@ func TestBackupSkipSchema(t *testing.T) { clusterSession = CreateSessionAndDropAllKeyspaces(t, 
h.Client) ) + if CheckAnyConstraint(h.T, h.Client, "< 6.0", "< 2024.2, > 1000") { + t.Skip("CQL credentials are not needed for the backup with this Scylla version, " + + "so the --skip-schema flag is not needed there") + } + Print("And: simple table to back up") WriteData(t, clusterSession, testKeyspace, 1) diff --git a/pkg/service/backup/service_deduplicate_integration_test.go b/pkg/service/backup/service_deduplicate_integration_test.go index 80abc9e73..ed3ffedb9 100644 --- a/pkg/service/backup/service_deduplicate_integration_test.go +++ b/pkg/service/backup/service_deduplicate_integration_test.go @@ -19,7 +19,7 @@ import ( "github.com/scylladb/scylla-manager/v3/pkg/util/uuid" ) -func TestBackupPauseResumeOnDeduplicationStage(t *testing.T) { +func TestBackupPauseResumeOnDeduplicationStageIntegration(t *testing.T) { const ( testBucket = "backuptest-deduplication" testKeyspace = "backuptest_deduplication" @@ -160,5 +160,4 @@ func TestBackupPauseResumeOnDeduplicationStage(t *testing.T) { t.Fatalf("Expected uploaded 0 bytes on delta 0 backup, but was %v", totalUploaded) } }() - } diff --git a/pkg/service/restore/helper_integration_test.go b/pkg/service/restore/helper_integration_test.go index 648e86933..6a032eb5b 100644 --- a/pkg/service/restore/helper_integration_test.go +++ b/pkg/service/restore/helper_integration_test.go @@ -20,7 +20,6 @@ import ( "github.com/scylladb/gocqlx/v2" "github.com/scylladb/gocqlx/v2/qb" "github.com/scylladb/scylla-manager/v3/pkg/service/cluster" - "github.com/scylladb/scylla-manager/v3/pkg/util/version" "go.uber.org/zap/zapcore" "github.com/scylladb/scylla-manager/v3/pkg/metrics" @@ -465,23 +464,6 @@ func validateCompleteProgress(t *testing.T, pr Progress, tables []table) { } } -func checkAnyConstraint(t *testing.T, client *scyllaclient.Client, constraints ...string) bool { - ni, err := client.AnyNodeInfo(context.Background()) - if err != nil { - t.Fatal(err) - } - for _, c := range constraints { - ok, err := 
version.CheckConstraint(ni.ScyllaVersion, c) - if err != nil { - t.Fatal(err) - } - if ok { - return true - } - } - return false -} - func createTable(t *testing.T, session gocqlx.Session, keyspace string, tables ...string) { for _, tab := range tables { ExecStmt(t, session, fmt.Sprintf("CREATE TABLE %q.%q (id int PRIMARY KEY, data int)", keyspace, tab)) diff --git a/pkg/service/restore/restore_integration_test.go b/pkg/service/restore/restore_integration_test.go index f0d37eaa4..90e584c4b 100644 --- a/pkg/service/restore/restore_integration_test.go +++ b/pkg/service/restore/restore_integration_test.go @@ -36,7 +36,7 @@ import ( func TestRestoreTablesUserIntegration(t *testing.T) { h := newTestHelper(t, ManagedSecondClusterHosts(), ManagedClusterHosts()) - if checkAnyConstraint(t, h.dstCluster.Client, ">= 6.0, < 2000", ">= 2024.2, > 1000") { + if CheckAnyConstraint(t, h.dstCluster.Client, ">= 6.0, < 2000", ">= 2024.2, > 1000") { t.Skip("Auth restore is not supported in Scylla 6.0. It requires core side support that is aimed at 6.1 release") } @@ -123,7 +123,7 @@ func TestRestoreSchemaRoundtripIntegration(t *testing.T) { h := newTestHelper(t, ManagedSecondClusterHosts(), ManagedClusterHosts()) hRev := newTestHelper(t, ManagedClusterHosts(), ManagedSecondClusterHosts()) - if !checkAnyConstraint(t, h.dstCluster.Client, ">= 6.0, < 2000", ">= 2024.2, > 1000") { + if !CheckAnyConstraint(t, h.dstCluster.Client, ">= 6.0, < 2000", ">= 2024.2, > 1000") { t.Skip("This test assumes that schema is backed up and restored via DESCRIBE SCHEMA WITH INTERNALS") } @@ -237,7 +237,7 @@ func TestRestoreSchemaRoundtripIntegration(t *testing.T) { func TestRestoreSchemaDropAddColumnIntegration(t *testing.T) { h := newTestHelper(t, ManagedSecondClusterHosts(), ManagedClusterHosts()) - if !checkAnyConstraint(t, h.dstCluster.Client, ">= 6.0, < 2000", ">= 2024.2, > 1000") { + if !CheckAnyConstraint(t, h.dstCluster.Client, ">= 6.0, < 2000", ">= 2024.2, > 1000") { t.Skip("This test is the reason 
why SM needs to restore schema by DESCRIBE SCHEMA WITH INTERNALS") } diff --git a/pkg/service/restore/service_restore_integration_test.go b/pkg/service/restore/service_restore_integration_test.go index 6b3b6b5c4..5aeaebe3f 100644 --- a/pkg/service/restore/service_restore_integration_test.go +++ b/pkg/service/restore/service_restore_integration_test.go @@ -308,7 +308,7 @@ func TestRestoreGetTargetUnitsViewsIntegration(t *testing.T) { } var ignoreTarget []string - if checkAnyConstraint(t, h.Client, ">= 6.0, < 2000", ">= 2024.2, > 1000") { + if CheckAnyConstraint(t, h.Client, ">= 6.0, < 2000", ">= 2024.2, > 1000") { ignoreTarget = []string{ "!system_auth.*", "!system_distributed.service_levels", @@ -325,10 +325,10 @@ func TestRestoreGetTargetUnitsViewsIntegration(t *testing.T) { } var ignoreUnits []string - if checkAnyConstraint(t, h.Client, "< 1000") { + if CheckAnyConstraint(t, h.Client, "< 1000") { ignoreUnits = append(ignoreUnits, "system_replicated_keys") } - if checkAnyConstraint(t, h.Client, ">= 6.0, < 2000", ">= 2024.2, > 1000") { + if CheckAnyConstraint(t, h.Client, ">= 6.0, < 2000", ">= 2024.2, > 1000") { ignoreUnits = append(ignoreUnits, "system_auth", "service_levels", @@ -1407,7 +1407,7 @@ func restoreAllTables(t *testing.T, schemaTarget, tablesTarget Target, keyspace {ks: "system_traces", tab: "sessions"}, {ks: "system_traces", tab: "sessions_time_idx"}, } - if !checkAnyConstraint(t, dstH.Client, ">= 6.0, < 2000", ">= 2024.2, > 1000") { + if !CheckAnyConstraint(t, dstH.Client, ">= 6.0, < 2000", ">= 2024.2, > 1000") { toValidate = append(toValidate, table{ks: "system_auth", tab: "role_attributes"}, table{ks: "system_auth", tab: "role_members"}, @@ -1480,7 +1480,7 @@ func restoreAlternator(t *testing.T, schemaTarget, tablesTarget Target, testKeys ) dstH.shouldSkipTest(schemaTarget, tablesTarget) - if checkAnyConstraint(t, dstH.Client, ">= 6.0, < 2000", ">= 2024.2, > 1000") { + if CheckAnyConstraint(t, dstH.Client, ">= 6.0, < 2000", ">= 2024.2, > 1000") { 
t.Skip("See https://github.com/scylladb/scylladb/issues/19112") } @@ -1531,7 +1531,7 @@ func (h *restoreTestHelper) validateRestoreSuccess(dstSession, srcSession gocqlx Print("Then: validate restore result") if target.RestoreSchema { - if !checkAnyConstraint(h.T, h.Client, ">= 6.0, < 2000", ">= 2024.2, > 1000") { + if !CheckAnyConstraint(h.T, h.Client, ">= 6.0, < 2000", ">= 2024.2, > 1000") { // Schema restart is required only for older Scylla versions h.restartScylla() } @@ -1828,7 +1828,7 @@ func getBucketKeyspaceUser(t *testing.T) (string, string, string) { func (h *restoreTestHelper) shouldSkipTest(targets ...Target) { for _, target := range targets { if target.RestoreSchema { - if err := IsRestoreSchemaFromSSTablesSupported(context.Background(), h.Client); err != nil && !checkAnyConstraint(h.T, h.Client, ">= 6.0, < 2000", ">= 2024.2, > 1000") { + if err := IsRestoreSchemaFromSSTablesSupported(context.Background(), h.Client); err != nil && !CheckAnyConstraint(h.T, h.Client, ">= 6.0, < 2000", ">= 2024.2, > 1000") { h.T.Skip(err) } } diff --git a/pkg/testutils/version.go b/pkg/testutils/version.go new file mode 100644 index 000000000..d9d0843c7 --- /dev/null +++ b/pkg/testutils/version.go @@ -0,0 +1,32 @@ +// Copyright (C) 2024 ScyllaDB + +package testutils + +import ( + "context" + "testing" + + "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient" + "github.com/scylladb/scylla-manager/v3/pkg/util/version" +) + +// CheckAnyConstraint checks if any of the passed version constraints are satisfied. +// Can be used for skipping tests which make sense only for certain Scylla versions. 
+func CheckAnyConstraint(t *testing.T, client *scyllaclient.Client, constraints ...string) bool { + t.Helper() + + ni, err := client.AnyNodeInfo(context.Background()) + if err != nil { + t.Fatal(err) + } + for _, c := range constraints { + ok, err := version.CheckConstraint(ni.ScyllaVersion, c) + if err != nil { + t.Fatal(err) + } + if ok { + return true + } + } + return false +} From ef3b968167d0ea32f69218aab695c6c47162e775 Mon Sep 17 00:00:00 2001 From: Vasil Averyanau Date: Fri, 3 Jan 2025 09:26:19 +0100 Subject: [PATCH 02/16] feat(swagger): adds /cloud/metadata to agent api definition. (#4186) This adds /cloud/metadata api call to agent which should return cloud instance metadata, such as instance_type and cloud_provider. Refs: #4130 --- v3/swagger/agent.json | 49 +++++++ .../client/operations/metadata_parameters.go | 113 +++++++++++++++ .../client/operations/metadata_responses.go | 135 ++++++++++++++++++ .../client/operations/operations_client.go | 37 +++++ .../gen/agent/models/instance_metadata.go | 111 ++++++++++++++ 5 files changed, 445 insertions(+) create mode 100644 v3/swagger/gen/agent/client/operations/metadata_parameters.go create mode 100644 v3/swagger/gen/agent/client/operations/metadata_responses.go create mode 100644 v3/swagger/gen/agent/models/instance_metadata.go diff --git a/v3/swagger/agent.json b/v3/swagger/agent.json index 028871831..af87e1652 100644 --- a/v3/swagger/agent.json +++ b/v3/swagger/agent.json @@ -966,6 +966,34 @@ }, "security": [] } + }, + "/cloud/metadata": { + "get": { + "description": "Collect instance metadata on a node", + "summary": "Return instance metadata", + "operationId": "Metadata", + "produces": [ + "application/json" + ], + "parameters": [], + "responses": { + "200": { + "description": "Instance metadata", + "schema": { + "$ref": "#/definitions/InstanceMetadata" + }, + "headers": {} + }, + "default": { + "description": "Server error", + "schema": { + "$ref": "#/definitions/ErrorResponse" + }, + "headers": {} + } + 
}, + "security": [] + } } }, "definitions": { @@ -1694,6 +1722,27 @@ "format": "uint64" } } + }, + "InstanceMetadata": { + "title": "instance metadata", + "description": "Information about instance on which agent is running", + "type": "object", + "properties": { + "instance_type": { + "description": "Cloud machine type, e.g. Standard-A3", + "type": "string" + }, + "cloud_provider": { + "description": "Cloud provider", + "type": "string", + "enum": [ + "aws", + "gcp", + "azure", + "" + ] + } + } } }, "tags": [] diff --git a/v3/swagger/gen/agent/client/operations/metadata_parameters.go b/v3/swagger/gen/agent/client/operations/metadata_parameters.go new file mode 100644 index 000000000..aede911a5 --- /dev/null +++ b/v3/swagger/gen/agent/client/operations/metadata_parameters.go @@ -0,0 +1,113 @@ +// Code generated by go-swagger; DO NOT EDIT. + +package operations + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + "context" + "net/http" + "time" + + "github.com/go-openapi/errors" + "github.com/go-openapi/runtime" + cr "github.com/go-openapi/runtime/client" + "github.com/go-openapi/strfmt" +) + +// NewMetadataParams creates a new MetadataParams object +// with the default values initialized. 
+func NewMetadataParams() *MetadataParams { + + return &MetadataParams{ + + timeout: cr.DefaultTimeout, + } +} + +// NewMetadataParamsWithTimeout creates a new MetadataParams object +// with the default values initialized, and the ability to set a timeout on a request +func NewMetadataParamsWithTimeout(timeout time.Duration) *MetadataParams { + + return &MetadataParams{ + + timeout: timeout, + } +} + +// NewMetadataParamsWithContext creates a new MetadataParams object +// with the default values initialized, and the ability to set a context for a request +func NewMetadataParamsWithContext(ctx context.Context) *MetadataParams { + + return &MetadataParams{ + + Context: ctx, + } +} + +// NewMetadataParamsWithHTTPClient creates a new MetadataParams object +// with the default values initialized, and the ability to set a custom HTTPClient for a request +func NewMetadataParamsWithHTTPClient(client *http.Client) *MetadataParams { + + return &MetadataParams{ + HTTPClient: client, + } +} + +/* +MetadataParams contains all the parameters to send to the API endpoint +for the metadata operation typically these are written to a http.Request +*/ +type MetadataParams struct { + timeout time.Duration + Context context.Context + HTTPClient *http.Client +} + +// WithTimeout adds the timeout to the metadata params +func (o *MetadataParams) WithTimeout(timeout time.Duration) *MetadataParams { + o.SetTimeout(timeout) + return o +} + +// SetTimeout adds the timeout to the metadata params +func (o *MetadataParams) SetTimeout(timeout time.Duration) { + o.timeout = timeout +} + +// WithContext adds the context to the metadata params +func (o *MetadataParams) WithContext(ctx context.Context) *MetadataParams { + o.SetContext(ctx) + return o +} + +// SetContext adds the context to the metadata params +func (o *MetadataParams) SetContext(ctx context.Context) { + o.Context = ctx +} + +// WithHTTPClient adds the HTTPClient to the metadata params +func (o *MetadataParams) WithHTTPClient(client 
*http.Client) *MetadataParams { + o.SetHTTPClient(client) + return o +} + +// SetHTTPClient adds the HTTPClient to the metadata params +func (o *MetadataParams) SetHTTPClient(client *http.Client) { + o.HTTPClient = client +} + +// WriteToRequest writes these params to a swagger request +func (o *MetadataParams) WriteToRequest(r runtime.ClientRequest, reg strfmt.Registry) error { + + if err := r.SetTimeout(o.timeout); err != nil { + return err + } + var res []error + + if len(res) > 0 { + return errors.CompositeValidationError(res...) + } + return nil +} diff --git a/v3/swagger/gen/agent/client/operations/metadata_responses.go b/v3/swagger/gen/agent/client/operations/metadata_responses.go new file mode 100644 index 000000000..31089cc9e --- /dev/null +++ b/v3/swagger/gen/agent/client/operations/metadata_responses.go @@ -0,0 +1,135 @@ +// Code generated by go-swagger; DO NOT EDIT. + +package operations + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + "fmt" + "io" + "strconv" + "strings" + + "github.com/go-openapi/runtime" + "github.com/go-openapi/strfmt" + + "github.com/scylladb/scylla-manager/v3/swagger/gen/agent/models" +) + +// MetadataReader is a Reader for the Metadata structure. +type MetadataReader struct { + formats strfmt.Registry +} + +// ReadResponse reads a server response into the received o. 
+func (o *MetadataReader) ReadResponse(response runtime.ClientResponse, consumer runtime.Consumer) (interface{}, error) { + switch response.Code() { + case 200: + result := NewMetadataOK() + if err := result.readResponse(response, consumer, o.formats); err != nil { + return nil, err + } + return result, nil + default: + result := NewMetadataDefault(response.Code()) + if err := result.readResponse(response, consumer, o.formats); err != nil { + return nil, err + } + if response.Code()/100 == 2 { + return result, nil + } + return nil, result + } +} + +// NewMetadataOK creates a MetadataOK with default headers values +func NewMetadataOK() *MetadataOK { + return &MetadataOK{} +} + +/* +MetadataOK handles this case with default header values. + +Instance metadata +*/ +type MetadataOK struct { + Payload *models.InstanceMetadata + JobID int64 +} + +func (o *MetadataOK) GetPayload() *models.InstanceMetadata { + return o.Payload +} + +func (o *MetadataOK) readResponse(response runtime.ClientResponse, consumer runtime.Consumer, formats strfmt.Registry) error { + + o.Payload = new(models.InstanceMetadata) + + // response payload + if err := consumer.Consume(response.Body(), o.Payload); err != nil && err != io.EOF { + return err + } + + if jobIDHeader := response.GetHeader("x-rclone-jobid"); jobIDHeader != "" { + jobID, err := strconv.ParseInt(jobIDHeader, 10, 64) + if err != nil { + return err + } + + o.JobID = jobID + } + return nil +} + +// NewMetadataDefault creates a MetadataDefault with default headers values +func NewMetadataDefault(code int) *MetadataDefault { + return &MetadataDefault{ + _statusCode: code, + } +} + +/* +MetadataDefault handles this case with default header values. 
+ +Server error +*/ +type MetadataDefault struct { + _statusCode int + + Payload *models.ErrorResponse + JobID int64 +} + +// Code gets the status code for the metadata default response +func (o *MetadataDefault) Code() int { + return o._statusCode +} + +func (o *MetadataDefault) GetPayload() *models.ErrorResponse { + return o.Payload +} + +func (o *MetadataDefault) readResponse(response runtime.ClientResponse, consumer runtime.Consumer, formats strfmt.Registry) error { + + o.Payload = new(models.ErrorResponse) + + // response payload + if err := consumer.Consume(response.Body(), o.Payload); err != nil && err != io.EOF { + return err + } + + if jobIDHeader := response.GetHeader("x-rclone-jobid"); jobIDHeader != "" { + jobID, err := strconv.ParseInt(jobIDHeader, 10, 64) + if err != nil { + return err + } + + o.JobID = jobID + } + return nil +} + +func (o *MetadataDefault) Error() string { + return fmt.Sprintf("agent [HTTP %d] %s", o._statusCode, strings.TrimRight(o.Payload.Message, ".")) +} diff --git a/v3/swagger/gen/agent/client/operations/operations_client.go b/v3/swagger/gen/agent/client/operations/operations_client.go index d64a8fe64..82b098751 100644 --- a/v3/swagger/gen/agent/client/operations/operations_client.go +++ b/v3/swagger/gen/agent/client/operations/operations_client.go @@ -43,6 +43,8 @@ type ClientService interface { JobStop(params *JobStopParams) (*JobStopOK, error) + Metadata(params *MetadataParams) (*MetadataOK, error) + NodeInfo(params *NodeInfoParams) (*NodeInfoOK, error) OperationsAbout(params *OperationsAboutParams) (*OperationsAboutOK, error) @@ -393,6 +395,41 @@ func (a *Client) JobStop(params *JobStopParams) (*JobStopOK, error) { return nil, runtime.NewAPIError("unexpected success response: content available as default response in error", unexpectedSuccess, unexpectedSuccess.Code()) } +/* +Metadata returns instance metadata + +Collect instance metadata on a node +*/ +func (a *Client) Metadata(params *MetadataParams) (*MetadataOK, error) { 
+ // TODO: Validate the params before sending + if params == nil { + params = NewMetadataParams() + } + + result, err := a.transport.Submit(&runtime.ClientOperation{ + ID: "Metadata", + Method: "GET", + PathPattern: "/cloud/metadata", + ProducesMediaTypes: []string{"application/json"}, + ConsumesMediaTypes: []string{"application/json"}, + Schemes: []string{"http"}, + Params: params, + Reader: &MetadataReader{formats: a.formats}, + Context: params.Context, + Client: params.HTTPClient, + }) + if err != nil { + return nil, err + } + success, ok := result.(*MetadataOK) + if ok { + return success, nil + } + // unexpected success response + unexpectedSuccess := result.(*MetadataDefault) + return nil, runtime.NewAPIError("unexpected success response: content available as default response in error", unexpectedSuccess, unexpectedSuccess.Code()) +} + /* NodeInfo gets information about scylla node diff --git a/v3/swagger/gen/agent/models/instance_metadata.go b/v3/swagger/gen/agent/models/instance_metadata.go new file mode 100644 index 000000000..ca21a3c35 --- /dev/null +++ b/v3/swagger/gen/agent/models/instance_metadata.go @@ -0,0 +1,111 @@ +// Code generated by go-swagger; DO NOT EDIT. + +package models + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + "encoding/json" + + "github.com/go-openapi/errors" + "github.com/go-openapi/strfmt" + "github.com/go-openapi/swag" + "github.com/go-openapi/validate" +) + +// InstanceMetadata instance metadata +// +// # Information about instance on which agent is running +// +// swagger:model InstanceMetadata +type InstanceMetadata struct { + + // Cloud provider + // Enum: [aws gcp azure ] + CloudProvider string `json:"cloud_provider,omitempty"` + + // Cloud machine type, e.g. 
Standard-A3 + InstanceType string `json:"instance_type,omitempty"` +} + +// Validate validates this instance metadata +func (m *InstanceMetadata) Validate(formats strfmt.Registry) error { + var res []error + + if err := m.validateCloudProvider(formats); err != nil { + res = append(res, err) + } + + if len(res) > 0 { + return errors.CompositeValidationError(res...) + } + return nil +} + +var instanceMetadataTypeCloudProviderPropEnum []interface{} + +func init() { + var res []string + if err := json.Unmarshal([]byte(`["aws","gcp","azure",""]`), &res); err != nil { + panic(err) + } + for _, v := range res { + instanceMetadataTypeCloudProviderPropEnum = append(instanceMetadataTypeCloudProviderPropEnum, v) + } +} + +const ( + + // InstanceMetadataCloudProviderAws captures enum value "aws" + InstanceMetadataCloudProviderAws string = "aws" + + // InstanceMetadataCloudProviderGcp captures enum value "gcp" + InstanceMetadataCloudProviderGcp string = "gcp" + + // InstanceMetadataCloudProviderAzure captures enum value "azure" + InstanceMetadataCloudProviderAzure string = "azure" + + // InstanceMetadataCloudProviderEmpty captures enum value "" + InstanceMetadataCloudProviderEmpty string = "" +) + +// prop value enum +func (m *InstanceMetadata) validateCloudProviderEnum(path, location string, value string) error { + if err := validate.EnumCase(path, location, value, instanceMetadataTypeCloudProviderPropEnum, true); err != nil { + return err + } + return nil +} + +func (m *InstanceMetadata) validateCloudProvider(formats strfmt.Registry) error { + + if swag.IsZero(m.CloudProvider) { // not required + return nil + } + + // value enum + if err := m.validateCloudProviderEnum("cloud_provider", "body", m.CloudProvider); err != nil { + return err + } + + return nil +} + +// MarshalBinary interface implementation +func (m *InstanceMetadata) MarshalBinary() ([]byte, error) { + if m == nil { + return nil, nil + } + return swag.WriteJSON(m) +} + +// UnmarshalBinary interface implementation 
+func (m *InstanceMetadata) UnmarshalBinary(b []byte) error { + var res InstanceMetadata + if err := swag.ReadJSON(b, &res); err != nil { + return err + } + *m = res + return nil +} From 5cfe08c550debdab6dd526eef98a2c27d2ac71fa Mon Sep 17 00:00:00 2001 From: Michal-Leszczynski <74614433+Michal-Leszczynski@users.noreply.github.com> Date: Tue, 7 Jan 2025 14:47:27 +0100 Subject: [PATCH 03/16] refactor(scyllaclient): reduce info to debug on closest DC (#4189) This log does not contain any useful information, but it clogs the log files since checking for closest DC is done during every fresh scyllaclient creation, which is done by the config cache service every minute. --- pkg/scyllaclient/client_ping.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/scyllaclient/client_ping.go b/pkg/scyllaclient/client_ping.go index c547fb17d..d29fafeec 100644 --- a/pkg/scyllaclient/client_ping.go +++ b/pkg/scyllaclient/client_ping.go @@ -119,7 +119,7 @@ func (c *Client) CheckHostsConnectivity(ctx context.Context, hosts []string) []e // the lowest latency over 3 Ping() invocations across random selection of // hosts for each DC. func (c *Client) ClosestDC(ctx context.Context, dcs map[string][]string) ([]string, error) { - c.logger.Info(ctx, "Measuring datacenter latencies", "dcs", extractKeys(dcs)) + c.logger.Debug(ctx, "Measuring datacenter latencies", "dcs", extractKeys(dcs)) if len(dcs) == 0 { return nil, errors.Errorf("no dcs to choose from") From d2c391cdf67659260a350f3a2e9edacce5f187ca Mon Sep 17 00:00:00 2001 From: Michal-Leszczynski <74614433+Michal-Leszczynski@users.noreply.github.com> Date: Tue, 7 Jan 2025 14:48:44 +0100 Subject: [PATCH 04/16] fix(scyllaclient): don't query raft read barrier API on Scylla 2024.2 (#4185) It turns out that Scylla 2024.2 does not expose this API. For now, it's not known which enterprise release will contain it, so we need to fall back to the CQL workaround. 
Fixes #4183 --- pkg/scyllaclient/client_agent.go | 2 +- pkg/scyllaclient/client_agent_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/scyllaclient/client_agent.go b/pkg/scyllaclient/client_agent.go index 67ccef330..c0318c080 100644 --- a/pkg/scyllaclient/client_agent.go +++ b/pkg/scyllaclient/client_agent.go @@ -239,7 +239,7 @@ func (ni *NodeInfo) SupportsSafeDescribeSchemaWithInternals() (SafeDescribeMetho for _, fv := range []featureByVersion{ {Constraint: ">= 6.1, < 2000", Method: SafeDescribeMethodReadBarrierAPI}, - {Constraint: ">= 2024.2, > 1000", Method: SafeDescribeMethodReadBarrierAPI}, + {Constraint: ">= 2024.2, > 1000", Method: SafeDescribeMethodReadBarrierCQL}, {Constraint: ">= 6.0, < 2000", Method: SafeDescribeMethodReadBarrierCQL}, } { supports, err := scyllaversion.CheckConstraint(ni.ScyllaVersion, fv.Constraint) diff --git a/pkg/scyllaclient/client_agent_test.go b/pkg/scyllaclient/client_agent_test.go index bac5fa871..08ac42782 100644 --- a/pkg/scyllaclient/client_agent_test.go +++ b/pkg/scyllaclient/client_agent_test.go @@ -372,9 +372,9 @@ func TestSupportsSafeDescribeSchemaWithInternals(t *testing.T) { expectedError: nil, }, { - name: "when scylla >= 2024.2, then it is expected to support read barrier api", + name: "when scylla >= 2024.2, then it is expected to support read barrier cql", scyllaVersion: "2024.2", - expectedMethod: scyllaclient.SafeDescribeMethodReadBarrierAPI, + expectedMethod: scyllaclient.SafeDescribeMethodReadBarrierCQL, expectedError: nil, }, { From 2def7c19549eff4f54def26e90d052940bc2cf19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Wed, 13 Nov 2024 09:57:46 +0100 Subject: [PATCH 05/16] feat(testing): configure Scylla object storage (minio) For Scylla to access object storage, it needs to be configured in the 'object_storage.yaml' config file. 
--- testing/Makefile | 5 +++-- testing/docker-compose.yaml | 24 +++++++++++++++++++++++ testing/scylla/.gitignore | 3 ++- testing/scylla/config/object_storage.yaml | 8 ++++++++ 4 files changed, 37 insertions(+), 3 deletions(-) create mode 100644 testing/scylla/config/object_storage.yaml diff --git a/testing/Makefile b/testing/Makefile index fadbf8558..3df48c6eb 100644 --- a/testing/Makefile +++ b/testing/Makefile @@ -63,6 +63,7 @@ up: @cd scylla/certs && ./generate.sh @echo "==> Generating Scylla configuration" @cp scylla/config/scylla.yaml scylla/scylla.yaml + @cp scylla/config/object_storage.yaml scylla/object_storage.yaml ifeq ($(SSL_ENABLED),true) # disable non-ssl port @@ -70,7 +71,7 @@ ifeq ($(SSL_ENABLED),true) # merge into scylla.yaml values from config/scylla-ssl.yaml with overwrite option (-x) @$(YQ) merge -i -x scylla/scylla.yaml scylla/config/scylla-ssl.yaml @cp scylla/config/cqlshrc-ssl scylla/cqlshrc -else +else @cp scylla/config/cqlshrc scylla/cqlshrc endif @@ -118,7 +119,7 @@ endif @until [ 1 -le $$($(SM_NODETOOL) status | grep -c "UN") ]; do echo -n "."; sleep 2; done ; echo "" @./nodes_exec "rm /root/.cqlshrc || true" - @./nodes_exec "mkdir -p /root/.cassandra" + @./nodes_exec "mkdir -p /root/.cassandra" @./nodes_cp "scylla/cqlshrc" "/root/.cassandra/cqlshrc" @echo "==> Adding Minio user" diff --git a/testing/docker-compose.yaml b/testing/docker-compose.yaml index d4c9530c0..933b61eef 100644 --- a/testing/docker-compose.yaml +++ b/testing/docker-compose.yaml @@ -9,6 +9,9 @@ services: - type: bind source: ./scylla/scylla.yaml target: /etc/scylla/scylla.yaml + - type: bind + source: ./scylla/object_storage.yaml + target: /etc/scylla/object_storage.yaml - type: bind source: ./scylla/certs/ target: /etc/scylla/certs @@ -26,6 +29,9 @@ services: - type: bind source: ./scylla/scylla.yaml target: /etc/scylla/scylla.yaml + - type: bind + source: ./scylla/object_storage.yaml + target: /etc/scylla/object_storage.yaml - type: bind source: ./scylla/certs/ 
target: /etc/scylla/certs @@ -43,6 +49,9 @@ services: - type: bind source: ./scylla/scylla.yaml target: /etc/scylla/scylla.yaml + - type: bind + source: ./scylla/object_storage.yaml + target: /etc/scylla/object_storage.yaml - type: bind source: ./scylla/certs/ target: /etc/scylla/certs/ @@ -60,6 +69,9 @@ services: - type: bind source: ./scylla/scylla.yaml target: /etc/scylla/scylla.yaml + - type: bind + source: ./scylla/object_storage.yaml + target: /etc/scylla/object_storage.yaml - type: bind source: ./scylla/certs/ target: /etc/scylla/certs @@ -77,6 +89,9 @@ services: - type: bind source: ./scylla/scylla.yaml target: /etc/scylla/scylla.yaml + - type: bind + source: ./scylla/object_storage.yaml + target: /etc/scylla/object_storage.yaml - type: bind source: ./scylla/certs/ target: /etc/scylla/certs @@ -94,6 +109,9 @@ services: - type: bind source: ./scylla/scylla.yaml target: /etc/scylla/scylla.yaml + - type: bind + source: ./scylla/object_storage.yaml + target: /etc/scylla/object_storage.yaml - type: bind source: ./scylla/certs/ target: /etc/scylla/certs @@ -111,6 +129,9 @@ services: - type: bind source: ./scylla/scylla-second-cluster.yaml target: /etc/scylla/scylla.yaml + - type: bind + source: ./scylla/object_storage.yaml + target: /etc/scylla/object_storage.yaml - type: bind source: ./scylla/certs/ target: /etc/scylla/certs @@ -128,6 +149,9 @@ services: - type: bind source: ./scylla/scylla-second-cluster.yaml target: /etc/scylla/scylla.yaml + - type: bind + source: ./scylla/object_storage.yaml + target: /etc/scylla/object_storage.yaml - type: bind source: ./scylla/certs/ target: /etc/scylla/certs diff --git a/testing/scylla/.gitignore b/testing/scylla/.gitignore index 941f753aa..18e8660ed 100644 --- a/testing/scylla/.gitignore +++ b/testing/scylla/.gitignore @@ -1,3 +1,4 @@ /cqlshrc /scylla.yaml -/scylla-second-cluster.yaml \ No newline at end of file +/scylla-second-cluster.yaml +/object_storage.yaml \ No newline at end of file diff --git 
a/testing/scylla/config/object_storage.yaml b/testing/scylla/config/object_storage.yaml new file mode 100644 index 000000000..91968a795 --- /dev/null +++ b/testing/scylla/config/object_storage.yaml @@ -0,0 +1,8 @@ +# Scylla dev docs: https://github.com/scylladb/scylladb/blob/92db2eca0b8ab0a4fa2571666a7fe2d2b07c697b/docs/dev/object_storage.md?plain=1#L29-L39 +endpoints: + - name: 192.168.200.99 + port: 9000 + https: true + aws_region: us-east-1 + aws_access_key_id: miniouser + aws_secret_access_key: minio1234 \ No newline at end of file From 3e04748f89f35638a68c00dcfc51977b1abf0c79 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Wed, 11 Dec 2024 11:22:56 +0100 Subject: [PATCH 06/16] feat(schema): add scylla_task_id to backup_run_progress A separate column for Scylla task ID is needed because: - it has a different type from agent job ID - it makes it clear which API was used --- pkg/schema/table/table.go | 1 + pkg/service/backup/model.go | 6 +++++- schema/v3.5.0.cql | 1 + 3 files changed, 7 insertions(+), 1 deletion(-) create mode 100644 schema/v3.5.0.cql diff --git a/pkg/schema/table/table.go b/pkg/schema/table/table.go index 6163163e3..6bf74ddfa 100644 --- a/pkg/schema/table/table.go +++ b/pkg/schema/table/table.go @@ -40,6 +40,7 @@ var ( "failed", "host", "run_id", + "scylla_task_id", "size", "skipped", "started_at", diff --git a/pkg/service/backup/model.go b/pkg/service/backup/model.go index 08a761941..717f2df2b 100644 --- a/pkg/service/backup/model.go +++ b/pkg/service/backup/model.go @@ -129,7 +129,11 @@ type RunProgress struct { Unit int64 TableName string - AgentJobID int64 + // Uploading SSTables could be done by either Rclone or Scylla API. + // Only one of those IDs should be set. 
+ AgentJobID int64 + ScyllaTaskID string + StartedAt *time.Time CompletedAt *time.Time Error string diff --git a/schema/v3.5.0.cql b/schema/v3.5.0.cql new file mode 100644 index 000000000..b5ab8e2bf --- /dev/null +++ b/schema/v3.5.0.cql @@ -0,0 +1 @@ +ALTER TABLE backup_run_progress ADD scylla_task_id text; \ No newline at end of file From c55fb1d818f6979852862f69fe45e6b8410abe01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Thu, 12 Dec 2024 11:39:36 +0100 Subject: [PATCH 07/16] feat(scyllaclient): add methods for managing Scylla backup API Those methods consist of both: - direct Scylla backup API call - helper Scylla Task Manager API calls --- pkg/scyllaclient/client_scylla.go | 86 +++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/pkg/scyllaclient/client_scylla.go b/pkg/scyllaclient/client_scylla.go index 03210b83f..255123b85 100644 --- a/pkg/scyllaclient/client_scylla.go +++ b/pkg/scyllaclient/client_scylla.go @@ -1159,6 +1159,92 @@ func (c *Client) RaftReadBarrier(ctx context.Context, host, groupID string) erro return nil } +// ScyllaBackup schedules Scylla backup task and returns its ID. +func (c *Client) ScyllaBackup(ctx context.Context, host, endpoint, bucket, prefix, keyspace, table, snapshotTag string) (string, error) { + resp, err := c.scyllaOps.StorageServiceBackupPost(&operations.StorageServiceBackupPostParams{ + Context: forceHost(ctx, host), + Endpoint: endpoint, + Bucket: bucket, + Prefix: prefix, + Keyspace: keyspace, + Table: table, + Snapshot: &snapshotTag, + }) + if err != nil { + return "", err + } + return resp.GetPayload(), nil +} + +// ScyllaTaskState describes Scylla task state. +type ScyllaTaskState string + +// Possible ScyllaTaskState. 
+const ( + ScyllaTaskStateCreated ScyllaTaskState = "created" + ScyllaTaskStateRunning ScyllaTaskState = "running" + ScyllaTaskStateDone ScyllaTaskState = "done" + ScyllaTaskStateFailed ScyllaTaskState = "failed" +) + +func isScyllaTaskRunning(err error) bool { + // Scylla API call might return earlier due to timeout (see swagger definition) + status, _ := StatusCodeAndMessageOf(err) + return status == http.StatusRequestTimeout +} + +func scyllaWaitTaskShouldRetryHandler(err error) *bool { + if isScyllaTaskRunning(err) { + return pointer.BoolPtr(false) + } + return nil +} + +// ScyllaWaitTask long polls Scylla task status. +func (c *Client) ScyllaWaitTask(ctx context.Context, host, id string, longPollingSeconds int64) (*models.TaskStatus, error) { + ctx = withShouldRetryHandler(ctx, scyllaWaitTaskShouldRetryHandler) + ctx = forceHost(ctx, host) + ctx = noTimeout(ctx) + p := &operations.TaskManagerWaitTaskTaskIDGetParams{ + Context: ctx, + TaskID: id, + } + if longPollingSeconds > 0 { + p.SetTimeout(&longPollingSeconds) + } + + resp, err := c.scyllaOps.TaskManagerWaitTaskTaskIDGet(p) + if err != nil { + if isScyllaTaskRunning(err) { + return c.ScyllaTaskProgress(ctx, host, id) + } + return nil, err + } + return resp.GetPayload(), nil +} + +// ScyllaTaskProgress returns provided Scylla task status. +func (c *Client) ScyllaTaskProgress(ctx context.Context, host, id string) (*models.TaskStatus, error) { + resp, err := c.scyllaOps.TaskManagerTaskStatusTaskIDGet(&operations.TaskManagerTaskStatusTaskIDGetParams{ + Context: forceHost(ctx, host), + TaskID: id, + }) + if err != nil { + return nil, err + } + return resp.GetPayload(), nil +} + +// ScyllaAbortTask aborts provided Scylla task. +// Note that not all Scylla tasks can be aborted - see models.TaskStatus to check that. 
+func (c *Client) ScyllaAbortTask(ctx context.Context, host, id string) error { + _, err := c.scyllaOps.TaskManagerAbortTaskTaskIDPost(&operations.TaskManagerAbortTaskTaskIDPostParams{ + Context: forceHost(ctx, host), + TaskID: id, + }) + return err +} + // ToCanonicalIP replaces ":0:0" in IPv6 addresses with "::" // ToCanonicalIP("192.168.0.1") -> "192.168.0.1" // ToCanonicalIP("100:200:0:0:0:0:0:1") -> "100:200::1". From 27af74cf8455020ebd5f587b2d559f48ceb19b0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Fri, 13 Dec 2024 11:24:10 +0100 Subject: [PATCH 08/16] feat(agent): resolve provider into host name in proxy When working with Rclone, SM specifies just the provider name, and Rclone (with agent config) resolves it internally to the correct endpoint. This made it so user didn't need to specify the exact endpoint when running SM backup/restore tasks. When working with Scylla, SM needs to specify resolved host name on its own. This should be the same name as specified in 'object_storage.yaml' (See https://github.com/scylladb/scylladb/blob/92db2eca0b8ab0a4fa2571666a7fe2d2b07c697b/docs/dev/object_storage.md?plain=1#L29-L39). In order to maximize compatibility and UX, we still want it to be possible to specify just the provider name when running backup/restore. In such case, SM sends provider name as the "endpoint" query param, which is resolved by agent to proper host name when forwarding request to Scylla. Different "endpoint" query params are not resolved. Note that resolving "endpoint" query param in the proxy is just for the UX, so it might not work correctly in all the cases. In order to ensure correctness, "endpoint" should be specified directly by SM user so that no resolving is needed. 
--- pkg/cmd/agent/router.go | 161 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 158 insertions(+), 3 deletions(-) diff --git a/pkg/cmd/agent/router.go b/pkg/cmd/agent/router.go index c24e56d4b..bb5a7a7ef 100644 --- a/pkg/cmd/agent/router.go +++ b/pkg/cmd/agent/router.go @@ -7,9 +7,14 @@ import ( "net" "net/http" "net/http/httputil" + "net/url" + "regexp" + "sync" "time" + "github.com/aws/aws-sdk-go/aws/endpoints" "github.com/go-chi/chi/v5" + "github.com/pkg/errors" "github.com/scylladb/go-log" "github.com/scylladb/scylla-manager/v3/pkg/auth" "github.com/scylladb/scylla-manager/v3/pkg/config/agent" @@ -38,7 +43,7 @@ func newRouter(c agent.Config, metrics AgentMetrics, rclone http.Handler, logger // Scylla prometheus proxy priv.Mount("/metrics", promProxy(c)) // Fallback to Scylla API proxy - priv.NotFound(apiProxy(c)) + priv.NotFound(apiProxy(c, logger)) return r } @@ -53,13 +58,24 @@ func promProxy(c agent.Config) http.Handler { } } -func apiProxy(c agent.Config) http.HandlerFunc { +func apiProxy(c agent.Config, logger log.Logger) http.HandlerFunc { h := &httputil.ReverseProxy{ - Director: director(net.JoinHostPort(c.Scylla.APIAddress, c.Scylla.APIPort)), + Director: wrappedDirectors( + director(net.JoinHostPort(c.Scylla.APIAddress, c.Scylla.APIPort)), + objectStorageEndpointDirector(c, logger), + ), } return h.ServeHTTP } +func wrappedDirectors(directors ...func(r *http.Request)) func(r *http.Request) { + return func(r *http.Request) { + for _, d := range directors { + d(r) + } + } +} + func director(addr string) func(r *http.Request) { return func(r *http.Request) { r.Host = addr @@ -67,3 +83,142 @@ func director(addr string) func(r *http.Request) { r.URL.Scheme = "http" } } + +const ( + endpointQueryKey = "endpoint" + + s3Provider = "s3" + gcsProvider = "gcs" + azureProvider = "azure" + + defaultGCSHost = "storage.googleapis.com" + defaultAzureEndpoint = "blob.core.windows.net" +) + +// When working with Rclone, SM specifies just the provider name, 
+// and Rclone (with agent config) resolves it internally to the correct endpoint. +// This made it so user didn't need to specify the exact endpoint when running SM backup/restore tasks. +// +// When working with Scylla, SM needs to specify resolved host name on its own. +// This should be the same name as specified in 'object_storage.yaml' +// (See https://github.com/scylladb/scylladb/blob/92db2eca0b8ab0a4fa2571666a7fe2d2b07c697b/docs/dev/object_storage.md?plain=1#L29-L39). +// +// In order to maximize compatibility and UX, we still want it to be possible +// to specify just the provider name when running backup/restore. +// In such case, SM sends provider name as the "endpoint" query param, +// which is resolved by agent to proper host name when forwarding request to Scylla. +// Different "endpoint" query params are not resolved. +// +// Note that resolving "endpoint" query param in the proxy is just for the UX, +// so it might not work correctly in all the cases. +// In order to ensure correctness, "endpoint" should be specified directly by SM user +// so that no resolving is needed. 
+func objectStorageEndpointDirector(cfg agent.Config, logger log.Logger) func(r *http.Request) { + regex := regexp.MustCompile(`^/storage_service/backup$`) + resolver := endpointResolver(cfg) + + return func(r *http.Request) { + // Check for object storage endpoints + if !regex.MatchString(r.URL.Path) { + return + } + + // Ensure the existence of "endpoint" query param + q := r.URL.Query() + if !q.Has(endpointQueryKey) { + logger.Error(r.Context(), "Expected endpoint query param, but didn't receive it", + "query", r.URL.RawQuery) + return + } + + // Resolve provider to the proper endpoint + provider := q.Get(endpointQueryKey) + resolvedEndpoint, err := resolver(provider) + if err != nil { + logger.Error(r.Context(), "Failed to resolve provider to endpoint", "provider", provider, "err", err) + return + } + + resolvedHost, err := endpointToHostName(resolvedEndpoint) + if err != nil { + logger.Error(r.Context(), "Failed to convert endpoint to host name", + "endpoint", resolvedEndpoint, + "err", err) + return + } + + q.Del(endpointQueryKey) + q.Add(endpointQueryKey, resolvedHost) + r.URL.RawQuery = q.Encode() + } +} + +func endpointResolver(cfg agent.Config) func(provider string) (string, error) { + s3Resolver := s3EndpointResolver() + + return func(provider string) (string, error) { + var resolvedEndpoint string + switch provider { + case s3Provider: + resolvedEndpoint = cfg.S3.Endpoint + if resolvedEndpoint == "" { + var err error + resolvedEndpoint, err = s3Resolver(cfg.S3.Region) + if err != nil { + return "", err + } + } + case gcsProvider: + // It's not possible to specify non-default GCS endpoint + // with Rclone config (at least with Rclone v1.54). 
+ resolvedEndpoint = defaultGCSHost + case azureProvider: + // For Azure, account is a part of the resolved host + if cfg.Azure.Account == "" { + return "", errors.New("account is not set in Azure config") + } + endpoint := cfg.Azure.Endpoint + if endpoint == "" { + endpoint = defaultAzureEndpoint + } + resolvedEndpoint = cfg.Azure.Account + "." + endpoint + default: + // Endpoint has already been resolved on SM side + resolvedEndpoint = provider + } + return resolvedEndpoint, nil + } +} + +func s3EndpointResolver() func(region string) (string, error) { + var ( + // No need to resolve endpoint multiple times + resolvedEndpoint string + mu sync.Mutex + ) + + return func(region string) (string, error) { + mu.Lock() + defer mu.Unlock() + if resolvedEndpoint != "" { + return resolvedEndpoint, nil + } + + resolver := endpoints.DefaultResolver() + re, err := resolver.EndpointFor(s3Provider, region) + if err != nil { + return "", errors.Wrap(err, "resolve S3 endpoint for region "+region) + } + + resolvedEndpoint = re.URL + return re.URL, nil + } +} + +func endpointToHostName(endpoint string) (string, error) { + u, err := url.Parse(endpoint) + if err != nil { + return "", errors.Wrap(err, "parse endpoint "+endpoint) + } + return u.Hostname(), nil +} From dc6acb8d748e38b05c6b609ba3e1eb68e47b187d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Fri, 13 Dec 2024 15:13:38 +0100 Subject: [PATCH 09/16] feat(backup): check when Scylla backup API can be used Scylla backup API can be used when: - node exposes Scylla backup API - s3 is the used provider - backup won't create versioned files --- pkg/scyllaclient/client_agent.go | 23 +++++++++++++++++ pkg/service/backup/service.go | 6 +++++ pkg/service/backup/worker.go | 24 ++++++++++++++++++ pkg/service/backup/worker_deduplicate.go | 26 ++++++++++++------- pkg/service/backup/worker_scylla_upload.go | 29 ++++++++++++++++++++++ 5 files changed, 99 insertions(+), 9 deletions(-) create 
mode 100644 pkg/service/backup/worker_scylla_upload.go diff --git a/pkg/scyllaclient/client_agent.go b/pkg/scyllaclient/client_agent.go index c0318c080..cdbbc44f5 100644 --- a/pkg/scyllaclient/client_agent.go +++ b/pkg/scyllaclient/client_agent.go @@ -254,6 +254,29 @@ func (ni *NodeInfo) SupportsSafeDescribeSchemaWithInternals() (SafeDescribeMetho return "", nil } +// SupportsScyllaBackupRestoreAPI returns whether node exposes backup/restore API +// that can be used instead of the Rclone API for backup/restore tasks. +func (ni *NodeInfo) SupportsScyllaBackupRestoreAPI() (bool, error) { + // Check master builds + if scyllaversion.MasterVersion(ni.ScyllaVersion) { + return true, nil + } + // Check OSS + supports, err := scyllaversion.CheckConstraint(ni.ScyllaVersion, ">= 6.3, < 2000") + if err != nil { + return false, errors.Errorf("Unsupported Scylla version: %s", ni.ScyllaVersion) + } + if supports { + return true, nil + } + // Check ENT + supports, err = scyllaversion.CheckConstraint(ni.ScyllaVersion, ">= 2024.3") + if err != nil { + return false, errors.Errorf("Unsupported Scylla version: %s", ni.ScyllaVersion) + } + return supports, nil +} + // FreeOSMemory calls debug.FreeOSMemory on the agent to return memory to OS. 
func (c *Client) FreeOSMemory(ctx context.Context, host string) error { p := operations.FreeOSMemoryParams{ diff --git a/pkg/service/backup/service.go b/pkg/service/backup/service.go index 048fb0927..f8ca0bf3e 100644 --- a/pkg/service/backup/service.go +++ b/pkg/service/backup/service.go @@ -677,6 +677,11 @@ func (s *Service) Backup(ctx context.Context, clusterID, taskID, runID uuid.UUID return errors.Wrap(err, "invalid cluster") } + nodeConfig, err := s.configCache.ReadAll(clusterID) + if err != nil { + return errors.Wrap(err, "read all nodes config") + } + // Create a worker w := &worker{ workerTools: workerTools{ @@ -687,6 +692,7 @@ func (s *Service) Backup(ctx context.Context, clusterID, taskID, runID uuid.UUID SnapshotTag: run.SnapshotTag, Config: s.config, Client: client, + NodeConfig: nodeConfig, }, PrevStage: run.Stage, Metrics: s.metrics, diff --git a/pkg/service/backup/worker.go b/pkg/service/backup/worker.go index 3555f8041..dc06fd5cc 100644 --- a/pkg/service/backup/worker.go +++ b/pkg/service/backup/worker.go @@ -6,12 +6,14 @@ import ( "bytes" "context" "fmt" + "net" "sync" "github.com/scylladb/go-log" "github.com/scylladb/scylla-manager/v3/pkg/metrics" "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient" . "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" + "github.com/scylladb/scylla-manager/v3/pkg/service/configcache" "github.com/scylladb/scylla-manager/v3/pkg/util/parallel" "github.com/scylladb/scylla-manager/v3/pkg/util/uuid" ) @@ -42,6 +44,9 @@ type snapshotDir struct { SkippedBytesOffset int64 NewFilesSize int64 + // willCreateVersioned is set to true when uploading the snapshot directory after + // the deduplication results in creating versioned SSTables. 
+ willCreateVersioned bool } func (sd snapshotDir) String() string { @@ -58,6 +63,7 @@ type workerTools struct { SnapshotTag string Config Config Client *scyllaclient.Client + NodeConfig map[string]configcache.NodeConfig Logger log.Logger } @@ -123,3 +129,21 @@ func (w *worker) cleanup(ctx context.Context, hi []hostInfo) { _ = hostsInParallel(hi, parallel.NoLimit, f, notify) // nolint: errcheck } + +// nodeInfo is a getter for workerTools.NodeConfig which is a workaround for #4181. +func (w *worker) nodeInfo(ctx context.Context, host string) (*scyllaclient.NodeInfo, error) { + // Try to get direct entry in config cache + if nc, ok := w.NodeConfig[host]; ok { + return nc.NodeInfo, nil + } + // Try to get resolved entry in config cache + if hostIP := net.ParseIP(host); hostIP != nil { + for h, nc := range w.NodeConfig { + if ip := net.ParseIP(h); ip != nil && hostIP.Equal(ip) { + return nc.NodeInfo, nil + } + } + } + // Last resort - query node info from scratch + return w.Client.NodeInfo(ctx, host) +} diff --git a/pkg/service/backup/worker_deduplicate.go b/pkg/service/backup/worker_deduplicate.go index e3b5fdc2a..23d390bc2 100644 --- a/pkg/service/backup/worker_deduplicate.go +++ b/pkg/service/backup/worker_deduplicate.go @@ -82,10 +82,11 @@ func (w *worker) deduplicateHost(ctx context.Context, h hostInfo) error { } deduplicatedUUIDSSTables := w.deduplicateUUIDSStables(remoteSSTableBundles, localSSTableBundles) - deduplicatedIntSSTables, err := w.deduplicateIntSSTables(ctx, h.IP, dataDst, d.Path, remoteSSTableBundles, localSSTableBundles) + deduplicatedIntSSTables, willCreateVersioned, err := w.deduplicateIntSSTables(ctx, h.IP, dataDst, d.Path, remoteSSTableBundles, localSSTableBundles) if err != nil { return errors.Wrap(err, "deduplication based on .crc32 content") } + d.willCreateVersioned = willCreateVersioned deduplicated := make([]string, 0, len(deduplicatedUUIDSSTables)+len(deduplicatedIntSSTables)) var totalSkipped int64 @@ -134,9 +135,10 @@ func (w
*worker) deduplicateUUIDSStables(remoteSSTables, localSSTables *sstableB return deduplicated } +// willCreateVersioned corresponds to snapshotDir.willCreateVersioned. func (w *worker) deduplicateIntSSTables(ctx context.Context, host string, remoteDir, localDir string, remoteSSTables, localSSTables *sstableBundlesByID, -) (deduplicated []fileInfo, err error) { +) (deduplicated []fileInfo, willCreateVersioned bool, err error) { // Reference to SSTables 3.0 Data File Format // https://opensource.docs.scylladb.com/stable/architecture/sstable/sstable3/sstables-3-data-file-format.html @@ -144,38 +146,44 @@ func (w *worker) deduplicateIntSSTables(ctx context.Context, host string, remote // to the remote -Digest.crc32 content. // The same content implies that SSTable can be deduplicated and removed from local directory. for id, localBundle := range localSSTables.intID { + remoteBundle, ok := remoteSSTables.intID[id] + if !ok { + continue + } + // At this point analyzed SSTable ID is present in both local and remote dirs. + // Not being able to deduplicate it results in setting 'willCreateVersioned' to true. 
crc32Idx := slices.IndexFunc(localBundle, func(fi fileInfo) bool { return strings.HasSuffix(fi.Name, "Digest.crc32") }) if crc32Idx == -1 { + willCreateVersioned = true continue } crc32FileName := localBundle[crc32Idx].Name - remoteBundle, ok := remoteSSTables.intID[id] - if !ok { - continue - } if !isSSTableBundleSizeEqual(localBundle, remoteBundle) { + willCreateVersioned = true continue } remoteCRC32Path := path.Join(remoteDir, crc32FileName) remoteCRC32, err := w.Client.RcloneCat(ctx, host, remoteCRC32Path) if err != nil { - return deduplicated, errors.Wrapf(err, "get content of remote CRC32 %s", remoteCRC32Path) + return nil, true, errors.Wrapf(err, "get content of remote CRC32 %s", remoteCRC32Path) } localCRC32Path := path.Join(localDir, crc32FileName) localCRC32, err := w.Client.RcloneCat(ctx, host, localCRC32Path) if err != nil { - return deduplicated, errors.Wrapf(err, "get content of local CRC32 %s", localCRC32Path) + return nil, true, errors.Wrapf(err, "get content of local CRC32 %s", localCRC32Path) } if bytes.Equal(localCRC32, remoteCRC32) { deduplicated = append(deduplicated, localBundle...) + } else { + willCreateVersioned = true } } - return deduplicated, nil + return deduplicated, willCreateVersioned, nil } type sstableBundlesByID struct { diff --git a/pkg/service/backup/worker_scylla_upload.go b/pkg/service/backup/worker_scylla_upload.go new file mode 100644 index 000000000..4a33eb280 --- /dev/null +++ b/pkg/service/backup/worker_scylla_upload.go @@ -0,0 +1,29 @@ +// Copyright (C) 2024 ScyllaDB + +package backup + +import ( + "slices" + + . "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" +) + +// Decides whether we should use Scylla backup API for uploading the files. +func (w *worker) useScyllaBackupAPI(ctx context.Context, d snapshotDir, hi hostInfo) (bool, error) { + // Scylla backup API does not handle creation of versioned files. 
+ if d.willCreateVersioned { + return false, nil + } + // List of object storage providers supported by Scylla backup API. + scyllaSupportedProviders := []Provider{ + S3, + } + if !slices.Contains(scyllaSupportedProviders, hi.Location.Provider) { + return false, nil + } + nc, err := w.nodeInfo(ctx, hi.IP) + if err != nil { + return false, errors.Wrapf(err, "get node %s info", hi.IP) + } + return nc.SupportsScyllaBackupRestoreAPI() +} From 19b00bf77163889db1e08130419e65175486a02e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Tue, 17 Dec 2024 10:05:53 +0100 Subject: [PATCH 10/16] feat(backup): use Scylla backup API This commit adds code for using Scylla backup API. Luckily for us, handling pause/resume and progress is analogous to the Rclone API handling. Fixes #4143 Fixes #4138 Fixes #4141 --- pkg/service/backup/worker_scylla_upload.go | 94 ++++++++++++++++++++++ pkg/service/backup/worker_upload.go | 8 ++ 2 files changed, 102 insertions(+) diff --git a/pkg/service/backup/worker_scylla_upload.go b/pkg/service/backup/worker_scylla_upload.go index 4a33eb280..6d5933054 100644 --- a/pkg/service/backup/worker_scylla_upload.go +++ b/pkg/service/backup/worker_scylla_upload.go @@ -3,9 +3,14 @@ package backup import ( + "context" "slices" + "time" + "github.com/pkg/errors" + "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient" . "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" + "github.com/scylladb/scylla-manager/v3/swagger/gen/scylla/v1/models" ) // Decides whether we should use Scylla backup API for uploading the files. 
@@ -27,3 +32,92 @@ func (w *worker) useScyllaBackupAPI(ctx context.Context, d snapshotDir, hi hostI } return nc.SupportsScyllaBackupRestoreAPI() } + +func (w *worker) scyllaBackup(ctx context.Context, hi hostInfo, d snapshotDir) error { + if d.Progress.ScyllaTaskID == "" || !w.scyllaCanAttachToTask(ctx, hi.IP, d.Progress.ScyllaTaskID) { + prefix := w.remoteSSTableDir(hi, d) + // Agent's proxy can resolve provider into the endpoint. + id, err := w.Client.ScyllaBackup(ctx, hi.IP, string(hi.Location.Provider), hi.Location.Path, prefix, d.Keyspace, d.Table, w.SnapshotTag) + if err != nil { + return errors.Wrap(err, "backup") + } + + w.Logger.Info(ctx, "Backing up dir", "host", d.Host, "keyspace", d.Keyspace, "table", d.Table, "prefix", prefix, "task id", id) + d.Progress.ScyllaTaskID = id + w.onRunProgress(ctx, d.Progress) + } + + if err := w.scyllaWaitTask(ctx, d.Progress.ScyllaTaskID, d); err != nil { + w.Logger.Error(ctx, "Backing up dir failed", "host", d.Host, "task id", d.Progress.TaskID, "error", err) + return err + } + return nil +} + +func (w *worker) scyllaCanAttachToTask(ctx context.Context, host, taskID string) bool { + task, err := w.Client.ScyllaTaskProgress(ctx, host, taskID) + if err != nil { + w.Logger.Error(ctx, "Failed to fetch task info", + "host", host, + "task id", taskID, + "error", err, + ) + return false + } + + state := scyllaclient.ScyllaTaskState(task.State) + return state == scyllaclient.ScyllaTaskStateDone || + state == scyllaclient.ScyllaTaskStateRunning || + state == scyllaclient.ScyllaTaskStateCreated +} + +func (w *worker) scyllaWaitTask(ctx context.Context, id string, d snapshotDir) (err error) { + defer func() { + // On error abort task + if err != nil { + w.Logger.Info(ctx, "Stop task", "host", d.Host, "id", id) + // Watch out for already cancelled context + if e := w.Client.ScyllaAbortTask(context.Background(), d.Host, id); e != nil { + w.Logger.Error(ctx, "Failed to abort task", + "host", d.Host, + "id", id, + "error", e, + ) + } 
+ } + }() + + for { + if ctx.Err() != nil { + return ctx.Err() + } + + task, err := w.Client.ScyllaWaitTask(ctx, d.Host, id, int64(w.Config.LongPollingTimeoutSeconds)) + if err != nil { + return errors.Wrap(err, "wait for scylla task") + } + w.scyllaUpdateProgress(ctx, d, task) + switch scyllaclient.ScyllaTaskState(task.State) { + case scyllaclient.ScyllaTaskStateFailed: + return errors.Errorf("task error (%s): %s", id, task.Error) + case scyllaclient.ScyllaTaskStateDone: + return nil + } + } +} + +func (w *worker) scyllaUpdateProgress(ctx context.Context, d snapshotDir, task *models.TaskStatus) { + p := d.Progress + p.StartedAt = nil + if t := time.Time(task.StartTime); !t.IsZero() { + p.StartedAt = &t + } + p.CompletedAt = nil + if t := time.Time(task.EndTime); !t.IsZero() { + p.CompletedAt = &t + } + p.Error = task.Error + p.Uploaded = int64(task.ProgressCompleted) + p.Skipped = d.SkippedBytesOffset + w.onRunProgress(ctx, p) +} diff --git a/pkg/service/backup/worker_upload.go b/pkg/service/backup/worker_upload.go index 23a76955f..029ee8454 100644 --- a/pkg/service/backup/worker_upload.go +++ b/pkg/service/backup/worker_upload.go @@ -71,6 +71,14 @@ func (w *worker) uploadHost(ctx context.Context, h hostInfo) error { err = errors.Wrap(w.deleteTableSnapshot(ctx, h, d), "delete table snapshot") }() + if ok, err := w.useScyllaBackupAPI(ctx, d, h); err != nil { + return errors.Wrap(err, "check Scylla backup API support") + } else if ok { + w.Logger.Info(ctx, "Use Scylla backup API", "host", h.IP, "keyspace", d.Keyspace, "table", d.Table) + return w.scyllaBackup(ctx, h, d) + } + + w.Logger.Info(ctx, "Use Rclone movedir API", "host", h.IP, "keyspace", d.Keyspace, "table", d.Table) // Check if we should attach to a previous job and wait for it to complete. 
if attached, err := w.attachToJob(ctx, h, d); err != nil { return errors.Wrap(err, "attach to the agent job") From 4d5bee44fdcb51bea5a911cb18c272ed98953389 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Mon, 16 Dec 2024 12:46:59 +0100 Subject: [PATCH 11/16] chore(backup_test): adjust tests to Scylla backup API Some tests used interceptor for given paths in order to wait/block/check some API calls. Those interceptors were updated to also look for Scylla backup API paths. --- .../backup/service_backup_integration_test.go | 80 +++++++++++-------- 1 file changed, 47 insertions(+), 33 deletions(-) diff --git a/pkg/service/backup/service_backup_integration_test.go b/pkg/service/backup/service_backup_integration_test.go index 8cd5d4485..328949074 100644 --- a/pkg/service/backup/service_backup_integration_test.go +++ b/pkg/service/backup/service_backup_integration_test.go @@ -15,7 +15,6 @@ import ( "os" "path" "strings" - "sync" "testing" "time" @@ -142,27 +141,37 @@ func defaultConfig() backup.Config { return c } -func (h *backupTestHelper) setInterceptorBlockEndpointOnFirstHost(method string, path string) { - var ( - brokenHost string - mu sync.Mutex - ) +func (h *backupTestHelper) setInterceptorBlockPathOnFirstHost(paths ...string) { + brokenHost := atomic.NewString("") h.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) { - if req.Method == method && req.URL.Path == path { - mu.Lock() - defer mu.Unlock() - - if brokenHost == "" { - h.T.Log("Setting broken host", req.Host) - brokenHost = req.Host + for _, p := range paths { + if strings.HasPrefix(req.URL.Path, p) { + if brokenHost.CompareAndSwap("", req.Host) { + h.T.Log("Setting broken host", req.Host) + } + if brokenHost.Load() == req.Host { + return nil, errors.New("dial error") + } } + } + return nil, nil + })) +} - if brokenHost == req.Host { - return nil, errors.New("dial error") +func (h *backupTestHelper) 
setInterceptorWaitPath(paths ...string) chan struct{} { + guard := atomic.NewBool(false) + wait := make(chan struct{}) + h.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + for _, p := range paths { + if strings.HasPrefix(req.URL.Path, p) { + if guard.CompareAndSwap(false, true) { + close(wait) + } } } return nil, nil })) + return wait } func (h *backupTestHelper) listS3Files() (manifests, schemas, files []string) { @@ -1002,6 +1011,8 @@ func TestBackupResumeIntegration(t *testing.T) { done = make(chan struct{}) ) + upload := h.setInterceptorWaitPath("/storage_service/backup", "/agent/rclone/sync/movedir") + if err := h.service.InitTarget(ctx, h.ClusterID, &target); err != nil { t.Fatal(err) } @@ -1010,20 +1021,19 @@ func TestBackupResumeIntegration(t *testing.T) { defer close(done) Print("When: backup is running") err := h.service.Backup(ctx, h.ClusterID, h.TaskID, h.RunID, target) - if err == nil { - t.Error("Expected error on run but got nil") - } else { - if !strings.Contains(err.Error(), "context") { - t.Errorf("Expected context error but got: %+v", err) - } + if !errors.Is(err, context.Canceled) { + t.Errorf("Expected %q error but got: %q", context.Canceled, err) } }() - h.waitTransfersStarted() - - Print("And: context is canceled") - cancel() - <-ctx.Done() + select { + case <-time.After(backupTimeout): + t.Fatalf("Backup failed to complete in under %s", backupTimeout) + case <-upload: + Print("And: context is canceled") + cancel() + <-ctx.Done() + } select { case <-time.After(backupTimeout): @@ -1059,6 +1069,8 @@ func TestBackupResumeIntegration(t *testing.T) { ) defer cancel() + upload := h.setInterceptorWaitPath("/storage_service/backup", "/agent/rclone/sync/movedir") + if err := h.service.InitTarget(ctx, h.ClusterID, &target); err != nil { t.Fatal(err) } @@ -1072,16 +1084,18 @@ func TestBackupResumeIntegration(t *testing.T) { close(done) }() - h.waitTransfersStarted() - - Print("And: we restart the agents") - 
restartAgents(h.CommonTestHelper) + select { + case <-time.After(backupTimeout * 3): + t.Fatalf("Backup failed to complete in under %s", backupTimeout*3) + case <-upload: + Print("And: we restart the agents") + restartAgents(h.CommonTestHelper) + } select { case <-time.After(backupTimeout * 3): t.Fatalf("Backup failed to complete in under %s", backupTimeout*3) case <-done: - Print("Then: backup completed execution") } Print("And: nothing is transferring") @@ -1094,7 +1108,7 @@ func TestBackupResumeIntegration(t *testing.T) { t.Run("resume after snapshot failed", func(t *testing.T) { h := newBackupTestHelper(t, session, config, location, nil) Print("Given: snapshot fails on a host") - h.setInterceptorBlockEndpointOnFirstHost(http.MethodPost, "/storage_service/snapshots") + h.setInterceptorBlockPathOnFirstHost("/storage_service/snapshots") ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -1132,7 +1146,7 @@ func TestBackupResumeIntegration(t *testing.T) { t.Run("resume after upload failed", func(t *testing.T) { h := newBackupTestHelper(t, session, config, location, nil) Print("Given: upload fails on a host") - h.setInterceptorBlockEndpointOnFirstHost(http.MethodPost, "/agent/rclone/job/progress") + h.setInterceptorBlockPathOnFirstHost("/agent/rclone/job/progress", "/task_manager/wait_task") ctx, cancel := context.WithCancel(context.Background()) defer cancel() From 007313b0ef6549c126de5810731fb3d1c8244804 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Tue, 17 Dec 2024 12:38:27 +0100 Subject: [PATCH 12/16] chore(restore_test): adjust tests to Scylla backup API Using Scylla backup API does not result in changes to Rclone transfers, rate limiting or cpu pinning, so it shouldn't be checked as a part of the restore test. 
--- pkg/service/restore/restore_integration_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pkg/service/restore/restore_integration_test.go b/pkg/service/restore/restore_integration_test.go index 90e584c4b..2108789dd 100644 --- a/pkg/service/restore/restore_integration_test.go +++ b/pkg/service/restore/restore_integration_test.go @@ -611,9 +611,6 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { "rate_limit": []string{"88"}, }) - Print("Validate state after backup") - validateState(h.srcCluster, "repair", true, 3, 88, pinnedCPU) - runRestore := func(ctx context.Context, finishedRestore chan error) { grantRestoreTablesPermissions(t, h.dstCluster.rootSession, ksFilter, h.dstUser) h.dstCluster.RunID = uuid.NewTime() From ecfeb806228686a926b401f3914b296704cbecdb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Tue, 17 Dec 2024 15:29:59 +0100 Subject: [PATCH 13/16] feat(backup_test): add TestBackupCorrectAPIIntegration This is a simple test for checking whether the correct API is used during the backup. --- .../backup/service_backup_integration_test.go | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/pkg/service/backup/service_backup_integration_test.go b/pkg/service/backup/service_backup_integration_test.go index 328949074..461cb2a9c 100644 --- a/pkg/service/backup/service_backup_integration_test.go +++ b/pkg/service/backup/service_backup_integration_test.go @@ -2644,3 +2644,74 @@ func TestBackupSkipSchemaIntegration(t *testing.T) { } } } + +func TestBackupCorrectAPIIntegration(t *testing.T) { + // This test validates that the correct API is used + // for uploading snapshot dirs (Rclone or Scylla). 
+ const ( + testBucket = "backuptest-api" + testKeyspace = "backuptest_api" + ) + + var ( + location = s3Location(testBucket) + session = CreateScyllaManagerDBSession(t) + h = newBackupTestHelperWithUser(t, session, defaultConfig(), location, nil, "", "") + ctx = context.Background() + clusterSession = CreateSessionAndDropAllKeyspaces(t, h.Client) + ) + + ni, err := h.Client.AnyNodeInfo(context.Background()) + if err != nil { + t.Fatal(err) + } + + // Choose expected API - Rclone or Scylla depending on node version + ensuredPath := "/agent/rclone/sync/movedir" + blockedPath := "/storage_service/backup" + if ok, err := ni.SupportsScyllaBackupRestoreAPI(); err != nil { + t.Fatal(err) + } else if ok { + ensuredPath, blockedPath = blockedPath, ensuredPath + } + + Printf("Expect that %q API will be used for backup, while %q API won't be used at all", ensuredPath, blockedPath) + encounteredEnsured := atomic.NewBool(false) + encounteredBlocked := atomic.NewBool(false) + h.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + if strings.HasPrefix(req.URL.Path, ensuredPath) { + encounteredEnsured.Store(true) + } + if strings.HasPrefix(req.URL.Path, blockedPath) { + encounteredBlocked.Store(true) + } + return nil, nil + })) + + WriteData(t, clusterSession, testKeyspace, 1) + + props := map[string]any{ + "location": []Location{location}, + "keyspace": []string{testKeyspace}, + } + rawProps, err := json.Marshal(props) + if err != nil { + t.Fatal(errors.Wrap(err, "create raw properties")) + } + target, err := h.service.GetTarget(ctx, h.ClusterID, rawProps) + if err != nil { + t.Fatal(errors.Wrap(err, "create target")) + } + + err = h.service.Backup(ctx, h.ClusterID, h.TaskID, h.RunID, target) + if err != nil { + t.Fatal(err) + } + + if !encounteredEnsured.Load() { + t.Fatalf("Expected SM to use %q API", ensuredPath) + } + if encounteredBlocked.Load() { + t.Fatalf("Expected SM not to use %q API", blockedPath) + } +} From 
49c62e9b13e838975591fe45543ba5458855f8bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Wed, 15 Jan 2025 11:42:34 +0100 Subject: [PATCH 14/16] chore(todo): left todos in the code --- pkg/cmd/agent/router.go | 2 ++ pkg/scyllaclient/client_agent.go | 1 + pkg/service/backup/worker.go | 1 + 3 files changed, 4 insertions(+) diff --git a/pkg/cmd/agent/router.go b/pkg/cmd/agent/router.go index bb5a7a7ef..5c0605e39 100644 --- a/pkg/cmd/agent/router.go +++ b/pkg/cmd/agent/router.go @@ -113,6 +113,8 @@ const ( // so it might not work correctly in all the cases. // In order to ensure correctness, "endpoint" should be specified directly by SM user // so that no resolving is needed. +// +// ML_TODO: this is tmp solution (#4211) - verify that it's still needed before merging to master. func objectStorageEndpointDirector(cfg agent.Config, logger log.Logger) func(r *http.Request) { regex := regexp.MustCompile(`^/storage_service/backup$`) resolver := endpointResolver(cfg) diff --git a/pkg/scyllaclient/client_agent.go b/pkg/scyllaclient/client_agent.go index cdbbc44f5..9e74ba9e9 100644 --- a/pkg/scyllaclient/client_agent.go +++ b/pkg/scyllaclient/client_agent.go @@ -257,6 +257,7 @@ func (ni *NodeInfo) SupportsSafeDescribeSchemaWithInternals() (SafeDescribeMetho // SupportsScyllaBackupRestoreAPI returns whether node exposes backup/restore API // that can be used instead of the Rclone API for backup/restore tasks. func (ni *NodeInfo) SupportsScyllaBackupRestoreAPI() (bool, error) { + // ML_TODO: this is tmp solution - verify versions before merging to master. 
// Check master builds if scyllaversion.MasterVersion(ni.ScyllaVersion) { return true, nil diff --git a/pkg/service/backup/worker.go b/pkg/service/backup/worker.go index dc06fd5cc..46add5aff 100644 --- a/pkg/service/backup/worker.go +++ b/pkg/service/backup/worker.go @@ -132,6 +132,7 @@ func (w *worker) cleanup(ctx context.Context, hi []hostInfo) { // nodeInfo is a getter for workerTools.NodeConfig which is a workaround for #4181. func (w *worker) nodeInfo(ctx context.Context, host string) (*scyllaclient.NodeInfo, error) { + // ML_TODO: this is tmp solution (#4181) - verify before merging to master. // Try to get direct entry in config cache if nc, ok := w.NodeConfig[host]; ok { return nc.NodeInfo, nil From 0731050d880f398fe993917d1eaf932652cfeb1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Wed, 15 Jan 2025 12:01:11 +0100 Subject: [PATCH 15/16] feat(backup): improve scylla backup API check logging --- pkg/service/backup/worker_scylla_upload.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/pkg/service/backup/worker_scylla_upload.go b/pkg/service/backup/worker_scylla_upload.go index 6d5933054..99c678eef 100644 --- a/pkg/service/backup/worker_scylla_upload.go +++ b/pkg/service/backup/worker_scylla_upload.go @@ -17,6 +17,7 @@ import ( func (w *worker) useScyllaBackupAPI(ctx context.Context, d snapshotDir, hi hostInfo) (bool, error) { // Scylla backup API does not handle creation of versioned files. if d.willCreateVersioned { + w.Logger.Info(ctx, "Can't use Scylla backup API", "reason", "backup needs to create versioned files") return false, nil } // List of object storage providers supported by Scylla backup API. 
@@ -24,13 +25,22 @@ func (w *worker) useScyllaBackupAPI(ctx context.Context, d snapshotDir, hi hostI S3, } if !slices.Contains(scyllaSupportedProviders, hi.Location.Provider) { + w.Logger.Info(ctx, "Can't use Scylla backup API", "reason", "unsupported cloud provider") return false, nil } nc, err := w.nodeInfo(ctx, hi.IP) if err != nil { return false, errors.Wrapf(err, "get node %s info", hi.IP) } - return nc.SupportsScyllaBackupRestoreAPI() + + ok, err := nc.SupportsScyllaBackupRestoreAPI() + if err != nil { + return false, err + } + if !ok { + w.Logger.Info(ctx, "Can't use Scylla backup API", "reason", "no native Scylla backup API exposed") + } + return ok, nil } func (w *worker) scyllaBackup(ctx context.Context, hi hostInfo, d snapshotDir) error { From d80e48f1064c326f838fa2fb350831a8153e278a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Wed, 15 Jan 2025 12:05:28 +0100 Subject: [PATCH 16/16] fix(restore_test): validate that Scylla backup does not impact Rclone config --- pkg/service/restore/restore_integration_test.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/pkg/service/restore/restore_integration_test.go b/pkg/service/restore/restore_integration_test.go index 2108789dd..4b29c3f8c 100644 --- a/pkg/service/restore/restore_integration_test.go +++ b/pkg/service/restore/restore_integration_test.go @@ -489,6 +489,15 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { h := newTestHelper(t, ManagedClusterHosts(), ManagedSecondClusterHosts()) + ni, err := h.dstCluster.Client.AnyNodeInfo(context.Background()) + if err != nil { + t.Fatal(err) + } + nativeAPISupport, err := ni.SupportsScyllaBackupRestoreAPI() + if err != nil { + t.Fatal(err) + } + Print("Keyspace setup") ksStmt := "CREATE KEYSPACE %q WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': %d}" ks := randomizedName("prep_") @@ -611,6 +620,13 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { 
"rate_limit": []string{"88"}, }) + Print("Validate state after backup") + if nativeAPISupport { + validateState(h.srcCluster, "repair", true, 10, 99, pinnedCPU) + } else { + validateState(h.srcCluster, "repair", true, 3, 88, pinnedCPU) + } + runRestore := func(ctx context.Context, finishedRestore chan error) { grantRestoreTablesPermissions(t, h.dstCluster.rootSession, ksFilter, h.dstUser) h.dstCluster.RunID = uuid.NewTime()