From 03cf95248988eaff49de99990870d44bd968d47d Mon Sep 17 00:00:00 2001
From: Lyndon-Li
Date: Thu, 11 Jul 2024 14:15:28 +0800
Subject: [PATCH] data mover ms watcher

Signed-off-by: Lyndon-Li
---
 changelogs/unreleased/7999-Lyndon-Li       |   1 +
 pkg/cmd/cli/nodeagent/server.go            |   3 +
 pkg/datapath/manager.go                    |  20 +
 pkg/datapath/manager_test.go               |  36 +-
 pkg/datapath/micro_service_watcher.go      | 436 +++++++++++++++
 pkg/datapath/micro_service_watcher_test.go | 603 +++++++++++++++++++++
 pkg/datapath/types.go                      |   4 +-
 pkg/exposer/csi_snapshot_test.go           |   2 +-
 pkg/exposer/generic_restore_test.go        |   2 +-
 pkg/test/test_logger.go                    |  12 +
 pkg/util/kube/pod.go                       |  72 ++-
 pkg/util/kube/pod_test.go                  | 275 ++++++++++
 pkg/util/logging/default_logger.go         |  16 +-
 pkg/util/logging/default_logger_test.go    |   2 +-
 pkg/util/logging/log_merge_hook.go         | 113 ++++
 pkg/util/logging/log_merge_hook_test.go    | 185 +++++++
 16 files changed, 1771 insertions(+), 11 deletions(-)
 create mode 100644 changelogs/unreleased/7999-Lyndon-Li
 create mode 100644 pkg/datapath/micro_service_watcher.go
 create mode 100644 pkg/datapath/micro_service_watcher_test.go
 create mode 100644 pkg/util/logging/log_merge_hook.go
 create mode 100644 pkg/util/logging/log_merge_hook_test.go

diff --git a/changelogs/unreleased/7999-Lyndon-Li b/changelogs/unreleased/7999-Lyndon-Li
new file mode 100644
index 0000000000..a5ddf85ba8
--- /dev/null
+++ b/changelogs/unreleased/7999-Lyndon-Li
@@ -0,0 +1 @@
+Data mover ms watcher according to design #7574
\ No newline at end of file
diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go
index 2748569f3d..3a18ab8629 100644
--- a/pkg/cmd/cli/nodeagent/server.go
+++ b/pkg/cmd/cli/nodeagent/server.go
@@ -189,6 +189,9 @@ func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, confi
 			&velerov2alpha1api.DataDownload{}: {
 				Field: fields.Set{"metadata.namespace": factory.Namespace()}.AsSelector(),
 			},
+			&v1.Event{}: {
+				Field: fields.Set{"metadata.namespace": factory.Namespace()}.AsSelector(),
+			},
 		},
 	}
 	mgr, err := ctrl.NewManager(clientConfig, ctrl.Options{
diff --git a/pkg/datapath/manager.go b/pkg/datapath/manager.go
index df60f165b3..0b790a5cc9 100644
--- a/pkg/datapath/manager.go
+++ b/pkg/datapath/manager.go
@@ -22,11 +22,14 @@ import (
 
 	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
+	"k8s.io/client-go/kubernetes"
 	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/manager"
 )
 
 var ConcurrentLimitExceed error = errors.New("Concurrent number exceeds")
 var FSBRCreator = newFileSystemBR
+var MicroServiceBRWatcherCreator = newMicroServiceBRWatcher
 
 type Manager struct {
 	cocurrentNum int
@@ -56,6 +59,23 @@ func (m *Manager) CreateFileSystemBR(jobName string, requestorType string, ctx c
 	return m.tracker[jobName], nil
 }
 
+// CreateMicroServiceBRWatcher creates a new micro service watcher instance
+func (m *Manager) CreateMicroServiceBRWatcher(ctx context.Context, client client.Client, kubeClient kubernetes.Interface, mgr manager.Manager, taskType string,
+	taskName string, namespace string, podName string, containerName string, associatedObject string, callbacks Callbacks, resume bool, log logrus.FieldLogger) (AsyncBR, error) {
+	m.trackerLock.Lock()
+	defer m.trackerLock.Unlock()
+
+	if !resume {
+		if len(m.tracker) >= m.cocurrentNum {
+			return nil, ConcurrentLimitExceed
+		}
+	}
+
+	m.tracker[taskName] = MicroServiceBRWatcherCreator(client, kubeClient, mgr, taskType, taskName, namespace, podName, containerName, associatedObject, callbacks, log)
+
+	return m.tracker[taskName],
nil +} + // RemoveAsyncBR removes a file system backup/restore data path instance func (m *Manager) RemoveAsyncBR(jobName string) { m.trackerLock.Lock() diff --git a/pkg/datapath/manager_test.go b/pkg/datapath/manager_test.go index fda574400e..0db6051341 100644 --- a/pkg/datapath/manager_test.go +++ b/pkg/datapath/manager_test.go @@ -23,7 +23,7 @@ import ( "github.com/stretchr/testify/assert" ) -func TestManager(t *testing.T) { +func TestCreateFileSystemBR(t *testing.T) { m := NewManager(2) async_job_1, err := m.CreateFileSystemBR("job-1", "test", context.TODO(), nil, "velero", Callbacks{}, nil) @@ -50,3 +50,37 @@ func TestManager(t *testing.T) { ret = m.GetAsyncBR("job-1") assert.Nil(t, ret) } + +func TestCreateMicroServiceBRWatcher(t *testing.T) { + m := NewManager(2) + + async_job_1, err := m.CreateMicroServiceBRWatcher(context.TODO(), nil, nil, nil, "test", "job-1", "velero", "pod-1", "container", "du-1", Callbacks{}, false, nil) + assert.NoError(t, err) + + _, err = m.CreateMicroServiceBRWatcher(context.TODO(), nil, nil, nil, "test", "job-2", "velero", "pod-2", "container", "du-2", Callbacks{}, false, nil) + assert.NoError(t, err) + + _, err = m.CreateMicroServiceBRWatcher(context.TODO(), nil, nil, nil, "test", "job-3", "velero", "pod-3", "container", "du-3", Callbacks{}, false, nil) + assert.Equal(t, ConcurrentLimitExceed, err) + + async_job_4, err := m.CreateMicroServiceBRWatcher(context.TODO(), nil, nil, nil, "test", "job-4", "velero", "pod-4", "container", "du-4", Callbacks{}, true, nil) + assert.NoError(t, err) + + ret := m.GetAsyncBR("job-0") + assert.Nil(t, ret) + + ret = m.GetAsyncBR("job-1") + assert.Equal(t, async_job_1, ret) + + ret = m.GetAsyncBR("job-4") + assert.Equal(t, async_job_4, ret) + + m.RemoveAsyncBR("job-0") + assert.Len(t, m.tracker, 3) + + m.RemoveAsyncBR("job-1") + assert.Len(t, m.tracker, 2) + + ret = m.GetAsyncBR("job-1") + assert.Nil(t, ret) +} diff --git a/pkg/datapath/micro_service_watcher.go b/pkg/datapath/micro_service_watcher.go new file mode 100644 index 0000000000..80cc764bec --- /dev/null +++ b/pkg/datapath/micro_service_watcher.go @@ -0,0 +1,436 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package datapath + +import ( + "context" + "encoding/json" + "os" + "sync" + "time" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/vmware-tanzu/velero/pkg/uploader" + "github.com/vmware-tanzu/velero/pkg/util/kube" + + ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/manager" + + "github.com/vmware-tanzu/velero/pkg/util/logging" +) + +const ( + TaskTypeBackup = "backup" + TaskTypeRestore = "restore" + + ErrCancelled = "data path is canceled" + + EventReasonStarted = "Data-Path-Started" + EventReasonCompleted = "Data-Path-Completed" + EventReasonFailed = "Data-Path-Failed" + EventReasonCancelled = "Data-Path-Canceled" + EventReasonProgress = "Data-Path-Progress" +) + +type microServiceBRWatcher struct { + ctx context.Context + cancel context.CancelFunc + log logrus.FieldLogger + client client.Client + kubeClient kubernetes.Interface + mgr manager.Manager + namespace string + callbacks Callbacks + taskName string + taskType string + thisPod string + thisContainer string + associatedObject string + eventCh chan *v1.Event + podCh chan *v1.Pod + startedFromEvent bool + terminatedFromEvent bool + wgWatcher sync.WaitGroup + eventInformer ctrlcache.Informer + podInformer ctrlcache.Informer + eventHandler cache.ResourceEventHandlerRegistration + podHandler cache.ResourceEventHandlerRegistration +} + +func newMicroServiceBRWatcher(client client.Client, kubeClient kubernetes.Interface, mgr manager.Manager, taskType string, taskName string, namespace string, + podName string, containerName string, associatedObject string, callbacks Callbacks, log logrus.FieldLogger) AsyncBR { + ms := µServiceBRWatcher{ + mgr: mgr, + client: client, + kubeClient: kubeClient, + namespace: namespace, + callbacks: callbacks, + taskType: taskType, + taskName: taskName, + thisPod: podName, + thisContainer: containerName, + associatedObject: associatedObject, + eventCh: make(chan *v1.Event, 10), + podCh: make(chan *v1.Pod, 2), + wgWatcher: sync.WaitGroup{}, + log: log, + } + + return ms +} + +func (ms *microServiceBRWatcher) Init(ctx context.Context, param interface{}) error { + ms.ctx, ms.cancel = context.WithCancel(ctx) + succeeded := false + + eventInformer, err := ms.mgr.GetCache().GetInformer(ms.ctx, &v1.Event{}) + if err != nil { + return errors.Wrap(err, "error getting event informer") + } + + podInformer, err := ms.mgr.GetCache().GetInformer(ms.ctx, &v1.Pod{}) + if err != nil { + return errors.Wrap(err, "error getting pod informer") + } + + eventHandler, err := eventInformer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + evt := obj.(*v1.Event) + if evt.InvolvedObject.Namespace != ms.namespace || evt.InvolvedObject.Name != ms.associatedObject { + return + } + + ms.log.Infof("Pushed adding event %s/%s, message %s for object %v", evt.Namespace, evt.Name, evt.Message, evt.InvolvedObject) + + ms.eventCh <- evt + }, + UpdateFunc: func(_, obj interface{}) { + evt := obj.(*v1.Event) + if evt.InvolvedObject.Namespace != ms.namespace || evt.InvolvedObject.Name != ms.associatedObject { + return + } + + ms.log.Infof("Pushed updating event %s/%s, message %s for object %v", evt.Namespace, evt.Name, evt.Message, evt.InvolvedObject) + + ms.eventCh <- evt + }, + }, + ) + + if err != nil { + return errors.Wrap(err, "error registering event handler") + 
}
+
+	defer func() {
+		if !succeeded {
+			if err := eventInformer.RemoveEventHandler(eventHandler); err != nil {
+				ms.log.WithError(err).Warn("Failed to remove event handler")
+			}
+		}
+	}()
+
+	podHandler, err := podInformer.AddEventHandler(
+		cache.ResourceEventHandlerFuncs{
+			UpdateFunc: func(_, obj interface{}) {
+				pod := obj.(*v1.Pod)
+				if pod.Namespace != ms.namespace || pod.Name != ms.thisPod {
+					return
+				}
+
+				if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
+					ms.podCh <- pod
+				}
+			},
+		},
+	)
+
+	if err != nil {
+		return errors.Wrap(err, "error registering pod handler")
+	}
+
+	defer func() {
+		if !succeeded {
+			if err := podInformer.RemoveEventHandler(podHandler); err != nil {
+				ms.log.WithError(err).Warn("Failed to remove pod handler")
+			}
+		}
+	}()
+
+	ms.log.WithFields(
+		logrus.Fields{
+			"taskType": ms.taskType,
+			"taskName": ms.taskName,
+			"thisPod":  ms.thisPod,
+		}).Info("MicroServiceBR is initialized")
+
+	ms.eventInformer = eventInformer
+	ms.podInformer = podInformer
+	ms.eventHandler = eventHandler
+	ms.podHandler = podHandler
+
+	succeeded = true
+
+	return nil
+}
+
+func (ms *microServiceBRWatcher) Close(ctx context.Context) {
+	if ms.cancel != nil {
+		ms.cancel()
+		ms.cancel = nil
+	}
+
+	ms.log.WithField("taskType", ms.taskType).WithField("taskName", ms.taskName).Info("Closing MicroServiceBR")
+
+	ms.wgWatcher.Wait()
+
+	if ms.eventInformer != nil && ms.eventHandler != nil {
+		if err := ms.eventInformer.RemoveEventHandler(ms.eventHandler); err != nil {
+			ms.log.WithError(err).Warn("Failed to remove event handler")
+		}
+	}
+
+	if ms.podInformer != nil && ms.podHandler != nil {
+		if err := ms.podInformer.RemoveEventHandler(ms.podHandler); err != nil {
+			ms.log.WithError(err).Warn("Failed to remove pod handler")
+		}
+	}
+
+	ms.log.WithField("taskType", ms.taskType).WithField("taskName", ms.taskName).Info("MicroServiceBR is closed")
+}
+
+func (ms *microServiceBRWatcher) StartBackup(source AccessPoint, uploaderConfig map[string]string, param interface{}) error {
+	ms.log.Infof("Start watching backup ms for source %v", source)
+
+	if err := ms.reEnsureThisPod(); err != nil {
+		return err
+	}
+
+	ms.startWatch()
+
+	return nil
+}
+
+func (ms *microServiceBRWatcher) StartRestore(snapshotID string, target AccessPoint, uploaderConfigs map[string]string) error {
+	ms.log.Infof("Start watching restore ms to target %v, from snapshot %s", target, snapshotID)
+
+	if err := ms.reEnsureThisPod(); err != nil {
+		return err
+	}
+
+	ms.startWatch()
+
+	return nil
+}
+
+func (ms *microServiceBRWatcher) reEnsureThisPod() error {
+	thisPod := &v1.Pod{}
+	if err := ms.client.Get(ms.ctx, types.NamespacedName{
+		Namespace: ms.namespace,
+		Name:      ms.thisPod,
+	}, thisPod); err != nil {
+		return errors.Wrapf(err, "error getting this pod %s", ms.thisPod)
+	}
+
+	if thisPod.Status.Phase == v1.PodSucceeded || thisPod.Status.Phase == v1.PodFailed {
+		ms.podCh <- thisPod
+		ms.log.WithField("this pod", ms.thisPod).Infof("This pod comes to terminal status %s before watch start", thisPod.Status.Phase)
+	}
+
+	return nil
+}
+
+var funcGetPodTerminationMessage = kube.GetPodContainerTerminateMessage
+var funcRedirectLog = redirectDataMoverLogs
+var funcGetResultFromMessage = getResultFromMessage
+var funcGetProgressFromMessage = getProgressFromMessage
+
+var eventWaitTimeout time.Duration = time.Minute
+
+func (ms *microServiceBRWatcher) startWatch() {
+	ms.wgWatcher.Add(1)
+
+	go func() {
+		ms.log.Info("Start watching data path pod")
+
+		var lastPod *v1.Pod
+
+	watchLoop:
+
for { + select { + case <-ms.ctx.Done(): + break watchLoop + case pod := <-ms.podCh: + lastPod = pod + break watchLoop + case evt := <-ms.eventCh: + ms.onEvent(evt) + } + } + + if lastPod == nil { + ms.log.Warn("Data path pod watch loop is canceled") + ms.wgWatcher.Done() + return + } + + epilogLoop: + for !ms.startedFromEvent || !ms.terminatedFromEvent { + select { + case <-time.After(eventWaitTimeout): + break epilogLoop + case evt := <-ms.eventCh: + ms.onEvent(evt) + } + } + + terminateMessage := funcGetPodTerminationMessage(lastPod, ms.thisContainer) + + logger := ms.log.WithField("data path pod", lastPod.Name) + + logger.Infof("Finish waiting data path pod, phase %s, message %s", lastPod.Status.Phase, terminateMessage) + + if !ms.startedFromEvent { + logger.Warn("VGDP seems not started") + } + + if ms.startedFromEvent && !ms.terminatedFromEvent { + logger.Warn("VGDP started but termination event is not received") + } + + logger.Info("Recording data path pod logs") + + if err := funcRedirectLog(ms.ctx, ms.kubeClient, ms.namespace, lastPod.Name, ms.thisContainer, ms.log); err != nil { + logger.WithError(err).Warn("Failed to collect data mover logs") + } + + logger.Info("Calling callback on data path pod termination") + + if lastPod.Status.Phase == v1.PodSucceeded { + ms.callbacks.OnCompleted(ms.ctx, ms.namespace, ms.taskName, funcGetResultFromMessage(ms.taskType, terminateMessage, ms.log)) + } else { + if terminateMessage == ErrCancelled { + ms.callbacks.OnCancelled(ms.ctx, ms.namespace, ms.taskName) + } else { + ms.callbacks.OnFailed(ms.ctx, ms.namespace, ms.taskName, errors.New(terminateMessage)) + } + } + + logger.Info("Complete callback on data path pod termination") + + ms.wgWatcher.Done() + }() +} + +func (ms *microServiceBRWatcher) onEvent(evt *v1.Event) { + switch evt.Reason { + case EventReasonStarted: + ms.startedFromEvent = true + ms.log.Infof("Received data path start message %s", evt.Message) + case EventReasonProgress: + ms.callbacks.OnProgress(ms.ctx, ms.namespace, ms.taskName, funcGetProgressFromMessage(evt.Message, ms.log)) + case EventReasonCompleted: + ms.log.Infof("Received data path completed message %v", funcGetResultFromMessage(ms.taskType, evt.Message, ms.log)) + ms.terminatedFromEvent = true + case EventReasonCancelled: + ms.log.Infof("Received data path canceled message %s", evt.Message) + ms.terminatedFromEvent = true + case EventReasonFailed: + ms.log.Infof("Received data path failed message %s", evt.Message) + ms.terminatedFromEvent = true + default: + ms.log.Debugf("Received event for data mover %s.[reason %s, message %s]", ms.taskName, evt.Reason, evt.Message) + } +} + +func getResultFromMessage(taskType string, message string, logger logrus.FieldLogger) Result { + result := Result{} + + if taskType == TaskTypeBackup { + backupResult := BackupResult{} + err := json.Unmarshal([]byte(message), &backupResult) + if err != nil { + logger.WithError(err).Errorf("Failed to unmarshal result message %s", message) + } else { + result.Backup = backupResult + } + } else { + restoreResult := RestoreResult{} + err := json.Unmarshal([]byte(message), &restoreResult) + if err != nil { + logger.WithError(err).Errorf("Failed to unmarshal result message %s", message) + } else { + result.Restore = restoreResult + } + } + + return result +} + +func getProgressFromMessage(message string, logger logrus.FieldLogger) *uploader.Progress { + progress := &uploader.Progress{} + err := json.Unmarshal([]byte(message), progress) + if err != nil { + logger.WithError(err).Debugf("Failed to 
unmarshal progress message %s", message) + } + + return progress +} + +func (ms *microServiceBRWatcher) Cancel() { + ms.log.WithField("taskType", ms.taskType).WithField("taskName", ms.taskName).Info("MicroServiceBR is canceled") +} + +var funcCreateTemp = os.CreateTemp +var funcCollectPodLogs = kube.CollectPodLogs + +func redirectDataMoverLogs(ctx context.Context, kubeClient kubernetes.Interface, namespace string, thisPod string, thisContainer string, logger logrus.FieldLogger) error { + logger.Infof("Starting to collect data mover pod log for %s", thisPod) + + logFile, err := funcCreateTemp("", "") + if err != nil { + return errors.Wrap(err, "error to create temp file for data mover pod log") + } + + defer logFile.Close() + + logFileName := logFile.Name() + logger.Infof("Created log file %s", logFileName) + + err = funcCollectPodLogs(ctx, kubeClient.CoreV1(), thisPod, namespace, thisContainer, logFile) + if err != nil { + return errors.Wrapf(err, "error to collect logs to %s for data mover pod %s", logFileName, thisPod) + } + + logFile.Close() + + logger.Infof("Redirecting to log file %s", logFileName) + + hookLogger := logger.WithField(logging.LogSourceKey, logFileName) + hookLogger.Logln(logging.ListeningLevel, logging.ListeningMessage) + + logger.Infof("Completed to collect data mover pod log for %s", thisPod) + + return nil +} diff --git a/pkg/datapath/micro_service_watcher_test.go b/pkg/datapath/micro_service_watcher_test.go new file mode 100644 index 0000000000..f10f6b3310 --- /dev/null +++ b/pkg/datapath/micro_service_watcher_test.go @@ -0,0 +1,603 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package datapath + +import ( + "context" + "errors" + "fmt" + "io" + "os" + "path" + "strings" + "testing" + "time" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" + kubeclientfake "k8s.io/client-go/kubernetes/fake" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + "github.com/vmware-tanzu/velero/pkg/builder" + velerotest "github.com/vmware-tanzu/velero/pkg/test" + "github.com/vmware-tanzu/velero/pkg/uploader" + "github.com/vmware-tanzu/velero/pkg/util/logging" +) + +func TestReEnsureThisPod(t *testing.T) { + tests := []struct { + name string + namespace string + thisPod string + kubeClientObj []runtime.Object + expectChan bool + expectErr string + }{ + { + name: "get pod error", + thisPod: "fak-pod-1", + expectErr: "error getting this pod fak-pod-1: pods \"fak-pod-1\" not found", + }, + { + name: "get pod not in terminated state", + namespace: "velero", + thisPod: "fake-pod-1", + kubeClientObj: []runtime.Object{ + builder.ForPod("velero", "fake-pod-1").Phase(v1.PodRunning).Result(), + }, + }, + { + name: "get pod succeed state", + namespace: "velero", + thisPod: "fake-pod-1", + kubeClientObj: []runtime.Object{ + builder.ForPod("velero", "fake-pod-1").Phase(v1.PodSucceeded).Result(), + }, + expectChan: true, + }, + { + name: "get pod failed state", + namespace: "velero", + thisPod: "fake-pod-1", + kubeClientObj: []runtime.Object{ + builder.ForPod("velero", "fake-pod-1").Phase(v1.PodFailed).Result(), + }, + expectChan: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + scheme := runtime.NewScheme() + v1.AddToScheme(scheme) + fakeClientBuilder := fake.NewClientBuilder() + fakeClientBuilder = fakeClientBuilder.WithScheme(scheme) + + fakeClient := fakeClientBuilder.WithRuntimeObjects(test.kubeClientObj...).Build() + + ms := µServiceBRWatcher{ + namespace: test.namespace, + thisPod: test.thisPod, + client: fakeClient, + podCh: make(chan *v1.Pod, 2), + log: velerotest.NewLogger(), + } + + err := ms.reEnsureThisPod() + if test.expectErr != "" { + assert.EqualError(t, err, test.expectErr) + } else { + if test.expectChan { + assert.Len(t, ms.podCh, 1) + pod := <-ms.podCh + assert.Equal(t, pod.Name, test.thisPod) + } + } + }) + } +} + +type startWatchFake struct { + terminationMessage string + redirectErr error + complete bool + failed bool + canceled bool + progress int +} + +func (sw *startWatchFake) getPodContainerTerminateMessage(pod *v1.Pod, container string) string { + return sw.terminationMessage +} + +func (sw *startWatchFake) redirectDataMoverLogs(ctx context.Context, kubeClient kubernetes.Interface, namespace string, thisPod string, thisContainer string, logger logrus.FieldLogger) error { + return sw.redirectErr +} + +func (sw *startWatchFake) getResultFromMessage(_ string, _ string, _ logrus.FieldLogger) Result { + return Result{} +} + +func (sw *startWatchFake) OnCompleted(ctx context.Context, namespace string, task string, result Result) { + sw.complete = true +} + +func (sw *startWatchFake) OnFailed(ctx context.Context, namespace string, task string, err error) { + sw.failed = true +} + +func (sw *startWatchFake) OnCancelled(ctx context.Context, namespace string, task string) { + sw.canceled = true +} + +func (sw *startWatchFake) OnProgress(ctx context.Context, namespace string, task string, progress *uploader.Progress) { 
+	sw.progress++
+}
+
+type insertEvent struct {
+	event *v1.Event
+	after time.Duration
+	delay time.Duration
+}
+
+func TestStartWatch(t *testing.T) {
+	tests := []struct {
+		name                 string
+		namespace            string
+		thisPod              string
+		thisContainer        string
+		terminationMessage   string
+		redirectLogErr       error
+		insertPod            *v1.Pod
+		insertEventsBefore   []insertEvent
+		insertEventsAfter    []insertEvent
+		ctxCancel            bool
+		expectStartEvent     bool
+		expectTerminateEvent bool
+		expectComplete       bool
+		expectCancel         bool
+		expectFail           bool
+		expectProgress       int
+	}{
+		{
+			name:          "exit from ctx",
+			thisPod:       "fak-pod-1",
+			thisContainer: "fake-container-1",
+			ctxCancel:     true,
+		},
+		{
+			name:          "completed with rational sequence",
+			thisPod:       "fak-pod-1",
+			thisContainer: "fake-container-1",
+			insertPod:     builder.ForPod("velero", "fake-pod-1").Phase(v1.PodSucceeded).Result(),
+			insertEventsBefore: []insertEvent{
+				{
+					event: &v1.Event{Reason: EventReasonStarted},
+				},
+				{
+					event: &v1.Event{Reason: EventReasonCompleted},
+					delay: time.Second,
+				},
+			},
+			expectStartEvent:     true,
+			expectTerminateEvent: true,
+			expectComplete:       true,
+		},
+		{
+			name:          "completed",
+			thisPod:       "fak-pod-1",
+			thisContainer: "fake-container-1",
+			insertPod:     builder.ForPod("velero", "fake-pod-1").Phase(v1.PodSucceeded).Result(),
+			insertEventsBefore: []insertEvent{
+				{
+					event: &v1.Event{Reason: EventReasonStarted},
+				},
+				{
+					event: &v1.Event{Reason: EventReasonCompleted},
+				},
+			},
+			expectStartEvent:     true,
+			expectTerminateEvent: true,
+			expectComplete:       true,
+		},
+		{
+			name:          "completed with redirect error",
+			thisPod:       "fak-pod-1",
+			thisContainer: "fake-container-1",
+			insertPod:     builder.ForPod("velero", "fake-pod-1").Phase(v1.PodSucceeded).Result(),
+			insertEventsBefore: []insertEvent{
+				{
+					event: &v1.Event{Reason: EventReasonStarted},
+				},
+				{
+					event: &v1.Event{Reason: EventReasonCompleted},
+				},
+			},
+			redirectLogErr:       errors.New("fake-error"),
+			expectStartEvent:     true,
+			expectTerminateEvent: true,
+			expectComplete:       true,
+		},
+		{
+			name:          "complete but terminated event not received in time",
+			thisPod:       "fak-pod-1",
+			thisContainer: "fake-container-1",
+			insertPod:     builder.ForPod("velero", "fake-pod-1").Phase(v1.PodSucceeded).Result(),
+			insertEventsBefore: []insertEvent{
+				{
+					event: &v1.Event{Reason: EventReasonStarted},
+				},
+			},
+			insertEventsAfter: []insertEvent{
+				{
+					event: &v1.Event{Reason: EventReasonStarted},
+					after: time.Second * 6,
+				},
+			},
+			expectStartEvent: true,
+			expectComplete:   true,
+		},
+		{
+			name:          "complete but terminated event not received immediately",
+			thisPod:       "fak-pod-1",
+			thisContainer: "fake-container-1",
+			insertPod:     builder.ForPod("velero", "fake-pod-1").Phase(v1.PodSucceeded).Result(),
+			insertEventsBefore: []insertEvent{
+				{
+					event: &v1.Event{Reason: EventReasonStarted},
+				},
+			},
+			insertEventsAfter: []insertEvent{
+				{
+					event: &v1.Event{Reason: EventReasonCompleted},
+					after: time.Second,
+				},
+			},
+			expectStartEvent:     true,
+			expectTerminateEvent: true,
+			expectComplete:       true,
+		},
+		{
+			name:          "completed with progress",
+			thisPod:       "fak-pod-1",
+			thisContainer: "fake-container-1",
+			insertPod:     builder.ForPod("velero", "fake-pod-1").Phase(v1.PodSucceeded).Result(),
+			insertEventsBefore: []insertEvent{
+				{
+					event: &v1.Event{Reason: EventReasonStarted},
+				},
+				{
+					event: &v1.Event{Reason: EventReasonProgress, Message: "fake-progress-1"},
+				},
+				{
+					event: &v1.Event{Reason: EventReasonProgress, Message: "fake-progress-2"},
+				},
+				{
+					event: &v1.Event{Reason: EventReasonCompleted},
+					delay: time.Second,
+				},
+
}, + expectStartEvent: true, + expectTerminateEvent: true, + expectComplete: true, + expectProgress: 2, + }, + { + name: "failed", + thisPod: "fak-pod-1", + thisContainer: "fake-container-1", + insertPod: builder.ForPod("velero", "fake-pod-1").Phase(v1.PodFailed).Result(), + insertEventsBefore: []insertEvent{ + { + event: &v1.Event{Reason: EventReasonStarted}, + }, + { + event: &v1.Event{Reason: EventReasonCancelled}, + }, + }, + terminationMessage: "fake-termination-message-1", + expectStartEvent: true, + expectTerminateEvent: true, + expectFail: true, + }, + { + name: "pod crash", + thisPod: "fak-pod-1", + thisContainer: "fake-container-1", + insertPod: builder.ForPod("velero", "fake-pod-1").Phase(v1.PodFailed).Result(), + terminationMessage: "fake-termination-message-2", + expectFail: true, + }, + { + name: "canceled", + thisPod: "fak-pod-1", + thisContainer: "fake-container-1", + insertPod: builder.ForPod("velero", "fake-pod-1").Phase(v1.PodFailed).Result(), + insertEventsBefore: []insertEvent{ + { + event: &v1.Event{Reason: EventReasonStarted}, + }, + { + event: &v1.Event{Reason: EventReasonCancelled}, + }, + }, + terminationMessage: ErrCancelled, + expectStartEvent: true, + expectTerminateEvent: true, + expectCancel: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + eventWaitTimeout = time.Second * 5 + + sw := startWatchFake{ + terminationMessage: test.terminationMessage, + redirectErr: test.redirectLogErr, + } + funcGetPodTerminationMessage = sw.getPodContainerTerminateMessage + funcRedirectLog = sw.redirectDataMoverLogs + funcGetResultFromMessage = sw.getResultFromMessage + + ms := µServiceBRWatcher{ + ctx: ctx, + namespace: test.namespace, + thisPod: test.thisPod, + thisContainer: test.thisContainer, + podCh: make(chan *v1.Pod, 2), + eventCh: make(chan *v1.Event, 10), + log: velerotest.NewLogger(), + callbacks: Callbacks{ + OnCompleted: sw.OnCompleted, + OnFailed: sw.OnFailed, + OnCancelled: sw.OnCancelled, + OnProgress: sw.OnProgress, + }, + } + + ms.startWatch() + + if test.ctxCancel { + cancel() + } + + for _, ev := range test.insertEventsBefore { + if ev.after != 0 { + time.Sleep(ev.after) + } + + ms.eventCh <- ev.event + + if ev.delay != 0 { + time.Sleep(ev.delay) + } + } + + if test.insertPod != nil { + ms.podCh <- test.insertPod + } + + for _, ev := range test.insertEventsAfter { + if ev.after != 0 { + time.Sleep(ev.after) + } + + ms.eventCh <- ev.event + + if ev.delay != 0 { + time.Sleep(ev.delay) + } + } + + ms.wgWatcher.Wait() + + assert.Equal(t, test.expectStartEvent, ms.startedFromEvent) + assert.Equal(t, test.expectTerminateEvent, ms.terminatedFromEvent) + assert.Equal(t, test.expectComplete, sw.complete) + assert.Equal(t, test.expectCancel, sw.canceled) + assert.Equal(t, test.expectFail, sw.failed) + assert.Equal(t, test.expectProgress, sw.progress) + + cancel() + }) + } +} + +func TestGetResultFromMessage(t *testing.T) { + tests := []struct { + name string + taskType string + message string + expectResult Result + }{ + { + name: "error to unmarshall backup result", + taskType: TaskTypeBackup, + message: "fake-message", + expectResult: Result{}, + }, + { + name: "error to unmarshall restore result", + taskType: TaskTypeRestore, + message: "fake-message", + expectResult: Result{}, + }, + { + name: "succeed to unmarshall backup result", + taskType: TaskTypeBackup, + message: 
"{\"snapshotID\":\"fake-snapshot-id\",\"emptySnapshot\":true,\"source\":{\"byPath\":\"fake-path-1\",\"volumeMode\":\"Block\"}}", + expectResult: Result{ + Backup: BackupResult{ + SnapshotID: "fake-snapshot-id", + EmptySnapshot: true, + Source: AccessPoint{ + ByPath: "fake-path-1", + VolMode: uploader.PersistentVolumeBlock, + }, + }, + }, + }, + { + name: "succeed to unmarshall restore result", + taskType: TaskTypeRestore, + message: "{\"target\":{\"byPath\":\"fake-path-2\",\"volumeMode\":\"Filesystem\"}}", + expectResult: Result{ + Restore: RestoreResult{ + Target: AccessPoint{ + ByPath: "fake-path-2", + VolMode: uploader.PersistentVolumeFilesystem, + }, + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + result := getResultFromMessage(test.taskType, test.message, velerotest.NewLogger()) + assert.Equal(t, test.expectResult, result) + }) + } +} + +func TestGetProgressFromMessage(t *testing.T) { + tests := []struct { + name string + message string + expectProgress uploader.Progress + }{ + { + name: "error to unmarshall progress", + message: "fake-message", + expectProgress: uploader.Progress{}, + }, + { + name: "succeed to unmarshall progress", + message: "{\"totalBytes\":1000,\"doneBytes\":200}", + expectProgress: uploader.Progress{ + TotalBytes: 1000, + BytesDone: 200, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + progress := getProgressFromMessage(test.message, velerotest.NewLogger()) + assert.Equal(t, test.expectProgress, *progress) + }) + } +} + +type redirectFake struct { + logFile *os.File + createTempErr error + getPodLogErr error + logMessage string +} + +func (rf *redirectFake) fakeCreateTempFile(_ string, _ string) (*os.File, error) { + if rf.createTempErr != nil { + return nil, rf.createTempErr + } + + return rf.logFile, nil +} + +func (rf *redirectFake) fakeCollectPodLogs(_ context.Context, _ corev1client.CoreV1Interface, _ string, _ string, _ string, output io.Writer) error { + if rf.getPodLogErr != nil { + return rf.getPodLogErr + } + + _, err := output.Write([]byte(rf.logMessage)) + + return err +} + +func TestRedirectDataMoverLogs(t *testing.T) { + logFileName := path.Join(os.TempDir(), "test-logger-file.log") + + var buffer string + + tests := []struct { + name string + thisPod string + logMessage string + logger logrus.FieldLogger + createTempErr error + collectLogErr error + expectErr string + }{ + { + name: "error to create temp file", + thisPod: "fake-pod", + createTempErr: errors.New("fake-create-temp-error"), + logger: velerotest.NewLogger(), + expectErr: "error to create temp file for data mover pod log: fake-create-temp-error", + }, + { + name: "error to collect pod log", + thisPod: "fake-pod", + collectLogErr: errors.New("fake-collect-log-error"), + logger: velerotest.NewLogger(), + expectErr: fmt.Sprintf("error to collect logs to %s for data mover pod fake-pod: fake-collect-log-error", logFileName), + }, + { + name: "succeed", + thisPod: "fake-pod", + logMessage: "fake-log-message-01\nfake-log-message-02\nfake-log-message-03\n", + logger: velerotest.NewSingleLoggerWithHooks(&buffer, logging.DefaultHooks(true)), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + buffer = "" + + logFile, err := os.Create(logFileName) + require.NoError(t, err) + + rf := redirectFake{ + logFile: logFile, + createTempErr: test.createTempErr, + getPodLogErr: test.collectLogErr, + logMessage: test.logMessage, + } + + funcCreateTemp = rf.fakeCreateTempFile + 
funcCollectPodLogs = rf.fakeCollectPodLogs + + fakeKubeClient := kubeclientfake.NewSimpleClientset() + + err = redirectDataMoverLogs(context.Background(), fakeKubeClient, "", test.thisPod, "", test.logger) + if test.expectErr != "" { + assert.EqualError(t, err, test.expectErr) + } else { + assert.NoError(t, err) + + assert.True(t, strings.Contains(buffer, test.logMessage)) + } + }) + } +} diff --git a/pkg/datapath/types.go b/pkg/datapath/types.go index c98cd284a9..a2fac3ed59 100644 --- a/pkg/datapath/types.go +++ b/pkg/datapath/types.go @@ -50,8 +50,8 @@ type Callbacks struct { // AccessPoint represents an access point that has been exposed to a data path instance type AccessPoint struct { - ByPath string - VolMode uploader.PersistentVolumeMode + ByPath string `json:"byPath"` + VolMode uploader.PersistentVolumeMode `json:"volumeMode"` } // AsyncBR is the interface for asynchronous data path methods diff --git a/pkg/exposer/csi_snapshot_test.go b/pkg/exposer/csi_snapshot_test.go index cc0a895e11..44c29b1d5f 100644 --- a/pkg/exposer/csi_snapshot_test.go +++ b/pkg/exposer/csi_snapshot_test.go @@ -672,7 +672,7 @@ func TestPeekExpose(t *testing.T) { kubeClientObj: []runtime.Object{ backupPodUrecoverable, }, - err: "Pod is in abnormal state Failed", + err: "Pod is in abnormal state [Failed], message []", }, { name: "succeed", diff --git a/pkg/exposer/generic_restore_test.go b/pkg/exposer/generic_restore_test.go index 6080f8b97d..9108ba2281 100644 --- a/pkg/exposer/generic_restore_test.go +++ b/pkg/exposer/generic_restore_test.go @@ -456,7 +456,7 @@ func TestRestorePeekExpose(t *testing.T) { kubeClientObj: []runtime.Object{ restorePodUrecoverable, }, - err: "Pod is in abnormal state Failed", + err: "Pod is in abnormal state [Failed], message []", }, { name: "succeed", diff --git a/pkg/test/test_logger.go b/pkg/test/test_logger.go index b890fd5da3..65dc8422a7 100644 --- a/pkg/test/test_logger.go +++ b/pkg/test/test_logger.go @@ -50,3 +50,15 @@ func NewSingleLogger(buffer *string) logrus.FieldLogger { logger.Level = logrus.TraceLevel return logrus.NewEntry(logger) } + +func NewSingleLoggerWithHooks(buffer *string, hooks []logrus.Hook) logrus.FieldLogger { + logger := logrus.New() + logger.Out = &singleLogRecorder{buffer: buffer} + logger.Level = logrus.TraceLevel + + for _, hook := range hooks { + logger.Hooks.Add(hook) + } + + return logrus.NewEntry(logger) +} diff --git a/pkg/util/kube/pod.go b/pkg/util/kube/pod.go index 857fe9420e..9def5d514a 100644 --- a/pkg/util/kube/pod.go +++ b/pkg/util/kube/pod.go @@ -18,6 +18,7 @@ package kube import ( "context" "fmt" + "io" "time" "github.com/pkg/errors" @@ -117,8 +118,9 @@ func EnsureDeletePod(ctx context.Context, podGetter corev1client.CoreV1Interface func IsPodUnrecoverable(pod *corev1api.Pod, log logrus.FieldLogger) (bool, string) { // Check the Phase field if pod.Status.Phase == corev1api.PodFailed || pod.Status.Phase == corev1api.PodUnknown { - log.Warnf("Pod is in abnormal state %s", pod.Status.Phase) - return true, fmt.Sprintf("Pod is in abnormal state %s", pod.Status.Phase) + message := GetPodTerminateMessage(pod) + log.Warnf("Pod is in abnormal state %s, message [%s]", pod.Status.Phase, message) + return true, fmt.Sprintf("Pod is in abnormal state [%s], message [%s]", pod.Status.Phase, message) } // removed "Unschedulable" check since unschedulable condition isn't always permanent @@ -133,3 +135,69 @@ func IsPodUnrecoverable(pod *corev1api.Pod, log logrus.FieldLogger) (bool, strin } return false, "" } + +// GetPodContainerTerminateMessage 
returns the terminate message for a specific container of a pod +func GetPodContainerTerminateMessage(pod *corev1api.Pod, container string) string { + message := "" + for _, containerStatus := range pod.Status.ContainerStatuses { + if containerStatus.Name == container { + if containerStatus.State.Terminated != nil { + message = containerStatus.State.Terminated.Message + } + break + } + } + + return message +} + +// GetPodTerminateMessage returns the terminate message for all containers of a pod +func GetPodTerminateMessage(pod *corev1api.Pod) string { + message := "" + for _, containerStatus := range pod.Status.ContainerStatuses { + if containerStatus.State.Terminated != nil { + if containerStatus.State.Terminated.Message != "" { + message += containerStatus.State.Terminated.Message + "/" + } + } + } + + return message +} + +func getPodLogReader(ctx context.Context, podGetter corev1client.CoreV1Interface, pod string, namespace string, logOptions *corev1api.PodLogOptions) (io.ReadCloser, error) { + request := podGetter.Pods(namespace).GetLogs(pod, logOptions) + return request.Stream(ctx) +} + +var podLogReaderGetter = getPodLogReader + +// CollectPodLogs collects logs of the specified container of a pod and write to the output +func CollectPodLogs(ctx context.Context, podGetter corev1client.CoreV1Interface, pod string, namespace string, container string, output io.Writer) error { + logIndicator := fmt.Sprintf("***************************begin pod logs[%s/%s]***************************\n", pod, container) + + if _, err := output.Write([]byte(logIndicator)); err != nil { + return errors.Wrap(err, "error to write begin pod log indicator") + } + + logOptions := &corev1api.PodLogOptions{ + Container: container, + } + + if input, err := podLogReaderGetter(ctx, podGetter, pod, namespace, logOptions); err != nil { + logIndicator = fmt.Sprintf("No present log retrieved, err: %v\n", err) + } else { + if _, err := io.Copy(output, input); err != nil { + return errors.Wrap(err, "error to copy input") + } + + logIndicator = "" + } + + logIndicator += fmt.Sprintf("***************************end pod logs[%s/%s]***************************\n", pod, container) + if _, err := output.Write([]byte(logIndicator)); err != nil { + return errors.Wrap(err, "error to write end pod log indicator") + } + + return nil +} diff --git a/pkg/util/kube/pod_test.go b/pkg/util/kube/pod_test.go index f1cdac043e..7ccb225781 100644 --- a/pkg/util/kube/pod_test.go +++ b/pkg/util/kube/pod_test.go @@ -18,6 +18,8 @@ package kube import ( "context" + "io" + "strings" "testing" "time" @@ -32,6 +34,8 @@ import ( clientTesting "k8s.io/client-go/testing" velerotest "github.com/vmware-tanzu/velero/pkg/test" + + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" ) func TestEnsureDeletePod(t *testing.T) { @@ -422,3 +426,274 @@ func TestIsPodUnrecoverable(t *testing.T) { }) } } + +func TestGetPodTerminateMessage(t *testing.T) { + tests := []struct { + name string + pod *corev1api.Pod + message string + }{ + { + name: "empty message when no container status", + pod: &corev1api.Pod{ + Status: corev1api.PodStatus{ + Phase: corev1api.PodFailed, + }, + }, + }, + { + name: "empty message when no termination status", + pod: &corev1api.Pod{ + Status: corev1api.PodStatus{ + ContainerStatuses: []corev1api.ContainerStatus{ + {Name: "container-1", State: corev1api.ContainerState{Waiting: &corev1api.ContainerStateWaiting{Reason: "ImagePullBackOff"}}}, + }, + }, + }, + }, + { + name: "empty message when no termination message", + pod: &corev1api.Pod{ 
+ Status: corev1api.PodStatus{ + ContainerStatuses: []corev1api.ContainerStatus{ + {Name: "container-1", State: corev1api.ContainerState{Terminated: &corev1api.ContainerStateTerminated{Reason: "fake-reason"}}}, + }, + }, + }, + }, + { + name: "with termination message", + pod: &corev1api.Pod{ + Status: corev1api.PodStatus{ + ContainerStatuses: []corev1api.ContainerStatus{ + {Name: "container-1", State: corev1api.ContainerState{Terminated: &corev1api.ContainerStateTerminated{Message: "message-1"}}}, + {Name: "container-2", State: corev1api.ContainerState{Terminated: &corev1api.ContainerStateTerminated{Message: "message-2"}}}, + {Name: "container-3", State: corev1api.ContainerState{Terminated: &corev1api.ContainerStateTerminated{Message: "message-3"}}}, + }, + }, + }, + message: "message-1/message-2/message-3/", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + message := GetPodTerminateMessage(test.pod) + assert.Equal(t, test.message, message) + }) + } +} + +func TestGetPodContainerTerminateMessage(t *testing.T) { + tests := []struct { + name string + pod *corev1api.Pod + container string + message string + }{ + { + name: "empty message when no container status", + pod: &corev1api.Pod{ + Status: corev1api.PodStatus{ + Phase: corev1api.PodFailed, + }, + }, + }, + { + name: "empty message when no termination status", + pod: &corev1api.Pod{ + Status: corev1api.PodStatus{ + ContainerStatuses: []corev1api.ContainerStatus{ + {Name: "container-1", State: corev1api.ContainerState{Waiting: &corev1api.ContainerStateWaiting{Reason: "ImagePullBackOff"}}}, + }, + }, + }, + container: "container-1", + }, + { + name: "empty message when no termination message", + pod: &corev1api.Pod{ + Status: corev1api.PodStatus{ + ContainerStatuses: []corev1api.ContainerStatus{ + {Name: "container-1", State: corev1api.ContainerState{Terminated: &corev1api.ContainerStateTerminated{Reason: "fake-reason"}}}, + }, + }, + }, + container: "container-1", + }, + { + name: "not matched container name", + pod: &corev1api.Pod{ + Status: corev1api.PodStatus{ + ContainerStatuses: []corev1api.ContainerStatus{ + {Name: "container-1", State: corev1api.ContainerState{Terminated: &corev1api.ContainerStateTerminated{Message: "message-1"}}}, + {Name: "container-2", State: corev1api.ContainerState{Terminated: &corev1api.ContainerStateTerminated{Message: "message-2"}}}, + {Name: "container-3", State: corev1api.ContainerState{Terminated: &corev1api.ContainerStateTerminated{Message: "message-3"}}}, + }, + }, + }, + container: "container-0", + }, + { + name: "with termination message", + pod: &corev1api.Pod{ + Status: corev1api.PodStatus{ + ContainerStatuses: []corev1api.ContainerStatus{ + {Name: "container-1", State: corev1api.ContainerState{Terminated: &corev1api.ContainerStateTerminated{Message: "message-1"}}}, + {Name: "container-2", State: corev1api.ContainerState{Terminated: &corev1api.ContainerStateTerminated{Message: "message-2"}}}, + {Name: "container-3", State: corev1api.ContainerState{Terminated: &corev1api.ContainerStateTerminated{Message: "message-3"}}}, + }, + }, + }, + container: "container-2", + message: "message-2", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + message := GetPodContainerTerminateMessage(test.pod, test.container) + assert.Equal(t, test.message, message) + }) + } +} + +type fakePodLog struct { + getError error + readError error + beginWriteError error + endWriteError error + writeError error + logMessage string + outputMessage string + 
readPos int +} + +func (fp *fakePodLog) GetPodLogReader(ctx context.Context, podGetter corev1client.CoreV1Interface, pod string, namespace string, logOptions *corev1api.PodLogOptions) (io.ReadCloser, error) { + if fp.getError != nil { + return nil, fp.getError + } + + return fp, nil +} + +func (fp *fakePodLog) Read(p []byte) (n int, err error) { + if fp.readError != nil { + return -1, fp.readError + } + + if fp.readPos == len(fp.logMessage) { + return 0, io.EOF + } + + copy(p, []byte(fp.logMessage)) + fp.readPos += len(fp.logMessage) + + return len(fp.logMessage), nil +} + +func (fp *fakePodLog) Close() error { + return nil +} + +func (fp *fakePodLog) Write(p []byte) (n int, err error) { + message := string(p) + if strings.Contains(message, "begin pod logs") { + if fp.beginWriteError != nil { + return -1, fp.beginWriteError + } + } else if strings.Contains(message, "end pod logs") { + if fp.endWriteError != nil { + return -1, fp.endWriteError + } + } else { + if fp.writeError != nil { + return -1, fp.writeError + } + } + + fp.outputMessage += message + + return len(message), nil +} + +func TestCollectPodLogs(t *testing.T) { + tests := []struct { + name string + pod string + container string + getError error + readError error + beginWriteError error + endWriteError error + writeError error + readMessage string + message string + expectErr string + }{ + { + name: "error to write begin indicator", + beginWriteError: errors.New("fake-write-error-01"), + expectErr: "error to write begin pod log indicator: fake-write-error-01", + }, + { + name: "error to get log", + pod: "fake-pod", + container: "fake-container", + getError: errors.New("fake-get-error"), + message: "***************************begin pod logs[fake-pod/fake-container]***************************\nNo present log retrieved, err: fake-get-error\n***************************end pod logs[fake-pod/fake-container]***************************\n", + }, + { + name: "error to read pod log", + pod: "fake-pod", + container: "fake-container", + readError: errors.New("fake-read-error"), + expectErr: "error to copy input: fake-read-error", + }, + { + name: "error to write pod log", + pod: "fake-pod", + container: "fake-container", + writeError: errors.New("fake-write-error-03"), + readMessage: "fake pod message 01\n fake pod message 02\n fake pod message 03\n", + expectErr: "error to copy input: fake-write-error-03", + }, + { + name: "error to write end indicator", + pod: "fake-pod", + container: "fake-container", + endWriteError: errors.New("fake-write-error-02"), + readMessage: "fake pod message 01\n fake pod message 02\n fake pod message 03\n", + expectErr: "error to write end pod log indicator: fake-write-error-02", + }, + { + name: "succeed", + pod: "fake-pod", + container: "fake-container", + readMessage: "fake pod message 01\n fake pod message 02\n fake pod message 03\n", + message: "***************************begin pod logs[fake-pod/fake-container]***************************\nfake pod message 01\n fake pod message 02\n fake pod message 03\n***************************end pod logs[fake-pod/fake-container]***************************\n", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fp := &fakePodLog{ + getError: test.getError, + readError: test.readError, + beginWriteError: test.beginWriteError, + endWriteError: test.endWriteError, + writeError: test.writeError, + logMessage: test.readMessage, + } + podLogReaderGetter = fp.GetPodLogReader + + err := CollectPodLogs(context.Background(), nil, test.pod, "", 
test.container, fp) + if test.expectErr != "" { + assert.EqualError(t, err, test.expectErr) + } else { + assert.NoError(t, err) + assert.Equal(t, fp.outputMessage, test.message) + } + }) + } +} diff --git a/pkg/util/logging/default_logger.go b/pkg/util/logging/default_logger.go index f1c22f80cb..b374c3d847 100644 --- a/pkg/util/logging/default_logger.go +++ b/pkg/util/logging/default_logger.go @@ -24,16 +24,26 @@ import ( // DefaultHooks returns a slice of the default // logrus hooks to be used by a logger. -func DefaultHooks() []logrus.Hook { - return []logrus.Hook{ +func DefaultHooks(merge bool) []logrus.Hook { + hooks := []logrus.Hook{ &LogLocationHook{}, &ErrorLocationHook{}, } + + if merge { + hooks = append(hooks, &MergeHook{}) + } + + return hooks } // DefaultLogger returns a Logger with the default properties // and hooks. The desired output format is passed as a LogFormat Enum. func DefaultLogger(level logrus.Level, format Format) *logrus.Logger { + return createLogger(level, format, false) +} + +func createLogger(level logrus.Level, format Format, merge bool) *logrus.Logger { logger := logrus.New() if format == FormatJSON { @@ -62,7 +72,7 @@ func DefaultLogger(level logrus.Level, format Format) *logrus.Logger { logger.Level = level - for _, hook := range DefaultHooks() { + for _, hook := range DefaultHooks(merge) { logger.Hooks.Add(hook) } diff --git a/pkg/util/logging/default_logger_test.go b/pkg/util/logging/default_logger_test.go index 7aab504962..10f6907578 100644 --- a/pkg/util/logging/default_logger_test.go +++ b/pkg/util/logging/default_logger_test.go @@ -34,7 +34,7 @@ func TestDefaultLogger(t *testing.T) { assert.Equal(t, os.Stdout, logger.Out) for _, level := range logrus.AllLevels { - assert.Equal(t, DefaultHooks(), logger.Hooks[level]) + assert.Equal(t, DefaultHooks(false), logger.Hooks[level]) } } } diff --git a/pkg/util/logging/log_merge_hook.go b/pkg/util/logging/log_merge_hook.go new file mode 100644 index 0000000000..92a55cda6c --- /dev/null +++ b/pkg/util/logging/log_merge_hook.go @@ -0,0 +1,113 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package logging + +import ( + "bytes" + "io" + "os" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +const ( + ListeningLevel = logrus.ErrorLevel + ListeningMessage = "merge-log-57847fd0-0c7c-48e3-b5f7-984b293d8376" + LogSourceKey = "log-source" +) + +// MergeHook is used to redirect a batch of logs to another logger atomically. +// It hooks a log with ListeningMessage message, once the message is hit it replaces +// the logger's output to HookWriter so that HookWriter retrieves the logs from a file indicated +// by LogSourceKey field. 
+type MergeHook struct { +} + +type hookWriter struct { + orgWriter io.Writer + source string + logger *logrus.Logger +} + +func newHookWriter(orgWriter io.Writer, source string, logger *logrus.Logger) io.Writer { + return &hookWriter{ + orgWriter: orgWriter, + source: source, + logger: logger, + } +} + +func (h *MergeHook) Levels() []logrus.Level { + return []logrus.Level{ListeningLevel} +} + +func (h *MergeHook) Fire(entry *logrus.Entry) error { + if entry.Message != ListeningMessage { + return nil + } + + source, exist := entry.Data[LogSourceKey] + if !exist { + return nil + } + + entry.Logger.SetOutput(newHookWriter(entry.Logger.Out, source.(string), entry.Logger)) + + return nil +} + +func (w *hookWriter) Write(p []byte) (n int, err error) { + if !bytes.Contains(p, []byte(ListeningMessage)) { + return w.orgWriter.Write(p) + } + + defer func() { + w.logger.Out = w.orgWriter + }() + + sourceFile, err := os.OpenFile(w.source, os.O_RDONLY, 0600) + if err != nil { + return 0, err + } + defer sourceFile.Close() + + total := 0 + + buffer := make([]byte, 2048) + for { + read, err := sourceFile.Read(buffer) + if err == io.EOF { + return total, nil + } + + if err != nil { + return total, errors.Wrapf(err, "error to read source file %s at pos %v", w.source, total) + } + + written, err := w.orgWriter.Write(buffer[0:read]) + if err != nil { + return total, errors.Wrapf(err, "error to write log at pos %v", total) + } + + if written != read { + return total, errors.Errorf("error to write log at pos %v, read %v but written %v", total, read, written) + } + + total += read + } +} diff --git a/pkg/util/logging/log_merge_hook_test.go b/pkg/util/logging/log_merge_hook_test.go new file mode 100644 index 0000000000..d103152b82 --- /dev/null +++ b/pkg/util/logging/log_merge_hook_test.go @@ -0,0 +1,185 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package logging + +import ( + "fmt" + "os" + "testing" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMergeHook_Fire(t *testing.T) { + tests := []struct { + name string + entry logrus.Entry + expectHook bool + }{ + { + name: "normal message", + entry: logrus.Entry{ + Level: logrus.ErrorLevel, + Message: "fake-message", + }, + expectHook: false, + }, + { + name: "normal source", + entry: logrus.Entry{ + Level: logrus.ErrorLevel, + Message: ListeningMessage, + Data: logrus.Fields{"fake-key": "fake-value"}, + }, + expectHook: false, + }, + { + name: "hook hit", + entry: logrus.Entry{ + Level: logrus.ErrorLevel, + Message: ListeningMessage, + Data: logrus.Fields{LogSourceKey: "any-value"}, + Logger: &logrus.Logger{}, + }, + expectHook: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + hook := &MergeHook{} + // method under test + err := hook.Fire(&test.entry) + + assert.NoError(t, err) + + if test.expectHook { + assert.NotNil(t, test.entry.Logger.Out.(*hookWriter)) + } + }) + } +} + +type fakeWriter struct { + p []byte + writeError error + writtenLen int +} + +func (fw *fakeWriter) Write(p []byte) (n int, err error) { + if fw.writeError != nil || fw.writtenLen != -1 { + return fw.writtenLen, fw.writeError + } + + fw.p = append(fw.p, p...) + + return len(p), nil +} + +func TestMergeHook_Write(t *testing.T) { + sourceFile, err := os.CreateTemp("", "") + require.NoError(t, err) + + logMessage := "fake-message-1\nfake-message-2" + _, err = sourceFile.WriteString(logMessage) + require.NoError(t, err) + + tests := []struct { + name string + content []byte + source string + writeErr error + writtenLen int + expectError string + needRollBackHook bool + }{ + { + name: "normal message", + content: []byte("fake-message"), + writtenLen: -1, + }, + { + name: "failed to open source file", + content: []byte(ListeningMessage), + source: "non-exist", + needRollBackHook: true, + expectError: "open non-exist: no such file or directory", + }, + { + name: "write error", + content: []byte(ListeningMessage), + source: sourceFile.Name(), + writeErr: errors.New("fake-error"), + expectError: "error to write log at pos 0: fake-error", + needRollBackHook: true, + }, + { + name: "write len mismatch", + content: []byte(ListeningMessage), + source: sourceFile.Name(), + writtenLen: 100, + expectError: fmt.Sprintf("error to write log at pos 0, read %v but written 100", len(logMessage)), + needRollBackHook: true, + }, + { + name: "success", + content: []byte(ListeningMessage), + source: sourceFile.Name(), + writtenLen: -1, + needRollBackHook: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + writer := hookWriter{ + orgWriter: &fakeWriter{ + writeError: test.writeErr, + writtenLen: test.writtenLen, + }, + source: test.source, + logger: &logrus.Logger{}, + } + + n, err := writer.Write(test.content) + + if test.expectError == "" { + assert.NoError(t, err) + + expectStr := string(test.content) + if expectStr == ListeningMessage { + expectStr = logMessage + } + + assert.Len(t, expectStr, n) + + fakeWriter := writer.orgWriter.(*fakeWriter) + writtenStr := string(fakeWriter.p) + assert.Equal(t, writtenStr, expectStr) + } else { + assert.EqualError(t, err, test.expectError) + } + + if test.needRollBackHook { + assert.Equal(t, writer.logger.Out, writer.orgWriter) + } + }) + } +}
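
Reviewer note, not part of the patch: the sketch below illustrates how a caller (for example, a data mover controller) is expected to drive the new watcher added in pkg/datapath — CreateMicroServiceBRWatcher, then Init to register the informer handlers, then StartBackup to begin watching, with completion/failure/cancel/progress delivered through the Callbacks. It is a minimal, assumption-laden example: the function name startBackupWatch, the parameter names, and the "data-mover" container name are hypothetical; only the datapath APIs introduced in this patch are relied on.

package example // illustrative only, not part of this patch

import (
	"context"

	"github.com/sirupsen/logrus"
	"k8s.io/client-go/kubernetes"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/manager"

	"github.com/vmware-tanzu/velero/pkg/datapath"
)

// startBackupWatch sketches the expected lifecycle: Create -> Init -> StartBackup,
// with results reported asynchronously via the callbacks. All names are hypothetical.
func startBackupWatch(ctx context.Context, dpMgr *datapath.Manager, crClient client.Client,
	kubeClient kubernetes.Interface, mgr manager.Manager, taskName, namespace, podName string,
	callbacks datapath.Callbacks, log logrus.FieldLogger) error {
	asyncBR, err := dpMgr.CreateMicroServiceBRWatcher(ctx, crClient, kubeClient, mgr,
		datapath.TaskTypeBackup, taskName, namespace, podName, "data-mover", taskName,
		callbacks, false, log)
	if err != nil {
		// e.g. datapath.ConcurrentLimitExceed when the concurrent-task cap is reached
		return err
	}

	// Init registers the event/pod informer handlers against the shared cache.
	if err := asyncBR.Init(ctx, nil); err != nil {
		dpMgr.RemoveAsyncBR(taskName)
		return err
	}

	// StartBackup only re-checks the data mover pod and starts the event watch; the
	// actual data movement runs inside the pod, which reports start/progress/completion
	// through Kubernetes Events that the watcher turns into the callbacks above.
	// The owner is expected to call asyncBR.Close(ctx) and dpMgr.RemoveAsyncBR(taskName)
	// once a completion/failure/cancel callback fires.
	return asyncBR.StartBackup(datapath.AccessPoint{}, nil, nil)
}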