Skip to content

Commit

Permalink
Merge pull request #25 from debugloop/flowtime
Browse files Browse the repository at this point in the history
Add FlowTimeStart and FlowTimeEnd
  • Loading branch information
lspgn authored May 1, 2019
2 parents 959ed13 + ec20c3b commit def764f
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 68 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ The format is the following:
| ----- | ----------- |
| FlowType | Indicates the protocol (IPFIX, NetFlow v9, sFlow v5) |
| TimeRecvd | Timestamp the packet was received by the collector |
| TimeFlow | Timestamp of the packet (same as TimeRecvd in sFlow, in NetFlow it's the uptime of the router minus LAST_SWITCHED field, in IPFIX it's flowEnd* field) |
| TimeFlow | Timestamp of the packet (same as TimeRecvd in sFlow, in NetFlow it's the uptime of the router minus LAST_SWITCHED field, in IPFIX it's flowEnd* field), meant to be replaced by TimeFlowEnd |
| SamplingRate | Sampling rate of the flow, used to extrapolate the number of bytes and packets |
| SequenceNum | Sequence number of the packet |
| SrcIP | Source IP (sequence of bytes, can be IPv4 or IPv6) |
Expand Down Expand Up @@ -172,6 +172,8 @@ The format is the following:
| FragmentId | IP Fragment Identifier |
| FragmentOffset | IP Fragment Offset |
| IPv6FlowLabel | IPv6 Flow Label |
| TimeFlowStart | Start Timestamp of the flow (this field is empty for sFlow, in NetFlow it's the uptime of the router minus FIRST_SWITCHED field, in IPFIX it's flowStart* field) |
| TimeFlowEnd | End Timestamp of the flow (same as TimeRecvd in sFlow, in NetFlow it's the uptime of the router minus LAST_SWITCHED field, in IPFIX it's flowEnd* field) |
| IngressVrfId | Ingress VRF ID |
| EgressVrfId | Egress VRF ID |

Expand Down
29 changes: 15 additions & 14 deletions goflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (s *state) decodeNetFlow(msg interface{}) error {
for _, fmsg := range flowMessageSet {
fmsg.TimeRecvd = uint64(time.Now().UTC().Unix())
fmsg.RouterAddr = routerAddr
timeDiff := fmsg.TimeRecvd - fmsg.TimeFlow
timeDiff := fmsg.TimeRecvd - fmsg.TimeFlowEnd
NetFlowTimeStatsSum.With(
prometheus.Labels{
"router": key,
Expand Down Expand Up @@ -361,7 +361,7 @@ func (s *state) decodeNetFlow(msg interface{}) error {
for _, fmsg := range flowMessageSet {
fmsg.TimeRecvd = uint64(time.Now().UTC().Unix())
fmsg.RouterAddr = routerAddr
timeDiff := fmsg.TimeRecvd - fmsg.TimeFlow
timeDiff := fmsg.TimeRecvd - fmsg.TimeFlowEnd
NetFlowTimeStatsSum.With(
prometheus.Labels{
"router": key,
Expand All @@ -384,17 +384,17 @@ func (s *state) decodeNetFlow(msg interface{}) error {
}

func FlowMessageToString(fmsg *flowmessage.FlowMessage) string {
s := fmt.Sprintf("Type:%v TimeRecvd:%v SamplingRate:%v SequenceNum:%v TimeFlow:%v " +
"SrcIP:%v DstIP:%v IPversion:%v Bytes:%v Packets:%v RouterAddr:%v NextHop:%v NextHopAS:%v " +
"SrcAS:%v DstAS:%v SrcNet:%v DstNet:%v SrcIf:%v DstIf:%v Proto:%v " +
"SrcPort:%v DstPort:%v IPTos:%v ForwardingStatus:%v IPTTL:%v TCPFlags:%v " +
"SrcMac:%v DstMac:%v VlanId:%v Etype:%v IcmpType:%v IcmpCode:%v " +
"SrcVlan:%v DstVlan:%v IPv6FlowLabel:%v",
fmsg.Type, fmsg.TimeRecvd, fmsg.SamplingRate, fmsg.SequenceNum, fmsg.TimeFlow,
net.IP(fmsg.SrcIP), net.IP(fmsg.DstIP), fmsg.IPversion, fmsg.Bytes, fmsg.Packets, net.IP(fmsg.RouterAddr), net.IP(fmsg.NextHop), fmsg.NextHopAS,
fmsg.SrcAS, fmsg.DstAS, fmsg.SrcNet, fmsg.DstNet, fmsg.SrcIf, fmsg.DstIf, fmsg.Proto,
fmsg.SrcPort, fmsg.DstPort, fmsg.IPTos, fmsg.ForwardingStatus, fmsg.IPTTL, fmsg.TCPFlags,
fmsg.SrcMac, fmsg.DstMac, fmsg.VlanId, fmsg.Etype, fmsg.IcmpType, fmsg.IcmpCode,
s := fmt.Sprintf("Type:%v TimeRecvd:%v SamplingRate:%v SequenceNum:%v TimeFlowStart:%v "+
"TimeFlowEnd:%v SrcIP:%v DstIP:%v IPversion:%v Bytes:%v Packets:%v RouterAddr:%v "+
"NextHop:%v NextHopAS:%v SrcAS:%v DstAS:%v SrcNet:%v DstNet:%v SrcIf:%v DstIf:%v "+
"Proto:%v SrcPort:%v DstPort:%v IPTos:%v ForwardingStatus:%v IPTTL:%v TCPFlags:%v "+
"SrcMac:%v DstMac:%v VlanId:%v Etype:%v IcmpType:%v IcmpCode:%v "+
"SrcVlan:%v DstVlan:%v IPv6FlowLabel:%v",
fmsg.Type, fmsg.TimeRecvd, fmsg.SamplingRate, fmsg.SequenceNum, fmsg.TimeFlowStart, fmsg.TimeFlowEnd,
net.IP(fmsg.SrcIP), net.IP(fmsg.DstIP), fmsg.IPversion, fmsg.Bytes, fmsg.Packets, net.IP(fmsg.RouterAddr), net.IP(fmsg.NextHop), fmsg.NextHopAS,
fmsg.SrcAS, fmsg.DstAS, fmsg.SrcNet, fmsg.DstNet, fmsg.SrcIf, fmsg.DstIf, fmsg.Proto,
fmsg.SrcPort, fmsg.DstPort, fmsg.IPTos, fmsg.ForwardingStatus, fmsg.IPTTL, fmsg.TCPFlags,
fmsg.SrcMac, fmsg.DstMac, fmsg.VlanId, fmsg.Etype, fmsg.IcmpType, fmsg.IcmpCode,
fmsg.SrcVlan, fmsg.DstVlan, fmsg.IPv6FlowLabel)
return s
}
Expand Down Expand Up @@ -603,7 +603,8 @@ func (s *state) decodeSflow(msg interface{}) error {
ts := uint64(time.Now().UTC().Unix())
for _, fmsg := range flowMessageSet {
fmsg.TimeRecvd = ts
fmsg.TimeFlow = ts
fmsg.TimeFlow = ts // deprecate this
fmsg.TimeFlowEnd = ts
}

s.produceFlow(flowMessageSet)
Expand Down
112 changes: 65 additions & 47 deletions pb/flow.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion pb/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ message FlowMessage {
uint64 SamplingRate = 3;
uint32 SequenceNum = 4;

// Found inside packet
// Found inside packet (To be deprecated in favor of TimeFlowEnd)
uint64 TimeFlow = 5;

// Source/destination addresses
Expand Down Expand Up @@ -93,4 +93,8 @@ message FlowMessage {
// VRF
uint32 IngressVrfId = 38;
uint32 EgressVrfId = 39;

// time information, found inside packets
uint64 TimeFlowStart = 40;
uint64 TimeFlowEnd = 41;
}
34 changes: 29 additions & 5 deletions producer/producer_nf.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,26 +276,50 @@ func ConvertNetFlowDataSet(version uint16, baseTime uint32, uptime uint32, recor
if version == 9 {
// NetFlow v9 time works with a differential based on router's uptime
switch df.Type {
case netflow.NFV9_FIELD_FIRST_SWITCHED:
var timeFirstSwitched uint32
DecodeUNumber(v, &timeFirstSwitched)
timeDiff := (uptime - timeFirstSwitched) / 1000
flowMessage.TimeFlowStart = uint64(baseTime - timeDiff)
case netflow.NFV9_FIELD_LAST_SWITCHED:
var timeLastSwitched uint32
DecodeUNumber(v, &timeLastSwitched)
timeDiff := (uptime - timeLastSwitched) / 1000
flowMessage.TimeFlow = uint64(baseTime - timeDiff)
timeFlowEnd := uint64(baseTime - timeDiff)
flowMessage.TimeFlow = timeFlowEnd // deprecate this
flowMessage.TimeFlowEnd = timeFlowEnd
}
} else if version == 10 {
switch df.Type {
case netflow.IPFIX_FIELD_flowStartSeconds:
DecodeUNumber(v, &time)
flowMessage.TimeFlowStart = time
case netflow.IPFIX_FIELD_flowStartMilliseconds:
DecodeUNumber(v, &time)
flowMessage.TimeFlowStart = time / 1000
case netflow.IPFIX_FIELD_flowStartMicroseconds:
DecodeUNumber(v, &time)
flowMessage.TimeFlowStart = time / 1000000
case netflow.IPFIX_FIELD_flowStartNanoseconds:
DecodeUNumber(v, &time)
flowMessage.TimeFlowStart = time / 1000000000

case netflow.IPFIX_FIELD_flowEndSeconds:
DecodeUNumber(v, &time)
flowMessage.TimeFlow = time
flowMessage.TimeFlow = time // deprecate this
flowMessage.TimeFlowEnd = time
case netflow.IPFIX_FIELD_flowEndMilliseconds:
DecodeUNumber(v, &time)
flowMessage.TimeFlow = time / 1000
flowMessage.TimeFlow = time / 1000 // deprecate this
flowMessage.TimeFlowEnd = time / 1000
case netflow.IPFIX_FIELD_flowEndMicroseconds:
DecodeUNumber(v, &time)
flowMessage.TimeFlow = time / 1000000
flowMessage.TimeFlow = time / 1000000 // deprecate this
flowMessage.TimeFlowEnd = time / 1000000
case netflow.IPFIX_FIELD_flowEndNanoseconds:
DecodeUNumber(v, &time)
flowMessage.TimeFlow = time / 1000000000
flowMessage.TimeFlow = time / 1000000000 // deprecate this
flowMessage.TimeFlowEnd = time / 1000000000
}
}
}
Expand Down

0 comments on commit def764f

Please sign in to comment.