Skip to content

Commit

Permalink
Fix downstream bug
Browse files Browse the repository at this point in the history
after deleting dataset, the event of deleting job can be
sent to LC.

Signed-off-by: JimmyYang20 <[email protected]>
  • Loading branch information
JimmyYang20 committed Dec 15, 2021
1 parent 1edba9a commit 3fc0a15
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 19 deletions.
22 changes: 14 additions & 8 deletions pkg/globalmanager/controllers/incrementallearning/downstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,15 @@ func (c *Controller) syncToEdge(eventType watch.EventType, obj interface{}) erro
// more details at https://github.com/kubernetes/kubernetes/issues/3030
job.Kind = KindName

jobConditions := job.Status.Conditions
if len(jobConditions) == 0 {
return nil
}

dataName := job.Spec.Dataset.Name
// LC has dataset object on this node that may call dataset node
var dsNodeName string
ds, err := c.client.Datasets(job.Namespace).Get(context.TODO(), dataName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("dataset(%s/%s) not found", job.Namespace, dataName)
klog.Errorf("not found job(name=%s/%s)'s dataset, error: %v", job.Kind, job.Name, err)
} else {
dsNodeName = ds.Spec.NodeName
}
// LC has dataset object on this node that may call dataset node
dsNodeName := ds.Spec.NodeName

var trainNodeName string
var evalNodeName string
Expand Down Expand Up @@ -102,6 +99,15 @@ func (c *Controller) syncToEdge(eventType watch.EventType, obj interface{}) erro
return nil
}

if dsNodeName == "" {
return nil
}

jobConditions := job.Status.Conditions
if len(jobConditions) == 0 {
return nil
}

latestCondition := jobConditions[len(jobConditions)-1]
currentType := latestCondition.Type
jobStage := latestCondition.Stage
Expand Down
26 changes: 15 additions & 11 deletions pkg/globalmanager/controllers/lifelonglearning/downstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package lifelonglearning

import (
"context"
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
Expand All @@ -40,18 +38,15 @@ func (c *Controller) syncToEdge(eventType watch.EventType, obj interface{}) erro
// more details at https://github.com/kubernetes/kubernetes/issues/3030
job.Kind = KindName

jobConditions := job.Status.Conditions
if len(jobConditions) == 0 {
return nil
}

dataName := job.Spec.Dataset.Name
// LC has dataset object on this node that may call dataset node
var dsNodeName string
ds, err := c.client.Datasets(job.Namespace).Get(context.TODO(), dataName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("dataset(%s/%s) not found", job.Namespace, dataName)
klog.Errorf("not found job(name=%s/%s)'s dataset, error: %v", job.Kind, job.Name, err)
} else {
dsNodeName = ds.Spec.NodeName
}
// LC has dataset object on this node that may call dataset node
dsNodeName := ds.Spec.NodeName

var trainNodeName string
var evalNodeName string
Expand All @@ -78,13 +73,22 @@ func (c *Controller) syncToEdge(eventType watch.EventType, obj interface{}) erro
return nil
}

if dsNodeName == "" {
return nil
}

jobConditions := job.Status.Conditions
if len(jobConditions) == 0 {
return nil
}

latestCondition := jobConditions[len(jobConditions)-1]
currentType := latestCondition.Type
jobStage := latestCondition.Stage

syncJobWithNodeName := func(nodeName string) {
if err := c.sendToEdgeFunc(nodeName, eventType, job); err != nil {
klog.Warningf("Error to sync incremental learning job %s to node %s in stage %s: %v",
klog.Warningf("Error to sync lifelong learning job %s to node %s in stage %s: %v",
job.Name, nodeName, jobStage, err)
}
}
Expand Down

0 comments on commit 3fc0a15

Please sign in to comment.