Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Michal-Leszczynski committed Dec 10, 2024
1 parent 63066cf commit a2a02ef
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 54 deletions.
2 changes: 2 additions & 0 deletions pkg/metrics/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ func (m RestoreMetrics) SetProgress(labels RestoreProgressLabels, progress float
m.progress.With(l).Set(progress)
}

// TODO: add new states for Scylla restore.

// RestoreState is the enum that defines how node is used during the restore.
type RestoreState int

Expand Down
100 changes: 60 additions & 40 deletions pkg/service/backup/service_backup_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"os"
"path"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -143,27 +142,37 @@ func defaultConfig() backup.Config {
return c
}

func (h *backupTestHelper) setInterceptorBlockEndpointOnFirstHost(method string, path string) {
var (
brokenHost string
mu sync.Mutex
)
func (h *backupTestHelper) setInterceptorBlockEndpointOnFirstHost(paths ...string) {
brokenHost := atomic.NewString("")
h.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
if req.Method == method && req.URL.Path == path {
mu.Lock()
defer mu.Unlock()

if brokenHost == "" {
h.T.Log("Setting broken host", req.Host)
brokenHost = req.Host
for _, p := range paths {
if strings.HasPrefix(req.URL.Path, p) {
if brokenHost.CompareAndSwap("", req.Host) {
h.T.Log("Setting broken host", req.Host)
}
if brokenHost.Load() == req.Host {
return nil, errors.New("dial error")
}
}
}
return nil, nil
}))
}

if brokenHost == req.Host {
return nil, errors.New("dial error")
// setInterceptorWaitPath installs an HTTP interceptor that watches for requests
// whose URL path starts with any of the given prefixes.
// The returned channel is closed on the first such request; the atomic guard
// ensures it is closed at most once. The interceptor returns (nil, nil) for
// every request, regardless of whether it matched.
func (h *backupTestHelper) setInterceptorWaitPath(paths ...string) chan struct{} {
	var (
		signaled = atomic.NewBool(false)
		matched  = make(chan struct{})
	)
	interceptor := func(req *http.Request) (*http.Response, error) {
		for _, prefix := range paths {
			if !strings.HasPrefix(req.URL.Path, prefix) {
				continue
			}
			// Only the request that wins the swap closes the channel,
			// so a later match can never close it a second time.
			if signaled.CompareAndSwap(false, true) {
				close(matched)
			}
		}
		return nil, nil
	}
	h.Hrt.SetInterceptor(httpx.RoundTripperFunc(interceptor))
	return matched
}

func (h *backupTestHelper) listS3Files() (manifests, schemas, files []string) {
Expand Down Expand Up @@ -242,19 +251,21 @@ func (h *backupTestHelper) waitManifestUploaded() {
}

func (h *backupTestHelper) waitNoTransfers() {
h.waitCond(func() bool {
h.T.Helper()
for _, host := range h.GetAllHosts() {
job, err := h.Client.RcloneJobInfo(context.Background(), host, scyllaclient.GlobalProgressID, longPollingTimeoutSeconds)
if err != nil {
h.T.Fatal(err)
}
if len(job.Stats.Transferring) > 0 {
return false
}
}
return true
})
// TODO: clean this up
time.Sleep(time.Second)
//h.waitCond(func() bool {
// h.T.Helper()
// for _, host := range h.GetAllHosts() {
// job, err := h.Client.RcloneJobInfo(context.Background(), host, scyllaclient.GlobalProgressID, longPollingTimeoutSeconds)
// if err != nil {
// h.T.Fatal(err)
// }
// if len(job.Stats.Transferring) > 0 {
// return false
// }
// }
// return true
//})
}

func (h *backupTestHelper) tamperWithManifest(ctx context.Context, manifestsPath string, f func(ManifestInfoWithContent) bool) {
Expand Down Expand Up @@ -1003,6 +1014,8 @@ func TestBackupResumeIntegration(t *testing.T) {
done = make(chan struct{})
)

upload := h.setInterceptorWaitPath("/storage_service/backup", "/rclone/sync/copydir")

if err := h.service.InitTarget(ctx, h.ClusterID, &target); err != nil {
t.Fatal(err)
}
Expand All @@ -1020,11 +1033,14 @@ func TestBackupResumeIntegration(t *testing.T) {
}
}()

h.waitTransfersStarted()

Print("And: context is canceled")
cancel()
<-ctx.Done()
select {
case <-time.After(backupTimeout):
t.Fatalf("Backup failed to complete in under %s", backupTimeout)
case <-upload:
Print("And: context is canceled")
cancel()
<-ctx.Done()
}

select {
case <-time.After(backupTimeout):
Expand Down Expand Up @@ -1060,6 +1076,8 @@ func TestBackupResumeIntegration(t *testing.T) {
)
defer cancel()

upload := h.setInterceptorWaitPath("/storage_service/backup", "/rclone/sync/copydir")

if err := h.service.InitTarget(ctx, h.ClusterID, &target); err != nil {
t.Fatal(err)
}
Expand All @@ -1073,16 +1091,18 @@ func TestBackupResumeIntegration(t *testing.T) {
close(done)
}()

h.waitTransfersStarted()

Print("And: we restart the agents")
restartAgents(h.CommonTestHelper)
select {
case <-time.After(backupTimeout * 3):
t.Fatalf("Backup failed to complete in under %s", backupTimeout*3)
case <-upload:
Print("And: we restart the agents")
restartAgents(h.CommonTestHelper)
}

select {
case <-time.After(backupTimeout * 3):
t.Fatalf("Backup failed to complete in under %s", backupTimeout*3)
case <-done:
Print("Then: backup completed execution")
}

Print("And: nothing is transferring")
Expand All @@ -1095,7 +1115,7 @@ func TestBackupResumeIntegration(t *testing.T) {
t.Run("resume after snapshot failed", func(t *testing.T) {
h := newBackupTestHelper(t, session, config, location, nil)
Print("Given: snapshot fails on a host")
h.setInterceptorBlockEndpointOnFirstHost(http.MethodPost, "/storage_service/snapshots")
h.setInterceptorBlockEndpointOnFirstHost("/storage_service/snapshots")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -1133,7 +1153,7 @@ func TestBackupResumeIntegration(t *testing.T) {
t.Run("resume after upload failed", func(t *testing.T) {
h := newBackupTestHelper(t, session, config, location, nil)
Print("Given: upload fails on a host")
h.setInterceptorBlockEndpointOnFirstHost(http.MethodPost, "/agent/rclone/job/progress")
h.setInterceptorBlockEndpointOnFirstHost("/rclone/job/progress", "/task_manager/wait_task")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
2 changes: 2 additions & 0 deletions pkg/service/restore/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ func (t *View) UnmarshalUDT(name string, info gocql.TypeInfo, data []byte) error
return gocql.Unmarshal(info, data, f.Addr().Interface())
}

// TODO: should there be a new table and struct for Scylla API progress?

// RunProgress describes progress of restoring a single batch.
type RunProgress struct {
ClusterID uuid.UUID
Expand Down
29 changes: 15 additions & 14 deletions pkg/service/restore/tablesdir_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,9 +286,9 @@ func (w *tablesWorker) scyllaRestore(ctx context.Context, host string, b batch)
// TODO: resolve endpoint by either:
// - making agent return endpoint information to SM <- preferred
// - allowing for specifying endpoint instead of backed in --location flag
prefix, ok := strings.CutPrefix(b.RemoteSSTableDir, b.Location.Path)
prefix, ok := strings.CutPrefix(b.RemoteSSTableDir, b.Location.StringWithoutDC())
if !ok {
return errors.Errorf("")
return errors.Errorf("asdasd")
}
id, err := w.client.ScyllaRestore(ctx, host, "192.168.200.99", b.Location.Path, prefix, b.Keyspace, b.Table, b.TOC())
if err != nil {
Expand All @@ -313,18 +313,19 @@ func (w *tablesWorker) scyllaRestore(ctx context.Context, host string, b batch)
}

func (w *tablesWorker) scyllaWaitTask(ctx context.Context, pr *RunProgress, b batch) (err error) {
defer func() {
// On error abort task
if err != nil {
if e := w.client.ScyllaAbortTask(context.Background(), pr.Host, pr.ScyllaTaskID); e != nil {
w.logger.Error(ctx, "Failed to abort task",
"host", pr.Host,
"id", pr.ScyllaTaskID,
"error", e,
)
}
}
}()
// TODO: restore task cannot be aborted.
//defer func() {

Check failure on line 317 in pkg/service/restore/tablesdir_worker.go

View workflow job for this annotation

GitHub Actions / Various checks

commentFormatting: put a space between `//` and comment text (gocritic)
// // On error abort task
// if err != nil {
// if e := w.client.ScyllaAbortTask(context.Background(), pr.Host, pr.ScyllaTaskID); e != nil {
// w.logger.Error(ctx, "Failed to abort task",
// "host", pr.Host,
// "id", pr.ScyllaTaskID,
// "error", e,
// )
// }
// }
//}()

for {
if ctx.Err() != nil {
Expand Down

0 comments on commit a2a02ef

Please sign in to comment.