diff --git a/go.mod b/go.mod index 33b0ad8..c6bd2c5 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/aws/aws-sdk-go-v2/credentials v1.18.3 github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.18.3 github.com/aws/aws-sdk-go-v2/service/dynamodb v1.46.0 + github.com/aws/aws-sdk-go-v2/service/ecs v1.62.0 github.com/aws/aws-sdk-go-v2/service/emrcontainers v1.37.0 github.com/aws/aws-sdk-go-v2/service/glue v1.123.0 github.com/aws/aws-sdk-go-v2/service/s3 v1.86.0 diff --git a/go.sum b/go.sum index 298765f..4359628 100644 --- a/go.sum +++ b/go.sum @@ -44,6 +44,8 @@ github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.2 h1:sBpc8Ph6CpfZsEdkz/8bfg8WhKlW github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.2/go.mod h1:Z2lDojZB+92Wo6EKiZZmJid9pPrDJW2NNIXSlaEfVlU= github.com/aws/aws-sdk-go-v2/service/dynamodb v1.46.0 h1:b7F96mjkzsqymMSGhuCqBQTZFx3mhTMa6IoG6SoVvC8= github.com/aws/aws-sdk-go-v2/service/dynamodb v1.46.0/go.mod h1:F8Rqs4FVGBTUzx3wbFm7HB/mgIA4Tc6/x0yQmjoB+/w= +github.com/aws/aws-sdk-go-v2/service/ecs v1.62.0 h1:E5/BzpoN6fc/xWtKiFPUJBW6nW3KFINCz6so7v/fQ8E= +github.com/aws/aws-sdk-go-v2/service/ecs v1.62.0/go.mod h1:UrdK8ip8HSwnESeuXhte4vlRVv0GIOpC92LR1+2m+zA= github.com/aws/aws-sdk-go-v2/service/emrcontainers v1.37.0 h1:g36yievH3ecx+76/79+LIal5FqIXT+2J3Ztynayj9d4= github.com/aws/aws-sdk-go-v2/service/emrcontainers v1.37.0/go.mod h1:7J9Pidm1n/FvnKMcCUzi8eWNDzzqR6w3sbjhXrMXaxk= github.com/aws/aws-sdk-go-v2/service/glue v1.123.0 h1:oI5uX+NPxqkIP1Qy8XTlTiG3blhDDI0h1OtrOs14KZw= diff --git a/internal/pkg/object/command/ecs/ecs.go b/internal/pkg/object/command/ecs/ecs.go new file mode 100644 index 0000000..db25180 --- /dev/null +++ b/internal/pkg/object/command/ecs/ecs.go @@ -0,0 +1,565 @@ +package ecs + +import ( + ct "context" + "encoding/json" + "fmt" + "os" + "strings" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/ecs" + "github.com/aws/aws-sdk-go-v2/service/ecs/types" + 
"github.com/patterninc/heimdall/pkg/context" + "github.com/patterninc/heimdall/pkg/duration" + "github.com/patterninc/heimdall/pkg/object/cluster" + "github.com/patterninc/heimdall/pkg/object/job" + "github.com/patterninc/heimdall/pkg/plugin" + "github.com/patterninc/heimdall/pkg/result" + "github.com/patterninc/heimdall/pkg/result/column" +) + +// ECS command context structure +type ecsCommandContext struct { + TaskDefinitionTemplate string `yaml:"task_definition_template,omitempty" json:"task_definition_template,omitempty"` + TaskCount int `yaml:"task_count,omitempty" json:"task_count,omitempty"` + ContainerOverrides []types.ContainerOverride `yaml:"container_overrides,omitempty" json:"container_overrides,omitempty"` + PollingInterval duration.Duration `yaml:"polling_interval,omitempty" json:"polling_interval,omitempty"` + Timeout duration.Duration `yaml:"timeout,omitempty" json:"timeout,omitempty"` + MaxFailCount int `yaml:"max_fail_count,omitempty" json:"max_fail_count,omitempty"` // max failures before giving up +} + +// ECS cluster context structure +type ecsClusterContext struct { + CPU int `yaml:"cpu,omitempty" json:"cpu,omitempty"` + Memory int `yaml:"memory,omitempty" json:"memory,omitempty"` + MaxTaskCount int `yaml:"max_task_count,omitempty" json:"max_task_count,omitempty"` + ExecutionRoleARN string `yaml:"execution_role_arn,omitempty" json:"execution_role_arn,omitempty"` + TaskRoleARN string `yaml:"task_role_arn,omitempty" json:"task_role_arn,omitempty"` + ClusterName string `yaml:"cluster_name,omitempty" json:"cluster_name,omitempty"` + LaunchType string `yaml:"launch_type,omitempty" json:"launch_type,omitempty"` + VPCConfig vpcConfig `yaml:"vpc_config,omitempty" json:"vpc_config,omitempty"` +} + +// VPC configuration structure +type vpcConfig struct { + Subnets []string `yaml:"subnets,omitempty" json:"subnets,omitempty"` + SecurityGroups []string `yaml:"security_groups,omitempty" json:"security_groups,omitempty"` +} + +// Task definition wrapper with 
pre-computed essential containers map +type taskDefinitionWrapper struct { + TaskDefinition *types.TaskDefinition + EssentialContainers map[string]bool +} + +// Task position tracker structure +type taskTracker struct { + Name string + ActiveARN string + TaskNum int // Original task number (0, 1, 2, etc.) + Retries int + ExecutionTime float64 + FailedARNs []string // History of ARNs for this position + Completed bool +} + +// executionContext holds the final resolved configuration for job execution. +type executionContext struct { + TaskCount int `json:"task_count"` + TaskDefinitionWrapper *taskDefinitionWrapper `json:"task_definition_wrapper"` + ContainerOverrides []types.ContainerOverride `json:"container_overrides"` + ClusterConfig *ecsClusterContext `json:"cluster_config"` + + PollingInterval duration.Duration `json:"polling_interval"` + Timeout duration.Duration `json:"timeout"` + MaxFailCount int `json:"max_fail_count"` + + ecsClient *ecs.Client + taskDefARN *string + tasks map[string]*taskTracker +} + +const ( + defaultPollingInterval = duration.Duration(30 * time.Second) + defaultTaskTimeout = duration.Duration(1 * time.Hour) + defaultMaxFailCount = 1 + defaultTaskCount = 1 + startedByPrefix = "heimdall-job-" + errMaxFailCount = "task %s failed %d times (max: %d), giving up" + errPollingTimeout = "polling timed out for arns %v after %v" +) + +var ( + ctx = ct.Background() + errMissingTemplate = fmt.Errorf("task definition template is required") +) + +func New(commandContext *context.Context) (plugin.Handler, error) { + + e := &ecsCommandContext{ + PollingInterval: defaultPollingInterval, + Timeout: defaultTaskTimeout, + MaxFailCount: defaultMaxFailCount, + TaskCount: defaultTaskCount, + } + + if commandContext != nil { + if err := commandContext.Unmarshal(e); err != nil { + return nil, err + } + } + + return e.handler, nil + +} + +// handler implements the main ECS plugin logic +func (e *ecsCommandContext) handler(r *plugin.Runtime, job *job.Job, cluster 
*cluster.Cluster) error {
+
+	// Build execution context with resolved configuration and loaded template
+	execCtx, err := buildExecutionContext(e, job, cluster)
+	if err != nil {
+		return err
+	}
+
+	// Register the task definition with ECS
+	if err := execCtx.registerTaskDefinition(); err != nil {
+		return err
+	}
+
+	// Start tasks
+	if err := execCtx.startTasks(job.ID); err != nil {
+		return err
+	}
+
+	// Poll for completion
+	if err := execCtx.pollForCompletion(); err != nil {
+		return err
+	}
+
+	// Store results
+	if err := storeResults(execCtx, job); err != nil {
+		return err
+	}
+
+	return nil
+
+}
+
+// registerTaskDefinition prepares and registers the task definition with ECS,
+// storing the resulting ARN on execCtx for subsequent RunTask calls.
+func (execCtx *executionContext) registerTaskDefinition() error {
+	registerInput := &ecs.RegisterTaskDefinitionInput{
+		Family:                  aws.String(aws.ToString(execCtx.TaskDefinitionWrapper.TaskDefinition.Family)),
+		RequiresCompatibilities: []types.Compatibility{types.CompatibilityFargate},
+		NetworkMode:             types.NetworkModeAwsvpc,
+		Cpu:                     aws.String(fmt.Sprintf("%d", execCtx.ClusterConfig.CPU)),
+		Memory:                  aws.String(fmt.Sprintf("%d", execCtx.ClusterConfig.Memory)),
+		ExecutionRoleArn:        aws.String(execCtx.ClusterConfig.ExecutionRoleARN),
+		TaskRoleArn:             aws.String(execCtx.ClusterConfig.TaskRoleARN),
+		ContainerDefinitions:    execCtx.TaskDefinitionWrapper.TaskDefinition.ContainerDefinitions,
+	}
+
+	registerOutput, err := execCtx.ecsClient.RegisterTaskDefinition(ctx, registerInput)
+	if err != nil {
+		return err
+	}
+
+	execCtx.taskDefARN = registerOutput.TaskDefinition.TaskDefinitionArn
+
+	return nil
+
+}
+
+// startTasks launches TaskCount tasks and records a tracker for each in execCtx.tasks.
+func (execCtx *executionContext) startTasks(jobID string) error {
+
+	for i := 0; i < execCtx.TaskCount; i++ {
+		taskName := fmt.Sprintf("%s%s-%d", startedByPrefix, jobID, i) // doubles as ECS StartedBy tag and tracker key
+		taskARN, err := runTask(execCtx, taskName, i)
+		if err != nil {
+			return err
+		}
+		execCtx.tasks[taskName] = &taskTracker{
+			Name:      taskName,
+
ActiveARN: taskARN, + TaskNum: i, + } + } + + return nil +} + +// monitor tasks until completion, faliure, or timeout +func (execCtx *executionContext) pollForCompletion() error { + + startTime := time.Now() + stopTime := startTime.Add(time.Duration(execCtx.Timeout)) + + // Poll until all tasks are complete or timeout + for { + // Describe the uncompleted tasks we're tracking + var activeARNs []string + for _, tracker := range execCtx.tasks { + if !tracker.Completed { + activeARNs = append(activeARNs, tracker.ActiveARN) + } + } + + // If no active tasks, we're done + if len(activeARNs) == 0 { + break + } + + describeInput := &ecs.DescribeTasksInput{ + Cluster: aws.String(execCtx.ClusterConfig.ClusterName), + Tasks: activeARNs, + } + + describeOutput, err := execCtx.ecsClient.DescribeTasks(ctx, describeInput) + if err != nil { + return err + } + + // Check if all tasks are complete + allComplete := true + + for _, task := range describeOutput.Tasks { + + // If the task is not stopped, it's not complete + if aws.ToString(task.LastStatus) != "STOPPED" { + allComplete = false + continue + } + + // If task has stopped, grab its tracker to start updating + tracker, exists := execCtx.tasks[aws.ToString(task.StartedBy)] + if !exists { + return fmt.Errorf("could not find tracker for StartedBy tag %s", aws.ToString(task.StartedBy)) + } + + // Check for task failures based on exit code + if isTaskSuccessful(task, execCtx) { + // Update the tracker directly + tracker.ExecutionTime = time.Since(startTime).Seconds() // Total time from start + tracker.Completed = true + continue + } + + tracker.Retries++ + tracker.FailedARNs = append(tracker.FailedARNs, aws.ToString(task.TaskArn)) + + // Exit if we've failed too many times + if tracker.Retries >= execCtx.MaxFailCount { + + // Stop all other running tasks + reason := fmt.Sprintf(errMaxFailCount, tracker.ActiveARN, tracker.Retries, execCtx.MaxFailCount) + if err := stopAllTasks(execCtx, reason); err != nil { + return err + } + + 
return fmt.Errorf("%s", reason) + } + + newTaskARN, err := runTask(execCtx, tracker.Name, tracker.TaskNum) + if err != nil { + return err + } + + // Assign the new task ARN to the tracker + tracker.ActiveARN = newTaskARN + + // Task failed but will be restarted, so mark as not complete + allComplete = false + continue + } + + // If all tasks are complete, break out of the loop + if allComplete { + break + } + + // Check if we've timed out + if time.Now().After(stopTime) { + // Collect ARNs of tasks that did not complete + var incompleteARNs []string + for _, tracker := range execCtx.tasks { + if !tracker.Completed { + incompleteARNs = append(incompleteARNs, tracker.ActiveARN) + } + } + + // Stop all remaining tasks + reason := fmt.Sprintf(errPollingTimeout, incompleteARNs, execCtx.Timeout) + if err := stopAllTasks(execCtx, reason); err != nil { + return err + } + + // Return error with information about incomplete tasks + return fmt.Errorf("%s", reason) + } + + // Sleep until next poll time + time.Sleep(time.Duration(execCtx.PollingInterval)) + } + + // If you're here, all tasks are complete + return nil + +} + +func buildExecutionContext(commandCtx *ecsCommandContext, j *job.Job, c *cluster.Cluster) (*executionContext, error) { + + execCtx := &executionContext{ + tasks: make(map[string]*taskTracker), + } + + // Create a context from commandCtx and unmarshal onto execCtx (defaults) + commandContext := context.New(commandCtx) + if err := commandContext.Unmarshal(execCtx); err != nil { + return nil, err + } + + // Overlay job context (overrides command values) + if j.Context != nil { + if err := j.Context.Unmarshal(execCtx); err != nil { + return nil, err + } + } + + // Add cluster config (no overlapping values) + clusterContext := &ecsClusterContext{} + if c.Context != nil { + if err := c.Context.Unmarshal(clusterContext); err != nil { + return nil, err + } + } + execCtx.ClusterConfig = clusterContext + + // Load task definition template + taskDefWrapper, err := 
loadTaskDefinitionTemplate(commandCtx.TaskDefinitionTemplate) + if err != nil { + return nil, err + } + execCtx.TaskDefinitionWrapper = taskDefWrapper // Store the wrapper for polling + + // Build container overrides for all containers + if err := buildContainerOverrides(execCtx); err != nil { + return nil, err + } + + // Validate the resolved configuration + if err := validateExecutionContext(execCtx); err != nil { + return nil, err + } + + // initialize AWS session + cfg, err := config.LoadDefaultConfig(ctx) + if err != nil { + return nil, err + } + execCtx.ecsClient = ecs.NewFromConfig(cfg) + + return execCtx, nil + +} + +// validateExecutionContext validates the final resolved configuration +func validateExecutionContext(ctx *executionContext) error { + + if ctx.TaskCount <= 0 || ctx.TaskCount > ctx.ClusterConfig.MaxTaskCount { + return fmt.Errorf("task count (%d) needs to be greater than 0 and less than cluster max task count (%d)", ctx.TaskCount, ctx.ClusterConfig.MaxTaskCount) + } + + return nil + +} + +// buildContainerOverrides processes container overrides and builds the final overrides for all containers +func buildContainerOverrides(execCtx *executionContext) error { + + // Create a map of container names + existingContainers := make(map[string]bool) + for _, container := range execCtx.TaskDefinitionWrapper.TaskDefinition.ContainerDefinitions { + existingContainers[aws.ToString(container.Name)] = true + } + + // Create a map of execution context overrides + containerOverridesMap := make(map[string]types.ContainerOverride) + for _, containerOverride := range execCtx.ContainerOverrides { + // Validate that the container name exists + if !existingContainers[aws.ToString(containerOverride.Name)] { + return fmt.Errorf("container override '%s' not found in task definition template", aws.ToString(containerOverride.Name)) + } + containerOverridesMap[aws.ToString(containerOverride.Name)] = containerOverride + } + + // Build container overrides for all containers 
in the task definition + var containerOverrides []types.ContainerOverride + for _, container := range execCtx.TaskDefinitionWrapper.TaskDefinition.ContainerDefinitions { + containerName := aws.ToString(container.Name) + + // Use existing override if it exists, otherwise create a blank one + if override, exists := containerOverridesMap[containerName]; exists { + containerOverrides = append(containerOverrides, override) + } else { + containerOverrides = append(containerOverrides, types.ContainerOverride{ + Name: aws.String(containerName), + }) + } + } + + execCtx.ContainerOverrides = containerOverrides + + return nil + +} + +// stopAllTasks stops all non-completed tasks with the given reason +func stopAllTasks(execCtx *executionContext, reason string) error { + + for _, t := range execCtx.tasks { + if t.Completed { + continue + } + stopInput := &ecs.StopTaskInput{ + Cluster: aws.String(execCtx.ClusterConfig.ClusterName), + Task: aws.String(t.ActiveARN), + Reason: aws.String(reason), + } + + _, err := execCtx.ecsClient.StopTask(ctx, stopInput) + if err != nil { + return err + } + } + + return nil + +} + +func loadTaskDefinitionTemplate(templatePath string) (*taskDefinitionWrapper, error) { + + if templatePath == `` { + return nil, errMissingTemplate + } + + data, err := os.ReadFile(templatePath) + if err != nil { + return nil, err + } + + var taskDef types.TaskDefinition + if err := json.Unmarshal(data, &taskDef); err != nil { + return nil, err + } + + // Pre-compute essential containers map + essentialContainers := make(map[string]bool) + for _, containerDef := range taskDef.ContainerDefinitions { + if containerDef.Essential != nil && *containerDef.Essential { + essentialContainers[aws.ToString(containerDef.Name)] = true + } + } + + return &taskDefinitionWrapper{ + TaskDefinition: &taskDef, + EssentialContainers: essentialContainers, + }, nil + +} + +// runTask runs a single task and returns the task ARN +func runTask(execCtx *executionContext, startedBy string, 
taskNum int) (string, error) {
+
+	// Create a copy of the overrides and add TASK_NAME and TASK_NUM env variables
+	finalOverrides := append([]types.ContainerOverride{}, execCtx.ContainerOverrides...)
+
+	for i := range finalOverrides {
+		finalOverrides[i].Environment = append(finalOverrides[i].Environment,
+			types.KeyValuePair{
+				Name:  aws.String("TASK_NAME"),
+				Value: aws.String(startedBy),
+			},
+			types.KeyValuePair{
+				Name:  aws.String("TASK_NUM"),
+				Value: aws.String(fmt.Sprintf("%d", taskNum)),
+			},
+		)
+	}
+
+	// build run task input
+	runTaskInput := &ecs.RunTaskInput{
+		Cluster:        aws.String(execCtx.ClusterConfig.ClusterName),
+		TaskDefinition: execCtx.taskDefARN,
+		LaunchType:     types.LaunchType(execCtx.ClusterConfig.LaunchType),
+		Count:          aws.Int32(1),
+		StartedBy:      aws.String(startedBy),
+		Overrides: &types.TaskOverride{
+			ContainerOverrides: finalOverrides,
+		},
+		NetworkConfiguration: &types.NetworkConfiguration{
+			AwsvpcConfiguration: &types.AwsVpcConfiguration{
+				Subnets:        execCtx.ClusterConfig.VPCConfig.Subnets,
+				SecurityGroups: execCtx.ClusterConfig.VPCConfig.SecurityGroups,
+				AssignPublicIp: types.AssignPublicIpDisabled,
+			},
+		},
+	}
+
+	runTaskOutput, err := execCtx.ecsClient.RunTask(ctx, runTaskInput)
+	if err != nil {
+		return ``, err
+	}
+	if len(runTaskOutput.Tasks) == 0 { // RunTask can return only Failures (capacity/placement) with no started task
+		return ``, fmt.Errorf("run task %s started no tasks: %v", startedBy, runTaskOutput.Failures)
+	}
+	return aws.ToString(runTaskOutput.Tasks[0].TaskArn), nil
+
+}
+
+// tasks are successful if all essential containers exit with a zero exit code
+func isTaskSuccessful(task types.Task, execCtx *executionContext) bool {
+	// Check all containers in the running task
+	for _, container := range task.Containers {
+		containerName := aws.ToString(container.Name)
+
+		if execCtx.TaskDefinitionWrapper.EssentialContainers[containerName] {
+			if container.ExitCode != nil && *container.ExitCode != 0 {
+				return false
+			}
+		}
+	}
+
+	return true
+
+}
+
+// storeResults builds and stores the final result for the job.
+func storeResults(execCtx *executionContext, j *job.Job) error { + + // Build result + j.Result = &result.Result{} + j.Result.Columns = []*column.Column{ + {Name: "task_arn", Type: "string"}, + {Name: "duration", Type: "float"}, + {Name: "retries", Type: "int"}, + {Name: "failed_arns", Type: "string"}, + } + + // Create result data from task results + j.Result.Data = make([][]interface{}, 0, len(execCtx.tasks)) + for _, tracker := range execCtx.tasks { + j.Result.Data = append(j.Result.Data, []interface{}{ + tracker.ActiveARN, + tracker.ExecutionTime, + tracker.Retries, + strings.Join(tracker.FailedARNs, ","), + }) + } + + return nil + +} diff --git a/pkg/duration/duration.go b/pkg/duration/duration.go new file mode 100644 index 0000000..d87c259 --- /dev/null +++ b/pkg/duration/duration.go @@ -0,0 +1,51 @@ +package duration + +import ( + "encoding/json" + "time" + + "gopkg.in/yaml.v3" +) + +type Duration time.Duration + +// UnmarshalYAML allows YAML like "duration: 15s" +func (d *Duration) UnmarshalYAML(value *yaml.Node) error { + + var s string + if err := value.Decode(&s); err != nil { + return err + } + + dur, err := time.ParseDuration(s) + if err != nil { + return err + } + *d = Duration(dur) + + return nil + +} + +// UnmarshalJSON allows JSON like "duration": "15s" +func (d *Duration) UnmarshalJSON(data []byte) error { + + var s string + if err := json.Unmarshal(data, &s); err != nil { + return err + } + + dur, err := time.ParseDuration(s) + if err != nil { + return err + } + *d = Duration(dur) + + return nil + +} + +// MarshalJSON marshals the duration as a string +func (d Duration) MarshalJSON() ([]byte, error) { + return json.Marshal(time.Duration(d).String()) +} diff --git a/plugins/ecs/README.md b/plugins/ecs/README.md new file mode 100644 index 0000000..bef4fc1 --- /dev/null +++ b/plugins/ecs/README.md @@ -0,0 +1,382 @@ +# 🚀 ECS Fargate Plugin + +The **ECS Fargate Plugin** enables Heimdall to run AWS ECS tasks on Fargate clusters. 
It builds task definitions dynamically from templates and runs short-lived batch jobs with robust failure tracking and retry mechanisms. + +--- + +## 🧩 Plugin Overview + +* **Plugin Name:** `ecs` +* **Execution Mode:** Asynchronous +* **Use Case:** Running containerized batch jobs on AWS ECS Fargate with automatic retries and failure management + +--- + +## ⚙️ Defining an ECS Command + +An ECS command requires a task definition template and configuration for running tasks: + +```yaml +- name: ecs-batch-job-0.0.1 + status: active + plugin: ecs + version: 0.0.1 + description: Run batch jobs on ECS Fargate with retry logic + context: + task_definition_template: /path/to/task-definition.json + task_count: 3 + container_overrides: + - name: main + command: ["/usr/local/bin/my-script.sh", "arg1", "arg2"] + environment: + - name: ENV_VAR + value: "value" + polling_interval: "30s" # 30 seconds + timeout: "1h" # 1 hour + max_fail_count: 3 # max retries per task + tags: + - type:ecs + cluster_tags: + - type:fargate +``` + +🔸 This defines an ECS command that: +- Uses a task definition template from a local JSON file +- Runs 3 tasks in parallel +- Overrides the command and environment variables for the "main" container +- Polls for completion every 30 seconds with a 1-hour timeout +- Retries failed tasks up to 3 times before giving up + +**Duration Format:** +- Use human-readable duration strings: `"30s"`, `"1h"`, `"45m"`, `"2h15m30s"` +- Examples: `"15s"`, `"1h"`, `"30m"`, `"2h30m"`, `"3600s"` + +--- + +## 🖥️ Cluster Configuration + +ECS clusters must specify Fargate configuration, IAM roles, and network settings: + +```yaml +- name: fargate-cluster + status: active + version: 0.0.1 + description: ECS Fargate cluster for batch jobs + context: + cpu: 256 + memory: 512 + max_task_count: 10 + execution_role_arn: arn:aws:iam::123456789012:role/ecsTaskExecutionRole + task_role_arn: arn:aws:iam::123456789012:role/ecsTaskRole + cluster_name: my-fargate-cluster + launch_type: FARGATE 
+ vpc_config: + subnets: + - subnet-12345678 + - subnet-87654321 + security_groups: + - sg-12345678 + tags: + - type:fargate + - data:prod +``` + +--- + +## 🚀 Submitting an ECS Job + +A typical ECS job includes task configuration and optional overrides: + +```json +{ + "name": "run-batch-job", + "version": "0.0.1", + "command_criteria": ["type:ecs"], + "cluster_criteria": ["type:fargate"], + "context": { + "task_count": 2, + "container_overrides": [ + { + "name": "main", + "command": ["/usr/local/bin/process-data.sh"], + "environment": [ + { + "name": "DATA_SOURCE", + "value": "s3://my-bucket/data/" + }, + { + "name": "BATCH_SIZE", + "value": "1000" + } + ] + } + ] + } +} +``` + +🔹 The job will: +- Run 2 tasks in parallel +- Override the command to run `process-data.sh` +- Set the `DATA_SOURCE` and `BATCH_SIZE` environment variables +- **Automatically add `TASK_NAME` environment variable** to each container (e.g., `heimdall-job-{job-id}-{task-number}`) + +--- + +## 🔄 Failure Tracking & Retry Logic + +The plugin implements robust failure tracking with the following features: + +### Automatic Retries +- Failed tasks are automatically retried up to `max_fail_count` times +- Each retry creates a new task with a unique ARN +- Failed ARNs are tracked in the job results + +### Failure Scenarios +1. **Task Failure**: Non-zero exit code from essential containers +2. **Timeout**: Job exceeds the configured timeout period +3. 
**Max Retries**: Task fails more than `max_fail_count` times + +### Failure Response +- When a single task exceeds `max_fail_count`, all other running tasks are stopped +- When timeout occurs, all remaining tasks are stopped + +--- + +## 📦 Task Definition Template + +The task definition template should be a complete ECS task definition JSON file: + +```json +{ + "family": "batch-job", + "requiresCompatibilities": ["FARGATE"], + "networkMode": "awsvpc", + "cpu": "256", + "memory": "512", + "executionRoleArn": "arn:aws:iam::123456789012:role/ecsTaskExecutionRole", + "taskRoleArn": "arn:aws:iam::123456789012:role/ecsTaskRole", + "containerDefinitions": [ + { + "name": "main", + "image": "alpine:latest", + "command": ["echo", "Hello World"], + "essential": true, + "logConfiguration": { + "logDriver": "awslogs", + "options": { + "awslogs-group": "/ecs/batch-job", + "awslogs-region": "us-west-2", + "awslogs-stream-prefix": "ecs" + } + }, + "environment": [ + { + "name": "ENVIRONMENT", + "value": "production" + }, + { + "name": "LOG_LEVEL", + "value": "info" + } + ] + } + ] +} +``` + +--- + +## 📊 Job Results + +The plugin returns comprehensive task execution results including: + +```json +{ + "columns": [ + {"name": "task_arn", "type": "string"}, + {"name": "duration", "type": "float"}, + {"name": "retries", "type": "int"}, + {"name": "failed_arns", "type": "string"} + ], + "data": [ + ["arn:aws:ecs:us-west-2:123456789012:task/abc123", 45.2, 0, ""], + ["arn:aws:ecs:us-west-2:123456789012:task/def456", 43.8, 2, "arn:aws:ecs:us-west-2:123456789012:task/old1,arn:aws:ecs:us-west-2:123456789012:task/old2"] + ] +} +``` + +**Result Fields:** +- `task_arn`: The final task ARN (successful or last retry) +- `duration`: Total execution time in seconds +- `retries`: Number of retries attempted +- `failed_arns`: Comma-separated list of failed task ARNs + +--- + +## 🔍 Task Monitoring + +The plugin tracks tasks using the `startedBy` tag with format `heimdall-job-{job_id}-{task_number}`. 
This allows for:
+
+- Status polling to check task completion
+- Retrieving task results and exit codes
+- Monitoring execution times
+- Tracking retry attempts and failed ARNs
+
+---
+
+## 🌍 Environment Variables
+
+### Automatic Variables
+The plugin automatically adds the following environment variables to all containers:
+- `TASK_NAME`: The unique task identifier (e.g., `heimdall-job-abc123-0`); `TASK_NUM`: the zero-based task index (the final digit of `TASK_NAME`)
+
+### Custom Variables
+You can add custom environment variables through:
+1. **Task Definition Template**: Base environment variables
+2. **Container Overrides**: Container Overrides can exist in both command context and job context. Job context will override command context, and command context will override any ENVs in the task definition template.
+
+**Example:**
+```yaml
+container_overrides:
+  - name: "main"
+    environment:
+      - name: "CUSTOM_VAR"
+        value: "custom_value"
+      - name: "ANOTHER_VAR"
+        value: "another_value"
+```
+
+**Final Environment Variables:**
+- `ENVIRONMENT=production` (from template)
+- `LOG_LEVEL=info` (from template)
+- `CUSTOM_VAR=custom_value` (from override)
+- `ANOTHER_VAR=another_value` (from override)
+- `TASK_NAME=heimdall-job-abc123-0` and `TASK_NUM=0` (automatically added)
+
+---
+
+## 🧠 Best Practices
+
+* **Task Definition Templates**: Use complete task definitions with proper logging configuration
+* **Resource Limits**: Set appropriate CPU and memory limits based on workload requirements
+* **IAM Roles**: Ensure execution and task roles have minimal necessary permissions
+* **Network Configuration**: Use private subnets when possible, public subnets only if internet access is required
+* **Container Overrides**: Use overrides for job-specific parameters rather than modifying templates
+* **Retry Configuration**: Set appropriate `max_fail_count` based on your application's reliability
+* **Timeout Settings**: Configure timeouts based on expected job duration using human-readable format
+* **Environment
Variables**: Use `TASK_NAME` for task-specific logic in your containers + +--- + +## 🔐 Security Considerations + +* **IAM Roles**: Use least-privilege IAM roles for task execution +* **Network Security**: Configure security groups to allow only necessary traffic +* **Container Images**: Use trusted base images and scan for vulnerabilities +* **Environment Variables**: Avoid passing sensitive data as environment variables; use AWS Secrets Manager instead +* **Task Isolation**: Each task runs with its own `TASK_NAME` for proper isolation + +--- + +## 📝 Complete Example + +### Task Definition Template (`task-definition.json`) +```json +{ + "family": "batch-job-template", + "requiresCompatibilities": ["FARGATE"], + "networkMode": "awsvpc", + "cpu": "256", + "memory": "512", + "executionRoleArn": "arn:aws:iam::123456789012:role/ecsTaskExecutionRole", + "taskRoleArn": "arn:aws:iam::123456789012:role/ecsTaskRole", + "containerDefinitions": [ + { + "name": "main", + "image": "alpine:latest", + "command": ["sh", "-c", "echo \"Task name: $TASK_NAME\" && exit 0"], + "essential": true, + "logConfiguration": { + "logDriver": "awslogs", + "options": { + "awslogs-group": "/ecs/batch-jobs", + "awslogs-region": "us-west-2", + "awslogs-stream-prefix": "ecs" + } + }, + "environment": [ + { + "name": "ENVIRONMENT", + "value": "production" + }, + { + "name": "LOG_LEVEL", + "value": "info" + } + ] + } + ], + "ephemeralStorage": { + "sizeInGiB": 21 + }, + "runtimePlatform": { + "cpuArchitecture": "X86_64", + "operatingSystemFamily": "LINUX" + } +} +``` + +### Command Configuration +```yaml +- name: ecs-batch-job-0.0.1 + status: active + plugin: ecs + version: 0.0.1 + description: Run batch jobs on ECS Fargate with retry logic + context: + task_definition_template: task-definition.json + task_count: 2 + container_overrides: + - name: main + environment: + - name: CUSTOM_VAR + value: custom_value + polling_interval: "30s" # 30 seconds + timeout: "1h" # 1 hour + max_fail_count: 3 + tags: + 
- type:ecs + cluster_tags: + - type:fargate +``` + +### Job Submission +```json +{ + "name": "run-batch-job", + "version": "0.0.1", + "command_criteria": ["type:ecs"], + "cluster_criteria": ["type:fargate"], + "context": { + "task_count": 2, + "container_overrides": [ + { + "name": "main", + "environment": [ + { + "name": "DATA_SOURCE", + "value": "s3://my-bucket/data/" + } + ] + } + ] + } +} +``` + +This configuration will run 2 tasks, each with the environment variables: +- `ENVIRONMENT=production` +- `LOG_LEVEL=info` +- `DATA_SOURCE=s3://my-bucket/data/` +- `TASK_NAME=heimdall-job-{job-id}-{task-number}` (automatically added) \ No newline at end of file diff --git a/plugins/ecs/ecs.go b/plugins/ecs/ecs.go new file mode 100644 index 0000000..10f8f62 --- /dev/null +++ b/plugins/ecs/ecs.go @@ -0,0 +1,12 @@ +package main + +import ( + "github.com/patterninc/heimdall/internal/pkg/object/command/ecs" + "github.com/patterninc/heimdall/pkg/context" + "github.com/patterninc/heimdall/pkg/plugin" +) + +// New creates a new ECS plugin handler. +func New(commandContext *context.Context) (plugin.Handler, error) { + return ecs.New(commandContext) +}