Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ NAME = "github.com/goto/optimus"
LAST_COMMIT := $(shell git rev-parse --short HEAD)
LAST_TAG := "$(shell git rev-list --tags --max-count=1)"
OPMS_VERSION := "$(shell git describe --tags ${LAST_TAG})-next"
PROTON_COMMIT := "8ddad14e124639b9921e03c92775629df7cb7d80"
PROTON_COMMIT := "6c23cfa641ff762cb1082abc2a6edb89eccde42c"


.PHONY: build test test-ci generate-proto unit-test-ci integration-test vet coverage clean install lint
Expand Down
12 changes: 12 additions & 0 deletions client/jsonschema/job.json
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,18 @@
},
"airflow": {
"$ref": "#/$defs/JobSpecMetadataAirflow"
},
"kubernetes": {
"$ref": "#/$defs/JobSpecMetadataKubernetes"
}
},
"additionalProperties": false,
"type": "object"
},
"JobSpecMetadataKubernetes": {
"properties": {
"service_account": {
"type": "string"
}
},
"additionalProperties": false,
Expand Down
35 changes: 29 additions & 6 deletions client/local/model/job_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,9 @@ type JobSpecDependencyHTTP struct {
}

type JobSpecMetadata struct {
Resource *JobSpecMetadataResource `yaml:"resource,omitempty"`
Airflow *JobSpecMetadataAirflow `yaml:"airflow,omitempty"`
Resource *JobSpecMetadataResource `yaml:"resource,omitempty"`
Airflow *JobSpecMetadataAirflow `yaml:"airflow,omitempty"`
Kubernetes *JobSpecMetadataKubernetes `yaml:"kubernetes,omitempty"`
}

type JobSpecMetadataResource struct {
Expand All @@ -198,6 +199,10 @@ type JobSpecMetadataAirflow struct {
Queue string `yaml:"queue" json:"queue"`
}

type JobSpecMetadataKubernetes struct {
ServiceAccount string `yaml:"service_account,omitempty"`
}

func (j *JobSpec) ToProto() *pb.JobSpecification {
taskConfig := j.getProtoJobConfigItems()
js := &pb.JobSpecification{
Expand Down Expand Up @@ -263,9 +268,16 @@ func (j *JobSpec) getProtoJobMetadata() *pb.JobMetadata {
Queue: j.Metadata.Airflow.Queue,
}
}
var kubernetes *pb.JobSpecMetadataKubernetes
if j.Metadata.Kubernetes != nil {
kubernetes = &pb.JobSpecMetadataKubernetes{
ServiceAccount: j.Metadata.Kubernetes.ServiceAccount,
}
}
return &pb.JobMetadata{
Resource: resource,
Airflow: airflow,
Resource: resource,
Airflow: airflow,
Kubernetes: kubernetes,
}
}

Expand Down Expand Up @@ -635,9 +647,20 @@ func toJobSpecMetadata(protoMetadata *pb.JobMetadata) *JobSpecMetadata {
Queue: protoMetadata.Airflow.Queue,
}
}

var metadataKubernetesSpec *JobSpecMetadataKubernetes
if protoMetadata.Kubernetes != nil {
if protoMetadata.Kubernetes.ServiceAccount != "" {
metadataKubernetesSpec = &JobSpecMetadataKubernetes{
ServiceAccount: protoMetadata.Kubernetes.ServiceAccount,
}
}
}

metadataSpec = &JobSpecMetadata{
Resource: metadataResourceSpec,
Airflow: metadataAirflowSpec,
Resource: metadataResourceSpec,
Airflow: metadataAirflowSpec,
Kubernetes: metadataKubernetesSpec,
}
}
return metadataSpec
Expand Down
6 changes: 6 additions & 0 deletions client/local/model/job_spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ func (*JobSpecTestSuite) getCompleteJobSpec() model.JobSpec {
Pool: "poolA",
Queue: "queueA",
},
Kubernetes: &model.JobSpecMetadataKubernetes{
ServiceAccount: "serviceAccountA",
},
},
}
}
Expand Down Expand Up @@ -273,6 +276,9 @@ func (*JobSpecTestSuite) getCompleteJobSpecProto() *pb.JobSpecification {
Pool: "poolA",
Queue: "queueA",
},
Kubernetes: &pb.JobSpecMetadataKubernetes{
ServiceAccount: "serviceAccountA",
},
},
}
}
Expand Down
15 changes: 13 additions & 2 deletions core/job/handler/v1beta1/job_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,10 @@ func toMetadata(jobMetadata *pb.JobMetadata) (*job.Metadata, error) {
}
metadataBuilder = metadataBuilder.WithScheduler(schedulerMetadata)
}
if jobMetadata.Kubernetes != nil {
metadataKubernetes := job.NewKubernetesMetadata(jobMetadata.Kubernetes.ServiceAccount)
metadataBuilder = metadataBuilder.WithKubernetes(metadataKubernetes)
}
metadata, err := metadataBuilder.Build()
if err != nil {
return nil, err
Expand Down Expand Up @@ -543,9 +547,16 @@ func fromMetadata(metadata *job.Metadata) *pb.JobMetadata {
metadataSchedulerProto.Queue = metadata.Scheduler()["queue"]
}
}

metadataKubernetesProto := &pb.JobSpecMetadataKubernetes{}
if metadata.Kubernetes() != nil {
metadataKubernetesProto.ServiceAccount = metadata.Kubernetes().ServiceAccount()
}

return &pb.JobMetadata{
Resource: metadataResourceProto,
Airflow: metadataSchedulerProto,
Resource: metadataResourceProto,
Airflow: metadataSchedulerProto,
Kubernetes: metadataKubernetesProto,
}
}

Expand Down
5 changes: 4 additions & 1 deletion core/job/handler/v1beta1/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,18 @@ func TestNewJobHandler(t *testing.T) {
Request: &pb.JobSpecMetadataResourceConfig{Cpu: "1", Memory: "8"},
Limit: &pb.JobSpecMetadataResourceConfig{Cpu: ".5", Memory: "4"},
},
Airflow: &pb.JobSpecMetadataAirflow{Pool: "100", Queue: "50"},
Airflow: &pb.JobSpecMetadataAirflow{Pool: "100", Queue: "50"},
Kubernetes: &pb.JobSpecMetadataKubernetes{ServiceAccount: "sample-service-account"},
}

resourceRequestConfig := job.NewMetadataResourceConfig("1", "8")
resourceLimitConfig := job.NewMetadataResourceConfig(".5", "4")
resourceMetadata := job.NewResourceMetadata(resourceRequestConfig, resourceLimitConfig)
kubernetesMetadata := job.NewKubernetesMetadata("sample-service-account")
metadataSpec, _ := job.NewMetadataBuilder().
WithResource(resourceMetadata).
WithScheduler(map[string]string{"pool": "100", "queue": "50"}).
WithKubernetes(kubernetesMetadata).
Build()

log := log.NewNoop()
Expand Down
26 changes: 24 additions & 2 deletions core/job/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,9 +508,22 @@ func NewResourceMetadata(request, limit *MetadataResourceConfig) *MetadataResour
return &MetadataResource{request: request, limit: limit}
}

type MetadataKubernetes struct {
serviceAccount string
}

func (m MetadataKubernetes) ServiceAccount() string {
return m.serviceAccount
}

func NewKubernetesMetadata(serviceAccount string) *MetadataKubernetes {
return &MetadataKubernetes{serviceAccount: serviceAccount}
}

type Metadata struct {
resource *MetadataResource
scheduler map[string]string
resource *MetadataResource
scheduler map[string]string
kubernetes *MetadataKubernetes
}

func (m Metadata) Resource() *MetadataResource {
Expand All @@ -521,6 +534,10 @@ func (m Metadata) Scheduler() map[string]string {
return m.scheduler
}

func (m Metadata) Kubernetes() *MetadataKubernetes {
return m.kubernetes
}

func (m Metadata) validate() error {
return validateMap(m.scheduler)
}
Expand Down Expand Up @@ -552,6 +569,11 @@ func (m *MetadataBuilder) WithScheduler(scheduler map[string]string) *MetadataBu
return m
}

func (m *MetadataBuilder) WithKubernetes(kubernetes *MetadataKubernetes) *MetadataBuilder {
m.metadata.kubernetes = kubernetes
return m
}

type Hook struct {
name string
version string
Expand Down
9 changes: 7 additions & 2 deletions core/scheduler/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,8 +401,9 @@ type Webhook struct {
}

type RuntimeConfig struct {
Resource *Resource
Scheduler map[string]string
Resource *Resource
Scheduler map[string]string
Kubernetes *Kubernetes
}

type Resource struct {
Expand All @@ -415,6 +416,10 @@ type ResourceConfig struct {
Memory string
}

type Kubernetes struct {
ServiceAccount string
}

type Upstreams struct {
HTTP []*HTTPUpstreams
UpstreamJobs []*JobUpstream
Expand Down
3 changes: 3 additions & 0 deletions ext/scheduler/airflow/dag/compiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ func setupJobDetails(tnnt tenant.Tenant) *scheduler.JobWithDetails {
Memory: "2G",
},
},
Kubernetes: &scheduler.Kubernetes{
ServiceAccount: "sample-service-account",
},
}

tnnt1, _ := tenant.NewTenant("project", "namespace")
Expand Down
3 changes: 3 additions & 0 deletions ext/scheduler/airflow/dag/expected_dag.2.1.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ def get_entrypoint_cmd(plugin_entrypoint_script):
do_xcom_push=False,
env_vars=executor_env_vars,
resources=resources,
service_account_name="sample-service-account",
reattach_on_restart=True,
volume_mounts=asset_volume_mounts,
volumes=[volume],
Expand Down Expand Up @@ -176,6 +177,7 @@ def get_entrypoint_cmd(plugin_entrypoint_script):
do_xcom_push=False,
env_vars=executor_env_vars,
resources=resources,
service_account_name="sample-service-account",
reattach_on_restart=True,
volume_mounts=asset_volume_mounts,
volumes=[volume],
Expand Down Expand Up @@ -211,6 +213,7 @@ def get_entrypoint_cmd(plugin_entrypoint_script):
do_xcom_push=False,
env_vars=executor_env_vars,
resources=resources,
service_account_name="sample-service-account",
reattach_on_restart=True,
volume_mounts=asset_volume_mounts,
volumes=[volume],
Expand Down
3 changes: 3 additions & 0 deletions ext/scheduler/airflow/dag/expected_dag.2.4.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ def get_entrypoint_cmd(plugin_entrypoint_script):
do_xcom_push=False,
env_vars=executor_env_vars,
resources=resources,
service_account_name="sample-service-account",
reattach_on_restart=True,
volume_mounts=asset_volume_mounts,
volumes=[volume],
Expand Down Expand Up @@ -176,6 +177,7 @@ def get_entrypoint_cmd(plugin_entrypoint_script):
do_xcom_push=False,
env_vars=executor_env_vars,
resources=resources,
service_account_name="sample-service-account",
reattach_on_restart=True,
volume_mounts=asset_volume_mounts,
volumes=[volume],
Expand Down Expand Up @@ -211,6 +213,7 @@ def get_entrypoint_cmd(plugin_entrypoint_script):
do_xcom_push=False,
env_vars=executor_env_vars,
resources=resources,
service_account_name="sample-service-account",
reattach_on_restart=True,
volume_mounts=asset_volume_mounts,
volumes=[volume],
Expand Down
3 changes: 3 additions & 0 deletions ext/scheduler/airflow/dag/expected_dag.2.6.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ def get_entrypoint_cmd(plugin_entrypoint_script):
do_xcom_push=False,
env_vars=executor_env_vars,
container_resources=resources,
service_account_name="sample-service-account",
reattach_on_restart=True,
volume_mounts=asset_volume_mounts,
volumes=[volume],
Expand Down Expand Up @@ -176,6 +177,7 @@ def get_entrypoint_cmd(plugin_entrypoint_script):
do_xcom_push=False,
env_vars=executor_env_vars,
container_resources=resources,
service_account_name="sample-service-account",
reattach_on_restart=True,
volume_mounts=asset_volume_mounts,
volumes=[volume],
Expand Down Expand Up @@ -211,6 +213,7 @@ def get_entrypoint_cmd(plugin_entrypoint_script):
do_xcom_push=False,
env_vars=executor_env_vars,
container_resources=resources,
service_account_name="sample-service-account",
reattach_on_restart=True,
volume_mounts=asset_volume_mounts,
volumes=[volume],
Expand Down
3 changes: 3 additions & 0 deletions ext/scheduler/airflow/dag/expected_dag.2.9.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ def get_entrypoint_cmd(plugin_entrypoint_script):
do_xcom_push=False,
env_vars=executor_env_vars,
container_resources=resources,
service_account_name="sample-service-account",
reattach_on_restart=True,
volume_mounts=asset_volume_mounts,
volumes=[volume],
Expand Down Expand Up @@ -178,6 +179,7 @@ def get_entrypoint_cmd(plugin_entrypoint_script):
do_xcom_push=False,
env_vars=executor_env_vars,
container_resources=resources,
service_account_name="sample-service-account",
reattach_on_restart=True,
volume_mounts=asset_volume_mounts,
volumes=[volume],
Expand Down Expand Up @@ -213,6 +215,7 @@ def get_entrypoint_cmd(plugin_entrypoint_script):
do_xcom_push=False,
env_vars=executor_env_vars,
container_resources=resources,
service_account_name="sample-service-account",
reattach_on_restart=True,
volume_mounts=asset_volume_mounts,
volumes=[volume],
Expand Down
21 changes: 19 additions & 2 deletions ext/scheduler/airflow/dag/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,9 @@ func PrepareHooksForJob(job *scheduler.Job, pluginRepo PluginRepo) (Hooks, error
}

type RuntimeConfig struct {
Resource *Resource
Airflow AirflowConfig
Resource *Resource
Airflow AirflowConfig
Kubernetes *Kubernetes
}

func SetupRuntimeConfig(jobDetails *scheduler.JobWithDetails) RuntimeConfig {
Expand All @@ -116,6 +117,9 @@ func SetupRuntimeConfig(jobDetails *scheduler.JobWithDetails) RuntimeConfig {
if resource := ToResource(jobDetails.RuntimeConfig.Resource); resource != nil {
runtimeConf.Resource = resource
}
if kubernetes := ToKubernetes(jobDetails.RuntimeConfig.Kubernetes); kubernetes != nil {
runtimeConf.Kubernetes = kubernetes
}
return runtimeConf
}

Expand Down Expand Up @@ -177,6 +181,19 @@ func ToAirflowConfig(schedulerConf map[string]string) AirflowConfig {
return conf
}

type Kubernetes struct {
ServiceAccount string
}

func ToKubernetes(config *scheduler.Kubernetes) *Kubernetes {
if config == nil {
return nil
}
return &Kubernetes{
ServiceAccount: config.ServiceAccount,
}
}

func SLAMissDuration(job *scheduler.JobWithDetails) (int64, error) {
var slaMissDurationInSec int64
for _, notify := range job.Alerts { // We are ranging and picking one value
Expand Down
Loading