[WIP] Assemble the apps using Docker Compose (closes #23)
oguzhanunlu committed Jun 21, 2018
1 parent d239c13 commit 84ec347
Showing 23 changed files with 359 additions and 565 deletions.
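
The changes below replace localhost endpoints with Docker Compose service hostnames (iglu-server, postgres, elasticsearch, nsqd, nsqlookupd). As context, here is a minimal sketch of what the docker-compose.yml added by this commit might look like. It is an illustration only: the actual Compose file is not visible in this part of the diff, and the image names, tags and Compose version are assumptions; only the service names, ports and database settings are taken from the configs changed below.

version: "2.1"

services:
  postgres:
    image: postgres:9.6                  # assumed image/tag
    environment:
      POSTGRES_DB: iglu                  # dbname from iglu-server.conf
      POSTGRES_USER: snowplow            # username from iglu-server.conf

  iglu-server:
    image: snowplow/iglu-server:0.3.0    # assumed image name
    depends_on:
      - postgres
    ports:
      - "8081:8081"                      # the resolver now points at iglu-server:8081/api

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:5.6.9   # assumed version
    ports:
      - "9200:9200"                      # loaders now use endpoint = elasticsearch, port 9200

  nsqlookupd:
    image: nsqio/nsq
    command: /nsqlookupd
    ports:
      - "4160:4160"
      - "4161:4161"                      # lookupPort in the loader configs

  nsqd:
    image: nsqio/nsq
    command: /nsqd --lookupd-tcp-address=nsqlookupd:4160
    depends_on:
      - nsqlookupd
    ports:
      - "4150:4150"                      # the collector sink and loaders use port 4150

  # The collector, Stream Enrich, the two Elasticsearch loaders and Kibana
  # would be defined the same way, mounting the configs changed below and
  # reaching the services above by hostname instead of localhost.
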
9 changes: 0 additions & 9 deletions integration/integration_test.sh
@@ -1,14 +1,5 @@
#!/bin/bash

sudo service elasticsearch start
sudo service iglu_server_0.3.0 start
sudo service snowplow_stream_collector start
sudo service snowplow_stream_enrich start
sudo service snowplow_elasticsearch_loader_good start
sudo service snowplow_elasticsearch_loader_bad start
sudo service kibana4_init start
sleep 15

# Send good and bad events
COUNTER=0
while [ $COUNTER -lt 10 ]; do
2 changes: 1 addition & 1 deletion provisioning/resources/configs/iglu-resolver.json
@@ -24,7 +24,7 @@
],
"connection": {
"http": {
"uri": "http://localhost:8081/api",
"uri": "iglu-server:8081/api",
"apikey": "PLACEHOLDER"
}
}
2 changes: 1 addition & 1 deletion provisioning/resources/configs/iglu-server.conf
@@ -27,7 +27,7 @@ repo-server {
# 'postgres' contains configuration options for the postgres instance the server
# is using
postgres {
host = "localhost"
host = "postgres"
port = 5432
dbname = "iglu"
username = "snowplow"
73 changes: 2 additions & 71 deletions provisioning/resources/configs/snowplow-es-loader-bad.hocon
@@ -14,93 +14,37 @@
# This file (config.hocon.sample) contains a template with
# configuration options for the Elasticsearch Loader.

# Sources currently supported are:
# "kinesis" for reading records from a Kinesis stream
# "stdin" for reading unencoded tab-separated events from stdin
# If set to "stdin", JSON documents will not be sent to Elasticsearch
# but will be written to stdout.
# "nsq" for reading unencoded tab-separated events from NSQ
source = nsq

# Where to write good and bad records
sink {
# Sinks currently supported are:
# "elasticsearch" for writing good records to Elasticsearch
# "stdout" for writing good records to stdout
good = elasticsearch

# Sinks currently supported are:
# "kinesis" for writing bad records to Kinesis
# "stderr" for writing bad records to stderr
# "nsq" for writing bad records to NSQ
# "none" for ignoring bad records
bad = none
}

# "good" for a stream of successfully enriched events
# "bad" for a stream of bad events
# "plain-json" for writing plain json
enabled = bad

# The following are used to authenticate for the Amazon Kinesis sink.
#
# If both are set to "default", the default provider chain is used
# (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
#
# If both are set to "iam", use AWS IAM Roles to provision credentials.
#
# If both are set to "env", use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
accessKey: ""
secretKey: ""
}

# config for NSQ
nsq {
# Channel name for NSQ source
channelName = ESLoaderChannelBad

# Host name for NSQ tools
host = "127.0.0.1"

# TCP port for nsqd
host = nsqlookupd
port = 4150

# HTTP port for nsqlookupd
lookupPort = 4161
}

kinesis {
# "LATEST": most recent data.
# "TRIM_HORIZON": oldest available data.
# "AT_TIMESTAMP": Start from the record at or after the specified timestamp
# Note: This only affects the first run of this application on a stream.
initialPosition= TRIM_HORIZON

# Maximum number of records to get from Kinesis per call to GetRecords
maxRecords = 1000

# Region where the Kinesis stream is located
region = ""

# "appName" is used for a DynamoDB table to maintain stream state.
# You can set it automatically using: "SnowplowElasticsearchSink-${sink.kinesis.in.stream-name}"
appName = ""
}

# Common configuration section for all stream sources
streams {
inStreamName = BadEnrichedEvents

# Stream for enriched events which are rejected by Elasticsearch
outStreamName = BadElasticsearchEvents

# Events are accumulated in a buffer before being sent to Elasticsearch.
# Note: Buffering is not supported by NSQ; will be ignored
# The buffer is emptied whenever:
# - the combined size of the stored records exceeds byteLimit or
# - the number of stored records exceeds recordLimit or
# - the time in milliseconds since it was last emptied exceeds timeLimit
buffer {
byteLimit = 5242880
recordLimit = 1
@@ -110,31 +54,18 @@ streams {

elasticsearch {

# Events are indexed using an Elasticsearch Client
# - endpoint: the cluster endpoint
# - port: the port the cluster can be accessed on
# - for http this is usually 9200
# - for transport this is usually 9300
# - max-timeout: the maximum attempt time before a client restart
# - ssl: if using the http client, whether to use ssl or not
client {
endpoint = "localhost"
endpoint = elasticsearch
port = 9200
maxTimeout = 10000
ssl = false
}

# When using the AWS ES service
# - signing: if using the http client and the AWS ES service you can sign your requests
# http://docs.aws.amazon.com/general/latest/gr/signing_aws_api_requests.html
# - region where the AWS ES service is located
aws {
signing = false
region = ""
}

# index: the Elasticsearch index name
# type: the Elasticsearch index type
cluster {
name = elasticsearch
index = bad
73 changes: 2 additions & 71 deletions provisioning/resources/configs/snowplow-es-loader-good.hocon
@@ -14,93 +14,37 @@
# This file (config.hocon.sample) contains a template with
# configuration options for the Elasticsearch Loader.

# Sources currently supported are:
# "kinesis" for reading records from a Kinesis stream
# "stdin" for reading unencoded tab-separated events from stdin
# If set to "stdin", JSON documents will not be sent to Elasticsearch
# but will be written to stdout.
# "nsq" for reading unencoded tab-separated events from NSQ
source = nsq

# Where to write good and bad records
sink {
# Sinks currently supported are:
# "elasticsearch" for writing good records to Elasticsearch
# "stdout" for writing good records to stdout
good = elasticsearch

# Sinks currently supported are:
# "kinesis" for writing bad records to Kinesis
# "stderr" for writing bad records to stderr
# "nsq" for writing bad records to NSQ
# "none" for ignoring bad records
bad = nsq
}

# "good" for a stream of successfully enriched events
# "bad" for a stream of bad events
# "plain-json" for writing plain json
enabled = good

# The following are used to authenticate for the Amazon Kinesis sink.
#
# If both are set to "default", the default provider chain is used
# (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
#
# If both are set to "iam", use AWS IAM Roles to provision credentials.
#
# If both are set to "env", use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
accessKey = ""
secretKey = ""
}

# config for NSQ
nsq {
# Channel name for NSQ source
channelName = ESLoaderChannelGood

# Host name for NSQ tools
host = "127.0.0.1"

# TCP port for nsqd
host = nsqlookupd
port = 4150

# HTTP port for nsqlookupd
lookupPort = 4161
}

kinesis {
# "LATEST": most recent data.
# "TRIM_HORIZON": oldest available data.
# "AT_TIMESTAMP": Start from the record at or after the specified timestamp
# Note: This only affects the first run of this application on a stream.
initialPosition = TRIM_HORIZON

# Maximum number of records to get from Kinesis per call to GetRecords
maxRecords = 1000

# Region where the Kinesis stream is located
region = ""

# "appName" is used for a DynamoDB table to maintain stream state.
# You can set it automatically using: "SnowplowElasticsearchSink-${sink.kinesis.in.stream-name}"
appName = ""
}

# Common configuration section for all stream sources
streams {
inStreamName = EnrichedEvents

# Stream for enriched events which are rejected by Elasticsearch
outStreamName = BadElasticsearchEvents

# Events are accumulated in a buffer before being sent to Elasticsearch.
# Note: Buffering is not supported by NSQ; will be ignored
# The buffer is emptied whenever:
# - the combined size of the stored records exceeds byteLimit or
# - the number of stored records exceeds recordLimit or
# - the time in milliseconds since it was last emptied exceeds timeLimit
buffer {
byteLimit: 5242880
recordLimit: 1
@@ -110,31 +54,18 @@ streams {

elasticsearch {

# Events are indexed using an Elasticsearch Client
# - endpoint: the cluster endpoint
# - port: the port the cluster can be accessed on
# - for http this is usually 9200
# - for transport this is usually 9300
# - max-timeout: the maximum attempt time before a client restart
# - ssl: if using the http client, whether to use ssl or not
client {
endpoint = "localhost"
endpoint = elasticsearch
port = 9200
maxTimeout = 10000
ssl = false
}

# When using the AWS ES service
# - signing: if using the http client and the AWS ES service you can sign your requests
# http://docs.aws.amazon.com/general/latest/gr/signing_aws_api_requests.html
# - region where the AWS ES service is located
aws {
signing = false
region = ""
}

# index: the Elasticsearch index name
# type: the Elasticsearch index type
cluster {
name = "elasticsearch"
index = "good"
54 changes: 12 additions & 42 deletions provisioning/resources/configs/snowplow-stream-collector.hocon
@@ -18,69 +18,49 @@

# 'collector' contains configuration options for the main Scala collector.
collector {
# The collector runs as a web service specified on the following
# interface and port.
interface = "0.0.0.0"
port = 8080

# Configure the P3P policy header.
p3p {
policyRef = "/w3c/p3p.xml"
CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
}

# The collector returns a cookie to clients for user identification
# with the following domain and expiration.
crossDomain {
enabled = false
domain = "*"
secure = true
}

cookie {
enabled = true
expiration = "365 days" # e.g. "365 days"
# Network cookie name
name = sp
# The domain is optional and will make the cookie accessible to other
# applications on the domain. Comment out this line to tie cookies to
# the collector's full domain
domain = ""
}

# When enabled and the cookie specified above is missing, performs a redirect to itself to check
# if third-party cookies are blocked using the specified name. If they are indeed blocked,
# fallbackNetworkId is used instead of generating a new random one.
cookieBounce {
enabled = false
# The name of the request parameter which will be used on redirects checking that third-party
# cookies work.
name = "n3pc"
# Network user id to fallback to when third-party cookies are blocked.
fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
}

redirectMacro {
enabled = false
placeholder = "[TOKEN]"
}

streams {
# Events which have successfully been collected will be stored in the good stream/topic
good = RawEvents

# Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic
bad = BadRawEvents

# Whether to use the incoming event's ip as the partition key for the good stream/topic
useIpAddressAsPartitionKey = false

# config for NSQ sink
sink {
enabled = nsq

# Host name for NSQ tools
host = "127.0.0.1"

# TCP port for nsqd
host = nsqd
port = 4150
}

# Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
# Note: Buffering is not supported by NSQ.
# The buffer is emptied whenever:
# - the number of stored records reaches record-limit or
# - the combined size of the stored records reaches byte-limit or
# - the time in milliseconds since the buffer was last emptied reaches time-limit
buffer {
byteLimit = 4000000
recordLimit = 500 # Not supported by Kafka; will be ignored
@@ -89,23 +69,13 @@ collector {
}
}

# Akka has a variety of possible configuration options defined at
# http://doc.akka.io/docs/akka/current/scala/general/configuration.html
akka {
loglevel = DEBUG # 'OFF' for no logging, 'DEBUG' for all logging.
loggers = ["akka.event.slf4j.Slf4jLogger"]

# akka-http is the server the Stream collector uses and has configurable options defined at
# http://doc.akka.io/docs/akka-http/current/scala/http/configuration.html
http.server {
# To obtain the hostname in the collector, the 'remote-address' header
# should be set. By default, this is disabled, and enabling it
# adds the 'Remote-Address' header to every request automatically.
remote-address-header = on

raw-request-uri-header = on

# Define the maximum request length (the default is 2048)
parsing {
max-uri-length = 32768
uri-parsing-mode = relaxed