Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,7 @@ go.work.sum
# mac
.DS_Store

# IntelliJ IDEA
.idea/
# build dir
dist/
79 changes: 47 additions & 32 deletions internal/pkg/object/command/spark/spark.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ import (
"github.com/patterninc/heimdall/pkg/result"
)

// sparkSubmitParameters holds the spark-submit options a job may supply:
// arbitrary configuration properties and an optional application entry point.
type sparkSubmitParameters struct {
	// Properties are rendered as repeated `--conf key=value` spark-submit flags.
	Properties map[string]string `yaml:"properties,omitempty" json:"properties,omitempty"`
	// EntryPoint, when non-empty, is rendered as the `--class` spark-submit flag
	// (the main class of the Spark application).
	EntryPoint string `yaml:"entry_point,omitempty" json:"entry_point,omitempty"`
}

// spark represents the Spark command context
type sparkCommandContext struct {
QueriesURI string `yaml:"queries_uri,omitempty" json:"queries_uri,omitempty"`
Expand All @@ -37,9 +42,10 @@ type sparkCommandContext struct {

// sparkJobContext represents the context for a spark job
type sparkJobContext struct {
	// Query is the SQL text to run.
	Query string `yaml:"query,omitempty" json:"query,omitempty"`
	// Arguments, when present, are passed verbatim as the spark-submit
	// entry-point arguments (used for custom JAR / application jobs).
	Arguments []string `yaml:"arguments,omitempty" json:"arguments,omitempty"`
	// SubmitParameters carries the `--conf` properties and optional `--class`
	// entry point for spark-submit.
	SubmitParameters sparkSubmitParameters `yaml:"submit_parameters,omitempty" json:"submit_parameters,omitempty"`
	// ReturnResult, when true, runs the job through the wrapper so the query
	// result is written to the result URI.
	ReturnResult bool `yaml:"return_result,omitempty" json:"return_result,omitempty"`
}

// sparkClusterContext represents the context for a spark cluster
Expand Down Expand Up @@ -105,19 +111,19 @@ func (s *sparkCommandContext) handler(r *plugin.Runtime, j *job.Job, c *cluster.
}

// let's prepare job properties
if jobContext.Properties == nil {
jobContext.Properties = make(map[string]string)
if jobContext.SubmitParameters.Properties == nil {
jobContext.SubmitParameters.Properties = make(map[string]string)
}
for k, v := range s.Properties {
if _, found := jobContext.Properties[k]; !found {
jobContext.Properties[k] = v
if _, found := jobContext.SubmitParameters.Properties[k]; !found {
jobContext.SubmitParameters.Properties[k] = v
}
}

// do we have driver memory setting in the job properties?
if value, found := jobContext.Properties[driverMemoryProperty]; found {
if value, found := jobContext.SubmitParameters.Properties[driverMemoryProperty]; found {
clusterContext.Properties[driverMemoryProperty] = value
delete(jobContext.Properties, driverMemoryProperty)
delete(jobContext.SubmitParameters.Properties, driverMemoryProperty)
}

// setting AWS client
Expand Down Expand Up @@ -171,26 +177,7 @@ func (s *sparkCommandContext) handler(r *plugin.Runtime, j *job.Job, c *cluster.

// let's set job driver
jobDriver := &types.JobDriver{}
jobParameters := getSparkSqlParameters(jobContext.Properties)
if jobContext.ReturnResult {

jobDriver.SparkSubmitJobDriver = &types.SparkSubmitJobDriver{
EntryPoint: s.WrapperURI,
EntryPointArguments: []string{
queryURI,
resultURI,
},
SparkSubmitParameters: jobParameters,
}

} else {

jobDriver.SparkSqlJobDriver = &types.SparkSqlJobDriver{
EntryPoint: &queryURI,
SparkSqlParameters: jobParameters,
}

}
s.setJobDriver(jobContext, jobDriver, queryURI, resultURI)

// let's prepare job payload
jobPayload := &emrcontainers.StartJobRunInput{
Expand Down Expand Up @@ -271,6 +258,32 @@ timeoutLoop:

}

// setJobDriver populates jobDriver based on the job context:
//   - if the job supplies explicit arguments, run the wrapper as a
//     spark-submit job with those arguments;
//   - else, if the caller wants the result back, run the wrapper as a
//     spark-submit job with the query and result URIs as arguments;
//   - otherwise run the query directly as a Spark SQL job.
func (s *sparkCommandContext) setJobDriver(jobContext *sparkJobContext, jobDriver *types.JobDriver, queryURI string, resultURI string) {

	jobParameters := getSparkSubmitParameters(jobContext)

	// len() covers both nil and an explicitly-empty arguments list; a bare
	// `!= nil` check would submit the wrapper with zero entry-point arguments
	// when the caller sends `"arguments": []`.
	if len(jobContext.Arguments) > 0 {
		jobDriver.SparkSubmitJobDriver = &types.SparkSubmitJobDriver{
			EntryPoint:            s.WrapperURI,
			EntryPointArguments:   jobContext.Arguments,
			SparkSubmitParameters: jobParameters,
		}
		return
	}

	if jobContext.ReturnResult {
		jobDriver.SparkSubmitJobDriver = &types.SparkSubmitJobDriver{
			EntryPoint:            s.WrapperURI,
			EntryPointArguments:   []string{queryURI, resultURI},
			SparkSubmitParameters: jobParameters,
		}
		return
	}

	jobDriver.SparkSqlJobDriver = &types.SparkSqlJobDriver{
		EntryPoint:         &queryURI,
		SparkSqlParameters: jobParameters,
	}
}

func getClusterID(svc *emrcontainers.Client, clusterName string) (*string, error) {

// let's get the cluster ID
Expand All @@ -292,14 +305,16 @@ func getClusterID(svc *emrcontainers.Client, clusterName string) (*string, error

}

func getSparkSqlParameters(properties map[string]string) *string {

func getSparkSubmitParameters(context *sparkJobContext) *string {
properties := context.SubmitParameters.Properties
conf := make([]string, 0, len(properties))

for k, v := range properties {
conf = append(conf, fmt.Sprintf("--conf %s=%s", k, v))
}

if context.SubmitParameters.EntryPoint != "" {
conf = append(conf, fmt.Sprintf("--class %s", context.SubmitParameters.EntryPoint))
}
return aws.String(strings.Join(conf, ` `))

}
Expand Down
17 changes: 15 additions & 2 deletions plugins/spark/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ A Spark command requires a SQL `query` in its job context and optionally job-spe
queries_uri: s3://bucket/spark/queries
results_uri: s3://bucket/spark/results
logs_uri: s3://bucket/spark/logs
<!--
- For SQL Python usage, provide the path to the `.py` file (e.g., s3://bucket/contrib/spark/spark-sql-s3-wrapper.py).
- For Spark applications (JAR) usage, set this to the path of the JAR file.
-->
wrapper_uri: s3://bucket/contrib/spark/spark-sql-s3-wrapper.py
properties:
spark.executor.instances: "1"
Expand Down Expand Up @@ -76,9 +80,18 @@ A typical Spark job includes a SQL query and optional Spark properties:
"command_criteria": ["type:sparksql"],
"cluster_criteria": ["data_prod"],
"context": {
// For SQL jobs, specify the "query" field. For Spark jobs that execute a custom JAR, use "arguments" and "submit_parameters.entry_point".
"query": "SELECT * FROM my_table WHERE dt='2023-01-01'",
"properties": {
"spark.sql.shuffle.partitions": "10"
"arguments": ["SELECT 1", "s3:///"],
"submit_parameters": {
// All values in "properties" are passed as `--conf` Spark submit parameters. Defaults from the command or cluster properties are merged in.
"properties": {
"spark.executor.memory": "4g",
"spark.executor.cores": "2"
},
// The value of this property will be passed as the `--class` argument in Spark submit parameters,
// specifying the main class to execute in your Spark application.
"entry_point": "com.your.company.ClassName"
},
"return_result": true
}
Expand Down