Skip to content

Commit

Permalink
cloudwatch_logs_test
Browse files Browse the repository at this point in the history
  • Loading branch information
JasonTheMain committed Dec 3, 2024
1 parent 9112dcd commit 7f93e5e
Showing 1 changed file with 49 additions and 9 deletions.
58 changes: 49 additions & 9 deletions lib/logstash/inputs/cloudwatch_logs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,38 @@

Aws.eager_autoload!

class CLoudWatchTagCache
def initialize
@cache = {}
end

def get_tags(log_group_name)
if @cache.key?(log_group_name)
return @cache[log_group_name][:tags]
else
tags = fetch_tags_from_cloudwatch(log_group_name)
@cache[log_group_name] = {tags: tags}
return tags
end
end

def fetch_tags_from_cloudwatch(log_group_name)
tag_params = { log_group_name: log_group_name}
response = @cloudwatch.list_tags_log_group(tag_params)
tags = response.tags

tags.clone.each do |key, value|
key_without_spaces = key.to_s.gsub(/[[:space:]]/, "")
if not tags.key?(key_without_spaces)
tags[key_without_spaces] = value
tags.delete(key)
end
end

return tags
end
end

# Stream events from CloudWatch Logs streams.
#
# Specify an individual log group, and this plugin will scan
Expand Down Expand Up @@ -50,7 +82,10 @@ class LogStash::Inputs::CloudWatch_Logs < LogStash::Inputs::Base
# seconds before now to read back from.
config :start_position, :default => 'beginning'


def initialize(*args)
super(*args)
@cache = CLoudWatchTagCache.new
end
# def register
public
def register
Expand Down Expand Up @@ -203,15 +238,9 @@ def process_group(group)
@priority << group
end #def process_group

# def process_log
private
def process_log(log, group)
tag_params = {
:log_group_name => group
}
response = @cloudwatch.list_tags_log_group(tag_params)
tags = response.tags

def fetch_tags(log_group_name)
tags = @cache.get_tags(log_group_name)
tags.clone.each do |key, value|
key_without_spaces = key.to_s.gsub(/[[:space:]]/, "")
if not tags.key?(key_without_spaces)
Expand All @@ -220,6 +249,17 @@ def process_log(log, group)
end
end

tags
end

# def process_log
private
def process_log(log, group)
tag_params = {
:log_group_name => group
}
tags = fetch_tags(log_group_name)

@logger.debug("processing_log #{log}")
@codec.decode(log.message.to_str) do |event|
event.set("@timestamp", parse_time(log.timestamp))
Expand Down

0 comments on commit 7f93e5e

Please sign in to comment.