Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Cleanup DLQ on errors when creating consumer-group (#318)
Browse files Browse the repository at this point in the history
* Cleanup DLQ destination when create cg fails

* t

* revert whitespace additions
  • Loading branch information
Kiran RG authored Nov 3, 2017
1 parent 81b1e01 commit 774d3e8
Showing 1 changed file with 24 additions and 0 deletions.
24 changes: 24 additions & 0 deletions clients/metadata/metadata_cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -1341,6 +1341,18 @@ func (s *CassandraMetadataService) CreateConsumerGroupUUID(ctx thrift.Context, r
createRequest.GetOptions()).Consistency(s.highConsLevel).Exec()

if err != nil {

if dlqUUID != nil {

if e := s.DeleteDestinationUUID(nil, &m.DeleteDestinationUUIDRequest{UUID: dlqUUID}); e != nil {
s.log.WithFields(bark.Fields{
common.TagDst: *dlqUUID,
common.TagCnsm: cgUUID,
common.TagErr: err,
}).Error(`CreateConsumerGroup - failed to cleanup DLQ destination`)
}
}

return nil, &shared.InternalServiceError{
Message: fmt.Sprintf("CreateConsumerGroup - insert into consumer_groups table failed, dst=%v, cg=%v, err=%v",
createRequest.GetDestinationPath(), createRequest.GetConsumerGroupName(), err),
Expand Down Expand Up @@ -1374,6 +1386,18 @@ func (s *CassandraMetadataService) CreateConsumerGroupUUID(ctx thrift.Context, r
if err = s.session.Query(sqlDeleteCGByUUID, cgUUID).Exec(); err != nil {
s.log.WithFields(bark.Fields{common.TagCnsm: common.FmtCnsm(cgUUID), common.TagErr: err}).Warn(`CreateConsumerGroup - failed to delete orphan record after a failed CAS attempt, ,`)
}

if dlqUUID != nil {

if e := s.DeleteDestinationUUID(nil, &m.DeleteDestinationUUIDRequest{UUID: dlqUUID}); e != nil {
s.log.WithFields(bark.Fields{
common.TagDst: *dlqUUID,
common.TagCnsm: cgUUID,
common.TagErr: err,
}).Error(`CreateConsumerGroup - failed to cleanup DLQ destination`)
}
}

return nil, &shared.EntityAlreadyExistsError{
Message: fmt.Sprintf("CreateConsumerGroup - Group exists, dst=%v cg=%v err=%v", createRequest.GetDestinationPath(), createRequest.GetConsumerGroupName(), err),
}
Expand Down

0 comments on commit 774d3e8

Please sign in to comment.