Skip to content

Commit

Permalink
Fix closing streams
Browse files Browse the repository at this point in the history
  • Loading branch information
mszostok committed May 12, 2023
1 parent e91c48e commit b4b6ae6
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 11 deletions.
2 changes: 1 addition & 1 deletion cmd/executor/gh/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (*GHExecutor) Metadata(context.Context) (api.MetadataOutput, error) {

// Execute returns a given command as a response.
func (e *GHExecutor) Execute(ctx context.Context, in executor.ExecuteInput) (executor.ExecuteOutput, error) {
if err := pluginx.CheckKubeConfigProvided(pluginName, in.Context.KubeConfig); err != nil {
if err := pluginx.ValidateKubeConfigProvided(pluginName, in.Context.KubeConfig); err != nil {
return executor.ExecuteOutput{}, err
}

Expand Down
2 changes: 1 addition & 1 deletion internal/executor/helm/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (e *Executor) Metadata(context.Context) (api.MetadataOutput, error) {
// - history
// - get [all|manifest|hooks|notes]
func (e *Executor) Execute(ctx context.Context, in executor.ExecuteInput) (executor.ExecuteOutput, error) {
if err := pluginx.CheckKubeConfigProvided(PluginName, in.Context.KubeConfig); err != nil {
if err := pluginx.ValidateKubeConfigProvided(PluginName, in.Context.KubeConfig); err != nil {
return executor.ExecuteOutput{}, err
}

Expand Down
2 changes: 1 addition & 1 deletion internal/executor/kubectl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (e *Executor) Metadata(context.Context) (api.MetadataOutput, error) {

// Execute returns a given command as response.
func (e *Executor) Execute(ctx context.Context, in executor.ExecuteInput) (executor.ExecuteOutput, error) {
if err := pluginx.CheckKubeConfigProvided(PluginName, in.Context.KubeConfig); err != nil {
if err := pluginx.ValidateKubeConfigProvided(PluginName, in.Context.KubeConfig); err != nil {
return executor.ExecuteOutput{}, err
}

Expand Down
10 changes: 8 additions & 2 deletions internal/source/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,16 @@ func (d *Dispatcher) Dispatch(dispatch PluginDispatch) error {
go func() {
for {
select {
case event := <-out.Output:
case event, ok := <-out.Output:
if !ok {
return
}
log.WithField("event", string(event)).Debug("Dispatching received event...")
d.dispatch(ctx, event, dispatch)
case msg := <-out.Event:
case msg, ok := <-out.Event:
if !ok {
return
}
log.WithField("message", msg).Debug("Dispatching received message...")
d.dispatchMsg(ctx, msg, dispatch)
case <-ctx.Done():
Expand Down
2 changes: 1 addition & 1 deletion internal/source/kubernetes/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func NewSource(version string) *Source {

// Stream streams Kubernetes events
func (*Source) Stream(ctx context.Context, input source.StreamInput) (source.StreamOutput, error) {
if err := pluginx.CheckKubeConfigProvided(PluginName, input.Context.KubeConfig); err != nil {
if err := pluginx.ValidateKubeConfigProvided(PluginName, input.Context.KubeConfig); err != nil {
return source.StreamOutput{}, err
}

Expand Down
11 changes: 8 additions & 3 deletions pkg/api/source/grpc_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import (
"encoding/json"
"fmt"
"io"
"log"

"github.com/hashicorp/go-plugin"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/kubeshop/botkube/pkg/api"
Expand Down Expand Up @@ -114,11 +115,13 @@ func (p *Plugin) GRPCServer(_ *plugin.GRPCBroker, s *grpc.Server) error {
func (p *Plugin) GRPCClient(_ context.Context, _ *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) {
return &grpcClient{
client: NewSourceClient(c),
logger: NewLogger(),
}, nil
}

type grpcClient struct {
client SourceClient
logger logrus.FieldLogger
}

func (p *grpcClient) Stream(ctx context.Context, in StreamInput) (StreamOutput, error) {
Expand Down Expand Up @@ -152,20 +155,22 @@ func (p *grpcClient) Stream(ctx context.Context, in StreamInput) (StreamOutput,
// On any other error, the stream is aborted and the error contains the RPC
// status.
if err != nil {
log.Print(err)
p.logger.Errorf("canceling streaming: %s", status.Convert(err).Message())
// TODO: we should consider adding error feedback channel to StreamOutput.
return
}
var event Event
if len(feature.Event) != 0 && string(feature.Event) != "" {
if err := json.Unmarshal(feature.Event, &event); err != nil {
log.Printf("while unmarshalling message from JSON: %s", err.Error())
p.logger.Errorf("canceling streaming: cannot unmarshal JSON message: %s", err.Error())
return
}
}
out.Output <- feature.Output
out.Event <- event
}
close(out.Output)
close(out.Event)
}()

return out, nil
Expand Down
15 changes: 15 additions & 0 deletions pkg/api/source/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package source

import (
"os"

"github.com/sirupsen/logrus"
)

// NewLogger returns a new logger used internally. We should replace it in the near future, as we shouldn't be so opinionated.
func NewLogger() logrus.FieldLogger {
logger := logrus.New()
logger.SetOutput(os.Stdout)
logger.SetLevel(logrus.InfoLevel)
return logger
}
4 changes: 2 additions & 2 deletions pkg/pluginx/kubeconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ func PersistKubeConfig(_ context.Context, kc []byte) (string, func(context.Conte
return abs, deleteFn, nil
}

// CheckKubeConfigProvided returns an error if a given kubeconfig is empty or nil.
func CheckKubeConfigProvided(pluginName string, kubeconfig []byte) error {
// ValidateKubeConfigProvided returns an error if a given kubeconfig is empty or nil.
func ValidateKubeConfigProvided(pluginName string, kubeconfig []byte) error {
if len(kubeconfig) != 0 {
return nil
}
Expand Down

0 comments on commit b4b6ae6

Please sign in to comment.