Skip to content

Commit

Permalink
新增repeater写入kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
吴迎松 committed Sep 19, 2017
1 parent 20724b9 commit 0f1fd94
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 13 deletions.
5 changes: 5 additions & 0 deletions conf/repeater.conf
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
# opentsdb, repeater, kafka
backend=opentsdb
tcp_bind=0.0.0.0:10040

opentsdb_addr=127.0.0.1:4242

repeater_addr=

brokers =
topics = owl

max_packet_size=4096
buffer_size=1048576

Expand Down
4 changes: 4 additions & 0 deletions repeater/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type Config struct {
TCP_BIND string

OPENTSDB_ADDR string
KAFKA_BROKERS []string
KAFKA_TOPIC string

REPEATER_ADDR string
//LOG CONFIG
Expand All @@ -43,6 +45,8 @@ func InitGlobalConfig() error {
TCP_BIND: cfg.MustValue(goconfig.DEFAULT_SECTION, "tcp_bind", DEFAULT_TCP_BIND),
OPENTSDB_ADDR: cfg.MustValue(goconfig.DEFAULT_SECTION, "opentsdb_addr", DEFAULT_OPENTSDB_ADDR),
REPEATER_ADDR: cfg.MustValue(goconfig.DEFAULT_SECTION, "repeater_addr", DEFAULT_REPEATER_ADDR),
KAFKA_BROKERS: cfg.MustValueArray(goconfig.DEFAULT_SECTION, "kafka_brokers", ","),
KAFKA_TOPIC: cfg.MustValue(goconfig.DEFAULT_SECTION, "kafka_topic", "owl"),

MAX_PACKET_SIZE: cfg.MustInt(goconfig.DEFAULT_SECTION, "max_packet_size", DEFAULT_MAX_PACKET_SIZE),
BUFFER_SIZE: cfg.MustInt64(goconfig.DEFAULT_SECTION, "buffer_size", DEFAULT_BUFFER_SIZE),
Expand Down
2 changes: 1 addition & 1 deletion repeater/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func main() {
}

if err = InitRepeater(); err != nil {
fmt.Println(err)
lg.Error("init repeater error, %s", err)
return
}
go repeater.Forward()
Expand Down
21 changes: 9 additions & 12 deletions repeater/server.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"errors"
"fmt"
"owl/common/tcp"
"owl/common/types"
Expand All @@ -24,21 +23,19 @@ func InitRepeater() error {
repeater = &Repeater{}
repeater.srv = s
repeater.buffer = make(chan []byte, GlobalConfig.BUFFER_SIZE)
var err error
switch GlobalConfig.BACKEND {
case "opentsdb":
backend, err := backend.NewOpentsdbBackend(GlobalConfig.OPENTSDB_ADDR)
if err != nil {
return err
}
repeater.backend = backend
repeater.backend, err = backend.NewOpentsdbBackend(GlobalConfig.OPENTSDB_ADDR)
case "repeater":
backend, err := backend.NewRepeaterBackend(GlobalConfig.REPEATER_ADDR)
if err != nil {
return err
}
repeater.backend = backend
repeater.backend, err = backend.NewRepeaterBackend(GlobalConfig.REPEATER_ADDR)
case "kafka":
repeater.backend, err = backend.NewKafkaBackend(GlobalConfig.KAFKA_BROKERS, GlobalConfig.KAFKA_TOPIC)
default:
return errors.New(fmt.Sprintf("unsupported backend %s", GlobalConfig.BACKEND))
err = fmt.Errorf("unsupported backend %s", GlobalConfig.BACKEND)
}
if err != nil {
return fmt.Errorf("%s error:%s", GlobalConfig.BACKEND, err)
}
return repeater.srv.ListenAndServe()
}
Expand Down

0 comments on commit 0f1fd94

Please sign in to comment.