Skip to content

Commit

Permalink
chore: refactor out and extract into functions
Browse files Browse the repository at this point in the history
Signed-off-by: Ettore Di Giacinto <[email protected]>
  • Loading branch information
mudler committed Aug 21, 2024
1 parent 02f050d commit c12343c
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 61 deletions.
80 changes: 80 additions & 0 deletions core/cli/api/p2p.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package cli_api

import (
"context"
"fmt"
"net"
"os"
"strings"

"github.com/mudler/LocalAI/core/p2p"
"github.com/mudler/edgevpn/pkg/node"

"github.com/rs/zerolog/log"
)

func StartP2PStack(ctx context.Context, address, token, networkID, workerID string, federated bool) error {
var n *node.Node
// Here we are avoiding creating multiple nodes:
// - if the federated mode is enabled, we create a federated node and expose a service
// - exposing a service creates a node with specific options, and we don't want to create another node

// If the federated mode is enabled, we expose a service to the local instance running
// at r.Address
if federated {
_, port, err := net.SplitHostPort(address)
if err != nil {
return err
}

// Here a new node is created and started
// and a service is exposed by the node
node, err := p2p.ExposeService(ctx, "localhost", port, token, p2p.NetworkID(networkID, p2p.FederatedID))
if err != nil {
return err
}

if err := p2p.ServiceDiscoverer(ctx, node, token, p2p.NetworkID(networkID, p2p.FederatedID), nil, false); err != nil {
return err
}

n = node
}

// If the p2p mode is enabled, we start the service discovery
if token != "" {
// If a node wasn't created previously, create it
if n == nil {
node, err := p2p.NewNode(token)
if err != nil {
return err
}
err = node.Start(ctx)
if err != nil {
return fmt.Errorf("starting new node: %w", err)
}
n = node
}

// Attach a ServiceDiscoverer to the p2p node
log.Info().Msg("Starting P2P server discovery...")
if err := p2p.ServiceDiscoverer(ctx, n, token, p2p.NetworkID(networkID, workerID), func(serviceID string, node p2p.NodeData) {
var tunnelAddresses []string
for _, v := range p2p.GetAvailableNodes(p2p.NetworkID(networkID, workerID)) {
if v.IsOnline() {
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
} else {
log.Info().Msgf("Node %s is offline", v.ID)
}
}
tunnelEnvVar := strings.Join(tunnelAddresses, ",")

os.Setenv("LLAMACPP_GRPC_SERVERS", tunnelEnvVar)

Check warning

Code scanning / Golang security checks by gosec

Errors unhandled. Warning

Errors unhandled.
log.Debug().Msgf("setting LLAMACPP_GRPC_SERVERS to %s", tunnelEnvVar)
}, true); err != nil {
return err
}
}

return nil
}
64 changes: 3 additions & 61 deletions core/cli/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,10 @@ package cli
import (
"context"
"fmt"
"net"
"os"
"strings"
"time"

"github.com/mudler/edgevpn/pkg/node"

cli_api "github.com/mudler/LocalAI/core/cli/api"
cliContext "github.com/mudler/LocalAI/core/cli/context"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/http"
Expand Down Expand Up @@ -121,63 +118,8 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {

backgroundCtx := context.Background()

var n *node.Node
// Avoid creating multiple nodes:
// - if the federated mode is enabled, we create a federated node and expose a service
// Exposing a service creates a node with specific options, and we don't want to create another node

// If the federated mode is enabled, we expose a service to the local instance running
// at r.Address
if r.Federated {
_, port, err := net.SplitHostPort(r.Address)
if err != nil {
return err
}

node, err := p2p.ExposeService(backgroundCtx, "localhost", port, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.FederatedID))
if err != nil {
return err
}

if err := p2p.ServiceDiscoverer(backgroundCtx, node, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.FederatedID), nil, false); err != nil {
return err
}

n = node
}

// If the p2p mode is enabled, we start the service discovery
if token != "" {
// If a node wasn't created previously, create it
if n == nil {
node, err := p2p.NewNode(token)
if err != nil {
return err
}
err = node.Start(backgroundCtx)
if err != nil {
return fmt.Errorf("starting new node: %w", err)
}
n = node
}

log.Info().Msg("Starting P2P server discovery...")
if err := p2p.ServiceDiscoverer(backgroundCtx, n, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID), func(serviceID string, node p2p.NodeData) {
var tunnelAddresses []string
for _, v := range p2p.GetAvailableNodes(p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID)) {
if v.IsOnline() {
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
} else {
log.Info().Msgf("Node %s is offline", v.ID)
}
}
tunnelEnvVar := strings.Join(tunnelAddresses, ",")

os.Setenv("LLAMACPP_GRPC_SERVERS", tunnelEnvVar)
log.Debug().Msgf("setting LLAMACPP_GRPC_SERVERS to %s", tunnelEnvVar)
}, true); err != nil {
return err
}
if err := cli_api.StartP2PStack(backgroundCtx, r.Address, token, r.Peer2PeerNetworkID, p2p.WorkerID, r.Federated); err != nil {
return err
}

idleWatchDog := r.EnableWatchdogIdle
Expand Down

0 comments on commit c12343c

Please sign in to comment.