Skip to content

Commit

Permalink
Add config advertise-addr for drainer (pingcap#634)
Browse files Browse the repository at this point in the history
  • Loading branch information
suzaku authored and IANTHEREAL committed Jun 18, 2019
1 parent b892cf5 commit 968b6ac
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 19 deletions.
5 changes: 4 additions & 1 deletion cmd/drainer/drainer.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

# addr (i.e. 'host:port') to listen on for drainer connections
# will register this addr into etcd
# addr = "127.0.0.1:8249"
addr = "127.0.0.1:8249"

# addr(i.e. 'host:port') to advertise to the public
advertise-addr = ""

# the interval time (in seconds) of detect pumps' status
detect-interval = 10
Expand Down
39 changes: 26 additions & 13 deletions drainer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type Config struct {
*flag.FlagSet `json:"-"`
LogLevel string `toml:"log-level" json:"log-level"`
ListenAddr string `toml:"addr" json:"addr"`
AdvertiseAddr string `toml:"advertise-addr" json:"advertise-addr"`
DataDir string `toml:"data-dir" json:"data-dir"`
DetectInterval int `toml:"detect-interval" json:"detect-interval"`
EtcdURLs string `toml:"pd-urls" json:"pd-urls"`
Expand Down Expand Up @@ -109,6 +110,7 @@ func NewConfig() *Config {
fs.PrintDefaults()
}
fs.StringVar(&cfg.ListenAddr, "addr", util.DefaultListenAddr(8249), "addr (i.e. 'host:port') to listen on for drainer connections")
fs.StringVar(&cfg.AdvertiseAddr, "advertise-addr", "", "addr(i.e. 'host:port') to advertise to the public, default to be the same value as -addr")
fs.StringVar(&cfg.DataDir, "data-dir", defaultDataDir, "drainer data directory path (default data.drainer)")
fs.IntVar(&cfg.DetectInterval, "detect-interval", defaultDetectInterval, "the interval time (in seconds) of detect pumps' status")
fs.StringVar(&cfg.EtcdURLs, "pd-urls", defaultEtcdURLs, "a comma separated list of PD endpoints")
Expand Down Expand Up @@ -224,19 +226,11 @@ func (cfg *Config) configFromFile(path string) error {

// validate checks whether the configuration is valid
func (cfg *Config) validate() error {
// check ListenAddr
urllis, err := url.Parse(cfg.ListenAddr)
if err != nil {
return errors.Errorf("parse ListenAddr error: %s, %v", cfg.ListenAddr, err)
}

var host string
if host, _, err = net.SplitHostPort(urllis.Host); err != nil {
return errors.Errorf("bad ListenAddr host format: %s, %v", urllis.Host, err)
if err := validateAddr(cfg.ListenAddr); err != nil {
return errors.Annotate(err, "invalid addr")
}

if !util.IsValidateListenHost(host) {
log.Warn("pump may not be able to access drainer using this listen addr config", zap.String("listen addr", host))
if err := validateAddr(cfg.AdvertiseAddr); err != nil {
return errors.Annotate(err, "invalid advertise-addr")
}

// check EtcdEndpoints
Expand Down Expand Up @@ -264,7 +258,9 @@ func (cfg *Config) validate() error {
func (cfg *Config) adjustConfig() error {
// adjust configuration
util.AdjustString(&cfg.ListenAddr, util.DefaultListenAddr(8249))
cfg.ListenAddr = "http://" + cfg.ListenAddr // add 'http:' scheme to facilitate parsing
util.AdjustString(&cfg.AdvertiseAddr, cfg.ListenAddr)
cfg.ListenAddr = "http://" + cfg.ListenAddr // add 'http:' scheme to facilitate parsing
cfg.AdvertiseAddr = "http://" + cfg.AdvertiseAddr // add 'http:' scheme to facilitate parsing
util.AdjustString(&cfg.DataDir, defaultDataDir)
util.AdjustInt(&cfg.DetectInterval, defaultDetectInterval)

Expand Down Expand Up @@ -349,3 +345,20 @@ func (cfg *Config) adjustConfig() error {

return nil
}

func validateAddr(addr string) error {
urllis, err := url.Parse(addr)
if err != nil {
return errors.Annotatef(err, "failed to parse addr %v", addr)
}

var host string
if host, _, err = net.SplitHostPort(urllis.Host); err != nil {
return errors.Annotatef(err, "invalid host %v", urllis.Host)
}

if !util.IsValidateListenHost(host) {
log.Warn("pump may not be able to access drainer using this addr", zap.String("listen addr", addr))
}
return nil
}
27 changes: 24 additions & 3 deletions drainer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/coreos/etcd/integration"
. "github.com/pingcap/check"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-binlog/pkg/util"
)

var testEtcdCluster *integration.ClusterV3
Expand All @@ -36,17 +37,19 @@ type testDrainerSuite struct{}

func (t *testDrainerSuite) TestConfig(c *C) {
args := []string{
"-metrics-addr", "127.0.0.1:9091",
"-metrics-addr", "192.168.15.10:9091",
"-txn-batch", "1",
"-data-dir", "data.drainer",
"-dest-db-type", "mysql",
"-config", "../cmd/drainer/drainer.toml",
"-addr", "192.168.15.10:8257",
"-advertise-addr", "192.168.15.10:8257",
}

cfg := NewConfig()
err := cfg.Parse(args)
c.Assert(err, IsNil)
c.Assert(cfg.MetricsAddr, Equals, "127.0.0.1:9091")
c.Assert(cfg.MetricsAddr, Equals, "192.168.15.10:9091")
c.Assert(cfg.DataDir, Equals, "data.drainer")
c.Assert(cfg.SyncerCfg.TxnBatch, Equals, 1)
c.Assert(cfg.SyncerCfg.DestDBType, Equals, "mysql")
Expand All @@ -61,9 +64,13 @@ func (t *testDrainerSuite) TestValidate(c *C) {

cfg.ListenAddr = "http://123:9091"
err := cfg.validate()
c.Assert(err, ErrorMatches, ".*ListenAddr.*")
c.Assert(err, ErrorMatches, ".*invalid addr.*")

cfg.ListenAddr = "http://192.168.10.12:9091"
err = cfg.validate()
c.Assert(err, ErrorMatches, ".*invalid advertise-addr.*")

cfg.AdvertiseAddr = "http://192.168.10.12:9091"
cfg.EtcdURLs = "127.0.0.1:2379,127.0.0.1:2380"
err = cfg.validate()
c.Assert(err, ErrorMatches, ".*EtcdURLs.*")
Expand Down Expand Up @@ -96,4 +103,18 @@ func (t *testDrainerSuite) TestAdjustConfig(c *C) {
c.Assert(cfg.SyncerCfg.DestDBType, Equals, "file")
c.Assert(cfg.SyncerCfg.WorkerCount, Equals, 1)
c.Assert(cfg.SyncerCfg.DisableDispatch, IsTrue)

cfg = NewConfig()
err = cfg.adjustConfig()
c.Assert(err, IsNil)
c.Assert(cfg.ListenAddr, Equals, "http://"+util.DefaultListenAddr(8249))
c.Assert(cfg.AdvertiseAddr, Equals, cfg.ListenAddr)

cfg = NewConfig()
cfg.ListenAddr = "0.0.0.0:8257"
cfg.AdvertiseAddr = "192.168.15.12:8257"
err = cfg.adjustConfig()
c.Assert(err, IsNil)
c.Assert(cfg.ListenAddr, Equals, "http://0.0.0.0:8257")
c.Assert(cfg.AdvertiseAddr, Equals, "http://192.168.15.12:8257")
}
4 changes: 2 additions & 2 deletions drainer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ func NewServer(cfg *Config) (*Server, error) {
)
}

advURL, err := url.Parse(cfg.ListenAddr)
advURL, err := url.Parse(cfg.AdvertiseAddr)
if err != nil {
return nil, errors.Annotatef(err, "invalid configuration of advertise addr(%s)", cfg.ListenAddr)
return nil, errors.Annotatef(err, "invalid configuration of advertise addr(%s)", cfg.AdvertiseAddr)
}

status := node.NewStatus(ID, advURL.Host, node.Online, 0, syncer.GetLatestCommitTS(), util.GetApproachTS(latestTS, latestTime))
Expand Down

0 comments on commit 968b6ac

Please sign in to comment.