mcap filter: add last-per-channel filtering semantics #1456
Conversation
Thanks for taking this over! I think not updating the timestamp of the message will have some downsides - mainly that there may be a large gap between the first publishes and the data of interest when opening the mcap in foxglove. However, I think modifying the timestamp is not ideal either, so leaving it unchanged is reasonable.
Force-pushed from 7eaedc4 to 92f91ef.
👍 to the semantics and having this be part of the CLI. Do we need the …? My thinking is that the typical desire would be to have any of the topics you wanted to filter on present in the truncated file, without having to remember special ones to state separately in the flag. Code review I leave to someone more familiar with the structure.
See @DaleKoenig's response on the last PR when I asked this question:
We no longer "pull" the topic forward and keep the original timestamp. Does this comment still apply in that context?

I think that part no longer applies. However I would still worry about the performance impact of creating an extra in-memory copy of every message previous to the specified start time, rather than just the (usually infrequent) topics that were transient-local originally.
```go
for i := range opts.includeLastPerChannelTopics {
	matcher := opts.includeLastPerChannelTopics[i]
```
nit: would this work? (same could be applied to other matchers below)
```go
for _, matcher := range opts.includeLastPerChannelTopics {
```
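For reference, the two loop forms compared in this nit visit the same elements; the value form just copies each element into the loop variable. A small standalone sketch (the helper names here are illustrative, not the PR's actual code):

```go
package main

import "fmt"

// rangeByIndex mirrors the current code: iterate by index, then read
// the element out of the slice.
func rangeByIndex(topics []string) []string {
	out := make([]string, 0, len(topics))
	for i := range topics {
		matcher := topics[i]
		out = append(out, matcher)
	}
	return out
}

// rangeByValue mirrors the suggested form: the range clause binds each
// element directly, which is equivalent for a slice of strings.
func rangeByValue(topics []string) []string {
	out := make([]string, 0, len(topics))
	for _, matcher := range topics {
		out = append(out, matcher)
	}
	return out
}

func main() {
	topics := []string{"camera_.*", "/tf_static"}
	fmt.Println(rangeByIndex(topics))
	fmt.Println(rangeByValue(topics))
}
```

The value form does copy each element into `matcher`, which only matters for large struct elements, not strings or small matcher types.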
```go
// We might still need to write the channel
channel, ok := channels[mostRecent.ChannelID]
if !ok {
	continue
}
if !channel.written {
```
nit: is there a way to avoid the duplication here with the channel-writing code that already exists immediately below this block? At first glance, it looks like we should be able to take advantage of mostRecent.ChannelID being the same as message.ChannelID and avoid doing this twice.
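One way the suggested deduplication could look, sketched against a hypothetical writer type with per-channel `written` flags (these names are illustrative, not the actual mcap CLI internals): hoist the write-once check into a helper that both the catch-up path and the normal path call.

```go
package main

import "fmt"

// channelInfo and writer are toy stand-ins for the filter's channel
// bookkeeping, used only to illustrate the shape of the refactor.
type channelInfo struct {
	written bool
}

type writer struct {
	channels map[uint16]*channelInfo
	writes   []uint16 // records which channel IDs were written, for demonstration
}

// ensureChannelWritten writes the channel record at most once per channel,
// so both call sites can share it instead of duplicating the check.
func (w *writer) ensureChannelWritten(id uint16) error {
	ch, ok := w.channels[id]
	if !ok {
		return fmt.Errorf("unknown channel %d", id)
	}
	if !ch.written {
		w.writes = append(w.writes, id) // stand-in for writing the channel record
		ch.written = true
	}
	return nil
}

func main() {
	w := &writer{channels: map[uint16]*channelInfo{1: {}}}
	_ = w.ensureChannelWritten(1)
	_ = w.ensureChannelWritten(1) // second call is a no-op
	fmt.Println(len(w.writes))
}
```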
```go
if err != nil {
	return err
}
mostRecent, ok := mostRecentMessageBeforeRangeStart[message.ChannelID]
```
nit: Should we use messagesBeforeRangeStartWritten to short-circuit this logic if the messages have already been written, since they will never be written again?
```go
mostRecentMessageBeforeRangeStart[message.ChannelID] = message
// Copy the data buffer explicitly, to avoid keeping a reference to the greater
// `buf` array that underlies `message.Data`.
mostRecentMessageBeforeRangeStart[message.ChannelID].Data = append([]byte{}, message.Data...)
```
caveat that I am not super familiar with this filter code and it's been a long time since I looked at it...but...are we assuming here that the file is ordered by log time? I'm not sure if repeated calls to lexer.Next(buf) is giving us in-order reading using the index, but I suspect not. If I'm right, then it means this feature might not do what it claims to do if the file is disordered. I'm not sure what it would take to fix that, but if we don't think it's worth it to fully support disordered files, should we at least try to detect and warn/error when it happens, and maybe also document the limitation in the cli help?
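A minimal sketch of the detect-and-warn idea: track the last log time seen while scanning and flag the first regression. This is illustrative only, assuming log times are available as plain integers, not the filter's real lexer loop:

```go
package main

import "fmt"

// findDisorder returns the index of the first message whose log time is
// earlier than a previously seen log time, if any.
func findDisorder(logTimes []uint64) (int, bool) {
	var last uint64
	for i, t := range logTimes {
		if t < last {
			return i, true
		}
		last = t
	}
	return -1, false
}

func main() {
	if i, bad := findDisorder([]uint64{10, 20, 15, 30}); bad {
		fmt.Printf("warning: message %d has an earlier log time than a previous message; "+
			"last-per-channel results may be wrong for disordered files\n", i)
	}
}
```

A per-channel variant (a map from channel ID to last log time) would catch disorder within a channel while tolerating interleaving across channels.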
```go
flags: &filterFlags{
	startNano:                   50,
	includeLastPerChannelTopics: []string{"camera_.*"},
	includeTopics:               []string{"camera_a"},
```
would it be worth adding a test case for excludeTopics as well?
```go
mostRecentMessageBeforeRangeStart[message.ChannelID] = message
// Copy the data buffer explicitly, to avoid keeping a reference to the greater
// `buf` array that underlies `message.Data`.
mostRecentMessageBeforeRangeStart[message.ChannelID].Data = append([]byte{}, message.Data...)
```
on another note, the pre-emptive copying strikes me as possibly pessimizing, since we will create new copies of almost every preceding message until we reach the desired range. I see that @DaleKoenig already mentioned this too :)
I'm not sure I understand exactly what the copy is fixing: would storing a reference to the underlying buf mostly be a concern if the number of channels is high, or if the total message size across channels is large? Are we specifically concerned that we might end up holding multiple preceding whole chunks in memory at once? At minimum, I think this comment could be expanded to clarify what we are avoiding (assuming you did some performance testing and found that this solution is best).
Another quick note that we could probably improve it a bit by re-using the buffer if we are replacing an item in the map. Since it's already been copied, if the buffer is large enough we should be able to copy into it without allocating again.
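A sketch of that reuse, assuming a hypothetical helper that takes the previously held buffer (not actual mcap code): copy into the existing buffer when its capacity suffices, and allocate only when it does not.

```go
package main

import "fmt"

// storeCopy returns a copy of src, reusing dst's backing array when it has
// enough capacity, so replacing a map entry often avoids a fresh allocation.
func storeCopy(dst []byte, src []byte) []byte {
	if cap(dst) >= len(src) {
		dst = dst[:len(src)]
		copy(dst, src)
		return dst // reused in place, no allocation
	}
	return append([]byte{}, src...) // not enough room: fresh copy
}

func main() {
	held := append([]byte{}, []byte("first payload")...)
	held = storeCopy(held, []byte("second")) // fits in the existing capacity
	fmt.Println(string(held))
}
```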
In my implementation, the reasoning was that the filter reads through the cli sequentially and old chunks are not kept in memory, so it is necessary to keep a copy of anything we might want to hold onto to only write at a later time. So it did not seem feasible to keep a reference to the old data without copying it. Implementing a method of keeping the old chunks around when they contain messages we want to write later seemed too invasive/difficult.
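The aliasing hazard described here can be shown in a few lines: if the lexer reuses one backing buffer across records, a retained sub-slice silently changes when the buffer is overwritten, so a detached copy is required. `retain` below is a toy stand-in, not the mcap lexer:

```go
package main

import "fmt"

// retain either keeps a sub-slice that still references buf's backing
// array, or detaches an independent copy of it.
func retain(buf []byte, n int, detach bool) []byte {
	if detach {
		return append([]byte{}, buf[:n]...) // independent copy
	}
	return buf[:n] // still aliases buf
}

func main() {
	buf := []byte("first-msg")
	aliased := retain(buf, 5, false)
	copied := retain(buf, 5, true)

	copy(buf, []byte("nextrecrd")) // the "lexer" reuses buf for the next record

	// The aliased slice now reflects the new record; the copy does not.
	fmt.Printf("aliased=%q copied=%q\n", aliased, copied)
}
```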
Co-authored-by: Jacob Bandes-Storch <[email protected]>
Changelog
mcap filter: added `--include-last-per-channel-topic-regex` option. This includes the last message before the filter start on selected topics. This can be used to retain infrequently-logged topics in filtered MCAPs.

Docs
None.
Description
Adapting @DaleKoenig's changes from #1426, this PR adds last-per-channel inclusion semantics for selected topics.
Differences from Dale's PR include:
- `-l` reserved for a more fundamental feature down the line.

`mcap filter` excludes all messages before the requested filter start time. This can mean that infrequently logged topics like `/tf_static` get dropped entirely, and the user may not want that. `mcap filter --include-last-per-channel-topic-regex /tf_static` will include the last logged `/tf_static` message before the filter start time, as well as all messages matching the filter.

Fixes: #1454