Switch to using NSQ rather than named pipes (closes #24)
aldemirenes authored and BenFradet committed Dec 5, 2017
1 parent 7ffa31f commit 1e9e85d
Showing 19 changed files with 774 additions and 426 deletions.
12 changes: 6 additions & 6 deletions integration/integration_test.sh
@@ -2,10 +2,10 @@

 sudo service elasticsearch start
 sudo service iglu_server_0.2.0 start
-sudo service snowplow_stream_collector_0.9.0 start
-sudo service snowplow_stream_enrich_0.10.0 start
-sudo service snowplow_elasticsearch_sink_good_0.8.0 start
-sudo service snowplow_elasticsearch_sink_bad_0.8.0 start
+sudo service snowplow_stream_collector start
+sudo service snowplow_stream_enrich start
+sudo service snowplow_elasticsearch_loader_good start
+sudo service snowplow_elasticsearch_loader_bad start
 sudo service kibana4_init start
 sudo service nginx start
 sleep 15
@@ -17,7 +17,7 @@ while [ $COUNTER -lt 10 ]; do
   curl http://localhost:8080/i
   let COUNTER=COUNTER+1
 done
-sleep 5
+sleep 60
 
 # Assertions
 good_count="$(curl --silent -XGET 'http://localhost:9200/good/good/_count' | python -c 'import json,sys;obj=json.load(sys.stdin);print obj["count"]')"
@@ -27,7 +27,7 @@ echo "Event Counts:"
echo " - Good: ${good_count}"
echo " - Bad: ${bad_count}"

if [[ "${good_count}" -eq "10" ]] && [[ "${bad_count}" -eq "11" ]]; then
if [[ "${good_count}" -eq "10" ]] && [[ "${bad_count}" -eq "10" ]]; then
exit 0
else
exit 1
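Note: the count extraction above relies on the Python 2 print statement. A python3-compatible equivalent (a hypothetical variant, not part of this commit) would be:

good_count="$(curl --silent -XGET 'http://localhost:9200/good/good/_count' | python3 -c 'import json,sys; print(json.load(sys.stdin)["count"])')"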
107 changes: 0 additions & 107 deletions provisioning/resources/configs/snowplow-elasticsearch-sink-bad.hocon

This file was deleted.

107 changes: 0 additions & 107 deletions provisioning/resources/configs/snowplow-elasticsearch-sink-good.hocon

This file was deleted.

143 changes: 143 additions & 0 deletions provisioning/resources/configs/snowplow-es-loader-bad.hocon
@@ -0,0 +1,143 @@
# Copyright (c) 2014-2017 Snowplow Analytics Ltd. All rights reserved.
#
# This program is licensed to you under the Apache License Version 2.0, and
# you may not use this file except in compliance with the Apache License
# Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
# http://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License Version 2.0 is distributed on an "AS
# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the Apache License Version 2.0 for the specific language
# governing permissions and limitations there under.

# This file (snowplow-es-loader-bad.hocon) contains the
# configuration options for the Elasticsearch Loader.

# Sources currently supported are:
# "kinesis" for reading records from a Kinesis stream
# "stdin" for reading unencoded tab-separated events from stdin
# If set to "stdin", JSON documents will not be sent to Elasticsearch
# but will be written to stdout.
# "nsq" for reading unencoded tab-separated events from NSQ
source = nsq

# Where to write good and bad records
sink {
# Sinks currently supported are:
# "elasticsearch" for writing good records to Elasticsearch
# "stdout" for writing good records to stdout
good = elasticsearch

# Sinks currently supported are:
# "kinesis" for writing bad records to Kinesis
# "stderr" for writing bad records to stderr
# "nsq" for writing bad records to NSQ
# "none" for ignoring bad records
bad = none
}

# "good" for a stream of successfully enriched events
# "bad" for a stream of bad events
# "plain-json" for writing plain json
enabled = bad

# The following are used to authenticate for the Amazon Kinesis sink.
#
# If both are set to "default", the default provider chain is used
# (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
#
# If both are set to "iam", use AWS IAM Roles to provision credentials.
#
# If both are set to "env", use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
accessKey: ""
secretKey: ""
}

# config for NSQ
nsq {
# Channel name for NSQ source
channelName = ESLoaderChannelBad

# Host name for NSQ tools
host = "127.0.0.1"

# TCP port for nsqd
port = 4150

# HTTP port for nsqlookupd
lookupPort = 4161
}

kinesis {
# "LATEST": most recent data.
# "TRIM_HORIZON": oldest available data.
# "AT_TIMESTAMP": Start from the record at or after the specified timestamp
# Note: This only affects the first run of this application on a stream.
initialPosition = TRIM_HORIZON

# Maximum number of records to get from Kinesis per call to GetRecords
maxRecords = 1000

# Region where the Kinesis stream is located
region = ""

# "appName" is used for a DynamoDB table to maintain stream state.
# You can set it automatically using: "SnowplowElasticsearchSink-${sink.kinesis.in.stream-name}"
appName = ""
}

# Common configuration section for all stream sources
streams {
# Stream/NSQ topic of incoming events to read from
inStreamName = BadEnrichedEvents

# Stream for enriched events which are rejected by Elasticsearch
outStreamName = BadElasticsearchEvents

# Events are accumulated in a buffer before being sent to Elasticsearch.
# Note: Buffering is not supported by NSQ; these settings are ignored for the NSQ source
# The buffer is emptied whenever:
# - the combined size of the stored records exceeds byteLimit or
# - the number of stored records exceeds recordLimit or
# - the time in milliseconds since it was last emptied exceeds timeLimit
buffer {
byteLimit = 5242880
recordLimit = 1
timeLimit = 60000
}
}

elasticsearch {

# Events are indexed using an Elasticsearch Client
# - endpoint: the cluster endpoint
# - port: the port the cluster can be accessed on
# - for http this is usually 9200
# - for transport this is usually 9300
# - max-timeout: the maximum attempt time before a client restart
# - ssl: if using the http client, whether to use ssl or not
client {
endpoint = "localhost"
port = 9200
maxTimeout = 10000
ssl = false
}

# When using the AWS ES service
# - signing: if using the http client and the AWS ES service you can sign your requests
# http://docs.aws.amazon.com/general/latest/gr/signing_aws_api_requests.html
# - region where the AWS ES service is located
aws {
signing = false
region = ""
}

# name: the Elasticsearch cluster name
# index: the Elasticsearch index name
# clusterType: the Elasticsearch index type
cluster {
name = elasticsearch
index = bad
clusterType = bad
}
}
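For a quick sanity check of the services this config points at, the NSQ daemons and the bad index can be probed from the box. A hypothetical snippet, assuming nsqd's HTTP port sits at its default of 4151 (only the TCP port 4150 and the lookupd HTTP port 4161 are configured above):

# nsqd health check (HTTP port, default 4151)
curl http://127.0.0.1:4151/ping
# nsqlookupd health check, matching lookupPort above
curl http://127.0.0.1:4161/ping
# tail the topic this loader reads from
nsq_tail -lookupd-http-address=127.0.0.1:4161 -topic=BadEnrichedEvents
# count documents in the "bad" index configured above
curl --silent -XGET 'http://localhost:9200/bad/bad/_count'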
