diff --git a/.gitignore b/.gitignore index 97bc153..1744184 100644 --- a/.gitignore +++ b/.gitignore @@ -27,5 +27,7 @@ go.work.sum # mac .DS_Store +#idea +.idea/ # build dir dist/ diff --git a/internal/pkg/object/command/spark/spark.go b/internal/pkg/object/command/spark/spark.go index 948b3d9..9d0f382 100644 --- a/internal/pkg/object/command/spark/spark.go +++ b/internal/pkg/object/command/spark/spark.go @@ -26,6 +26,11 @@ import ( "github.com/patterninc/heimdall/pkg/result" ) +type sparkSubmitParameters struct { + Properties map[string]string `yaml:"properties,omitempty" json:"properties,omitempty"` + EntryPoint string `yaml:"entry_point,omitempty" json:"entry_point,omitempty"` +} + // spark represents the Spark command context type sparkCommandContext struct { QueriesURI string `yaml:"queries_uri,omitempty" json:"queries_uri,omitempty"` @@ -37,9 +42,10 @@ type sparkCommandContext struct { // sparkJobContext represents the context for a spark job type sparkJobContext struct { - Query string `yaml:"query,omitempty" json:"query,omitempty"` - Properties map[string]string `yaml:"properties,omitempty" json:"properties,omitempty"` - ReturnResult bool `yaml:"return_result,omitempty" json:"return_result,omitempty"` + Query string `yaml:"query,omitempty" json:"query,omitempty"` + Arguments []string `yaml:"arguments,omitempty" json:"arguments,omitempty"` + Parameters *sparkSubmitParameters `yaml:"parameters,omitempty" json:"parameters,omitempty"` + ReturnResult bool `yaml:"return_result,omitempty" json:"return_result,omitempty"` } // sparkClusterContext represents the context for a spark cluster @@ -104,20 +110,24 @@ func (s *sparkCommandContext) handler(r *plugin.Runtime, j *job.Job, c *cluster. } } + if jobContext.Parameters == nil { + jobContext.Parameters = &sparkSubmitParameters{} + } + // let's prepare job properties - if jobContext.Properties == nil { - jobContext.Properties = make(map[string]string) + if jobContext.Parameters.Properties == nil { + jobContext.Parameters.Properties = make(map[string]string) } for k, v := range s.Properties { - if _, found := jobContext.Properties[k]; !found { - jobContext.Properties[k] = v + if _, found := jobContext.Parameters.Properties[k]; !found { + jobContext.Parameters.Properties[k] = v } } // do we have driver memory setting in the job properties? - if value, found := jobContext.Properties[driverMemoryProperty]; found { + if value, found := jobContext.Parameters.Properties[driverMemoryProperty]; found { clusterContext.Properties[driverMemoryProperty] = value - delete(jobContext.Properties, driverMemoryProperty) + delete(jobContext.Parameters.Properties, driverMemoryProperty) } // setting AWS client @@ -171,26 +181,7 @@ func (s *sparkCommandContext) handler(r *plugin.Runtime, j *job.Job, c *cluster. // let's set job driver jobDriver := &types.JobDriver{} - jobParameters := getSparkSqlParameters(jobContext.Properties) - if jobContext.ReturnResult { - - jobDriver.SparkSubmitJobDriver = &types.SparkSubmitJobDriver{ - EntryPoint: s.WrapperURI, - EntryPointArguments: []string{ - queryURI, - resultURI, - }, - SparkSubmitParameters: jobParameters, - } - - } else { - - jobDriver.SparkSqlJobDriver = &types.SparkSqlJobDriver{ - EntryPoint: &queryURI, - SparkSqlParameters: jobParameters, - } - - } + s.setJobDriver(jobContext, jobDriver, queryURI, resultURI) // let's prepare job payload jobPayload := &emrcontainers.StartJobRunInput{ @@ -271,6 +262,32 @@ timeoutLoop: } +func (s *sparkCommandContext) setJobDriver(jobContext *sparkJobContext, jobDriver *types.JobDriver, queryURI string, resultURI string) { + jobParameters := getSparkSubmitParameters(jobContext) + if jobContext.Arguments != nil { + jobDriver.SparkSubmitJobDriver = &types.SparkSubmitJobDriver{ + EntryPoint: s.WrapperURI, + EntryPointArguments: jobContext.Arguments, + SparkSubmitParameters: jobParameters, + } + return + } + if jobContext.ReturnResult { + jobDriver.SparkSubmitJobDriver = &types.SparkSubmitJobDriver{ + EntryPoint: s.WrapperURI, + EntryPointArguments: []string{queryURI, resultURI}, + SparkSubmitParameters: jobParameters, + } + return + } + + jobDriver.SparkSqlJobDriver = &types.SparkSqlJobDriver{ + EntryPoint: &queryURI, + SparkSqlParameters: jobParameters, + } + +} + func getClusterID(svc *emrcontainers.Client, clusterName string) (*string, error) { // let's get the cluster ID @@ -292,14 +309,16 @@ func getClusterID(svc *emrcontainers.Client, clusterName string) (*string, error } -func getSparkSqlParameters(properties map[string]string) *string { - +func getSparkSubmitParameters(context *sparkJobContext) *string { + properties := context.Parameters.Properties conf := make([]string, 0, len(properties)) for k, v := range properties { conf = append(conf, fmt.Sprintf("--conf %s=%s", k, v)) } - + if context.Parameters.EntryPoint != "" { + conf = append(conf, fmt.Sprintf("--class %s", context.Parameters.EntryPoint)) + } return aws.String(strings.Join(conf, ` `)) } diff --git a/plugins/spark/README.md b/plugins/spark/README.md index 1078711..5c0eb79 100644 --- a/plugins/spark/README.md +++ b/plugins/spark/README.md @@ -26,6 +26,10 @@ A Spark command requires a SQL `query` in its job context and optionally job-spe queries_uri: s3://bucket/spark/queries results_uri: s3://bucket/spark/results logs_uri: s3://bucket/spark/logs + wrapper_uri: s3://bucket/contrib/spark/spark-sql-s3-wrapper.py properties: spark.executor.instances: "1" @@ -76,9 +80,18 @@ A typical Spark job includes a SQL query and optional Spark properties: "command_criteria": ["type:sparksql"], "cluster_criteria": ["data_prod"], "context": { + // For SQL jobs, specify the "query" field. For Spark jobs that execute a custom JAR, use "arguments" and parameters."entry_point". "query": "SELECT * FROM my_table WHERE dt='2023-01-01'", - "properties": { - "spark.sql.shuffle.partitions": "10" + "arguments": ["SELECT 1", "s3:///"], + "parameters": { + // All values in "properties" are passed as `--conf` Spark submit parameters. Defaults from the command or cluster properties are merged in. + "properties": { + "spark.executor.memory": "4g", + "spark.executor.cores": "2" + }, + // The value of this property will be passed as the `--class` argument in Spark submit parameters, + // specifying the main class to execute in your Spark application. + "entry_point": "com.your.company.ClassName" }, "return_result": true }