Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,7 @@ go.work.sum
# mac
.DS_Store

# IntelliJ IDEA
.idea/
# build dir
dist/
83 changes: 51 additions & 32 deletions internal/pkg/object/command/spark/spark.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ import (
"github.com/patterninc/heimdall/pkg/result"
)

// sparkSubmitParameters carries the spark-submit options a job may set:
// arbitrary Spark properties and an optional main-class entry point.
type sparkSubmitParameters struct {
Properties map[string]string `yaml:"properties,omitempty" json:"properties,omitempty"` // each entry is rendered as a `--conf key=value` spark-submit argument
EntryPoint string `yaml:"entry_point,omitempty" json:"entry_point,omitempty"` // when non-empty, rendered as `--class <EntryPoint>` (main class for JAR jobs)
}

// spark represents the Spark command context
type sparkCommandContext struct {
QueriesURI string `yaml:"queries_uri,omitempty" json:"queries_uri,omitempty"`
Expand All @@ -37,9 +42,10 @@ type sparkCommandContext struct {

// sparkJobContext represents the context for a spark job
type sparkJobContext struct {
Query string `yaml:"query,omitempty" json:"query,omitempty"`
Properties map[string]string `yaml:"properties,omitempty" json:"properties,omitempty"`
ReturnResult bool `yaml:"return_result,omitempty" json:"return_result,omitempty"`
Query string `yaml:"query,omitempty" json:"query,omitempty"` // SQL text to execute (SQL jobs)
Arguments []string `yaml:"arguments,omitempty" json:"arguments,omitempty"` // entry-point arguments; when set, a spark-submit driver is used (custom JAR jobs)
Parameters *sparkSubmitParameters `yaml:"parameters,omitempty" json:"parameters,omitempty"` // spark-submit properties and optional entry-point class; may be nil on input
ReturnResult bool `yaml:"return_result,omitempty" json:"return_result,omitempty"` // when true, the query runs through the wrapper and a result URI is passed to it
}

// sparkClusterContext represents the context for a spark cluster
Expand Down Expand Up @@ -104,20 +110,24 @@ func (s *sparkCommandContext) handler(r *plugin.Runtime, j *job.Job, c *cluster.
}
}

if jobContext.Parameters == nil {
jobContext.Parameters = &sparkSubmitParameters{}
}

// let's prepare job properties
if jobContext.Properties == nil {
jobContext.Properties = make(map[string]string)
if jobContext.Parameters.Properties == nil {
jobContext.Parameters.Properties = make(map[string]string)
}
for k, v := range s.Properties {
if _, found := jobContext.Properties[k]; !found {
jobContext.Properties[k] = v
if _, found := jobContext.Parameters.Properties[k]; !found {
jobContext.Parameters.Properties[k] = v
}
}

// do we have driver memory setting in the job properties?
if value, found := jobContext.Properties[driverMemoryProperty]; found {
if value, found := jobContext.Parameters.Properties[driverMemoryProperty]; found {
clusterContext.Properties[driverMemoryProperty] = value
delete(jobContext.Properties, driverMemoryProperty)
delete(jobContext.Parameters.Properties, driverMemoryProperty)
}

// setting AWS client
Expand Down Expand Up @@ -171,26 +181,7 @@ func (s *sparkCommandContext) handler(r *plugin.Runtime, j *job.Job, c *cluster.

// let's set job driver
jobDriver := &types.JobDriver{}
jobParameters := getSparkSqlParameters(jobContext.Properties)
if jobContext.ReturnResult {

jobDriver.SparkSubmitJobDriver = &types.SparkSubmitJobDriver{
EntryPoint: s.WrapperURI,
EntryPointArguments: []string{
queryURI,
resultURI,
},
SparkSubmitParameters: jobParameters,
}

} else {

jobDriver.SparkSqlJobDriver = &types.SparkSqlJobDriver{
EntryPoint: &queryURI,
SparkSqlParameters: jobParameters,
}

}
s.setJobDriver(jobContext, jobDriver, queryURI, resultURI)

// let's prepare job payload
jobPayload := &emrcontainers.StartJobRunInput{
Expand Down Expand Up @@ -271,6 +262,32 @@ timeoutLoop:

}

// setJobDriver populates jobDriver with the driver configuration that matches
// the job context: a spark-submit run with caller-supplied arguments, a
// wrapped SQL run that passes the query and result locations to the wrapper,
// or a plain Spark SQL run.
func (s *sparkCommandContext) setJobDriver(jobContext *sparkJobContext, jobDriver *types.JobDriver, queryURI string, resultURI string) {

	submitParameters := getSparkSubmitParameters(jobContext)

	switch {
	case jobContext.Arguments != nil:
		// custom application: the caller supplies the entry-point arguments
		jobDriver.SparkSubmitJobDriver = &types.SparkSubmitJobDriver{
			EntryPoint:            s.WrapperURI,
			EntryPointArguments:   jobContext.Arguments,
			SparkSubmitParameters: submitParameters,
		}
	case jobContext.ReturnResult:
		// SQL via the wrapper: it reads the query from queryURI and writes to resultURI
		jobDriver.SparkSubmitJobDriver = &types.SparkSubmitJobDriver{
			EntryPoint:            s.WrapperURI,
			EntryPointArguments:   []string{queryURI, resultURI},
			SparkSubmitParameters: submitParameters,
		}
	default:
		// plain Spark SQL job, no wrapper involved
		jobDriver.SparkSqlJobDriver = &types.SparkSqlJobDriver{
			EntryPoint:         &queryURI,
			SparkSqlParameters: submitParameters,
		}
	}

}

func getClusterID(svc *emrcontainers.Client, clusterName string) (*string, error) {

// let's get the cluster ID
Expand All @@ -292,14 +309,16 @@ func getClusterID(svc *emrcontainers.Client, clusterName string) (*string, error

}

func getSparkSqlParameters(properties map[string]string) *string {

// getSparkSubmitParameters renders the job's submit parameters as a single
// spark-submit argument string: one `--conf key=value` per property, plus an
// optional `--class <entry_point>` for jobs that run a custom application.
func getSparkSubmitParameters(context *sparkJobContext) *string {

	// guard against a nil Parameters block: handler pre-populates it, but any
	// other caller would otherwise hit a nil-pointer dereference here
	if context == nil || context.Parameters == nil {
		return aws.String(``)
	}

	properties := context.Parameters.Properties
	conf := make([]string, 0, len(properties)+1)

	// NOTE(review): Go map iteration order is randomized, so the rendered
	// parameter string is not byte-stable across runs — confirm nothing
	// downstream depends on ordering (sorting keys would make it stable).
	for k, v := range properties {
		conf = append(conf, fmt.Sprintf("--conf %s=%s", k, v))
	}

	if context.Parameters.EntryPoint != "" {
		conf = append(conf, fmt.Sprintf("--class %s", context.Parameters.EntryPoint))
	}

	return aws.String(strings.Join(conf, ` `))

}
Expand Down
17 changes: 15 additions & 2 deletions plugins/spark/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ A Spark command requires a SQL `query` in its job context and optionally job-spe
queries_uri: s3://bucket/spark/queries
results_uri: s3://bucket/spark/results
logs_uri: s3://bucket/spark/logs
# wrapper_uri:
# - For SQL (Python wrapper) jobs, provide the path to the wrapper `.py` file (e.g., s3://bucket/contrib/spark/spark-sql-s3-wrapper.py).
# - For Spark application (JAR) jobs, set this to the path of the JAR file.
wrapper_uri: s3://bucket/contrib/spark/spark-sql-s3-wrapper.py
properties:
spark.executor.instances: "1"
Expand Down Expand Up @@ -76,9 +80,18 @@ A typical Spark job includes a SQL query and optional Spark properties:
"command_criteria": ["type:sparksql"],
"cluster_criteria": ["data_prod"],
"context": {
// For SQL jobs, specify the "query" field. For Spark jobs that execute a custom JAR, use "arguments" and "parameters.entry_point".
"query": "SELECT * FROM my_table WHERE dt='2023-01-01'",
"properties": {
"spark.sql.shuffle.partitions": "10"
"arguments": ["SELECT 1", "s3:///"],
"parameters": {
// All values in "properties" are passed as `--conf` Spark submit parameters. Defaults from the command or cluster properties are merged in.
"properties": {
"spark.executor.memory": "4g",
"spark.executor.cores": "2"
},
// The value of this property will be passed as the `--class` argument in Spark submit parameters,
// specifying the main class to execute in your Spark application.
"entry_point": "com.your.company.ClassName"
},
"return_result": true
}
Expand Down