From 42a689a1549064b27ad06dcd20c7183d0c59cda4 Mon Sep 17 00:00:00 2001 From: Stan Babourine Date: Mon, 21 Jul 2025 13:39:02 -0600 Subject: [PATCH 1/7] bootstrapping support for arguments for spark jobs --- internal/pkg/object/command/spark/spark.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/pkg/object/command/spark/spark.go b/internal/pkg/object/command/spark/spark.go index 948b3d9..2ecf101 100644 --- a/internal/pkg/object/command/spark/spark.go +++ b/internal/pkg/object/command/spark/spark.go @@ -38,6 +38,7 @@ type sparkCommandContext struct { // sparkJobContext represents the context for a spark job type sparkJobContext struct { Query string `yaml:"query,omitempty" json:"query,omitempty"` + Arguments []string `yaml:"arguments,omitempty" json:"arguments,omitempty"` Properties map[string]string `yaml:"properties,omitempty" json:"properties,omitempty"` ReturnResult bool `yaml:"return_result,omitempty" json:"return_result,omitempty"` } From 7679c3937c9e2eba296ad92743c141ad242ec6c5 Mon Sep 17 00:00:00 2001 From: "ivan.hladush" Date: Mon, 21 Jul 2025 17:13:28 -0600 Subject: [PATCH 2/7] Small fixes --- .gitignore | 2 ++ internal/pkg/object/command/spark/spark.go | 26 +++++++++++++--------- plugins/spark/README.md | 7 ++++++ 3 files changed, 25 insertions(+), 10 deletions(-) diff --git a/.gitignore b/.gitignore index 97bc153..1744184 100644 --- a/.gitignore +++ b/.gitignore @@ -27,5 +27,7 @@ go.work.sum # mac .DS_Store +#idea +.idea/ # build dir dist/ diff --git a/internal/pkg/object/command/spark/spark.go b/internal/pkg/object/command/spark/spark.go index 2ecf101..f50bf48 100644 --- a/internal/pkg/object/command/spark/spark.go +++ b/internal/pkg/object/command/spark/spark.go @@ -41,6 +41,7 @@ type sparkJobContext struct { Arguments []string `yaml:"arguments,omitempty" json:"arguments,omitempty"` Properties map[string]string `yaml:"properties,omitempty" json:"properties,omitempty"` ReturnResult bool `yaml:"return_result,omitempty" 
json:"return_result,omitempty"` + EntryPoint string `yaml:"entry_point,omitempty" json:"entry_point,omitempty"` } // sparkClusterContext represents the context for a spark cluster @@ -172,15 +173,18 @@ func (s *sparkCommandContext) handler(r *plugin.Runtime, j *job.Job, c *cluster. // let's set job driver jobDriver := &types.JobDriver{} - jobParameters := getSparkSqlParameters(jobContext.Properties) - if jobContext.ReturnResult { + jobParameters := getSparkSubmitParameters(jobContext) + + if jobContext.ReturnResult || jobContext.Arguments != nil { + + args := jobContext.Arguments + if args == nil { + args = []string{queryURI, resultURI} + } jobDriver.SparkSubmitJobDriver = &types.SparkSubmitJobDriver{ - EntryPoint: s.WrapperURI, - EntryPointArguments: []string{ - queryURI, - resultURI, - }, + EntryPoint: s.WrapperURI, + EntryPointArguments: args, SparkSubmitParameters: jobParameters, } @@ -293,14 +297,16 @@ func getClusterID(svc *emrcontainers.Client, clusterName string) (*string, error } -func getSparkSqlParameters(properties map[string]string) *string { - +func getSparkSubmitParameters(context *sparkJobContext) *string { + properties := context.Properties conf := make([]string, 0, len(properties)) for k, v := range properties { conf = append(conf, fmt.Sprintf("--conf %s=%s", k, v)) } - + if context.EntryPoint != "" { + conf = append(conf, fmt.Sprintf("--class %s", context.EntryPoint)) + } return aws.String(strings.Join(conf, ` `)) } diff --git a/plugins/spark/README.md b/plugins/spark/README.md index 1078711..feb5425 100644 --- a/plugins/spark/README.md +++ b/plugins/spark/README.md @@ -26,6 +26,10 @@ A Spark command requires a SQL `query` in its job context and optionally job-spe queries_uri: s3://bucket/spark/queries results_uri: s3://bucket/spark/results logs_uri: s3://bucket/spark/logs + wrapper_uri: s3://bucket/contrib/spark/spark-sql-s3-wrapper.py properties: spark.executor.instances: "1" @@ -76,7 +80,10 @@ A typical Spark job includes a SQL query and 
optional Spark properties: "command_criteria": ["type:sparksql"], "cluster_criteria": ["data_prod"], "context": { + // For SQL jobs, specify the "query" field. For Spark jobs that execute a custom JAR, use "arguments" and "entry_point". "query": "SELECT * FROM my_table WHERE dt='2023-01-01'", + "arguments": ["SELECT 1", "s3:///"], + "entry_point": "com.your.company.ClassName", "properties": { "spark.sql.shuffle.partitions": "10" }, From 4b730502d3270998ed3c1062d784f1b8e6c387be Mon Sep 17 00:00:00 2001 From: "ivan.hladush" Date: Mon, 21 Jul 2025 17:16:09 -0600 Subject: [PATCH 3/7] Small fixes --- internal/pkg/object/command/spark/spark.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/internal/pkg/object/command/spark/spark.go b/internal/pkg/object/command/spark/spark.go index f50bf48..0ac3cc9 100644 --- a/internal/pkg/object/command/spark/spark.go +++ b/internal/pkg/object/command/spark/spark.go @@ -174,17 +174,16 @@ func (s *sparkCommandContext) handler(r *plugin.Runtime, j *job.Job, c *cluster. 
// let's set job driver jobDriver := &types.JobDriver{} jobParameters := getSparkSubmitParameters(jobContext) - - if jobContext.ReturnResult || jobContext.Arguments != nil { - - args := jobContext.Arguments - if args == nil { - args = []string{queryURI, resultURI} + if jobContext.Arguments != nil { + jobDriver.SparkSubmitJobDriver = &types.SparkSubmitJobDriver{ + EntryPoint: s.WrapperURI, + EntryPointArguments: jobContext.Arguments, + SparkSubmitParameters: jobParameters, } - + } else if jobContext.ReturnResult { jobDriver.SparkSubmitJobDriver = &types.SparkSubmitJobDriver{ EntryPoint: s.WrapperURI, - EntryPointArguments: args, + EntryPointArguments: []string{queryURI, resultURI}, SparkSubmitParameters: jobParameters, } From 38eae1d487311b1b5d49cbabe1953449a3875931 Mon Sep 17 00:00:00 2001 From: "ivan.hladush" Date: Mon, 21 Jul 2025 17:18:46 -0600 Subject: [PATCH 4/7] Small fixes --- internal/pkg/object/command/spark/spark.go | 49 ++++++++++++---------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/internal/pkg/object/command/spark/spark.go b/internal/pkg/object/command/spark/spark.go index 0ac3cc9..58e5cc7 100644 --- a/internal/pkg/object/command/spark/spark.go +++ b/internal/pkg/object/command/spark/spark.go @@ -173,28 +173,7 @@ func (s *sparkCommandContext) handler(r *plugin.Runtime, j *job.Job, c *cluster. 
// let's set job driver jobDriver := &types.JobDriver{} - jobParameters := getSparkSubmitParameters(jobContext) - if jobContext.Arguments != nil { - jobDriver.SparkSubmitJobDriver = &types.SparkSubmitJobDriver{ - EntryPoint: s.WrapperURI, - EntryPointArguments: jobContext.Arguments, - SparkSubmitParameters: jobParameters, - } - } else if jobContext.ReturnResult { - jobDriver.SparkSubmitJobDriver = &types.SparkSubmitJobDriver{ - EntryPoint: s.WrapperURI, - EntryPointArguments: []string{queryURI, resultURI}, - SparkSubmitParameters: jobParameters, - } - - } else { - - jobDriver.SparkSqlJobDriver = &types.SparkSqlJobDriver{ - EntryPoint: &queryURI, - SparkSqlParameters: jobParameters, - } - - } + setJobDriver(jobContext, jobDriver, s, queryURI, resultURI) // let's prepare job payload jobPayload := &emrcontainers.StartJobRunInput{ @@ -275,6 +254,32 @@ timeoutLoop: } +func setJobDriver(jobContext *sparkJobContext, jobDriver *types.JobDriver, s *sparkCommandContext, queryURI string, resultURI string) { + jobParameters := getSparkSubmitParameters(jobContext) + if jobContext.Arguments != nil { + jobDriver.SparkSubmitJobDriver = &types.SparkSubmitJobDriver{ + EntryPoint: s.WrapperURI, + EntryPointArguments: jobContext.Arguments, + SparkSubmitParameters: jobParameters, + } + return + } + if jobContext.ReturnResult { + jobDriver.SparkSubmitJobDriver = &types.SparkSubmitJobDriver{ + EntryPoint: s.WrapperURI, + EntryPointArguments: []string{queryURI, resultURI}, + SparkSubmitParameters: jobParameters, + } + return + } + + jobDriver.SparkSqlJobDriver = &types.SparkSqlJobDriver{ + EntryPoint: &queryURI, + SparkSqlParameters: jobParameters, + } + +} + func getClusterID(svc *emrcontainers.Client, clusterName string) (*string, error) { // let's get the cluster ID From 9bdc77604c0cf1f60fd6a81a9d4c8abd7878fc85 Mon Sep 17 00:00:00 2001 From: "ivan.hladush" Date: Tue, 22 Jul 2025 08:38:20 -0600 Subject: [PATCH 5/7] Fix review comments --- internal/pkg/object/command/spark/spark.go | 4 
++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/pkg/object/command/spark/spark.go b/internal/pkg/object/command/spark/spark.go index 58e5cc7..a0194f2 100644 --- a/internal/pkg/object/command/spark/spark.go +++ b/internal/pkg/object/command/spark/spark.go @@ -173,7 +173,7 @@ func (s *sparkCommandContext) handler(r *plugin.Runtime, j *job.Job, c *cluster. // let's set job driver jobDriver := &types.JobDriver{} - setJobDriver(jobContext, jobDriver, s, queryURI, resultURI) + s.setJobDriver(jobContext, jobDriver, queryURI, resultURI) // let's prepare job payload jobPayload := &emrcontainers.StartJobRunInput{ @@ -254,7 +254,7 @@ timeoutLoop: } -func setJobDriver(jobContext *sparkJobContext, jobDriver *types.JobDriver, s *sparkCommandContext, queryURI string, resultURI string) { +func (s *sparkCommandContext) setJobDriver(jobContext *sparkJobContext, jobDriver *types.JobDriver, queryURI string, resultURI string) { jobParameters := getSparkSubmitParameters(jobContext) if jobContext.Arguments != nil { jobDriver.SparkSubmitJobDriver = &types.SparkSubmitJobDriver{ From 6ac2a63acc2186b123c7e95661780a2d47ca37c1 Mon Sep 17 00:00:00 2001 From: "ivan.hladush" Date: Tue, 22 Jul 2025 10:05:30 -0600 Subject: [PATCH 6/7] Improve submit parameters --- internal/pkg/object/command/spark/spark.go | 32 ++++++++++++---------- plugins/spark/README.md | 14 +++++++--- 2 files changed, 28 insertions(+), 18 deletions(-) diff --git a/internal/pkg/object/command/spark/spark.go b/internal/pkg/object/command/spark/spark.go index a0194f2..a5df24d 100644 --- a/internal/pkg/object/command/spark/spark.go +++ b/internal/pkg/object/command/spark/spark.go @@ -26,6 +26,11 @@ import ( "github.com/patterninc/heimdall/pkg/result" ) +type sparkSubmitParameters struct { + Properties map[string]string `yaml:"properties,omitempty" json:"properties,omitempty"` + EntryPoint string `yaml:"entry_point,omitempty" json:"entry_point,omitempty"` +} + // spark represents the Spark command
context type sparkCommandContext struct { QueriesURI string `yaml:"queries_uri,omitempty" json:"queries_uri,omitempty"` @@ -37,11 +42,10 @@ type sparkCommandContext struct { // sparkJobContext represents the context for a spark job type sparkJobContext struct { - Query string `yaml:"query,omitempty" json:"query,omitempty"` - Arguments []string `yaml:"arguments,omitempty" json:"arguments,omitempty"` - Properties map[string]string `yaml:"properties,omitempty" json:"properties,omitempty"` - ReturnResult bool `yaml:"return_result,omitempty" json:"return_result,omitempty"` - EntryPoint string `yaml:"entry_point,omitempty" json:"entry_point,omitempty"` + Query string `yaml:"query,omitempty" json:"query,omitempty"` + Arguments []string `yaml:"arguments,omitempty" json:"arguments,omitempty"` + SubmitParameters sparkSubmitParameters `yaml:"submit_parameters,omitempty" json:"submit_parameters,omitempty"` + ReturnResult bool `yaml:"return_result,omitempty" json:"return_result,omitempty"` } // sparkClusterContext represents the context for a spark cluster @@ -107,19 +111,19 @@ func (s *sparkCommandContext) handler(r *plugin.Runtime, j *job.Job, c *cluster. } // let's prepare job properties - if jobContext.Properties == nil { - jobContext.Properties = make(map[string]string) + if jobContext.SubmitParameters.Properties == nil { + jobContext.SubmitParameters.Properties = make(map[string]string) } for k, v := range s.Properties { - if _, found := jobContext.Properties[k]; !found { - jobContext.Properties[k] = v + if _, found := jobContext.SubmitParameters.Properties[k]; !found { + jobContext.SubmitParameters.Properties[k] = v } } // do we have driver memory setting in the job properties? 
- if value, found := jobContext.Properties[driverMemoryProperty]; found { + if value, found := jobContext.SubmitParameters.Properties[driverMemoryProperty]; found { clusterContext.Properties[driverMemoryProperty] = value - delete(jobContext.Properties, driverMemoryProperty) + delete(jobContext.SubmitParameters.Properties, driverMemoryProperty) } // setting AWS client @@ -302,14 +306,14 @@ func getClusterID(svc *emrcontainers.Client, clusterName string) (*string, error } func getSparkSubmitParameters(context *sparkJobContext) *string { - properties := context.Properties + properties := context.SubmitParameters.Properties conf := make([]string, 0, len(properties)) for k, v := range properties { conf = append(conf, fmt.Sprintf("--conf %s=%s", k, v)) } - if context.EntryPoint != "" { - conf = append(conf, fmt.Sprintf("--class %s", context.EntryPoint)) + if context.SubmitParameters.EntryPoint != "" { + conf = append(conf, fmt.Sprintf("--class %s", context.SubmitParameters.EntryPoint)) } return aws.String(strings.Join(conf, ` `)) diff --git a/plugins/spark/README.md b/plugins/spark/README.md index feb5425..5c484e1 100644 --- a/plugins/spark/README.md +++ b/plugins/spark/README.md @@ -80,12 +80,18 @@ A typical Spark job includes a SQL query and optional Spark properties: "command_criteria": ["type:sparksql"], "cluster_criteria": ["data_prod"], "context": { - // For SQL jobs, specify the "query" field. For Spark jobs that execute a custom JAR, use "arguments" and "entry_point". + // For SQL jobs, specify the "query" field. For Spark jobs that execute a custom JAR, use "arguments" and submit_parameters."entry_point". "query": "SELECT * FROM my_table WHERE dt='2023-01-01'", "arguments": ["SELECT 1", "s3:///"], - "entry_point": "com.your.company.ClassName", - "properties": { - "spark.sql.shuffle.partitions": "10" + "submit_parameters": { + // All values in "properties" are passed as `--conf` Spark submit parameters. 
Defaults from the command or cluster properties are merged in. + "properties": { + "spark.executor.memory": "4g", + "spark.executor.cores": "2" + }, + // The value of this property will be passed as the `--class` argument in Spark submit parameters, + // specifying the main class to execute in your Spark application. + "entry_point": "com.your.company.ClassName" }, "return_result": true } From 9e3cf1e0f0ccd64becf946b865d9e63e250a559d Mon Sep 17 00:00:00 2001 From: "ivan.hladush" Date: Tue, 22 Jul 2025 10:21:48 -0600 Subject: [PATCH 7/7] Support args --- internal/pkg/object/command/spark/spark.go | 30 ++++++++++++---------- plugins/spark/README.md | 4 +-- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/internal/pkg/object/command/spark/spark.go b/internal/pkg/object/command/spark/spark.go index a5df24d..9d0f382 100644 --- a/internal/pkg/object/command/spark/spark.go +++ b/internal/pkg/object/command/spark/spark.go @@ -42,10 +42,10 @@ type sparkCommandContext struct { // sparkJobContext represents the context for a spark job type sparkJobContext struct { - Query string `yaml:"query,omitempty" json:"query,omitempty"` - Arguments []string `yaml:"arguments,omitempty" json:"arguments,omitempty"` - SubmitParameters sparkSubmitParameters `yaml:"submit_parameters,omitempty" json:"submit_parameters,omitempty"` - ReturnResult bool `yaml:"return_result,omitempty" json:"return_result,omitempty"` + Query string `yaml:"query,omitempty" json:"query,omitempty"` + Arguments []string `yaml:"arguments,omitempty" json:"arguments,omitempty"` + Parameters *sparkSubmitParameters `yaml:"parameters,omitempty" json:"parameters,omitempty"` + ReturnResult bool `yaml:"return_result,omitempty" json:"return_result,omitempty"` } // sparkClusterContext represents the context for a spark cluster @@ -110,20 +110,24 @@ func (s *sparkCommandContext) handler(r *plugin.Runtime, j *job.Job, c *cluster. 
} } + if jobContext.Parameters == nil { + jobContext.Parameters = &sparkSubmitParameters{} + } + // let's prepare job properties - if jobContext.SubmitParameters.Properties == nil { - jobContext.SubmitParameters.Properties = make(map[string]string) + if jobContext.Parameters.Properties == nil { + jobContext.Parameters.Properties = make(map[string]string) } for k, v := range s.Properties { - if _, found := jobContext.SubmitParameters.Properties[k]; !found { - jobContext.SubmitParameters.Properties[k] = v + if _, found := jobContext.Parameters.Properties[k]; !found { + jobContext.Parameters.Properties[k] = v } } // do we have driver memory setting in the job properties? - if value, found := jobContext.SubmitParameters.Properties[driverMemoryProperty]; found { + if value, found := jobContext.Parameters.Properties[driverMemoryProperty]; found { clusterContext.Properties[driverMemoryProperty] = value - delete(jobContext.SubmitParameters.Properties, driverMemoryProperty) + delete(jobContext.Parameters.Properties, driverMemoryProperty) } // setting AWS client @@ -306,14 +310,14 @@ func getClusterID(svc *emrcontainers.Client, clusterName string) (*string, error } func getSparkSubmitParameters(context *sparkJobContext) *string { - properties := context.SubmitParameters.Properties + properties := context.Parameters.Properties conf := make([]string, 0, len(properties)) for k, v := range properties { conf = append(conf, fmt.Sprintf("--conf %s=%s", k, v)) } - if context.SubmitParameters.EntryPoint != "" { - conf = append(conf, fmt.Sprintf("--class %s", context.SubmitParameters.EntryPoint)) + if context.Parameters.EntryPoint != "" { + conf = append(conf, fmt.Sprintf("--class %s", context.Parameters.EntryPoint)) } return aws.String(strings.Join(conf, ` `)) diff --git a/plugins/spark/README.md b/plugins/spark/README.md index 5c484e1..5c0eb79 100644 --- a/plugins/spark/README.md +++ b/plugins/spark/README.md @@ -80,10 +80,10 @@ A typical Spark job includes a SQL query and optional 
Spark properties: "command_criteria": ["type:sparksql"], "cluster_criteria": ["data_prod"], "context": { - // For SQL jobs, specify the "query" field. For Spark jobs that execute a custom JAR, use "arguments" and submit_parameters."entry_point". + // For SQL jobs, specify the "query" field. For Spark jobs that execute a custom JAR, use "arguments" and parameters."entry_point". "query": "SELECT * FROM my_table WHERE dt='2023-01-01'", "arguments": ["SELECT 1", "s3:///"], - "submit_parameters": { + "parameters": { // All values in "properties" are passed as `--conf` Spark submit parameters. Defaults from the command or cluster properties are merged in. "properties": { "spark.executor.memory": "4g",