Skip to content

Commit

Permalink
feat(federation): do not allocate services, directly connect with libp2p
Browse files Browse the repository at this point in the history
Signed-off-by: Ettore Di Giacinto <[email protected]>
  • Loading branch information
mudler committed Aug 20, 2024
1 parent 12a6c27 commit 2b0be62
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 54 deletions.
8 changes: 4 additions & 4 deletions core/p2p/federated.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ func (fs *FederatedServer) RandomServer() string {
var tunnelAddresses []string
for _, v := range GetAvailableNodes(fs.service) {
if v.IsOnline() {
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
tunnelAddresses = append(tunnelAddresses, v.ID)
} else {
delete(fs.requestTable, v.TunnelAddress) // make sure it's not tracked
delete(fs.requestTable, v.ID) // make sure it's not tracked
log.Info().Msgf("Node %s is offline", v.ID)
}
}
Expand All @@ -61,8 +61,8 @@ func (fs *FederatedServer) syncTableStatus() {

for _, v := range GetAvailableNodes(fs.service) {
if v.IsOnline() {
fs.ensureRecordExist(v.TunnelAddress)
currentTunnels[v.TunnelAddress] = struct{}{}
fs.ensureRecordExist(v.ID)
currentTunnels[v.ID] = struct{}{}
}
}

Expand Down
65 changes: 16 additions & 49 deletions core/p2p/federated_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,12 @@ import (
"errors"
"fmt"
"net"
"time"

"github.com/mudler/edgevpn/pkg/node"
"github.com/mudler/edgevpn/pkg/protocol"
"github.com/mudler/edgevpn/pkg/types"
"github.com/rs/zerolog/log"
)

func (f *FederatedServer) Start(ctx context.Context) error {

n, err := NewNode(f.p2ptoken)
if err != nil {
return fmt.Errorf("creating a new node: %w", err)
Expand All @@ -29,7 +25,7 @@ func (f *FederatedServer) Start(ctx context.Context) error {

if err := ServiceDiscoverer(ctx, n, f.p2ptoken, f.service, func(servicesID string, tunnel NodeData) {
log.Debug().Msgf("Discovered node: %s", tunnel.ID)
}, true); err != nil {
}, false); err != nil {
return err
}

Expand All @@ -50,21 +46,8 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error {
<-ctx.Done()
l.Close()
}()
ledger, _ := node.Ledger()

// Announce ourselves so nodes accepts our connection
ledger.Announce(
ctx,
10*time.Second,
func() {
updatedMap := map[string]interface{}{}
updatedMap[node.Host().ID().String()] = &types.User{
PeerID: node.Host().ID().String(),
Timestamp: time.Now().String(),
}
ledger.Add(protocol.UsersLedgerKey, updatedMap)
},
)

nodeAnnounce(ctx, node)

defer l.Close()
for {
Expand All @@ -82,52 +65,36 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error {

// Handle connections in a new goroutine, forwarding to the p2p service
go func() {
tunnelAddr := ""

workerID := ""
if fs.workerTarget != "" {
for _, v := range GetAvailableNodes(fs.service) {
if v.ID == fs.workerTarget {
tunnelAddr = v.TunnelAddress
break
}
}
workerID = fs.workerTarget
} else if fs.loadBalanced {
log.Debug().Msgf("Load balancing request")

tunnelAddr = fs.SelectLeastUsedServer()
if tunnelAddr == "" {
workerID = fs.SelectLeastUsedServer()
if workerID == "" {
log.Debug().Msgf("Least used server not found, selecting random")
tunnelAddr = fs.RandomServer()
workerID = fs.RandomServer()
}

} else {
tunnelAddr = fs.RandomServer()
workerID = fs.RandomServer()
}

if tunnelAddr == "" {
if workerID == "" {
log.Error().Msg("No available nodes yet")
return
}

log.Debug().Msgf("Selected tunnel %s", tunnelAddr)

tunnelConn, err := net.Dial("tcp", tunnelAddr)
if err != nil {
log.Error().Err(err).Msg("Error connecting to tunnel")
log.Debug().Msgf("Selected node %s", workerID)
nodeData, exists := GetNode(fs.service, workerID)
if !exists {
log.Error().Msgf("Node %s not found", workerID)
return
}

log.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), tunnelConn.RemoteAddr().String())
closer := make(chan struct{}, 2)
go copyStream(closer, tunnelConn, conn)
go copyStream(closer, conn, tunnelConn)
<-closer

tunnelConn.Close()
conn.Close()

proxyP2PConnection(ctx, node, nodeData.ServiceID, conn)
if fs.loadBalanced {
fs.RecordRequest(tunnelAddr)
fs.RecordRequest(workerID)
}
}()
}
Expand Down
14 changes: 14 additions & 0 deletions core/p2p/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type NodeData struct {
Name string
ID string
TunnelAddress string
ServiceID string
LastSeen time.Time
}

Expand All @@ -39,6 +40,19 @@ func GetAvailableNodes(serviceID string) []NodeData {
return availableNodes
}

func GetNode(serviceID, nodeID string) (NodeData, bool) {
if serviceID == "" {
serviceID = defaultServicesID
}
mu.Lock()
defer mu.Unlock()
if _, ok := nodes[serviceID]; !ok {
return NodeData{}, false
}
nd, exists := nodes[serviceID][nodeID]
return nd, exists
}

func AddNode(serviceID string, node NodeData) {
if serviceID == "" {
serviceID = defaultServicesID
Expand Down
3 changes: 2 additions & 1 deletion core/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func proxyP2PConnection(ctx context.Context, node *node.Node, serviceID string,
// Open a stream
stream, err := node.Host().NewStream(ctx, d, protocol.ServiceProtocol.ID())
if err != nil {
zlog.Error().Msg("cannot open stream peer")
zlog.Error().Err(err).Msg("cannot open stream peer")

conn.Close()
// ll.Debugf("could not open stream '%s'", err.Error())
Expand Down Expand Up @@ -263,6 +263,7 @@ var muservice sync.Mutex
func ensureService(ctx context.Context, n *node.Node, nd *NodeData, sserv string, allocate bool) {
muservice.Lock()
defer muservice.Unlock()
nd.ServiceID = sserv
if ndService, found := service[nd.Name]; !found {
if !nd.IsOnline() {
// if node is offline and not present, do nothing
Expand Down

0 comments on commit 2b0be62

Please sign in to comment.