Skip to content

Commit

Permalink
[chore] Refactor service/host into service/internal/graph (#10854)
Browse files Browse the repository at this point in the history
#### Description
This is a prep PR to reduce the size of
#10777. As
part of the work to make our `component.Host` implementation implement
`componentstatus.Reporter` (see
#10852),
the `host` struct and `graph` logic need to be closer together. This is
because, as part of
#10777
`StartAll` is changed to depend on our specific `Host` type instead of a
`component.Host`. Our host already has a dependency on `graph`, so it
can't be moved into its own module.

<!-- Issue number if applicable -->
#### Link to tracking issue
Related to
#10777
Related to
#10413
  • Loading branch information
TylerHelmuth authored Aug 14, 2024
1 parent b0085e5 commit d5d1f82
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 195 deletions.
74 changes: 0 additions & 74 deletions service/host.go

This file was deleted.

164 changes: 164 additions & 0 deletions service/internal/graph/host.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package graph // import "go.opentelemetry.io/collector/service/internal/graph"

import (
"net/http"
"path"
"runtime"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/service/extensions"
"go.opentelemetry.io/collector/service/internal/zpages"
)

// TODO: remove as part of https://github.com/open-telemetry/opentelemetry-collector/issues/7370 for service 1.0
type getExporters interface {
GetExporters() map[component.DataType]map[component.ID]component.Component
}

var _ getExporters = (*Host)(nil)
var _ component.Host = (*Host)(nil)

type Host struct {
AsyncErrorChannel chan error
Receivers *receiver.Builder
Processors *processor.Builder
Exporters *exporter.Builder
Connectors *connector.Builder
Extensions *extension.Builder

BuildInfo component.BuildInfo

Pipelines *Graph
ServiceExtensions *extensions.Extensions
}

func (host *Host) GetFactory(kind component.Kind, componentType component.Type) component.Factory {
switch kind {
case component.KindReceiver:
return host.Receivers.Factory(componentType)
case component.KindProcessor:
return host.Processors.Factory(componentType)
case component.KindExporter:
return host.Exporters.Factory(componentType)
case component.KindConnector:
return host.Connectors.Factory(componentType)
case component.KindExtension:
return host.Extensions.Factory(componentType)
}
return nil
}

func (host *Host) GetExtensions() map[component.ID]component.Component {
return host.ServiceExtensions.GetExtensions()
}

// Deprecated: [0.79.0] This function will be removed in the future.
// Several components in the contrib repository use this function so it cannot be removed
// before those cases are removed. In most cases, use of this function can be replaced by a
// connector. See https://github.com/open-telemetry/opentelemetry-collector/issues/7370 and
// https://github.com/open-telemetry/opentelemetry-collector/pull/7390#issuecomment-1483710184
// for additional information.
func (host *Host) GetExporters() map[component.DataType]map[component.ID]component.Component {
return host.Pipelines.GetExporters()
}

func (host *Host) NotifyComponentStatusChange(source *component.InstanceID, event *component.StatusEvent) {
host.ServiceExtensions.NotifyComponentStatusChange(source, event)
if event.Status() == component.StatusFatalError {
host.AsyncErrorChannel <- event.Err()
}
}

const (
// Paths
zServicePath = "servicez"
zPipelinePath = "pipelinez"
zExtensionPath = "extensionz"
zFeaturePath = "featurez"
)

var (
// InfoVar is a singleton instance of the Info struct.
runtimeInfoVar [][2]string
)

func init() {
runtimeInfoVar = [][2]string{
{"StartTimestamp", time.Now().String()},
{"Go", runtime.Version()},
{"OS", runtime.GOOS},
{"Arch", runtime.GOARCH},
// Add other valuable runtime information here.
}
}

func (host *Host) RegisterZPages(mux *http.ServeMux, pathPrefix string) {
mux.HandleFunc(path.Join(pathPrefix, zServicePath), host.zPagesRequest)
mux.HandleFunc(path.Join(pathPrefix, zPipelinePath), host.Pipelines.HandleZPages)
mux.HandleFunc(path.Join(pathPrefix, zExtensionPath), host.ServiceExtensions.HandleZPages)
mux.HandleFunc(path.Join(pathPrefix, zFeaturePath), handleFeaturezRequest)
}

func (host *Host) zPagesRequest(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
zpages.WriteHTMLPageHeader(w, zpages.HeaderData{Title: "Service " + host.BuildInfo.Command})
zpages.WriteHTMLPropertiesTable(w, zpages.PropertiesTableData{Name: "Build Info", Properties: getBuildInfoProperties(host.BuildInfo)})
zpages.WriteHTMLPropertiesTable(w, zpages.PropertiesTableData{Name: "Runtime Info", Properties: runtimeInfoVar})
zpages.WriteHTMLComponentHeader(w, zpages.ComponentHeaderData{
Name: "Pipelines",
ComponentEndpoint: zPipelinePath,
Link: true,
})
zpages.WriteHTMLComponentHeader(w, zpages.ComponentHeaderData{
Name: "Extensions",
ComponentEndpoint: zExtensionPath,
Link: true,
})
zpages.WriteHTMLComponentHeader(w, zpages.ComponentHeaderData{
Name: "Features",
ComponentEndpoint: zFeaturePath,
Link: true,
})
zpages.WriteHTMLPageFooter(w)
}

func handleFeaturezRequest(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
zpages.WriteHTMLPageHeader(w, zpages.HeaderData{Title: "Feature Gates"})
zpages.WriteHTMLFeaturesTable(w, getFeaturesTableData())
zpages.WriteHTMLPageFooter(w)
}

func getFeaturesTableData() zpages.FeatureGateTableData {
data := zpages.FeatureGateTableData{}
featuregate.GlobalRegistry().VisitAll(func(gate *featuregate.Gate) {
data.Rows = append(data.Rows, zpages.FeatureGateTableRowData{
ID: gate.ID(),
Enabled: gate.IsEnabled(),
Description: gate.Description(),
Stage: gate.Stage().String(),
FromVersion: gate.FromVersion(),
ToVersion: gate.ToVersion(),
ReferenceURL: gate.ReferenceURL(),
})
})
return data
}

func getBuildInfoProperties(buildInfo component.BuildInfo) [][2]string {
return [][2]string{
{"Command", buildInfo.Command},
{"Description", buildInfo.Description},
{"Version", buildInfo.Version},
}
}
40 changes: 20 additions & 20 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type Settings struct {
type Service struct {
buildInfo component.BuildInfo
telemetrySettings component.TelemetrySettings
host *serviceHost
host *graph.Host
collectorConf *confmap.Conf

reporter status.Reporter
Expand All @@ -81,14 +81,14 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
extendedConfig := obsreportconfig.UseOtelWithSDKConfigurationForInternalTelemetryFeatureGate.IsEnabled()
srv := &Service{
buildInfo: set.BuildInfo,
host: &serviceHost{
receivers: set.Receivers,
processors: set.Processors,
exporters: set.Exporters,
connectors: set.Connectors,
extensions: set.Extensions,
buildInfo: set.BuildInfo,
asyncErrorChannel: set.AsyncErrorChannel,
host: &graph.Host{
Receivers: set.Receivers,
Processors: set.Processors,
Exporters: set.Exporters,
Connectors: set.Connectors,
Extensions: set.Extensions,
BuildInfo: set.BuildInfo,
AsyncErrorChannel: set.AsyncErrorChannel,
},
collectorConf: set.CollectorConf,
}
Expand Down Expand Up @@ -136,7 +136,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
// Construct telemetry attributes from build info and config's resource attributes.
Resource: pcommonRes,
}
srv.reporter = status.NewReporter(srv.host.notifyComponentStatusChange, func(err error) {
srv.reporter = status.NewReporter(srv.host.NotifyComponentStatusChange, func(err error) {
if errors.Is(err, status.ErrStatusNotReady) {
logger.Warn("Invalid transition", zap.Error(err))
}
Expand Down Expand Up @@ -200,21 +200,21 @@ func (srv *Service) Start(ctx context.Context) error {
// enable status reporting
srv.reporter.Ready()

if err := srv.host.serviceExtensions.Start(ctx, srv.host); err != nil {
if err := srv.host.ServiceExtensions.Start(ctx, srv.host); err != nil {
return fmt.Errorf("failed to start extensions: %w", err)
}

if srv.collectorConf != nil {
if err := srv.host.serviceExtensions.NotifyConfig(ctx, srv.collectorConf); err != nil {
if err := srv.host.ServiceExtensions.NotifyConfig(ctx, srv.collectorConf); err != nil {
return err
}
}

if err := srv.host.pipelines.StartAll(ctx, srv.host, srv.reporter); err != nil {
if err := srv.host.Pipelines.StartAll(ctx, srv.host, srv.reporter); err != nil {
return fmt.Errorf("cannot start pipelines: %w", err)
}

if err := srv.host.serviceExtensions.NotifyPipelineReady(); err != nil {
if err := srv.host.ServiceExtensions.NotifyPipelineReady(); err != nil {
return err
}

Expand Down Expand Up @@ -257,15 +257,15 @@ func (srv *Service) Shutdown(ctx context.Context) error {
// Begin shutdown sequence.
srv.telemetrySettings.Logger.Info("Starting shutdown...")

if err := srv.host.serviceExtensions.NotifyPipelineNotReady(); err != nil {
if err := srv.host.ServiceExtensions.NotifyPipelineNotReady(); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to notify that pipeline is not ready: %w", err))
}

if err := srv.host.pipelines.ShutdownAll(ctx, srv.reporter); err != nil {
if err := srv.host.Pipelines.ShutdownAll(ctx, srv.reporter); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown pipelines: %w", err))
}

if err := srv.host.serviceExtensions.Shutdown(ctx); err != nil {
if err := srv.host.ServiceExtensions.Shutdown(ctx); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown extensions: %w", err))
}

Expand All @@ -282,9 +282,9 @@ func (srv *Service) initExtensions(ctx context.Context, cfg extensions.Config) e
extensionsSettings := extensions.Settings{
Telemetry: srv.telemetrySettings,
BuildInfo: srv.buildInfo,
Extensions: srv.host.extensions,
Extensions: srv.host.Extensions,
}
if srv.host.serviceExtensions, err = extensions.New(ctx, extensionsSettings, cfg, extensions.WithReporter(srv.reporter)); err != nil {
if srv.host.ServiceExtensions, err = extensions.New(ctx, extensionsSettings, cfg, extensions.WithReporter(srv.reporter)); err != nil {
return fmt.Errorf("failed to build extensions: %w", err)
}
return nil
Expand All @@ -293,7 +293,7 @@ func (srv *Service) initExtensions(ctx context.Context, cfg extensions.Config) e
// Creates the pipeline graph.
func (srv *Service) initGraph(ctx context.Context, set Settings, cfg Config) error {
var err error
if srv.host.pipelines, err = graph.Build(ctx, graph.Settings{
if srv.host.Pipelines, err = graph.Build(ctx, graph.Settings{
Telemetry: srv.telemetrySettings,
BuildInfo: srv.buildInfo,
ReceiverBuilder: set.Receivers,
Expand Down
5 changes: 3 additions & 2 deletions service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ func TestServiceGetExporters(t *testing.T) {
assert.NoError(t, srv.Shutdown(context.Background()))
})

// nolint
expMap := srv.host.GetExporters()
assert.Len(t, expMap, 3)
assert.Len(t, expMap[component.DataTypeTraces], 1)
Expand Down Expand Up @@ -443,10 +444,10 @@ func TestServiceFatalError(t *testing.T) {

go func() {
ev := component.NewFatalErrorEvent(assert.AnError)
srv.host.notifyComponentStatusChange(&component.InstanceID{}, ev)
srv.host.NotifyComponentStatusChange(&component.InstanceID{}, ev)
}()

err = <-srv.host.asyncErrorChannel
err = <-srv.host.AsyncErrorChannel

require.ErrorIs(t, err, assert.AnError)
}
Expand Down
Loading

0 comments on commit d5d1f82

Please sign in to comment.