Skip to content

Commit

Permalink
fix for some recommendation of golangci-lint
Browse files Browse the repository at this point in the history
Signed-off-by: tsaikd <[email protected]>
  • Loading branch information
tsaikd committed Mar 27, 2020
1 parent 4bff545 commit c29a57e
Show file tree
Hide file tree
Showing 20 changed files with 80 additions and 100 deletions.
8 changes: 4 additions & 4 deletions config/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func GetCodecOrDefault(ctx context.Context, raw ConfigRaw) (TypeCodecConfig, err
return nil, err
}
if c == nil {
return DefaultCodecInitHandler(nil, nil)
return DefaultCodecInitHandler(ctx, nil)
}
return c, nil
}
Expand All @@ -82,13 +82,13 @@ func GetCodecDefault(ctx context.Context, raw ConfigRaw, defaultType string) (Ty
return nil, nil
}

switch codecConfig.(type) {
switch cfg := codecConfig.(type) {
case map[string]interface{}:
return getCodec(ctx, ConfigRaw(codecConfig.(map[string]interface{})))
return getCodec(ctx, ConfigRaw(cfg))
case string:
// shorthand codec config method:
// codec: [codecTypeName]
return getCodec(ctx, ConfigRaw{"type": codecConfig.(string)})
return getCodec(ctx, ConfigRaw{"type": cfg})
default:
return nil, ErrorUnknownCodecType1.New(nil, codecConfig)
}
Expand Down
3 changes: 3 additions & 0 deletions config/logevent/logevent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ func Benchmark_Marshal_JSONEx(b *testing.B) {
}
b.SetBytes(int64(len(d)))
for n := 0; n < b.N; n++ {
//nolint: errcheck
jsonex.Marshal(jsonMap)
}
}
Expand All @@ -290,6 +291,7 @@ func Benchmark_Marshal_JSONIter(b *testing.B) {
}
b.SetBytes(int64(len(d)))
for n := 0; n < b.N; n++ {
//nolint: errcheck
jsoniter.Marshal(jsonMap)
}
}
Expand All @@ -302,6 +304,7 @@ func Benchmark_Marshal_StdJSON(b *testing.B) {
}
b.SetBytes(int64(len(d)))
for n := 0; n < b.N; n++ {
//nolint: errcheck
json.Marshal(jsonMap)
}
}
Expand Down
8 changes: 3 additions & 5 deletions config/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,9 @@ func contextWithOSSignal(parent context.Context, logger logutil.LevelLogger, sig
ctx, cancel := context.WithCancel(parent)

go func(cancel context.CancelFunc) {
select {
case sig := <-osSignalChan:
logger.Info(sig)
cancel()
}
sig := <-osSignalChan
logger.Info(sig)
cancel()
}(cancel)

return ctx
Expand Down
2 changes: 1 addition & 1 deletion filter/cond/filtercond.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type EventParameters struct {

// Get obtaining value from event's specified field recursively
func (ep *EventParameters) Get(field string) (interface{}, error) {
if strings.IndexRune(field, '.') < 0 {
if !strings.ContainsRune(field, '.') {
// no nest fields
return ep.Event.Get(field), nil
}
Expand Down
6 changes: 3 additions & 3 deletions filter/urlparam/filterurlparam.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ type FilterConfig struct {

// Include param keys, "all_fields" or "*" include all fields
IncludeKeys []string `json:"include_keys"`
includeAll bool `json:"-"`
includeAll bool

// url_decode params, "all_fields" or "*" decode all params values
UrlDecode []string `json:"url_decode"`
decodeAll bool `json:"-"`
decodeAll bool

// prefix for param name, default: request_url_args_
Prefix string `json:"prefix"`
Expand Down Expand Up @@ -76,7 +76,7 @@ func InitHandler(ctx context.Context, raw *config.ConfigRaw) (config.TypeFilterC
}

if conf.Prefix != "" {
if strings.Index(conf.Prefix, ".") >= 0 {
if strings.Contains(conf.Prefix, ".") {
return nil, fmt.Errorf("prefix can not includ dot(\".\")")
}
}
Expand Down
1 change: 0 additions & 1 deletion input/beats/inputbeats.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,5 +123,4 @@ func (t *InputConfig) Start(ctx context.Context, msgChan chan<- logevent.LogEven
data.ACK()
}
}
return nil
}
7 changes: 2 additions & 5 deletions input/dockerlog/inputdockerlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"regexp"
"time"

"github.com/fsouza/go-dockerclient"
docker "github.com/fsouza/go-dockerclient"
"github.com/tsaikd/KDGoLib/errutil"
"github.com/tsaikd/gogstash/config"
"github.com/tsaikd/gogstash/config/logevent"
Expand Down Expand Up @@ -190,8 +190,5 @@ func (t *InputConfig) isValidContainer(names []string) bool {
}
}

if len(t.includes) > 0 {
return false
}
return true
return len(t.includes) < 1
}
9 changes: 3 additions & 6 deletions input/dockerstats/inputdockerstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"regexp"
"time"

"github.com/fsouza/go-dockerclient"
docker "github.com/fsouza/go-dockerclient"
"github.com/tsaikd/KDGoLib/errutil"
"github.com/tsaikd/gogstash/config"
"github.com/tsaikd/gogstash/config/logevent"
Expand Down Expand Up @@ -90,7 +90,7 @@ func InitHandler(ctx context.Context, raw *config.ConfigRaw) (config.TypeInputCo
// copying codec instances is needed to allow codecs to do sequential processing, such as milti-line logs with proper isolation.
conf.Codec, err = config.GetCodecOrDefault(ctx, *raw)

return &conf, nil
return &conf, err
}

// Start wraps the actual function starting the plugin
Expand Down Expand Up @@ -167,8 +167,5 @@ func (t *InputConfig) isValidContainer(names []string) bool {
}
}
}
if len(t.includes) > 0 {
return false
}
return true
return len(t.includes) < 1
}
6 changes: 1 addition & 5 deletions input/dockerstats/logloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,11 @@ import (
"strings"
"time"

"github.com/fsouza/go-dockerclient"
docker "github.com/fsouza/go-dockerclient"
"github.com/tsaikd/gogstash/config/logevent"
"github.com/tsaikd/gogstash/input/dockerlog/dockertool"
)

var (
containerMap = map[string]interface{}{}
)

func (t *InputConfig) containerLogLoop(ctx context.Context, container interface{}, since *time.Time, msgChan chan<- logevent.LogEvent) (err error) {
id, name, err := dockertool.GetContainerInfo(container)
if err != nil {
Expand Down
2 changes: 0 additions & 2 deletions input/exec/inputexec.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,6 @@ func (t *InputConfig) exec(msgChan chan<- logevent.LogEvent) {
}

msgChan <- event

return
}

func (t *InputConfig) doExecCommand() (data string, err error) {
Expand Down
2 changes: 0 additions & 2 deletions input/http/inputhttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,6 @@ func (t *InputConfig) Request(ctx context.Context, msgChan chan<- logevent.LogEv
if err != nil {
goglog.Logger.Errorf("%v", err)
}

return
}

func (t *InputConfig) SendRequest() (data []byte, err error) {
Expand Down
5 changes: 4 additions & 1 deletion input/http/inputhttp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ func init() {

func TestMain(m *testing.M) {
http.HandleFunc("/", func(rw http.ResponseWriter, req *http.Request) {
rw.Write([]byte("foo"))
_, err := rw.Write([]byte("foo"))
if err != nil {
panic(err)
}
})

go func() {
Expand Down
4 changes: 2 additions & 2 deletions input/httplisten/httplisten_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,12 @@ input:

client := &http.Client{Transport: &transport}

resp, err := client.Post("https://127.0.0.1:8999/tls2/", "application/json", bytes.NewReader([]byte("{\"foo2\":\"bar2\"}")))
_, err = client.Post("https://127.0.0.1:8999/tls2/", "application/json", bytes.NewReader([]byte("{\"foo2\":\"bar2\"}")))
assert.NotNil(err)

// case 2: with correct client cert
tlsConfig.Certificates = []tls.Certificate{clientCert}
resp, err = client.Post("https://127.0.0.1:8999/tls2/", "application/json", bytes.NewReader([]byte("{\"foo2\":\"bar2\"}")))
resp, err := client.Post("https://127.0.0.1:8999/tls2/", "application/json", bytes.NewReader([]byte("{\"foo2\":\"bar2\"}")))
require.NoError(err)
defer resp.Body.Close()
assert.Equal(http.StatusOK, resp.StatusCode)
Expand Down
10 changes: 6 additions & 4 deletions input/kafka/inputkafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package inputkafka

import (
"context"
"github.com/Shopify/sarama"
"github.com/tsaikd/gogstash/config"
"github.com/tsaikd/gogstash/config/goglog"
"github.com/tsaikd/gogstash/config/logevent"
"os"
"os/signal"
"sync"
"syscall"

"github.com/Shopify/sarama"
"github.com/tsaikd/gogstash/config"
"github.com/tsaikd/gogstash/config/goglog"
"github.com/tsaikd/gogstash/config/logevent"
)

// ModuleName is the name used in config file
Expand Down Expand Up @@ -112,6 +113,7 @@ func (t *InputConfig) Start(ctx context.Context, msgChan chan<- logevent.LogEven
}

ct, cancel := context.WithCancel(ctx)
defer cancel()
client, err := sarama.NewConsumerGroup(t.Brokers, t.Group, t.saConf)
if err != nil {
goglog.Logger.Errorf("Error creating consumer group client: %v", err)
Expand Down
6 changes: 4 additions & 2 deletions input/redis/inputredis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ func Test_input_redis_module_batch(t *testing.T) {
require.NotNil(require)

for i := 0; i < 10; i++ {
s.Lpush("gogstash-test", fmt.Sprintf("{\"@timestamp\":\"%s\",\"message\":\"inputredis test message\"}", timeNow.Format(time.RFC3339Nano)))
_, err := s.Lpush("gogstash-test", fmt.Sprintf("{\"@timestamp\":\"%s\",\"message\":\"inputredis test message\"}", timeNow.Format(time.RFC3339Nano)))
require.NoError(err)
}

ctx := context.Background()
Expand Down Expand Up @@ -82,7 +83,8 @@ func Test_input_redis_module_single(t *testing.T) {
require := require.New(t)
require.NotNil(require)

s.Lpush("gogstash-test", fmt.Sprintf("{\"@timestamp\":\"%s\",\"message\":\"inputredis test message\"}", timeNow.Format(time.RFC3339Nano)))
_, err := s.Lpush("gogstash-test", fmt.Sprintf("{\"@timestamp\":\"%s\",\"message\":\"inputredis test message\"}", timeNow.Format(time.RFC3339Nano)))
require.NoError(err)

ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
Expand Down
19 changes: 9 additions & 10 deletions input/socket/inputsocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ func (i *InputConfig) Start(ctx context.Context, msgChan chan<- logevent.LogEven
defer l.Close()
case "udp":
address, err := net.ResolveUDPAddr(i.Socket, i.Address)
if err != nil {
return err
}
logger.Debugf("listen %q on %q", i.Socket, address.String())
var conn net.PacketConn
if i.ReusePort {
Expand All @@ -126,10 +129,8 @@ func (i *InputConfig) Start(ctx context.Context, msgChan chan<- logevent.LogEven
eg, ctx := errgroup.WithContext(ctx)

eg.Go(func() error {
select {
case <-ctx.Done():
return l.Close()
}
<-ctx.Done()
return l.Close()
})

eg.Go(func() error {
Expand Down Expand Up @@ -158,12 +159,10 @@ func (i *InputConfig) handleUDP(ctx context.Context, conn net.PacketConn, msgCha
defer pw.Close()

eg.Go(func() error {
select {
case <-ctx.Done():
pr.Close()
conn.Close()
return nil
}
<-ctx.Done()
pr.Close()
conn.Close()
return nil
})

eg.Go(func() error {
Expand Down
71 changes: 30 additions & 41 deletions output/amqp/outputamqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,6 @@ type OutputConfig struct {
amqpClients map[string]amqpClient
}

type amqpConn struct {
Channel *amqp.Channel
Connection *amqp.Connection
}

// DefaultOutputConfig returns an OutputConfig struct with default values
func DefaultOutputConfig() OutputConfig {
return OutputConfig{
Expand Down Expand Up @@ -159,52 +154,46 @@ func (o *OutputConfig) Output(ctx context.Context, event logevent.LogEvent) (err
}

func (o *OutputConfig) reconnect(url string) {
for {
select {
case poolResponse := <-o.amqpClients[url].reconnect:
// When a reconnect event is received
// start reconnect loop until reconnected
for {
time.Sleep(time.Duration(o.ReconnectDelay) * time.Second)

logrus.Info("Reconnecting to ", poolResponse.Host())

if conn, err := o.getConnection(poolResponse.Host()); err == nil {
if ch, err := conn.Channel(); err == nil {
if err := ch.ExchangeDeclare(
o.Exchange,
o.ExchangeType,
o.ExchangeDurable,
o.ExchangeAutoDelete,
false,
false,
nil,
); err == nil {
logrus.Info("Reconnected to ", poolResponse.Host())
o.amqpClients[poolResponse.Host()] = amqpClient{
client: ch,
reconnect: make(chan hostpool.HostPoolResponse, 1),
}
poolResponse.Mark(nil)
break
for poolResponse := range o.amqpClients[url].reconnect {
// When a reconnect event is received
// start reconnect loop until reconnected
for {
time.Sleep(time.Duration(o.ReconnectDelay) * time.Second)

logrus.Info("Reconnecting to ", poolResponse.Host())

if conn, err := o.getConnection(poolResponse.Host()); err == nil {
if ch, err := conn.Channel(); err == nil {
if err := ch.ExchangeDeclare(
o.Exchange,
o.ExchangeType,
o.ExchangeDurable,
o.ExchangeAutoDelete,
false,
false,
nil,
); err == nil {
logrus.Info("Reconnected to ", poolResponse.Host())
o.amqpClients[poolResponse.Host()] = amqpClient{
client: ch,
reconnect: make(chan hostpool.HostPoolResponse, 1),
}
poolResponse.Mark(nil)
break
}
}

logrus.Info("Failed to reconnect to ", url, ". Waiting ", o.ReconnectDelay, " seconds...")
}

logrus.Info("Failed to reconnect to ", url, ". Waiting ", o.ReconnectDelay, " seconds...")
}
}
}

func (o *OutputConfig) getConnection(url string) (c *amqp.Connection, e error) {
if strings.HasPrefix(url, "amqps") {
cfg := new(tls.Config)
cfg.RootCAs = x509.NewCertPool()

cfg.InsecureSkipVerify = false
if o.TLSSkipVerify == true {
cfg.InsecureSkipVerify = true
cfg := &tls.Config{
RootCAs: x509.NewCertPool(),
InsecureSkipVerify: o.TLSSkipVerify,
}

for _, ca := range o.TLSCACerts {
Expand Down
Loading

0 comments on commit c29a57e

Please sign in to comment.