Skip to content

Commit

Permalink
Move internal/pdatagrpc to model/otlpgrpc (#3507)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Jun 28, 2021
1 parent b21da41 commit 75ebb88
Show file tree
Hide file tree
Showing 15 changed files with 63 additions and 63 deletions.
14 changes: 7 additions & 7 deletions config/configgrpc/configgrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"go.opentelemetry.io/collector/config/configauth"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/internal/pdatagrpc"
"go.opentelemetry.io/collector/model/otlpgrpc"
"go.opentelemetry.io/collector/model/pdata"
)

Expand Down Expand Up @@ -453,7 +453,7 @@ func TestHttpReception(t *testing.T) {
opts, err := gss.ToServerOption(map[config.ComponentID]component.Extension{})
assert.NoError(t, err)
s := grpc.NewServer(opts...)
pdatagrpc.RegisterTracesServer(s, &grpcTraceServer{})
otlpgrpc.RegisterTracesServer(s, &grpcTraceServer{})

go func() {
_ = s.Serve(ln)
Expand All @@ -467,7 +467,7 @@ func TestHttpReception(t *testing.T) {
assert.NoError(t, errClient)
grpcClientConn, errDial := grpc.Dial(gcs.Endpoint, clientOpts...)
assert.NoError(t, errDial)
client := pdatagrpc.NewTracesClient(grpcClientConn)
client := otlpgrpc.NewTracesClient(grpcClientConn)
ctx, cancelFunc := context.WithTimeout(context.Background(), 2*time.Second)
resp, errResp := client.Export(ctx, pdata.NewTraces(), grpc.WaitForReady(true))
if tt.hasError {
Expand Down Expand Up @@ -498,7 +498,7 @@ func TestReceiveOnUnixDomainSocket(t *testing.T) {
opts, err := gss.ToServerOption(map[config.ComponentID]component.Extension{})
assert.NoError(t, err)
s := grpc.NewServer(opts...)
pdatagrpc.RegisterTracesServer(s, &grpcTraceServer{})
otlpgrpc.RegisterTracesServer(s, &grpcTraceServer{})

go func() {
_ = s.Serve(ln)
Expand All @@ -514,7 +514,7 @@ func TestReceiveOnUnixDomainSocket(t *testing.T) {
assert.NoError(t, errClient)
grpcClientConn, errDial := grpc.Dial(gcs.Endpoint, clientOpts...)
assert.NoError(t, errDial)
client := pdatagrpc.NewTracesClient(grpcClientConn)
client := otlpgrpc.NewTracesClient(grpcClientConn)
ctx, cancelFunc := context.WithTimeout(context.Background(), 2*time.Second)
resp, errResp := client.Export(ctx, pdata.NewTraces(), grpc.WaitForReady(true))
assert.NoError(t, errResp)
Expand All @@ -525,8 +525,8 @@ func TestReceiveOnUnixDomainSocket(t *testing.T) {

type grpcTraceServer struct{}

func (gts *grpcTraceServer) Export(context.Context, pdata.Traces) (pdatagrpc.TracesResponse, error) {
return pdatagrpc.NewTracesResponse(), nil
func (gts *grpcTraceServer) Export(context.Context, pdata.Traces) (otlpgrpc.TracesResponse, error) {
return otlpgrpc.NewTracesResponse(), nil
}

// tempSocketName provides a temporary Unix socket name for testing.
Expand Down
14 changes: 7 additions & 7 deletions exporter/otlpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/internal/pdatagrpc"
"go.opentelemetry.io/collector/model/otlpgrpc"
"go.opentelemetry.io/collector/model/pdata"
)

Expand Down Expand Up @@ -86,9 +86,9 @@ func (e *exporter) pushLogs(ctx context.Context, ld pdata.Logs) error {

type grpcSender struct {
// gRPC clients and connection.
traceExporter pdatagrpc.TracesClient
metricExporter pdatagrpc.MetricsClient
logExporter pdatagrpc.LogsClient
traceExporter otlpgrpc.TracesClient
metricExporter otlpgrpc.MetricsClient
logExporter otlpgrpc.LogsClient
clientConn *grpc.ClientConn
metadata metadata.MD
callOptions []grpc.CallOption
Expand All @@ -106,9 +106,9 @@ func newGrpcSender(config *Config, ext map[config.ComponentID]component.Extensio
}

gs := &grpcSender{
traceExporter: pdatagrpc.NewTracesClient(clientConn),
metricExporter: pdatagrpc.NewMetricsClient(clientConn),
logExporter: pdatagrpc.NewLogsClient(clientConn),
traceExporter: otlpgrpc.NewTracesClient(clientConn),
metricExporter: otlpgrpc.NewMetricsClient(clientConn),
logExporter: otlpgrpc.NewLogsClient(clientConn),
clientConn: clientConn,
metadata: metadata.New(config.GRPCClientSettings.Headers),
callOptions: []grpc.CallOption{
Expand Down
20 changes: 10 additions & 10 deletions exporter/otlpexporter/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/internal/pdatagrpc"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/model/otlpgrpc"
"go.opentelemetry.io/collector/model/pdata"
)

Expand All @@ -55,14 +55,14 @@ type mockTracesReceiver struct {
lastRequest pdata.Traces
}

func (r *mockTracesReceiver) Export(ctx context.Context, td pdata.Traces) (pdatagrpc.TracesResponse, error) {
func (r *mockTracesReceiver) Export(ctx context.Context, td pdata.Traces) (otlpgrpc.TracesResponse, error) {
atomic.AddInt32(&r.requestCount, 1)
atomic.AddInt32(&r.totalItems, int32(td.SpanCount()))
r.mux.Lock()
defer r.mux.Unlock()
r.lastRequest = td
r.metadata, _ = metadata.FromIncomingContext(ctx)
return pdatagrpc.NewTracesResponse(), nil
return otlpgrpc.NewTracesResponse(), nil
}

func (r *mockTracesReceiver) GetLastRequest() pdata.Traces {
Expand All @@ -79,7 +79,7 @@ func otlpTracesReceiverOnGRPCServer(ln net.Listener) *mockTracesReceiver {
}

// Now run it as a gRPC server
pdatagrpc.RegisterTracesServer(rcv.srv, rcv)
otlpgrpc.RegisterTracesServer(rcv.srv, rcv)
go func() {
_ = rcv.srv.Serve(ln)
}()
Expand All @@ -92,14 +92,14 @@ type mockLogsReceiver struct {
lastRequest pdata.Logs
}

func (r *mockLogsReceiver) Export(ctx context.Context, ld pdata.Logs) (pdatagrpc.LogsResponse, error) {
func (r *mockLogsReceiver) Export(ctx context.Context, ld pdata.Logs) (otlpgrpc.LogsResponse, error) {
atomic.AddInt32(&r.requestCount, 1)
atomic.AddInt32(&r.totalItems, int32(ld.LogRecordCount()))
r.mux.Lock()
defer r.mux.Unlock()
r.lastRequest = ld
r.metadata, _ = metadata.FromIncomingContext(ctx)
return pdatagrpc.NewLogsResponse(), nil
return otlpgrpc.NewLogsResponse(), nil
}

func (r *mockLogsReceiver) GetLastRequest() pdata.Logs {
Expand All @@ -116,7 +116,7 @@ func otlpLogsReceiverOnGRPCServer(ln net.Listener) *mockLogsReceiver {
}

// Now run it as a gRPC server
pdatagrpc.RegisterLogsServer(rcv.srv, rcv)
otlpgrpc.RegisterLogsServer(rcv.srv, rcv)
go func() {
_ = rcv.srv.Serve(ln)
}()
Expand All @@ -129,15 +129,15 @@ type mockMetricsReceiver struct {
lastRequest pdata.Metrics
}

func (r *mockMetricsReceiver) Export(ctx context.Context, md pdata.Metrics) (pdatagrpc.MetricsResponse, error) {
func (r *mockMetricsReceiver) Export(ctx context.Context, md pdata.Metrics) (otlpgrpc.MetricsResponse, error) {
atomic.AddInt32(&r.requestCount, 1)
_, recordCount := md.MetricAndDataPointCount()
atomic.AddInt32(&r.totalItems, int32(recordCount))
r.mux.Lock()
defer r.mux.Unlock()
r.lastRequest = md
r.metadata, _ = metadata.FromIncomingContext(ctx)
return pdatagrpc.NewMetricsResponse(), nil
return otlpgrpc.NewMetricsResponse(), nil
}

func (r *mockMetricsReceiver) GetLastRequest() pdata.Metrics {
Expand All @@ -154,7 +154,7 @@ func otlpMetricsReceiverOnGRPCServer(ln net.Listener) *mockMetricsReceiver {
}

// Now run it as a gRPC server
pdatagrpc.RegisterMetricsServer(rcv.srv, rcv)
otlpgrpc.RegisterMetricsServer(rcv.srv, rcv)
go func() {
_ = rcv.srv.Serve(ln)
}()
Expand Down
2 changes: 1 addition & 1 deletion internal/pdatagrpc/logs.go → model/otlpgrpc/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package pdatagrpc
package otlpgrpc

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package pdatagrpc
package otlpgrpc

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion internal/pdatagrpc/traces.go → model/otlpgrpc/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package pdatagrpc
package otlpgrpc

import (
"context"
Expand Down
8 changes: 4 additions & 4 deletions receiver/otlpreceiver/internal/logs/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/internal/pdatagrpc"
"go.opentelemetry.io/collector/model/otlpgrpc"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/obsreport"
)
Expand Down Expand Up @@ -54,15 +54,15 @@ const (
var receiverID = config.NewIDWithName("otlp", "log")

// Export implements the service Export logs func.
func (r *Receiver) Export(ctx context.Context, ld pdata.Logs) (pdatagrpc.LogsResponse, error) {
func (r *Receiver) Export(ctx context.Context, ld pdata.Logs) (otlpgrpc.LogsResponse, error) {
// We need to ensure that it propagates the receiver name as a tag
ctxWithReceiverName := obsreport.ReceiverContext(ctx, r.id, receiverTransport)
err := r.sendToNextConsumer(ctxWithReceiverName, ld)
if err != nil {
return pdatagrpc.LogsResponse{}, err
return otlpgrpc.LogsResponse{}, err
}

return pdatagrpc.NewLogsResponse(), nil
return otlpgrpc.NewLogsResponse(), nil
}

func (r *Receiver) sendToNextConsumer(ctx context.Context, ld pdata.Logs) error {
Expand Down
10 changes: 5 additions & 5 deletions receiver/otlpreceiver/internal/logs/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/pdatagrpc"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/model/otlpgrpc"
"go.opentelemetry.io/collector/model/pdata"
)

Expand Down Expand Up @@ -84,16 +84,16 @@ func TestExport_ErrorConsumer(t *testing.T) {

resp, err := logClient.Export(context.Background(), req)
assert.EqualError(t, err, "rpc error: code = Unknown desc = my error")
assert.Equal(t, pdatagrpc.LogsResponse{}, resp)
assert.Equal(t, otlpgrpc.LogsResponse{}, resp)
}

func makeLogsServiceClient(addr net.Addr) (pdatagrpc.LogsClient, func(), error) {
func makeLogsServiceClient(addr net.Addr) (otlpgrpc.LogsClient, func(), error) {
cc, err := grpc.Dial(addr.String(), grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
return nil, nil, err
}

logClient := pdatagrpc.NewLogsClient(cc)
logClient := otlpgrpc.NewLogsClient(cc)

doneFn := func() { _ = cc.Close() }
return logClient, doneFn, nil
Expand All @@ -115,7 +115,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Logs) (net.Addr, func())

// Now run it as a gRPC server
srv := grpc.NewServer()
pdatagrpc.RegisterLogsServer(srv, r)
otlpgrpc.RegisterLogsServer(srv, r)
go func() {
_ = srv.Serve(ln)
}()
Expand Down
8 changes: 4 additions & 4 deletions receiver/otlpreceiver/internal/metrics/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/internal/pdatagrpc"
"go.opentelemetry.io/collector/model/otlpgrpc"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/obsreport"
)
Expand Down Expand Up @@ -53,14 +53,14 @@ const (
var receiverID = config.NewIDWithName("otlp", "metrics")

// Export implements the service Export metrics func.
func (r *Receiver) Export(ctx context.Context, md pdata.Metrics) (pdatagrpc.MetricsResponse, error) {
func (r *Receiver) Export(ctx context.Context, md pdata.Metrics) (otlpgrpc.MetricsResponse, error) {
receiverCtx := obsreport.ReceiverContext(ctx, r.id, receiverTransport)
err := r.sendToNextConsumer(receiverCtx, md)
if err != nil {
return pdatagrpc.MetricsResponse{}, err
return otlpgrpc.MetricsResponse{}, err
}

return pdatagrpc.NewMetricsResponse(), nil
return otlpgrpc.NewMetricsResponse(), nil
}

func (r *Receiver) sendToNextConsumer(ctx context.Context, md pdata.Metrics) error {
Expand Down
10 changes: 5 additions & 5 deletions receiver/otlpreceiver/internal/metrics/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/pdatagrpc"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/model/otlpgrpc"
"go.opentelemetry.io/collector/model/pdata"
)

Expand Down Expand Up @@ -93,16 +93,16 @@ func TestExport_ErrorConsumer(t *testing.T) {

resp, err := metricsClient.Export(context.Background(), req)
assert.EqualError(t, err, "rpc error: code = Unknown desc = my error")
assert.Equal(t, pdatagrpc.MetricsResponse{}, resp)
assert.Equal(t, otlpgrpc.MetricsResponse{}, resp)
}

func makeMetricsServiceClient(addr net.Addr) (pdatagrpc.MetricsClient, func(), error) {
func makeMetricsServiceClient(addr net.Addr) (otlpgrpc.MetricsClient, func(), error) {
cc, err := grpc.Dial(addr.String(), grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
return nil, nil, err
}

metricsClient := pdatagrpc.NewMetricsClient(cc)
metricsClient := otlpgrpc.NewMetricsClient(cc)

doneFn := func() { _ = cc.Close() }
return metricsClient, doneFn, nil
Expand All @@ -122,7 +122,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, mc consumer.Metrics) (net.Addr, func
r := New(receiverID, mc)
// Now run it as a gRPC server
srv := grpc.NewServer()
pdatagrpc.RegisterMetricsServer(srv, r)
otlpgrpc.RegisterMetricsServer(srv, r)
go func() {
_ = srv.Serve(ln)
}()
Expand Down
8 changes: 4 additions & 4 deletions receiver/otlpreceiver/internal/trace/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/internal/pdatagrpc"
"go.opentelemetry.io/collector/model/otlpgrpc"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/obsreport"
)
Expand Down Expand Up @@ -54,15 +54,15 @@ const (
var receiverID = config.NewIDWithName("otlp", "trace")

// Export implements the service Export traces func.
func (r *Receiver) Export(ctx context.Context, td pdata.Traces) (pdatagrpc.TracesResponse, error) {
func (r *Receiver) Export(ctx context.Context, td pdata.Traces) (otlpgrpc.TracesResponse, error) {
// We need to ensure that it propagates the receiver name as a tag
ctxWithReceiverName := obsreport.ReceiverContext(ctx, r.id, receiverTransport)
err := r.sendToNextConsumer(ctxWithReceiverName, td)
if err != nil {
return pdatagrpc.TracesResponse{}, err
return otlpgrpc.TracesResponse{}, err
}

return pdatagrpc.NewTracesResponse(), nil
return otlpgrpc.NewTracesResponse(), nil
}

func (r *Receiver) sendToNextConsumer(ctx context.Context, td pdata.Traces) error {
Expand Down
Loading

0 comments on commit 75ebb88

Please sign in to comment.