diff --git a/binlogctl/config.go b/binlogctl/config.go index 6fae7a53e..413a0a4be 100644 --- a/binlogctl/config.go +++ b/binlogctl/config.go @@ -58,6 +58,9 @@ const ( // OfflineDrainer is comamnd used for offlien drainer. OfflineDrainer = "offline-drainer" + + // Encrypt is command used for encrypt password. + Encrypt = "encrypt" ) // Config holds the configuration of drainer @@ -74,6 +77,7 @@ type Config struct { SSLKey string `toml:"ssl-key" json:"ssl-key"` State string `toml:"state" json:"state"` ShowOfflineNodes bool `toml:"state" json:"show-offline-nodes"` + Text string `toml:"text" json:"text"` tls *tls.Config printVersion bool } @@ -83,7 +87,7 @@ func NewConfig() *Config { cfg := &Config{} cfg.FlagSet = flag.NewFlagSet("binlogctl", flag.ContinueOnError) - cfg.FlagSet.StringVar(&cfg.Command, "cmd", "pumps", "operator: \"generate_meta\", \"pumps\", \"drainers\", \"update-pump\", \"update-drainer\", \"pause-pump\", \"pause-drainer\", \"offline-pump\", \"offline-drainer\"") + cfg.FlagSet.StringVar(&cfg.Command, "cmd", "pumps", "operator: \"generate_meta\", \"pumps\", \"drainers\", \"update-pump\", \"update-drainer\", \"pause-pump\", \"pause-drainer\", \"offline-pump\", \"offline-drainer\", \"encrypt\"") cfg.FlagSet.StringVar(&cfg.NodeID, "node-id", "", "id of node, use to update some node with operation update-pump, update-drainer, pause-pump, pause-drainer, offline-pump and offline-drainer") cfg.FlagSet.StringVar(&cfg.DataDir, "data-dir", defaultDataDir, "meta directory path") cfg.FlagSet.StringVar(&cfg.EtcdURLs, "pd-urls", defaultEtcdURLs, "a comma separated list of PD endpoints") @@ -93,6 +97,7 @@ func NewConfig() *Config { cfg.FlagSet.StringVar(&cfg.TimeZone, "time-zone", "", "set time zone if you want save time info in savepoint file, for example `Asia/Shanghai` for CST time, `Local` for local time") cfg.FlagSet.StringVar(&cfg.State, "state", "", "set node's state, can set to online, pausing, paused, closing or offline.") cfg.FlagSet.BoolVar(&cfg.ShowOfflineNodes, "show-offline-nodes", false, "include offline nodes when querying pumps/drainers") + cfg.FlagSet.StringVar(&cfg.Text, "text", "", "text to be encrypt when using encrypt command") cfg.FlagSet.BoolVar(&cfg.printVersion, "V", false, "prints version and exit") return cfg diff --git a/binlogctl/encrypt.go b/binlogctl/encrypt.go new file mode 100644 index 000000000..5dc41a730 --- /dev/null +++ b/binlogctl/encrypt.go @@ -0,0 +1,18 @@ +package binlogctl + +import ( + "github.com/pingcap/log" + "github.com/pingcap/tidb-binlog/pkg/encrypt" + "go.uber.org/zap" +) + +// EncryptHandler log the encrypted text if success or return error. +func EncryptHandler(text string) error { + enc, err := encrypt.Encrypt(text) + if err != nil { + return err + } + + log.Info("encrypt text", zap.String("encrypted", string(enc))) + return nil +} diff --git a/cmd/binlogctl/main.go b/cmd/binlogctl/main.go index eb9e104f4..7f56473fd 100644 --- a/cmd/binlogctl/main.go +++ b/cmd/binlogctl/main.go @@ -60,6 +60,12 @@ func main() { err = ctl.ApplyAction(cfg.EtcdURLs, node.PumpNode, cfg.NodeID, close) case ctl.OfflineDrainer: err = ctl.ApplyAction(cfg.EtcdURLs, node.DrainerNode, cfg.NodeID, close) + case ctl.Encrypt: + if len(cfg.Text) == 0 { + err = errors.New("need to specify the text to be encrypt") + } else { + err = ctl.EncryptHandler(cfg.Text) + } default: err = errors.NotSupportedf("cmd %s", cfg.Command) } diff --git a/cmd/drainer/drainer.toml b/cmd/drainer/drainer.toml index 75c3d14af..b6cb921a7 100644 --- a/cmd/drainer/drainer.toml +++ b/cmd/drainer/drainer.toml @@ -89,6 +89,8 @@ relay-read-buf-size = 8 host = "127.0.0.1" user = "root" password = "" +# if encrypted_password is not empty, password will be ignored. +encrypted_password = "" port = 3306 [syncer.to.checkpoint] @@ -101,6 +103,8 @@ port = 3306 # schema = "tidb_binlog" # host = "127.0.0.1" # user = "root" +# if encrypted_password is not empty, password will be ignored. +# encrypted_password = "" # password = "" # port = 3306 diff --git a/drainer/config.go b/drainer/config.go index 64a3aec8b..ddc0f8c3d 100644 --- a/drainer/config.go +++ b/drainer/config.go @@ -31,6 +31,7 @@ import ( "go.uber.org/zap" dsync "github.com/pingcap/tidb-binlog/drainer/sync" + "github.com/pingcap/tidb-binlog/pkg/encrypt" "github.com/pingcap/tidb-binlog/pkg/filter" "github.com/pingcap/tidb-binlog/pkg/flags" "github.com/pingcap/tidb-binlog/pkg/security" @@ -380,11 +381,28 @@ func (cfg *Config) adjustConfig() error { } cfg.SyncerCfg.To.User = user } - if len(cfg.SyncerCfg.To.Password) == 0 { + + if len(cfg.SyncerCfg.To.EncryptedPassword) > 0 { + decrypt, err := encrypt.Decrypt(cfg.SyncerCfg.To.EncryptedPassword) + if err != nil { + return errors.Annotate(err, "failed to decrypt password in `to.encrypted_password`") + } + + cfg.SyncerCfg.To.Password = decrypt + } else if len(cfg.SyncerCfg.To.Password) == 0 { cfg.SyncerCfg.To.Password = os.Getenv("MYSQL_PSWD") } } + if len(cfg.SyncerCfg.To.Checkpoint.EncryptedPassword) > 0 { + decrypt, err := encrypt.Decrypt(cfg.SyncerCfg.To.EncryptedPassword) + if err != nil { + return errors.Annotate(err, "failed to decrypt password in `checkpoint.encrypted_password`") + } + + cfg.SyncerCfg.To.Checkpoint.Password = decrypt + } + cfg.SyncerCfg.adjustWorkCount() cfg.SyncerCfg.adjustDoDBAndTable() diff --git a/drainer/config_test.go b/drainer/config_test.go index abe561059..93a4dd378 100644 --- a/drainer/config_test.go +++ b/drainer/config_test.go @@ -21,9 +21,11 @@ import ( "time" "github.com/BurntSushi/toml" + "github.com/pingcap/check" . "github.com/pingcap/check" "github.com/pingcap/parser/mysql" dsync "github.com/pingcap/tidb-binlog/drainer/sync" + "github.com/pingcap/tidb-binlog/pkg/encrypt" "github.com/pingcap/tidb-binlog/pkg/filter" "github.com/pingcap/tidb-binlog/pkg/util" pkgzk "github.com/pingcap/tidb-binlog/pkg/zk" @@ -151,6 +153,7 @@ func (t *testDrainerSuite) TestAdjustConfig(c *C) { c.Assert(cfg.ListenAddr, Equals, "http://"+util.DefaultListenAddr(8249)) c.Assert(cfg.AdvertiseAddr, Equals, cfg.ListenAddr) + // test EncryptedPassword cfg = NewConfig() cfg.ListenAddr = "0.0.0.0:8257" cfg.AdvertiseAddr = "192.168.15.12:8257" @@ -158,6 +161,33 @@ func (t *testDrainerSuite) TestAdjustConfig(c *C) { c.Assert(err, IsNil) c.Assert(cfg.ListenAddr, Equals, "http://0.0.0.0:8257") c.Assert(cfg.AdvertiseAddr, Equals, "http://192.168.15.12:8257") + + cfg = NewConfig() + encrypted, err := encrypt.Encrypt("origin") + c.Assert(err, IsNil) + + cfg.SyncerCfg.To = &dsync.DBConfig{ + EncryptedPassword: string(encrypted), + Checkpoint: dsync.CheckpointConfig{ + EncryptedPassword: string(encrypted), + }, + } + err = cfg.adjustConfig() + c.Assert(err, IsNil) + c.Assert(cfg.SyncerCfg.To.Password, check.Equals, "origin") + c.Assert(cfg.SyncerCfg.To.Checkpoint.Password, check.Equals, "origin") + + // test false positive + cfg.SyncerCfg.To = &dsync.DBConfig{ + EncryptedPassword: "what ever" + string(encrypted), + Checkpoint: dsync.CheckpointConfig{ + EncryptedPassword: "what ever" + string(encrypted), + }, + } + + c.Logf("to.password: %v", cfg.SyncerCfg.To.Password) + err = cfg.adjustConfig() + c.Assert(err, NotNil) } func (t *testDrainerSuite) TestConfigParsingFileWithInvalidOptions(c *C) { diff --git a/drainer/sync/util.go b/drainer/sync/util.go index 3fd182d2f..8738dc530 100644 --- a/drainer/sync/util.go +++ b/drainer/sync/util.go @@ -20,12 +20,14 @@ import ( // DBConfig is the DB configuration. type DBConfig struct { - Host string `toml:"host" json:"host"` - User string `toml:"user" json:"user"` - Password string `toml:"password" json:"password"` - Port int `toml:"port" json:"port"` - Checkpoint CheckpointConfig `toml:"checkpoint" json:"checkpoint"` - BinlogFileDir string `toml:"dir" json:"dir"` + Host string `toml:"host" json:"host"` + User string `toml:"user" json:"user"` + Password string `toml:"password" json:"password"` + // if EncryptedPassword is not empty, Password will be ignore. + EncryptedPassword string `toml:"encrypted_password" json:"encrypted_password"` + Port int `toml:"port" json:"port"` + Checkpoint CheckpointConfig `toml:"checkpoint" json:"checkpoint"` + BinlogFileDir string `toml:"dir" json:"dir"` ZKAddrs string `toml:"zookeeper-addrs" json:"zookeeper-addrs"` KafkaAddrs string `toml:"kafka-addrs" json:"kafka-addrs"` @@ -43,7 +45,9 @@ type CheckpointConfig struct { Host string `toml:"host" json:"host"` User string `toml:"user" json:"user"` Password string `toml:"password" json:"password"` - Port int `toml:"port" json:"port"` + // if EncryptedPassword is not empty, Password will be ignore. + EncryptedPassword string `toml:"encrypted_password" json:"encrypted_password"` + Port int `toml:"port" json:"port"` } type baseError struct { diff --git a/pkg/encrypt/encrypt.go b/pkg/encrypt/encrypt.go new file mode 100644 index 000000000..690a07942 --- /dev/null +++ b/pkg/encrypt/encrypt.go @@ -0,0 +1,159 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package encrypt + +import ( + "bytes" + "crypto/aes" + "crypto/cipher" + "crypto/rand" + "encoding/base64" + "encoding/hex" + "os" + "sync" + + "github.com/pingcap/errors" + "github.com/pingcap/log" +) + +var ( + defaultSecretKey, _ = hex.DecodeString("a529b7665997f043a30ac8fadcb51d6aa032c226ab5b7750530b12b8c1a16a48") + secretKey []byte + ivSep = []byte("@") // ciphertext format: iv + ivSep + encrypted-plaintext +) + +var initSecretKeyOnce sync.Once +var initSecretKeyErr error + +func initSecretKey() error { + hexKey := os.Getenv("BINLOG_SECRET_KEY") + if len(hexKey) == 0 { + log.Warn("use the default secret key to encrypt") + secretKey = defaultSecretKey + return nil + } + + key, err := hex.DecodeString(hexKey) + if err != nil { + return errors.Trace(err) + } + + return SetSecretKey(key) +} + +// SetSecretKey sets the secret key which used to encrypt +func SetSecretKey(key []byte) error { + switch len(key) { + case 16, 24, 32: + break + default: + return errors.Errorf("secretKey not valid: %v", key) + } + secretKey = key + return nil +} + +// Encrypt tries to encrypt plaintext to base64 encoded ciphertext +func Encrypt(plaintext string) (string, error) { + ciphertext, err := encrypt([]byte(plaintext)) + if err != nil { + return "", err + } + + return base64.StdEncoding.EncodeToString(ciphertext), nil +} + +// Decrypt tries to decrypt base64 encoded ciphertext to plaintext +func Decrypt(ciphertextB64 string) (string, error) { + ciphertext, err := base64.StdEncoding.DecodeString(ciphertextB64) + if err != nil { + return "", errors.Annotatef(err, "base 64 failed to decode: %s", ciphertext) + } + + plaintext, err := decrypt(ciphertext) + if err != nil { + return "", errors.Trace(err) + } + return string(plaintext), nil +} + +// encrypt encrypts plaintext to ciphertext +func encrypt(plaintext []byte) ([]byte, error) { + initSecretKeyOnce.Do(func() { + initSecretKeyErr = initSecretKey() + }) + if initSecretKeyErr != nil { + return nil, initSecretKeyErr + } + + block, err := aes.NewCipher(secretKey) + if err != nil { + return nil, errors.Trace(err) + } + + iv, err := genIV(block.BlockSize()) + if err != nil { + return nil, err + } + + ciphertext := make([]byte, 0, len(iv)+len(ivSep)+len(plaintext)) + ciphertext = append(ciphertext, iv...) + ciphertext = append(ciphertext, ivSep...) + ciphertext = append(ciphertext, plaintext...) // will be overwrite by XORKeyStream + + stream := cipher.NewCFBEncrypter(block, iv) + stream.XORKeyStream(ciphertext[len(iv)+len(ivSep):], plaintext) + + return ciphertext, nil +} + +// decrypt decrypts ciphertext to plaintext +func decrypt(ciphertext []byte) ([]byte, error) { + initSecretKeyOnce.Do(func() { + initSecretKeyErr = initSecretKey() + }) + if initSecretKeyErr != nil { + return nil, initSecretKeyErr + } + + block, err := aes.NewCipher(secretKey) + if err != nil { + return nil, err + } + + if len(ciphertext) < block.BlockSize()+len(ivSep) { + // return nil, terror.ErrCiphertextLenNotValid.Generate(block.BlockSize()+len(ivSep), len(ciphertext)) + return nil, errors.Errorf("ciphertext not valid") + } + + if !bytes.Equal(ciphertext[block.BlockSize():block.BlockSize()+len(ivSep)], ivSep) { + // return nil, terror.ErrCiphertextContextNotValid.Generate() + return nil, errors.Errorf("ciphertext not valid") + } + + iv := ciphertext[:block.BlockSize()] + ciphertext = ciphertext[block.BlockSize()+len(ivSep):] + plaintext := make([]byte, len(ciphertext)) + + stream := cipher.NewCFBDecrypter(block, iv) + stream.XORKeyStream(plaintext, ciphertext) + + return plaintext, nil +} + +func genIV(n int) ([]byte, error) { + b := make([]byte, n) + _, err := rand.Read(b) + return b, errors.Trace(err) +} diff --git a/pkg/encrypt/encrypt_test.go b/pkg/encrypt/encrypt_test.go new file mode 100644 index 000000000..9889a5acf --- /dev/null +++ b/pkg/encrypt/encrypt_test.go @@ -0,0 +1,92 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package encrypt + +import ( + "crypto/aes" + "crypto/rand" + "testing" + + . "github.com/pingcap/check" +) + +var _ = Suite(&testEncryptSuite{}) + +func TestSuite(t *testing.T) { + TestingT(t) +} + +type testEncryptSuite struct { +} + +func (t *testEncryptSuite) TestSetSecretKey(c *C) { + // 16 bit + b16 := make([]byte, 16) + _, err := rand.Read(b16) + c.Assert(err, IsNil) + + err = SetSecretKey(b16) + c.Assert(err, IsNil) + + // 20 bit + b20 := make([]byte, 20) + _, err = rand.Read(b20) + c.Assert(err, IsNil) + + err = SetSecretKey(b20) + c.Assert(err, NotNil) +} + +func removeChar(input []byte, c byte) []byte { + i := 0 + for _, x := range input { + if x != c { + input[i] = x + i++ + } + } + return input[:i] +} + +func (t *testEncryptSuite) TestEncrypt(c *C) { + plaintext := []byte("a plain text") + + // encrypt + ciphertext, err := encrypt(plaintext) + c.Assert(err, IsNil) + + // decrypt + plaintext2, err := decrypt(ciphertext) + c.Assert(err, IsNil) + c.Assert(plaintext2, DeepEquals, plaintext) + + // invalid length + _, err = decrypt(ciphertext[:len(ciphertext)-len(plaintext)-1]) + c.Assert(err, NotNil) + + // invalid content + _, err = decrypt(removeChar(ciphertext, ivSep[0])) + c.Assert(err, NotNil) + + // a special case, we construct a ciphertext that can be decrypted but the + // plaintext is not what we want. This is because currently encrypt mechanism + // doesn't keep enough information to decide whether the new ciphertext is valid + block, err := aes.NewCipher(secretKey) + c.Assert(err, IsNil) + blockSize := block.BlockSize() + c.Assert(len(ciphertext), Greater, blockSize+2) + plaintext3, err := decrypt(append(ciphertext[1:blockSize+1], append([]byte{ivSep[0]}, ciphertext[blockSize+2:]...)...)) + c.Assert(err, IsNil) + c.Assert(plaintext3, Not(DeepEquals), plaintext) +} diff --git a/scripts/groovy/binlog_ghpr_integration.groovy b/scripts/groovy/binlog_ghpr_integration.groovy index 3d56554c0..68b1ef949 100644 --- a/scripts/groovy/binlog_ghpr_integration.groovy +++ b/scripts/groovy/binlog_ghpr_integration.groovy @@ -24,7 +24,7 @@ if (m3) { m3 = null println "TIDB_BRANCH=${TIDB_BRANCH}" -catchError { +try { def buildSlave = "${GO_BUILD_SLAVE}" stage('Prepare') { node (buildSlave) { @@ -94,6 +94,7 @@ catchError { tests["Integration Test"] = { podTemplate(label: label, + idleMinutes: 60, containers: [ containerTemplate(name: 'golang',alwaysPullImage: false, image: "${GO_DOCKER_IMAGE}", resourceRequestCpu: '2000m', resourceRequestMemory: '4Gi', @@ -122,6 +123,8 @@ catchError { emptyDirVolume(mountPath: '/home/jenkins', memory: true) ]) { node(label) { + println "debug node:\n ssh root@172.16.5.15" + println "debug command:\nkubectl -n jenkins-ci exec -ti ${NODE_NAME} bash" container("golang") { def ws = pwd() deleteDir() @@ -163,6 +166,10 @@ catchError { } currentBuild.result = "SUCCESS" +}catch (Exception e) { + currentBuild.result = "FAILURE" + slackcolor = 'danger' + echo "${e}" } stage('Summary') {