forked from fgeller/kt
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
129 lines (103 loc) · 1.87 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package main
import (
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"os"
	"os/signal"
	"time"

	"github.com/Shopify/sarama"
)
// consumerConfig groups the settings used by the "consume" command.
//
// The top-level fields carry typed values (a broker list, numeric offsets);
// the nested args struct carries string/duration counterparts of the same
// settings.
// NOTE(review): args presumably holds the raw command-line flag values before
// they are parsed into the typed fields - confirm against consumerCommand.
type consumerConfig struct {
	topic                  string
	brokers                []string
	startOffset, endOffset int64
	json                   bool // when set, print emits messages as JSON
	timeout                time.Duration

	args struct {
		topic, brokers string
		timeout        time.Duration
		offsets        string
		json           bool
	}
}
// listConfig groups the settings used by the "list" command.
//
// brokers is the broker list as a slice; args.brokers is its single-string
// counterpart.
// NOTE(review): args.brokers is presumably the raw flag value that gets split
// into brokers - confirm against listCommand.
type listConfig struct {
	brokers []string

	args struct {
		brokers string
	}
}
// config is the package-level configuration, with one section per command.
var config struct {
	consume consumerConfig // settings for the "consume" command
	list    listConfig     // settings for the "list" command
}
// listenForInterrupt returns a channel that is closed once the process
// receives an interrupt signal (os.Interrupt), so callers can wait on it to
// start shutting down.
//
// The signal handler is registered before this function returns, so an
// interrupt delivered immediately afterwards is still observed instead of
// hitting the default handler and killing the process. os.Kill is not
// requested: that signal cannot be caught, so asking for it has no effect.
func listenForInterrupt() chan struct{} {
	closer := make(chan struct{})

	// Buffered with capacity 1: signal delivery does not block, so a signal
	// that arrives before the goroutine below is receiving must have
	// somewhere to go.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	go func() {
		<-signals
		log.Printf("Received interrupt - shutting down...")
		close(closer)
	}()

	return closer
}
// print writes one consumed message to standard output.
//
// When config.consume.json is set, the message is written as a single-line
// JSON object with the fields "partition", "offset", "key" and "message".
// Otherwise a "Partition=... Offset=... Key=... Message=..." line is written.
//
// The JSON form is produced with encoding/json rather than fmt's %#v verb.
// %#v emits Go string syntax, and several of its escapes (\x00, \a, \v,
// \U0001f600, ...) are not legal JSON, so keys or values containing binary
// data used to yield output that JSON parsers reject. HTML escaping is
// disabled so that characters such as '<' and '&' keep appearing literally,
// as they did before. Bytes that are not valid UTF-8 are replaced by U+FFFD
// by the encoder.
func print(msg *sarama.ConsumerMessage) {
	if config.consume.json {
		record := struct {
			Partition int32  `json:"partition"`
			Offset    int64  `json:"offset"`
			Key       string `json:"key"`
			Message   string `json:"message"`
		}{
			Partition: msg.Partition,
			Offset:    msg.Offset,
			Key:       string(msg.Key),
			Message:   string(msg.Value),
		}

		// Encode terminates the object with a newline, matching the
		// one-object-per-line layout of the previous implementation.
		enc := json.NewEncoder(os.Stdout)
		enc.SetEscapeHTML(false)
		if err := enc.Encode(record); err != nil {
			log.Printf("Failed to write message as JSON: %v", err)
		}
		return
	}

	fmt.Printf(
		"Partition=%v Offset=%v Key=%s Message=%s\n",
		msg.Partition,
		msg.Offset,
		msg.Key,
		msg.Value,
	)
}
// command bundles what the entry point needs to drive one sub-command.
type command struct {
	// flags is the flag set associated with the command.
	// NOTE(review): presumably the set that parseArgs parses into - confirm
	// against the command constructors.
	flags *flag.FlagSet

	// parseArgs receives the command-line arguments that follow the
	// command name.
	parseArgs func([]string)

	// run executes the command; it is handed the channel that is closed
	// when the process is interrupted.
	run func(chan struct{})
}
// usageMessage is the top-level help text: it names the tool, shows the
// invocation form, lists the available commands and points at the
// per-command -help flag.
var usageMessage = `kt is a tool for Kafka.
Usage:
kt command [arguments]
The commands are:
consume consume messages.
list list topics.
Use "kt [command] -help" for more information about the command.
`
// usage writes usageMessage followed by a newline to standard error and then
// terminates the process with exit status 2. It does not return.
func usage() {
	fmt.Fprintf(os.Stderr, "%s\n", usageMessage)
	os.Exit(2)
}
// parseArgs selects the command named by the first command-line argument,
// lets it parse the remaining arguments, and returns it.
//
// If no command is given, or the name is not one of the known commands,
// usage is called, which prints the help text and exits the process.
func parseArgs() command {
	argv := os.Args
	if len(argv) < 2 {
		usage()
	}

	// Both commands are constructed up front, before the lookup.
	known := map[string]command{
		"consume": consumerCommand(),
		"list":    listCommand(),
	}

	name, rest := argv[1], argv[2:]
	selected, found := known[name]
	if !found {
		usage()
	}

	selected.parseArgs(rest)
	return selected
}
// main resolves the requested command from the command line, starts
// listening for an interrupt, and runs the command with the channel that is
// closed when the interrupt arrives.
func main() {
	selected := parseArgs()
	interrupted := listenForInterrupt()
	selected.run(interrupted)
}