datalake: relax translation_stm::max_collectible_offset() value (and add compaction_test.py) #24610
Conversation
```
@@ -24,22 +24,24 @@
class DatalakeVerifier():
    """
    Verifier that does the verification of the data in the redpanda Iceberg table.
    The verifier consumes offsets from specified topic and verifies it the data
```
Trailing whitespace removal
Lot of […]. Not sure if this is another […]. The only related change I can see in […]. EDIT: Probably just because of the […].
sorry for the delay here.
On the `DatalakeVerifier` diff (`@@ -24,22 +24,24 @@`):
Compaction is expected to block until translation happens. What additional coverage does verification with a compacted log with a fully translated iceberg table provide?
> Compaction is expected to block until translation happens
> What additional coverage does verification with a compacted log with a fully translated iceberg table provide?

That the aforementioned expectation is true (i.e. the Iceberg table is fully translated, and the log is fully compacted). Do you think there is other verification that should be added here?
> That the aforementioned expectation is true (i.e. the Iceberg table is fully translated, and the log is fully compacted).

Correct me if I'm wrong, but the verifier in its current form can also succeed if the topic got translated from a compacted log (hypothetically, if the code violated the max_collectible_offset invariant), no?
Yes, I suppose that is true. In its current form, the verifier handles the cases where the Iceberg table has as much or more information than the log, on the assumption that translation happened before any compaction of the log (or that compaction didn't take place at all), but it doesn't verify that the case you described didn't occur.
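To make that containment relationship concrete, here is a minimal sketch of the property the verifier can check; the types and names below are hypothetical illustrations, not the actual Python `datalake_verifier` implementation:

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical record shape, for illustration only.
struct record {
    int64_t offset;
    std::string key;
};

// The surviving records of the compacted log must be a subset of what the
// Iceberg table recorded: the table was translated before compaction, so it
// may legitimately hold extra rows for records that compaction removed.
bool table_covers_compacted_log(
  const std::map<int64_t, record>& iceberg_rows_by_offset,
  const std::vector<record>& compacted_log) {
    for (const auto& r : compacted_log) {
        auto it = iceberg_rows_by_offset.find(r.offset);
        if (it == iceberg_rows_by_offset.end() || it->second.key != r.key) {
            return false;
        }
    }
    return true;
}
```

As discussed above, this check passes whether or not the max_collectible_offset invariant held, which is exactly the gap the reviewer points out next.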
Thanks, so I was wondering if we should instead test enforcement of max_collectible_offset, which is a more critical invariant, and leave the verifier as it is today.
I agree that perhaps we haven't covered the case where the max_collectible_offset invariant is violated, but I do think the existing changes to the verifier are helpful to at least ensure the case in which it ISN'T violated is correct. I'll try to think of ways to better cover the critical invariant in a follow-up PR, if that works for you.
```
@@ -118,3 +120,126 @@ def test_translation_no_gaps(self, cloud_storage_type):
                include_query_engines=[QueryEngineType.TRINO
                                       ]) as dl:
            self.do_test_no_gaps(dl)


class CompactionTest(RedpandaTest):
```
One of the motivations for removing offset translation was to use Iceberg-enabled topics with read replicas/topic recovery; wondering if it's worth adding an e2e test for it.
Would you like to see a test added in this PR, or as a follow-up PR?
A follow-up PR is also fine (I just want to ensure nothing else is broken, other than offset translation, before we declare it as working).
FWIW, read replicas and topic recovery don't appear to be addressed by this PR. Read replica translators won't be able to perform offset translation on anything, and topic recovery will likely require changes to what revisions and topic overrides get passed to the coordinators. I don't believe either is in scope of this work (which is really just to unblock compaction, IIUC).
Good point about topic recovery needing more work. I think @bashtanov is doing some work here in migrations land.

> Read replica translators won't be able to perform offset translation on anything

What's the use case here with read replicas, though? IIRC it's about being able to (SQL) query an Iceberg-enabled RRR topic, that's it? The RRR topic itself cannot do any translation locally.
That's right. But as is, RRR topics won't be able to call the `to_log_offset()` that's done in the translation path. I think it's reasonable to have the translator skip the offset translation and pass a nullopt to the STM as the log offset.
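For illustration, a minimal sketch of that idea; the names and signatures below are hypothetical stand-ins, and only `to_log_offset()` and the nullopt handoff come from the comment above:

```cpp
#include <optional>

// Hypothetical stand-ins for redpanda's kafka::offset / model::offset.
using kafka_offset = long;
using model_offset = long;

// Stub for the offset translation done in the normal translation path.
model_offset to_log_offset(kafka_offset o);

// A read replica has no local offset translation state, so it skips
// translation entirely and passes nullopt to the STM as the log offset.
std::optional<model_offset> translated_log_offset_for_stm(
  bool is_read_replica, kafka_offset max_translated_offset) {
    if (is_read_replica) {
        return std::nullopt;
    }
    return to_log_offset(max_translated_offset);
}
```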
`src/v/datalake/translation/utils.h` (Outdated):

```cpp
model::offset
get_translated_log_offset(ss::shared_ptr<storage::log> log, kafka::offset o);
```
Given it may be confusing to refer to "translation" here in multiple contexts (I'm actually not sure if you mean it as offset-translated or datalake-translated), it may be more self-descriptive if this were named `highest_log_offset_below(kafka::offset)`, where the translator would pass in `kafka::next_offset(max_translated_offset)`.
Or consider the name highest_log_offset_below_next()
Being able to call this function in `translation_stm::max_collectible_offset()` like

```cpp
return get_translated_log_offset(_raft->log(), _highest_translated_offset);
```

feels better than having to manipulate the passed offset outside the function before calling it each time. If you feel strongly about this I can change the name; it is unfortunate that "translation" can mean two different things in this context, but I hope the code comments are descriptive enough?
I feel strongly about the name -- we already have so many named offsets here and there; this one doesn't seem so pivotal that it needs a special name. My vote is for `highest_log_offset_below_next()`.
```cpp
bool check_translated_log_offset(
  ss::shared_ptr<storage::log> log,
  kafka::offset translated_offset,
  model::offset expected_offset) {
    auto translated_log_offset
      = datalake::translation::get_translated_log_offset(
        log, translated_offset);
    return expected_offset == translated_log_offset;
}
```
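For context, a hypothetical gtest-style call site in a test body might look like the following; the log handle and the concrete offsets are illustrative, not taken from the actual test:

```cpp
// After producing kafka records followed by translator/config batches,
// assert that the translated kafka offset maps to the expected log offset.
ASSERT_TRUE(check_translated_log_offset(
  log,                 // ss::shared_ptr<storage::log> from the test fixture
  kafka::offset{10},   // highest translated kafka offset
  model::offset{13})); // expected: last config batch before the next record
```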
nit: this feels like quite a lot of testing for something that is ultimately just calling into the offset translator, and so this feels like we're just testing the offset translator. Wondering if we can instead write tests that check the max collectible offset? Plus, if you go the `highest_log_offset_below()` route, this all changes anyway.
Yeah, ultimately this is more of an offset translation test, but I wanted to have a set of very illustrative unit tests that only tested this mechanism, and not the state machine as a whole.
PR title changed from "datalake: remove offset translation from translation_stm (and add compaction_test.py)" to "datalake: relax translation_stm::max_collectible_offset() value (and add compaction_test.py)".
Mind removing this from this PR and following up with a test in a separate PR? IMO this one here should be focused on unblocking compaction.
And most importantly, add the function `highest_log_offset_below_next()`. This function will be used to compute the appropriate highest translated log offset for a given translated kafka offset while taking into account translator batches. This will allow us to be less pessimistic about the `max_collectible_offset` returned by the `translation_stm` in the future.
This will return a less restrictive value for `translation_stm::max_collectible_offset()`.
By handling gaps in offsets and recording seen keys, we can validate the correctness of a compacted log that has been translated (fully) into an Iceberg table.
Adds a new `test_compaction` test, which uses the `KgoVerifierSeqConsumer` to validate a fully compacted log, along with the `datalake_verifier` service to validate the Iceberg table. Also moves the contents of `compaction_gaps_test.py` into `compaction_test.py`.
lgtm, cover letter needs to be updated to `highest_log_offset_below_next`?
```cpp
// Returns the equivalent log offset which can be considered translated by the
// datalake subsystem, while taking into account translator batch types, for a
// given kafka offset.
//
// Note that the provided kafka::offset o MUST be a valid offset, i.e. one that
// has been produced to the log. This function will always return a value, and
// its correctness depends on the validity of the input offset.
//
// For example, in the following situation:
// Kaf offsets: [O]  .   .    .      [NKO]
// Log offsets: [K] [C] [C] [C/TLO]  [NKO]
// where O is the input offset, K is the last kafka record, C is a translator
// (Config) batch, TLO is the translated log offset, and NKO is the next
// expected kafka record. We should expect TLO to be equal to the offset of the
// last configuration batch before the next kafka record.
model::offset highest_log_offset_below_next(
  ss::shared_ptr<storage::log> log, kafka::offset o);
```
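The declaration above fixes the semantics; as a rough illustration, a sketch of those semantics might look like the following. This is not the PR's actual implementation: it assumes a `to_log_offset()`-style translation helper with this shape, plus the `kafka::next_offset()` utility mentioned in the naming thread above and an assumed `model::prev_offset()` helper.

```cpp
// Sketch only, not the PR's actual implementation. Assumes a hypothetical
// translation helper:
//   model::offset to_log_offset(ss::shared_ptr<storage::log>, kafka::offset);
model::offset highest_log_offset_below_next(
  ss::shared_ptr<storage::log> log, kafka::offset o) {
    // Map the *next* expected kafka offset to its log offset; everything
    // strictly below it (including trailing translator/config batches) can
    // be considered translated.
    auto next_log_offset = to_log_offset(log, kafka::next_offset(o));
    // The highest translated log offset is the one just before it.
    return model::prev_offset(next_log_offset);
}
```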
🥳
Previously, the `datalake::translation::translation_stm` would return its max collectible offset as the following (see `src/v/datalake/translation/state_machine.cc`, lines 112 to 122 at 925707c). This offset translation leads to an overly restrictive condition for the max collectible offset, because it is translation-batch unaware.

Here, the utility function `highest_log_offset_below_next()` is added, which returns the "equivalent" translated log offset for a given kafka offset, taking into account translation batches (which don't need to be translated, and thus shouldn't restrict the max collectible offset). `translation_stm::max_collectible_offset()` now uses this function to relax its returned offset.
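In other words, the relaxed computation reduces to the call quoted in the review thread above, sketched here with the final function name; the member names follow that quote, not necessarily the final code:

```cpp
model::offset translation_stm::max_collectible_offset() {
    // Delegate to the batch-aware helper instead of offset-translating
    // _highest_translated_offset directly, which ignored translator batches.
    return highest_log_offset_below_next(
      _raft->log(), _highest_translated_offset);
}
```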
Additionally, a new test for compaction with an Iceberg-enabled topic is added to `datalake/compaction_test.py`, with some enhancements to the `datalake_verifier` service to make it compaction aware.

Backports Required
Release Notes
Improvements