Skip to content

Commit

Permalink
Merge pull request #6226 from Lyndon-Li/data-mover-generic-data-path
Browse files Browse the repository at this point in the history
Add code change for async generic data path
  • Loading branch information
Lyndon-Li authored May 22, 2023
2 parents c34880e + 9ab8589 commit 25fb08b
Show file tree
Hide file tree
Showing 22 changed files with 1,314 additions and 312 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/6226-Lyndon-Li
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add code changes for the async generic data path that is used by both PVB/PVR and the data mover
55 changes: 31 additions & 24 deletions pkg/cmd/cli/nodeagent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes"
clocks "k8s.io/utils/clock"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -52,6 +51,7 @@ import (
"github.com/vmware-tanzu/velero/pkg/cmd/util/signals"
"github.com/vmware-tanzu/velero/pkg/controller"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/logging"
)
Expand All @@ -67,11 +67,22 @@ const (
// defaultCredentialsDirectory is the path on disk where credential
// files will be written to
defaultCredentialsDirectory = "/tmp/credentials"

defaultResourceTimeout = 10 * time.Minute
)

// nodeAgentServerConfig groups the settings that NewServerCommand collects and
// hands to newNodeAgentServer, which stores them on the nodeAgentServer.
type nodeAgentServerConfig struct {
// metricsAddress is initialized to defaultMetricsAddress by NewServerCommand.
metricsAddress string
// resourceTimeout is bound to the --resource-timeout flag and defaults to
// defaultResourceTimeout (10 minutes); run() passes it to repository.NewEnsurer.
resourceTimeout time.Duration
}

func NewServerCommand(f client.Factory) *cobra.Command {
logLevelFlag := logging.LogLevelFlag(logrus.InfoLevel)
formatFlag := logging.NewFormatFlag()
config := nodeAgentServerConfig{
metricsAddress: defaultMetricsAddress,
resourceTimeout: defaultResourceTimeout,
}

command := &cobra.Command{
Use: "server",
Expand All @@ -86,7 +97,7 @@ func NewServerCommand(f client.Factory) *cobra.Command {
logger.Infof("Starting Velero node-agent server %s (%s)", buildinfo.Version, buildinfo.FormattedGitSHA())

f.SetBasename(fmt.Sprintf("%s-%s", c.Parent().Name(), c.Name()))
s, err := newNodeAgentServer(logger, f, defaultMetricsAddress)
s, err := newNodeAgentServer(logger, f, config)
cmd.CheckError(err)

s.run()
Expand All @@ -95,6 +106,7 @@ func NewServerCommand(f client.Factory) *cobra.Command {

command.Flags().Var(logLevelFlag, "log-level", fmt.Sprintf("The level at which to log. Valid values are %s.", strings.Join(logLevelFlag.AllowedValues(), ", ")))
command.Flags().Var(formatFlag, "log-format", fmt.Sprintf("The format for log output. Valid values are %s.", strings.Join(formatFlag.AllowedValues(), ", ")))
command.Flags().DurationVar(&config.resourceTimeout, "resource-timeout", config.resourceTimeout, "How long to wait for resource processes which are not covered by other specific timeout parameters. Default is 10 minutes.")

return command
}
Expand All @@ -109,9 +121,10 @@ type nodeAgentServer struct {
metricsAddress string
namespace string
nodeName string
config nodeAgentServerConfig
}

func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, metricAddress string) (*nodeAgentServer, error) {
func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, config nodeAgentServerConfig) (*nodeAgentServer, error) {
ctx, cancelFunc := context.WithCancel(context.Background())

clientConfig, err := factory.ClientConfig()
Expand Down Expand Up @@ -159,14 +172,14 @@ func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, metri
}

s := &nodeAgentServer{
logger: logger,
ctx: ctx,
cancelFunc: cancelFunc,
fileSystem: filesystem.NewFileSystem(),
mgr: mgr,
metricsAddress: metricAddress,
namespace: factory.Namespace(),
nodeName: nodeName,
logger: logger,
ctx: ctx,
cancelFunc: cancelFunc,
fileSystem: filesystem.NewFileSystem(),
mgr: mgr,
config: config,
namespace: factory.Namespace(),
nodeName: nodeName,
}

// the cache isn't initialized yet when "validatePodVolumesHostPath" is called, the client returned by the manager cannot
Expand Down Expand Up @@ -222,21 +235,15 @@ func (s *nodeAgentServer) run() {
}

credentialGetter := &credentials.CredentialGetter{FromFile: credentialFileStore, FromSecret: credSecretStore}
pvbReconciler := controller.PodVolumeBackupReconciler{
Scheme: s.mgr.GetScheme(),
Client: s.mgr.GetClient(),
Clock: clocks.RealClock{},
Metrics: s.metrics,
CredentialGetter: credentialGetter,
NodeName: s.nodeName,
FileSystem: filesystem.NewFileSystem(),
Log: s.logger,
}
repoEnsurer := repository.NewEnsurer(s.mgr.GetClient(), s.logger, s.config.resourceTimeout)
pvbReconciler := controller.NewPodVolumeBackupReconciler(s.mgr.GetClient(), repoEnsurer,
credentialGetter, s.nodeName, s.mgr.GetScheme(), s.metrics, s.logger)

if err := pvbReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.PodVolumeBackup)
}

if err = controller.NewPodVolumeRestoreReconciler(s.logger, s.mgr.GetClient(), credentialGetter).SetupWithManager(s.mgr); err != nil {
if err = controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), repoEnsurer, credentialGetter, s.logger).SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller")
}

Expand Down Expand Up @@ -327,7 +334,7 @@ func (s *nodeAgentServer) markInProgressPVBsFailed(client ctrlclient.Client) {

if err := controller.UpdatePVBStatusToFailed(s.ctx, client, &pvbs.Items[i],
fmt.Sprintf("get a podvolumebackup with status %q during the server starting, mark it as %q", velerov1api.PodVolumeBackupPhaseInProgress, velerov1api.PodVolumeBackupPhaseFailed),
time.Now()); err != nil {
time.Now(), s.logger); err != nil {
s.logger.WithError(errors.WithStack(err)).Errorf("failed to patch podvolumebackup %q", pvb.GetName())
continue
}
Expand Down Expand Up @@ -363,7 +370,7 @@ func (s *nodeAgentServer) markInProgressPVRsFailed(client ctrlclient.Client) {

if err := controller.UpdatePVRStatusToFailed(s.ctx, client, &pvrs.Items[i],
fmt.Sprintf("get a podvolumerestore with status %q during the server starting, mark it as %q", velerov1api.PodVolumeRestorePhaseInProgress, velerov1api.PodVolumeRestorePhaseFailed),
time.Now()); err != nil {
time.Now(), s.logger); err != nil {
s.logger.WithError(errors.WithStack(err)).Errorf("failed to patch podvolumerestore %q", pvr.GetName())
continue
}
Expand Down
Loading

0 comments on commit 25fb08b

Please sign in to comment.