Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Zstd compression support to S3 plugin #439

Open
wants to merge 10 commits into
base: master
Choose a base branch
from

Conversation

ddukbg
Copy link

@ddukbg ddukbg commented Oct 18, 2024

Summary

This PR adds support for Zstd compression in the Fluentd S3 plugin.

Changes

  • Implemented Zstd compression using the zstd-ruby library.
  • Introduced the ZstdCompressor class to handle log compression before uploading to S3.
  • Updated the example configuration to demonstrate the use of store_as zstd.
  • Ensured that the Zstd module is properly loaded to avoid uninitialized constant errors.

Testing

  • Successfully tested the Zstd compression functionality by sending large log data to S3.
  • All test cases passed without errors.

Test Code

require 'fluent/test'
require 'fluent/plugin/output'
require_relative '../lib/fluent/plugin/out_s3'  # out_s3.rb 파일을 명시적으로 불러옵니다
require 'fluent/plugin/s3_compressor_zstd'
require 'zstd-ruby'
require 'tempfile'

RSpec.describe Fluent::Plugin::S3Output::ZstdCompressor do
  let(:log) { double('log', warn: nil) }
  let(:compressor) { described_class.new(buffer_type: 'memory', log: log) }

  describe '#compress' do
    let(:test_data) { "This is a test log message" }

    it 'compresses the data using zstd' do
      chunk = double('chunk')
      allow(chunk).to receive(:open).and_yield(StringIO.new(test_data))

      tmp_file = Tempfile.new
      compressor.compress(chunk, tmp_file)
      tmp_file.rewind

      compressed_data = tmp_file.read
      expect(compressed_data).not_to eq(test_data)
      expect(Zstd.decompress(compressed_data)).to eq(test_data)
    end

    it 'logs a warning if compression fails' do
      chunk = double('chunk')
      allow(chunk).to receive(:open).and_raise(StandardError.new("Mock compression error"))

      tmp_file = Tempfile.new
      expect { compressor.compress(chunk, tmp_file) }.to raise_error(StandardError)

      expect(log).to have_received(:warn).with(/zstd compression failed: Mock compression error/)
    end
  end
end

Result

rspec test/zstd_compressor_spec.rb 

..

Finished in 0.03044 seconds (files took 0.59546 seconds to load)
2 examples, 0 failures

store_as (Zstd) Test

#fluent.conf
# -*- encoding: utf-8 -*-
<source>
  @type forward
  @id   input
  @label @mainstream
</source>

<label @mainstream>
  <match **>
    @type s3
    s3_bucket fluent-test-yw
    s3_region ap-northeast-2
    path logs/
    store_as zstd
    <format>
      @type json
      time_key timestamp   # JSON의 시간 키 명시
      encoding utf-8       # 인코딩 명시
    </format>
    <buffer>
      @type memory              # 메모리 버퍼 사용
      chunk_limit_size 1m        # 청크 크기를 1MB로 설정 (더 큰 로그를 한번에 처리)
      flush_interval 1s          # 1초마다 플러시
      flush_thread_count 4       # 동시에 플러시할 쓰레드 개수 (시스템 성능에 따라 더 증가 가능)
      retry_max_interval 10s     # 재시도 시간 조정
      retry_timeout 60m          # 재시도 시간 제한
    </buffer>
  </match>
</label>

Test Data
echo '{"message": "'$(head -c 1000000 </dev/zero | tr '\0' 'A')'"}' | fluent-cat test.tag

fluentd log
2024-10-18 17:49:37 +0900 [info]: #0 fluent/log.rb:362:info: [Aws::S3::Client 200 0.162773 0 retries] head_object(bucket:"fluent-test-yw",key:"logs/20241018_0.zst")

S3 Data
image

Why this feature?

Zstd compression provides a better compression ratio and performance compared to gzip, making it a valuable option for users who want efficient log storage on S3.

@daipom daipom self-requested a review October 18, 2024 08:59
@daipom
Copy link
Contributor

daipom commented Oct 18, 2024

Thanks for this enhancement.
Could you please add DCO to all commits?

fluent-plugin-s3.gemspec Outdated Show resolved Hide resolved
@ddukbg ddukbg force-pushed the master branch 2 times, most recently from 0d0bf95 to 6af3b5d Compare October 18, 2024 09:30
dependabot bot and others added 2 commits October 18, 2024 18:30
Bumps [actions/checkout](https://github.com/actions/checkout) from 3 to 4.
- [Release notes](https://github.com/actions/checkout/releases)
- [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md)
- [Commits](actions/checkout@v3...v4)

---
updated-dependencies:
- dependency-name: actions/checkout
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <[email protected]>
Signed-off-by: yongwoo.kim <[email protected]>
@daipom
Copy link
Contributor

daipom commented Oct 21, 2024

@ddukbg Thanks! I will review this soon.

FYI: We are just now getting Zstd support on Fluentd's side.

@ddukbg
Copy link
Author

ddukbg commented Oct 23, 2024

@daipom hello :)
The unit tests failed on Ruby 2.7, and it seems to be related to an issue with the bundler version. The latest bundler doesn’t appear to support Ruby 2.7. Is there any action I should take to address this?

@Watson1978
Copy link

Watson1978 commented Oct 23, 2024

ruby/setup-ruby action has installed proper bundler automatically by default.

- uses: ruby/setup-ruby@v1

image

So, I think you can replace gem install bundler rake to gem install rake at

gem install bundler rake

Then, I think the tests might be successful.

Before:
- name: Install dependencies
  run: gem install bundler rake

After:
- name: Install dependencies
  run: gem install rake

Signed-off-by: ddukbg <[email protected]>
@ddukbg
Copy link
Author

ddukbg commented Oct 23, 2024

@Watson1978 Thank you for your valuable feedback. I have modified the fluent-plugin-s3/.github/workflows/linux.yml file by removing the redundant bundler installation as you suggested. After these changes, the GitHub Actions workflow runs successfully and all unit tests have passed.
image

@daipom daipom self-requested a review October 25, 2024 08:02
Copy link
Contributor

@daipom daipom left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this enhancement!
Could you please check my following comments?

lib/fluent/plugin/s3_compressor_zstd.rb Outdated Show resolved Hide resolved
test/test_in_s3.rb Outdated Show resolved Hide resolved
…mments

Moved ZstdCompressor tests from test_in_s3.rb to test_out_s3.rb as they relate to the out_s3 plugin.

Signed-off-by: ddukbg <[email protected]>
…omments

Added tests for ZstdCompressor to test_out_s3.rb following the maintainer's suggestions.

Signed-off-by: ddukbg <[email protected]>
Add ZstdCompressor to S3 Plugin and Fix Tests According to Maintainer's Feedback
@ddukbg
Copy link
Author

ddukbg commented Oct 25, 2024

@daipom Thank you for your valuable feedback :)
This Pull Request addresses the @daipom feedback regarding the addition of the ZstdCompressor to the S3 output plugin.

Changes Made:

  1. Removed Duplicate ZstdCompressor Class:

    • Deleted lib/fluent/plugin/s3_compressor_zstd.rb as the ZstdCompressor class was already defined in out_s3.rb, eliminating duplication.
  2. Added Tests for ZstdCompressor in test_out_s3.rb:

    • Moved the tests related to ZstdCompressor from test_in_s3.rb to test_out_s3.rb since they pertain to the output plugin.
    • Added test cases test_configure_with_mime_type_zstd and test_write_with_zstd to verify the proper functioning of ZstdCompressor.
  3. Ensured ZstdCompressor is Properly Registered:

    • Verified that ZstdCompressor is correctly registered in the COMPRESSOR_REGISTRY within out_s3.rb.
  4. Fixed Test Failures:

    • Adjusted the s3path in test_write_with_zstd to match the expected file extension for zstd compression, resolving the test failure.
  5. Resolved Gem Build Error:

    • Removed s3_compressor_zstd.rb from the Git index to fix the gem build error caused by the file being listed but not present.

Fluentd Log

2024-10-25 18:38:37 +0900 [info]: #0 fluent/log.rb:362:info: [Aws::S3::Client 200 0.119847 0 retries] head_object(bucket:"fluent-test-yw",key:"logs/20241025_0.zst")
2024-10-25 18:38:37 +0900 [info]: #0 fluent/log.rb:362:info: [Aws::S3::Client 200 0.017421 0 retries] head_object(bucket:"fluent-test-yw",key:"logs/20241025_1.zst")
2024-10-25 18:38:37 +0900 [info]: #0 fluent/log.rb:362:info: [Aws::S3::Client 200 0.029599 0 retries] put_object(body:#<Tempfile:/var/folders/59/3d04mk1x5nd_mf9gwd2c1dvw0000gn/T/s3-20241025-86143-etcg6h>,content_type:"application/x-zst",storage_class:"STANDARD",bucket:"fluent-test-yw",key:"logs/20241025_2.zst")

S3 Data
Pasted Graphic 8

testcode

bundle exec ruby -Ilib:test test/test_out_s3.rb 
Loaded suite test/test_out_s3
Started
Finished in 22.943453 seconds.
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
53 tests, 76 assertions, 0 failures, 0 errors, 0 pendings, 0 omissions, 0 notifications
100% passed
--------------------------------------------

testcode(rspec)

rspec test/zstd_compressor_spec.rb                                                                                                                                       

Finished in 0.0169 seconds (files took 0.59358 seconds to load)
2 examples, 0 failures
require 'fluent/test'
require 'fluent/plugin/output'
require_relative '../lib/fluent/plugin/out_s3'  # out_s3.rb 파일을 명시적으로 불러옵니다
require 'zstd-ruby'
require 'tempfile'

RSpec.describe Fluent::Plugin::S3Output::ZstdCompressor do
  let(:log) { double('log', warn: nil) }
  let(:compressor) { described_class.new(buffer_type: 'memory', log: log) }

  describe '#compress' do
    let(:test_data) { "This is a test log message" }

    it 'compresses the data using zstd' do
      chunk = double('chunk')
      allow(chunk).to receive(:open).and_yield(StringIO.new(test_data))

      tmp_file = Tempfile.new
      compressor.compress(chunk, tmp_file)
      tmp_file.rewind

      compressed_data = tmp_file.read
      expect(compressed_data).not_to eq(test_data)
      expect(Zstd.decompress(compressed_data)).to eq(test_data)
    end

    it 'logs a warning if compression fails' do
      chunk = double('chunk')
      allow(chunk).to receive(:open).and_raise(StandardError.new("Mock compression error"))

      tmp_file = Tempfile.new
      expect { compressor.compress(chunk, tmp_file) }.to raise_error(StandardError)

      expect(log).to have_received(:warn).with(/zstd compression failed: Mock compression error/)
    end
  end
end

Notes:

  • All changes have been made according to the maintainer's comments.
  • Tests have been updated and all pass successfully.
  • Documentation and code comments have been updated accordingly.

Please review the changes, and let me know if any further adjustments are needed.

@ddukbg ddukbg requested a review from daipom October 29, 2024 06:35
Copy link
Contributor

@daipom daipom left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Sorry for my late response.
Basically, it looks good to me.
Could you please fix the following?

lib/fluent/plugin/out_s3.rb Outdated Show resolved Hide resolved
lib/fluent/plugin/out_s3.rb Outdated Show resolved Hide resolved
test/test_out_s3.rb Outdated Show resolved Hide resolved
Remove redundant spaces to improve code readability and consistency

Co-authored-by: Daijiro Fukuda <[email protected]>
Signed-off-by: ddukbg <[email protected]>

refactor: Simplify data compression logic

refactor: Simplify data compression logic

Remove duplicate file reading and streamline compression process

Co-authored-by: Daijiro Fukuda <[email protected]>
Signed-off-by: ddukbg <[email protected]>
@ddukbg
Copy link
Author

ddukbg commented Nov 2, 2024

@daipom
Thank you for your helpful suggestions! I've updated the code according to your feedback:

- Removed duplicate file reading
- Simplified the compression process

Could you please review the changes when you have a chance?

@ddukbg ddukbg requested a review from daipom November 2, 2024 09:17
@ddukbg ddukbg force-pushed the master branch 7 times, most recently from 4d2dbff to b1efca1 Compare November 2, 2024 11:00
Copy link
Contributor

@daipom daipom left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sorry to bother you, but I think it would be better to implement the compressor in a separate file.
Could you please confirm the following comments?

Copy link
Contributor

@daipom daipom Nov 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to put the implementation of ZstdCompressor in a separate file so that require 'zstd-ruby' is only executed when necessary.

How about adding such a file instead of fixing out_s3.rb?

s3_compressor_zstd.rb

require 'zstd-ruby'

module Fluent::Plugin
  class S3Output
    class ZstdCompressor < Compressor
      S3Output.register_compressor("zstd", self)

      config_section :compress, param_name: :compress_config, init: true, multi: false do
        desc "Compression level for zstd (1-22)"
        config_param :level, :integer, default: 3
      end

      def ext
        'zst'.freeze
      end

      def content_type
        'application/x-zst'.freeze
      end

      def compress(chunk, tmp)
        data = chunk.read.gsub(/\r\n/, "\n").force_encoding('UTF-8')
        compressed_data = Zstd.compress(data, level: @compress_config.level)
        tmp.write(compressed_data)
      rescue => e
        log.warn "zstd compression failed: #{e.message}"
        raise e
      end
    end
  end
end

The points are

  • require 'zstd-ruby' will be executed only when necessary.
    • This file is automatically loaded when store_as is set to zstd. (We don't need to change out_s3.rb.)
  • We can change level as follows:
    <match test>
      @type s3
      ...
      <compress>
        level 1
      </compress>
    </match>
    • Note: We can omit the compress section to use the default value 3.


def compress(chunk, tmp)
begin
data = chunk.read.gsub(/\r\n/, "\n").force_encoding('UTF-8')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

data = chunk.read.gsub(/\r\n/, "\n").force_encoding('UTF-8')

Could you please tell me why this is necessary?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#439 (comment)

Handled line endings and encoding

Could you please add this test case?

Comment on lines +112 to +118
def test_configure_with_mime_type_zstd
conf = CONFIG.clone
conf << "\nstore_as zstd\n"
d = create_driver(conf)
assert_equal 'zst', d.instance.instance_variable_get(:@compressor).ext
assert_equal 'application/x-zst', d.instance.instance_variable_get(:@compressor).content_type
end
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can add a test for the level option like this.

Suggested change
def test_configure_with_mime_type_zstd
conf = CONFIG.clone
conf << "\nstore_as zstd\n"
d = create_driver(conf)
assert_equal 'zst', d.instance.instance_variable_get(:@compressor).ext
assert_equal 'application/x-zst', d.instance.instance_variable_get(:@compressor).content_type
end
data('level default' => nil,
'level 1' => 1)
def test_configure_with_mime_type_zstd(level)
conf = CONFIG.clone
conf << "\nstore_as zstd\n"
conf << "\n<compress>\nlevel #{level}\n</compress>\n" if level
d = create_driver(conf)
assert_equal 'zst', d.instance.instance_variable_get(:@compressor).ext
assert_equal 'application/x-zst', d.instance.instance_variable_get(:@compressor).content_type
assert_equal (level || 3), d.instance.instance_variable_get(:@compressor).instance_variable_get(:@compress_config).level
end

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants