Skip to content

Commit

Permalink
Rework memif to abstract sockets
Browse files Browse the repository at this point in the history
Signed-off-by: Vladimir Popov <[email protected]>
  • Loading branch information
Vladimir Popov committed Sep 9, 2021
1 parent a70a9de commit d95a637
Show file tree
Hide file tree
Showing 11 changed files with 158 additions and 131 deletions.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ require (
github.com/stretchr/testify v1.7.0
github.com/thanhpk/randstr v1.0.4
github.com/vishvananda/netlink v1.1.0
golang.org/x/sys v0.0.0-20210603125802-9665404d3644
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20200609130330-bd2cb7843e1b
google.golang.org/grpc v1.35.0
google.golang.org/protobuf v1.25.0
gopkg.in/yaml.v2 v2.2.4 // indirect
)

replace github.com/edwarnicke/govpp => github.com/Bolodya1997/govpp v0.0.0-20210906092737-6cdcb637b31f
5 changes: 3 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
git.fd.io/govpp.git v0.3.6-0.20210202134006-4c1cccf48cd1/go.mod h1:OCVd4W8SH+666KRQoMj6PM+oipLDZAHhqMz9B1TGbgI=
git.fd.io/govpp.git v0.3.6-0.20210615121054-5de7f6b85458 h1:6I9Hu2Tfg8qYtjKrdD9aRhIvMnNHQhh6MEoDC0hd8zc=
git.fd.io/govpp.git v0.3.6-0.20210615121054-5de7f6b85458/go.mod h1:OCVd4W8SH+666KRQoMj6PM+oipLDZAHhqMz9B1TGbgI=
github.com/Bolodya1997/govpp v0.0.0-20210906092737-6cdcb637b31f h1:pY/NN/l3S4eeDDUA+VJMa41zt+hcSMcr2p67L26yoM0=
github.com/Bolodya1997/govpp v0.0.0-20210906092737-6cdcb637b31f/go.mod h1:mIwCBOyMP3Mzfk+ZQShN+aZ24Le8zl+J0dyRI8qcI+o=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/HdrHistogram/hdrhistogram-go v1.0.1 h1:GX8GAYDuhlFQnI2fRDHQhTlkHMz8bEn0jTI6LJU0mpw=
Expand Down Expand Up @@ -28,8 +31,6 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/edwarnicke/exechelper v1.0.2/go.mod h1:/T271jtNX/ND4De6pa2aRy2+8sNtyCDB1A2pp4M+fUs=
github.com/edwarnicke/govpp v0.0.0-20210817123650-e0e3b4843cf5 h1:G9dEjCjU24jMtuWnWEgZ9eO2x5BPlyrmPLOK9nkfKnU=
github.com/edwarnicke/govpp v0.0.0-20210817123650-e0e3b4843cf5/go.mod h1:qCZYvdwFh5TJepc7DwDmOQ9olgfLjjX90JlyRE4SpYw=
github.com/edwarnicke/grpcfd v0.1.0 h1:f0lmmNDYawQaW+dMNoF8d4DxwPxxczNSFuvu5C7ptRk=
github.com/edwarnicke/grpcfd v0.1.0/go.mod h1:rHihB9YvNMixz8rS+ZbwosI2kj65VLkeyYAI2M+/cGA=
github.com/edwarnicke/serialize v0.0.0-20200705214914-ebc43080eecf/go.mod h1:XvbCO/QGsl3X8RzjBMoRpkm54FIAZH5ChK2j+aox7pw=
Expand Down
32 changes: 24 additions & 8 deletions pkg/networkservice/mechanisms/memif/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// +build !windows
// +build linux

package memif

Expand All @@ -29,8 +29,9 @@ import (
"google.golang.org/grpc"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/cls"
"github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/common"
"github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/memif"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/switchcase"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/chain"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata"
Expand All @@ -49,7 +50,13 @@ func NewClient(vppConn api.Connection) networkservice.NetworkServiceClient {
&memifClient{
vppConn: vppConn,
},
memifproxy.New(),
switchcase.NewClient(&switchcase.ClientCase{
Condition: func(ctx context.Context, conn *networkservice.Connection) bool {
_, ok := loadDirectMemifInfo(ctx)
return !ok
},
Client: memifproxy.New(),
}),
)
}

Expand All @@ -64,11 +71,13 @@ func mechanismsContain(list []*networkservice.Mechanism, t string) bool {

func (m *memifClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) {
if !mechanismsContain(request.MechanismPreferences, memif.MECHANISM) {
request.MechanismPreferences = append(request.MechanismPreferences, &networkservice.Mechanism{
Cls: cls.LOCAL,
Type: memif.MECHANISM,
Parameters: make(map[string]string),
})
mechanism := memif.New("")
if info, ok := loadDirectMemifInfo(ctx); ok {
mechanism.GetParameters()[common.InodeURL] = info.namespace
} else {
mechanism.GetParameters()[common.InodeURL] = netNSURL
}
request.MechanismPreferences = append(request.MechanismPreferences, mechanism)
}

postponeCtxFunc := postpone.ContextWithValues(ctx)
Expand All @@ -78,6 +87,13 @@ func (m *memifClient) Request(ctx context.Context, request *networkservice.Netwo
return nil, err
}

if mechanism := memif.ToMechanism(conn.GetMechanism()); mechanism != nil {
if info, ok := loadDirectMemifInfo(ctx); ok {
info.filename = mechanism.GetParameters()[memif.SocketFilename]
return conn, nil
}
}

if err := create(ctx, conn, m.vppConn, metadata.IsClient(m)); err != nil {
closeCtx, cancelClose := postponeCtxFunc()
defer cancelClose()
Expand Down
2 changes: 1 addition & 1 deletion pkg/networkservice/mechanisms/memif/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// +build !windows
// +build linux

package memif_test

Expand Down
66 changes: 44 additions & 22 deletions pkg/networkservice/mechanisms/memif/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// +build linux

package memif

import (
"context"
"fmt"
"net/url"
"os"
"path/filepath"
"sync/atomic"
"runtime"
"time"

"git.fd.io/govpp.git/api"
Expand All @@ -33,15 +36,31 @@ import (
"github.com/networkservicemesh/api/pkg/api/networkservice/payload"
"github.com/networkservicemesh/sdk/pkg/tools/log"
"github.com/pkg/errors"
"golang.org/x/sys/unix"

"github.com/networkservicemesh/sdk-vpp/pkg/networkservice/up"
"github.com/networkservicemesh/sdk-vpp/pkg/tools/ifindex"
)

var lastSocketID uint32
var netNSURL string

// nolint:gochecknoinits
func init() {
runtime.LockOSThread()
defer runtime.UnlockOSThread()

fd, err := unix.Open("/proc/thread-self/ns/net", unix.O_RDONLY|unix.O_CLOEXEC, 0)
if err != nil {
panic("failed to open '/proc/thread-self/ns/net': " + err.Error())
}

netNSURL = (&url.URL{
Scheme: memifMech.SocketFileScheme,
Path: fmt.Sprintf("/proc/%d/fd/%d", os.Getpid(), fd),
}).String()
}

func createMemifSocket(ctx context.Context, mechanism *memifMech.Mechanism, vppConn api.Connection, isClient bool) (socketID uint32, err error) {
// Extract the socket filename
u, err := url.Parse(mechanism.GetSocketFileURL())
if err != nil {
return 0, errors.Wrapf(err, "not a valid url %q", mechanism.GetSocketFileURL())
Expand All @@ -50,43 +69,54 @@ func createMemifSocket(ctx context.Context, mechanism *memifMech.Mechanism, vppC
return 0, errors.Errorf("socket file url must have scheme %q, actual %q", memifMech.SocketFileScheme, u.Scheme)
}

// Create the socketID
socketID = atomic.AddUint32(&lastSocketID, 1) // TODO - work out a solution that works long term
now := time.Now()
memifSocketAddDel := &memif.MemifSocketFilenameAddDel{
memifSocketAddDel := &memif.MemifSocketFilenameAddDelV2{
IsAdd: true,
SocketID: socketID,
SocketFilename: u.Path,
SocketID: ^uint32(0),
SocketFilename: mechanism.GetSocketFilename(),
Namespace: u.Path,
}
if _, err := memif.NewServiceClient(vppConn).MemifSocketFilenameAddDel(ctx, memifSocketAddDel); err != nil {

now := time.Now()

reply, err := memif.NewServiceClient(vppConn).MemifSocketFilenameAddDelV2(ctx, memifSocketAddDel)
if err != nil {
return 0, errors.WithStack(err)
}
memifSocketAddDel.SocketID = reply.SocketID

log.FromContext(ctx).
WithField("SocketID", memifSocketAddDel.SocketID).
WithField("SocketFilename", memifSocketAddDel.SocketFilename).
WithField("IsAdd", memifSocketAddDel.IsAdd).
WithField("duration", time.Since(now)).
WithField("vppapi", "MemifSocketFilenameAddDel").Debug("completed")

store(ctx, isClient, memifSocketAddDel)
return socketID, nil

return memifSocketAddDel.SocketID, nil
}

func deleteMemifSocket(ctx context.Context, vppConn api.Connection, isClient bool) error {
memifSocketAddDel, ok := load(ctx, isClient)
if !ok {
return nil
}

memifSocketAddDel.IsAdd = false

now := time.Now()
if _, err := memif.NewServiceClient(vppConn).MemifSocketFilenameAddDel(ctx, memifSocketAddDel); err != nil {

if _, err := memif.NewServiceClient(vppConn).MemifSocketFilenameAddDelV2(ctx, memifSocketAddDel); err != nil {
return errors.WithStack(err)
}

log.FromContext(ctx).
WithField("SocketID", memifSocketAddDel.SocketID).
WithField("SocketFilename", memifSocketAddDel.SocketFilename).
WithField("IsAdd", memifSocketAddDel.IsAdd).
WithField("duration", time.Since(now)).
WithField("vppapi", "MemifSocketFilenameAddDel").Debug("completed")

return nil
}

Expand Down Expand Up @@ -159,10 +189,7 @@ func create(ctx context.Context, conn *networkservice.Connection, vppConn api.Co
return nil
}
if !isClient {
if err := os.MkdirAll(filepath.Dir(socketFile(conn)), 0700); err != nil {
return errors.Wrapf(err, "failed to create memif socket directory %s", socketFile(conn))
}
mechanism.SetSocketFileURL((&url.URL{Scheme: memifMech.SocketFileScheme, Path: socketFile(conn)}).String())
mechanism.GetParameters()[memifMech.SocketFilename] = socketFile(conn)
}
mode := memif.MEMIF_MODE_API_IP
if conn.GetPayload() == payload.Ethernet {
Expand All @@ -187,15 +214,10 @@ func del(ctx context.Context, conn *networkservice.Connection, vppConn api.Conne
if err := deleteMemifSocket(ctx, vppConn, isClient); err != nil {
return err
}
if !isClient {
if err := os.RemoveAll(filepath.Dir(socketFile(conn))); err != nil {
return errors.Wrapf(err, "failed to delete %s", filepath.Dir(socketFile(conn)))
}
}
}
return nil
}

func socketFile(conn *networkservice.Connection) string {
return filepath.Join(os.TempDir(), "memif", conn.GetId(), "memif.socket")
return "@" + filepath.Join(os.TempDir(), "memif", conn.GetId(), "memif.socket")
}
78 changes: 23 additions & 55 deletions pkg/networkservice/mechanisms/memif/memifproxy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// +build !windows
// +build linux

package memifproxy

import (
"context"
"net/url"
"os"
"path/filepath"

Expand All @@ -35,12 +34,6 @@ import (
"github.com/networkservicemesh/sdk/pkg/tools/postpone"
)

const (
memifNetwork = "unixpacket"
maxFDCount = 1
bufferSize = 128
)

type memifProxyClient struct{}

// New - create a new memifProxy client chain element
Expand All @@ -49,74 +42,49 @@ func New() networkservice.NetworkServiceClient {
}

func (m *memifProxyClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) {
postponeCtxFunc := postpone.ContextWithValues(ctx)

if mechanism := memifMech.ToMechanism(request.GetConnection().GetMechanism()); mechanism != nil {
if listener, ok := load(ctx, metadata.IsClient(m)); ok {
mechanism.SetSocketFileURL((&url.URL{Scheme: memifMech.SocketFileScheme, Path: listener.socketFilename}).String())
mechanism.GetParameters()[memifMech.SocketFilename] = listener.socketFilename
}
}

postponeCtxFunc := postpone.ContextWithValues(ctx)

conn, err := next.Client(ctx).Request(ctx, request, opts...)
if err != nil {
return nil, err
}

mechanism := memifMech.ToMechanism(conn.GetMechanism())
if mechanism == nil {
return conn, nil
}

// If we are already running a proxy... just keep running it
if _, ok := load(ctx, true); ok {
return conn, nil
}

if err = os.MkdirAll(filepath.Dir(listenSocketFilename(conn)), 0700); err != nil {
err = errors.Wrapf(err, "unable to mkdir %s", filepath.Dir(listenSocketFilename(conn)))
if closeErr := m.closeOnFailure(postponeCtxFunc, conn, opts); closeErr != nil {
err = errors.Wrapf(err, "connection closed with error: %s", closeErr.Error())
}
return nil, err
}

listener, err := newProxyListener(mechanism, listenSocketFilename(conn))
if err != nil {
if closeErr := m.closeOnFailure(postponeCtxFunc, conn, opts); closeErr != nil {
err = errors.Wrapf(err, "connection closed with error: %s", closeErr.Error())
if mechanism := memifMech.ToMechanism(conn.GetMechanism()); mechanism != nil {
if _, ok := load(ctx, metadata.IsClient(m)); !ok {
var listener *proxyListener
if listener, err = newProxyListener(mechanism, listenSocketFilename(conn)); err != nil {
closeCtx, cancelClose := postponeCtxFunc()
defer cancelClose()

if _, closeErr := m.Close(closeCtx, conn, opts...); closeErr != nil {
err = errors.Wrapf(err, "connection closed with error: %s", closeErr.Error())
}
return nil, err
}
store(ctx, metadata.IsClient(m), listener)
}
return nil, err
mechanism.GetParameters()[memifMech.SocketFilename] = listenSocketFilename(conn)
}

store(ctx, metadata.IsClient(m), listener)

return conn, nil
}

func (m *memifProxyClient) closeOnFailure(postponeCtxFunc func() (context.Context, context.CancelFunc), conn *networkservice.Connection, opts []grpc.CallOption) error {
closeCtx, cancelClose := postponeCtxFunc()
defer cancelClose()

_, err := m.Close(closeCtx, conn, opts...)

return err
}

func (m *memifProxyClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*empty.Empty, error) {
if mechanism := memifMech.ToMechanism(conn.GetMechanism()); mechanism != nil {
if listener, ok := load(ctx, metadata.IsClient(m)); ok {
mechanism.SetSocketFileURL((&url.URL{Scheme: memifMech.SocketFileScheme, Path: listener.socketFilename}).String())
if listener, ok := loadAndDelete(ctx, metadata.IsClient(m)); ok {
mechanism.GetParameters()[memifMech.SocketFilename] = listener.socketFilename
defer func() { _ = listener.Close() }()
}
}

rv, err := next.Client(ctx).Close(ctx, conn)
if listener, ok := loadAndDelete(ctx, metadata.IsClient(m)); ok {
_ = listener.Close()
}
_ = os.RemoveAll(filepath.Dir(listenSocketFilename(conn)))
return rv, err
return next.Client(ctx).Close(ctx, conn, opts...)
}

func listenSocketFilename(conn *networkservice.Connection) string {
return filepath.Join(os.TempDir(), "memifproxy", conn.GetId(), "memif.socket")
return "@" + filepath.Join(os.TempDir(), "memifproxy", conn.GetId(), "memif.socket")
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ import (
"github.com/pkg/errors"
)

const (
maxFDCount = 1
bufferSize = 128
)

type proxyConnection struct {
in net.Conn
out net.Conn
Expand Down
Loading

0 comments on commit d95a637

Please sign in to comment.