Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQTT: add integration test #16143

Merged
merged 9 commits into from
Feb 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions filebeat/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ services:
- KAFKA_PORT=9092
- KIBANA_HOST=kibana
- KIBANA_PORT=5601
- MOSQUITTO_HOST=mosquitto
- MOSQUITTO_PORT=1883
working_dir: /go/src/github.com/elastic/beats/filebeat
volumes:
- ${PWD}/..:/go/src/github.com/elastic/beats/
Expand All @@ -31,6 +33,7 @@ services:
elasticsearch: { condition: service_healthy }
kafka: { condition: service_healthy }
kibana: { condition: service_healthy }
mosquitto: { condition: service_healthy }
redis: { condition: service_healthy }

elasticsearch:
Expand All @@ -51,5 +54,10 @@ services:
file: ${ES_BEATS}/testing/environments/${TESTING_ENVIRONMENT}.yml
service: kibana

mosquitto:
build: ${ES_BEATS}/testing/environments/docker/mosquitto
expose:
- 1883

redis:
build: ${PWD}/input/redis/_meta
23 changes: 16 additions & 7 deletions filebeat/input/mqtt/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,6 @@ const (
subscribeRetryInterval = 1 * time.Second
)

var (
newMqttClient = libmqtt.NewClient
newBackoff = backoff.NewEqualJitterBackoff
)

// Input contains the input and its config
type mqttInput struct {
once sync.Once
Expand All @@ -67,6 +62,16 @@ func NewInput(
cfg *common.Config,
connector channel.Connector,
inputContext input.Context,
) (input.Input, error) {
return newInput(cfg, connector, inputContext, libmqtt.NewClient, backoff.NewEqualJitterBackoff)
}

func newInput(
cfg *common.Config,
connector channel.Connector,
inputContext input.Context,
newMqttClient func(options *libmqtt.ClientOptions) libmqtt.Client,
newBackoff func(done <-chan struct{}, init, max time.Duration) backoff.Backoff,
) (input.Input, error) {
config := defaultConfig()
if err := cfg.Unpack(&config); err != nil {
Expand All @@ -88,7 +93,7 @@ func NewInput(
inflightMessages := new(sync.WaitGroup)
clientSubscriptions := createClientSubscriptions(config)
onMessageHandler := createOnMessageHandler(logger, out, inflightMessages)
onConnectHandler := createOnConnectHandler(logger, &inputContext, onMessageHandler, clientSubscriptions)
onConnectHandler := createOnConnectHandler(logger, &inputContext, onMessageHandler, clientSubscriptions, newBackoff)
clientOptions, err := createClientOptions(config, onConnectHandler)
if err != nil {
return nil, err
Expand Down Expand Up @@ -127,7 +132,11 @@ func createOnMessageHandler(logger *logp.Logger, outlet channel.Outleter, inflig
}
}

func createOnConnectHandler(logger *logp.Logger, inputContext *input.Context, onMessageHandler func(client libmqtt.Client, message libmqtt.Message), clientSubscriptions map[string]byte) func(client libmqtt.Client) {
func createOnConnectHandler(logger *logp.Logger,
inputContext *input.Context,
onMessageHandler func(client libmqtt.Client, message libmqtt.Message),
clientSubscriptions map[string]byte,
newBackoff func(done <-chan struct{}, init, max time.Duration) backoff.Backoff) func(client libmqtt.Client) {
// The function subscribes the client to the specific topics (with retry backoff in case of failure).
return func(client libmqtt.Client) {
backoff := newBackoff(
Expand Down
36 changes: 18 additions & 18 deletions filebeat/input/mqtt/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ import (
"github.com/elastic/beats/libbeat/logp"
)

var (
logger = logp.NewLogger("test")
)
var logger = logp.NewLogger("test")

func TestNewInput_MissingConfigField(t *testing.T) {
config := common.MustNewConfigFrom(common.MapStr{
Expand Down Expand Up @@ -75,6 +73,8 @@ func TestNewInput_Run(t *testing.T) {
})

eventsCh := make(chan beat.Event)
defer close(eventsCh)

outlet := &mockedOutleter{
onEventHandler: func(event beat.Event) bool {
eventsCh <- event
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestNewInput_Run(t *testing.T) {
}

var client *mockedClient
newMqttClient = func(o *libmqtt.ClientOptions) libmqtt.Client {
newMqttClient := func(o *libmqtt.ClientOptions) libmqtt.Client {
client = &mockedClient{
onConnectHandler: o.OnConnect,
messages: []mockedMessage{firstMessage, secondMessage},
Expand All @@ -115,7 +115,7 @@ func TestNewInput_Run(t *testing.T) {
return client
}

input, err := NewInput(config, connector, inputContext)
input, err := newInput(config, connector, inputContext, newMqttClient, backoff.NewEqualJitterBackoff)
require.NoError(t, err)
require.NotNil(t, input)

Expand All @@ -137,7 +137,7 @@ func TestNewInput_Run(t *testing.T) {
}
}

func TestNewInput_Run_Stop(t *testing.T) {
func TestNewInput_Run_Wait(t *testing.T) {
config := common.MustNewConfigFrom(common.MapStr{
"hosts": "tcp://mocked:1234",
"topics": []string{"first", "second"},
Expand All @@ -148,7 +148,10 @@ func TestNewInput_Run_Stop(t *testing.T) {

var eventProcessing sync.WaitGroup
eventProcessing.Add(numMessages)

eventsCh := make(chan beat.Event)
defer close(eventsCh)

outlet := &mockedOutleter{
onEventHandler: func(event beat.Event) bool {
eventProcessing.Done()
Expand All @@ -174,7 +177,7 @@ func TestNewInput_Run_Stop(t *testing.T) {
}

var client *mockedClient
newMqttClient = func(o *libmqtt.ClientOptions) libmqtt.Client {
newMqttClient := func(o *libmqtt.ClientOptions) libmqtt.Client {
client = &mockedClient{
onConnectHandler: o.OnConnect,
messages: messages,
Expand All @@ -185,7 +188,7 @@ func TestNewInput_Run_Stop(t *testing.T) {
return client
}

input, err := NewInput(config, connector, inputContext)
input, err := newInput(config, connector, inputContext, newMqttClient, backoff.NewEqualJitterBackoff)
require.NoError(t, err)
require.NotNil(t, input)

Expand All @@ -198,7 +201,7 @@ func TestNewInput_Run_Stop(t *testing.T) {
}
}()

input.Stop()
input.Wait()
}

func TestRun_Once(t *testing.T) {
Expand Down Expand Up @@ -254,11 +257,10 @@ func TestOnCreateHandler_SubscribeMultiple_Succeeded(t *testing.T) {
inputContext := new(finput.Context)
onMessageHandler := func(client libmqtt.Client, message libmqtt.Message) {}
var clientSubscriptions map[string]byte
handler := createOnConnectHandler(logger, inputContext, onMessageHandler, clientSubscriptions)

newBackoff = func(done <-chan struct{}, init, max time.Duration) backoff.Backoff {
newBackoff := func(done <-chan struct{}, init, max time.Duration) backoff.Backoff {
return backoff.NewEqualJitterBackoff(inputContext.Done, time.Nanosecond, 2*time.Nanosecond)
}
handler := createOnConnectHandler(logger, inputContext, onMessageHandler, clientSubscriptions, newBackoff)

client := &mockedClient{
tokens: []libmqtt.Token{&mockedToken{
Expand All @@ -274,11 +276,10 @@ func TestOnCreateHandler_SubscribeMultiple_BackoffSucceeded(t *testing.T) {
inputContext := new(finput.Context)
onMessageHandler := func(client libmqtt.Client, message libmqtt.Message) {}
var clientSubscriptions map[string]byte
handler := createOnConnectHandler(logger, inputContext, onMessageHandler, clientSubscriptions)

newBackoff = func(done <-chan struct{}, init, max time.Duration) backoff.Backoff {
newBackoff := func(done <-chan struct{}, init, max time.Duration) backoff.Backoff {
return backoff.NewEqualJitterBackoff(inputContext.Done, time.Nanosecond, 2*time.Nanosecond)
}
handler := createOnConnectHandler(logger, inputContext, onMessageHandler, clientSubscriptions, newBackoff)

client := &mockedClient{
tokens: []libmqtt.Token{&mockedToken{
Expand All @@ -296,14 +297,13 @@ func TestOnCreateHandler_SubscribeMultiple_BackoffSignalDone(t *testing.T) {
inputContext := new(finput.Context)
onMessageHandler := func(client libmqtt.Client, message libmqtt.Message) {}
var clientSubscriptions map[string]byte
handler := createOnConnectHandler(logger, inputContext, onMessageHandler, clientSubscriptions)

mockedBackoff := &mockedBackoff{
waits: []bool{true, false},
}
newBackoff = func(done <-chan struct{}, init, max time.Duration) backoff.Backoff {
newBackoff := func(done <-chan struct{}, init, max time.Duration) backoff.Backoff {
return mockedBackoff
}
handler := createOnConnectHandler(logger, inputContext, onMessageHandler, clientSubscriptions, newBackoff)

client := &mockedClient{
tokens: []libmqtt.Token{&mockedToken{
Expand Down
170 changes: 170 additions & 0 deletions filebeat/input/mqtt/mqtt_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build integration

package mqtt

import (
"fmt"
"os"
"sync"
"testing"
"time"

libmqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

const (
message = "hello-world"

waitTimeout = 30 * time.Second
)

var (
hostPort = fmt.Sprintf("tcp://%s:%s",
getOrDefault(os.Getenv("MOSQUITTO_HOST"), "mosquitto"),
getOrDefault(os.Getenv("MOSQUITTO_PORT"), "1883"))
topic = fmt.Sprintf("topic-%d", time.Now().UnixNano())
)

type eventCaptor struct {
c chan struct{}
closeOnce sync.Once
closed bool
events chan beat.Event
}

func newEventCaptor(events chan beat.Event) channel.Outleter {
return &eventCaptor{
c: make(chan struct{}),
events: events,
}
}

func (ec *eventCaptor) OnEvent(event beat.Event) bool {
ec.events <- event
return true
}

func (ec *eventCaptor) Close() error {
ec.closeOnce.Do(func() {
ec.closed = true
close(ec.c)
})
return nil
}

func (ec *eventCaptor) Done() <-chan struct{} {
return ec.c
}

func TestInput(t *testing.T) {
logp.TestingSetup(logp.WithSelectors("mqtt input", "libmqtt"))

// Setup the input config.
config := common.MustNewConfigFrom(common.MapStr{
"hosts": []string{hostPort},
"topics": []string{topic},
})

// Route input events through our captor instead of sending through ES.
eventsCh := make(chan beat.Event)
defer close(eventsCh)

captor := newEventCaptor(eventsCh)
defer captor.Close()

connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) {
return channel.SubOutlet(captor), nil
})

// Mock the context.
inputContext := input.Context{
Done: make(chan struct{}),
BeatDone: make(chan struct{}),
}

// Setup the input
input, err := NewInput(config, connector, inputContext)
require.NoError(t, err)
require.NotNil(t, input)

// Run the input.
input.Run()

// Create Publisher
publisher := createPublisher(t)

// Verify that event has been received
verifiedCh := make(chan struct{})
defer close(verifiedCh)

emitInputData(t, verifiedCh, publisher)

event := <-eventsCh
verifiedCh <- struct{}{}

val, err := event.GetValue("message")
require.NoError(t, err)
require.Equal(t, message, val)
}

func createPublisher(t *testing.T) libmqtt.Client {
clientOptions := libmqtt.NewClientOptions().
SetClientID("emitter").
SetAutoReconnect(false).
SetConnectRetry(false).
AddBroker(hostPort)
client := libmqtt.NewClient(clientOptions)
token := client.Connect()
require.True(t, token.WaitTimeout(waitTimeout))
require.NoError(t, token.Error())
return client
}

func emitInputData(t *testing.T, verifiedCh <-chan struct{}, publisher libmqtt.Client) {
go func() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
select {
case <-verifiedCh:
return
case <-ticker.C:
token := publisher.Publish(topic, 1, false, []byte(message))
require.True(t, token.WaitTimeout(waitTimeout))
require.NoError(t, token.Error())
}
}
}()
}

func getOrDefault(s, defaultString string) string {
if s == "" {
return defaultString
}
return s
}
2 changes: 2 additions & 0 deletions testing/environments/docker/mosquitto/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
FROM eclipse-mosquitto:1.6.8
HEALTHCHECK --interval=1s --retries=600 CMD nc -z localhost 1883