diff --git a/README.md b/README.md index abb4dd3e..717536ff 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,8 @@ Note: For Amazon Elasticsearch Service please consider using [fluent-plugin-aws- + [utc_index](#utc_index) + [target_index_key](#target_index_key) + [target_type_key](#target_type_key) + + [template_name](#template_name) + + [template_file](#template_file) + [request_timeout](#request_timeout) + [reload_connections](#reload_connections) + [reload_on_failure](#reload_on_failure) @@ -224,6 +226,20 @@ and this record will be written to the specified index (`logstash-2014.12.19`) r Similar to `target_index_key` config, find the type name to write to in the record under this key (or nested record). If key not found in record - fallback to `type_name` (default "fluentd"). +### template_name + +The name of the template to define. If a template by the name given is already present, it will be left unchanged. + +This parameter along with template_file allow the plugin to behave similarly to Logstash (it installs a template at creation time) so that raw records are available. See [https://github.com/uken/fluent-plugin-elasticsearch/issues/33](https://github.com/uken/fluent-plugin-elasticsearch/issues/33). + +[template_file](#template_file) must also be specified. + +### template_file + +The path to the file containing the template to install. + +[template_name](#template_name) must also be specified. + ### request_timeout You can specify HTTP request timeout. diff --git a/lib/fluent/plugin/out_elasticsearch.rb b/lib/fluent/plugin/out_elasticsearch.rb index 985f5207..91855208 100644 --- a/lib/fluent/plugin/out_elasticsearch.rb +++ b/lib/fluent/plugin/out_elasticsearch.rb @@ -2,12 +2,15 @@ require 'date' require 'excon' require 'elasticsearch' +require 'json' require 'uri' begin require 'strptime' rescue LoadError end +require_relative 'out_elasticsearch_template' + class Fluent::ElasticsearchOutput < Fluent::BufferedOutput class ConnectionFailure < StandardError; end @@ -49,8 +52,11 @@ class ConnectionFailure < StandardError; end config_param :remove_keys_on_update_key, :string, :default => nil config_param :flatten_hashes, :bool, :default => false config_param :flatten_hashes_separator, :string, :default => "_" + config_param :template_name, :string, :default => nil + config_param :template_file, :string, :default => nil include Fluent::SetTagKeyMixin + include Fluent::ElasticsearchOutputTemplate config_set_default :include_tag_key, false def initialize @@ -77,6 +83,10 @@ def configure(conf) if @remove_keys_on_update && @remove_keys_on_update.is_a?(String) @remove_keys_on_update = @remove_keys_on_update.split ',' end + + if @template_name && @template_file + template_install(@template_name, @template_file) + end end def start diff --git a/lib/fluent/plugin/out_elasticsearch_template.rb b/lib/fluent/plugin/out_elasticsearch_template.rb new file mode 100644 index 00000000..e98e5498 --- /dev/null +++ b/lib/fluent/plugin/out_elasticsearch_template.rb @@ -0,0 +1,31 @@ +module Fluent::ElasticsearchOutputTemplate + + def get_template(template_file) + if !File.exists?(template_file) + raise "If you specify a template_name you must specify a valid template file (checked '#{template_file}')!" + end + file_contents = IO.read(template_file).gsub(/\n/,'') + JSON.parse(file_contents) + end + + def template_exists?(name) + client.indices.get_template(:name => name) + return true + rescue Elasticsearch::Transport::Transport::Errors::NotFound + return false + end + + def template_put(name, template) + client.indices.put_template(:name => name, :body => template) + end + + def template_install(name, template_file) + if !template_exists?(name) + template_put(name, get_template(template_file)) + log.info("Template configured, but no template installed. Installed '#{name}' from #{template_file}.") + else + log.info("Template configured and already installed.") + end + end + +end diff --git a/test/plugin/test_out_elasticsearch.rb b/test/plugin/test_out_elasticsearch.rb index d9707833..59e30c48 100644 --- a/test/plugin/test_out_elasticsearch.rb +++ b/test/plugin/test_out_elasticsearch.rb @@ -65,6 +65,80 @@ def test_configure assert_equal 'doe', instance.password end + def test_template_already_present + config = %{ + host logs.google.com + port 777 + scheme https + path /es/ + user john + password doe + template_name logstash + template_file /abc123 + } + + # connection start + stub_request(:head, "https://john:doe@logs.google.com:777/es//"). + to_return(:status => 200, :body => "", :headers => {}) + # check if template exists + stub_request(:get, "https://john:doe@logs.google.com:777/es//_template/logstash"). + to_return(:status => 200, :body => "", :headers => {}) + + driver('test', config) + end + + def test_template_create + cwd = File.dirname(__FILE__) + template_file = File.join(cwd, 'test_template.json') + + config = %{ + host logs.google.com + port 777 + scheme https + path /es/ + user john + password doe + template_name logstash + template_file #{template_file} + } + + # connection start + stub_request(:head, "https://john:doe@logs.google.com:777/es//"). + to_return(:status => 200, :body => "", :headers => {}) + # check if template exists + stub_request(:get, "https://john:doe@logs.google.com:777/es//_template/logstash"). + to_return(:status => 404, :body => "", :headers => {}) + # creation + stub_request(:put, "https://john:doe@logs.google.com:777/es//_template/logstash"). + to_return(:status => 200, :body => "", :headers => {}) + + driver('test', config) + end + + def test_template_create_invalid_filename + config = %{ + host logs.google.com + port 777 + scheme https + path /es/ + user john + password doe + template_name logstash + template_file /abc123 + } + + # connection start + stub_request(:head, "https://john:doe@logs.google.com:777/es//"). + to_return(:status => 200, :body => "", :headers => {}) + # check if template exists + stub_request(:get, "https://john:doe@logs.google.com:777/es//_template/logstash"). + to_return(:status => 404, :body => "", :headers => {}) + + assert_raise(RuntimeError) { + driver('test', config) + } + end + def test_legacy_hosts_list config = %{ hosts host1:50,host2:100,host3 diff --git a/test/plugin/test_template.json b/test/plugin/test_template.json new file mode 100644 index 00000000..4ab6e442 --- /dev/null +++ b/test/plugin/test_template.json @@ -0,0 +1,23 @@ +{ + "template": "te*", + "settings": { + "number_of_shards": 1 + }, + "mappings": { + "type1": { + "_source": { + "enabled": false + }, + "properties": { + "host_name": { + "type": "string", + "index": "not_analyzed" + }, + "created_at": { + "type": "date", + "format": "EEE MMM dd HH:mm:ss Z YYYY" + } + } + } + } +}