Skip to content

Commit

Permalink
data mover ms watcher
Browse files Browse the repository at this point in the history
Signed-off-by: Lyndon-Li <[email protected]>
  • Loading branch information
Lyndon-Li committed Jul 11, 2024
1 parent 6a3e226 commit 03cf952
Show file tree
Hide file tree
Showing 16 changed files with 1,771 additions and 11 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/7999-Lyndon-Li
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Data mover ms watcher according to design #7574
3 changes: 3 additions & 0 deletions pkg/cmd/cli/nodeagent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, confi
&velerov2alpha1api.DataDownload{}: {
Field: fields.Set{"metadata.namespace": factory.Namespace()}.AsSelector(),
},
&v1.Event{}: {
Field: fields.Set{"metadata.namespace": factory.Namespace()}.AsSelector(),
},

Check warning on line 194 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L192-L194

Added lines #L192 - L194 were not covered by tests
},
}
mgr, err := ctrl.NewManager(clientConfig, ctrl.Options{
Expand Down
20 changes: 20 additions & 0 deletions pkg/datapath/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@ import (

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

var ConcurrentLimitExceed error = errors.New("Concurrent number exceeds")
var FSBRCreator = newFileSystemBR
var MicroServiceBRWatcherCreator = newMicroServiceBRWatcher

type Manager struct {
cocurrentNum int
Expand Down Expand Up @@ -56,6 +59,23 @@ func (m *Manager) CreateFileSystemBR(jobName string, requestorType string, ctx c
return m.tracker[jobName], nil
}

// CreateMicroServiceBRWatcher creates a new micro service watcher instance
func (m *Manager) CreateMicroServiceBRWatcher(ctx context.Context, client client.Client, kubeClient kubernetes.Interface, mgr manager.Manager, taskType string,
taskName string, namespace string, podName string, containerName string, associatedObject string, callbacks Callbacks, resume bool, log logrus.FieldLogger) (AsyncBR, error) {
m.trackerLock.Lock()
defer m.trackerLock.Unlock()

if !resume {
if len(m.tracker) >= m.cocurrentNum {
return nil, ConcurrentLimitExceed
}
}

m.tracker[taskName] = MicroServiceBRWatcherCreator(client, kubeClient, mgr, taskType, taskName, namespace, podName, containerName, associatedObject, callbacks, log)

return m.tracker[taskName], nil
}

// RemoveAsyncBR removes a file system backup/restore data path instance
func (m *Manager) RemoveAsyncBR(jobName string) {
m.trackerLock.Lock()
Expand Down
36 changes: 35 additions & 1 deletion pkg/datapath/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/stretchr/testify/assert"
)

func TestManager(t *testing.T) {
func TestCreateFileSystemBR(t *testing.T) {
m := NewManager(2)

async_job_1, err := m.CreateFileSystemBR("job-1", "test", context.TODO(), nil, "velero", Callbacks{}, nil)
Expand All @@ -50,3 +50,37 @@ func TestManager(t *testing.T) {
ret = m.GetAsyncBR("job-1")
assert.Nil(t, ret)
}

func TestCreateMicroServiceBRWatcher(t *testing.T) {
m := NewManager(2)

async_job_1, err := m.CreateMicroServiceBRWatcher(context.TODO(), nil, nil, nil, "test", "job-1", "velero", "pod-1", "container", "du-1", Callbacks{}, false, nil)
assert.NoError(t, err)

_, err = m.CreateMicroServiceBRWatcher(context.TODO(), nil, nil, nil, "test", "job-2", "velero", "pod-2", "container", "du-2", Callbacks{}, false, nil)
assert.NoError(t, err)

_, err = m.CreateMicroServiceBRWatcher(context.TODO(), nil, nil, nil, "test", "job-3", "velero", "pod-3", "container", "du-3", Callbacks{}, false, nil)
assert.Equal(t, ConcurrentLimitExceed, err)

async_job_4, err := m.CreateMicroServiceBRWatcher(context.TODO(), nil, nil, nil, "test", "job-4", "velero", "pod-4", "container", "du-4", Callbacks{}, true, nil)
assert.NoError(t, err)

ret := m.GetAsyncBR("job-0")
assert.Nil(t, ret)

ret = m.GetAsyncBR("job-1")
assert.Equal(t, async_job_1, ret)

ret = m.GetAsyncBR("job-4")
assert.Equal(t, async_job_4, ret)

m.RemoveAsyncBR("job-0")
assert.Len(t, m.tracker, 3)

m.RemoveAsyncBR("job-1")
assert.Len(t, m.tracker, 2)

ret = m.GetAsyncBR("job-1")
assert.Nil(t, ret)
}
Loading

0 comments on commit 03cf952

Please sign in to comment.