Skip to content

Commit

Permalink
better error handling on placement does not exist (#355)
Browse files Browse the repository at this point in the history
* better error handling on placement does not exist

* better error handling

* more test coverage

* handle table non exist error in job config handler
  • Loading branch information
Jian Shen authored Mar 6, 2020
1 parent 42e7331 commit 765a163
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 19 deletions.
35 changes: 17 additions & 18 deletions controller/handlers/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,15 @@ func (h ConfigHandler) GetJob(w *utils.ResponseWriter, r *http.Request) {

table, err := h.schemaMutator.GetTable(req.Namespace, job.AresTableConfig.Name)
if err != nil {
w.WriteErrorWithCode(
http.StatusInternalServerError,
ErrFailedToFetchTableSchemaForJobConfig)
return
if !mutatorCom.IsNonExist(err) {
w.WriteErrorWithCode(
http.StatusInternalServerError,
ErrFailedToFetchTableSchemaForJobConfig)
return
}
} else {
job.AresTableConfig.Table = table
}
job.AresTableConfig.Table = table

numShards, err := h.getNumShards(req.Namespace)
if err != nil {
Expand Down Expand Up @@ -158,24 +161,20 @@ func (h ConfigHandler) GetJobs(w *utils.ResponseWriter, r *http.Request) {
return
}

numValidJobs := 0
for _, job := range jobs {
for idx, job := range jobs {
jobs[idx].NumShards = numShards
table, err := h.schemaMutator.GetTable(req.Namespace, job.AresTableConfig.Name)
if err != nil {
// ignore job with table not found
if mutatorCom.IsNonExist(err) {
continue
// only return system error when error is not NonExist
if !mutatorCom.IsNonExist(err) {
w.WriteErrorWithCode(http.StatusInternalServerError, ErrFailedToFetchTableSchemaForJobConfig)
return
}
err = ErrFailedToFetchTableSchemaForJobConfig
w.WriteErrorWithCode(http.StatusInternalServerError, err)
return
} else {
jobs[idx].AresTableConfig.Table = table
}
jobs[numValidJobs] = job
jobs[numValidJobs].AresTableConfig.Table = table
jobs[numValidJobs].NumShards = numShards
numValidJobs++
}
w.WriteObject(jobs[:numValidJobs])
w.WriteObject(jobs)
}

// DeleteJob swagger:route DELETE /config/{namespace}/jobs/{job} deleteJob
Expand Down
24 changes: 24 additions & 0 deletions controller/handlers/placement.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ func (h *PlacementHandler) Get(rw *utils.ResponseWriter, r *http.Request) {
}
plm, err := h.placementMutator.GetCurrentPlacement(req.Namespace)
if err != nil {
if mutatorCom.IsNonExist(err) {
rw.WriteErrorWithCode(http.StatusNotFound, err)
return
}
rw.WriteError(err)
return
}
Expand All @@ -117,6 +121,10 @@ func (h *PlacementHandler) Add(rw *utils.ResponseWriter, r *http.Request) {
// validate all shards available in placement before adding instance
plm, err := h.placementMutator.AddInstance(req.Namespace, instances)
if err != nil {
if mutatorCom.IsNonExist(err) {
rw.WriteErrorWithCode(http.StatusNotFound, err)
return
}
rw.WriteError(err)
return
}
Expand All @@ -141,6 +149,10 @@ func (h *PlacementHandler) Replace(rw *utils.ResponseWriter, r *http.Request) {
// validate all shards are available before replace instance
plm, err := h.placementMutator.ReplaceInstance(req.Namespace, req.Body.LeavingInstances, newInstances)
if err != nil {
if mutatorCom.IsNonExist(err) {
rw.WriteErrorWithCode(http.StatusNotFound, err)
return
}
rw.WriteError(err)
return
}
Expand All @@ -164,6 +176,10 @@ func (h *PlacementHandler) Remove(rw *utils.ResponseWriter, r *http.Request) {
// validate all shards are available before replace instance
plm, err := h.placementMutator.RemoveInstance(req.Namespace, req.Body.LeavingInstances)
if err != nil {
if mutatorCom.IsNonExist(err) {
rw.WriteErrorWithCode(http.StatusNotFound, err)
return
}
rw.WriteError(err)
return
}
Expand All @@ -181,6 +197,10 @@ func (h *PlacementHandler) MarkNamespaceAvailable(rw *utils.ResponseWriter, r *h

plm, err := h.placementMutator.MarkNamespaceAvailable(req.Namespace)
if err != nil {
if mutatorCom.IsNonExist(err) {
rw.WriteErrorWithCode(http.StatusNotFound, err)
return
}
rw.WriteError(err)
return
}
Expand All @@ -203,6 +223,10 @@ func (h *PlacementHandler) MarkInstanceAvailable(rw *utils.ResponseWriter, r *ht
p, err = h.placementMutator.MarkShardsAvailable(req.Namespace, req.Instance, req.Body.Shards)
}
if err != nil {
if mutatorCom.IsNonExist(err) {
rw.WriteErrorWithCode(http.StatusNotFound, err)
return
}
rw.WriteError(err)
return
}
Expand Down
2 changes: 2 additions & 0 deletions controller/mutators/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ var (
ErrInstanceDoesNotExist = NotExist("Instance does not exist")
// ErrSubscriberDoesNotExist indicates an subscriber does not exist
ErrSubscriberDoesNotExist = NotExist("Subscriber does not exist")
// ErrPlacementDoesNotExist indicates that placement does not exist
ErrPlacementDoesNotExist = NotExist("placement does not exist")

// ErrMsgFailedToBuildInitialPlacement represents error message for initial placement failure
ErrMsgFailedToBuildInitialPlacement = "failed build initial placement"
Expand Down
22 changes: 22 additions & 0 deletions controller/mutators/etcd/placement_mutator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package etcd

import (
"fmt"
"github.com/m3db/m3/src/cluster/kv"
"github.com/m3db/m3/src/cluster/placement"
"github.com/m3db/m3/src/cluster/services"
"github.com/m3db/m3/src/x/instrument"
Expand Down Expand Up @@ -77,6 +78,9 @@ func (p *placementMutator) GetCurrentPlacement(namespace string) (placement.Plac
}
plm, err := placementSvc.Placement()
if err != nil {
if err == kv.ErrNotFound {
return nil, common.ErrPlacementDoesNotExist
}
return nil, utils.StackError(err, common.ErrMsgFailedToGetCurrentPlacement)
}
return plm, nil
Expand All @@ -91,6 +95,9 @@ func (p *placementMutator) AddInstance(namespace string, instances []placement.I
}
plm, _, err := placementSvc.AddInstances(instances)
if err != nil {
if err == kv.ErrNotFound {
return nil, common.ErrPlacementDoesNotExist
}
return nil, utils.StackError(err, common.ErrMsgFailedToAddInstance)
}
return plm, nil
Expand All @@ -105,6 +112,9 @@ func (p *placementMutator) ReplaceInstance(namespace string, leavingInstances []
}
plm, _, err := placementSvc.ReplaceInstances(leavingInstances, newInstances)
if err != nil {
if err == kv.ErrNotFound {
return nil, common.ErrPlacementDoesNotExist
}
return nil, utils.StackError(err, common.ErrMsgFailedToReplaceInstance)
}
return plm, nil
Expand All @@ -119,6 +129,9 @@ func (p *placementMutator) RemoveInstance(namespace string, leavingInstances []s
}
plm, err := placementSvc.RemoveInstances(leavingInstances)
if err != nil {
if err == kv.ErrNotFound {
return nil, common.ErrPlacementDoesNotExist
}
return nil, utils.StackError(err, common.ErrMsgFailedToRemoveInstance)
}
return plm, nil
Expand All @@ -132,6 +145,9 @@ func (p *placementMutator) MarkNamespaceAvailable(namespace string) (placement.P
}
plm, err := placementSvc.MarkAllShardsAvailable()
if err != nil {
if err == kv.ErrNotFound {
return nil, common.ErrPlacementDoesNotExist
}
return nil, utils.StackError(err, common.ErrMsgFailedToMarkAvailable)
}
return plm, nil
Expand All @@ -145,6 +161,9 @@ func (p *placementMutator) MarkInstanceAvailable(namespace string, instance stri
}
plm, err := placementSvc.MarkInstanceAvailable(instance)
if err != nil {
if err == kv.ErrNotFound {
return nil, common.ErrPlacementDoesNotExist
}
return nil, utils.StackError(err, common.ErrMsgFailedToMarkAvailable)
}
return plm, nil
Expand All @@ -158,6 +177,9 @@ func (p *placementMutator) MarkShardsAvailable(namespace string, instance string
}
plm, err := placementSvc.MarkShardsAvailable(instance, shards...)
if err != nil {
if err == kv.ErrNotFound {
return nil, common.ErrPlacementDoesNotExist
}
return nil, utils.StackError(err, common.ErrMsgFailedToMarkAvailable)
}
return plm, nil
Expand Down
23 changes: 22 additions & 1 deletion controller/mutators/etcd/placement_mutator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/m3db/m3/src/cluster/shard"
"github.com/stretchr/testify/assert"
"github.com/uber/aresdb/cluster/kvstore"
"github.com/uber/aresdb/controller/mutators/common"
"testing"
)

Expand Down Expand Up @@ -101,8 +102,28 @@ func TestPlacementMutator(t *testing.T) {
initInstances[0], _ = placement.NewInstanceFromProto(&instancepb0)
initInstances[1], _ = placement.NewInstanceFromProto(&instancepb1)

// 0. all procedure should fail with non exist placement before initialization
plmt, err := placementMutator.GetCurrentPlacement(testNamespace)
assert.EqualError(t, err, common.ErrPlacementDoesNotExist.Error())
assert.Nil(t, plmt)
plmt, err = placementMutator.MarkNamespaceAvailable(testNamespace)
assert.Nil(t, plmt)
assert.EqualError(t, err, common.ErrPlacementDoesNotExist.Error())
plmt, err = placementMutator.ReplaceInstance(testNamespace, []string{}, []placement.Instance{})
assert.Nil(t, plmt)
assert.EqualError(t, err, common.ErrPlacementDoesNotExist.Error())
plmt, err = placementMutator.MarkInstanceAvailable(testNamespace, "0")
assert.Nil(t, plmt)
assert.EqualError(t, err, common.ErrPlacementDoesNotExist.Error())
plmt, err = placementMutator.AddInstance(testNamespace, []placement.Instance{})
assert.Nil(t, plmt)
assert.EqualError(t, err, common.ErrPlacementDoesNotExist.Error())
plmt, err = placementMutator.RemoveInstance(testNamespace, []string{"0"})
assert.Nil(t, plmt)
assert.EqualError(t, err, common.ErrPlacementDoesNotExist.Error())

// 1. initialize placement
plmt, err := placementMutator.BuildInitialPlacement(testNamespace, 2, 2, initInstances)
plmt, err = placementMutator.BuildInitialPlacement(testNamespace, 2, 2, initInstances)
assert.NoError(t, err)
initPlacement, _ := plmt.Proto()

Expand Down

0 comments on commit 765a163

Please sign in to comment.