Skip to content

Commit

Permalink
updating cloudwatch branch
Browse files Browse the repository at this point in the history
  • Loading branch information
JasonTheMain committed Dec 4, 2024
1 parent 7f93e5e commit bebbead
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 47 deletions.
63 changes: 18 additions & 45 deletions lib/logstash/inputs/cloudwatch_logs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,38 +11,6 @@

Aws.eager_autoload!

class CLoudWatchTagCache
def initialize
@cache = {}
end

def get_tags(log_group_name)
if @cache.key?(log_group_name)
return @cache[log_group_name][:tags]
else
tags = fetch_tags_from_cloudwatch(log_group_name)
@cache[log_group_name] = {tags: tags}
return tags
end
end

def fetch_tags_from_cloudwatch(log_group_name)
tag_params = { log_group_name: log_group_name}
response = @cloudwatch.list_tags_log_group(tag_params)
tags = response.tags

tags.clone.each do |key, value|
key_without_spaces = key.to_s.gsub(/[[:space:]]/, "")
if not tags.key?(key_without_spaces)
tags[key_without_spaces] = value
tags.delete(key)
end
end

return tags
end
end

# Stream events from CloudWatch Logs streams.
#
# Specify an individual log group, and this plugin will scan
Expand Down Expand Up @@ -82,10 +50,6 @@ class LogStash::Inputs::CloudWatch_Logs < LogStash::Inputs::Base
# seconds before now to read back from.
config :start_position, :default => 'beginning'

def initialize(*args)
super(*args)
@cache = CLoudWatchTagCache.new
end
# def register
public
def register
Expand All @@ -95,9 +59,9 @@ def register
@sincedb = {}

check_start_position_validity

Aws::ConfigService::Client.new(aws_options_hash)
@cloudwatch = Aws::CloudWatchLogs::Client.new(aws_options_hash)
@tag_cache = {}
Aws::ConfigService::Client.new(aws_options_hash)

if @sincedb_path.nil?
if settings
Expand Down Expand Up @@ -238,27 +202,36 @@ def process_group(group)
@priority << group
end #def process_group


def fetch_tags(log_group_name)
tags = @cache.get_tags(log_group_name)
if @tag_cache.key?(log_group_name)
return @tag_cache[log_group_name][:tags]
else
tags = fetch_tags_from_cloudwatch(log_group_name)
@tag_cache[log_group_name] = { tags: tags}
return tags
end
end

private
def fetch_tags_from_cloudwatch(log_group_name)
tag_params = { log_group_name: log_group_name}
response = @cloudwatch.list_tags_log_group(tag_params)
tags = response.tags

tags.clone.each do |key, value|
key_without_spaces = key.to_s.gsub(/[[:space:]]/, "")
if not tags.key?(key_without_spaces)
tags[key_without_spaces] = value
tags.delete(key)
end
end

tags
end

# def process_log
private
def process_log(log, group)
tag_params = {
:log_group_name => group
}
tags = fetch_tags(log_group_name)
tags = fetch_tags(group)

@logger.debug("processing_log #{log}")
@codec.decode(log.message.to_str) do |event|
Expand Down
4 changes: 2 additions & 2 deletions logstash-input-cloudwatch_logs.gemspec
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
Gem::Specification.new do |s|

s.name = 'logstash-input-cloudwatch_logs'
s.version = '1.1.0'
s.version = '1.1.1'
s.licenses = ['Apache-2.0']
s.summary = 'Stream events from CloudWatch Logs.'
s.description = 'This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program'
s.homepage = ''
s.require_paths = ['lib']

s.authors = 'Cloud-gov'
# Files
s.files = Dir['lib/**/*','spec/**/*','vendor/**/*','*.gemspec','*.md','CONTRIBUTORS','Gemfile','LICENSE','NOTICE.TXT']

Expand Down

0 comments on commit bebbead

Please sign in to comment.