Skip to content

Commit f32d70c

Browse files
authored
fix (job run-input) : create asset subdir (#702)
* fix (job run-input) : create asset subdir * fix : add JOB_NAME in executor env * fix : tests for base_dag
1 parent c0424ef commit f32d70c

File tree

3 files changed

+8
-0
lines changed

3 files changed

+8
-0
lines changed

client/cmd/job/run_input.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,12 @@ func (j *jobRunInputCommand) writeJobAssetsToFiles(
191191
writeToFileFn := utils.WriteStringToFileIndexed()
192192
for fileName, fileContent := range jobResponse.Files {
193193
filePath := filepath.Join(dirPath, fileName)
194+
195+
assetSubDir := filepath.Dir(filePath)
196+
if err := os.MkdirAll(assetSubDir, fs.FileMode(permission)); err != nil {
197+
return fmt.Errorf("failed to create asset sub-directory at %s: %w", assetSubDir, err)
198+
}
199+
194200
if err := writeToFileFn(filePath, fileContent, j.logger.Writer()); err != nil {
195201
return fmt.Errorf("failed to write asset file at %s: %w", filePath, err)
196202
}

ext/scheduler/airflow2/resources/base_dag.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@
130130
executor_env_vars = [
131131
k8s.V1EnvVar(name="JOB_LABELS",value='{{.Job.GetLabelsAsString}}'),
132132
k8s.V1EnvVar(name="JOB_DIR",value=JOB_DIR),
133+
k8s.V1EnvVar(name="JOB_NAME",value='{{$.Job.Name}}'),
133134
]
134135

135136
init_env_vars = [

ext/scheduler/airflow2/resources/expected_compiled_template.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@
9292
executor_env_vars = [
9393
k8s.V1EnvVar(name="JOB_LABELS",value='orchestrator=optimus'),
9494
k8s.V1EnvVar(name="JOB_DIR",value=JOB_DIR),
95+
k8s.V1EnvVar(name="JOB_NAME",value='foo'),
9596
]
9697

9798
init_env_vars = [

0 commit comments

Comments
 (0)