Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow specification of a index template #194

Merged
merged 1 commit into from
Sep 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ Note: For Amazon Elasticsearch Service please consider using [fluent-plugin-aws-
+ [utc_index](#utc_index)
+ [target_index_key](#target_index_key)
+ [target_type_key](#target_type_key)
+ [template_name](#template_name)
+ [template_file](#template_file)
+ [request_timeout](#request_timeout)
+ [reload_connections](#reload_connections)
+ [reload_on_failure](#reload_on_failure)
Expand Down Expand Up @@ -224,6 +226,20 @@ and this record will be written to the specified index (`logstash-2014.12.19`) r

Similar to `target_index_key` config, find the type name to write to in the record under this key (or nested record). If key not found in record - fallback to `type_name` (default "fluentd").

### template_name

The name of the template to define. If a template by the name given is already present, it will be left unchanged.

This parameter along with template_file allow the plugin to behave similarly to Logstash (it installs a template at creation time) so that raw records are available. See [https://github.com/uken/fluent-plugin-elasticsearch/issues/33](https://github.com/uken/fluent-plugin-elasticsearch/issues/33).

[template_file](#template_file) must also be specified.

### template_file

The path to the file containing the template to install.

[template_name](#template_name) must also be specified.

### request_timeout

You can specify HTTP request timeout.
Expand Down
10 changes: 10 additions & 0 deletions lib/fluent/plugin/out_elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
require 'date'
require 'excon'
require 'elasticsearch'
require 'json'
require 'uri'
begin
require 'strptime'
rescue LoadError
end

require_relative 'out_elasticsearch_template'

class Fluent::ElasticsearchOutput < Fluent::BufferedOutput
class ConnectionFailure < StandardError; end

Expand Down Expand Up @@ -49,8 +52,11 @@ class ConnectionFailure < StandardError; end
config_param :remove_keys_on_update_key, :string, :default => nil
config_param :flatten_hashes, :bool, :default => false
config_param :flatten_hashes_separator, :string, :default => "_"
config_param :template_name, :string, :default => nil
config_param :template_file, :string, :default => nil

include Fluent::SetTagKeyMixin
include Fluent::ElasticsearchOutputTemplate
config_set_default :include_tag_key, false

def initialize
Expand All @@ -77,6 +83,10 @@ def configure(conf)
if @remove_keys_on_update && @remove_keys_on_update.is_a?(String)
@remove_keys_on_update = @remove_keys_on_update.split ','
end

if @template_name && @template_file
template_install(@template_name, @template_file)
end
end

def start
Expand Down
31 changes: 31 additions & 0 deletions lib/fluent/plugin/out_elasticsearch_template.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
module Fluent::ElasticsearchOutputTemplate

def get_template(template_file)
if !File.exists?(template_file)
raise "If you specify a template_name you must specify a valid template file (checked '#{template_file}')!"
end
file_contents = IO.read(template_file).gsub(/\n/,'')
JSON.parse(file_contents)
end

def template_exists?(name)
client.indices.get_template(:name => name)
return true
rescue Elasticsearch::Transport::Transport::Errors::NotFound
return false
end

def template_put(name, template)
client.indices.put_template(:name => name, :body => template)
end

def template_install(name, template_file)
if !template_exists?(name)
template_put(name, get_template(template_file))
log.info("Template configured, but no template installed. Installed '#{name}' from #{template_file}.")
else
log.info("Template configured and already installed.")
end
end

end
74 changes: 74 additions & 0 deletions test/plugin/test_out_elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,80 @@ def test_configure
assert_equal 'doe', instance.password
end

def test_template_already_present
config = %{
host logs.google.com
port 777
scheme https
path /es/
user john
password doe
template_name logstash
template_file /abc123
}

# connection start
stub_request(:head, "https://john:[email protected]:777/es//").
to_return(:status => 200, :body => "", :headers => {})
# check if template exists
stub_request(:get, "https://john:[email protected]:777/es//_template/logstash").
to_return(:status => 200, :body => "", :headers => {})

driver('test', config)
end

def test_template_create
cwd = File.dirname(__FILE__)
template_file = File.join(cwd, 'test_template.json')

config = %{
host logs.google.com
port 777
scheme https
path /es/
user john
password doe
template_name logstash
template_file #{template_file}
}

# connection start
stub_request(:head, "https://john:[email protected]:777/es//").
to_return(:status => 200, :body => "", :headers => {})
# check if template exists
stub_request(:get, "https://john:[email protected]:777/es//_template/logstash").
to_return(:status => 404, :body => "", :headers => {})
# creation
stub_request(:put, "https://john:[email protected]:777/es//_template/logstash").
to_return(:status => 200, :body => "", :headers => {})

driver('test', config)
end

def test_template_create_invalid_filename
config = %{
host logs.google.com
port 777
scheme https
path /es/
user john
password doe
template_name logstash
template_file /abc123
}

# connection start
stub_request(:head, "https://john:[email protected]:777/es//").
to_return(:status => 200, :body => "", :headers => {})
# check if template exists
stub_request(:get, "https://john:[email protected]:777/es//_template/logstash").
to_return(:status => 404, :body => "", :headers => {})

assert_raise(RuntimeError) {
driver('test', config)
}
end

def test_legacy_hosts_list
config = %{
hosts host1:50,host2:100,host3
Expand Down
23 changes: 23 additions & 0 deletions test/plugin/test_template.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"template": "te*",
"settings": {
"number_of_shards": 1
},
"mappings": {
"type1": {
"_source": {
"enabled": false
},
"properties": {
"host_name": {
"type": "string",
"index": "not_analyzed"
},
"created_at": {
"type": "date",
"format": "EEE MMM dd HH:mm:ss Z YYYY"
}
}
}
}
}