diff --git a/.github/workflows/pull-request.yml b/.github/workflows/pull-request.yml index 019e9307e..57651a280 100644 --- a/.github/workflows/pull-request.yml +++ b/.github/workflows/pull-request.yml @@ -53,12 +53,12 @@ jobs: - name: Checkout forkliftci uses: actions/checkout@v4 with: - repository: kubev2v/forkliftci - ref: v15.0 + repository: bennyz/forkliftci + ref: openstack-imagebased - name: Build and setup everything with bazel id: forkliftci - uses: kubev2v/forkliftci/ci/build-and-setup@v8.0 + uses: bennyz/forkliftci/ci/build-and-setup@openstack-imagebased with: provider_name: ${{ matrix.source_provider }} gh_access_token: ${{ secrets.GITHUB_TOKEN }} @@ -72,13 +72,13 @@ jobs: curl -k "${{ steps.forkliftci.outputs.cluster }}/apis/forklift.konveyor.io/v1beta1/namespaces/konveyor-forklift/providers" --header "Authorization: Bearer ${{ steps.forkliftci.outputs.token }}" - name: Run e2e sanity suite - uses: kubev2v/forkliftci/ci/run-suite@v8.0 + uses: bennyz/forkliftci/ci/run-suite@openstack-imagebased with: suite_name: e2e-sanity-${{ matrix.source_provider }} - name: save k8s logs and upload-artifact if: ${{ always() }} - uses: kubev2v/forkliftci/ci/save-artifacts@v8.0 + uses: bennyz/forkliftci/ci/save-artifacts@openstack-imagebased with: source_provider: ${{ matrix.source_provider }} diff --git a/cmd/image-converter/BUILD.bazel b/cmd/image-converter/BUILD.bazel index bc837014b..1a153224f 100644 --- a/cmd/image-converter/BUILD.bazel +++ b/cmd/image-converter/BUILD.bazel @@ -1,11 +1,16 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library", "go_test") go_library( name = "image-converter_lib", srcs = ["image-converter.go"], importpath = "github.com/konveyor/forklift-controller/cmd/image-converter", visibility = ["//visibility:private"], - deps = ["//vendor/k8s.io/klog/v2:klog"], + deps = [ + "//pkg/metrics", + "//vendor/github.com/prometheus/client_golang/prometheus", + "//vendor/github.com/prometheus/client_model/go", + "//vendor/k8s.io/klog/v2:klog", + ], ) go_binary( @@ -13,3 +18,16 @@ go_binary( embed = [":image-converter_lib"], visibility = ["//visibility:public"], ) + +go_test( + name = "image-converter_test", + srcs = [ + "image-converter_test.go", + "image_converter_suite_test.go", + ], + embed = [":image-converter_lib"], + deps = [ + "//vendor/github.com/onsi/ginkgo/v2:ginkgo", + "//vendor/github.com/onsi/gomega", + ], +) diff --git a/cmd/image-converter/image-converter.go b/cmd/image-converter/image-converter.go index 239d4adb6..5964cf016 100644 --- a/cmd/image-converter/image-converter.go +++ b/cmd/image-converter/image-converter.go @@ -4,31 +4,46 @@ import ( "bufio" "bytes" "flag" + "os" "os/exec" + "strconv" + "strings" + "github.com/konveyor/forklift-controller/pkg/metrics" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" "k8s.io/klog/v2" ) func main() { - var srcVolPath, dstVolPath, srcFormat, dstFormat, volumeMode string + var srcVolPath, dstVolPath, srcFormat, dstFormat, volumeMode, ownerUID string flag.StringVar(&srcVolPath, "src-path", "", "Source volume path") flag.StringVar(&dstVolPath, "dst-path", "", "Target volume path") flag.StringVar(&srcFormat, "src-format", "", "Format of the source volume") flag.StringVar(&dstFormat, "dst-format", "", "Format of the target volume") flag.StringVar(&volumeMode, "volume-mode", "", "Format of the target volume") + flag.StringVar(&ownerUID, "owner-uid", "", "Owner UID (usually PVC UID)") flag.Parse() klog.Info("srcVolPath: ", srcVolPath, " dstVolPath: ", dstVolPath, " sourceFormat: ", srcFormat, " targetFormat: ", dstFormat) - err := convert(srcVolPath, dstVolPath, srcFormat, dstFormat, volumeMode) + + certsDirectory, err := os.MkdirTemp("", "certsdir") + if err != nil { + klog.Fatal(err) + } + + metrics.StartPrometheusEndpoint(certsDirectory) + + err = convert(srcVolPath, dstVolPath, srcFormat, dstFormat, volumeMode, ownerUID) if err != nil { klog.Fatal(err) } } -func convert(srcVolPath, dstVolPath, srcFormat, dstFormat, volumeMode string) error { - err := qemuimgConvert(srcVolPath, dstVolPath, srcFormat, dstFormat) +func convert(srcVolPath, dstVolPath, srcFormat, dstFormat, volumeMode, ownerUID string) error { + err := qemuimgConvert(srcVolPath, dstVolPath, srcFormat, dstFormat, ownerUID) if err != nil { return err } @@ -38,7 +53,7 @@ func convert(srcVolPath, dstVolPath, srcFormat, dstFormat, volumeMode string) er // Copy dst over src switch volumeMode { case "Block": - err = qemuimgConvert(dstVolPath, srcVolPath, dstFormat, dstFormat) + err = qemuimgConvert(dstVolPath, srcVolPath, dstFormat, dstFormat, ownerUID, "-W") if err != nil { return err } @@ -58,7 +73,7 @@ func convert(srcVolPath, dstVolPath, srcFormat, dstFormat, volumeMode string) er return nil } -func qemuimgConvert(srcVolPath, dstVolPath, srcFormat, dstFormat string) error { +func qemuimgConvert(srcVolPath, dstVolPath, srcFormat, dstFormat, ownerUID string, additionalArgs ...string) error { cmd := exec.Command( "qemu-img", "convert", @@ -69,6 +84,10 @@ func qemuimgConvert(srcVolPath, dstVolPath, srcFormat, dstFormat string) error { dstVolPath, ) + if len(additionalArgs) > 0 { + cmd.Args = append(cmd.Args, additionalArgs...) + } + klog.Info("Executing command: ", cmd.String()) stdout, err := cmd.StdoutPipe() if err != nil { @@ -93,10 +112,20 @@ func qemuimgConvert(srcVolPath, dstVolPath, srcFormat, dstFormat string) error { }() scanner := bufio.NewScanner(stdout) - for scanner.Scan() { - line := scanner.Text() - klog.Info(line) - } + go func() { + progressVec := createProgressCounter() + for scanner.Scan() { + line := scanner.Text() + klog.Info(line) + progress, err := parseQemuimgProgress(line) + if err != nil { + klog.Error(err) + continue + } + + updateProgress(progressVec, ownerUID, progress) + } + }() err = cmd.Wait() if err != nil { @@ -105,3 +134,54 @@ func qemuimgConvert(srcVolPath, dstVolPath, srcFormat, dstFormat string) error { return nil } + +func updateProgress(progressVec *prometheus.CounterVec, ownerUID string, progress float64) { + if progress == 0 { + return + } + metric := &dto.Metric{} + if err := progressVec.WithLabelValues(ownerUID).Write(metric); err != nil { + klog.Errorf("updateProgress: failed to write metric; %v", err) + } + + if progress > *metric.Counter.Value { + progressVec.WithLabelValues(ownerUID).Add(progress - *metric.Counter.Value) + } +} + +func parseQemuimgProgress(line string) (float64, error) { + // Parse qemu-img progress + // Example: "(10.00/100%)" + trimmed := strings.Trim(line, "\r\n\t") + if strings.HasSuffix(trimmed, "100%)") { + start := strings.Index(trimmed, "(") + 1 + end := strings.Index(trimmed, "/") + + if start < end && end <= len(trimmed) { + progressStr := trimmed[start:end] + progress, err := strconv.ParseFloat(progressStr, 64) + if err != nil { + return 0, err + } + return progress, nil + } + } + + return 0, nil +} + +func createProgressCounter() *prometheus.CounterVec { + progressVec := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "image_converter_progress", + Help: "Progress of image conversion", + }, + []string{"ownerUID"}, + ) + + if err := prometheus.Register(progressVec); err != nil { + klog.Error("Prometheus progress counter not registered:", err) + } + + return progressVec +} diff --git a/cmd/image-converter/image-converter_test.go b/cmd/image-converter/image-converter_test.go new file mode 100644 index 000000000..cda2e75dd --- /dev/null +++ b/cmd/image-converter/image-converter_test.go @@ -0,0 +1,25 @@ +package main + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Image converter", func() { + + Describe("parseQemuimgProgress function", func() { + It("correctly parses valid progress output", func() { + line := "(10.00/100%)" + progress, err := parseQemuimgProgress(line) + Expect(err).NotTo(HaveOccurred()) + Expect(progress).To(Equal(10.00)) + }) + + It("returns an error for invalid progress output", func() { + line := "Invalid output" + progress, err := parseQemuimgProgress(line) + Expect(err).ToNot(HaveOccurred()) + Expect(progress).To(Equal(float64(0))) + }) + }) +}) diff --git a/cmd/image-converter/image_converter_suite_test.go b/cmd/image-converter/image_converter_suite_test.go new file mode 100644 index 000000000..b964ccd7f --- /dev/null +++ b/cmd/image-converter/image_converter_suite_test.go @@ -0,0 +1,13 @@ +package main_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestImageConverter(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "ImageConverter Suite") +} diff --git a/pkg/controller/plan/BUILD.bazel b/pkg/controller/plan/BUILD.bazel index 363f98ac6..196ce6c50 100644 --- a/pkg/controller/plan/BUILD.bazel +++ b/pkg/controller/plan/BUILD.bazel @@ -27,6 +27,7 @@ go_library( "//pkg/controller/plan/context", "//pkg/controller/plan/handler", "//pkg/controller/plan/scheduler", + "//pkg/controller/plan/util", "//pkg/controller/provider/web", "//pkg/controller/validation", "//pkg/lib/client/openshift", diff --git a/pkg/controller/plan/adapter/base/doc.go b/pkg/controller/plan/adapter/base/doc.go index bbeb4c0a2..21f468ded 100644 --- a/pkg/controller/plan/adapter/base/doc.go +++ b/pkg/controller/plan/adapter/base/doc.go @@ -89,6 +89,8 @@ type Builder interface { GetPopulatorTaskName(pvc *core.PersistentVolumeClaim) (taskName string, err error) // Get the virtual machine preference name PreferenceName(vmRef ref.Ref, configMap *core.ConfigMap) (name string, err error) + // Get conversionTaskName + GetConversionTaskName(pvc *core.PersistentVolumeClaim) (taskName string, err error) } // Client API. diff --git a/pkg/controller/plan/adapter/converter.go b/pkg/controller/plan/adapter/converter.go index d84ad02c3..583c8e72e 100644 --- a/pkg/controller/plan/adapter/converter.go +++ b/pkg/controller/plan/adapter/converter.go @@ -2,7 +2,12 @@ package adapter import ( "context" + "crypto/tls" "fmt" + "io" + "net/http" + "regexp" + "strconv" planbase "github.com/konveyor/forklift-controller/pkg/controller/plan/adapter/base" plancontext "github.com/konveyor/forklift-controller/pkg/controller/plan/context" @@ -29,6 +34,7 @@ type Converter struct { Log logging.LevelLogger Labels map[string]string FilterFn filterFn + httpClient *http.Client } func NewConverter(destination *plancontext.Destination, logger logging.LevelLogger, labels map[string]string) *Converter { @@ -72,19 +78,15 @@ func (c *Converter) ConvertPVCs(pvcs []*v1.PersistentVolumeClaim, srcFormat srcF } c.Log.Info("Convert job status", "pvc", pvc.Name, "status", convertJob.Status) - for _, condition := range convertJob.Status.Conditions { - switch condition.Type { - case batchv1.JobComplete: - c.Log.Info("Convert job completed", "pvc", pvc.Name) - c.deleteScratchDV(scratchDV) - return true, nil - - case batchv1.JobFailed: - if convertJob.Status.Failed >= 3 { - c.deleteScratchDV(scratchDV) - return true, liberr.New("convert job failed") - } - } + if convertJob.Status.Succeeded >= 1 { + completed++ + continue + } + + if convertJob.Status.Failed >= 3 { + c.Log.Info("Convert job failed", "pvc", pvc.Name, "job", convertJob.Name) + c.deleteScratchDV(scratchDV) + return false, liberr.New("convert job failed", "pvc", pvc.Name, "job", convertJob.Name) } } @@ -208,6 +210,12 @@ func makeConversionContainer(pvc *v1.PersistentVolumeClaim, srcFormat, dstFormat Drop: []v1.Capability{"ALL"}, }, }, + Ports: []v1.ContainerPort{ + { + Name: "metrics", + ContainerPort: 2112, + }, + }, Command: []string{"/usr/local/bin/image-converter"}, Args: []string{ "-src-path", srcPath, @@ -215,6 +223,7 @@ func makeConversionContainer(pvc *v1.PersistentVolumeClaim, srcFormat, dstFormat "-src-format", srcFormat, "-dst-format", dstFormat, "-volume-mode", string(volumeMode), + "-owner-uid", string(pvc.UID), }, } @@ -272,6 +281,109 @@ func (c *Converter) ensureScratchDV(sourcePVC *v1.PersistentVolumeClaim) (*cdi.D return scratchDV, nil } +func (c *Converter) GetConversionProgressForPVC(pvc *v1.PersistentVolumeClaim) (float64, error) { + // Find pod via pvc -> job -> pod + jobList := &batchv1.JobList{} + label := client.MatchingLabels{planbase.AnnConversionSourcePVC: pvc.Name} + err := c.Destination.Client.List(context.Background(), jobList, client.InNamespace(pvc.Namespace), label) + if err != nil { + return 0, err + } + + if len(jobList.Items) == 0 { + return 0, nil + } + + job := &jobList.Items[0] + for _, condition := range job.Status.Conditions { + switch condition.Type { + case batchv1.JobComplete: + return 100, nil + } + } + + podList := &v1.PodList{} + label = client.MatchingLabels{planbase.AnnConversionSourcePVC: pvc.Name} + err = c.Destination.Client.List(context.Background(), podList, client.InNamespace(pvc.Namespace), label) + if err != nil { + return 0, err + } + + var pod *v1.Pod + for _, currentPod := range podList.Items { + // loop var + currentPod := currentPod + if currentPod.Status.Phase != v1.PodFailed { + if currentPod.Status.Phase == v1.PodRunning { + pod = ¤tPod + break + } + continue + } + } + + if pod == nil { + return 0, nil + } + + // Get pod IP + podIP := pod.Status.PodIP + if podIP == "" { + return 0, liberr.New("pod IP not found") + } + + // Get progress from pod + progressRegexp := regexp.MustCompile("progress\\{ownerUID=\"" + string(pvc.UID) + "\"\\} (\\d+\\.?\\d*)") + if c.httpClient == nil { + c.httpClient = buildHTTPClient() + } + + resp, err := c.httpClient.Get(fmt.Sprintf("https://%s:2112/metrics", podIP)) + if err != nil { + err = liberr.Wrap(err) + return 0, err + } + + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + err = liberr.Wrap(err) + return 0, err + } + + match := progressRegexp.FindSubmatch(body) + if match == nil { + c.Log.Info("Failed to find matches", "regexp", progressRegexp, "body", string(body)) + return 0, nil + } + + progress, err := strconv.ParseFloat(string(match[1]), 64) + if err != nil { + err = liberr.Wrap(err) + return 0, err + } + + return progress, nil +} + +func buildHTTPClient() *http.Client { + defaultTransport := http.DefaultTransport.(*http.Transport) + + transport := &http.Transport{ + Proxy: defaultTransport.Proxy, + DialContext: defaultTransport.DialContext, + MaxIdleConns: defaultTransport.MaxIdleConns, + IdleConnTimeout: defaultTransport.IdleConnTimeout, + ExpectContinueTimeout: defaultTransport.ExpectContinueTimeout, + TLSHandshakeTimeout: defaultTransport.TLSHandshakeTimeout, + + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + + return &http.Client{Transport: transport} +} + func makeScratchDV(pvc *v1.PersistentVolumeClaim) *cdi.DataVolume { size := pvc.Spec.Resources.Requests[v1.ResourceStorage] annotations := make(map[string]string) diff --git a/pkg/controller/plan/adapter/converter_test.go b/pkg/controller/plan/adapter/converter_test.go index ee66c50a6..70e271348 100644 --- a/pkg/controller/plan/adapter/converter_test.go +++ b/pkg/controller/plan/adapter/converter_test.go @@ -43,21 +43,12 @@ var _ = Describe("Converter tests", func() { }, } - convertJob := &batchv1.Job{ - ObjectMeta: meta.ObjectMeta{ - Name: getJobName(qcow2PVC, "convert"), - Namespace: pvcNamespace, - Labels: map[string]string{ - base.AnnConversionSourcePVC: pvcName, - }, - }, - } - srcFormatFn := func(pvc *v1.PersistentVolumeClaim) string { return pvc.Annotations[base.AnnSourceFormat] } It("Should not be ready if job is not ready", func() { + convertJob := createFakeConvertJob(qcow2PVC, pvcNamespace) converter = createFakeConverter(qcow2PVC, convertJob) ready, err := converter.ConvertPVCs([]*v1.PersistentVolumeClaim{qcow2PVC}, srcFormatFn, "raw") Expect(err).ToNot(HaveOccurred()) @@ -65,9 +56,8 @@ var _ = Describe("Converter tests", func() { }) It("Should be ready if job is ready", func() { - convertJob.Status.Conditions = append(convertJob.Status.Conditions, batchv1.JobCondition{ - Type: batchv1.JobComplete, - }) + convertJob := createFakeConvertJob(qcow2PVC, pvcNamespace) + convertJob.Status.Succeeded = 1 dv := &cdi.DataVolume{ ObjectMeta: meta.ObjectMeta{ @@ -121,13 +111,15 @@ var _ = Describe("Converter tests", func() { }, } - convertJob.Status.Conditions = append(convertJob.Status.Conditions, batchv1.JobCondition{Status: "False", Type: batchv1.JobFailed}) + convertJob := createFakeConvertJob(qcow2PVC, pvcNamespace) + + convertJob.Status.Succeeded = 0 convertJob.Status.Failed = 3 converter = createFakeConverter(qcow2PVC, convertJob, dv) _, err := converter.ConvertPVCs([]*v1.PersistentVolumeClaim{qcow2PVC}, srcFormatFn, "raw") - Expect(err).ToNot(HaveOccurred()) + Expect(err).To(HaveOccurred()) // Check if scratch DV is removed err = converter.Destination.Client.Get(context.TODO(), types.NamespacedName{Name: dv.Name, Namespace: dv.Namespace}, dv) @@ -158,3 +150,15 @@ func createFakeConverter(objects ...runtime.Object) *Converter { Labels: map[string]string{}, } } + +func createFakeConvertJob(pvc *v1.PersistentVolumeClaim, pvcNamespace string) *batchv1.Job { + return &batchv1.Job{ + ObjectMeta: meta.ObjectMeta{ + Name: getJobName(pvc, "convert"), + Namespace: pvcNamespace, + Labels: map[string]string{ + base.AnnConversionSourcePVC: pvc.Name, + }, + }, + } +} diff --git a/pkg/controller/plan/adapter/ocp/builder.go b/pkg/controller/plan/adapter/ocp/builder.go index 199812ce6..f3288e6a8 100644 --- a/pkg/controller/plan/adapter/ocp/builder.go +++ b/pkg/controller/plan/adapter/ocp/builder.go @@ -640,3 +640,7 @@ func (r *Builder) LunPersistentVolumeClaims(vmRef ref.Ref) (pvcs []core.Persiste // do nothing return } + +func (r *Builder) GetConversionTaskName(pvc *core.PersistentVolumeClaim) (taskName string, err error) { + return "", liberr.New("conversion tasks are not supported for OCP") +} diff --git a/pkg/controller/plan/adapter/openstack/builder.go b/pkg/controller/plan/adapter/openstack/builder.go index b6684d9d5..52b2971f4 100644 --- a/pkg/controller/plan/adapter/openstack/builder.go +++ b/pkg/controller/plan/adapter/openstack/builder.go @@ -600,6 +600,7 @@ func (r *Builder) mapDisks(vm *model.Workload, persistentVolumeClaims []*core.Pe object.Template.Spec.Volumes = kVolumes object.Template.Spec.Domain.Devices.Disks = kDisks + r.Log.Info("Benny was here", "kDisks", kDisks) } func (r *Builder) mapNetworks(vm *model.Workload, object *cnv.VirtualMachineSpec) (err error) { @@ -716,6 +717,7 @@ func (r *Builder) mapNetworks(vm *model.Workload, object *cnv.VirtualMachineSpec // Build tasks. func (r *Builder) Tasks(vmRef ref.Ref) (tasks []*plan.Task, err error) { + r.Log.Info("building tasks", "vm", vmRef.String()) workload := &model.Workload{} err = r.Source.Inventory.Find(workload, vmRef) if err != nil { @@ -982,6 +984,7 @@ func (r *Builder) PopulatorVolumes(vmRef ref.Ref, annotations map[string]string, func (r *Builder) getCorrespondingPvc(image model.Image, workload *model.Workload, annotations map[string]string, secretName string) (pvc *core.PersistentVolumeClaim, err error) { populatorCR, err := r.ensureVolumePopulator(workload, &image, secretName) if err != nil { + err = liberr.Wrap(err) return } return r.ensureVolumePopulatorPVC(workload, &image, annotations, populatorCR.Name) @@ -1407,3 +1410,7 @@ func (r *Builder) GetPopulatorTaskName(pvc *core.PersistentVolumeClaim) (taskNam taskName = image.Name return } + +func (r *Builder) GetConversionTaskName(pvc *core.PersistentVolumeClaim) (taskName string, err error) { + return fmt.Sprintf("%s-convert", pvc.Annotations[planbase.AnnDiskSource]), nil +} diff --git a/pkg/controller/plan/adapter/ova/builder.go b/pkg/controller/plan/adapter/ova/builder.go index 5b1afe96b..e6849fa3c 100644 --- a/pkg/controller/plan/adapter/ova/builder.go +++ b/pkg/controller/plan/adapter/ova/builder.go @@ -561,3 +561,7 @@ func (r *Builder) GetPopulatorTaskName(pvc *core.PersistentVolumeClaim) (taskNam err = planbase.VolumePopulatorNotSupportedError return } + +func (r *Builder) GetConversionTaskName(pvc *core.PersistentVolumeClaim) (taskName string, err error) { + return "", liberr.New("conversion tasks are not supported for OVA") +} diff --git a/pkg/controller/plan/adapter/ovirt/builder.go b/pkg/controller/plan/adapter/ovirt/builder.go index 9c2f7f4ae..9ada7b8c7 100644 --- a/pkg/controller/plan/adapter/ovirt/builder.go +++ b/pkg/controller/plan/adapter/ovirt/builder.go @@ -984,3 +984,7 @@ func (r *Builder) GetPopulatorTaskName(pvc *core.PersistentVolumeClaim) (taskNam taskName = pvc.Annotations[planbase.AnnDiskSource] return } + +func (r *Builder) GetConversionTaskName(pvc *core.PersistentVolumeClaim) (taskName string, err error) { + return "", liberr.New("Conversion task is not supported for oVirt") +} diff --git a/pkg/controller/plan/adapter/vsphere/builder.go b/pkg/controller/plan/adapter/vsphere/builder.go index 4b7edec54..5d2aef1df 100644 --- a/pkg/controller/plan/adapter/vsphere/builder.go +++ b/pkg/controller/plan/adapter/vsphere/builder.go @@ -883,3 +883,7 @@ func (r *Builder) GetPopulatorTaskName(pvc *core.PersistentVolumeClaim) (taskNam err = planbase.VolumePopulatorNotSupportedError return } + +func (r *Builder) GetConversionTaskName(pvc *core.PersistentVolumeClaim) (taskName string, err error) { + return "", liberr.New("conversion task name not supported for vSphere") +} diff --git a/pkg/controller/plan/kubevirt.go b/pkg/controller/plan/kubevirt.go index 01eadf257..4076c5f91 100644 --- a/pkg/controller/plan/kubevirt.go +++ b/pkg/controller/plan/kubevirt.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" k8svalidation "k8s.io/apimachinery/pkg/util/validation" + "k8s.io/utils/ptr" cnv "kubevirt.io/api/core/v1" instancetype "kubevirt.io/api/instancetype/v1beta1" libvirtxml "libvirt.org/libvirt-go-xml" @@ -314,7 +315,7 @@ func (r *KubeVirt) DeleteJobs(vm *plan.VMStatus) (err error) { jobNames := []string{} for _, job := range list.Items { - err = r.DeleteObject(&job, vm, "Deleted job.", "job") + err = r.DeleteObject(&job, vm, "Deleted job.", "job", &client.DeleteOptions{PropagationPolicy: ptr.To(meta.DeletePropagationBackground)}) if err != nil { err = liberr.Wrap(err) r.Log.Error( diff --git a/pkg/controller/plan/migration.go b/pkg/controller/plan/migration.go index cb8ab53fe..6157a4d35 100644 --- a/pkg/controller/plan/migration.go +++ b/pkg/controller/plan/migration.go @@ -12,6 +12,8 @@ import ( "strings" "time" + "github.com/konveyor/forklift-controller/pkg/controller/plan/util" + "github.com/konveyor/forklift-controller/pkg/apis/forklift/v1beta1" "github.com/konveyor/forklift-controller/pkg/apis/forklift/v1beta1/plan" "github.com/konveyor/forklift-controller/pkg/controller/plan/adapter" @@ -79,14 +81,15 @@ const ( // Steps. const ( - Initialize = "Initialize" - Cutover = "Cutover" - DiskAllocation = "DiskAllocation" - DiskTransfer = "DiskTransfer" - ImageConversion = "ImageConversion" - DiskTransferV2v = "DiskTransferV2v" - VMCreation = "VirtualMachineCreation" - Unknown = "Unknown" + Initialize = "Initialize" + Cutover = "Cutover" + DiskAllocation = "DiskAllocation" + DiskTransfer = "DiskTransfer" + ImageConversion = "ImageConversion" + DiskTransferV2v = "DiskTransferV2v" + VMCreation = "VirtualMachineCreation" + OpensStackImageConversion = "OpenstackImageConversion" + Unknown = "Unknown" ) const ( @@ -632,8 +635,10 @@ func (r *Migration) step(vm *plan.VMStatus) (step string) { step = Initialize case AllocateDisks: step = DiskAllocation - case CopyDisks, CopyingPaused, CreateSnapshot, WaitForSnapshot, AddCheckpoint, ConvertOpenstackSnapshot: + case CopyDisks, CopyingPaused, CreateSnapshot, WaitForSnapshot, AddCheckpoint: step = DiskTransfer + case ConvertOpenstackSnapshot: + step = OpensStackImageConversion case CreateFinalSnapshot, WaitForFinalSnapshot, AddFinalCheckpoint, Finalize: step = Cutover case CreateGuestConversionPod, ConvertGuest: @@ -935,6 +940,29 @@ func (r *Migration) execute(vm *plan.VMStatus) (err error) { break } + // Update progress + for _, pvc := range pvcs { + if _, ok := pvc.Annotations[base.AnnRequiresConversion]; !ok { + continue + } + + progress, err := r.converter.GetConversionProgressForPVC(pvc) + if err != nil { + r.Log.Error(err, + "Failed to get conversion progress for PVC", + "pvc", + pvc.Name, + "namespace", + pvc.Namespace) + continue + } + err = r.updateImageConversionProgress(pvc, step, progress) + if err != nil { + r.Log.Error(err, "Failed to update image conversion progress") + continue + } + } + if !ready { r.Log.Info("Conversion isn't ready yet") return nil @@ -1232,7 +1260,33 @@ func (r *Migration) buildPipeline(vm *plan.VM) (pipeline []*plan.Step, err error Phase: Pending, }, }) - case AllocateDisks, CopyDisks, CopyDisksVirtV2V, ConvertOpenstackSnapshot: + case ConvertOpenstackSnapshot: + var taskErr error + task, taskErr := util.CreateConversionTask(r.Source.Inventory, vm.Ref) + if taskErr != nil { + return nil, taskErr + } + + if task == nil { + break + } + + pipeline = append( + pipeline, + &plan.Step{ + Task: plan.Task{ + Name: OpensStackImageConversion, + Description: "Convert OpenStack snapshot.", + Progress: libitr.Progress{ + Completed: 0, + Total: 100, + }, + Phase: Pending, + }, + Tasks: []*plan.Task{task}, + }) + + case AllocateDisks, CopyDisks, CopyDisksVirtV2V: tasks, pErr := r.builder.Tasks(vm.Ref) if pErr != nil { err = liberr.Wrap(pErr) @@ -1253,9 +1307,6 @@ func (r *Migration) buildPipeline(vm *plan.VM) (pipeline []*plan.Step, err error case CopyDisksVirtV2V: task_name = DiskTransferV2v task_description = "Copy disks." - case ConvertOpenstackSnapshot: - task_name = ConvertOpenstackSnapshot - task_description = "Convert OpenStack snapshot." default: err = liberr.New(fmt.Sprintf("Unknown step '%s'. Not implemented.", step.Name)) return @@ -1721,6 +1772,36 @@ func (r *Migration) setDataVolumeCheckpoints(vm *plan.VMStatus) (err error) { return } +func (r *Migration) updateImageConversionProgress(pvc *core.PersistentVolumeClaim, step *plan.Step, progress float64) (err error) { + r.Log.Info("Updating progress for PVC", "pvc", pvc.Name) + taskName, err := r.builder.GetConversionTaskName(pvc) + if err != nil { + err = liberr.Wrap(err) + return + } + + var task *plan.Task + found := false + if task, found = step.FindTask(taskName); !found { + r.Log.Info("Task not found", "task", taskName) + return + } + + r.Log.Info("Updating progress", "progress", progress, "task", taskName) + + task.Progress.Completed = int64(progress) + if task.Progress.Completed >= task.Progress.Total { + task.Phase = Completed + task.Reason = TransferCompleted + task.Progress.Completed = task.Progress.Total + task.MarkCompleted() + } + + step.ReflectTasks() + + return nil +} + func (r *Migration) updatePopulatorCopyProgress(vm *plan.VMStatus, step *plan.Step) (err error) { pvcs, err := r.kubevirt.getPVCs(vm.Ref) if err != nil { @@ -1757,7 +1838,6 @@ func (r *Migration) updatePopulatorCopyProgress(vm *plan.VMStatus, step *plan.St if err != nil { return } - percent := float64(transferredBytes/0x100000) / float64(task.Progress.Total) newProgress := int64(percent * float64(task.Progress.Total)) if newProgress == task.Progress.Completed { diff --git a/pkg/controller/plan/util/BUILD.bazel b/pkg/controller/plan/util/BUILD.bazel index 54588add3..e112ef518 100644 --- a/pkg/controller/plan/util/BUILD.bazel +++ b/pkg/controller/plan/util/BUILD.bazel @@ -11,8 +11,13 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/apis/forklift/v1beta1", + "//pkg/apis/forklift/v1beta1/plan", + "//pkg/apis/forklift/v1beta1/ref", + "//pkg/controller/provider/web", "//pkg/controller/provider/web/openstack", "//pkg/controller/provider/web/ovirt", + "//pkg/lib/error", + "//pkg/lib/itinerary", "//pkg/settings", "//vendor/k8s.io/api/core/v1:core", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:meta", diff --git a/pkg/controller/plan/util/openstack.go b/pkg/controller/plan/util/openstack.go index 9b15a5850..809daacd7 100644 --- a/pkg/controller/plan/util/openstack.go +++ b/pkg/controller/plan/util/openstack.go @@ -1,9 +1,17 @@ package util import ( + "fmt" "net/url" + libitr "github.com/konveyor/forklift-controller/pkg/lib/itinerary" + api "github.com/konveyor/forklift-controller/pkg/apis/forklift/v1beta1" + "github.com/konveyor/forklift-controller/pkg/apis/forklift/v1beta1/plan" + "github.com/konveyor/forklift-controller/pkg/apis/forklift/v1beta1/ref" + "github.com/konveyor/forklift-controller/pkg/controller/provider/web" + model "github.com/konveyor/forklift-controller/pkg/controller/provider/web/openstack" + liberr "github.com/konveyor/forklift-controller/pkg/lib/error" core "k8s.io/api/core/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -28,3 +36,40 @@ func OpenstackVolumePopulator(image *openstack.Image, sourceUrl *url.URL, transf }, } } + +func CreateConversionTask(inventory web.Client, vmRef ref.Ref) (*plan.Task, error) { + workload := &model.Workload{} + err := inventory.Find(workload, vmRef) + if err != nil { + err = liberr.Wrap(err, "vm", vmRef.String()) + return nil, err + } + + if workload.ImageID == "" { + //nolint:nilnil + return nil, nil + } + + imageID := workload.ImageID + + // Find image in inventory + image := &model.Image{} + err = inventory.Get(image, imageID) + if err != nil { + err = liberr.Wrap(err, "image", imageID) + return nil, err + } + if image.DiskFormat == "raw" { + //nolint:nilnil + return nil, nil + } + + taskName := fmt.Sprintf("%s-convert", image.ID) + return &plan.Task{ + Name: taskName, + Progress: libitr.Progress{ + Completed: 0, + Total: 100, + }, + }, nil +}