Skip to content

Commit

Permalink
Revise plugin API (#5068)
Browse files Browse the repository at this point in the history
Signed-off-by: khanhtc1202 <[email protected]>
  • Loading branch information
khanhtc1202 authored Jul 25, 2024
1 parent 67bc2a9 commit 4845a1d
Show file tree
Hide file tree
Showing 16 changed files with 1,370 additions and 2,712 deletions.
207 changes: 3 additions & 204 deletions pkg/app/pipedv1/controller/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,15 @@ package controller

import (
"context"
"encoding/json"
"fmt"
"time"

"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/pipe-cd/pipecd/pkg/app/pipedv1/controller/controllermetrics"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/metadatastore"
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
"github.com/pipe-cd/pipecd/pkg/config"
"github.com/pipe-cd/pipecd/pkg/model"
"github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/platform"
pluginapi "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1"
)

type planner struct {
Expand All @@ -41,7 +37,7 @@ type planner struct {

// The pluginClient is used to call pluggin that actually
// performs planning deployment.
pluginClient platform.PlatformPluginClient
pluginClient pluginapi.PluginClient

// The apiClient is used to report the deployment status.
apiClient apiClient
Expand All @@ -68,7 +64,7 @@ func newPlanner(
lastSuccessfulCommitHash string,
lastSuccessfulConfigFilename string,
workingDir string,
pluginClient platform.PlatformPluginClient,
pluginClient pluginapi.PluginClient,
apiClient apiClient,
notifier notifier,
pipedConfig []byte,
Expand Down Expand Up @@ -151,201 +147,4 @@ func (p *planner) Run(ctx context.Context) error {
}()

return nil

// in := &platform.BuildPlanRequest{
// Deployment: p.deployment,
// WorkingDir: p.workingDir,
// LastSuccessfulCommitHash: p.lastSuccessfulCommitHash,
// LastSuccessfulConfigFileName: p.lastSuccessfulConfigFilename,
// PipedConfig: p.pipedConfig,
// }

// out, err := p.pluginClient.BuildPlan(ctx, in)

// // If the deployment was already cancelled, we ignore the plan result.
// select {
// case cmd := <-p.cancelledCh:
// if cmd != nil {
// p.doneDeploymentStatus = model.DeploymentStatus_DEPLOYMENT_CANCELLED
// desc := fmt.Sprintf("Deployment was cancelled by %s while planning", cmd.Commander)
// p.reportDeploymentCancelled(ctx, cmd.Commander, desc)
// return cmd.Report(ctx, model.CommandStatus_COMMAND_SUCCEEDED, nil, nil)
// }
// default:
// }

// if err != nil {
// p.doneDeploymentStatus = model.DeploymentStatus_DEPLOYMENT_FAILURE
// return p.reportDeploymentFailed(ctx, fmt.Sprintf("Unable to plan the deployment (%v)", err))
// }

// p.doneDeploymentStatus = model.DeploymentStatus_DEPLOYMENT_PLANNED
// return p.reportDeploymentPlanned(ctx, out.Plan)
}

// func (p *planner) reportDeploymentPlanned(ctx context.Context, out *platform.DeploymentPlan) error {
// var (
// err error
// retry = pipedservice.NewRetry(10)
// req = &pipedservice.ReportDeploymentPlannedRequest{
// DeploymentId: p.deployment.Id,
// Summary: out.Summary,
// StatusReason: "The deployment has been planned",
// RunningCommitHash: p.lastSuccessfulCommitHash,
// RunningConfigFilename: p.lastSuccessfulConfigFilename,
// Versions: out.Versions,
// Stages: out.Stages,
// DeploymentChainId: p.deployment.DeploymentChainId,
// DeploymentChainBlockIndex: p.deployment.DeploymentChainBlockIndex,
// }
// )

// accounts, err := p.getMentionedAccounts(model.NotificationEventType_EVENT_DEPLOYMENT_PLANNED)
// if err != nil {
// p.logger.Error("failed to get the list of accounts", zap.Error(err))
// }

// defer func() {
// p.notifier.Notify(model.NotificationEvent{
// Type: model.NotificationEventType_EVENT_DEPLOYMENT_PLANNED,
// Metadata: &model.NotificationEventDeploymentPlanned{
// Deployment: p.deployment,
// Summary: out.Summary,
// MentionedAccounts: accounts,
// },
// })
// }()

// for retry.WaitNext(ctx) {
// if _, err = p.apiClient.ReportDeploymentPlanned(ctx, req); err == nil {
// return nil
// }
// err = fmt.Errorf("failed to report deployment status to control-plane: %v", err)
// }

// if err != nil {
// p.logger.Error("failed to mark deployment to be planned", zap.Error(err))
// }
// return err
// }

func (p *planner) reportDeploymentFailed(ctx context.Context, reason string) error {
var (
err error
now = p.nowFunc()
req = &pipedservice.ReportDeploymentCompletedRequest{
DeploymentId: p.deployment.Id,
Status: model.DeploymentStatus_DEPLOYMENT_FAILURE,
StatusReason: reason,
StageStatuses: nil,
DeploymentChainId: p.deployment.DeploymentChainId,
DeploymentChainBlockIndex: p.deployment.DeploymentChainBlockIndex,
CompletedAt: now.Unix(),
}
retry = pipedservice.NewRetry(10)
)

users, groups, err := p.getApplicationNotificationMentions(model.NotificationEventType_EVENT_DEPLOYMENT_FAILED)
if err != nil {
p.logger.Error("failed to get the list of users or groups", zap.Error(err))
}

defer func() {
p.notifier.Notify(model.NotificationEvent{
Type: model.NotificationEventType_EVENT_DEPLOYMENT_FAILED,
Metadata: &model.NotificationEventDeploymentFailed{
Deployment: p.deployment,
Reason: reason,
MentionedAccounts: users,
MentionedGroups: groups,
},
})
}()

for retry.WaitNext(ctx) {
if _, err = p.apiClient.ReportDeploymentCompleted(ctx, req); err == nil {
return nil
}
err = fmt.Errorf("failed to report deployment status to control-plane: %v", err)
}

if err != nil {
p.logger.Error("failed to mark deployment to be failed", zap.Error(err))
}
return err
}

func (p *planner) reportDeploymentCancelled(ctx context.Context, commander, reason string) error {
var (
err error
now = p.nowFunc()
req = &pipedservice.ReportDeploymentCompletedRequest{
DeploymentId: p.deployment.Id,
Status: model.DeploymentStatus_DEPLOYMENT_CANCELLED,
StatusReason: reason,
StageStatuses: nil,
DeploymentChainId: p.deployment.DeploymentChainId,
DeploymentChainBlockIndex: p.deployment.DeploymentChainBlockIndex,
CompletedAt: now.Unix(),
}
retry = pipedservice.NewRetry(10)
)

users, groups, err := p.getApplicationNotificationMentions(model.NotificationEventType_EVENT_DEPLOYMENT_CANCELLED)
if err != nil {
p.logger.Error("failed to get the list of users or groups", zap.Error(err))
}

defer func() {
p.notifier.Notify(model.NotificationEvent{
Type: model.NotificationEventType_EVENT_DEPLOYMENT_CANCELLED,
Metadata: &model.NotificationEventDeploymentCancelled{
Deployment: p.deployment,
Commander: commander,
MentionedAccounts: users,
MentionedGroups: groups,
},
})
}()

for retry.WaitNext(ctx) {
if _, err = p.apiClient.ReportDeploymentCompleted(ctx, req); err == nil {
return nil
}
err = fmt.Errorf("failed to report deployment status to control-plane: %v", err)
}

if err != nil {
p.logger.Error("failed to mark deployment to be cancelled", zap.Error(err))
}
return err
}

func (p *planner) getMentionedUsers(event model.NotificationEventType) ([]string, error) {
n, ok := p.metadataStore.Shared().Get(model.MetadataKeyDeploymentNotification)
if !ok {
return []string{}, nil
}

var notification config.DeploymentNotification
if err := json.Unmarshal([]byte(n), &notification); err != nil {
return nil, fmt.Errorf("could not extract mentions config: %w", err)
}

return notification.FindSlackUsers(event), nil
}

// getApplicationNotificationMentions returns the list of users groups who should be mentioned in the notification.
func (p *planner) getApplicationNotificationMentions(event model.NotificationEventType) ([]string, []string, error) {
n, ok := p.metadataStore.Shared().Get(model.MetadataKeyDeploymentNotification)
if !ok {
return []string{}, []string{}, nil
}

var notification config.DeploymentNotification
if err := json.Unmarshal([]byte(n), &notification); err != nil {
return nil, nil, fmt.Errorf("could not extract mentions config: %w", err)
}

return notification.FindSlackUsers(event), notification.FindSlackGroups(event), nil
}
10 changes: 5 additions & 5 deletions pkg/app/pipedv1/controller/pluginregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,19 @@ import (
"sync"

"github.com/pipe-cd/pipecd/pkg/model"
"github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/platform"
pluginapi "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1"
)

type PluginRegistry interface {
Plugin(k model.ApplicationKind) (platform.PlatformPluginClient, bool)
Plugin(k model.ApplicationKind) (pluginapi.PluginClient, bool)
}

type pluginRegistry struct {
plugins map[model.ApplicationKind]platform.PlatformPluginClient
plugins map[model.ApplicationKind]pluginapi.PluginClient
mu sync.RWMutex
}

func (r *pluginRegistry) Plugin(k model.ApplicationKind) (platform.PlatformPluginClient, bool) {
func (r *pluginRegistry) Plugin(k model.ApplicationKind) (pluginapi.PluginClient, bool) {
r.mu.RLock()
defer r.mu.RUnlock()

Expand All @@ -49,7 +49,7 @@ func (r *pluginRegistry) Plugin(k model.ApplicationKind) (platform.PlatformPlugi
}

var defaultPluginRegistry = &pluginRegistry{
plugins: make(map[model.ApplicationKind]platform.PlatformPluginClient),
plugins: make(map[model.ApplicationKind]pluginapi.PluginClient),
}

func DefaultPluginRegistry() PluginRegistry {
Expand Down
4 changes: 2 additions & 2 deletions pkg/app/pipedv1/plugin/inputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (

"github.com/pipe-cd/pipecd/pkg/app/pipedv1/deploysource"
"github.com/pipe-cd/pipecd/pkg/git"
"github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/platform"
"github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment"
)

func GetPlanSourceCloner(input *platform.PlanPluginInput) (deploysource.SourceCloner, error) {
func GetPlanSourceCloner(input *deployment.PlanPluginInput) (deploysource.SourceCloner, error) {
gitPath, err := exec.LookPath("git")
if err != nil {
return nil, err
Expand Down
File renamed without changes.
63 changes: 63 additions & 0 deletions pkg/app/pipedv1/plugin/kubernetes/planner/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2024 The PipeCD Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package planner

import (
"context"
"fmt"

"github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment"
"github.com/pipe-cd/pipecd/pkg/regexpool"

"go.uber.org/zap"
"google.golang.org/grpc"
)

type secretDecrypter interface {
Decrypt(string) (string, error)
}

type PlannerService struct {
deployment.UnimplementedDeploymentServiceServer

Decrypter secretDecrypter
RegexPool *regexpool.Pool
Logger *zap.Logger
}

// Register registers all handling of this service into the specified gRPC server.
func (a *PlannerService) Register(server *grpc.Server) {
deployment.RegisterDeploymentServiceServer(server, a)
}

// NewPlannerService creates a new planService.
func NewPlannerService(
decrypter secretDecrypter,
logger *zap.Logger,
) *PlannerService {
return &PlannerService{
Decrypter: decrypter,
RegexPool: regexpool.DefaultPool(),
Logger: logger.Named("planner"),
}
}

func (ps *PlannerService) DetermineStrategy(ctx context.Context, in *deployment.DetermineStrategyRequest) (*deployment.DetermineStrategyResponse, error) {
return nil, fmt.Errorf("not implemented yet")
}

func (ps *PlannerService) DetermineVersions(ctx context.Context, in *deployment.DetermineVersionsRequest) (*deployment.DetermineVersionsResponse, error) {
return nil, fmt.Errorf("not implemented yet")
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"context"
"time"

"github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/platform/kubernetes/planner"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes/planner"
"github.com/pipe-cd/pipecd/pkg/cli"
"github.com/pipe-cd/pipecd/pkg/rpc"
"github.com/spf13/cobra"
Expand Down
Loading

0 comments on commit 4845a1d

Please sign in to comment.