Implement compression support #150
Compression is negotiated with the client connection, then propagated to the upstream coordinator (or Astra) connections and negotiated there as well. If a frame is not modified, it doesn't need to be recompressed. Partial message encoding support is used to rewrite the consistency when it's not supported. Frame patching is removed; this is necessary because frames need to be recompressed when they're modified.
I'll let others review because I wrote it. Open to all feedback.
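As a rough illustration of the pass-through idea in the description above, the sketch below forwards a compressed body untouched unless a rewrite actually changes it. It is not the PR's implementation: `forwardBody`, `compress`, and `decompress` are hypothetical names, and DEFLATE from the Go standard library stands in for the LZ4/Snappy compressors that the native protocol actually negotiates.

```go
package main

import (
	"bytes"
	"compress/flate"
	"io"
)

// compress uses DEFLATE purely as a stand-in compressor (assumption: the
// real proxy would use whatever algorithm was negotiated, e.g. LZ4 or Snappy).
func compress(plain []byte) ([]byte, error) {
	var buf bytes.Buffer
	w, err := flate.NewWriter(&buf, flate.DefaultCompression)
	if err != nil {
		return nil, err
	}
	if _, err := w.Write(plain); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decompress reverses compress.
func decompress(packed []byte) ([]byte, error) {
	r := flate.NewReader(bytes.NewReader(packed))
	defer r.Close()
	return io.ReadAll(r)
}

// forwardBody (hypothetical) returns the bytes to send upstream and reports
// whether the body had to be recompressed. The rewrite callback returns the
// possibly patched body and whether it changed anything.
func forwardBody(packed []byte, rewrite func([]byte) ([]byte, bool)) ([]byte, bool, error) {
	if rewrite == nil {
		return packed, false, nil // nothing to inspect: pass through as-is
	}
	plain, err := decompress(packed)
	if err != nil {
		return nil, false, err
	}
	patched, changed := rewrite(plain)
	if !changed {
		return packed, false, nil // unmodified: reuse the original compressed bytes
	}
	out, err := compress(patched)
	if err != nil {
		return nil, false, err
	}
	return out, true, nil // modified: the body must be recompressed
}
```

Calling `forwardBody` with a callback that reports no change hands back the original slice, so the cost of recompression is only paid for frames that were actually rewritten.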
Looks good to me, maybe in the future we can port some of this "partial codec" functionality to the protocol library so we can use it in zdm too.
return version, errors.New("invalid startup key/value pairs")
}

response, err := c.SendAndReceive(ctx, frame.NewFrame(version, -1, message.NewStartup(startupKeysAndValues...)))
I believe this block (the SendAndReceive() block + its error handling) should be moved after we (optionally) override the codecs to use compression-aware versions. If we send anything before configuring the codecs we see something like the following:
2025-04-14T11:22:02.034-0500 ERROR proxycore/connpool.go:74 unable to connect pool {"endpoint": "127.0.0.1:9042", "error": "cannot decode body: cannot decompress body: no compressor available"}
github.com/datastax/cql-proxy/proxycore.connectPool
/work/git/cql-proxy/proxycore/connpool.go:74
github.com/datastax/cql-proxy/proxycore.(*Session).OnEvent.func1.1
/work/git/cql-proxy/proxycore/session.go:121
The underlying problem here is an inability to decode the response from the server, because the set of codecs in play does not yet support compression. Simply moving this line after the for loop below appears to resolve the issue.
STARTUP is always uncompressed. I think this current logic is incorrect when there's protocol negotiation because it'll set the codecs then the subsequent STARTUP will be compressed.
The error is being thrown when we try to decode the body of the response, and according to the spec even a response to STARTUP can be compressed (assuming everybody agrees on compression protocols).
At least that's what I assume is happening here. That matches up with what I'm seeing in wireshark; the response from the server looks to be hashed (encrypted, compressed, etc.) data rather than straight native protocol bytes.
Both things are problems. The custom compressed codecs need to be set in between the STARTUP request and its response.
Understood.
I'm wondering if go-cassandra-native-protocol is smart enough not to compress STARTUP messages even if the provided codec supports compression. That would explain why I see everything work perfectly in my testing with this change in place. I'm still trying to verify that now.
The following works because go-cassandra-native-protocol pays attention to the compressed frame flag when encoding the frame. That means that the few handshake frames won't be compressed, but I don't think that's a big deal.
diff --git a/proxycore/clientconn.go b/proxycore/clientconn.go
index 844003d..3c16619 100644
--- a/proxycore/clientconn.go
+++ b/proxycore/clientconn.go
@@ -87,28 +87,28 @@ func ConnectClient(ctx context.Context, endpoint Endpoint, config ClientConnConf
 }
 func (c *ClientConn) Handshake(ctx context.Context, version primitive.ProtocolVersion, auth Authenticator, startupKeysAndValues ...string) (primitive.ProtocolVersion, error) {
-	for {
-		if len(startupKeysAndValues)%2 != 0 {
-			return version, errors.New("invalid startup key/value pairs")
+	if len(startupKeysAndValues)%2 != 0 {
+		return version, errors.New("invalid startup key/value pairs")
+	}
+
+	for i := 0; i < len(startupKeysAndValues); i += 2 {
+		key := startupKeysAndValues[i]
+		value := startupKeysAndValues[i+1]
+		if strings.EqualFold("COMPRESSION", key) {
+			if codec, ok := codecs.CustomRawCodecsWithCompression[strings.ToLower(value)]; ok {
+				c.codec = codec
+			} else {
+				return version, fmt.Errorf("invalid compression type: %s", value)
+			}
 		}
+	}
+	for {
 		response, err := c.SendAndReceive(ctx, frame.NewFrame(version, -1, message.NewStartup(startupKeysAndValues...)))
 		if err != nil {
 			return version, err
 		}
-		for i := 0; i < len(startupKeysAndValues); i += 2 {
-			key := startupKeysAndValues[i]
-			value := startupKeysAndValues[i+1]
-			if strings.EqualFold("COMPRESSION", key) {
-				if codec, ok := codecs.CustomRawCodecsWithCompression[strings.ToLower(value)]; ok {
-					c.codec = codec
-				} else {
-					return version, fmt.Errorf("invalid compression type: %s", value)
-				}
-			}
-		}
-
 		switch msg := response.Body.Message.(type) {
 		case *message.Ready:
 			if c.eventHandler != nil {
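For anyone who wants to poke at the selection step from the diff above outside the proxy, here is a minimal standalone sketch. It keeps the same validation and the same case-insensitive `COMPRESSION` lookup, but returns the chosen algorithm name instead of assigning a codec. `selectCompression` and the `supported` set are hypothetical stand-ins for the loop in `Handshake` and the `codecs.CustomRawCodecsWithCompression` map; the set's contents are an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// supported is a hypothetical stand-in for the proxy's map of
// compression-aware codecs, keyed by lower-cased algorithm name.
var supported = map[string]bool{"lz4": true, "snappy": true}

// selectCompression (hypothetical) validates the STARTUP key/value pairs and
// returns the requested compression algorithm, or "" if none was requested.
func selectCompression(startupKeysAndValues ...string) (string, error) {
	if len(startupKeysAndValues)%2 != 0 {
		return "", errors.New("invalid startup key/value pairs")
	}
	algorithm := ""
	for i := 0; i < len(startupKeysAndValues); i += 2 {
		key := startupKeysAndValues[i]
		value := startupKeysAndValues[i+1]
		if !strings.EqualFold("COMPRESSION", key) {
			continue // not the option we care about
		}
		name := strings.ToLower(value)
		if !supported[name] {
			return "", fmt.Errorf("invalid compression type: %s", value)
		}
		algorithm = name
	}
	return algorithm, nil
}
```

Because this runs before anything is sent, a bad option fails fast and a good one means the compression-aware codec is already in place when the first response arrives.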
Yup, I think that's exactly right. We're explicitly building frames for STARTUP messages that (very correctly) don't set the compression flags so those messages won't be compressed... which is precisely what we want. But we do want the compression-aware codecs to be in play since the responses might be compressed.
As mentioned to @mpenick in separate conversation... hey, at least we know why it works now. :)
FTR I completely agree with @joao-r-reis; (eventually) getting the partial codec stuff into go-cassandra-native-protocol sounds like a solid idea.
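To make the reasoning in this thread concrete, here is a toy model of a codec that only looks at the frame's compression flag. It is a sketch under assumptions, not go-cassandra-native-protocol's code: `frame`, `codec`, `compressor`, and `reverser` are hypothetical types, and the byte-reversing "compressor" exists only to make compressed and plain bodies distinguishable. The `0x01` value is the compression bit of the native protocol frame header flags.

```go
package main

import "errors"

// flagCompressed is the compression bit in the frame header flags.
const flagCompressed byte = 0x01

// frame is a hypothetical, heavily simplified frame: flags plus body.
type frame struct {
	flags byte
	body  []byte
}

// compressor is a hypothetical body compressor interface.
type compressor interface {
	compress([]byte) []byte
	decompress([]byte) []byte
}

// reverser is a toy "compressor" that just reverses the bytes.
type reverser struct{}

func reverse(b []byte) []byte {
	out := make([]byte, len(b))
	for i := range b {
		out[len(b)-1-i] = b[i]
	}
	return out
}

func (reverser) compress(b []byte) []byte   { return reverse(b) }
func (reverser) decompress(b []byte) []byte { return reverse(b) }

// codec is hypothetical; a nil comp models a codec without compression support.
type codec struct{ comp compressor }

// encodeBody compresses only when the frame itself carries the flag, so a
// STARTUP frame built without the flag goes out uncompressed either way.
func (c codec) encodeBody(f frame) ([]byte, error) {
	if f.flags&flagCompressed == 0 {
		return f.body, nil
	}
	if c.comp == nil {
		return nil, errors.New("cannot compress body: no compressor available")
	}
	return c.comp.compress(f.body), nil
}

// decodeBody fails on a flagged frame when no compressor is configured,
// which mirrors the error reported earlier in this conversation.
func (c codec) decodeBody(f frame) ([]byte, error) {
	if f.flags&flagCompressed == 0 {
		return f.body, nil
	}
	if c.comp == nil {
		return nil, errors.New("cannot decompress body: no compressor available")
	}
	return c.comp.decompress(f.body), nil
}
```

With this model, installing the compression-aware codec before STARTUP is harmless on the way out (the flag is unset, so nothing is compressed) and necessary on the way in (a flagged response can only be decoded once a compressor is present).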
I can't add a review to my own PR but I do think we need to address the issue discussed in my comment. @mpenick let me know if you're good with me making the change referenced there, otherwise we can discuss further if you think something else needs to happen.
Standalone version of #147 which was (erroneously) closed by yours truly.
Credit for all code in this PR goes to @mpenick