Skip to content

Commit

Permalink
Change Attach signature to return ConnContext func (#153)
Browse files Browse the repository at this point in the history
Resolves #152

Make sure the library-provided ConnContext function is used by the caller when they setup the HTTP Server to Attach() to, so that the context correctly stores a references to the connection. This is an internal requirement for plain http server implementation.
  • Loading branch information
tshinde-splunk authored Dec 7, 2022
1 parent 29d0b78 commit cea1d9d
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 10 deletions.
16 changes: 10 additions & 6 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
import (
"context"
"crypto/tls"
"net"
"net/http"

"github.com/open-telemetry/opamp-go/server/types"
Expand Down Expand Up @@ -34,18 +35,21 @@ type StartSettings struct {

type HTTPHandlerFunc func(http.ResponseWriter, *http.Request)

type ConnContext func(ctx context.Context, c net.Conn) context.Context

type OpAMPServer interface {
// Attach prepares the OpAMP Server to begin handling requests from an existing
// http.Server. The returned HTTPHandlerFunc should be added as a handler to the
// desired http.Server by the caller and the http.Server should be started by
// the caller after that.
// http.Server. The returned HTTPHandlerFunc and ConnContext should be added as a
// handler and ConnContext respectively to the desired http.Server by the caller
// and the http.Server should be started by the caller after that. The ConnContext
// is only used for plain http connections.
// For example:
// handler, _ := Server.Attach()
// handler, connContext, _ := Server.Attach()
// mux := http.NewServeMux()
// mux.HandleFunc("/opamp", handler)
// httpSrv := &http.Server{Handler:mux,Addr:"127.0.0.1:4320"}
// httpSrv := &http.Server{Handler:mux,Addr:"127.0.0.1:4320", ConnContext: connContext}
// httpSrv.ListenAndServe()
Attach(settings Settings) (HTTPHandlerFunc, error)
Attach(settings Settings) (HTTPHandlerFunc, ConnContext, error)

// Start an OpAMP Server and begin accepting connections. Starts its own http.Server
// using provided settings. This should block until the http.Server is ready to
Expand Down
6 changes: 3 additions & 3 deletions server/serverimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,20 @@ func New(logger types.Logger) *server {
return &server{logger: logger}
}

func (s *server) Attach(settings Settings) (HTTPHandlerFunc, error) {
func (s *server) Attach(settings Settings) (HTTPHandlerFunc, ConnContext, error) {
s.settings = settings
s.wsUpgrader = websocket.Upgrader{
EnableCompression: settings.EnableCompression,
}
return s.httpHandler, nil
return s.httpHandler, contextWithConn, nil
}

func (s *server) Start(settings StartSettings) error {
if s.httpServer != nil {
return errAlreadyStarted
}

_, err := s.Attach(settings.Settings)
_, _, err := s.Attach(settings.Settings)
if err != nil {
return err
}
Expand Down
84 changes: 83 additions & 1 deletion server/serverimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ func TestServerAttachAcceptConnection(t *testing.T) {
settings := Settings{Callbacks: callbacks}
srv := New(&sharedinternal.NopLogger{})
require.NotNil(t, srv)
handlerFunc, err := srv.Attach(settings)
handlerFunc, _, err := srv.Attach(settings)
require.NoError(t, err)

// Create an HTTP Server and make it handle OpAMP connections.
Expand All @@ -447,6 +447,88 @@ func TestServerAttachAcceptConnection(t *testing.T) {
eventually(t, func() bool { return atomic.LoadInt32(&connectionCloseCalled) == 1 })
}

func TestServerAttachSendMessagePlainHTTP(t *testing.T) {
connectedCalled := int32(0)
connectionCloseCalled := int32(0)
var rcvMsg atomic.Value

var srvConn types.Connection
callbacks := CallbacksStruct{
OnConnectingFunc: func(request *http.Request) types.ConnectionResponse {
return types.ConnectionResponse{Accept: true}
},
OnConnectedFunc: func(conn types.Connection) {
atomic.StoreInt32(&connectedCalled, 1)
srvConn = conn
},
OnMessageFunc: func(conn types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
// Remember received message.
rcvMsg.Store(message)

// Send a response.
response := protobufs.ServerToAgent{
InstanceUid: message.InstanceUid,
Capabilities: uint64(protobufs.ServerCapabilities_ServerCapabilities_AcceptsStatus),
}
return &response
},
OnConnectionCloseFunc: func(conn types.Connection) {
atomic.StoreInt32(&connectionCloseCalled, 1)
assert.EqualValues(t, srvConn, conn)
},
}

// Prepare to attach OpAMP Server to an HTTP Server created separately.
settings := Settings{Callbacks: callbacks}
srv := New(&sharedinternal.NopLogger{})
require.NotNil(t, srv)
handlerFunc, ContextWithConn, err := srv.Attach(settings)
require.NoError(t, err)

// Create an HTTP Server and make it handle OpAMP connections.
mux := http.NewServeMux()
path := "/opamppath"
mux.HandleFunc(path, handlerFunc)
hs := httptest.NewUnstartedServer(mux)
hs.Config.ConnContext = ContextWithConn
hs.Start()
defer hs.Close()

// Send a message to the Server.
sendMsg := protobufs.AgentToServer{
InstanceUid: "12345678",
}
b, err := proto.Marshal(&sendMsg)
require.NoError(t, err)
resp, err := http.Post("http://"+hs.Listener.Addr().String()+path, contentTypeProtobuf, bytes.NewReader(b))
require.NoError(t, err)

// Wait until Server receives the message.
eventually(t, func() bool { return rcvMsg.Load() != nil })
assert.True(t, atomic.LoadInt32(&connectedCalled) == 1)

// Verify the received message is what was sent.
assert.True(t, proto.Equal(rcvMsg.Load().(proto.Message), &sendMsg))

// Read Server's response.
b, err = io.ReadAll(resp.Body)
require.NoError(t, err)

assert.EqualValues(t, http.StatusOK, resp.StatusCode)
assert.EqualValues(t, contentTypeProtobuf, resp.Header.Get(headerContentType))

// Decode the response.
var response protobufs.ServerToAgent
err = proto.Unmarshal(b, &response)
require.NoError(t, err)

// Verify the response.
assert.EqualValues(t, sendMsg.InstanceUid, response.InstanceUid)
assert.EqualValues(t, protobufs.ServerCapabilities_ServerCapabilities_AcceptsStatus, response.Capabilities)

eventually(t, func() bool { return atomic.LoadInt32(&connectionCloseCalled) == 1 })
}

func TestServerHonoursClientRequestContentEncoding(t *testing.T) {

hc := http.Client{}
Expand Down

0 comments on commit cea1d9d

Please sign in to comment.