Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(federation): do not allocate local services for load balancing #3337

Merged
merged 2 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions core/p2p/federated.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ func (fs *FederatedServer) RandomServer() string {
var tunnelAddresses []string
for _, v := range GetAvailableNodes(fs.service) {
if v.IsOnline() {
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
tunnelAddresses = append(tunnelAddresses, v.ID)
} else {
delete(fs.requestTable, v.TunnelAddress) // make sure it's not tracked
delete(fs.requestTable, v.ID) // make sure it's not tracked
log.Info().Msgf("Node %s is offline", v.ID)
}
}
Expand All @@ -61,8 +61,8 @@ func (fs *FederatedServer) syncTableStatus() {

for _, v := range GetAvailableNodes(fs.service) {
if v.IsOnline() {
fs.ensureRecordExist(v.TunnelAddress)
currentTunnels[v.TunnelAddress] = struct{}{}
fs.ensureRecordExist(v.ID)
currentTunnels[v.ID] = struct{}{}
}
}

Expand Down
65 changes: 16 additions & 49 deletions core/p2p/federated_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,12 @@ import (
"errors"
"fmt"
"net"
"time"

"github.com/mudler/edgevpn/pkg/node"
"github.com/mudler/edgevpn/pkg/protocol"
"github.com/mudler/edgevpn/pkg/types"
"github.com/rs/zerolog/log"
)

func (f *FederatedServer) Start(ctx context.Context) error {

n, err := NewNode(f.p2ptoken)
if err != nil {
return fmt.Errorf("creating a new node: %w", err)
Expand All @@ -29,7 +25,7 @@ func (f *FederatedServer) Start(ctx context.Context) error {

if err := ServiceDiscoverer(ctx, n, f.p2ptoken, f.service, func(servicesID string, tunnel NodeData) {
log.Debug().Msgf("Discovered node: %s", tunnel.ID)
}, true); err != nil {
}, false); err != nil {
return err
}

Expand All @@ -50,21 +46,8 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error {
<-ctx.Done()
l.Close()
}()
ledger, _ := node.Ledger()

// Announce ourselves so nodes accepts our connection
ledger.Announce(
ctx,
10*time.Second,
func() {
updatedMap := map[string]interface{}{}
updatedMap[node.Host().ID().String()] = &types.User{
PeerID: node.Host().ID().String(),
Timestamp: time.Now().String(),
}
ledger.Add(protocol.UsersLedgerKey, updatedMap)
},
)

nodeAnnounce(ctx, node)

defer l.Close()
for {
Expand All @@ -82,52 +65,36 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error {

// Handle connections in a new goroutine, forwarding to the p2p service
go func() {
tunnelAddr := ""

workerID := ""
if fs.workerTarget != "" {
for _, v := range GetAvailableNodes(fs.service) {
if v.ID == fs.workerTarget {
tunnelAddr = v.TunnelAddress
break
}
}
workerID = fs.workerTarget
} else if fs.loadBalanced {
log.Debug().Msgf("Load balancing request")

tunnelAddr = fs.SelectLeastUsedServer()
if tunnelAddr == "" {
workerID = fs.SelectLeastUsedServer()
if workerID == "" {
log.Debug().Msgf("Least used server not found, selecting random")
tunnelAddr = fs.RandomServer()
workerID = fs.RandomServer()
}

} else {
tunnelAddr = fs.RandomServer()
workerID = fs.RandomServer()
}

if tunnelAddr == "" {
if workerID == "" {
log.Error().Msg("No available nodes yet")
return
}

log.Debug().Msgf("Selected tunnel %s", tunnelAddr)

tunnelConn, err := net.Dial("tcp", tunnelAddr)
if err != nil {
log.Error().Err(err).Msg("Error connecting to tunnel")
log.Debug().Msgf("Selected node %s", workerID)
nodeData, exists := GetNode(fs.service, workerID)
if !exists {
log.Error().Msgf("Node %s not found", workerID)
return
}

log.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), tunnelConn.RemoteAddr().String())
closer := make(chan struct{}, 2)
go copyStream(closer, tunnelConn, conn)
go copyStream(closer, conn, tunnelConn)
<-closer

tunnelConn.Close()
conn.Close()

proxyP2PConnection(ctx, node, nodeData.ServiceID, conn)
if fs.loadBalanced {
fs.RecordRequest(tunnelAddr)
fs.RecordRequest(workerID)
}
}()
}
Expand Down
14 changes: 14 additions & 0 deletions core/p2p/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type NodeData struct {
Name string
ID string
TunnelAddress string
ServiceID string
LastSeen time.Time
}

Expand All @@ -39,6 +40,19 @@ func GetAvailableNodes(serviceID string) []NodeData {
return availableNodes
}

// GetNode returns the NodeData tracked for nodeID under the given
// service map, falling back to the default service when serviceID is
// empty. The boolean result reports whether the node is known.
func GetNode(serviceID, nodeID string) (NodeData, bool) {
	if serviceID == "" {
		serviceID = defaultServicesID
	}
	mu.Lock()
	defer mu.Unlock()
	// Indexing a nil inner map is a safe read in Go and yields the zero
	// NodeData with exists == false, so no separate existence check on
	// nodes[serviceID] is needed.
	nd, exists := nodes[serviceID][nodeID]
	return nd, exists
}

func AddNode(serviceID string, node NodeData) {
if serviceID == "" {
serviceID = defaultServicesID
Expand Down
120 changes: 63 additions & 57 deletions core/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,22 +66,7 @@ func nodeID(s string) string {
return fmt.Sprintf("%s-%s", hostname, s)
}

func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, service string) error {

zlog.Info().Msgf("Allocating service '%s' on: %s", service, listenAddr)
// Open local port for listening
l, err := net.Listen("tcp", listenAddr)
if err != nil {
zlog.Error().Err(err).Msg("Error listening")
return err
}
go func() {
<-ctx.Done()
l.Close()
}()

// ll.Info("Binding local port on", srcaddr)

func nodeAnnounce(ctx context.Context, node *node.Node) {
ledger, _ := node.Ledger()

// Announce ourselves so nodes accepts our connection
Expand All @@ -97,6 +82,66 @@ func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, serv
ledger.Add(protocol.UsersLedgerKey, updatedMap)
},
)
}

// proxyP2PConnection forwards the accepted local connection conn to the
// p2p service identified by serviceID: it resolves the service's owning
// peer from the shared ledger, opens a libp2p stream to that peer, and
// pipes bytes in both directions until either side closes. conn is always
// closed before returning, on both error and success paths.
func proxyP2PConnection(ctx context.Context, node *node.Node, serviceID string, conn net.Conn) {
	ledger, _ := node.Ledger()

	// Retrieve the service record for serviceID from the blockchain.
	existingValue, found := ledger.GetKey(protocol.ServicesLedgerKey, serviceID)
	if !found {
		zlog.Error().Msg("Service not found on blockchain")
		conn.Close()
		return
	}
	// Only decode the ledger value once we know it exists; the original
	// unmarshalled before checking `found`, decoding a possibly-absent value.
	service := &types.Service{}
	existingValue.Unmarshal(service)

	// Decode the peer that advertised the service.
	d, err := peer.Decode(service.PeerID)
	if err != nil {
		// Include err for consistency with the NewStream error path below.
		zlog.Error().Err(err).Msg("cannot decode peer")
		conn.Close()
		return
	}

	// Open a stream to the remote peer over the service protocol.
	stream, err := node.Host().NewStream(ctx, d, protocol.ServiceProtocol.ID())
	if err != nil {
		zlog.Error().Err(err).Msg("cannot open stream peer")
		conn.Close()
		return
	}

	zlog.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), stream.Conn().RemoteMultiaddr().String())

	// Pump data both ways; the first direction to finish unblocks us.
	closer := make(chan struct{}, 2)
	go copyStream(closer, stream, conn)
	go copyStream(closer, conn, stream)
	<-closer

	stream.Close()
	conn.Close()
}

func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, service string) error {
zlog.Info().Msgf("Allocating service '%s' on: %s", service, listenAddr)
// Open local port for listening
l, err := net.Listen("tcp", listenAddr)
if err != nil {
zlog.Error().Err(err).Msg("Error listening")
return err
}
go func() {
<-ctx.Done()
l.Close()
}()

nodeAnnounce(ctx, node)

defer l.Close()
for {
Expand All @@ -114,47 +159,7 @@ func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, serv

// Handle connections in a new goroutine, forwarding to the p2p service
go func() {
// Retrieve current ID for ip in the blockchain
existingValue, found := ledger.GetKey(protocol.ServicesLedgerKey, service)
service := &types.Service{}
existingValue.Unmarshal(service)
// If mismatch, update the blockchain
if !found {
zlog.Error().Msg("Service not found on blockchain")
conn.Close()
// ll.Debugf("service '%s' not found on blockchain", serviceID)
return
}

// Decode the Peer
d, err := peer.Decode(service.PeerID)
if err != nil {
zlog.Error().Msg("cannot decode peer")

conn.Close()
// ll.Debugf("could not decode peer '%s'", service.PeerID)
return
}

// Open a stream
stream, err := node.Host().NewStream(ctx, d, protocol.ServiceProtocol.ID())
if err != nil {
zlog.Error().Msg("cannot open stream peer")

conn.Close()
// ll.Debugf("could not open stream '%s'", err.Error())
return
}
// ll.Debugf("(service %s) Redirecting", serviceID, l.Addr().String())
zlog.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), stream.Conn().RemoteMultiaddr().String())
closer := make(chan struct{}, 2)
go copyStream(closer, stream, conn)
go copyStream(closer, conn, stream)
<-closer

stream.Close()
conn.Close()
// ll.Infof("(service %s) Done handling %s", serviceID, l.Addr().String())
proxyP2PConnection(ctx, node, service, conn)
}()
}
}
Expand Down Expand Up @@ -258,6 +263,7 @@ var muservice sync.Mutex
func ensureService(ctx context.Context, n *node.Node, nd *NodeData, sserv string, allocate bool) {
muservice.Lock()
defer muservice.Unlock()
nd.ServiceID = sserv
if ndService, found := service[nd.Name]; !found {
if !nd.IsOnline() {
// if node is offline and not present, do nothing
Expand Down