From 7f93e5e3ad6753c028a21f98d8925230a9161595 Mon Sep 17 00:00:00 2001 From: "Jason A. Gambino" Date: Tue, 3 Dec 2024 15:47:47 -0500 Subject: [PATCH 1/2] cloudwatch_logs_test --- lib/logstash/inputs/cloudwatch_logs.rb | 58 ++++++++++++++++++++++---- 1 file changed, 49 insertions(+), 9 deletions(-) diff --git a/lib/logstash/inputs/cloudwatch_logs.rb b/lib/logstash/inputs/cloudwatch_logs.rb index 41ce7ce..c314324 100644 --- a/lib/logstash/inputs/cloudwatch_logs.rb +++ b/lib/logstash/inputs/cloudwatch_logs.rb @@ -11,6 +11,38 @@ Aws.eager_autoload! +class CLoudWatchTagCache + def initialize + @cache = {} + end + + def get_tags(log_group_name) + if @cache.key?(log_group_name) + return @cache[log_group_name][:tags] + else + tags = fetch_tags_from_cloudwatch(log_group_name) + @cache[log_group_name] = {tags: tags} + return tags + end + end + + def fetch_tags_from_cloudwatch(log_group_name) + tag_params = { log_group_name: log_group_name} + response = @cloudwatch.list_tags_log_group(tag_params) + tags = response.tags + + tags.clone.each do |key, value| + key_without_spaces = key.to_s.gsub(/[[:space:]]/, "") + if not tags.key?(key_without_spaces) + tags[key_without_spaces] = value + tags.delete(key) + end + end + + return tags + end +end + # Stream events from CloudWatch Logs streams. # # Specify an individual log group, and this plugin will scan @@ -50,7 +82,10 @@ class LogStash::Inputs::CloudWatch_Logs < LogStash::Inputs::Base # seconds before now to read back from. config :start_position, :default => 'beginning' - + def initialize(*args) + super(*args) + @cache = CLoudWatchTagCache.new + end # def register public def register @@ -203,15 +238,9 @@ def process_group(group) @priority << group end #def process_group - # def process_log - private - def process_log(log, group) - tag_params = { - :log_group_name => group - } - response = @cloudwatch.list_tags_log_group(tag_params) - tags = response.tags + def fetch_tags(log_group_name) + tags = @cache.get_tags(log_group_name) tags.clone.each do |key, value| key_without_spaces = key.to_s.gsub(/[[:space:]]/, "") if not tags.key?(key_without_spaces) @@ -220,6 +249,17 @@ def process_log(log, group) end end + tags + end + + # def process_log + private + def process_log(log, group) + tag_params = { + :log_group_name => group + } + tags = fetch_tags(log_group_name) + @logger.debug("processing_log #{log}") @codec.decode(log.message.to_str) do |event| event.set("@timestamp", parse_time(log.timestamp)) From bebbead1ca6ed42822719ca7ffe039111d0cb44d Mon Sep 17 00:00:00 2001 From: "Jason A. Gambino" Date: Wed, 4 Dec 2024 09:04:52 -0500 Subject: [PATCH 2/2] updating cloudwatch branch --- lib/logstash/inputs/cloudwatch_logs.rb | 63 ++++++++------------------ logstash-input-cloudwatch_logs.gemspec | 4 +- 2 files changed, 20 insertions(+), 47 deletions(-) diff --git a/lib/logstash/inputs/cloudwatch_logs.rb b/lib/logstash/inputs/cloudwatch_logs.rb index c314324..bc32c74 100644 --- a/lib/logstash/inputs/cloudwatch_logs.rb +++ b/lib/logstash/inputs/cloudwatch_logs.rb @@ -11,38 +11,6 @@ Aws.eager_autoload! -class CLoudWatchTagCache - def initialize - @cache = {} - end - - def get_tags(log_group_name) - if @cache.key?(log_group_name) - return @cache[log_group_name][:tags] - else - tags = fetch_tags_from_cloudwatch(log_group_name) - @cache[log_group_name] = {tags: tags} - return tags - end - end - - def fetch_tags_from_cloudwatch(log_group_name) - tag_params = { log_group_name: log_group_name} - response = @cloudwatch.list_tags_log_group(tag_params) - tags = response.tags - - tags.clone.each do |key, value| - key_without_spaces = key.to_s.gsub(/[[:space:]]/, "") - if not tags.key?(key_without_spaces) - tags[key_without_spaces] = value - tags.delete(key) - end - end - - return tags - end -end - # Stream events from CloudWatch Logs streams. # # Specify an individual log group, and this plugin will scan @@ -82,10 +50,6 @@ class LogStash::Inputs::CloudWatch_Logs < LogStash::Inputs::Base # seconds before now to read back from. config :start_position, :default => 'beginning' - def initialize(*args) - super(*args) - @cache = CLoudWatchTagCache.new - end # def register public def register @@ -95,9 +59,9 @@ def register @sincedb = {} check_start_position_validity - - Aws::ConfigService::Client.new(aws_options_hash) @cloudwatch = Aws::CloudWatchLogs::Client.new(aws_options_hash) + @tag_cache = {} + Aws::ConfigService::Client.new(aws_options_hash) if @sincedb_path.nil? if settings @@ -238,9 +202,22 @@ def process_group(group) @priority << group end #def process_group - def fetch_tags(log_group_name) - tags = @cache.get_tags(log_group_name) + if @tag_cache.key?(log_group_name) + return @tag_cache[log_group_name][:tags] + else + tags = fetch_tags_from_cloudwatch(log_group_name) + @tag_cache[log_group_name] = { tags: tags} + return tags + end + end + + private + def fetch_tags_from_cloudwatch(log_group_name) + tag_params = { log_group_name: log_group_name} + response = @cloudwatch.list_tags_log_group(tag_params) + tags = response.tags + tags.clone.each do |key, value| key_without_spaces = key.to_s.gsub(/[[:space:]]/, "") if not tags.key?(key_without_spaces) @@ -248,17 +225,13 @@ def fetch_tags(log_group_name) tags.delete(key) end end - tags end # def process_log private def process_log(log, group) - tag_params = { - :log_group_name => group - } - tags = fetch_tags(log_group_name) + tags = fetch_tags(group) @logger.debug("processing_log #{log}") @codec.decode(log.message.to_str) do |event| diff --git a/logstash-input-cloudwatch_logs.gemspec b/logstash-input-cloudwatch_logs.gemspec index 6158020..4eb734b 100644 --- a/logstash-input-cloudwatch_logs.gemspec +++ b/logstash-input-cloudwatch_logs.gemspec @@ -1,13 +1,13 @@ Gem::Specification.new do |s| s.name = 'logstash-input-cloudwatch_logs' - s.version = '1.1.0' + s.version = '1.1.1' s.licenses = ['Apache-2.0'] s.summary = 'Stream events from CloudWatch Logs.' s.description = 'This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program' s.homepage = '' s.require_paths = ['lib'] - + s.authors = 'Cloud-gov' # Files s.files = Dir['lib/**/*','spec/**/*','vendor/**/*','*.gemspec','*.md','CONTRIBUTORS','Gemfile','LICENSE','NOTICE.TXT']