From 3ab37020bd0ca0ebd06e8ec81ee5ba0fb5ed2be2 Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Fri, 7 Feb 2020 21:46:26 +0100 Subject: [PATCH] MQTT: add integration test (#16143) * Create mosquitto image * MQTT input: add integration test * Fix * Verify connectivity * Fix * Fix: mage check * Fix * Fix * Fix: remove global var --- filebeat/docker-compose.yml | 8 + filebeat/input/mqtt/input.go | 23 ++- filebeat/input/mqtt/input_test.go | 36 ++-- filebeat/input/mqtt/mqtt_integration_test.go | 170 ++++++++++++++++++ .../environments/docker/mosquitto/Dockerfile | 2 + 5 files changed, 214 insertions(+), 25 deletions(-) create mode 100644 filebeat/input/mqtt/mqtt_integration_test.go create mode 100644 testing/environments/docker/mosquitto/Dockerfile diff --git a/filebeat/docker-compose.yml b/filebeat/docker-compose.yml index 52437a78a8d..19302ae1e6f 100644 --- a/filebeat/docker-compose.yml +++ b/filebeat/docker-compose.yml @@ -16,6 +16,8 @@ services: - KAFKA_PORT=9092 - KIBANA_HOST=kibana - KIBANA_PORT=5601 + - MOSQUITTO_HOST=mosquitto + - MOSQUITTO_PORT=1883 working_dir: /go/src/github.com/elastic/beats/filebeat volumes: - ${PWD}/..:/go/src/github.com/elastic/beats/ @@ -31,6 +33,7 @@ services: elasticsearch: { condition: service_healthy } kafka: { condition: service_healthy } kibana: { condition: service_healthy } + mosquitto: { condition: service_healthy } redis: { condition: service_healthy } elasticsearch: @@ -51,5 +54,10 @@ services: file: ${ES_BEATS}/testing/environments/${TESTING_ENVIRONMENT}.yml service: kibana + mosquitto: + build: ${ES_BEATS}/testing/environments/docker/mosquitto + expose: + - 1883 + redis: build: ${PWD}/input/redis/_meta diff --git a/filebeat/input/mqtt/input.go b/filebeat/input/mqtt/input.go index a3e00338cb8..945c1fe8de5 100644 --- a/filebeat/input/mqtt/input.go +++ b/filebeat/input/mqtt/input.go @@ -40,11 +40,6 @@ const ( subscribeRetryInterval = 1 * time.Second ) -var ( - newMqttClient = libmqtt.NewClient - newBackoff = backoff.NewEqualJitterBackoff -) - // Input contains the input and its config type mqttInput struct { once sync.Once @@ -67,6 +62,16 @@ func NewInput( cfg *common.Config, connector channel.Connector, inputContext input.Context, +) (input.Input, error) { + return newInput(cfg, connector, inputContext, libmqtt.NewClient, backoff.NewEqualJitterBackoff) +} + +func newInput( + cfg *common.Config, + connector channel.Connector, + inputContext input.Context, + newMqttClient func(options *libmqtt.ClientOptions) libmqtt.Client, + newBackoff func(done <-chan struct{}, init, max time.Duration) backoff.Backoff, ) (input.Input, error) { config := defaultConfig() if err := cfg.Unpack(&config); err != nil { @@ -88,7 +93,7 @@ func NewInput( inflightMessages := new(sync.WaitGroup) clientSubscriptions := createClientSubscriptions(config) onMessageHandler := createOnMessageHandler(logger, out, inflightMessages) - onConnectHandler := createOnConnectHandler(logger, &inputContext, onMessageHandler, clientSubscriptions) + onConnectHandler := createOnConnectHandler(logger, &inputContext, onMessageHandler, clientSubscriptions, newBackoff) clientOptions, err := createClientOptions(config, onConnectHandler) if err != nil { return nil, err @@ -127,7 +132,11 @@ func createOnMessageHandler(logger *logp.Logger, outlet channel.Outleter, inflig } } -func createOnConnectHandler(logger *logp.Logger, inputContext *input.Context, onMessageHandler func(client libmqtt.Client, message libmqtt.Message), clientSubscriptions map[string]byte) func(client libmqtt.Client) { +func createOnConnectHandler(logger *logp.Logger, + inputContext *input.Context, + onMessageHandler func(client libmqtt.Client, message libmqtt.Message), + clientSubscriptions map[string]byte, + newBackoff func(done <-chan struct{}, init, max time.Duration) backoff.Backoff) func(client libmqtt.Client) { // The function subscribes the client to the specific topics (with retry backoff in case of failure). return func(client libmqtt.Client) { backoff := newBackoff( diff --git a/filebeat/input/mqtt/input_test.go b/filebeat/input/mqtt/input_test.go index 9261e40b468..99413bc5cf7 100644 --- a/filebeat/input/mqtt/input_test.go +++ b/filebeat/input/mqtt/input_test.go @@ -33,9 +33,7 @@ import ( "github.com/elastic/beats/libbeat/logp" ) -var ( - logger = logp.NewLogger("test") -) +var logger = logp.NewLogger("test") func TestNewInput_MissingConfigField(t *testing.T) { config := common.MustNewConfigFrom(common.MapStr{ @@ -75,6 +73,8 @@ func TestNewInput_Run(t *testing.T) { }) eventsCh := make(chan beat.Event) + defer close(eventsCh) + outlet := &mockedOutleter{ onEventHandler: func(event beat.Event) bool { eventsCh <- event @@ -104,7 +104,7 @@ func TestNewInput_Run(t *testing.T) { } var client *mockedClient - newMqttClient = func(o *libmqtt.ClientOptions) libmqtt.Client { + newMqttClient := func(o *libmqtt.ClientOptions) libmqtt.Client { client = &mockedClient{ onConnectHandler: o.OnConnect, messages: []mockedMessage{firstMessage, secondMessage}, @@ -115,7 +115,7 @@ func TestNewInput_Run(t *testing.T) { return client } - input, err := NewInput(config, connector, inputContext) + input, err := newInput(config, connector, inputContext, newMqttClient, backoff.NewEqualJitterBackoff) require.NoError(t, err) require.NotNil(t, input) @@ -137,7 +137,7 @@ func TestNewInput_Run(t *testing.T) { } } -func TestNewInput_Run_Stop(t *testing.T) { +func TestNewInput_Run_Wait(t *testing.T) { config := common.MustNewConfigFrom(common.MapStr{ "hosts": "tcp://mocked:1234", "topics": []string{"first", "second"}, @@ -148,7 +148,10 @@ func TestNewInput_Run_Stop(t *testing.T) { var eventProcessing sync.WaitGroup eventProcessing.Add(numMessages) + eventsCh := make(chan beat.Event) + defer close(eventsCh) + outlet := &mockedOutleter{ onEventHandler: func(event beat.Event) bool { eventProcessing.Done() @@ -174,7 +177,7 @@ func TestNewInput_Run_Stop(t *testing.T) { } var client *mockedClient - newMqttClient = func(o *libmqtt.ClientOptions) libmqtt.Client { + newMqttClient := func(o *libmqtt.ClientOptions) libmqtt.Client { client = &mockedClient{ onConnectHandler: o.OnConnect, messages: messages, @@ -185,7 +188,7 @@ func TestNewInput_Run_Stop(t *testing.T) { return client } - input, err := NewInput(config, connector, inputContext) + input, err := newInput(config, connector, inputContext, newMqttClient, backoff.NewEqualJitterBackoff) require.NoError(t, err) require.NotNil(t, input) @@ -198,7 +201,7 @@ func TestNewInput_Run_Stop(t *testing.T) { } }() - input.Stop() + input.Wait() } func TestRun_Once(t *testing.T) { @@ -254,11 +257,10 @@ func TestOnCreateHandler_SubscribeMultiple_Succeeded(t *testing.T) { inputContext := new(finput.Context) onMessageHandler := func(client libmqtt.Client, message libmqtt.Message) {} var clientSubscriptions map[string]byte - handler := createOnConnectHandler(logger, inputContext, onMessageHandler, clientSubscriptions) - - newBackoff = func(done <-chan struct{}, init, max time.Duration) backoff.Backoff { + newBackoff := func(done <-chan struct{}, init, max time.Duration) backoff.Backoff { return backoff.NewEqualJitterBackoff(inputContext.Done, time.Nanosecond, 2*time.Nanosecond) } + handler := createOnConnectHandler(logger, inputContext, onMessageHandler, clientSubscriptions, newBackoff) client := &mockedClient{ tokens: []libmqtt.Token{&mockedToken{ @@ -274,11 +276,10 @@ func TestOnCreateHandler_SubscribeMultiple_BackoffSucceeded(t *testing.T) { inputContext := new(finput.Context) onMessageHandler := func(client libmqtt.Client, message libmqtt.Message) {} var clientSubscriptions map[string]byte - handler := createOnConnectHandler(logger, inputContext, onMessageHandler, clientSubscriptions) - - newBackoff = func(done <-chan struct{}, init, max time.Duration) backoff.Backoff { + newBackoff := func(done <-chan struct{}, init, max time.Duration) backoff.Backoff { return backoff.NewEqualJitterBackoff(inputContext.Done, time.Nanosecond, 2*time.Nanosecond) } + handler := createOnConnectHandler(logger, inputContext, onMessageHandler, clientSubscriptions, newBackoff) client := &mockedClient{ tokens: []libmqtt.Token{&mockedToken{ @@ -296,14 +297,13 @@ func TestOnCreateHandler_SubscribeMultiple_BackoffSignalDone(t *testing.T) { inputContext := new(finput.Context) onMessageHandler := func(client libmqtt.Client, message libmqtt.Message) {} var clientSubscriptions map[string]byte - handler := createOnConnectHandler(logger, inputContext, onMessageHandler, clientSubscriptions) - mockedBackoff := &mockedBackoff{ waits: []bool{true, false}, } - newBackoff = func(done <-chan struct{}, init, max time.Duration) backoff.Backoff { + newBackoff := func(done <-chan struct{}, init, max time.Duration) backoff.Backoff { return mockedBackoff } + handler := createOnConnectHandler(logger, inputContext, onMessageHandler, clientSubscriptions, newBackoff) client := &mockedClient{ tokens: []libmqtt.Token{&mockedToken{ diff --git a/filebeat/input/mqtt/mqtt_integration_test.go b/filebeat/input/mqtt/mqtt_integration_test.go new file mode 100644 index 00000000000..3fd3506cbf4 --- /dev/null +++ b/filebeat/input/mqtt/mqtt_integration_test.go @@ -0,0 +1,170 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build integration + +package mqtt + +import ( + "fmt" + "os" + "sync" + "testing" + "time" + + libmqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/filebeat/channel" + "github.com/elastic/beats/filebeat/input" + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" +) + +const ( + message = "hello-world" + + waitTimeout = 30 * time.Second +) + +var ( + hostPort = fmt.Sprintf("tcp://%s:%s", + getOrDefault(os.Getenv("MOSQUITTO_HOST"), "mosquitto"), + getOrDefault(os.Getenv("MOSQUITTO_PORT"), "1883")) + topic = fmt.Sprintf("topic-%d", time.Now().UnixNano()) +) + +type eventCaptor struct { + c chan struct{} + closeOnce sync.Once + closed bool + events chan beat.Event +} + +func newEventCaptor(events chan beat.Event) channel.Outleter { + return &eventCaptor{ + c: make(chan struct{}), + events: events, + } +} + +func (ec *eventCaptor) OnEvent(event beat.Event) bool { + ec.events <- event + return true +} + +func (ec *eventCaptor) Close() error { + ec.closeOnce.Do(func() { + ec.closed = true + close(ec.c) + }) + return nil +} + +func (ec *eventCaptor) Done() <-chan struct{} { + return ec.c +} + +func TestInput(t *testing.T) { + logp.TestingSetup(logp.WithSelectors("mqtt input", "libmqtt")) + + // Setup the input config. + config := common.MustNewConfigFrom(common.MapStr{ + "hosts": []string{hostPort}, + "topics": []string{topic}, + }) + + // Route input events through our captor instead of sending through ES. + eventsCh := make(chan beat.Event) + defer close(eventsCh) + + captor := newEventCaptor(eventsCh) + defer captor.Close() + + connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) { + return channel.SubOutlet(captor), nil + }) + + // Mock the context. + inputContext := input.Context{ + Done: make(chan struct{}), + BeatDone: make(chan struct{}), + } + + // Setup the input + input, err := NewInput(config, connector, inputContext) + require.NoError(t, err) + require.NotNil(t, input) + + // Run the input. + input.Run() + + // Create Publisher + publisher := createPublisher(t) + + // Verify that event has been received + verifiedCh := make(chan struct{}) + defer close(verifiedCh) + + emitInputData(t, verifiedCh, publisher) + + event := <-eventsCh + verifiedCh <- struct{}{} + + val, err := event.GetValue("message") + require.NoError(t, err) + require.Equal(t, message, val) +} + +func createPublisher(t *testing.T) libmqtt.Client { + clientOptions := libmqtt.NewClientOptions(). + SetClientID("emitter"). + SetAutoReconnect(false). + SetConnectRetry(false). + AddBroker(hostPort) + client := libmqtt.NewClient(clientOptions) + token := client.Connect() + require.True(t, token.WaitTimeout(waitTimeout)) + require.NoError(t, token.Error()) + return client +} + +func emitInputData(t *testing.T, verifiedCh <-chan struct{}, publisher libmqtt.Client) { + go func() { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + select { + case <-verifiedCh: + return + case <-ticker.C: + token := publisher.Publish(topic, 1, false, []byte(message)) + require.True(t, token.WaitTimeout(waitTimeout)) + require.NoError(t, token.Error()) + } + } + }() +} + +func getOrDefault(s, defaultString string) string { + if s == "" { + return defaultString + } + return s +} diff --git a/testing/environments/docker/mosquitto/Dockerfile b/testing/environments/docker/mosquitto/Dockerfile new file mode 100644 index 00000000000..eac5d1e0d6c --- /dev/null +++ b/testing/environments/docker/mosquitto/Dockerfile @@ -0,0 +1,2 @@ +FROM eclipse-mosquitto:1.6.8 +HEALTHCHECK --interval=1s --retries=600 CMD nc -z localhost 1883