@@ -5,14 +5,12 @@ import (
5
5
"flag"
6
6
"log"
7
7
"os"
8
- "os/exec"
9
- "path/filepath"
10
8
"net"
11
9
"net/rpc"
12
10
"net/http"
13
11
"bufio"
14
12
"sync"
15
- "errors "
13
+ "strings "
16
14
"andreworals.com/dlogger/leveled_logger"
17
15
"andreworals.com/dlogger/chan_grep"
18
16
"github.com/hsfzxjy/go-srpc"
@@ -24,8 +22,6 @@ import (
24
22
25
23
var logger leveled_logger.LeveledLogger
26
24
27
- const rpcPort = "1234"
28
- const kListenPort = 1235
29
25
var logDir = flag .String ("d" , "logs" , "log directory" )
30
26
var debugLevel = flag .Int ("v" , 1 , "verbosity level, 0=off 1=normal 2=debug" )
31
27
var machineAddrs []string
@@ -39,12 +35,12 @@ type GrepClientRpc string
39
35
40
36
// RPC from queried log server to log server
41
37
// Takes output from grep and sends it over a TCP connection to the caller
42
- func (* LogServerRpc ) LoggerQuery (args Args , s * srpc.Session ) error {
38
+ func (* LogServerRpc ) LoggerQuery (args chan_grep. Args , s * srpc.Session ) error {
43
39
return srpc .S ( func () error {
44
40
resultsCh := make (chan string )
45
41
errCh := make (chan error )
46
42
47
- go grepLogs (args , * logDir , resultsCh , errCh )
43
+ go chan_grep . GrepLogs (args , * logDir , resultsCh , errCh )
48
44
49
45
for {
50
46
select {
@@ -59,11 +55,12 @@ func (*LogServerRpc) LoggerQuery(args Args, s *srpc.Session) error {
59
55
}, s , nil )
60
56
}
61
57
62
- func serverReader (wg * sync.WaitGroup , args Args , addrStr string , listenPort int , ch chan <- string ) {
58
+ func serverReader (wg * sync.WaitGroup , args chan_grep. Args , addrStr string , ch chan <- string ) {
63
59
defer wg .Done ()
64
60
65
- c , err := rpc .DialHTTP ("tcp" , addrStr + ":" + rpcPort )
61
+ c , err := rpc .DialHTTP ("tcp" , addrStr )
66
62
if err != nil {
63
+ logger .PrintfNormal ("Could not dial logserver at %s: %s\n " , addrStr , err )
67
64
return
68
65
}
69
66
client := srpc .WrapClient (c )
@@ -83,7 +80,7 @@ func done(wg *sync.WaitGroup, doneCh chan<- Nothing, ch chan<- string) {
83
80
84
81
// RPC from grep client to query log server
85
82
// Calls each log server to retrieve the its logs and populates a channel to send back to the grep client over TCP
86
- func (* GrepClientRpc ) GrepQuery (args Args , s * srpc.Session ) error {
83
+ func (* GrepClientRpc ) GrepQuery (args chan_grep. Args , s * srpc.Session ) error {
87
84
logger .PrintfDebug ("Received grep query: %s\n " , args .Query )
88
85
return srpc .S ( func () error {
89
86
// query the other servers
@@ -92,15 +89,15 @@ func (*GrepClientRpc) GrepQuery(args Args, s *srpc.Session) error {
92
89
var wg sync.WaitGroup
93
90
94
91
// Grep from other servers
95
- for i , addrStr := range machineAddrs {
92
+ for _ , addr := range machineAddrs {
96
93
wg .Add (1 )
97
- go serverReader (& wg , args , addrStr , kListenPort + i , grepResultsCh )
94
+ go serverReader (& wg , args , addr , grepResultsCh )
98
95
}
99
96
// Grep from this server
100
97
wg .Add (1 )
101
98
go func (wgroup * sync.WaitGroup ) {
102
99
defer wgroup .Done ()
103
- grepLogs (args , grepResultsCh , nil )
100
+ chan_grep . GrepLogs (args , * logDir , grepResultsCh , nil )
104
101
logger .PrintfDebug ("grepLogs done\n " )
105
102
}(& wg )
106
103
// Call back when the grep calls return
@@ -132,15 +129,20 @@ func readMachineFile(fileName string) {
132
129
133
130
lines := bufio .NewScanner (file ) // use default ScanLines
134
131
for lines .Scan () {
135
- machineAddrs = append (machineAddrs , lines .Text ())
132
+ str := lines .Text ()
133
+ addr_tmp := strings .Split (str , ":" )
134
+ if len (addr_tmp ) != 2 {
135
+ fmt .Fprintf (os .Stderr , "Invalid address or port: %s\n Format is <address>:<port>\n " , str )
136
+ }
137
+ machineAddrs = append (machineAddrs , str )
136
138
}
137
139
138
140
logger .PrintfDebug ("Addresses: %v\n " , machineAddrs )
139
141
}
140
142
141
143
func usage () {
142
144
fmt .Fprintf (os .Stderr ,
143
- "Usage: %s [-d <log directory>] [-v <integer debug level>] <machine file>\n " ,
145
+ "Usage: %s [-d <log directory>] [-v <integer debug level>] <port> < machine file>\n " ,
144
146
os .Args [0 ])
145
147
flag .PrintDefaults ()
146
148
}
@@ -152,9 +154,12 @@ func main() {
152
154
logger .SetLevel (leveled_logger .LogLevel (* debugLevel ))
153
155
defer logger .AutoPrefix ("main: " )()
154
156
155
- if flag .NArg () == 1 {
156
- readMachineFile (flag .Args ()[0 ])
157
+ if flag .NArg () != 2 {
158
+ usage ()
159
+ os .Exit (1 )
157
160
}
161
+ rpcPort := flag .Args ()[0 ]
162
+ readMachineFile (flag .Args ()[1 ])
158
163
159
164
rpc .Register (new (GrepClientRpc ))
160
165
rpc .Register (new (LogServerRpc ))
0 commit comments