Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 96 additions & 68 deletions pkg/ddc/alluxio/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,76 +100,104 @@
return valueFile.Name(), nil
}

// genDataLoadValue generates configuration values for a DataLoad operation
// Parameters:
// - image: Container image to use for the data load operation
// - targetDataset: Target dataset object where data will be loaded
// - dataload: DataLoad custom resource specification
// Returns:
// - *cdataload.DataLoadValue: Fully populated configuration object for data loading
// - error: Any error that occurs during processing
func (e *AlluxioEngine) genDataLoadValue(image string, targetDataset *datav1alpha1.Dataset, dataload *datav1alpha1.DataLoad) (*cdataload.DataLoadValue, error) {
// image pull secrets
// if the environment variable is not set, it is still an empty slice
imagePullSecrets := docker.GetImagePullSecretsFromEnv(common.EnvImagePullSecretsKey)

dataloadInfo := cdataload.DataLoadInfo{
BackoffLimit: 3,
TargetDataset: dataload.Spec.Dataset.Name,
LoadMetadata: dataload.Spec.LoadMetadata,
Image: image,
Labels: dataload.Spec.PodMetadata.Labels,
Annotations: dataflow.InjectAffinityAnnotation(dataload.Annotations, dataload.Spec.PodMetadata.Annotations),
ImagePullSecrets: imagePullSecrets,
Policy: string(dataload.Spec.Policy),
Schedule: dataload.Spec.Schedule,
Resources: dataload.Spec.Resources,
}

// pod affinity
if dataload.Spec.Affinity != nil {
dataloadInfo.Affinity = dataload.Spec.Affinity
}

// inject the node affinity by previous operation pod.
var err error
dataloadInfo.Affinity, err = dataflow.InjectAffinityByRunAfterOp(e.Client, dataload.Spec.RunAfter, dataload.Namespace, dataloadInfo.Affinity)
if err != nil {
return nil, err
}

// node selector
if dataload.Spec.NodeSelector != nil {
if dataloadInfo.NodeSelector == nil {
dataloadInfo.NodeSelector = make(map[string]string)
}
dataloadInfo.NodeSelector = dataload.Spec.NodeSelector
}

// pod tolerations
if len(dataload.Spec.Tolerations) > 0 {
if dataloadInfo.Tolerations == nil {
dataloadInfo.Tolerations = make([]v1.Toleration, 0)
}
dataloadInfo.Tolerations = dataload.Spec.Tolerations
}

// scheduler name
if len(dataload.Spec.SchedulerName) > 0 {
dataloadInfo.SchedulerName = dataload.Spec.SchedulerName
}

targetPaths := []cdataload.TargetPath{}
for _, target := range dataload.Spec.Target {
fluidNative := utils.IsTargetPathUnderFluidNativeMounts(target.Path, *targetDataset)
targetPaths = append(targetPaths, cdataload.TargetPath{
Path: target.Path,
Replicas: target.Replicas,
FluidNative: fluidNative,
})
}
dataloadInfo.TargetPaths = targetPaths
dataLoadValue := &cdataload.DataLoadValue{
Name: dataload.Name,
OwnerDatasetId: utils.GetDatasetId(targetDataset.Namespace, targetDataset.Name, string(targetDataset.UID)),
DataLoadInfo: dataloadInfo,
Owner: transformer.GenerateOwnerReferenceFromObject(dataload),
}

return dataLoadValue, nil
// Retrieve image pull secrets from environment variables
// Returns empty slice if environment variable isn't set
imagePullSecrets := docker.GetImagePullSecretsFromEnv(common.EnvImagePullSecretsKey)

// Build base DataLoad configuration structure
dataloadInfo := cdataload.DataLoadInfo{
BackoffLimit: 3, // Number of retries for failed jobs
TargetDataset: dataload.Spec.Dataset.Name, // Name of target dataset
LoadMetadata: dataload.Spec.LoadMetadata, // Whether to load metadata
Image: image, // Container image for data loader
Labels: dataload.Spec.PodMetadata.Labels, // Pod labels
// Merge annotations with affinity injection
Annotations: dataflow.InjectAffinityAnnotation(dataload.Annotations, dataload.Spec.PodMetadata.Annotations),
ImagePullSecrets: imagePullSecrets, // Credentials for pulling images
Policy: string(dataload.Spec.Policy), // Execution policy (e.g., Once, Cron)
Schedule: dataload.Spec.Schedule, // Cron schedule for periodic execution
Resources: dataload.Spec.Resources, // CPU/memory requirements
}

// Apply affinity settings if specified in DataLoad CR
if dataload.Spec.Affinity != nil {
dataloadInfo.Affinity = dataload.Spec.Affinity
}

// Inject dependency-based affinity constraints
// Ensures this operation runs after specified predecessor operations
var err error
dataloadInfo.Affinity, err = dataflow.InjectAffinityByRunAfterOp(
e.Client, // Kubernetes API client
dataload.Spec.RunAfter, // Operations that must complete first
dataload.Namespace, // Kubernetes namespace
dataloadInfo.Affinity, // Current affinity settings
)
if err != nil {
return nil, err // Propagate affinity injection errors
}

// Configure node selection constraints
if dataload.Spec.NodeSelector != nil {
// Initialize map if empty
if dataloadInfo.NodeSelector == nil {
dataloadInfo.NodeSelector = make(map[string]string)
}
dataloadInfo.NodeSelector = dataload.Spec.NodeSelector
}

// Configure tolerations for tainted nodes
if len(dataload.Spec.Tolerations) > 0 {
// Initialize slice if empty
if dataloadInfo.Tolerations == nil {
dataloadInfo.Tolerations = make([]v1.Toleration, 0)
}
dataloadInfo.Tolerations = dataload.Spec.Tolerations
}

// Assign custom scheduler if specified
if len(dataload.Spec.SchedulerName) > 0 {
dataloadInfo.SchedulerName = dataload.Spec.SchedulerName
}

// Process target data paths
targetPaths := []cdataload.TargetPath{}
for _, target := range dataload.Spec.Target {
// Check if path is within Fluid-native mounts
fluidNative := utils.IsTargetPathUnderFluidNativeMounts(target.Path, *targetDataset)

// Append path configuration
targetPaths = append(targetPaths, cdataload.TargetPath{
Path: target.Path, // Absolute data path to load
Replicas: target.Replicas, // Number of data replicas to create
FluidNative: fluidNative, // Whether path is Fluid-native
})
}
dataloadInfo.TargetPaths = targetPaths

// Assemble final configuration object
dataLoadValue := &cdataload.DataLoadValue{
Name: dataload.Name, // DataLoad job name
OwnerDatasetId: utils.GetDatasetId( // Unique dataset identifier
targetDataset.Namespace,
targetDataset.Name,
string(targetDataset.UID),
DataLoadInfo: dataloadInfo, // Core configuration

Check failure on line 194 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / staticcheck

syntax error: unexpected : in argument list; possibly missing comma or ) (compile)

Check failure on line 194 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / staticcheck

syntax error: unexpected : in argument list; possibly missing comma or ) (compile)

Check failure on line 194 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / staticcheck

syntax error: unexpected : in argument list; possibly missing comma or ) (compile)

Check failure on line 194 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / build

missing ',' in argument list

Check failure on line 194 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.29.2)

missing ',' in argument list

Check failure on line 194 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.28.7)

missing ',' in argument list

Check failure on line 194 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.22.17)

missing ',' in argument list

Check failure on line 194 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.24.17)

missing ',' in argument list

Check failure on line 194 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.26.15)

missing ',' in argument list

Check failure on line 194 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / lint

syntax error: unexpected : in argument list; possibly missing comma or )) (typecheck)

Check failure on line 194 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / lint

syntax error: unexpected : in argument list; possibly missing comma or )) (typecheck)

Check failure on line 194 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / lint

syntax error: unexpected : in argument list; possibly missing comma or )) (typecheck)
// Owner reference for garbage collection
Owner: transformer.GenerateOwnerReferenceFromObject(dataload),

Check failure on line 196 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / build

missing ',' in argument list

Check failure on line 196 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.29.2)

missing ',' in argument list

Check failure on line 196 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.28.7)

missing ',' in argument list

Check failure on line 196 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.22.17)

missing ',' in argument list

Check failure on line 196 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.24.17)

missing ',' in argument list

Check failure on line 196 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.26.15)

missing ',' in argument list
}

Check failure on line 197 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / build

expected operand, found '}'

Check failure on line 197 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.29.2)

expected operand, found '}'

Check failure on line 197 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.28.7)

expected operand, found '}'

Check failure on line 197 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.22.17)

expected operand, found '}'

Check failure on line 197 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.24.17)

expected operand, found '}'

Check failure on line 197 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.26.15)

expected operand, found '}'

return dataLoadValue, nil

Check failure on line 199 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / build

missing ',' before newline in argument list

Check failure on line 199 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / build

missing ',' in argument list

Check failure on line 199 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.29.2)

missing ',' before newline in argument list

Check failure on line 199 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.29.2)

missing ',' in argument list

Check failure on line 199 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.28.7)

missing ',' before newline in argument list

Check failure on line 199 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.28.7)

missing ',' in argument list

Check failure on line 199 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.22.17)

missing ',' before newline in argument list

Check failure on line 199 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.22.17)

missing ',' in argument list

Check failure on line 199 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.24.17)

missing ',' before newline in argument list

Check failure on line 199 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.24.17)

missing ',' in argument list

Check failure on line 199 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.26.15)

missing ',' before newline in argument list

Check failure on line 199 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.26.15)

missing ',' in argument list
}

Check failure on line 200 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / build

expected operand, found '}'

Check failure on line 200 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.29.2)

expected operand, found '}'

Check failure on line 200 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.28.7)

expected operand, found '}'

Check failure on line 200 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.22.17)

expected operand, found '}'

Check failure on line 200 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.24.17)

expected operand, found '}'

Check failure on line 200 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.26.15)

expected operand, found '}'

// CheckRuntimeReady checks if the Alluxio runtime is operational.
// It obtains master pod details, creates file utilities, and checks readiness.
Expand All @@ -181,9 +209,9 @@
podName, containerName := e.getMasterPodInfo()
fileUtils := operations.NewAlluxioFileUtils(podName, containerName, e.namespace, e.Log)
ready = fileUtils.Ready()
if !ready {

Check failure on line 212 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / build

missing ',' in argument list

Check failure on line 212 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.29.2)

missing ',' in argument list

Check failure on line 212 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.28.7)

missing ',' in argument list

Check failure on line 212 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.22.17)

missing ',' in argument list

Check failure on line 212 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.24.17)

missing ',' in argument list

Check failure on line 212 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.26.15)

missing ',' in argument list
e.Log.Info("runtime not ready", "runtime", ready)

Check failure on line 213 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / build

missing ',' before newline in composite literal

Check failure on line 213 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.29.2)

missing ',' before newline in composite literal

Check failure on line 213 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.28.7)

missing ',' before newline in composite literal

Check failure on line 213 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.22.17)

missing ',' before newline in composite literal

Check failure on line 213 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.24.17)

missing ',' before newline in composite literal

Check failure on line 213 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.26.15)

missing ',' before newline in composite literal
return false

Check failure on line 214 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / build

missing ',' in composite literal

Check failure on line 214 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / build

expected operand, found 'return'

Check failure on line 214 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.29.2)

missing ',' in composite literal

Check failure on line 214 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.29.2)

expected operand, found 'return'

Check failure on line 214 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.28.7)

missing ',' in composite literal

Check failure on line 214 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.28.7)

expected operand, found 'return'

Check failure on line 214 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.22.17)

missing ',' in composite literal

Check failure on line 214 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.22.17)

expected operand, found 'return'

Check failure on line 214 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.24.17)

missing ',' in composite literal

Check failure on line 214 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.24.17)

expected operand, found 'return'

Check failure on line 214 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.26.15)

missing ',' in composite literal

Check failure on line 214 in pkg/ddc/alluxio/load_data.go

View workflow job for this annotation

GitHub Actions / kind-e2e-test (v1.26.15)

expected operand, found 'return'
}
return true
}
Loading