Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 110 additions & 40 deletions go/cli/mcap/cmd/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,35 @@ import (
)

type filterFlags struct {
output string
includeTopics []string
excludeTopics []string
startSec uint64
endSec uint64
startNano uint64
endNano uint64
start string
end string
includeMetadata bool
includeAttachments bool
outputCompression string
chunkSize int64
unchunked bool
output string
includeTopics []string
excludeTopics []string
includeLastPerChannelTopics []string
startSec uint64
endSec uint64
startNano uint64
endNano uint64
start string
end string
includeMetadata bool
includeAttachments bool
outputCompression string
chunkSize int64
unchunked bool
}

type filterOpts struct {
output string
includeTopics []regexp.Regexp
excludeTopics []regexp.Regexp
start uint64
end uint64
includeMetadata bool
includeAttachments bool
compressionFormat mcap.CompressionFormat
chunkSize int64
unchunked bool
output string
includeTopics []regexp.Regexp
excludeTopics []regexp.Regexp
includeLastPerChannelTopics []regexp.Regexp
start uint64
end uint64
includeMetadata bool
includeAttachments bool
compressionFormat mcap.CompressionFormat
chunkSize int64
unchunked bool
}

// parseDateOrNanos parses a string containing either an RFC3339-formatted date with timezone
Expand Down Expand Up @@ -115,15 +117,22 @@ func buildFilterOptions(flags *filterFlags) (*filterOpts, error) {

includeTopics, err := compileMatchers(flags.includeTopics)
if err != nil {
return nil, err
return nil, fmt.Errorf("invalid included topic regex: %w", err)
}
opts.includeTopics = includeTopics

excludeTopics, err := compileMatchers(flags.excludeTopics)
if err != nil {
return nil, err
return nil, fmt.Errorf("invalid excluded topic regex: %w", err)
}
opts.excludeTopics = excludeTopics

includeLastPerChannelTopics, err := compileMatchers(flags.includeLastPerChannelTopics)
if err != nil {
return nil, fmt.Errorf("invalid last-per-channel topic regex: %w", err)
}
opts.includeLastPerChannelTopics = includeLastPerChannelTopics

opts.chunkSize = flags.chunkSize
opts.unchunked = flags.unchunked
return opts, nil
Expand Down Expand Up @@ -192,7 +201,7 @@ func compileMatchers(regexStrings []string) ([]regexp.Regexp, error) {
}
regex, err := regexp.Compile(regexString)
if err != nil {
return nil, err
return nil, fmt.Errorf("%s is not a valid regex: %w", regexString, err)
}
matchers[i] = *regex
}
Expand Down Expand Up @@ -266,6 +275,8 @@ func filter(
buf := make([]byte, 1024)
schemas := make(map[uint16]markableSchema)
channels := make(map[uint16]markableChannel)
mostRecentMessageBeforeRangeStart := make(map[uint16]*mcap.Message)
messagesBeforeRangeStartWritten := false

for {
token, data, err := lexer.Next(buf)
Expand Down Expand Up @@ -298,6 +309,12 @@ func filter(
if err != nil {
return err
}
for i := range opts.includeLastPerChannelTopics {
matcher := opts.includeLastPerChannelTopics[i]
Comment on lines +312 to +313
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: would this work? (same could be applied to other matchers below)

Suggested change
for i := range opts.includeLastPerChannelTopics {
matcher := opts.includeLastPerChannelTopics[i]
for _, matcher := range opts.includeLastPerChannelTopics {

if matcher.MatchString(channel.Topic) {
mostRecentMessageBeforeRangeStart[channel.ID] = nil
}
}
// if any topics match an includeTopic, add it.
for i := range opts.includeTopics {
matcher := opts.includeTopics[i]
Expand Down Expand Up @@ -327,12 +344,58 @@ func filter(
if err != nil {
return err
}
mostRecent, ok := mostRecentMessageBeforeRangeStart[message.ChannelID]
Copy link
Member

@jtbandes jtbandes Sep 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Should we use messagesBeforeRangeStartWritten to short-circuit this logic if the messages have already been written, since they will never be written again?

if message.LogTime < opts.start {
if ok {
if mostRecent == nil || mostRecent.LogTime <= message.LogTime {
mostRecentMessageBeforeRangeStart[message.ChannelID] = message
// Copy the data buffer explicitly, to avoid keeping a reference to the greater
// `buf` array that underlies `message.Data`.
mostRecentMessageBeforeRangeStart[message.ChannelID].Data = append([]byte{}, message.Data...)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

caveat that I am not super familiar with this filter code and it's been a long time since I looked at it...but...are we assuming here that the file is ordered by log time? I'm not sure if repeated calls to lexer.Next(buf) is giving us in-order reading using the index, but I suspect not. If I'm right, then it means this feature might not do what it claims to do if the file is disordered. I'm not sure what it would take to fix that, but if we don't think it's worth it to fully support disordered files, should we at least try to detect and warn/error when it happens, and maybe also document the limitation in the cli help?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

on another note, the pre-emptive copying strikes me as possibly pessimizing, since we will create new copies of almost every preceding message until we reach the desired range. I see that @DaleKoenig already mentioned this too :)

I'm not sure I understand exactly what the copy is fixing -- would storing a reference to the underlying buf mostly be a concern if the number of channels is high (or total message size across channels is large?) Are we specifically concerned that we might end up holding multiple preceding whole chunks in memory at once? At minimum, I think this comment could be expanded to clarify what we are avoiding (assuming you did some performance testing and found that this solution is best)

Another quick note that we could probably improve it a bit by re-using the buffer if we are replacing an item in the map. Since it's already been copied, if the buffer is large enough we should be able to copy into it without allocating again.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my implementation, the reasoning was that the filter reads through the cli sequentially and old chunks are not kept in memory, so it is necessary to keep a copy of anything we might want to hold onto to only write at a later time. So it did not seem feasible to keep a reference to the old data without copying it. Implementing a method of keeping the old chunks around when they contain messages we want to write later seemed too invasive/difficult.

}
}
continue
}
if message.LogTime >= opts.end {
continue
}
if !messagesBeforeRangeStartWritten {
messagesBeforeRangeStartWritten = true
// We have reached the start of the record, so add any stored messages here
for _, mostRecent := range mostRecentMessageBeforeRangeStart {
if mostRecent == nil {
continue
}
// We might still need to write the channel
channel, ok := channels[mostRecent.ChannelID]
if !ok {
continue
}
if !channel.written {
Comment on lines +369 to +374
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: is there a way to avoid the duplication here with the channel-writing code that already exists immediately below this block? At first glance, it looks like we should be able to take advantage of mostRecent.ChannelID being the same as message.ChannelID and avoid doing this twice.

if channel.SchemaID != 0 {
schema, ok := schemas[channel.SchemaID]
if !ok {
return fmt.Errorf("encountered channel with topic %s with unknown schema ID %d",
channel.Topic, channel.SchemaID)
}
if !schema.written {
if err := mcapWriter.WriteSchema(schema.Schema); err != nil {
return err
}
schemas[channel.SchemaID] = markableSchema{schema.Schema, true}
}
}
if err := mcapWriter.WriteChannel(channel.Channel); err != nil {
return err
}
channels[mostRecent.ChannelID] = markableChannel{channel.Channel, true}
}
if err := mcapWriter.WriteMessage(mostRecent); err != nil {
return err
}
numMessages++
}
}
channel, ok := channels[message.ChannelID]
if !ok {
continue
Expand Down Expand Up @@ -406,6 +469,12 @@ usage:
[]string{},
"messages with topic names matching this regex will be excluded, can be supplied multiple times",
)
includeLastPerChannelTopics := filterCmd.PersistentFlags().StringArray(
"include-last-per-channel-topic-regex",
[]string{},
"For included topics matching this regex, the most recent message prior to the start time"+
" will still be included.",
)
start := filterCmd.PersistentFlags().StringP(
"start",
"S",
Expand Down Expand Up @@ -463,19 +532,20 @@ usage:
)
filterCmd.Run = func(_ *cobra.Command, args []string) {
filterOptions, err := buildFilterOptions(&filterFlags{
output: *output,
includeTopics: *includeTopics,
excludeTopics: *excludeTopics,
start: *start,
startSec: *startSec,
startNano: *startNano,
end: *end,
endSec: *endSec,
endNano: *endNano,
chunkSize: *chunkSize,
includeMetadata: *includeMetadata,
includeAttachments: *includeAttachments,
outputCompression: *outputCompression,
output: *output,
includeTopics: *includeTopics,
excludeTopics: *excludeTopics,
includeLastPerChannelTopics: *includeLastPerChannelTopics,
start: *start,
startSec: *startSec,
startNano: *startNano,
end: *end,
endSec: *endSec,
endNano: *endNano,
chunkSize: *chunkSize,
includeMetadata: *includeMetadata,
includeAttachments: *includeAttachments,
outputCompression: *outputCompression,
})
if err != nil {
die("configuration error: %s", err)
Expand Down
92 changes: 92 additions & 0 deletions go/cli/mcap/cmd/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,3 +311,95 @@ func TestBuildFilterOptions(t *testing.T) {
})
}
}

func TestLastPerChannelBehavior(t *testing.T) {
cases := []struct {
name string
flags *filterFlags
expectedMessageCount map[uint16]int
}{
{name: "noop",
flags: &filterFlags{
startNano: 50,
},
expectedMessageCount: map[uint16]int{
1: 50,
2: 50,
3: 50,
},
},
{name: "last per channel on all topics",
flags: &filterFlags{
startNano: 50,
includeLastPerChannelTopics: []string{".*"},
},
expectedMessageCount: map[uint16]int{
1: 51,
2: 51,
3: 51,
},
},
{name: "last per channel on camera topics only",
flags: &filterFlags{
startNano: 50,
includeLastPerChannelTopics: []string{"camera_.*"},
},
expectedMessageCount: map[uint16]int{
1: 51,
2: 51,
3: 50,
},
},
{name: "does not override include topics",
flags: &filterFlags{
startNano: 50,
includeLastPerChannelTopics: []string{"camera_.*"},
includeTopics: []string{"camera_a"},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be worth adding a test case for excludeTopics as well?

},
expectedMessageCount: map[uint16]int{
1: 51,
2: 0,
3: 0,
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
opts, err := buildFilterOptions(c.flags)
require.NoError(t, err)
writeBuf := bytes.Buffer{}
readBuf := bytes.Buffer{}

writeFilterTestInput(t, &readBuf)
require.NoError(t, filter(&readBuf, &writeBuf, opts))
lexer, err := mcap.NewLexer(&writeBuf, &mcap.LexerOptions{})
require.NoError(t, err)
defer lexer.Close()
messageCounter := map[uint16]int{
1: 0,
2: 0,
3: 0,
}
for {
token, record, err := lexer.Next(nil)
if err != nil {
require.ErrorIs(t, err, io.EOF)
break
}
if token == mcap.TokenMessage {
message, err := mcap.ParseMessage(record)
require.NoError(t, err)
messageCounter[message.ChannelID]++
}
}
for channelID, count := range messageCounter {
require.Equal(
t,
c.expectedMessageCount[channelID],
count,
"message count incorrect on channel %d", channelID,
)
}
})
}
}
Loading