From d4679a6e4cb4dc5e1c0fdd19bc324c98e3563b2a Mon Sep 17 00:00:00 2001 From: Evgeny Gorelik Date: Mon, 23 Feb 2015 12:38:40 +0200 Subject: [PATCH 1/3] Update influxdb.rb I created this fork to solve the solution in issue https://github.com/logstash-plugins/logstash-output-influxdb/issues/6 --- lib/logstash/outputs/influxdb.rb | 98 ++++++++++++++++++++++---------- 1 file changed, 69 insertions(+), 29 deletions(-) diff --git a/lib/logstash/outputs/influxdb.rb b/lib/logstash/outputs/influxdb.rb index b76356d..88bcc62 100644 --- a/lib/logstash/outputs/influxdb.rb +++ b/lib/logstash/outputs/influxdb.rb @@ -2,6 +2,8 @@ require "logstash/namespace" require "logstash/outputs/base" require "stud/buffer" +require "active_support/core_ext" +require "json" # This output lets you output Metrics to InfluxDB # @@ -12,7 +14,7 @@ # the InfluxDB API let's you do some semblance of bulk operation # per http call but each call is database-specific # -# You can learn more at http://influxdb.com[InfluxDB homepage] +# You can learn more about InfluxDB at class LogStash::Outputs::InfluxDB < LogStash::Outputs::Base include Stud::Buffer @@ -57,15 +59,15 @@ class LogStash::Outputs::InfluxDB < LogStash::Outputs::Base # Set the level of precision of `time` # # only useful when overriding the time value - config :time_precision, :validate => ["m", "s", "u"], :default => "s" + config :time_precision, :validate => ["ms", "s", "u"], :default => "s" # Allow value coercion # # this will attempt to convert data point values to the appropriate type before posting # otherwise sprintf-filtered numeric values could get sent as strings - # format is `{'column_name' => 'datatype'}` + # format is `{'column_name' => 'datatype'} # - # currently supported datatypes are `integer` and `float` + # currently supported datatypes are integer and float # config :coerce_values, :validate => :hash, :default => {} @@ -84,6 +86,22 @@ class LogStash::Outputs::InfluxDB < LogStash::Outputs::Base # near-real-time. 
config :idle_flush_time, :validate => :number, :default => 1 + # This setting removes the need to use the data_points and coerce_values configuration + # to create an appropriate insert for InfluxDB. Should be used with the fields_to_skip configuration + # This setting uses the field names of the event that arrives at the plugin as the data point (column) names, + # and the field values as the data point values + config :use_event_fields_for_data_points, :validate => :boolean, :default => true + + # The array with keys to exclude from further processing. + # By default an event that arrives at the output plugin contains the keys "@version", "@timestamp" + # and can contain other fields, for example "command", added by the EXEC input plugin. + # Of course we don't need those fields to be processed and inserted into InfluxDB when the configuration + # use_event_fields_for_data_points is true. + # We don't delete the keys from the event; we create a new Hash from the event and after that delete the unwanted + # keys. + + config :fields_to_skip, :validate => :array, :default => [] + public def register require "ftw" # gem ftw @@ -126,30 +144,52 @@ def receive(event) # ] event_hash = {} event_hash['name'] = event.sprintf(@series) - sprintf_points = Hash[@data_points.map {|k,v| [event.sprintf(k), event.sprintf(v)]}] - if sprintf_points.has_key?('time') - @logger.error("Cannot override value of time without 'allow_override_time'. Using event timestamp") unless @allow_override_time + + if !use_event_fields_for_data_points + logger.debug("#{use_event_fields_for_data_points} is false. Using preconfigured data points to build influxdb request") + sprintf_points = Hash[@data_points.map {|k,v| [event.sprintf(k), event.sprintf(v)]}] + return else + logger.debug("#{use_event_fields_for_data_points} is true. Using fields from event to build influxdb request") + if fields_to_skip.any?
+ array_to_skip = fields_to_skip + else + array_to_skip = [] + end + + logger.debug("Those fields going to be skipped from influxdb request #{array_to_skip.to_json} ") + sprintf_points = Hash[event] + sprintf_points.delete_if{|k,v| array_to_skip.include?(k)} + logger.debug("Those fields going to be used for influxdb request #{sprintf_points.to_json} ") end - @coerce_values.each do |column, value_type| - if sprintf_points.has_key?(column) - begin - case value_type - when "integer" - @logger.debug("Converting column #{column} to type #{value_type}: Current value: #{sprintf_points[column]}") - sprintf_points[column] = sprintf_points[column].to_i - when "float" - @logger.debug("Converting column #{column} to type #{value_type}: Current value: #{sprintf_points[column]}") - sprintf_points[column] = sprintf_points[column].to_f - else - @logger.error("Don't know how to convert to #{value_type}") - end - rescue => e - @logger.error("Unhandled exception", :error => e.message) - end - end + + if sprintf_points.has_key?('time') + @logger.error("Cannot override value of time without 'allow_time_override'. 
Using event timestamp") unless @allow_time_override + else + # sprintf_points['time'] = to_epoch(event.timestamp) + sprintf_points['time'] = event.timestamp.to_i end + + #if !use_event_fields_for_data_points + @coerce_values.each do |column, value_type| + if sprintf_points.has_key?(column) + begin + case value_type + when "integer" + @logger.debug("Converting column #{column} to type #{value_type}: Current value: #{sprintf_points[column]}") + sprintf_points[column] = sprintf_points[column].to_i + when "float" + @logger.debug("Converting column #{column} to type #{value_type}: Current value: #{sprintf_points[column]}") + sprintf_points[column] = sprintf_points[column].to_f + else + @logger.error("Don't know how to convert to #{value_type}") + end + rescue => e + @logger.error("Unhandled exception", :error => e.message) + end + end + end + #end event_hash['columns'] = sprintf_points.keys event_hash['points'] = [] event_hash['points'] << sprintf_points.values @@ -220,10 +260,10 @@ def post(body) end end # def post - private - def to_epoch(t) - return t.is_a?(Time) ? t.to_i : Time.parse(t).to_i - end +# private +# def to_epoch(t) +# return t.is_a?(Time) ? t.to_i : Time.parse(t).to_i +# end def teardown buffer_flush(:final => true) From c85d780ff08662d669a16adb50c7e9d185fd7be7 Mon Sep 17 00:00:00 2001 From: Evgeny Gorelik Date: Mon, 23 Feb 2015 12:57:54 +0200 Subject: [PATCH 2/3] Create README.md --- README.md | 98 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 98 insertions(+) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 0000000..7ba913c --- /dev/null +++ b/README.md @@ -0,0 +1,98 @@ +# logstash-output-influxdb +This fork permits to create fields on the fly, accrding to fields names and datatypes +that arrives to that output plugin. 
+ +I added 2 configuration parameters: + + # This setting removes the need to use the data_points and coerce_values configuration + # to create an appropriate insert for InfluxDB. Should be used with the fields_to_skip configuration + # This setting uses the field names of the event that arrives at the plugin as the data point (column) names, + # and the field values as the data point values + config :use_event_fields_for_data_points, :validate => :boolean, :default => true + + # The array with keys to exclude from further processing. + # By default an event that arrives at the output plugin contains the keys "@version", "@timestamp" + # and can contain other fields, for example "command", added by the EXEC input plugin. + # Of course we don't need those fields to be processed and inserted into InfluxDB when the configuration + # use_event_fields_for_data_points is true. + # We don't delete the keys from the event; we create a new Hash from the event and after that delete the unwanted + # keys. + + config :fields_to_skip, :validate => :array, :default => [] + + This is my example config file: + I'm retrieving different number of fields with differnt names from CPU, memory, disks, but + I doesn't need defferent configuration per data type as in master branch. + I'm creating relevant fields names and datatypes on filter stage and just skips the unwanted fields in outputv plugin.
+ + input { + exec { + command => "env LANG=C sar -P ALL 1 1|egrep -v '^$|Average|CPU'" + type => "system.cpu" + interval => 1 + } + exec { + command => "env LANG=C sar -r 1 1|egrep -v '^$|Average|memfree|CPU'" + type => "system.memory" + interval => 1 + } + exec { + command => "env LANG=C sar -pd 1 1|egrep -v '^$|Average|DEV|CPU'" + type => "system.disks" + interval => 1 +} + +filter { + if [type] == "system.cpu" { + split {} + grok { + match => { "message" => "\A(?%{HOUR}:%{MINUTE}:%{SECOND})\s+%{DATA:cpu}\s+%{NUMBER:user:float}\s+%{NUMBER:nice:float}\s+%{NUMBER:system:float}\s+%{NUMBER:iowait:float}\s+%{NUMBER:steal:float}\s+%{NUMBER:idle:float}\z" } + remove_field => [ "message" ] + add_field => {"series_name" => "%{host}.%{type}.%{cpu}"} + } + ruby { + code => " event['usage'] = (100 - event['idle']).round(2); event['usage-io'] = event['usage'] - event['iowait']" + } + } + if [type] == "system.memory" { + split {} + grok { + match => { "message" => "\A(?%{HOUR}:%{MINUTE}:%{SECOND})\s+%{NUMBER:kbmemfree:float}\s+%{NUMBER:kbmemused:float}\s+%{NUMBER:percenmemused:float}\s+%{NUMBER:kbbuffers:float}\s+%{NUMBER:kbcached:float}\s+%{NUMBER:kbcommit:float}\s+%{NUMBER:kpercentcommit:float}\z" } + remove_field => [ "message" ] + add_field => {"series_name" => "%{host}.%{type}"} + } + ruby { + code => " event['kbtotalmemory'] = (event['kbmemfree'] + event['kbmemused']);event['kbnetoused'] = (event['kbmemused'] - (event['kbbuffers'] + event['kbcached']));event['kbnetofree'] = (event['kbmemfree'] + (event['kbbuffers'] + event['kbcached']))" + } + } + if [type] == "system.disks" { + split {} + grok { + match => { "message" => "\A(?%{HOUR}:%{MINUTE}:%{SECOND})\s+%{DATA:disk}\s+%{NUMBER:tps:float}\s+%{NUMBER:rd_sec_s:float}\s+%{NUMBER:wr_sec_s:float}\s+%{NUMBER:avgrq-sz:float}\s+%{NUMBER:avgqu-sz:float}\s+%{NUMBER:await:float}\s+%{NUMBER:svctm:float}\s+%{NUMBER:percenutil:float}\z" } + remove_field => [ "message" ] + add_field => {"series_name" => "%{host}.%{type}.%{disk}"} + 
} + + } + ruby { + code => "event['time'] = (DateTime.parse(event['sar_time']).to_time.to_i ) - 7200" + } +} +output { + + influxdb { + host => "172.20.90.72" + password => "root" + user => "root" + db => "metrics" + allow_time_override => true + time_precision => "s" + series => "%{series_name}" + use_event_fields_for_data_points => true + fields_to_skip => ["@version","@timestamp","type","host","command","sar_time","series_name"] + } + + stdout { codec => rubydebug + workers => 4 + } +} From a805ebd468b10f63811ea3f579f5fc49a8107784 Mon Sep 17 00:00:00 2001 From: Evgeny Gorelik Date: Mon, 23 Feb 2015 13:02:24 +0200 Subject: [PATCH 3/3] Update README.md --- README.md | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 7ba913c..55e1831 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,8 @@ I added 2 configuration paramters: I doesn't need defferent configuration per data type as in master branch. I'm creating relevant fields names and datatypes on filter stage and just skips the unwanted fields in outputv plugin. 
- input { +input { + exec { command => "env LANG=C sar -P ALL 1 1|egrep -v '^$|Average|CPU'" type => "system.cpu" @@ -36,24 +37,25 @@ I added 2 configuration paramters: type => "system.memory" interval => 1 } - exec { + exec { command => "env LANG=C sar -pd 1 1|egrep -v '^$|Average|DEV|CPU'" type => "system.disks" interval => 1 + } } + filter { + if [type] == "system.cpu" { - split {} - grok { - match => { "message" => "\A(?%{HOUR}:%{MINUTE}:%{SECOND})\s+%{DATA:cpu}\s+%{NUMBER:user:float}\s+%{NUMBER:nice:float}\s+%{NUMBER:system:float}\s+%{NUMBER:iowait:float}\s+%{NUMBER:steal:float}\s+%{NUMBER:idle:float}\z" } - remove_field => [ "message" ] - add_field => {"series_name" => "%{host}.%{type}.%{cpu}"} - } - ruby { - code => " event['usage'] = (100 - event['idle']).round(2); event['usage-io'] = event['usage'] - event['iowait']" - } + split {} + grok { + match => { "message" => "\A(?%{HOUR}:%{MINUTE}:%{SECOND})\s+%{DATA:cpu}\s+%{NUMBER:user:float}\s+%{NUMBER:nice:float}\s+%{NUMBER:system:float}\s+%{NUMBER:iowait:float}\s+%{NUMBER:steal:float}\s+%{NUMBER:idle:float}\z" } remove_field => [ "message" ] + add_field => {"series_name" => "%{host}.%{type}.%{cpu}"} } + ruby { + code => " event['usage'] = (100 - event['idle']).round(2); event['usage-io'] = event['usage'] - event['iowait']" } + } if [type] == "system.memory" { split {} grok {