Skip to content

Commit

Permalink
feat: optimize p2p (#3338)
Browse files Browse the repository at this point in the history
* feat: optimize p2p

* feat: optimize p2p async download
  • Loading branch information
AlkaidChan authored Jul 9, 2024
1 parent 4864aad commit 2d100f2
Show file tree
Hide file tree
Showing 35 changed files with 2,017 additions and 1,236 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
pbds "github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/protocol/data-service"
"github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/runtime/jsoni"
"github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/runtime/lock"
"github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/types"
)

// Interface defines all the supported operations to get resource cache.
Expand All @@ -43,8 +42,6 @@ type Interface interface {
GetReleasedKvValue(kt *kit.Kit, bizID, appID, releaseID uint32, key string) (string, error)
SetClientMetric(kt *kit.Kit, bizID, appID uint32, payload []byte) error
BatchUpsertClientMetrics(kt *kit.Kit, clientData []*pbclient.Client, clientEventData []*pbce.ClientEvent) error
GetAsyncDownloadTask(kt *kit.Kit, bizID uint32, taskID string) (string, error)
SetAsyncDownloadTask(kt *kit.Kit, bizID uint32, taskID string, task *types.AsyncDownloadTaskCache) error
}

// New initialize a cache client.
Expand Down Expand Up @@ -133,22 +130,3 @@ func (c *client) BatchUpsertClientMetrics(kt *kit.Kit, clientData []*pbclient.Cl
}
return nil
}

// GetAsyncDownloadTask get async download task from cache
func (c *client) GetAsyncDownloadTask(kt *kit.Kit, bizID uint32, taskID string) (string, error) {
return c.bds.Get(kt.Ctx, keys.Key.AsyncDownloadTaskKey(bizID, taskID))
}

// SetAsyncDownloadTask set async download task to cache
func (c *client) SetAsyncDownloadTask(kt *kit.Kit, bizID uint32, taskID string,
task *types.AsyncDownloadTaskCache) error {
js, err := jsoni.Marshal(task)
if err != nil {
return err
}
if err := c.bds.Set(kt.Ctx, keys.Key.AsyncDownloadTaskKey(bizID, taskID), string(js),
keys.Key.AppMetaTtlSec(false)); err != nil {
return err
}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ var Key = &keyGenerator{
releasedHookTTLRange: [2]int{6 * oneDaySeconds, 7 * oneDaySeconds},
appMetaTTLRange: [2]int{6 * oneDaySeconds, 7 * oneDaySeconds},
appHasRITTLRange: [2]int{5 * 60, 10 * 60},
asyncDownloadTaskTTLRange: [2]int{30 * 60, 60 * 60},
}

type namespace string
Expand All @@ -50,7 +49,6 @@ const (
appID namespace = "app-id"
releasedKv namespace = "released-kv"
clientMetric namespace = "client-metric"
asyncDownloadTask namespace = "async-download-task"
)

type keyGenerator struct {
Expand All @@ -63,7 +61,6 @@ type keyGenerator struct {
releasedHookTTLRange [2]int
appMetaTTLRange [2]int
appHasRITTLRange [2]int
asyncDownloadTaskTTLRange [2]int
}

// ClientMetricKey generate the client metric cache key.
Expand All @@ -75,27 +72,6 @@ func (k keyGenerator) ClientMetricKey(bizID uint32, appID uint32) string {
}.String()
}

// AsyncDownloadTaskKey generate the async download task cache key.
func (k keyGenerator) AsyncDownloadTaskKey(bizID uint32, taskID string) string {
return element{
biz: bizID,
ns: asyncDownloadTask,
key: taskID,
}.String()
}

// AsyncDownloadTaskTtlSec generate the async download task's TTL seconds
func (k keyGenerator) AsyncDownloadTaskTtlSec(withRange bool) int {
if withRange {
//nolint:gosec
r := rand.New(rand.NewSource(time.Now().UnixNano()))
seconds := r.Intn(k.asyncDownloadTaskTTLRange[1]-k.asyncDownloadTaskTTLRange[0]) +
k.asyncDownloadTaskTTLRange[0]
return seconds
}
return k.asyncDownloadTaskTTLRange[0]
}

// ReleasedGroup generate a release's released group cache key to save all the released groups under this release
func (k keyGenerator) ReleasedGroup(bizID uint32, appID uint32) string {
return element{
Expand Down
31 changes: 0 additions & 31 deletions bcs-services/bcs-bscp/cmd/cache-service/service/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,34 +237,3 @@ func (s *Service) SetClientMetric(ctx context.Context, req *pbcs.SetClientMetric
}
return &pbcs.SetClientMetricResp{}, nil
}

// SetAsyncDownloadTask set async download task
func (s *Service) SetAsyncDownloadTask(ctx context.Context, req *pbcs.SetAsyncDownloadTaskReq) (
*pbcs.SetAsyncDownloadTaskResp, error) {
kt := kit.FromGrpcContext(ctx)
task := &types.AsyncDownloadTaskCache{
BizID: req.BizId,
AppID: req.AppId,
TaskID: req.TaskId,
FilePath: req.FilePath,
FileName: req.FileName,
}
err := s.op.SetAsyncDownloadTask(kt, req.BizId, req.TaskId, task)
if err != nil {
return nil, err
}
return &pbcs.SetAsyncDownloadTaskResp{}, nil
}

// GetAsyncDownloadTask get async download task
func (s *Service) GetAsyncDownloadTask(ctx context.Context, req *pbcs.GetAsyncDownloadTaskReq) (
*pbcs.JsonRawResp, error) {
kt := kit.FromGrpcContext(ctx)
task, err := s.op.GetAsyncDownloadTask(kt, req.BizId, req.TaskId)
if err != nil {
return nil, err
}
return &pbcs.JsonRawResp{
JsonRaw: task,
}, nil
}
8 changes: 0 additions & 8 deletions bcs-services/bcs-bscp/cmd/feed-server/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/reflection"

"github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/cmd/feed-server/crontab"
"github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/cmd/feed-server/options"
"github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/cmd/feed-server/service"
"github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/cc"
Expand Down Expand Up @@ -74,7 +73,6 @@ type feedServer struct {
serve *grpc.Server
sd serviced.ServiceDiscover
service *service.Service
cleaner *crontab.SourceFileCacheCleaner
}

// prepare do prepare jobs before run feed server.
Expand Down Expand Up @@ -122,10 +120,6 @@ func (fs *feedServer) prepare(opt *options.Option) error {
}
fs.service = svc

cleaner := crontab.NewSourceFileCacheCleaner()
fs.cleaner = cleaner
fs.cleaner.Run()

return nil
}

Expand Down Expand Up @@ -225,8 +219,6 @@ func (fs *feedServer) listenAndServe() error {

func (fs *feedServer) finalizer() {

fs.cleaner.Stop()

if err := fs.sd.Deregister(); err != nil {
logs.Errorf("process service shutdown, but deregister failed, err: %v", err)
return
Expand Down

This file was deleted.

Loading

0 comments on commit 2d100f2

Please sign in to comment.