-
-
Notifications
You must be signed in to change notification settings - Fork 51
/
Copy pathdnsprocessor.go
111 lines (92 loc) · 3.02 KB
/
dnsprocessor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package workers
import (
"time"
"github.com/dmachard/go-dnscollector/dnsutils"
"github.com/dmachard/go-dnscollector/pkgconfig"
"github.com/dmachard/go-dnscollector/transformers"
"github.com/dmachard/go-logger"
)
type DNSProcessor struct {
*GenericWorker
}
func NewDNSProcessor(config *pkgconfig.Config, logger *logger.Logger, name string, size int) DNSProcessor {
w := DNSProcessor{GenericWorker: NewGenericWorker(config, logger, name, "dns processor", size, pkgconfig.DefaultMonitor)}
return w
}
func (w *DNSProcessor) StartCollect() {
w.LogInfo("starting data collection")
defer w.CollectDone()
// prepare next channels
defaultRoutes, defaultNames := GetRoutes(w.GetDefaultRoutes())
droppedRoutes, droppedNames := GetRoutes(w.GetDroppedRoutes())
// prepare enabled transformers
transforms := transformers.NewTransforms(&w.GetConfig().IngoingTransformers, w.GetLogger(), w.GetName(), defaultRoutes, 0)
// read incoming dns message
for {
select {
case cfg := <-w.NewConfig():
w.SetConfig(cfg)
transforms.ReloadConfig(&cfg.IngoingTransformers)
case <-w.OnStop():
transforms.Reset()
return
case dm, opened := <-w.GetInputChannel():
if !opened {
w.LogInfo("channel closed, exit")
return
}
// count global messages
w.CountIngressTraffic()
// compute timestamp
ts := time.Unix(int64(dm.DNSTap.TimeSec), int64(dm.DNSTap.TimeNsec))
dm.DNSTap.Timestamp = ts.UnixNano()
dm.DNSTap.TimestampRFC3339 = ts.UTC().Format(time.RFC3339Nano)
// decode the dns payload
dnsHeader, err := dnsutils.DecodeDNS(dm.DNS.Payload)
if err != nil {
dm.DNS.MalformedPacket = true
w.LogError("dns parser malformed packet: %s - %v+", err, dm)
}
// get number of questions and answers
dm.DNS.QdCount = dnsHeader.Qdcount
dm.DNS.AnCount = dnsHeader.Ancount
dm.DNS.ArCount = dnsHeader.Arcount
dm.DNS.NsCount = dnsHeader.Nscount
// dns reply ?
if dnsHeader.Qr == 1 {
dm.DNSTap.Operation = "CLIENT_RESPONSE"
dm.DNS.Type = dnsutils.DNSReply
qip := dm.NetworkInfo.QueryIP
qport := dm.NetworkInfo.QueryPort
dm.NetworkInfo.QueryIP = dm.NetworkInfo.ResponseIP
dm.NetworkInfo.QueryPort = dm.NetworkInfo.ResponsePort
dm.NetworkInfo.ResponseIP = qip
dm.NetworkInfo.ResponsePort = qport
} else {
dm.DNS.Type = dnsutils.DNSQuery
dm.DNSTap.Operation = dnsutils.DNSTapClientQuery
}
if err = dnsutils.DecodePayload(&dm, &dnsHeader, w.GetConfig()); err != nil {
w.LogError("%v - %v", err, dm)
}
if dm.DNS.MalformedPacket {
if w.GetConfig().Global.Trace.LogMalformed {
w.LogInfo("payload: %v", dm.DNS.Payload)
}
}
// count output packets
w.CountEgressTraffic()
// apply all enabled transformers
transformResult, err := transforms.ProcessMessage(&dm)
if err != nil {
w.LogError(err.Error())
}
if transformResult == transformers.ReturnDrop {
w.SendDroppedTo(droppedRoutes, droppedNames, dm)
continue
}
// dispatch dns message to all generators
w.SendForwardedTo(defaultRoutes, defaultNames, dm)
}
}
}