Skip to content

Commit

Permalink
Implement stats fetch using dump-flows to provide an option to get st…
Browse files Browse the repository at this point in the history
…atistics

for all matching flows in one shot.
Also, provide an option to report interface names instead of port numbers,
if requested.
  • Loading branch information
do-msingh committed Oct 17, 2024
1 parent c0f7d42 commit b64fc3e
Show file tree
Hide file tree
Showing 5 changed files with 449 additions and 2 deletions.
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ Neal Shrader <[email protected]>
Sangeetha Srikanth <[email protected]>
Franck Rupin <[email protected]>
Adam Simeth <[email protected]>
Manmeet Singh <[email protected]>
124 changes: 124 additions & 0 deletions ovs/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,16 @@ type LearnedFlow struct {
Limit int
}

// A PerFlowStats is meant for fetching FlowStats per flow
type PerFlowStats struct {
Protocol Protocol
InPort int
Table int
Cookie uint64
IfName string
Stats FlowStats
}

var _ error = &FlowError{}

// A FlowError is an error encountered while marshaling or unmarshaling
Expand Down Expand Up @@ -439,6 +449,120 @@ func (f *Flow) UnmarshalText(b []byte) error {
return nil
}

// UnmarshalText unmarshals flows text into a PerFlowStats.
func (f *PerFlowStats) UnmarshalText(b []byte) error {
// Make a copy per documentation for encoding.TextUnmarshaler.
// A string is easier to work with in this case.
s := string(b)

// Must have one and only one actions=... field in the flow.
ss := strings.Split(s, keyActions+"=")
if len(ss) != 2 || ss[1] == "" {
return &FlowError{
Err: errNoActions,
}
}
if len(ss) < 2 {
return &FlowError{
Err: errNotEnoughElements,
}
}
matchers := strings.TrimSpace(ss[0])

// Handle matchers first.
ss = strings.Split(matchers, ",")
for i := 0; i < len(ss); i++ {
if !strings.Contains(ss[i], "=") {
// that means this will be a protocol field.
if ss[i] != "" {
f.Protocol = Protocol(ss[i])
}
continue
}

// All remaining comma-separated values should be in key=value format
kv := strings.Split(ss[i], "=")
if len(kv) != 2 {
continue
}
kv[1] = strings.TrimSpace(kv[1])

switch strings.TrimSpace(kv[0]) {
case cookie:
// Parse cookie into struct field.
cookie, err := strconv.ParseUint(kv[1], 0, 64)
if err != nil {
return &FlowError{
Str: kv[1],
Err: err,
}
}
f.Cookie = cookie
continue
case inPort:
// Parse in_port into struct field.
s := kv[1]
if strings.TrimSpace(s) == portLOCAL {
f.InPort = PortLOCAL
continue
}
// Try to read as integer port numbers first
port, err := strconv.ParseInt(s, 10, 0)
if err != nil {
f.IfName = s
} else {
f.InPort = int(port)
}
continue
case table:
// Parse table into struct field.
table, err := strconv.ParseInt(kv[1], 10, 0)
if err != nil {
return &FlowError{
Str: kv[1],
Err: err,
}
}
f.Table = int(table)
continue
case nPackets:
// Parse nPackets into struct field.
pktCount, err := strconv.ParseUint(kv[1], 0, 64)
if err != nil {
return &FlowError{
Str: kv[1],
Err: err,
}
}
f.Stats.PacketCount = uint64(pktCount)
continue
case nBytes:
// Parse nBytes into struct field.
byteCount, err := strconv.ParseUint(kv[1], 0, 64)
if err != nil {
return &FlowError{
Str: kv[1],
Err: err,
}
}
f.Stats.ByteCount = uint64(byteCount)
continue
case duration, hardAge, idleAge, priority, idleTimeout, keyActions:
// ignore those fields.
continue
}

// All arbitrary key/value pairs that
// don't match the case above.
_, err := parseMatch(kv[0], kv[1])
if err != nil {
return err
}
}

return nil
}

// MatchFlow converts Flow into MatchFlow.
func (f *Flow) MatchFlow() *MatchFlow {
return &MatchFlow{
Expand Down
70 changes: 70 additions & 0 deletions ovs/flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1313,3 +1313,73 @@ func flowErrorEqual(a error, b error) bool {

return reflect.DeepEqual(fa, fb)
}

// perflowstatsEqual determines if two possible PerFlowStats are equal.
func perflowstatsEqual(a *PerFlowStats, b *PerFlowStats) bool {
// Special case: both nil is OK
if a == nil && b == nil {
return true
}

return reflect.DeepEqual(a, b)
}

func TestPerFlowStatsUnmarshalText(t *testing.T) {
var tests = []struct {
desc string
s string
f *PerFlowStats
err error
}{
{
desc: "empty Flow string, need actions fields",
err: &FlowError{
Err: errNoActions,
},
},
{
desc: "Flow string with interface name",
s: "priority=10,in_port=eth0,table=0,actions=drop",
f: &PerFlowStats{
InPort: 0,
IfName: "eth0",
Table: 0,
},
},
{
desc: "Flow string with flow stats",
s: "n_packets=13256, n_bytes=1287188, priority=10,in_port=eth0,table=0,actions=drop",
f: &PerFlowStats{
InPort: 0,
IfName: "eth0",
Table: 0,
Stats: FlowStats{
PacketCount: 13256,
ByteCount: 1287188,
},
},
},
}

for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
f := new(PerFlowStats)
err := f.UnmarshalText([]byte(tt.s))

// Need temporary strings to avoid nil pointer dereference
// panics when checking Error method.
if want, got := tt.err, err; !flowErrorEqual(want, got) {
t.Fatalf("unexpected error:\n- want: %v\n- got: %v",
want, got)
}
if err != nil {
return
}

if want, got := tt.f, f; !perflowstatsEqual(want, got) {
t.Fatalf("unexpected Flow:\n- want: %#v\n- got: %#v",
want, got)
}
})
}
}
60 changes: 58 additions & 2 deletions ovs/openflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ const (
dirDelete = "delete"
)

// Interface names option
const (
interfaceNamesOption = "--names"
)

// Add pushes zero or more Flows on to the transaction, to be added by
// Open vSwitch. If any of the flows are invalid, Add becomes a no-op
// and the error will be surfaced when Commit is called.
Expand Down Expand Up @@ -267,7 +272,7 @@ func (o *OpenFlowService) DumpTables(bridge string) ([]*Table, error) {
return tables, err
}

// DumpFlowsWithFlowArgs retrieves statistics about all flows for the specified bridge,
// DumpFlowsWithFlowArgs retrieves details about all flows for the specified bridge,
// filtering on the specified flow(s), if provided.
// If a table has no active flows and has not been used for a lookup or matched
// by an incoming packet, it is filtered from the output.
Expand Down Expand Up @@ -306,13 +311,64 @@ func (o *OpenFlowService) DumpFlowsWithFlowArgs(bridge string, flow *MatchFlow)
return flows, err
}

// DumpFlows retrieves statistics about all flows for the specified bridge.
// DumpFlows retrieves details about all flows for the specified bridge.
// If a table has no active flows and has not been used for a lookup or matched
// by an incoming packet, it is filtered from the output.
func (o *OpenFlowService) DumpFlows(bridge string) ([]*Flow, error) {
return o.DumpFlowsWithFlowArgs(bridge, nil)
}

// DumpFlowStatsWithFlowArgs retrieves statistics about all flows for the specified bridge,
// filtering on the specified flow(s), if provided.
// If a table has no active flows and has not been used for a lookup or matched
// by an incoming packet, it is filtered from the output.
// We neeed to add a Matchflow to filter the dumpflow results. For example filter based on table, cookie.
// Report with interface names if useInterfaceNames is set. Port numbers otherwise
func (o *OpenFlowService) DumpFlowStatsWithFlowArgs(bridge string, flow *MatchFlow, useInterfaceNames bool) ([]*PerFlowStats, error) {
args := []string{"dump-flows", bridge}
if useInterfaceNames {
args = append(args, interfaceNamesOption)
}
args = append(args, o.c.ofctlFlags...)
if flow != nil {
fb, err := flow.MarshalText()
if err != nil {
return nil, err
}
args = append(args, string(fb))
}
out, err := o.exec(args...)
if err != nil {
return nil, err
}

var flows []*PerFlowStats
err = parseEachLine(out, dumpFlowsPrefix, func(b []byte) error {
// Do not attempt to parse ST_FLOW messages.
if bytes.Contains(b, dumpFlowsPrefix) {
return nil
}

f := new(PerFlowStats)
if err := f.UnmarshalText(b); err != nil {
return err
}

flows = append(flows, f)
return nil
})

return flows, err
}

// DumpFlowStats retrieves statistics about all matching flows for the specified bridge.
// If a table has no active flows and has not been used for a lookup or matched
// by an incoming packet, it is filtered from the output.
// Use nil MatchFlow if no filtering is desired.
func (o *OpenFlowService) DumpFlowStats(bridge string, flow *MatchFlow, useInterfaceNames bool) ([]*PerFlowStats, error) {
return o.DumpFlowStatsWithFlowArgs(bridge, flow, useInterfaceNames)
}

// DumpAggregate retrieves statistics about the specified flow attached to the
// specified bridge.
func (o *OpenFlowService) DumpAggregate(bridge string, flow *MatchFlow) (*FlowStats, error) {
Expand Down
Loading

0 comments on commit b64fc3e

Please sign in to comment.