Skip to content

Commit

Permalink
Sync from server repo (3cd882fc98a)
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt Spilchen committed Nov 28, 2023
1 parent 6f31867 commit 4d7b04d
Show file tree
Hide file tree
Showing 18 changed files with 376 additions and 59 deletions.
3 changes: 2 additions & 1 deletion vclusterops/https_poll_node_state_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ func makeHTTPSPollNodeStateOp(logger vlog.Printer, hosts []string,
}

// getPollingTimeout returns the timeout used when polling node state.
func (op *httpsPollNodeStateOp) getPollingTimeout() int {
	// a negative value indicates no timeout and should never be used for this op
	return util.Max(op.timeout, 0)
}

func (op *httpsPollNodeStateOp) setupClusterHTTPRequest(hosts []string) error {
Expand Down
3 changes: 2 additions & 1 deletion vclusterops/https_poll_subscription_state_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ func makeHTTPSPollSubscriptionStateOp(logger vlog.Printer, hosts []string,
}

// getPollingTimeout returns the timeout used when polling subscription state.
func (op *httpsPollSubscriptionStateOp) getPollingTimeout() int {
	// a negative value indicates no timeout and should never be used for this op
	return util.Max(op.timeout, 0)
}

func (op *httpsPollSubscriptionStateOp) setupClusterHTTPRequest(hosts []string) error {
Expand Down
6 changes: 3 additions & 3 deletions vclusterops/https_re_ip_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type httpsReIPOp struct {
opBase
opHTTPSBase
hostToReIP []string
reIPList map[string]reIPInfo
reIPList map[string]ReIPInfo
nodeNamesList []string
upHosts []string
}
Expand Down Expand Up @@ -77,7 +77,7 @@ func (op *httpsReIPOp) setupClusterHTTPRequest(hosts []string) error {
}

func (op *httpsReIPOp) prepare(execContext *opEngineExecContext) error {
op.reIPList = make(map[string]reIPInfo)
op.reIPList = make(map[string]ReIPInfo)
// update reIPList from input node names and execContext.networkProfiles
for i := 0; i < len(op.nodeNamesList); i++ {
nodeNameToReIP := op.nodeNamesList[i]
Expand All @@ -86,7 +86,7 @@ func (op *httpsReIPOp) prepare(execContext *opEngineExecContext) error {
if !ok {
return fmt.Errorf("[%s] unable to find network profile for address %s", op.name, targetAddress)
}
info := reIPInfo{
info := ReIPInfo{
NodeName: nodeNameToReIP,
TargetAddress: targetAddress,
TargetControlAddress: profile.Address,
Expand Down
124 changes: 124 additions & 0 deletions vclusterops/nma_check_system_tables_op.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
(c) Copyright [2023] Open Text.
Licensed under the Apache License, Version 2.0 (the "License");
You may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vclusterops

import (
"errors"
"fmt"

"github.com/vertica/vcluster/vclusterops/vlog"
)

// nmaCheckSystemTablesOp polls a single up node until the staging of system
// tables (kicked off by an earlier scrutinize op) has finished. It embeds the
// common scrutinize op fields (id, batch, host/node map, HTTP settings).
type nmaCheckSystemTablesOp struct {
	scrutinizeOpBase
}

// checkSystemTablesResponseData is one element of the staged-items response:
// the name of a staged system table.
type checkSystemTablesResponseData struct {
	Name string `json:"name"`
}

// makeNMACheckSystemTablesOp constructs the op that polls the NMA "/vs-status"
// endpoint for scrutinize run id, checking whether system-table staging on the
// initiator host has completed.
func makeNMACheckSystemTablesOp(logger vlog.Printer,
	id string,
	hostNodeNameMap map[string]string) nmaCheckSystemTablesOp {
	var op nmaCheckSystemTablesOp

	// generic op members
	op.name = "NMACheckSystemTablesOp"
	op.logger = logger.WithName(op.name)

	// scrutinize-specific members
	op.id = id
	op.hostNodeNameMap = hostNodeNameMap
	op.batch = scrutinizeBatchSystemTables
	op.urlSuffix = "/vs-status"
	op.httpMethod = GetMethod

	return op
}

// getPollingTimeout returns the polling timeout for this op; it uses the
// constant shared by the scrutinize ops rather than a per-op setting.
func (op *nmaCheckSystemTablesOp) getPollingTimeout() int {
	return ScrutinizePollingTimeout
}

// prepare selects the initiator host and sets up the HTTP request. If the
// cluster has no up hosts there is nothing to poll, so the op is skipped.
func (op *nmaCheckSystemTablesOp) prepare(execContext *opEngineExecContext) error {
	// only run this task on a single up node;
	// if there are no up nodes, skip this task
	if len(execContext.upHosts) == 0 {
		op.logger.PrintWarning("no up hosts to collect system tables from, skipping the operation")
		op.skipExecute = true
		return nil
	}

	// restrict the host list to the single initiator for interface purposes
	op.hosts = []string{getInitiator(execContext.upHosts)}
	if err := validateHostMaps(op.hosts, op.hostNodeNameMap); err != nil {
		return err
	}

	// prepare GET request with no params or body
	execContext.dispatcher.setup(op.hosts)
	return op.setupClusterHTTPRequest(op.hosts)
}

// execute dispatches the request, then polls and parses the results.
func (op *nmaCheckSystemTablesOp) execute(execContext *opEngineExecContext) error {
	err := op.runExecute(execContext)
	if err != nil {
		return err
	}

	return op.processResult(execContext)
}

// finalize has no cleanup work for this op.
func (op *nmaCheckSystemTablesOp) finalize(_ *opEngineExecContext) error {
	return nil
}

// processResult blocks until the initiator reports that system-table staging
// is done, then hands the staged table list to the shared scrutinize handler.
func (op *nmaCheckSystemTablesOp) processResult(execContext *opEngineExecContext) error {
	// wait until the initiator has finished gathering system tables
	if err := pollState(op, execContext); err != nil {
		return fmt.Errorf("failed to stage system tables, %w", err)
	}

	// parse and log responses
	op.logger.Info("system tables finished staging")
	return processStagedItemsResult(&op.scrutinizeOpBase, make([]checkSystemTablesResponseData, 0))
}

// shouldStopPolling reports whether polling can end. It ends on any error, or
// once every host (one host in practice) has returned a plain 200; a 202
// response means staging is still running and polling must continue.
func (op *nmaCheckSystemTablesOp) shouldStopPolling() (bool, error) {
	var errs error
	stillProcessing := false

	for host, result := range op.clusterHTTPRequest.ResultCollection {
		op.logResponse(host, result)
		switch {
		case result.isPassing() && !result.isSuccess():
			// passing but not success -> 202 accepted, keep waiting
			stillProcessing = true
		case !result.isPassing():
			errs = errors.Join(errs, result.err)
		}
	}

	// any errors -> finished
	// 200 -> finished (stillProcessing == false)
	// 202 -> continue (stillProcessing == true)
	stop := (errs != nil) || !stillProcessing
	return stop, errs
}
14 changes: 7 additions & 7 deletions vclusterops/nma_download_file_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,27 +57,27 @@ type downloadFileRequestData struct {

// ClusterLeaseNotExpiredError is returned when an attempt is made to use a
// communal storage before the lease for it has expired.
type ClusterLeaseNotExpiredError struct {
	Expiration string
}

// Error implements the error interface, describing when the lease expires.
func (e *ClusterLeaseNotExpiredError) Error() string {
	return fmt.Sprintf("revive database cannot continue because the communal storage location might still be in use."+
		" The cluster lease will expire at %s(UTC)."+
		" Please ensure that the other cluster has stopped and try revive_db after the cluster lease expiration",
		e.Expiration)
}

// ReviveDBNodeCountMismatchError is returned when the number of nodes in new cluster
// does not match the number of nodes in original cluster
type ReviveDBNodeCountMismatchError struct {
	ReviveDBStep  string
	FailureHost   string
	NumOfNewNodes int
	NumOfOldNodes int
}

// Error implements the error interface, reporting the mismatched node counts.
func (e *ReviveDBNodeCountMismatchError) Error() string {
	return fmt.Sprintf(`[%s] nodes mismatch found on host %s: the number of the new nodes in --hosts is %d,`+
		` but the number of the old nodes in description file is %d`,
		e.ReviveDBStep, e.FailureHost, e.NumOfNewNodes, e.NumOfOldNodes)
}
Expand Down Expand Up @@ -221,7 +221,7 @@ func (op *nmaDownloadFileOp) processResult(execContext *opEngineExecContext) err
}

if len(descFileContent.NodeList) != len(op.newNodes) {
err := &reviveDBNodeCountMismatchError{
err := &ReviveDBNodeCountMismatchError{
ReviveDBStep: op.name,
FailureHost: host,
NumOfNewNodes: len(op.newNodes),
Expand Down Expand Up @@ -302,7 +302,7 @@ func (op *nmaDownloadFileOp) clusterLeaseCheck(clusterLeaseExpiration string) er

// current time < expire time, it means that the cluster lease is not expired
if utcNow.Before(utcExpiration) {
return &clusterLeaseNotExpiredError{Expiration: clusterLeaseExpiration}
return &ClusterLeaseNotExpiredError{Expiration: clusterLeaseExpiration}
}

op.logger.PrintInfo("Cluster lease check has passed. We proceed to revive the database")
Expand Down
2 changes: 1 addition & 1 deletion vclusterops/nma_download_file_op_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestClusteLeaseExpiryError(t *testing.T) {
err := op.clusterLeaseCheck(fakeLeaseTime.Format(expirationStringLayout))
assert.Error(t, err)
// Ensure we get a specific error type back
clusterLeaseErr := &clusterLeaseNotExpiredError{}
clusterLeaseErr := &ClusterLeaseNotExpiredError{}
ok := errors.As(err, &clusterLeaseErr)
assert.True(t, ok)
assert.Contains(t, err.Error(), "The cluster lease will expire at")
Expand Down
25 changes: 25 additions & 0 deletions vclusterops/nma_get_scrutinize_tar_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

// nmaGetScrutinizeTarOp retrieves staged scrutinize tarballs from hosts.
type nmaGetScrutinizeTarOp struct {
	scrutinizeOpBase
	// when true, retrieve the tarball only from the first up host
	// (set via useSingleHost)
	useInitiator bool
}

func makeNMAGetScrutinizeTarOp(logger vlog.Printer,
Expand Down Expand Up @@ -54,6 +55,12 @@ func makeNMAGetScrutinizeTarOp(logger vlog.Printer,
return op, err
}

// useSingleHost indicates that the tarball should only be retrieved from the first
// up node. Used for batches staged on a single initiator host (e.g. system
// tables) rather than on every host.
func (op *nmaGetScrutinizeTarOp) useSingleHost() {
	op.useInitiator = true
}

// createOutputDir creates a subdirectory {id} under /tmp/scrutinize/remote, which
// may also be created by this function. the "remote" subdirectory is created to
// separate local scrutinize data staged by the NMA (placed in /tmp/scrutinize/) from
Expand All @@ -75,6 +82,24 @@ func (op *nmaGetScrutinizeTarOp) createOutputDir() error {
}

func (op *nmaGetScrutinizeTarOp) prepare(execContext *opEngineExecContext) error {
// for the system table batch
if op.useInitiator {
if len(execContext.upHosts) == 0 {
op.logger.PrintWarning("no up hosts to collect system tables from, skipping the operation")
op.skipExecute = true
return nil
}
host := getInitiator(execContext.upHosts)
op.hosts = []string{host}

// the initiator host should have been in the original host list, and already
// validated, but let's not assume
err := validateHostMaps(op.hosts, op.hostNodeNameMap)
if err != nil {
return err
}
}

hostToFilePathsMap := map[string]string{}
for _, host := range op.hosts {
hostToFilePathsMap[host] = fmt.Sprintf("%s/%s/%s-%s.tgz",
Expand Down
12 changes: 6 additions & 6 deletions vclusterops/nma_re_ip_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

type nmaReIPOp struct {
opBase
reIPList []reIPInfo
reIPList []ReIPInfo
vdb *VCoordinationDatabase
primaryNodeCount uint
hostRequestBodyMap map[string]string
Expand All @@ -35,7 +35,7 @@ type nmaReIPOp struct {
}

func makeNMAReIPOp(logger vlog.Printer,
reIPList []reIPInfo,
reIPList []ReIPInfo,
vdb *VCoordinationDatabase,
trimReIPData bool) nmaReIPOp {
op := nmaReIPOp{}
Expand All @@ -47,7 +47,7 @@ func makeNMAReIPOp(logger vlog.Printer,
return op
}

type reIPInfo struct {
type ReIPInfo struct {
NodeName string `json:"node_name"`
NodeAddress string `json:"-"`
TargetAddress string `json:"address"`
Expand All @@ -57,7 +57,7 @@ type reIPInfo struct {

type reIPParams struct {
CatalogPath string `json:"catalog_path"`
ReIPInfoList []reIPInfo `json:"re_ip_list"`
ReIPInfoList []ReIPInfo `json:"re_ip_list"`
}

func (op *nmaReIPOp) updateRequestBody(_ *opEngineExecContext) error {
Expand Down Expand Up @@ -135,7 +135,7 @@ func (op *nmaReIPOp) trimReIPList(execContext *opEngineExecContext) error {
nodeNamesWithLatestCatalog[vnode.Name] = struct{}{}
}

var trimmedReIPList []reIPInfo
var trimmedReIPList []ReIPInfo
nodesToTrim := make(map[string]string)
for _, reIPInfo := range op.reIPList {
if _, exist := nodeNamesWithLatestCatalog[reIPInfo.NodeName]; exist {
Expand Down Expand Up @@ -271,7 +271,7 @@ func (op *nmaReIPOp) processResult(_ *opEngineExecContext) error {
op.logResponse(host, result)

if result.isPassing() {
var reIPResult []reIPInfo
var reIPResult []ReIPInfo
err := op.parseAndCheckResponse(host, result.content, &reIPResult)
if err != nil {
err = fmt.Errorf("[%s] fail to parse result on host %s, details: %w",
Expand Down
2 changes: 1 addition & 1 deletion vclusterops/nma_stage_dc_tables_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,5 +99,5 @@ func (op *nmaStageDCTablesOp) finalize(_ *opEngineExecContext) error {

// processResult decodes the staged DC table list from the responses and
// delegates to the shared scrutinize staged-items handler.
func (op *nmaStageDCTablesOp) processResult(_ *opEngineExecContext) error {
	fileList := make([]stageDCTablesResponseData, 0)
	return processStagedItemsResult(&op.scrutinizeOpBase, fileList)
}
2 changes: 1 addition & 1 deletion vclusterops/nma_stage_error_report_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,5 +101,5 @@ func (op *nmaStageErrorReportOp) finalize(_ *opEngineExecContext) error {

// processResult decodes the staged error-report file list from the responses
// and delegates to the shared scrutinize staged-items handler.
func (op *nmaStageErrorReportOp) processResult(_ *opEngineExecContext) error {
	fileList := make([]stageErrorReportResponseData, 0)
	return processStagedItemsResult(&op.scrutinizeOpBase, fileList)
}
Loading

0 comments on commit 4d7b04d

Please sign in to comment.