Skip to content

Commit

Permalink
Add support for connection config and multi-region. Closes #21. Closes
Browse files Browse the repository at this point in the history
  • Loading branch information
kaidaguerre committed Feb 16, 2021
1 parent 415e872 commit 77e31d8
Show file tree
Hide file tree
Showing 29 changed files with 1,568 additions and 242 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/iancoleman/strcase v0.1.2
github.com/kr/pretty v0.1.0 // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/mitchellh/go-testing-interface v1.0.0 // indirect
github.com/olekukonko/tablewriter v0.0.4
github.com/stevenle/topsort v0.0.0-20130922064739-8130c1d7596b
github.com/turbot/go-kit v0.1.1
Expand Down
37 changes: 22 additions & 15 deletions grpc/pluginServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,47 @@ package grpc

import (
"github.com/hashicorp/go-plugin"
pb "github.com/turbot/steampipe-plugin-sdk/grpc/proto"
"github.com/turbot/steampipe-plugin-sdk/grpc/proto"
pluginshared "github.com/turbot/steampipe-plugin-sdk/grpc/shared"
"github.com/turbot/steampipe-plugin-sdk/version"
)

type ExecuteFunc func(req *pb.ExecuteRequest, stream pb.WrapperPlugin_ExecuteServer) error
type GetSchemaFunc func() (map[string]*pb.TableSchema, error)
type ExecuteFunc func(req *proto.ExecuteRequest, stream proto.WrapperPlugin_ExecuteServer) error
type GetSchemaFunc func() (map[string]*proto.TableSchema, error)
type SetConnectionConfigFunc func(string, string) error

// PluginServer :: server for a single plugin
type PluginServer struct {
pb.UnimplementedWrapperPluginServer
pluginName string
executeFunc ExecuteFunc
getSchemaFunc GetSchemaFunc
proto.UnimplementedWrapperPluginServer
pluginName string
executeFunc ExecuteFunc
setConnectionConfigFunc SetConnectionConfigFunc
getSchemaFunc GetSchemaFunc
}

func NewPluginServer(pluginName string, getSchemaFunc GetSchemaFunc, executeFunc ExecuteFunc) *PluginServer {

func NewPluginServer(pluginName string, getSchemaFunc GetSchemaFunc, executeFunc ExecuteFunc, setConnectionConfigFunc SetConnectionConfigFunc) *PluginServer {
return &PluginServer{
pluginName: pluginName,
executeFunc: executeFunc,
getSchemaFunc: getSchemaFunc,
pluginName: pluginName,
executeFunc: executeFunc,
setConnectionConfigFunc: setConnectionConfigFunc,
getSchemaFunc: getSchemaFunc,
}
}

func (s PluginServer) GetSchema(_ *pb.GetSchemaRequest) (*pb.GetSchemaResponse, error) {
func (s PluginServer) GetSchema(_ *proto.GetSchemaRequest) (*proto.GetSchemaResponse, error) {
schema, err := s.getSchemaFunc()
return &pb.GetSchemaResponse{Schema: &pb.Schema{Schema: schema, SdkVersion: version.String()}}, err
return &proto.GetSchemaResponse{Schema: &proto.Schema{Schema: schema, SdkVersion: version.String()}}, err
}

func (s PluginServer) Execute(req *pb.ExecuteRequest, stream pb.WrapperPlugin_ExecuteServer) error {
func (s PluginServer) Execute(req *proto.ExecuteRequest, stream proto.WrapperPlugin_ExecuteServer) error {
return s.executeFunc(req, stream)
}

func (s PluginServer) SetConnectionConfig(req *proto.SetConnectionConfigRequest) (*proto.SetConnectionConfigResponse, error) {
err := s.setConnectionConfigFunc(req.ConnectionName, req.ConnectionConfig)
return &proto.SetConnectionConfigResponse{}, err
}

func (s PluginServer) Serve() {
pluginMap := map[string]plugin.Plugin{
s.pluginName: &pluginshared.WrapperPlugin{Impl: s},
Expand Down
7 changes: 4 additions & 3 deletions grpc/proto/plugin.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions grpc/proto/plugin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ message QueryContext {
message ExecuteRequest {
string table = 1;
QueryContext query_context = 2;
string connection = 3;
}

message ExecuteResponse {
Expand All @@ -79,9 +80,16 @@ message GetSchemaResponse {

}

message SetConnectionConfigRequest{
string connection_name = 1;
string connection_config = 2;
}
message SetConnectionConfigResponse{}

service WrapperPlugin {
rpc GetSchema(GetSchemaRequest) returns (GetSchemaResponse);
rpc Execute(ExecuteRequest) returns (stream ExecuteResponse);
rpc SetConnectionConfig(SetConnectionConfigRequest) returns (SetConnectionConfigResponse);
}

message Row {
Expand Down
20 changes: 14 additions & 6 deletions grpc/shared/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,45 @@ package shared
import (
"context"

pb "github.com/turbot/steampipe-plugin-sdk/grpc/proto"
"github.com/turbot/steampipe-plugin-sdk/grpc/proto"
)

// GRPCClient is an implementation of
//WrapperPluginClient service that talks over RPC.
type GRPCClient struct {
// Proto client use to make the grpc service calls.
client pb.WrapperPluginClient
client proto.WrapperPluginClient
// this context is created by the plugin package, and is canceled when the
// plugin process ends.
ctx context.Context
}

func (c *GRPCClient) GetSchema(req *pb.GetSchemaRequest) (*pb.GetSchemaResponse, error) {
func (c *GRPCClient) GetSchema(req *proto.GetSchemaRequest) (*proto.GetSchemaResponse, error) {
return c.client.GetSchema(c.ctx, req)
}

func (c *GRPCClient) Execute(req *pb.ExecuteRequest) (pb.WrapperPlugin_ExecuteClient, error) {
func (c *GRPCClient) Execute(req *proto.ExecuteRequest) (proto.WrapperPlugin_ExecuteClient, error) {
return c.client.Execute(c.ctx, req)
}

func (c *GRPCClient) SetConnectionConfig(req *proto.SetConnectionConfigRequest) (*proto.SetConnectionConfigResponse, error) {
return c.client.SetConnectionConfig(c.ctx, req)
}

// Here is the gRPC server that GRPCClient talks to.
type GRPCServer struct {
// This is the real implementation
Impl WrapperPluginServer
}

func (m *GRPCServer) GetSchema(_ context.Context, req *pb.GetSchemaRequest) (*pb.GetSchemaResponse, error) {
func (m *GRPCServer) GetSchema(_ context.Context, req *proto.GetSchemaRequest) (*proto.GetSchemaResponse, error) {
return m.Impl.GetSchema(req)
}

func (m *GRPCServer) Execute(req *pb.ExecuteRequest, server pb.WrapperPlugin_ExecuteServer) error {
func (m *GRPCServer) Execute(req *proto.ExecuteRequest, server proto.WrapperPlugin_ExecuteServer) error {
return m.Impl.Execute(req, server)

}
func (m *GRPCServer) SetConnectionConfig(_ context.Context, req *proto.SetConnectionConfigRequest) (*proto.SetConnectionConfigResponse, error) {
return m.Impl.SetConnectionConfig(req)
}
17 changes: 9 additions & 8 deletions grpc/shared/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ package shared
import (
"context"

pb "github.com/turbot/steampipe-plugin-sdk/grpc/proto"

"github.com/hashicorp/go-plugin"
"github.com/turbot/steampipe-plugin-sdk/grpc/proto"
"google.golang.org/grpc"
)

Expand All @@ -23,13 +22,15 @@ var Handshake = plugin.HandshakeConfig{
// - the client interface (implemented by the client stub)
// this is because the signature of the client and server are difference when using grpc streaming
type WrapperPluginServer interface {
GetSchema(req *pb.GetSchemaRequest) (*pb.GetSchemaResponse, error)
Execute(req *pb.ExecuteRequest, stream pb.WrapperPlugin_ExecuteServer) error
GetSchema(req *proto.GetSchemaRequest) (*proto.GetSchemaResponse, error)
Execute(req *proto.ExecuteRequest, stream proto.WrapperPlugin_ExecuteServer) error
SetConnectionConfig(req *proto.SetConnectionConfigRequest) (*proto.SetConnectionConfigResponse, error)
}

type WrapperPluginClient interface {
GetSchema(request *pb.GetSchemaRequest) (*pb.GetSchemaResponse, error)
Execute(req *pb.ExecuteRequest) (pb.WrapperPlugin_ExecuteClient, error)
GetSchema(request *proto.GetSchemaRequest) (*proto.GetSchemaResponse, error)
Execute(req *proto.ExecuteRequest) (proto.WrapperPlugin_ExecuteClient, error)
SetConnectionConfig(req *proto.SetConnectionConfigRequest) (*proto.SetConnectionConfigResponse, error)
}

// This is the implementation of plugin.GRPCServer so we can serve/consume this.
Expand All @@ -42,11 +43,11 @@ type WrapperPlugin struct {
}

func (p *WrapperPlugin) GRPCServer(_ *plugin.GRPCBroker, s *grpc.Server) error {
pb.RegisterWrapperPluginServer(s, &GRPCServer{Impl: p.Impl})
proto.RegisterWrapperPluginServer(s, &GRPCServer{Impl: p.Impl})
return nil
}

// return a GRPCClient, called by Dispense
func (p *WrapperPlugin) GRPCClient(ctx context.Context, _ *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) {
return &GRPCClient{client: pb.NewWrapperPluginClient(c), ctx: ctx}, nil
return &GRPCClient{client: proto.NewWrapperPluginClient(c), ctx: ctx}, nil
}
101 changes: 101 additions & 0 deletions plugin/connection_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package plugin

import (
"errors"
"fmt"
"log"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/hashicorp/hcl/v2"

"github.com/turbot/steampipe-plugin-sdk/plugin/schema"

"github.com/hashicorp/hcl/v2/hcldec"
"github.com/hashicorp/hcl/v2/hclparse"
"github.com/turbot/go-kit/helpers"
"github.com/zclconf/go-cty/cty/gocty"
)

type ConnectionConfigInstanceFunc func() interface{}

type Connection struct {
Name string
// the connection config
// NOTE: we always pass and store connection config BY VALUE
Config interface{}
}

// ConnectionConfigSchema :: struct used to define the connection config schema and store the config for each plugin connection
type ConnectionConfigSchema struct {
Schema map[string]*schema.Attribute
// function which returns an instance of a connection config struct
NewInstance ConnectionConfigInstanceFunc
}

func NewConnectionConfigSchema() *ConnectionConfigSchema {
return &ConnectionConfigSchema{
Schema: map[string]*schema.Attribute{},
}
}

// Parse :: parse the hcl string into a connection config struct.
// The schema and the struct to parse into are provided by the plugin
func (c *ConnectionConfigSchema) Parse(configString string) (config interface{}, err error) {
defer func() {
if r := recover(); r != nil {
log.Printf("[WARN] ConnectionConfigSchema Parse caught a panic: %v\n", r)
err = status.Error(codes.Internal, fmt.Sprintf("ConnectionConfigSchema Parse failed with panic %v", r))
}
}()

// ensure a schema is set
if len(c.Schema) == 0 {
return nil, fmt.Errorf("cannot parse connection config as no config schema is set in the connection config")
}
if c.NewInstance == nil {
return nil, fmt.Errorf("cannot parse connection config as no NewInstance function is specified in the connection config")
}
configStruct := c.NewInstance()
spec := schema.SchemaToObjectSpec(c.Schema)
parser := hclparse.NewParser()

file, diags := parser.ParseHCL([]byte(configString), "/")
if diags.HasErrors() {
return nil, DiagsToError("failed to parse connection config", diags)
}
value, diags := hcldec.Decode(file.Body, spec, nil)
if diags.HasErrors() {
return nil, DiagsToError("failed to parse connection config", diags)
}

// decode into the provided struct
if err := gocty.FromCtyValue(value, configStruct); err != nil {
return nil, fmt.Errorf("Failed to marshal parsed config into config struct: %v", err)
}

// return the struct by value
return helpers.DereferencePointer(configStruct), nil
}

// convert hcl diags into an error
func DiagsToError(prefix string, diags hcl.Diagnostics) error {
// convert the first diag into an error
if !diags.HasErrors() {
return nil
}
for _, diag := range diags {
if diag.Severity == hcl.DiagError {
errString := fmt.Sprintf("%s: ", diag.Summary)
if diag.Detail != "" {
errString += fmt.Sprintf(": %s", diag.Detail)
}
if prefix != "" {
errString = fmt.Sprintf("%s: %s", prefix, errString)
}
return errors.New(errString)
}
}
return diags.Errs()[0]
}
Loading

0 comments on commit 77e31d8

Please sign in to comment.