Skip to content

Commit

Permalink
Add NamedPipe input operator (#28841)
Browse files Browse the repository at this point in the history
**Description:** 

This is a namedpipe input operator, which will read from a named pipe
and send the data to the pipeline. It pretty closely mimics the file
input operator, but with a few differences.

In particular, named pipes have an interesting property that they
receive EOFs when a writer closes the pipe, but that _doesn't_ mean that
the pipe is closed. To solve this issue, we crib from existing `tail -f`
implementations and use an inotify watcher to detect whenever the pipe
receives new data, and then read it using the standard `bufio.Scanner`
reader.

**Link to tracking Issue:** #27234

**Testing:**

We add a couple of tests for the new operator. The first tests simply
the creation of the named pipe - checking that it's created as a pipe,
with the right permissions. The second goes further by inserting logs
over several different `Open`s into the pipe, testing that the logs are
read, and that the operator is able to handle the named pipe behavior of
skipping over EOFs.

**Documentation:**

None, at the moment

/cc @djaglowski

---------

Signed-off-by: sinkingpoint <[email protected]>
  • Loading branch information
sinkingpoint authored Nov 15, 2023
1 parent bc25618 commit 4e8e527
Show file tree
Hide file tree
Showing 5 changed files with 457 additions and 1 deletion.
27 changes: 27 additions & 0 deletions .chloggen/namedpipe_operator.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add "namedpipe" operator.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27234]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
2 changes: 1 addition & 1 deletion pkg/stanza/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/antonmedv/expr v1.15.3
github.com/bmatcuk/doublestar/v4 v4.6.1
github.com/cespare/xxhash/v2 v2.2.0
github.com/fsnotify/fsnotify v1.7.0
github.com/influxdata/go-syslog/v3 v3.0.1-0.20210608084020-ac565dc76ba6
github.com/jpillora/backoff v1.0.0
github.com/json-iterator/go v1.1.12
Expand All @@ -32,7 +33,6 @@ require (
require (
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/hashicorp/go-version v1.6.0 // indirect
Expand Down
209 changes: 209 additions & 0 deletions pkg/stanza/operator/input/namedpipe/namedpipe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build linux
// +build linux

package namedpipe // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/namedpipe"

import (
"bufio"
"context"
"fmt"
"os"
"sync"

"go.uber.org/zap"
"golang.org/x/sys/unix"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

const (
operatorType = "namedpipe"
DefaultMaxLogSize = 1024 * 1024
)

func init() {
operator.Register(operatorType, func() operator.Builder { return NewConfig() })
}

func NewConfig() *Config {
return NewConfigWithID(operatorType)
}

// NewConfig creates a new stdin input config with default values
func NewConfigWithID(operatorID string) *Config {
return &Config{
InputConfig: helper.NewInputConfig(operatorID, operatorType),
}
}

// Config is the configuration of a stdin input operator.
type Config struct {
helper.InputConfig `mapstructure:",squash"`
BaseConfig `mapstructure:",squash"`
}

type BaseConfig struct {
Path string `mapstructure:"path"`
Permissions uint32 `mapstructure:"mode"`
Encoding string `mapstructure:"encoding"`
SplitConfig split.Config `mapstructure:"multiline,omitempty"`
TrimConfig trim.Config `mapstructure:",squash"`
MaxLogSize int `mapstructure:"max_log_size"`
}

// Build will build a namedpipe input operator.
func (c *Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
inputOperator, err := c.InputConfig.Build(logger)
if err != nil {
return nil, err
}

enc, err := decode.LookupEncoding(c.Encoding)
if err != nil {
return nil, fmt.Errorf("failed to lookup encoding %q: %w", c.Encoding, err)
}

splitFunc, err := c.SplitConfig.Func(enc, true, DefaultMaxLogSize)
if err != nil {
return nil, fmt.Errorf("failed to create split function: %w", err)
}

maxLogSize := c.MaxLogSize
if maxLogSize == 0 {
maxLogSize = DefaultMaxLogSize
}

return &Input{
InputOperator: inputOperator,

buffer: make([]byte, maxLogSize),
path: c.Path,
permissions: c.Permissions,
splitFunc: splitFunc,
trimFunc: c.TrimConfig.Func(),
}, nil
}

type Input struct {
helper.InputOperator

buffer []byte
path string
permissions uint32
splitFunc bufio.SplitFunc
trimFunc trim.Func
cancel context.CancelFunc
pipe *os.File
wg sync.WaitGroup
}

func (n *Input) Start(_ operator.Persister) error {
stat, err := os.Stat(n.path)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to stat named pipe: %w", err)
}

if !os.IsNotExist(err) && stat.Mode()&os.ModeNamedPipe == 0 {
return fmt.Errorf("path %s is not a named pipe", n.path)
}

if os.IsNotExist(err) {
if fifoErr := unix.Mkfifo(n.path, n.permissions); fifoErr != nil {
return fmt.Errorf("failed to create named pipe: %w", fifoErr)
}
}

// chmod the named pipe because mkfifo respects the umask which may result
// in a named pipe with incorrect permissions.
if chmodErr := os.Chmod(n.path, os.FileMode(n.permissions)); chmodErr != nil {
return fmt.Errorf("failed to chmod named pipe: %w", chmodErr)
}

watcher, err := NewWatcher(n.path)
if err != nil {
return fmt.Errorf("failed to create watcher: %w", err)
}

pipe, err := os.OpenFile(n.path, os.O_RDWR, os.ModeNamedPipe)
if err != nil {
return fmt.Errorf("failed to open named pipe: %w", err)
}

n.pipe = pipe

ctx, cancel := context.WithCancel(context.Background())
n.cancel = cancel

n.wg.Add(2)
go func() {
defer n.wg.Done()
if err := watcher.Watch(ctx); err != nil {
n.Logger().Errorw("failed to watch named pipe", zap.Error(err))
}
}()

go func() {
defer n.wg.Done()
for {
select {
case <-watcher.C:
if err := n.process(ctx, pipe); err != nil {
n.Logger().Errorw("failed to process named pipe", zap.Error(err))
}
case <-ctx.Done():
return
}
}
}()

return nil
}

func (n *Input) Stop() error {
n.pipe.Close()
n.cancel()
n.wg.Wait()
return nil
}

func (n *Input) process(ctx context.Context, pipe *os.File) error {
scan := bufio.NewScanner(pipe)
scan.Split(n.splitFunc)
scan.Buffer(n.buffer, len(n.buffer))

for scan.Scan() {
line := scan.Bytes()
if len(line) == 0 {
continue
}

if err := n.sendEntry(ctx, line); err != nil {
return fmt.Errorf("failed to send entry: %w", err)
}
}

return scan.Err()
}

// sendEntry sends an entry to the next operator in the pipeline.
func (n *Input) sendEntry(ctx context.Context, bytes []byte) error {
bytes = n.trimFunc(bytes)
if len(bytes) == 0 {
return nil
}

entry, err := n.NewEntry(string(bytes))
if err != nil {
return fmt.Errorf("failed to create entry: %w", err)
}

n.Write(ctx, entry)
return nil
}
Loading

0 comments on commit 4e8e527

Please sign in to comment.