Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Scylla API for backup #4169

Merged
merged 16 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
c7c7130
fix(backup_test): add missing 'Integration' suffix to tests (#4176)
Michal-Leszczynski Jan 2, 2025
ef3b968
feat(swagger): adds /cloud/metadata to agent api definition. (#4186)
VAveryanov8 Jan 3, 2025
5cfe08c
refactor(scyllaclient): reduce info to debug on closest DC (#4189)
Michal-Leszczynski Jan 7, 2025
d2c391c
fix(scyllaclient): don't query raft read barrier API on Scylla 2024.2…
Michal-Leszczynski Jan 7, 2025
2def7c1
feat(testing): configure Scylla object storage (minio)
Michal-Leszczynski Nov 13, 2024
3e04748
feat(schema): add scylla_task_id to backup_run_progress
Michal-Leszczynski Dec 11, 2024
c55fb1d
feat(scyllaclient): add methods for managing Scylla backup API
Michal-Leszczynski Dec 12, 2024
27af74c
feat(agent): resolve provider into host name in proxy
Michal-Leszczynski Dec 13, 2024
dc6acb8
feat(backup): check when Scylla backup API can be used
Michal-Leszczynski Dec 13, 2024
19b00bf
feat(backup): use Scylla backup API
Michal-Leszczynski Dec 17, 2024
4d5bee4
chore(backup_test): adjust tests to Scylla backup API
Michal-Leszczynski Dec 16, 2024
007313b
chore(restore_test): adjust tests to Scylla backup API
Michal-Leszczynski Dec 17, 2024
ecfeb80
feat(backup_test): add TestBackupCorrectAPIIntegration
Michal-Leszczynski Dec 17, 2024
49c62e9
chore(todo): left todos in the code
Michal-Leszczynski Jan 15, 2025
0731050
feat(backup): improve scylla backup API check logging
Michal-Leszczynski Jan 15, 2025
d80e48f
fix(restore_test): validate that Scylla backup does not impact Rclone…
Michal-Leszczynski Jan 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 160 additions & 3 deletions pkg/cmd/agent/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@ import (
"net"
"net/http"
"net/http/httputil"
"net/url"
"regexp"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws/endpoints"
"github.com/go-chi/chi/v5"
"github.com/pkg/errors"
"github.com/scylladb/go-log"
"github.com/scylladb/scylla-manager/v3/pkg/auth"
"github.com/scylladb/scylla-manager/v3/pkg/config/agent"
Expand Down Expand Up @@ -38,7 +43,7 @@ func newRouter(c agent.Config, metrics AgentMetrics, rclone http.Handler, logger
// Scylla prometheus proxy
priv.Mount("/metrics", promProxy(c))
// Fallback to Scylla API proxy
priv.NotFound(apiProxy(c))
priv.NotFound(apiProxy(c, logger))

return r
}
Expand All @@ -53,17 +58,169 @@ func promProxy(c agent.Config) http.Handler {
}
}

func apiProxy(c agent.Config) http.HandlerFunc {
func apiProxy(c agent.Config, logger log.Logger) http.HandlerFunc {
h := &httputil.ReverseProxy{
Director: director(net.JoinHostPort(c.Scylla.APIAddress, c.Scylla.APIPort)),
Director: wrappedDirectors(
director(net.JoinHostPort(c.Scylla.APIAddress, c.Scylla.APIPort)),
objectStorageEndpointDirector(c, logger),
),
}
return h.ServeHTTP
}

func wrappedDirectors(directors ...func(r *http.Request)) func(r *http.Request) {
return func(r *http.Request) {
for _, d := range directors {
d(r)
}
}
}

func director(addr string) func(r *http.Request) {
return func(r *http.Request) {
r.Host = addr
r.URL.Host = addr
r.URL.Scheme = "http"
}
}

const (
endpointQueryKey = "endpoint"

s3Provider = "s3"
gcsProvider = "gcs"
azureProvider = "azure"

defaultGCSHost = "storage.googleapis.com"
defaultAzureEndpoint = "blob.core.windows.net"
)

// When working with Rclone, SM specifies just the provider name,
// and Rclone (with agent config) resolves it internally to the correct endpoint.
// This made it so user didn't need to specify the exact endpoint when running SM backup/restore tasks.
//
// When working with Scylla, SM needs to specify resolved host name on its own.
// This should be the same name as specified in 'object_storage.yaml'
// (See https://github.com/scylladb/scylladb/blob/92db2eca0b8ab0a4fa2571666a7fe2d2b07c697b/docs/dev/object_storage.md?plain=1#L29-L39).
//
// In order to maximize compatibility and UX, we still want it to be possible
// to specify just the provider name when running backup/restore.
// In such case, SM sends provider name as the "endpoint" query param,
// which is resolved by agent to proper host name when forwarding request to Scylla.
// Different "endpoint" query params are not resolved.
//
// Note that resolving "endpoint" query param in the proxy is just for the UX,
// so it might not work correctly in all the cases.
// In order to ensure correctness, "endpoint" should be specified directly by SM user
// so that no resolving is needed.
//
// ML_TODO: this is tmp solution (#4211) - verify that it's still needed before merging to master.
func objectStorageEndpointDirector(cfg agent.Config, logger log.Logger) func(r *http.Request) {
regex := regexp.MustCompile(`^/storage_service/backup$`)
VAveryanov8 marked this conversation as resolved.
Show resolved Hide resolved
resolver := endpointResolver(cfg)

return func(r *http.Request) {
// Check for object storage endpoints
if !regex.MatchString(r.URL.Path) {
return
}

// Ensure the existence of "endpoint" query param
q := r.URL.Query()
if !q.Has(endpointQueryKey) {
logger.Error(r.Context(), "Expected endpoint query param, but didn't receive it",
"query", r.URL.RawQuery)
return
}

// Resolve provider to the proper endpoint
provider := q.Get(endpointQueryKey)
resolvedEndpoint, err := resolver(provider)
if err != nil {
logger.Error(r.Context(), "Failed to resolve provider to endpoint", "provider", provider, "err", err)
return
}

resolvedHost, err := endpointToHostName(resolvedEndpoint)
if err != nil {
logger.Error(r.Context(), "Failed to convert endpoint to host name",
"endpoint", resolvedEndpoint,
"err", err)
return
}

Comment on lines +130 to +151
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens when there is an error raised in the Directory that is expected to validate&modify the incoming request ?
I understand that the error is just logged, but proceeds with the call to the scylla server with the current implementation.
This behavior means that the inproper request is proxied still to the scylla server.

It's much more efficient to stop processing the invalid request straight in the proxy instead of forwarding it.
To do that, you can introduce the middleware that is reponsible for validation +

http.Error(w, "Missing required query parameter: endpoint", http.StatusBadRequest)
return

The middleware should evaluate the correct endpoint and save it to conext.
Then, the director just checks the context to see if there is something to change.

Director, can skip the validation
This comment is only valid if you proceed with resolving endpoint in agent. The topic to discuss is that is can be safer/better to have the identifiers of endpoints in object storage and use it in sm configuration.

q.Del(endpointQueryKey)
q.Add(endpointQueryKey, resolvedHost)
r.URL.RawQuery = q.Encode()
}
}

func endpointResolver(cfg agent.Config) func(provider string) (string, error) {
s3Resolver := s3EndpointResolver()

return func(provider string) (string, error) {
var resolvedEndpoint string
switch provider {
case s3Provider:
resolvedEndpoint = cfg.S3.Endpoint
if resolvedEndpoint == "" {
var err error
resolvedEndpoint, err = s3Resolver(cfg.S3.Region)
if err != nil {
return "", err
}
}
case gcsProvider:
// It's not possible to specify non-default GCS endpoint
// with Rclone config (at least with Rclone v1.54).
resolvedEndpoint = defaultGCSHost
case azureProvider:
// For Azure, account is a part of the resolved host
if cfg.Azure.Account == "" {
return "", errors.New("account is not set in Azure config")
}
endpoint := cfg.Azure.Endpoint
if endpoint == "" {
endpoint = defaultAzureEndpoint
}
resolvedEndpoint = cfg.Azure.Account + "." + endpoint
default:
// Endpoint has already been resolved on SM side
resolvedEndpoint = provider
Comment on lines +187 to +189
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it mean that if I call agent with endpoint query param = "http://169.254.169.254/storage_service/backup" then "169.254.169.254" is going to be passed to scylla server as the AWS endpoint to scylla server ?

Then this IP is consumed by scylla server to query S3 API, right ?

I think it's a threat called SSRF.

This default must be either change to return explicit error, or Scylla Manager Agent must be aware of whitelisted IP addresses (passed with "endpoint" query param).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I see it's actually the main route. The input must be validated then. Is it possible to provide whitelisted IPs/Hosts ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, does it mean that the S3 enpoint url is not known by Scylla ?
Why ?
Isn't it stored in some scylla configuration ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@regevran why ScyllaAPI must be informed about the endpoint ? Cannot it be read by scylla directly ? Using this info https://github.com/scylladb/scylladb/blob/92db2eca0b8ab0a4fa2571666a7fe2d2b07c697b/docs/dev/object_storage.md?plain=1#L29-L39 ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we covered those points during the meeting, right?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. These are comments from before the sync. Waiting for the output from the meeting then.

}
return resolvedEndpoint, nil
}
}

func s3EndpointResolver() func(region string) (string, error) {
var (
// No need to resolve endpoint multiple times
resolvedEndpoint string
mu sync.Mutex
)

return func(region string) (string, error) {
mu.Lock()
defer mu.Unlock()
if resolvedEndpoint != "" {
return resolvedEndpoint, nil
}

resolver := endpoints.DefaultResolver()
re, err := resolver.EndpointFor(s3Provider, region)
if err != nil {
return "", errors.Wrap(err, "resolve S3 endpoint for region "+region)
}

VAveryanov8 marked this conversation as resolved.
Show resolved Hide resolved
resolvedEndpoint = re.URL
return re.URL, nil
}
}

func endpointToHostName(endpoint string) (string, error) {
u, err := url.Parse(endpoint)
if err != nil {
return "", errors.Wrap(err, "parse endpoint "+endpoint)
}
return u.Hostname(), nil
}
1 change: 1 addition & 0 deletions pkg/schema/table/table.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 25 additions & 1 deletion pkg/scyllaclient/client_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (ni *NodeInfo) SupportsSafeDescribeSchemaWithInternals() (SafeDescribeMetho

for _, fv := range []featureByVersion{
{Constraint: ">= 6.1, < 2000", Method: SafeDescribeMethodReadBarrierAPI},
{Constraint: ">= 2024.2, > 1000", Method: SafeDescribeMethodReadBarrierAPI},
{Constraint: ">= 2024.2, > 1000", Method: SafeDescribeMethodReadBarrierCQL},
{Constraint: ">= 6.0, < 2000", Method: SafeDescribeMethodReadBarrierCQL},
} {
supports, err := scyllaversion.CheckConstraint(ni.ScyllaVersion, fv.Constraint)
Expand All @@ -254,6 +254,30 @@ func (ni *NodeInfo) SupportsSafeDescribeSchemaWithInternals() (SafeDescribeMetho
return "", nil
}

// SupportsScyllaBackupRestoreAPI returns whether node exposes backup/restore API
// that can be used instead of the Rclone API for backup/restore tasks.
func (ni *NodeInfo) SupportsScyllaBackupRestoreAPI() (bool, error) {
// ML_TODO: this is tmp solution - verify versions before merging to master.
// Check master builds
if scyllaversion.MasterVersion(ni.ScyllaVersion) {
return true, nil
}
// Check OSS
supports, err := scyllaversion.CheckConstraint(ni.ScyllaVersion, ">= 6.3, < 2000")
if err != nil {
return false, errors.Errorf("Unsupported Scylla version: %s", ni.ScyllaVersion)
}
if supports {
return true, nil
}
// Check ENT
supports, err = scyllaversion.CheckConstraint(ni.ScyllaVersion, ">= 2024.3")
if err != nil {
return false, errors.Errorf("Unsupported Scylla version: %s", ni.ScyllaVersion)
}
return supports, nil
}

// FreeOSMemory calls debug.FreeOSMemory on the agent to return memory to OS.
func (c *Client) FreeOSMemory(ctx context.Context, host string) error {
p := operations.FreeOSMemoryParams{
Expand Down
4 changes: 2 additions & 2 deletions pkg/scyllaclient/client_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,9 +372,9 @@ func TestSupportsSafeDescribeSchemaWithInternals(t *testing.T) {
expectedError: nil,
},
{
name: "when scylla >= 2024.2, then it is expected to support read barrier api",
name: "when scylla >= 2024.2, then it is expected to support read barrier cql",
scyllaVersion: "2024.2",
expectedMethod: scyllaclient.SafeDescribeMethodReadBarrierAPI,
expectedMethod: scyllaclient.SafeDescribeMethodReadBarrierCQL,
expectedError: nil,
},
{
Expand Down
2 changes: 1 addition & 1 deletion pkg/scyllaclient/client_ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (c *Client) CheckHostsConnectivity(ctx context.Context, hosts []string) []e
// the lowest latency over 3 Ping() invocations across random selection of
// hosts for each DC.
func (c *Client) ClosestDC(ctx context.Context, dcs map[string][]string) ([]string, error) {
c.logger.Info(ctx, "Measuring datacenter latencies", "dcs", extractKeys(dcs))
c.logger.Debug(ctx, "Measuring datacenter latencies", "dcs", extractKeys(dcs))

if len(dcs) == 0 {
return nil, errors.Errorf("no dcs to choose from")
Expand Down
86 changes: 86 additions & 0 deletions pkg/scyllaclient/client_scylla.go
Original file line number Diff line number Diff line change
Expand Up @@ -1159,6 +1159,92 @@ func (c *Client) RaftReadBarrier(ctx context.Context, host, groupID string) erro
return nil
}

// ScyllaBackup schedules Scylla backup task and returns its ID.
func (c *Client) ScyllaBackup(ctx context.Context, host, endpoint, bucket, prefix, keyspace, table, snapshotTag string) (string, error) {
resp, err := c.scyllaOps.StorageServiceBackupPost(&operations.StorageServiceBackupPostParams{
Context: forceHost(ctx, host),
Endpoint: endpoint,
Bucket: bucket,
Prefix: prefix,
Keyspace: keyspace,
Table: table,
Snapshot: &snapshotTag,
})
if err != nil {
return "", err
}
return resp.GetPayload(), nil
}

// ScyllaTaskState describes Scylla task state.
type ScyllaTaskState string

// Possible ScyllaTaskState.
const (
ScyllaTaskStateCreated ScyllaTaskState = "created"
ScyllaTaskStateRunning ScyllaTaskState = "running"
ScyllaTaskStateDone ScyllaTaskState = "done"
ScyllaTaskStateFailed ScyllaTaskState = "failed"
)

func isScyllaTaskRunning(err error) bool {
// Scylla API call might return earlier due to timeout (see swagger definition)
status, _ := StatusCodeAndMessageOf(err)
return status == http.StatusRequestTimeout
}

func scyllaWaitTaskShouldRetryHandler(err error) *bool {
if isScyllaTaskRunning(err) {
return pointer.BoolPtr(false)
}
return nil
}

// ScyllaWaitTask long polls Scylla task status.
func (c *Client) ScyllaWaitTask(ctx context.Context, host, id string, longPollingSeconds int64) (*models.TaskStatus, error) {
ctx = withShouldRetryHandler(ctx, scyllaWaitTaskShouldRetryHandler)
ctx = forceHost(ctx, host)
ctx = noTimeout(ctx)
p := &operations.TaskManagerWaitTaskTaskIDGetParams{
Context: ctx,
TaskID: id,
}
if longPollingSeconds > 0 {
p.SetTimeout(&longPollingSeconds)
}

resp, err := c.scyllaOps.TaskManagerWaitTaskTaskIDGet(p)
if err != nil {
if isScyllaTaskRunning(err) {
return c.ScyllaTaskProgress(ctx, host, id)
}
return nil, err
}
return resp.GetPayload(), nil
}

// ScyllaTaskProgress returns provided Scylla task status.
func (c *Client) ScyllaTaskProgress(ctx context.Context, host, id string) (*models.TaskStatus, error) {
resp, err := c.scyllaOps.TaskManagerTaskStatusTaskIDGet(&operations.TaskManagerTaskStatusTaskIDGetParams{
Context: forceHost(ctx, host),
TaskID: id,
})
if err != nil {
return nil, err
}
return resp.GetPayload(), nil
}

// ScyllaAbortTask aborts provided Scylla task.
// Note that not all Scylla tasks can be aborted - see models.TaskStatus to check that.
func (c *Client) ScyllaAbortTask(ctx context.Context, host, id string) error {
_, err := c.scyllaOps.TaskManagerAbortTaskTaskIDPost(&operations.TaskManagerAbortTaskTaskIDPostParams{
Context: forceHost(ctx, host),
TaskID: id,
})
return err
}

// ToCanonicalIP replaces ":0:0" in IPv6 addresses with "::"
// ToCanonicalIP("192.168.0.1") -> "192.168.0.1"
// ToCanonicalIP("100:200:0:0:0:0:0:1") -> "100:200::1".
Expand Down
6 changes: 5 additions & 1 deletion pkg/service/backup/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,11 @@ type RunProgress struct {
Unit int64
TableName string

AgentJobID int64
// Uploading SSTables could be done by either Rclone or Scylla API.
// Only one of those IDs should be set.
AgentJobID int64
ScyllaTaskID string

StartedAt *time.Time
CompletedAt *time.Time
Error string
Expand Down
6 changes: 6 additions & 0 deletions pkg/service/backup/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,11 @@ func (s *Service) Backup(ctx context.Context, clusterID, taskID, runID uuid.UUID
return errors.Wrap(err, "invalid cluster")
}

nodeConfig, err := s.configCache.ReadAll(clusterID)
if err != nil {
return errors.Wrap(err, "read all nodes config")
}

Comment on lines +680 to +684
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it's safer and more accurate to force config cache to update cluster configuration first ? And then call to Read All.

// Create a worker
w := &worker{
workerTools: workerTools{
Expand All @@ -687,6 +692,7 @@ func (s *Service) Backup(ctx context.Context, clusterID, taskID, runID uuid.UUID
SnapshotTag: run.SnapshotTag,
Config: s.config,
Client: client,
NodeConfig: nodeConfig,
},
PrevStage: run.Stage,
Metrics: s.metrics,
Expand Down
Loading
Loading