Skip to content

Commit 0fa7a8e

Browse files
authored
Kafka attack: support more auth mechanisms (#233)
* kafka attack: support more auth mechanism Signed-off-by: cwen0 <[email protected]> * fix typo Signed-off-by: cwen0 <[email protected]> * format header Signed-off-by: cwen0 <[email protected]> * format header Signed-off-by: cwen0 <[email protected]> --------- Signed-off-by: cwen0 <[email protected]>
1 parent 3a6efb7 commit 0fa7a8e

File tree

4 files changed

+61
-11
lines changed

4 files changed

+61
-11
lines changed

Diff for: cmd/attack/kafka.go

+3
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ func NewKafkaFillCommand(dep fx.Option, options *core.KafkaCommand) *cobra.Comma
6767
cmd.Flags().Uint16VarP(&options.Port, "port", "P", 9092, "the port of kafka server")
6868
cmd.Flags().StringVarP(&options.Username, "username", "u", "", "the username of kafka client")
6969
cmd.Flags().StringVarP(&options.Password, "password", "p", "", "the password of kafka client")
70+
cmd.Flags().StringVarP(&options.AuthMechanism, "auth-mechanism", "a", "sasl/plain", "the authentication mechanism of kafka, supported value: sasl/plain, sasl/scram-sha-256, sasl/scram-sha-512")
7071
cmd.Flags().UintVarP(&options.MessageSize, "size", "s", 4*1024, "the size of each message")
7172
cmd.Flags().Uint64VarP(&options.MaxBytes, "max-bytes", "m", 1<<34, "the max bytes to fill")
7273
cmd.Flags().StringVarP(&options.ReloadCommand, "reload-cmd", "r", "", "the command to reload kafka config")
@@ -87,6 +88,7 @@ func NewKafkaFloodCommand(dep fx.Option, options *core.KafkaCommand) *cobra.Comm
8788
cmd.Flags().Uint16VarP(&options.Port, "port", "P", 9092, "the port of kafka server")
8889
cmd.Flags().StringVarP(&options.Username, "username", "u", "", "the username of kafka client")
8990
cmd.Flags().StringVarP(&options.Password, "password", "p", "", "the password of kafka client")
91+
cmd.Flags().StringVarP(&options.AuthMechanism, "auth-mechanism", "a", "sasl/plain", "the authentication mechanism of kafka, supported value: sasl/plain, sasl/scram-sha-256, sasl/scram-sha-512")
9092
cmd.Flags().UintVarP(&options.MessageSize, "size", "s", 1024, "the size of each message")
9193
cmd.Flags().UintVarP(&options.Threads, "threads", "t", 100, "the numbers of worker threads")
9294
return cmd
@@ -107,6 +109,7 @@ func NewKafkaIOCommand(dep fx.Option, options *core.KafkaCommand) *cobra.Command
107109
cmd.Flags().Uint16VarP(&options.Port, "port", "P", 9092, "the port of kafka server")
108110
cmd.Flags().StringVarP(&options.Username, "username", "u", "", "the username of kafka client")
109111
cmd.Flags().StringVarP(&options.Password, "password", "p", "", "the password of kafka client")
112+
cmd.Flags().StringVarP(&options.AuthMechanism, "auth-mechanism", "a", "sasl/plain", "the authentication mechanism of kafka, supported value: sasl/plain, sasl/scram-sha-256, sasl/scram-sha-512")
110113
cmd.Flags().StringVarP(&options.ConfigFile, "config", "c", "/etc/kafka/server.properties", "the path of server config")
111114
cmd.Flags().BoolVarP(&options.NonReadable, "non-readable", "r", false, "make kafka cluster non-readable")
112115
cmd.Flags().BoolVarP(&options.NonWritable, "non-writable", "w", false, "make kafka cluster non-writable")

Diff for: pkg/core/jvm.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,8 @@ type JVMStressSpec struct {
139139

140140
// JVMMySQLSpec is the specification of MySQL fault injection in JVM
141141
// only when SQL match the Database, Table and SQLType, chaosd will inject fault
142-
// for examle:
142+
// for example:
143+
//
143144
// SQL is "select * from test.t1",
144145
// only when ((Database == "test" || Database == "") && (Table == "t1" || Table == "") && (SQLType == "select" || SQLType == "")) is true, chaosd will inject fault
145146
type JVMMySQLSpec struct {

Diff for: pkg/core/kafka.go

+42-9
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,18 @@ type KafkaAttackAction string
2424

2525
const (
2626
// Kafka actions
27-
KafkaFillAction = "fill"
28-
KafkaFloodAction = "flood"
29-
KafkaIOAction = "io"
27+
KafkaFillAction KafkaAttackAction = "fill"
28+
KafkaFloodAction = "flood"
29+
KafkaIOAction = "io"
30+
)
31+
32+
type KafkaAuthMechanism string
33+
34+
const (
35+
SaslPlain KafkaAuthMechanism = "sasl/plain"
36+
SaslScream256 = "sasl/scram-sha-256"
37+
SaslScram512 = "sasl/scram-sha-512"
38+
AuthMechanismEmpty = ""
3039
)
3140

3241
var _ AttackConfig = &KafkaCommand{}
@@ -39,12 +48,13 @@ type KafkaCommand struct {
3948
Topic string `json:"topic,omitempty"`
4049

4150
// options for fill and flood attack
42-
Host string `json:"host,omitempty"`
43-
Port uint16 `json:"port,omitempty"`
44-
Username string `json:"username,omitempty"`
45-
Password string `json:"password,omitempty"`
46-
MessageSize uint `json:"messageSize,omitempty"`
47-
MaxBytes uint64 `json:"maxBytes,omitempty"`
51+
Host string `json:"host,omitempty"`
52+
Port uint16 `json:"port,omitempty"`
53+
Username string `json:"username,omitempty"`
54+
Password string `json:"password,omitempty"`
55+
AuthMechanism string `json:"authMechanism,omitempty"`
56+
MessageSize uint `json:"messageSize,omitempty"`
57+
MaxBytes uint64 `json:"maxBytes,omitempty"`
4858

4959
// options for fill attack
5060
ReloadCommand string `json:"reloadCommand,omitempty"`
@@ -69,6 +79,10 @@ func (c *KafkaCommand) Validate() error {
6979
return errors.New("topic is required")
7080
}
7181

82+
if err := c.validateAuthMechanism(); err != nil {
83+
return err
84+
}
85+
7286
switch c.Action {
7387
case KafkaFillAction:
7488
return c.validateFillAction()
@@ -81,6 +95,25 @@ func (c *KafkaCommand) Validate() error {
8195
}
8296
}
8397

98+
func (c *KafkaCommand) validateAuthMechanism() error {
99+
if c.Username != "" && c.AuthMechanism == "" {
100+
return errors.New("auth mechanism is required")
101+
}
102+
103+
switch KafkaAuthMechanism(c.AuthMechanism) {
104+
case SaslPlain:
105+
fallthrough
106+
case SaslScram512:
107+
fallthrough
108+
case SaslScream256:
109+
fallthrough
110+
case AuthMechanismEmpty:
111+
return nil
112+
default:
113+
return errors.Errorf("invalid auth mechanism: %s", c.AuthMechanism)
114+
}
115+
}
116+
84117
func (c *KafkaCommand) validateDSNAndMessageSize() error {
85118
if c.Host == "" {
86119
return errors.New("host is required")

Diff for: pkg/server/chaosd/kafka.go

+14-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/pingcap/log"
3333
perr "github.com/pkg/errors"
3434
client "github.com/segmentio/kafka-go"
35+
"github.com/segmentio/kafka-go/sasl/plain"
3536
"github.com/segmentio/kafka-go/sasl/scram"
3637
"go.uber.org/zap"
3738

@@ -77,7 +78,19 @@ func newDialer(attack *core.KafkaCommand) (dialer *client.Dialer, err error) {
7778
DualStack: true,
7879
}
7980
if attack.Username != "" {
80-
dialer.SASLMechanism, err = scram.Mechanism(scram.SHA512, attack.Username, attack.Password)
81+
switch core.KafkaAuthMechanism(attack.AuthMechanism) {
82+
case core.SaslPlain:
83+
dialer.SASLMechanism = plain.Mechanism{
84+
Username: attack.Username,
85+
Password: attack.Password,
86+
}
87+
case core.SaslScream256:
88+
dialer.SASLMechanism, err = scram.Mechanism(scram.SHA256, attack.Username, attack.Password)
89+
case core.SaslScram512:
90+
dialer.SASLMechanism, err = scram.Mechanism(scram.SHA512, attack.Username, attack.Password)
91+
default:
92+
return nil, errors.Errorf("invalid auth mechanism: %s", attack.AuthMechanism)
93+
}
8194
if err != nil {
8295
return nil, perr.Wrap(err, "create scram mechanism")
8396
}

0 commit comments

Comments
 (0)