Skip to content

Commit

Permalink
add local and remote mode for patroni attack
Browse files Browse the repository at this point in the history
Signed-off-by: Nikita Savchenko <[email protected]>
  • Loading branch information
Nikita Savchenko committed Feb 8, 2023
1 parent 537f8aa commit db9df01
Show file tree
Hide file tree
Showing 11 changed files with 298 additions and 124 deletions.
1 change: 1 addition & 0 deletions cmd/attack/attack.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func NewAttackCommand() *cobra.Command {
NewHTTPAttackCommand(&uid),
NewVMAttackCommand(&uid),
NewUserDefinedCommand(&uid),
NewPatroniAttackCommand(&uid),
)

return cmd
Expand Down
12 changes: 7 additions & 5 deletions cmd/attack/patroni.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,37 +48,39 @@ func NewPatroniAttackCommand(uid *string) *cobra.Command {

cmd.PersistentFlags().StringVarP(&options.User, "user", "u", "patroni", "patroni cluster user")
cmd.PersistentFlags().StringVar(&options.Password, "password", "p", "patroni cluster password")
cmd.PersistentFlags().StringVarP(&options.Address, "address", "a", "", "patroni cluster address, any of available hosts")
cmd.PersistentFlags().BoolVarP(&options.LocalMode, "local-mode", "l", false, "execute patronictl on host with chaosd. User with privileges required.")
cmd.PersistentFlags().BoolVarP(&options.RemoteMode, "remote-mode", "r", false, "execute patroni command by REST API")

return cmd
}

func NewPatroniSwitchoverCommand(dep fx.Option, options *core.PatroniCommand) *cobra.Command {
cmd := &cobra.Command{
Use: "switchover",
Short: "exec switchover, default without another attack. Warning! Command is not recover!",
Short: "exec switchover command. Warning! Command is not recover!",
Run: func(*cobra.Command, []string) {
options.Action = core.SwitchoverAction
utils.FxNewAppWithoutLog(dep, fx.Invoke(PatroniAttackF)).Run()
},
}
cmd.Flags().StringVarP(&options.Address, "address", "a", "", "patroni cluster address, any of available hosts")
cmd.Flags().StringVarP(&options.Candidate, "candidate", "c", "", "switchover candidate, default random unit for replicas")
cmd.Flags().StringVarP(&options.Scheduled_at, "scheduled_at", "d", fmt.Sprintln(time.Now().Add(time.Second*60).Format(time.RFC3339)), "scheduled switchover, default now()+1 minute")
cmd.Flags().StringVarP(&options.Scheduled_at, "scheduled_at", "d", fmt.Sprint(time.Now().Add(time.Second*60).Format(time.RFC3339)), `scheduled switchover,
default now()+1 minute by remote mode`)

return cmd
}

func NewPatroniFailoverCommand(dep fx.Option, options *core.PatroniCommand) *cobra.Command {
cmd := &cobra.Command{
Use: "failover",
Short: "exec failover, default without another attack",
Short: "Exec failover command. Warning! Command is not recover!",
Run: func(*cobra.Command, []string) {
options.Action = core.FailoverAction
utils.FxNewAppWithoutLog(dep, fx.Invoke(PatroniAttackF)).Run()
},
}

cmd.Flags().StringVarP(&options.Address, "address", "a", "", "patroni cluster address, any of available hosts")
cmd.Flags().StringVarP(&options.Candidate, "leader", "c", "", "failover new leader, default random unit for replicas")
return cmd
}
Expand Down
1 change: 1 addition & 0 deletions pkg/core/disk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package core

import (
Expand Down
4 changes: 2 additions & 2 deletions pkg/core/jvm.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ type JVMStressSpec struct {
// only when SQL match the Database, Table and SQLType, chaosd will inject fault
// for example:
//
// SQL is "select * from test.t1",
// only when ((Database == "test" || Database == "") && (Table == "t1" || Table == "") && (SQLType == "select" || SQLType == "")) is true, chaosd will inject fault
// SQL is "select * from test.t1",
// only when ((Database == "test" || Database == "") && (Table == "t1" || Table == "") && (SQLType == "select" || SQLType == "")) is true, chaosd will inject fault
type JVMMySQLSpec struct {
// the version of mysql-connector-java, only support 5.X.X(set to 5) and 8.X.X(set to 8) now
MySQLConnectorVersion string `json:"mysql-connector-version,omitempty"`
Expand Down
24 changes: 17 additions & 7 deletions pkg/core/patroni.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,33 @@ type PatroniCommand struct {
User string `json:"user,omitempty"`
Password string `json:"password,omitempty"`
Scheduled_at string `json:"scheduled_at,omitempty"`
LocalMode bool `json:"local_mode,omitempty"`
RemoteMode bool `json:"remote_mode,omitempty"`
RecoverCmd string `json:"recoverCmd,omitempty"`
}

func (p *PatroniCommand) Validate() error {
if err := p.CommonAttackConfig.Validate(); err != nil {
return err
}
if len(p.Address) == 0 {
return errors.New("address not provided")
}

if len(p.User) == 0 {
return errors.New("patroni user not provided")
if !p.RemoteMode && !p.LocalMode {
return errors.New("local or remote mode required")
}

if len(p.Password) == 0 {
return errors.New("patroni password not provided")
if p.RemoteMode {

if len(p.Address) == 0 {
return errors.New("address not provided")
}

if len(p.User) == 0 {
return errors.New("patroni user not provided")
}

if len(p.Password) == 0 {
return errors.New("patroni password not provided")
}
}

return nil
Expand Down
1 change: 1 addition & 0 deletions pkg/server/chaosd/disk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package chaosd

import (
Expand Down
150 changes: 106 additions & 44 deletions pkg/server/chaosd/patroni.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,18 @@
package chaosd

import (
"bytes"
"encoding/json"
"fmt"
"io"
"math/rand"
"net/http"
"os/exec"
"strings"

"github.com/chaos-mesh/chaosd/pkg/core"
"github.com/chaos-mesh/chaosd/pkg/server/utils"
"github.com/pingcap/errors"
"github.com/pingcap/log"

"github.com/chaos-mesh/chaosd/pkg/core"
"github.com/chaos-mesh/chaosd/pkg/server/utils"
)

type patroniAttack struct{}
Expand All @@ -34,17 +35,24 @@ var PatroniAttack AttackType = patroniAttack{}
func (patroniAttack) Attack(options core.AttackConfig, _ Environment) error {
attack := options.(*core.PatroniCommand)

candidate := attack.Candidate

leader := attack.Leader
var responce []byte

var scheduled_at string
var address string

var url string
var err error

values := make(map[string]string)

patroniInfo, err := utils.GetPatroniInfo(attack.Address)
if attack.RemoteMode {
address = attack.Address
} else if attack.LocalMode {
address, err = utils.GetLocalHostname()
if err != nil {
return errors.WithStack(err)
}
}

patroniInfo, err := utils.GetPatroniInfo(address)
if err != nil {
err = errors.Errorf("failed to get patroni info for %v: %v", options.String(), err)
return errors.WithStack(err)
Expand All @@ -55,75 +63,129 @@ func (patroniAttack) Attack(options core.AttackConfig, _ Environment) error {
return errors.WithStack(err)
}

if candidate == "" {
candidate = patroniInfo.Replicas[rand.Intn(len(patroniInfo.Replicas))]
sync_mode_check, err := isSynchronousClusterMode(address, attack.User, attack.Password)
if err != nil {
err = errors.Errorf("failed to check cluster synchronous mode for %v: %v", options.String(), err)
return errors.WithStack(err)
}

if attack.Candidate == "" {
if sync_mode_check {
values["candidate"] = patroniInfo.SyncStandby[rand.Intn(len(patroniInfo.SyncStandby))]
} else {
values["candidate"] = patroniInfo.Replicas[rand.Intn(len(patroniInfo.Replicas))]
}

}

if leader == "" {
leader = patroniInfo.Master
if attack.Leader == "" {
values["leader"] = patroniInfo.Master
}

switch options.String() {
case "switchover":
values["scheduled_at"] = attack.Scheduled_at

scheduled_at = attack.Scheduled_at
cmd := options.String()

values = map[string]string{"leader": leader, "scheduled_at": scheduled_at}
switch cmd {
case "switchover":

log.Info(fmt.Sprintf("Switchover will be done from %v to another available replica in %v", patroniInfo.Master, scheduled_at))
log.Info(fmt.Sprintf("Switchover will be done from %v to %v in %v", values["leader"], values["candidate"], values["scheduled_at"]))

case "failover":

values = map[string]string{"candidate": candidate}
log.Info(fmt.Sprintf("Failover will be done from %v to %v", values["leader"], values["candidate"]))

log.Info(fmt.Sprintf("Failover will be done from %v to %v", patroniInfo.Master, candidate))
}

if attack.RemoteMode {
responce, err = execPatroniAttackByRemoteMode(address, attack.User, attack.Password, cmd, values)
if err != nil {
return err
}
} else if attack.LocalMode {
responce, err = execPatroniAttackByLocalMode(cmd, values)
if err != nil {
return err
}
}

patroniAddr := attack.Address
if attack.RemoteMode {
log.S().Infof("Execute %v successfully: %v", cmd, string(responce))
}

cmd := options.String()
if attack.LocalMode {
log.S().Infof("Execute %v successfully", cmd)
fmt.Println(string(responce))
}

return nil
}

func execPatroniAttackByRemoteMode(patroniAddr string, user string, password string, cmd string, values map[string]string) ([]byte, error) {

data, err := json.Marshal(values)
if err != nil {
err = errors.Errorf("failed to marshal data: %v", values)
return errors.WithStack(err)
return nil, errors.WithStack(err)
}

url = fmt.Sprintf("http://%v:8008/%v", patroniAddr, cmd)
buf, err := utils.MakeHTTPRequest(http.MethodPost, patroniAddr, 8008, cmd, data, user, password)
if err != nil {
return nil, errors.WithStack(err)
}

request, err := http.NewRequest("POST", url, bytes.NewBuffer(data))
return buf, nil
}

func execPatroniAttackByLocalMode(cmd string, values map[string]string) ([]byte, error) {
var cmdTemplate string

if cmd == "failover" {
cmdTemplate = fmt.Sprintf("patronictl %v --master %v --candidate %v --force", cmd, values["leader"], values["candidate"])
} else if cmd == "switchover" {
cmdTemplate = fmt.Sprintf("patronictl %v --master %v --candidate %v --scheduled %v --force", cmd, values["leader"], values["candidate"], values["scheduled_at"])
}

execCmd := exec.Command("bash", "-c", cmdTemplate)
output, err := execCmd.CombinedOutput()
if err != nil {
err = errors.Errorf("failed to %v: %v", cmd, err)
return errors.WithStack(err)
log.S().Errorf(fmt.Sprintf("failed to %v: %v", cmdTemplate, string(output)))
return nil, err
}

request.Header.Set("Content-Type", "application/json")
request.SetBasicAuth(attack.User, attack.Password)
if strings.Contains(string(output), "failed") {
err = errors.New(string(output))
return nil, err
}

client := &http.Client{}
resp, error := client.Do(request)
if error != nil {
err = errors.Errorf("failed to %v: %v", cmd, err)
return errors.WithStack(err)
return output, nil
}

func isSynchronousClusterMode(patroniAddr string, user string, password string) (bool, error) {

buf, err := utils.MakeHTTPRequest(http.MethodGet, patroniAddr, 8008, "config", []byte{}, user, password)
if err != nil {
return false, err
}

defer resp.Body.Close()
patroni_responce := make(map[string]interface{})

buf, err := io.ReadAll(resp.Body)
err = json.Unmarshal(buf, &patroni_responce)
if err != nil {
err = errors.Errorf("failed to read %v responce: %v", cmd, err)
return errors.WithStack(err)
return false, fmt.Errorf("bad request %v %v", err.Error(), http.StatusBadRequest)
}

if resp.StatusCode != 200 && resp.StatusCode != 202 {
err = errors.Errorf("failed to %v: status code %v, responce %v", cmd, resp.StatusCode, string(buf))
return errors.WithStack(err)
mode_check, ok := patroni_responce["synchronous_mode"].(bool)
if !ok {
return false, fmt.Errorf("failed to cast synchronous_mode field from patroni responce")
}

log.S().Infof("Execute %v successfully: %v", cmd, string(buf))
if mode_check {
return true, nil
}

return false, nil

return nil
}

func (patroniAttack) Recover(exp core.Experiment, _ Environment) error {
Expand Down
Loading

0 comments on commit db9df01

Please sign in to comment.