From 7f93e5e3ad6753c028a21f98d8925230a9161595 Mon Sep 17 00:00:00 2001 From: "Jason A. Gambino" Date: Tue, 3 Dec 2024 15:47:47 -0500 Subject: [PATCH] cloudwatch_logs_test --- lib/logstash/inputs/cloudwatch_logs.rb | 58 ++++++++++++++++++++++---- 1 file changed, 49 insertions(+), 9 deletions(-) diff --git a/lib/logstash/inputs/cloudwatch_logs.rb b/lib/logstash/inputs/cloudwatch_logs.rb index 41ce7ce..c314324 100644 --- a/lib/logstash/inputs/cloudwatch_logs.rb +++ b/lib/logstash/inputs/cloudwatch_logs.rb @@ -11,6 +11,38 @@ Aws.eager_autoload! +class CLoudWatchTagCache + def initialize + @cache = {} + end + + def get_tags(log_group_name) + if @cache.key?(log_group_name) + return @cache[log_group_name][:tags] + else + tags = fetch_tags_from_cloudwatch(log_group_name) + @cache[log_group_name] = {tags: tags} + return tags + end + end + + def fetch_tags_from_cloudwatch(log_group_name) + tag_params = { log_group_name: log_group_name} + response = @cloudwatch.list_tags_log_group(tag_params) + tags = response.tags + + tags.clone.each do |key, value| + key_without_spaces = key.to_s.gsub(/[[:space:]]/, "") + if not tags.key?(key_without_spaces) + tags[key_without_spaces] = value + tags.delete(key) + end + end + + return tags + end +end + # Stream events from CloudWatch Logs streams. # # Specify an individual log group, and this plugin will scan @@ -50,7 +82,10 @@ class LogStash::Inputs::CloudWatch_Logs < LogStash::Inputs::Base # seconds before now to read back from. config :start_position, :default => 'beginning' - + def initialize(*args) + super(*args) + @cache = CLoudWatchTagCache.new + end # def register public def register @@ -203,15 +238,9 @@ def process_group(group) @priority << group end #def process_group - # def process_log - private - def process_log(log, group) - tag_params = { - :log_group_name => group - } - response = @cloudwatch.list_tags_log_group(tag_params) - tags = response.tags + def fetch_tags(log_group_name) + tags = @cache.get_tags(log_group_name) tags.clone.each do |key, value| key_without_spaces = key.to_s.gsub(/[[:space:]]/, "") if not tags.key?(key_without_spaces) @@ -220,6 +249,17 @@ def process_log(log, group) end end + tags + end + + # def process_log + private + def process_log(log, group) + tag_params = { + :log_group_name => group + } + tags = fetch_tags(log_group_name) + @logger.debug("processing_log #{log}") @codec.decode(log.message.to_str) do |event| event.set("@timestamp", parse_time(log.timestamp))