-
Notifications
You must be signed in to change notification settings - Fork 6
DATA-5849: File download support #51
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
7aac8f6
3661e5f
f811ade
f6820bd
4399b49
ff76d4c
34b5434
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,6 +6,7 @@ import ( | |
| "fmt" | ||
| "os" | ||
| "strings" | ||
| "text/template" | ||
| "time" | ||
|
|
||
| "github.com/aws/aws-sdk-go-v2/aws" | ||
|
|
@@ -21,6 +22,120 @@ import ( | |
| "github.com/patterninc/heimdall/pkg/result/column" | ||
| ) | ||
|
|
||
| var ( | ||
| templatePath = "internal/pkg/object/command/ecs/startup_script_template.sh" | ||
| ) | ||
|
|
||
| // FileDownload represents a file to be downloaded before container execution | ||
| type FileDownload struct { | ||
| Source string `yaml:"source,omitempty" json:"source,omitempty"` // S3 URI or HTTP URL | ||
| Destination string `yaml:"destination,omitempty" json:"destination,omitempty"` // Local path in container | ||
| } | ||
|
|
||
| // StartupScriptConfig represents configuration for the startup script | ||
| type StartupScriptConfig struct { | ||
| ScriptPath string `yaml:"script_path,omitempty" json:"script_path,omitempty"` // Path to the startup script | ||
| DownloadDir string `yaml:"download_dir,omitempty" json:"download_dir,omitempty"` // Directory to download files to | ||
| Timeout int `yaml:"timeout,omitempty" json:"timeout,omitempty"` // Timeout in seconds | ||
| CreateDirs bool `yaml:"create_dirs,omitempty" json:"create_dirs,omitempty"` // Create destination directories | ||
| } | ||
|
|
||
| // ScriptTemplateData represents data for populating the startup script template | ||
| type ScriptTemplateData struct { | ||
|
||
| DownloadDir string | ||
| Timeout int | ||
| CreateDirs bool | ||
| Downloads []ScriptDownload | ||
| } | ||
|
|
||
| // ScriptDownload represents a download item for the script template | ||
| type ScriptDownload struct { | ||
| Source string | ||
| Destination string | ||
| IsS3 bool | ||
|
||
| } | ||
|
|
||
| // ContainerModificationOption represents a generic option for modifying container definitions | ||
| type ContainerModificationOption func(*types.ContainerDefinition) error | ||
|
||
|
|
||
| // ContainerOption represents a generic option for container modifications | ||
| type ContainerOption struct { | ||
| ModifyContainer ContainerModificationOption | ||
| Description string | ||
| } | ||
|
|
||
| // WithStartupScriptWrapper creates a container option that injects startup script for file downloads | ||
| func (execCtx *executionContext) WithStartupScriptWrapper() ContainerOption { | ||
|
||
| return ContainerOption{ | ||
| ModifyContainer: func(container *types.ContainerDefinition) error { | ||
| return execCtx.modifyContainerWithStartupScript(container) | ||
| }, | ||
| Description: "Inject startup script for file downloads", | ||
| } | ||
| } | ||
|
|
||
| // modifyContainerWithStartupScript modifies a container definition to include startup script for downloads | ||
| func (execCtx *executionContext) modifyContainerWithStartupScript(container *types.ContainerDefinition) error { | ||
| if len(execCtx.FileDownloads) == 0 { | ||
| return nil // No downloads configured, no modification needed | ||
| } | ||
|
|
||
| // Generate startup script | ||
| startupScript, err := generateStartupScript(execCtx.FileDownloads, execCtx.StartupScriptConfig) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to generate startup script: %w", err) | ||
| } | ||
|
|
||
| // Store original command | ||
| originalCommand := container.Command | ||
| if originalCommand == nil { | ||
| originalCommand = []string{} | ||
| } | ||
|
|
||
| // Create startup script command | ||
| scriptCmd := []string{"sh", "-c", startupScript} | ||
| container.Command = scriptCmd | ||
|
|
||
| // Add environment variables for the startup script | ||
| if container.Environment == nil { | ||
| container.Environment = []types.KeyValuePair{} | ||
| } | ||
|
|
||
| // Add original command as environment variable for the startup script | ||
| originalCmdStr := strings.Join(originalCommand, " ") | ||
| container.Environment = append(container.Environment, | ||
| types.KeyValuePair{ | ||
| Name: aws.String("ORIGINAL_COMMAND"), | ||
| Value: aws.String(originalCmdStr), | ||
| }) | ||
| fmt.Println() | ||
prasadlohakpure marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return nil | ||
| } | ||
|
|
||
| // getDefaultContainerOptions returns the default container options | ||
| func (execCtx *executionContext) getDefaultContainerOptions() []ContainerOption { | ||
|
||
| options := []ContainerOption{} | ||
|
|
||
| // Add startup script wrapper option if file downloads are configured | ||
| if len(execCtx.FileDownloads) > 0 { | ||
| options = append(options, execCtx.WithStartupScriptWrapper()) | ||
| } | ||
|
|
||
| return options | ||
| } | ||
|
|
||
| // applyContainerOptions applies container options to container definitions | ||
| func (execCtx *executionContext) applyContainerOptions(containerDefinitions []types.ContainerDefinition, options []ContainerOption) error { | ||
| for _, option := range options { | ||
| for i := range containerDefinitions { | ||
|
||
| if err := option.ModifyContainer(&containerDefinitions[i]); err != nil { | ||
| return fmt.Errorf("failed to apply container option '%s': %w", option.Description, err) | ||
| } | ||
| } | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // ECS command context structure | ||
| type ecsCommandContext struct { | ||
| TaskDefinitionTemplate string `yaml:"task_definition_template,omitempty" json:"task_definition_template,omitempty"` | ||
|
|
@@ -29,6 +144,10 @@ type ecsCommandContext struct { | |
| PollingInterval duration.Duration `yaml:"polling_interval,omitempty" json:"polling_interval,omitempty"` | ||
| Timeout duration.Duration `yaml:"timeout,omitempty" json:"timeout,omitempty"` | ||
| MaxFailCount int `yaml:"max_fail_count,omitempty" json:"max_fail_count,omitempty"` // max failures before giving up | ||
|
|
||
| // File download configuration | ||
| FileDownloads []FileDownload `yaml:"file_downloads,omitempty" json:"file_downloads,omitempty"` | ||
| StartupScriptConfig *StartupScriptConfig `yaml:"startup_script_config,omitempty" json:"startup_script_config,omitempty"` | ||
| } | ||
|
|
||
| // ECS cluster context structure | ||
|
|
@@ -77,6 +196,10 @@ type executionContext struct { | |
| Timeout duration.Duration `json:"timeout"` | ||
| MaxFailCount int `json:"max_fail_count"` | ||
|
|
||
| // File download configuration | ||
| FileDownloads []FileDownload `json:"file_downloads"` | ||
| StartupScriptConfig *StartupScriptConfig `json:"startup_script_config"` | ||
|
|
||
| ecsClient *ecs.Client | ||
| taskDefARN *string | ||
| tasks map[string]*taskTracker | ||
|
|
@@ -87,6 +210,8 @@ const ( | |
| defaultTaskTimeout = duration.Duration(1 * time.Hour) | ||
| defaultMaxFailCount = 1 | ||
| defaultTaskCount = 1 | ||
| defaultDownloadDir = "/tmp/downloads" | ||
| defaultTimeout = 300 | ||
| startedByPrefix = "heimdall-job-" | ||
| errMaxFailCount = "task %s failed %d times (max: %d), giving up" | ||
| errPollingTimeout = "polling timed out for arns %v after %v" | ||
|
|
@@ -97,13 +222,77 @@ var ( | |
| errMissingTemplate = fmt.Errorf("task definition template is required") | ||
| ) | ||
|
|
||
| // generateStartupScript creates a startup script for downloading files using a template | ||
| func generateStartupScript(fileDownloads []FileDownload, config *StartupScriptConfig) (string, error) { | ||
| if len(fileDownloads) == 0 { | ||
|
||
| return "#!/bin/bash\necho 'No files to download'\nexec \"$@\"", nil | ||
| } | ||
|
|
||
| downloadDir := defaultDownloadDir | ||
| timeout := defaultTimeout | ||
| createDirs := true | ||
|
|
||
| if config != nil { | ||
| if config.DownloadDir != "" { | ||
| downloadDir = config.DownloadDir | ||
| } | ||
| if config.Timeout > 0 { | ||
| timeout = config.Timeout | ||
| } | ||
| createDirs = config.CreateDirs | ||
| } | ||
|
|
||
| // Prepare template data | ||
| templateData := ScriptTemplateData{ | ||
| DownloadDir: downloadDir, | ||
| Timeout: timeout, | ||
| CreateDirs: createDirs, | ||
| Downloads: make([]ScriptDownload, 0, len(fileDownloads)), | ||
| } | ||
|
|
||
| // Convert file downloads to script downloads | ||
| for _, download := range fileDownloads { | ||
| scriptDownload := ScriptDownload{ | ||
| Source: download.Source, | ||
| Destination: download.Destination, | ||
| IsS3: strings.HasPrefix(download.Source, "s3://"), | ||
| } | ||
| templateData.Downloads = append(templateData.Downloads, scriptDownload) | ||
| } | ||
|
|
||
| // Load template | ||
| templateContent, err := os.ReadFile(templatePath) | ||
| if err != nil { | ||
| return "", fmt.Errorf("failed to read template file: %w", err) | ||
| } | ||
|
|
||
| // Parse template | ||
| tmpl, err := template.New("startup_script").Parse(string(templateContent)) | ||
| if err != nil { | ||
| return "", fmt.Errorf("failed to parse template: %w", err) | ||
|
||
| } | ||
|
|
||
| // Execute template | ||
| var script strings.Builder | ||
| if err := tmpl.Execute(&script, templateData); err != nil { | ||
| return "", fmt.Errorf("failed to execute template: %w", err) | ||
| } | ||
|
|
||
| return script.String(), nil | ||
| } | ||
|
|
||
| func New(commandContext *context.Context) (plugin.Handler, error) { | ||
|
|
||
| e := &ecsCommandContext{ | ||
| PollingInterval: defaultPollingInterval, | ||
| Timeout: defaultTaskTimeout, | ||
| MaxFailCount: defaultMaxFailCount, | ||
| TaskCount: defaultTaskCount, | ||
| StartupScriptConfig: &StartupScriptConfig{ | ||
| DownloadDir: defaultDownloadDir, | ||
| Timeout: defaultTimeout, | ||
| CreateDirs: true, | ||
| }, | ||
| } | ||
|
|
||
| if commandContext != nil { | ||
|
|
@@ -151,6 +340,15 @@ func (e *ecsCommandContext) handler(r *plugin.Runtime, job *job.Job, cluster *cl | |
|
|
||
| // prepare and register task definition with ECS | ||
| func (execCtx *executionContext) registerTaskDefinition() error { | ||
| // Start with the original container definitions | ||
| containerDefinitions := execCtx.TaskDefinitionWrapper.TaskDefinition.ContainerDefinitions | ||
|
|
||
| // Apply container options using the options pattern | ||
| containerOptions := execCtx.getDefaultContainerOptions() | ||
|
||
| if err := execCtx.applyContainerOptions(containerDefinitions, containerOptions); err != nil { | ||
| return fmt.Errorf("failed to apply container options: %w", err) | ||
| } | ||
|
|
||
| registerInput := &ecs.RegisterTaskDefinitionInput{ | ||
| Family: aws.String(aws.ToString(execCtx.TaskDefinitionWrapper.TaskDefinition.Family)), | ||
| RequiresCompatibilities: []types.Compatibility{types.CompatibilityFargate}, | ||
|
|
@@ -159,7 +357,7 @@ func (execCtx *executionContext) registerTaskDefinition() error { | |
| Memory: aws.String(fmt.Sprintf("%d", execCtx.ClusterConfig.Memory)), | ||
| ExecutionRoleArn: aws.String(execCtx.ClusterConfig.ExecutionRoleARN), | ||
| TaskRoleArn: aws.String(execCtx.ClusterConfig.TaskRoleARN), | ||
| ContainerDefinitions: execCtx.TaskDefinitionWrapper.TaskDefinition.ContainerDefinitions, | ||
| ContainerDefinitions: containerDefinitions, | ||
| } | ||
|
|
||
| registerOutput, err := execCtx.ecsClient.RegisterTaskDefinition(ctx, registerInput) | ||
|
|
@@ -373,6 +571,23 @@ func validateExecutionContext(ctx *executionContext) error { | |
| return fmt.Errorf("task count (%d) needs to be greater than 0 and less than cluster max task count (%d)", ctx.TaskCount, ctx.ClusterConfig.MaxTaskCount) | ||
| } | ||
|
|
||
| // Validate file downloads configuration | ||
| for i, download := range ctx.FileDownloads { | ||
| if download.Source == "" { | ||
| return fmt.Errorf("file download %d: source is required", i) | ||
| } | ||
| if download.Destination == "" { | ||
| return fmt.Errorf("file download %d: destination is required", i) | ||
| } | ||
| } | ||
|
|
||
| // Validate startup script configuration | ||
| if ctx.StartupScriptConfig != nil { | ||
| if ctx.StartupScriptConfig.Timeout < 0 { | ||
| return fmt.Errorf("timeout cannot be negative") | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
|
|
||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,47 @@ | ||
| #!/bin/bash | ||
|
||
| set -e | ||
| echo 'Starting file downloads to {{.DownloadDir}}...' | ||
| {{if .CreateDirs}}mkdir -p {{.DownloadDir}}{{end}} | ||
|
|
||
| if ! command -v aws &> /dev/null; then | ||
|
||
| echo 'Installing AWS CLI...' | ||
| if apk update && apk add aws-cli; then | ||
| echo 'AWS CLI installed successfully' | ||
| else | ||
| echo 'ERROR: Failed to install AWS CLI' | ||
| exit 1 | ||
| fi | ||
| fi | ||
|
|
||
| {{range .Downloads}} | ||
| # Download: {{.Source}} | ||
| mkdir -p $(dirname {{.Destination}}) | ||
| {{if .IsS3}} | ||
| echo "Downloading from S3: {{.Source}}" | ||
| if aws s3 cp '{{.Source}}' '{{.Destination}}' --cli-read-timeout {{$.Timeout}} --cli-connect-timeout {{$.Timeout}}; then | ||
|
||
| echo "Successfully downloaded: {{.Source}}" | ||
|
|
||
| if [ -f '{{.Destination}}' ] && [ -s '{{.Destination}}' ]; then | ||
| echo "File verification passed: {{.Destination}}" | ||
| file_size=$(stat -c%s '{{.Destination}}' 2>/dev/null || echo "unknown") | ||
| echo "File size: $file_size bytes" | ||
| else | ||
| echo "ERROR: Downloaded file is empty or missing: {{.Destination}}" | ||
| exit 1 | ||
| fi | ||
| else | ||
| echo "ERROR: Failed to download from S3: {{.Source}}" | ||
| exit 1 | ||
| fi | ||
| {{end}} | ||
| {{end}} | ||
| echo 'All files downloaded successfully' | ||
| echo 'Starting main application...' | ||
| # Execute the original command | ||
| if [ -n "$ORIGINAL_COMMAND" ]; then | ||
| echo "Executing: $ORIGINAL_COMMAND" | ||
| exec $ORIGINAL_COMMAND | ||
prasadlohakpure marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| else | ||
| echo "No original command found, executing: $@" | ||
|
||
| exec "$@" | ||
| fi | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The hardcoded template path will fail when deployed as the relative path won't exist. Consider embedding the template file using
embed.FSor making the path configurable.