Skip to content

Commit

Permalink
Merge pull request #6493 from allenxu404/i6130
Browse files Browse the repository at this point in the history
Add data upload/download metrics
  • Loading branch information
qiuming-best authored Jul 14, 2023
2 parents 967152c + 084fd66 commit 82e1ebb
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 12 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/6493-allenxu404
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add data upload and download metrics
8 changes: 4 additions & 4 deletions pkg/cmd/cli/nodeagent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,9 @@ func (s *nodeAgentServer) run() {
s.logger.Fatalf("Failed to start metric server for node agent at [%s]: %v", s.metricsAddress, err)
}
}()
s.metrics = metrics.NewPodVolumeMetrics()
s.metrics = metrics.NewNodeMetrics()
s.metrics.RegisterAllMetrics()
s.metrics.InitPodVolumeMetricsForNode(s.nodeName)
s.metrics.InitMetricsForNode(s.nodeName)

s.markInProgressCRsFailed()

Expand Down Expand Up @@ -260,13 +260,13 @@ func (s *nodeAgentServer) run() {
s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller")
}

dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger)
dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
s.markDataUploadsCancel(dataUploadReconciler)
if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data upload controller")
}

dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger)
dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
s.markDataDownloadsCancel(dataDownloadReconciler)
if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data download controller")
Expand Down
12 changes: 11 additions & 1 deletion pkg/controller/data_download_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
datamover "github.com/vmware-tanzu/velero/pkg/datamover"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/exposer"
"github.com/vmware-tanzu/velero/pkg/metrics"
repository "github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
Expand All @@ -63,10 +64,11 @@ type DataDownloadReconciler struct {
repositoryEnsurer *repository.Ensurer
dataPathMgr *datapath.Manager
preparingTimeout time.Duration
metrics *metrics.ServerMetrics
}

func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Interface,
repoEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger) *DataDownloadReconciler {
repoEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler {
return &DataDownloadReconciler{
client: client,
kubeClient: kubeClient,
Expand All @@ -79,6 +81,7 @@ func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Inter
restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger),
dataPathMgr: datapath.NewManager(1),
preparingTimeout: preparingTimeout,
metrics: metrics,
}
}

Expand Down Expand Up @@ -301,6 +304,7 @@ func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, na
log.WithError(err).Error("error updating data download status")
} else {
log.Infof("Data download is marked as %s", dd.Status.Phase)
r.metrics.RegisterDataDownloadSuccess(r.nodeName)
}
}

Expand Down Expand Up @@ -343,6 +347,8 @@ func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, na
dd.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
if err := r.client.Patch(ctx, &dd, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("error updating data download status")
} else {
r.metrics.RegisterDataDownloadCancel(r.nodeName)
}
}
}
Expand Down Expand Up @@ -497,6 +503,8 @@ func (r *DataDownloadReconciler) updateStatusToFailed(ctx context.Context, dd *v

if patchErr := r.client.Patch(ctx, dd, client.MergeFrom(original)); patchErr != nil {
log.WithError(patchErr).Error("error updating DataDownload status")
} else {
r.metrics.RegisterDataDownloadFailure(r.nodeName)
}

return err
Expand Down Expand Up @@ -548,6 +556,8 @@ func (r *DataDownloadReconciler) onPrepareTimeout(ctx context.Context, dd *veler
r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd))

log.Info("Dataupload has been cleaned up")

r.metrics.RegisterDataDownloadFailure(r.nodeName)
}

func (r *DataDownloadReconciler) exclusiveUpdateDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload,
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/data_download_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/vmware-tanzu/velero/pkg/datapath"
datapathmockes "github.com/vmware-tanzu/velero/pkg/datapath/mocks"
"github.com/vmware-tanzu/velero/pkg/exposer"
"github.com/vmware-tanzu/velero/pkg/metrics"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
"github.com/vmware-tanzu/velero/pkg/uploader"

Expand Down Expand Up @@ -136,7 +137,7 @@ func initDataDownloadReconcilerWithError(objects []runtime.Object, needError ...
if err != nil {
return nil, err
}
return NewDataDownloadReconciler(fakeClient, fakeKubeClient, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", time.Minute*5, velerotest.NewLogger()), nil
return NewDataDownloadReconciler(fakeClient, fakeKubeClient, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
}

func TestDataDownloadReconcile(t *testing.T) {
Expand Down
15 changes: 13 additions & 2 deletions pkg/controller/data_upload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/vmware-tanzu/velero/pkg/datamover"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/exposer"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
Expand All @@ -71,11 +72,12 @@ type DataUploadReconciler struct {
snapshotExposerList map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer
dataPathMgr *datapath.Manager
preparingTimeout time.Duration
metrics *metrics.ServerMetrics
}

func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interface,
csiSnapshotClient snapshotter.SnapshotV1Interface, repoEnsurer *repository.Ensurer, clock clocks.WithTickerAndDelayedExecution,
cred *credentials.CredentialGetter, nodeName string, fs filesystem.Interface, preparingTimeout time.Duration, log logrus.FieldLogger) *DataUploadReconciler {
cred *credentials.CredentialGetter, nodeName string, fs filesystem.Interface, preparingTimeout time.Duration, log logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataUploadReconciler {
return &DataUploadReconciler{
client: client,
kubeClient: kubeClient,
Expand All @@ -89,6 +91,7 @@ func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interfa
snapshotExposerList: map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{velerov2alpha1api.SnapshotTypeCSI: exposer.NewCSISnapshotExposer(kubeClient, csiSnapshotClient, log)},
dataPathMgr: datapath.NewManager(1),
preparingTimeout: preparingTimeout,
metrics: metrics,
}
}

Expand Down Expand Up @@ -308,8 +311,10 @@ func (r *DataUploadReconciler) OnDataUploadCompleted(ctx context.Context, namesp

if err := r.client.Patch(ctx, &du, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("error updating DataUpload status")
} else {
log.Info("Data upload completed")
r.metrics.RegisterDataUploadSuccess(r.nodeName)
}
log.Info("Data upload completed")
}

func (r *DataUploadReconciler) OnDataUploadFailed(ctx context.Context, namespace string, duName string, err error) {
Expand Down Expand Up @@ -360,6 +365,8 @@ func (r *DataUploadReconciler) OnDataUploadCancelled(ctx context.Context, namesp
du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
if err := r.client.Patch(ctx, &du, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("error updating DataUpload status")
} else {
r.metrics.RegisterDataUploadCancel(r.nodeName)
}
}
}
Expand Down Expand Up @@ -518,6 +525,8 @@ func (r *DataUploadReconciler) updateStatusToFailed(ctx context.Context, du *vel
du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
if patchErr := r.client.Patch(ctx, du, client.MergeFrom(original)); patchErr != nil {
log.WithError(patchErr).Error("error updating DataUpload status")
} else {
r.metrics.RegisterDataUploadFailure(r.nodeName)
}

return err
Expand Down Expand Up @@ -580,6 +589,8 @@ func (r *DataUploadReconciler) onPrepareTimeout(ctx context.Context, du *velerov

log.Info("Dataupload has been cleaned up")
}

r.metrics.RegisterDataUploadFailure(r.nodeName)
}

func (r *DataUploadReconciler) exclusiveUpdateDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload,
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/data_upload_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/exposer"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/repository"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
"github.com/vmware-tanzu/velero/pkg/uploader"
Expand Down Expand Up @@ -193,7 +194,7 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci
return nil, err
}
return NewDataUploadReconciler(fakeClient, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), nil,
testclocks.NewFakeClock(now), &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", fakeFS, time.Minute*5, velerotest.NewLogger()), nil
testclocks.NewFakeClock(now), &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", fakeFS, time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
}

func dataUploadBuilder() *builder.DataUploadBuilder {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/pod_volume_backup_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() {
r := PodVolumeBackupReconciler{
Client: fakeClient,
clock: testclocks.NewFakeClock(now),
metrics: metrics.NewPodVolumeMetrics(),
metrics: metrics.NewNodeMetrics(),
credentialGetter: &credentials.CredentialGetter{FromFile: credentialFileStore},
nodeName: "test_node",
fileSystem: fakeFS,
Expand Down
120 changes: 118 additions & 2 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ const (
podVolumeOperationLatencySeconds = "pod_volume_operation_latency_seconds"
podVolumeOperationLatencyGaugeSeconds = "pod_volume_operation_latency_seconds_gauge"

// data mover metrics
DataUploadSuccessTotal = "data_upload_success_total"
DataUploadFailureTotal = "data_upload_failure_total"
DataUploadCancelTotal = "data_upload_cancel_total"
DataDownloadSuccessTotal = "data_download_success_total"
DataDownloadFailureTotal = "data_download_failure_total"
DataDownloadCancelTotal = "data_download_cancel_total"

// Labels
nodeMetricLabel = "node"
podVolumeOperationLabel = "operation"
Expand Down Expand Up @@ -319,7 +327,7 @@ func NewServerMetrics() *ServerMetrics {
}
}

func NewPodVolumeMetrics() *ServerMetrics {
func NewNodeMetrics() *ServerMetrics {
return &ServerMetrics{
metrics: map[string]prometheus.Collector{
podVolumeBackupEnqueueTotal: prometheus.NewCounterVec(
Expand Down Expand Up @@ -365,6 +373,54 @@ func NewPodVolumeMetrics() *ServerMetrics {
},
[]string{nodeMetricLabel, podVolumeOperationLabel, backupNameLabel, pvbNameLabel},
),
DataUploadSuccessTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: podVolumeMetricsNamespace,
Name: DataUploadSuccessTotal,
Help: "Total number of successful uploaded snapshots",
},
[]string{nodeMetricLabel},
),
DataUploadFailureTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: podVolumeMetricsNamespace,
Name: DataUploadFailureTotal,
Help: "Total number of failed uploaded snapshots",
},
[]string{nodeMetricLabel},
),
DataUploadCancelTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: podVolumeMetricsNamespace,
Name: DataUploadCancelTotal,
Help: "Total number of canceled uploaded snapshots",
},
[]string{nodeMetricLabel},
),
DataDownloadSuccessTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: podVolumeMetricsNamespace,
Name: DataDownloadSuccessTotal,
Help: "Total number of successful downloaded snapshots",
},
[]string{nodeMetricLabel},
),
DataDownloadFailureTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: podVolumeMetricsNamespace,
Name: DataDownloadFailureTotal,
Help: "Total number of failed downloaded snapshots",
},
[]string{nodeMetricLabel},
),
DataDownloadCancelTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: podVolumeMetricsNamespace,
Name: DataDownloadCancelTotal,
Help: "Total number of canceled downloaded snapshots",
},
[]string{nodeMetricLabel},
),
},
}
}
Expand Down Expand Up @@ -450,13 +506,31 @@ func (m *ServerMetrics) InitSchedule(scheduleName string) {
}

// InitMetricsForNode initializes counter metrics for a node.
func (m *ServerMetrics) InitPodVolumeMetricsForNode(node string) {
func (m *ServerMetrics) InitMetricsForNode(node string) {
	// Pre-register every per-node counter at zero so the series exist
	// (and are scrapeable) before the first real event is recorded.
	nodeCounters := []string{
		podVolumeBackupEnqueueTotal,
		podVolumeBackupDequeueTotal,
		DataUploadSuccessTotal,
		DataUploadFailureTotal,
		DataUploadCancelTotal,
		DataDownloadSuccessTotal,
		DataDownloadFailureTotal,
		DataDownloadCancelTotal,
	}
	for _, name := range nodeCounters {
		if c, ok := m.metrics[name].(*prometheus.CounterVec); ok {
			c.WithLabelValues(node).Add(0)
		}
	}
}

// RegisterPodVolumeBackupEnqueue records enqueuing of a PodVolumeBackup object.
Expand All @@ -473,6 +547,48 @@ func (m *ServerMetrics) RegisterPodVolumeBackupDequeue(node string) {
}
}

// RegisterDataUploadSuccess records successful uploaded snapshots.
func (m *ServerMetrics) RegisterDataUploadSuccess(node string) {
	c, ok := m.metrics[DataUploadSuccessTotal].(*prometheus.CounterVec)
	if !ok {
		return
	}
	c.WithLabelValues(node).Inc()
}

// RegisterDataUploadFailure records failed uploaded snapshots.
func (m *ServerMetrics) RegisterDataUploadFailure(node string) {
	c, ok := m.metrics[DataUploadFailureTotal].(*prometheus.CounterVec)
	if !ok {
		return
	}
	c.WithLabelValues(node).Inc()
}

// RegisterDataUploadCancel records canceled uploaded snapshots.
func (m *ServerMetrics) RegisterDataUploadCancel(node string) {
	c, ok := m.metrics[DataUploadCancelTotal].(*prometheus.CounterVec)
	if !ok {
		return
	}
	c.WithLabelValues(node).Inc()
}

// RegisterDataDownloadSuccess records successful downloaded snapshots.
func (m *ServerMetrics) RegisterDataDownloadSuccess(node string) {
	c, ok := m.metrics[DataDownloadSuccessTotal].(*prometheus.CounterVec)
	if !ok {
		return
	}
	c.WithLabelValues(node).Inc()
}

// RegisterDataDownloadFailure records failed downloaded snapshots.
func (m *ServerMetrics) RegisterDataDownloadFailure(node string) {
	c, ok := m.metrics[DataDownloadFailureTotal].(*prometheus.CounterVec)
	if !ok {
		return
	}
	c.WithLabelValues(node).Inc()
}

// RegisterDataDownloadCancel records canceled downloaded snapshots.
func (m *ServerMetrics) RegisterDataDownloadCancel(node string) {
	c, ok := m.metrics[DataDownloadCancelTotal].(*prometheus.CounterVec)
	if !ok {
		return
	}
	c.WithLabelValues(node).Inc()
}

// ObservePodVolumeOpLatency records the number of seconds a pod volume operation took.
func (m *ServerMetrics) ObservePodVolumeOpLatency(node, pvbName, opName, backupName string, seconds float64) {
if h, ok := m.metrics[podVolumeOperationLatencySeconds].(*prometheus.HistogramVec); ok {
Expand Down

0 comments on commit 82e1ebb

Please sign in to comment.