diff --git a/clients/metadata/metadata_cassandra.go b/clients/metadata/metadata_cassandra.go index db2c38b2..00605622 100644 --- a/clients/metadata/metadata_cassandra.go +++ b/clients/metadata/metadata_cassandra.go @@ -1341,6 +1341,18 @@ func (s *CassandraMetadataService) CreateConsumerGroupUUID(ctx thrift.Context, r createRequest.GetOptions()).Consistency(s.highConsLevel).Exec() if err != nil { + + if dlqUUID != nil { + + if e := s.DeleteDestinationUUID(nil, &m.DeleteDestinationUUIDRequest{UUID: dlqUUID}); e != nil { + s.log.WithFields(bark.Fields{ + common.TagDst: *dlqUUID, + common.TagCnsm: cgUUID, + common.TagErr: err, + }).Error(`CreateConsumerGroup - failed to cleanup DLQ destination`) + } + } + return nil, &shared.InternalServiceError{ Message: fmt.Sprintf("CreateConsumerGroup - insert into consumer_groups table failed, dst=%v, cg=%v, err=%v", createRequest.GetDestinationPath(), createRequest.GetConsumerGroupName(), err), @@ -1374,6 +1386,18 @@ func (s *CassandraMetadataService) CreateConsumerGroupUUID(ctx thrift.Context, r if err = s.session.Query(sqlDeleteCGByUUID, cgUUID).Exec(); err != nil { s.log.WithFields(bark.Fields{common.TagCnsm: common.FmtCnsm(cgUUID), common.TagErr: err}).Warn(`CreateConsumerGroup - failed to delete orphan record after a failed CAS attempt, ,`) } + + if dlqUUID != nil { + + if e := s.DeleteDestinationUUID(nil, &m.DeleteDestinationUUIDRequest{UUID: dlqUUID}); e != nil { + s.log.WithFields(bark.Fields{ + common.TagDst: *dlqUUID, + common.TagCnsm: cgUUID, + common.TagErr: err, + }).Error(`CreateConsumerGroup - failed to cleanup DLQ destination`) + } + } + return nil, &shared.EntityAlreadyExistsError{ Message: fmt.Sprintf("CreateConsumerGroup - Group exists, dst=%v cg=%v err=%v", createRequest.GetDestinationPath(), createRequest.GetConsumerGroupName(), err), }