
Fix BufferedTokenizer to properly resume after a buffer full condition respecting the encoding of the input string #16968

Conversation

@andsel andsel (Contributor) commented Jan 28, 2025

Release notes

[rn:skip]

What does this PR do?

This is a second take at fixing the processing of tokens from the tokenizer after a buffer full error. The first try (#16482) was rolled back due to the encoding error #16694.
The first attempt failed to return the tokens in the same encoding as the input.
This PR does a couple of things:

  • accumulates the tokens, so that after a buffer full condition it can resume with the tokens that follow the offending one.
  • respects the encoding of the input string. It uses the concat method instead of addAll, which avoids converting RubyString to String and back to RubyString. When returning the head StringBuilder, it enforces the encoding with the input charset (see the sketch below).
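
To illustrate the second point, here is a minimal sketch (not the PR's actual code) of why appending the RubyString elements directly preserves their encoding, whereas round-tripping through java.lang.String does not. It assumes a JRuby runtime on the classpath; exact method signatures may differ slightly across JRuby versions.

import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyString;
import org.jruby.runtime.ThreadContext;

public class ConcatEncodingSketch {
    public static void main(String[] args) {
        Ruby ruby = Ruby.newInstance();
        ThreadContext context = ruby.getCurrentContext();

        // 0xA3 is '£' in ISO-8859-1; tag the string with that encoding
        RubyString latin1 = RubyString.newString(ruby, new byte[]{(byte) 0xA3});
        latin1.force_encoding(context, ruby.newString("ISO-8859-1"));

        RubyArray input = ruby.newArray(latin1);
        RubyArray accumulator = ruby.newArray();

        // concat appends the RubyString objects themselves, so both bytes and
        // encoding survive; addAll would route through java.util.List semantics,
        // converting to java.lang.String and back, losing the original encoding.
        accumulator.concat(context, input);

        RubyString out = (RubyString) accumulator.eltInternal(0);
        System.out.println(out.getEncoding()); // prints ISO-8859-1
    }
}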

Why is it important/What is the impact to the user?

It permits the tokenizer to be used effectively also in contexts where a line is bigger than the configured limit.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • [ ] I have made corresponding changes to the documentation
  • [ ] I have made corresponding change to the default configuration files (and/or docker env variables)
  • I have added tests that prove my fix is effective or that my feature works


How to test this PR locally

The test plan has two sides:

How to test the encoding is respected

Start up a REPL with Logstash and exercise the tokenizer:

$> bin/logstash -i irb
> buftok = FileWatch::BufferedTokenizer.new
> buftok.extract("\xA3".force_encoding("ISO8859-1")); buftok.flush.bytes
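
If the fix works, the expected output is [163]: the single 0xA3 byte of the ISO-8859-1 £, preserved unchanged rather than transcoded.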

or use the following script

require 'socket'

hostname = 'localhost'
port = 1234

socket = TCPSocket.open(hostname, port)

text = "\xA3" # the £ symbol in ISO-8859-1 aka Latin-1
text.force_encoding("ISO-8859-1")
socket.puts(text)

socket.close

with Logstash running as

bin/logstash -e "input { tcp { port => 1234 codec => line { charset => 'ISO8859-1' } } } output { stdout { codec => rubydebug } }"

In the output the £ has to be present, and not the double-encoded mojibake Â£.

Related issues

@andsel andsel self-assigned this Jan 28, 2025
@andsel andsel force-pushed the fix/buffered_tokenizer_clean_state_in_case_of_line_too_big_respecting_character_encoding branch from 69bd4f4 to a84656e on January 28, 2025 16:02
Contributor

It looks like this PR modifies one or more .asciidoc files. These files are being migrated to Markdown, and any changes merged now will be lost. See the migration guide for details.

Contributor

📃 DOCS PREVIEW: https://logstash_bk_16968.docs-preview.app.elstc.co/diff

@andsel andsel force-pushed the fix/buffered_tokenizer_clean_state_in_case_of_line_too_big_respecting_character_encoding branch from b682c20 to b42ca05 on January 31, 2025 12:40
@andsel andsel changed the title Fix/buffered tokenizer clean state in case of line too big respecting character encoding Fix BufferedTokenizer to properly resume after a buffer full condition respecting the encoding of the input string Jan 31, 2025
@andsel andsel added the bug label Jan 31, 2025
@andsel andsel marked this pull request as ready for review January 31, 2025 15:57
@andsel andsel (Contributor, Author) commented Feb 3, 2025

Uncovered use cases

This is a bugfix on the original code to make it respect sizeLimit when the first token is fragmented across different input buffers.
However, like the original implementation, this doesn't cover the case where the oversized token is not the first of the data fragment but is in the middle.
Consider a sizeLimit of 100: if the second token is wider than 100 chars, no error is raised and the token is parsed anyway.

Check with the pipeline:

input {
  tcp {
    port => 1234

    codec => json_lines {
      decode_size_limit_bytes => 100
    }
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

and a loading script such as:

require 'socket' 
require 'json'

hostname = 'localhost'
port = 1234

socket = TCPSocket.open(hostname, port)

data = {"a" => "a"*10}.to_json + "\n" + {"b" => "b" * 105}.to_json; socket.write(data)
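# NB: the second JSON document above is ~113 bytes, well beyond the
# decode_size_limit_bytes => 100 limit, yet no error is raised (see the output below)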

socket.close

it produces an output like:

{
      "@version" => "1",
             "a" => "aaaaaaaaaa",
    "@timestamp" => 2025-02-03T10:40:06.093178Z
}
{
      "@version" => "1",
             "b" => "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
    "@timestamp" => 2025-02-03T10:40:06.094601Z
}

Ideal solution

To solve this problem, the BufferedTokenizer's extract method should return an iterator rather than an array (or list). The iterator should apply the boundary check on each next invocation.
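
A minimal sketch of that idea, under the assumption of a plain String-based tokenizer (the class name and error message are illustrative, not the actual BufferedTokenizerExt implementation):

import java.util.Iterator;

// Wraps a token stream and applies the size-limit check lazily, on every
// next() call, so an oversized token in the middle of a fragment also fails.
final class BoundedTokenIterator implements Iterator<String> {
    private final Iterator<String> tokens;
    private final int sizeLimit;

    BoundedTokenIterator(Iterator<String> tokens, int sizeLimit) {
        this.tokens = tokens;
        this.sizeLimit = sizeLimit;
    }

    @Override
    public boolean hasNext() {
        return tokens.hasNext();
    }

    @Override
    public String next() {
        String token = tokens.next();
        if (token.length() > sizeLimit) {
            throw new IllegalStateException("input buffer full: token of "
                + token.length() + " chars exceeds sizeLimit of " + sizeLimit);
        }
        return token;
    }
}

With this shape, the oversized second document in the example above would raise on its own next() call instead of being silently emitted.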

@donoghuc donoghuc (Member) left a comment


I think that this solves the bug!

While reviewing it I am a bit concerned about the shared state with encodingName. I see the logic you added in flush that determines whether extract has been run, based on that field having been set. However, I couldn't really come up with anything materially better.

For your consideration, I added a small diff with what I started, as a suggestion:

diff --git a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java
index 63aa3c230..ea015d128 100644
--- a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java
+++ b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java
@@ -82,8 +82,11 @@ public class BufferedTokenizerExt extends RubyObject {
     @JRubyMethod
     @SuppressWarnings("rawtypes")
     public RubyArray extract(final ThreadContext context, IRubyObject data) {
-        RubyEncoding encoding = (RubyEncoding) data.convertToString().encoding(context);
-        encodingName = encoding.getEncoding().getCharsetName();
+        // Cache the encodingName. This state has implications for the `flush` method.
+        if (encodingName == null) {
+            RubyEncoding encoding = (RubyEncoding) data.convertToString().encoding(context);
+            encodingName = encoding.getEncoding().getCharsetName();
+        }
         final RubyArray entities = data.convertToString().split(delimiter, -1);
         if (!bufferFullErrorNotified) {
             input.clear();
@@ -137,8 +140,7 @@ public class BufferedTokenizerExt extends RubyObject {
             // in the accumulator, and clean the pending token part.
             headToken.append(input.shift(context)); // append buffer to first element and
             // create new RubyString with the data specified encoding
-            RubyString encodedHeadToken = RubyUtil.RUBY.newString(new ByteList(headToken.toString().getBytes(Charset.forName(encodingName))));
-            encodedHeadToken.force_encoding(context, RubyUtil.RUBY.newString(encodingName));
+            RubyString encodedHeadToken = toEncodedRubyString(context, headToken.toString());
             input.unshift(encodedHeadToken); // reinsert it into the array
             headToken = new StringBuilder();
         }
@@ -163,8 +165,7 @@ public class BufferedTokenizerExt extends RubyObject {
         // create new RubyString with the last data specified encoding, if exists
         RubyString encodedHeadToken;
         if (encodingName != null) {
-            encodedHeadToken = RubyUtil.RUBY.newString(new ByteList(buffer.toString().getBytes(Charset.forName(encodingName))));
-            encodedHeadToken.force_encoding(context, RubyUtil.RUBY.newString(encodingName));
+            encodedHeadToken = toEncodedRubyString(context, buffer.toString());
         } else {
             // When used with TCP input it could be that on socket connection the flush method
             // is invoked while no invocation of extract, leaving the encoding name unassigned.
@@ -183,4 +184,10 @@ public class BufferedTokenizerExt extends RubyObject {
         return RubyUtil.RUBY.newBoolean(headToken.toString().isEmpty() && (inputSize == 0));
     }
 
+    private RubyString toEncodedRubyString(ThreadContext context, String input) {
+        // Depends on the encodingName being set by the extract method, could potentially raise if not set. 
+        RubyString result = RubyUtil.RUBY.newString(new ByteList(input.getBytes(Charset.forName(encodingName))));
+        result.force_encoding(context, RubyUtil.RUBY.newString(encodingName));
+        return result;
+    }
 }

Mainly, the change makes the caching behavior of encodingName a bit clearer and deduplicates some somewhat complex code.

I don't feel super strongly about this suggestion though. Take it or leave it (if you do want to incorporate it, bounce it back to me and I can do a quick review, as I've spent quite a bit of time on this today and have all the context in my head).

@robbavey robbavey (Member) left a comment


A couple of suggestions to add more context to exceptions, to help us if we encounter this issue in the wild.

// is invoked while no invocation of extract, leaving the encoding name unassigned.
// In such case also the headToken must be empty
if (!buffer.toString().isEmpty()) {
    throw new IllegalStateException("invoked flush with unassigned encoding but not empty head token, this shouldn't happen");
Member

Is there additional context we can add to the exception? Maybe the buffer contents? I would also remove the "this shouldn't happen" text from the exception message body.

Is this code path likely?

Contributor Author (@andsel)

This code path should never be executed. If we reach it, it means that the following conditions are both true:

  1. the encoding field is not assigned
  2. the head token has a value

Regarding 1, every RubyString has an encoding assigned (in the worst case it defaults to UTF-8), so if we pass through the line at https://github.com/elastic/logstash/pull/16968/files#diff-5c7f8990e98f54782395d29b4b1b5b68cf6f782b34af5eb8f1b5a77331e0172eR86 we have an encoding.
headToken can be empty, but it can't be that no encoding is assigned.

// create new RubyString with the data specified encoding
RubyString encodedHeadToken = RubyUtil.RUBY.newString(new ByteList(headToken.toString().getBytes(Charset.forName(encodingName))));
encodedHeadToken.force_encoding(context, RubyUtil.RUBY.newString(encodingName));
input.unshift(encodedHeadToken); // reinsert it into the array
Contributor

Ah, shift... AFAIK it is an O(N) operation; I wonder if there is an improvement we can make...

@andsel andsel (Contributor, Author) commented Feb 4, 2025

If you are referring to unshift: in the existing code we always invoke

entities.unshift(input.join(context));

so we shouldn't have introduced any bottleneck that wasn't present in the previous version.

- extracted common code used in string encoding
- avoid full package import
- better exception message with details on the limit exceeded

@elasticmachine (Collaborator) commented

💚 Build Succeeded

History

cc @andsel

@donoghuc donoghuc (Member) left a comment


Awesome test coverage BTW ❤️

@mashhurs mashhurs (Contributor) left a comment


LGTM!
Thank you @andsel, this is great!

@andsel andsel merged commit 1c8cf54 into elastic:main Feb 5, 2025
7 checks passed
@andsel andsel (Contributor, Author) commented Feb 5, 2025

@logstashmachine backport 9.0

github-actions bot pushed a commit that referenced this pull request Feb 5, 2025
Fix BufferedTokenizer to properly resume after a buffer full condition respecting the encoding of the input string (#16968)

Permits the tokenizer to be used effectively also in contexts where a line is bigger than a limit.
Fixes an issue related to the token size limit error: when the offending token was bigger than the input fragment, the tokenizer was unable to recover the token stream from the first delimiter after the offending token, and lost part of the tokens.

## How the problem is solved
This is a second take at fixing the processing of tokens from the tokenizer after a buffer full error. The first try (#16482) was rolled back due to the encoding error #16694.
The first attempt failed to return the tokens in the same encoding as the input.
This PR does a couple of things:
- accumulates the tokens, so that after a buffer full condition it can resume with the tokens that follow the offending one.
- respects the encoding of the input string. It uses the `concat` method instead of `addAll`, which avoids converting RubyString to String and back to RubyString. When returning the head `StringBuilder` it enforces the encoding with the input charset.

(cherry picked from commit 1c8cf54)
@github-actions github-actions bot added the v9.0.0 label Feb 5, 2025
@andsel andsel (Contributor, Author) commented Feb 5, 2025

@logstashmachine backport 8.x

github-actions bot pushed a commit that referenced this pull request Feb 5, 2025
Fix BufferedTokenizer to properly resume after a buffer full condition respecting the encoding of the input string (#16968)

(cherry picked from commit 1c8cf54; commit message identical to the one above)
@andsel andsel (Contributor, Author) commented Feb 5, 2025

@logstashmachine backport 8.18

@andsel andsel (Contributor, Author) commented Feb 5, 2025

@logstashmachine backport 8.17

@andsel andsel (Contributor, Author) commented Feb 5, 2025

@logstashmachine backport 8.16

github-actions bot pushed a commit that referenced this pull request Feb 5, 2025
Fix BufferedTokenizer to properly resume after a buffer full condition respecting the encoding of the input string (#16968)

(cherry picked from commit 1c8cf54; commit message identical to the one above)
github-actions bot pushed a commit that referenced this pull request Feb 5, 2025
Fix BufferedTokenizer to properly resume after a buffer full condition respecting the encoding of the input string (#16968)

(cherry picked from commit 1c8cf54; commit message identical to the one above)
github-actions bot pushed a commit that referenced this pull request Feb 5, 2025
Fix BufferedTokenizer to properly resume after a buffer full condition respecting the encoding of the input string (#16968)

(cherry picked from commit 1c8cf54; commit message identical to the one above)
andsel added a commit that referenced this pull request Feb 5, 2025
Fix BufferedTokenizer to properly resume after a buffer full condition respecting the encoding of the input string (#16968) (#17018)

Backport PR #16968 to 9.0 branch, original message:

----

(commit message identical to the one above)

(cherry picked from commit 1c8cf54)

Co-authored-by: Andrea Selva <[email protected]>
Successfully merging this pull request may close these issues.

Character encoding issues with refactored BufferedTokenizerExt