Create fields on the fly, according to the field names and datatypes that arrive at this output plugin. #13

Open · wants to merge 4 commits into base: main
100 changes: 100 additions & 0 deletions README.md
@@ -0,0 +1,100 @@
# logstash-output-influxdb
This fork makes it possible to create fields on the fly, according to the field names and datatypes
that arrive at this output plugin.

I added two configuration parameters:

# This setting removes the need to use the data_points and coerce_values options
# to build a proper insert for InfluxDB. It should be used together with the fields_to_skip setting.
# When enabled, the data point (column) names are taken from the field names of the event
# that arrives at the plugin, and the field values become the data point values.
config :use_event_fields_for_data_points, :validate => :boolean, :default => true

# An array of keys to exclude from further processing.
# By default, an event that arrives at the output plugin contains the keys "@version" and "@timestamp",
# and it can contain other fields, such as "command", added by the exec input plugin.
# These fields should not be processed and inserted into InfluxDB when
# use_event_fields_for_data_points is true.
# The keys are not deleted from the event itself: a new Hash is created from the event,
# and the unwanted keys are removed from that copy.

config :fields_to_skip, :validate => :array, :default => []
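
To make the interplay of these two settings concrete, here is a minimal standalone Ruby sketch of the filtering step (a plain Hash stands in for the Logstash event; all field names and values are invented for the example):

```ruby
require "json"

# Stand-in for an event as it arrives at the output plugin.
event = {
  "@version"   => "1",
  "@timestamp" => "2015-06-01T10:00:00.000Z",
  "host"       => "web01",
  "type"       => "system.cpu",
  "user"       => 5.3,
  "system"     => 1.1,
  "idle"       => 93.4
}

fields_to_skip = ["@version", "@timestamp", "type", "host"]

# Copy the event into a new Hash and drop the unwanted keys,
# leaving the original event untouched.
points = Hash[event]
points.delete_if { |k, _| fields_to_skip.include?(k) }

puts points.to_json  # => {"user":5.3,"system":1.1,"idle":93.4}
```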

This is my example config file.
I retrieve a different number of fields, with different names, for CPU, memory, and disks, but
I don't need a separate configuration per data type as in the master branch.
I create the relevant field names and datatypes in the filter stage and simply skip the unwanted fields in the output plugin.

input {

  exec {
    command => "env LANG=C sar -P ALL 1 1|egrep -v '^$|Average|CPU'"
    type => "system.cpu"
    interval => 1
  }
  exec {
    command => "env LANG=C sar -r 1 1|egrep -v '^$|Average|memfree|CPU'"
    type => "system.memory"
    interval => 1
  }
  exec {
    command => "env LANG=C sar -pd 1 1|egrep -v '^$|Average|DEV|CPU'"
    type => "system.disks"
    interval => 1
  }
}


filter {

  if [type] == "system.cpu" {
    split {}
    grok {
      match => { "message" => "\A(?<sar_time>%{HOUR}:%{MINUTE}:%{SECOND})\s+%{DATA:cpu}\s+%{NUMBER:user:float}\s+%{NUMBER:nice:float}\s+%{NUMBER:system:float}\s+%{NUMBER:iowait:float}\s+%{NUMBER:steal:float}\s+%{NUMBER:idle:float}\z" }
      remove_field => [ "message" ]
      add_field => { "series_name" => "%{host}.%{type}.%{cpu}" }
    }
    ruby {
      code => "event['usage'] = (100 - event['idle']).round(2); event['usage-io'] = event['usage'] - event['iowait']"
    }
  }

  if [type] == "system.memory" {
    split {}
    grok {
      match => { "message" => "\A(?<sar_time>%{HOUR}:%{MINUTE}:%{SECOND})\s+%{NUMBER:kbmemfree:float}\s+%{NUMBER:kbmemused:float}\s+%{NUMBER:percenmemused:float}\s+%{NUMBER:kbbuffers:float}\s+%{NUMBER:kbcached:float}\s+%{NUMBER:kbcommit:float}\s+%{NUMBER:kpercentcommit:float}\z" }
      remove_field => [ "message" ]
      add_field => { "series_name" => "%{host}.%{type}" }
    }
    ruby {
      code => "event['kbtotalmemory'] = (event['kbmemfree'] + event['kbmemused']); event['kbnetoused'] = (event['kbmemused'] - (event['kbbuffers'] + event['kbcached'])); event['kbnetofree'] = (event['kbmemfree'] + (event['kbbuffers'] + event['kbcached']))"
    }
  }

  if [type] == "system.disks" {
    split {}
    grok {
      match => { "message" => "\A(?<sar_time>%{HOUR}:%{MINUTE}:%{SECOND})\s+%{DATA:disk}\s+%{NUMBER:tps:float}\s+%{NUMBER:rd_sec_s:float}\s+%{NUMBER:wr_sec_s:float}\s+%{NUMBER:avgrq-sz:float}\s+%{NUMBER:avgqu-sz:float}\s+%{NUMBER:await:float}\s+%{NUMBER:svctm:float}\s+%{NUMBER:percenutil:float}\z" }
      remove_field => [ "message" ]
      add_field => { "series_name" => "%{host}.%{type}.%{disk}" }
    }
  }

  # Convert sar's HH:MM:SS into an epoch timestamp; the -7200 appears to shift
  # a local UTC+2 time back to UTC.
  ruby {
    code => "event['time'] = (DateTime.parse(event['sar_time']).to_time.to_i) - 7200"
  }
}
output {

  influxdb {
    host => "172.20.90.72"
    password => "root"
    user => "root"
    db => "metrics"
    allow_time_override => true
    time_precision => "s"
    series => "%{series_name}"
    use_event_fields_for_data_points => true
    fields_to_skip => ["@version","@timestamp","type","host","command","sar_time","series_name"]
  }

  stdout {
    codec => rubydebug
    workers => 4
  }
}
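
With fields_to_skip filtering out the bookkeeping keys, each event becomes one entry in the JSON array that the plugin buffers and posts. A rough sketch of the resulting shape for a single CPU event (all values invented, but consistent with the filter stage above):

```ruby
require "json"

# Approximate shape of the body the plugin builds for one CPU event.
event_hash = {
  "name"    => "web01.system.cpu.all",
  "columns" => ["cpu", "user", "nice", "system", "iowait", "steal",
                "idle", "usage", "usage-io", "time"],
  "points"  => [["all", 5.3, 0.0, 1.1, 0.2, 0.0, 93.4, 6.6, 6.4, 1433152800]]
}

puts [event_hash].to_json
```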
98 changes: 69 additions & 29 deletions lib/logstash/outputs/influxdb.rb
@@ -2,6 +2,8 @@
require "logstash/namespace"
require "logstash/outputs/base"
require "stud/buffer"
require "active_support/core_ext"
require "json"

# This output lets you output Metrics to InfluxDB
#
@@ -12,7 +14,7 @@
# the InfluxDB API lets you do some semblance of bulk operation
# per http call but each call is database-specific
#
# You can learn more at http://influxdb.com[InfluxDB homepage]
# You can learn more about InfluxDB at <http://influxdb.org>
class LogStash::Outputs::InfluxDB < LogStash::Outputs::Base
include Stud::Buffer

@@ -57,15 +59,15 @@ class LogStash::Outputs::InfluxDB < LogStash::Outputs::Base
# Set the level of precision of `time`
#
# only useful when overriding the time value
config :time_precision, :validate => ["m", "s", "u"], :default => "s"
config :time_precision, :validate => ["ms", "s", "u"], :default => "s"

# Allow value coercion
#
# this will attempt to convert data point values to the appropriate type before posting
# otherwise sprintf-filtered numeric values could get sent as strings
# format is `{'column_name' => 'datatype'}`
# format is `{'column_name' => 'datatype'}
#
# currently supported datatypes are `integer` and `float`
# currently supported datatypes are integer and float
#
config :coerce_values, :validate => :hash, :default => {}
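# For illustration (a hypothetical pipeline snippet, not part of this change),
# a filter-rendered numeric string can be coerced back before posting:
#
#   coerce_values => { "idle" => "float", "cpu_count" => "integer" }
#
# Without this, sprintf-rendered values such as "93.4" would be sent as strings.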

@@ -84,6 +86,22 @@ class LogStash::Outputs::InfluxDB < LogStash::Outputs::Base
# near-real-time.
config :idle_flush_time, :validate => :number, :default => 1

# This setting removes the need to use the data_points and coerce_values options
# to build a proper insert for InfluxDB. It should be used together with the fields_to_skip setting.
# When enabled, the data point (column) names are taken from the field names of the event
# that arrives at the plugin, and the field values become the data point values.
config :use_event_fields_for_data_points, :validate => :boolean, :default => true

# An array of keys to exclude from further processing.
# By default, an event that arrives at the output plugin contains the keys "@version" and "@timestamp",
# and it can contain other fields, such as "command", added by the exec input plugin.
# These fields should not be processed and inserted into InfluxDB when
# use_event_fields_for_data_points is true.
# The keys are not deleted from the event itself: a new Hash is created from the event,
# and the unwanted keys are removed from that copy.

config :fields_to_skip, :validate => :array, :default => []

public
def register
require "ftw" # gem ftw
@@ -126,30 +144,52 @@ def receive(event)
# ]
event_hash = {}
event_hash['name'] = event.sprintf(@series)
if !@use_event_fields_for_data_points
  @logger.debug("use_event_fields_for_data_points is false; using the preconfigured data_points to build the InfluxDB request")
  sprintf_points = Hash[@data_points.map { |k, v| [event.sprintf(k), event.sprintf(v)] }]
else
  @logger.debug("use_event_fields_for_data_points is true; using the event's own fields to build the InfluxDB request")
  array_to_skip = @fields_to_skip || []

  @logger.debug("These fields will be skipped in the InfluxDB request: #{array_to_skip.to_json}")
  # Build a new Hash from the event so the event itself is left untouched.
  sprintf_points = Hash[event.to_hash]
  sprintf_points.delete_if { |k, v| array_to_skip.include?(k) }
  @logger.debug("These fields will be used for the InfluxDB request: #{sprintf_points.to_json}")
end

if sprintf_points.has_key?('time')
@logger.error("Cannot override value of time without 'allow_time_override'. Using event timestamp") unless @allow_time_override
else
# sprintf_points['time'] = to_epoch(event.timestamp)
sprintf_points['time'] = event.timestamp.to_i
[Inline review comment from a contributor: You corrected the time_precision to use "ms" instead of "m"; however, since you're using to_i here, you'll never get millisecond precision. See my fix for this.]
end
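# A possible millisecond-precision variant (an assumption on my part, not
# necessarily the reviewer's actual fix): keep the fractional part via to_f
# and scale by the configured precision before truncating.
#
#   factor = { "ms" => 1_000, "s" => 1, "u" => 1_000_000 }[@time_precision] || 1
#   sprintf_points['time'] = (event.timestamp.to_f * factor).to_i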

#if !use_event_fields_for_data_points
@coerce_values.each do |column, value_type|
if sprintf_points.has_key?(column)
begin
case value_type
when "integer"
@logger.debug("Converting column #{column} to type #{value_type}: Current value: #{sprintf_points[column]}")
sprintf_points[column] = sprintf_points[column].to_i
when "float"
@logger.debug("Converting column #{column} to type #{value_type}: Current value: #{sprintf_points[column]}")
sprintf_points[column] = sprintf_points[column].to_f
else
@logger.error("Don't know how to convert to #{value_type}")
end
rescue => e
@logger.error("Unhandled exception", :error => e.message)
end
end
end
#end
event_hash['columns'] = sprintf_points.keys
event_hash['points'] = []
event_hash['points'] << sprintf_points.values
@@ -220,10 +260,10 @@ def post(body)
end
end # def post

# private
# def to_epoch(t)
# return t.is_a?(Time) ? t.to_i : Time.parse(t).to_i
# end

def teardown
buffer_flush(:final => true)