Skip to content

Commit

Permalink
Add ConnectionConfigChangedFunc callback function to Plugin, which is…
Browse files Browse the repository at this point in the history
… called when the connection config changes. Closes #387
  • Loading branch information
kaidaguerre authored Aug 22, 2022
1 parent c7056a3 commit 961db9c
Show file tree
Hide file tree
Showing 6 changed files with 293 additions and 190 deletions.
13 changes: 12 additions & 1 deletion connection/connection_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ type ConnectionCache struct {
}

func NewConnectionCache(connectionName string, connectionCache *cache.Cache[any]) *ConnectionCache {
return &ConnectionCache{connectionName, connectionCache}
return &ConnectionCache{
connectionName: connectionName,
cache: connectionCache,
}
}

func (c *ConnectionCache) Set(ctx context.Context, key string, value interface{}) error {
Expand All @@ -32,10 +35,13 @@ func (c *ConnectionCache) SetWithTTL(ctx context.Context, key string, value inte
key,
value,
store.WithExpiration(expiration),
// put connection name in tags
store.WithTags([]string{c.connectionName}),
)

// wait for value to pass through buffers (necessary for ristretto)
time.Sleep(10 * time.Millisecond)

return err
}

Expand All @@ -55,6 +61,11 @@ func (c *ConnectionCache) Delete(ctx context.Context, key string) {
c.cache.Delete(ctx, key)
}

// Clear deletes all cache items for this connection
func (c *ConnectionCache) Clear(ctx context.Context) {
c.cache.Invalidate(ctx, store.WithInvalidateTags([]string{c.connectionName}))
}

func (c *ConnectionCache) buildCacheKey(key string) string {
return fmt.Sprintf("__connection_cache_key_%s__%s", c.connectionName, key)
}
6 changes: 0 additions & 6 deletions plugin/connection_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,6 @@ type ConnectionConfigSchema struct {
NewInstance ConnectionConfigInstanceFunc
}

func NewConnectionConfigSchema() *ConnectionConfigSchema {
return &ConnectionConfigSchema{
Schema: map[string]*schema.Attribute{},
}
}

// Parse function parses the hcl string into a connection config struct.
// The schema and the struct to parse into are provided by the plugin
func (c *ConnectionConfigSchema) Parse(configString string) (config interface{}, err error) {
Expand Down
210 changes: 34 additions & 176 deletions plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/eko/gocache/v3/store"
"github.com/hashicorp/go-hclog"
"github.com/turbot/go-kit/helpers"
connection_manager "github.com/turbot/steampipe-plugin-sdk/v4/connection"
"github.com/turbot/steampipe-plugin-sdk/v4/error_helpers"
"github.com/turbot/steampipe-plugin-sdk/v4/grpc"
"github.com/turbot/steampipe-plugin-sdk/v4/grpc/proto"
Expand Down Expand Up @@ -69,6 +70,9 @@ type Plugin struct {
// every table must implement these columns
RequiredColumns []*Column
ConnectionConfigSchema *ConnectionConfigSchema
// ConnectionConfigChangedFunc is a callback function which is called from UpdateConnectionConfigs
// when any connection configs have changed
ConnectionConfigChangedFunc func(p *Plugin, old, new *Connection)

// map of connection data (schema, config, connection cache)
// keyed by connection name
Expand All @@ -79,12 +83,15 @@ type Plugin struct {
queryCache *query_cache.QueryCache
// shared connection cache - this is the underlying cache used for all queryData ConnectionCache
connectionCacheStore *cache.Cache[any]
// map of the connection caches, keyed by connection name
connectionCacheMap map[string]*connection_manager.ConnectionCache
}

// Initialise creates the 'connection manager' (which provides caching), sets up the logger
// and sets the file limit.
func (p *Plugin) Initialise() {
p.ConnectionMap = make(map[string]*ConnectionData)
p.connectionCacheMap = make(map[string]*connection_manager.ConnectionCache)

p.Logger = p.setupLogger()
log.Printf("[INFO] Initialise plugin '%s', using sdk version %s", p.Name, version.String())
Expand Down Expand Up @@ -119,6 +126,11 @@ func (p *Plugin) Initialise() {
p.DefaultGetConfig.initialise(nil)
}

// create default ConnectionConfigChangedFunc if needed
if p.ConnectionConfigChangedFunc == nil {
p.ConnectionConfigChangedFunc = defaultConnectionConfigChangedFunc
}

// set file limit
// TODO REMOVE WITH GO 1.19
p.setuLimit()
Expand All @@ -142,182 +154,11 @@ func (p *Plugin) createConnectionCacheStore() error {
return nil
}

func (p *Plugin) SetConnectionConfig(connectionName, connectionConfigString string) (err error) {
log.Printf("[TRACE] SetConnectionConfig %s", connectionName)
return p.SetAllConnectionConfigs([]*proto.ConnectionConfig{
{
Connection: connectionName,
Config: connectionConfigString,
},
}, 0)
}

func (p *Plugin) SetAllConnectionConfigs(configs []*proto.ConnectionConfig, maxCacheSizeMb int) (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("SetAllConnectionConfigs failed: %s", helpers.ToError(r).Error())
} else {
p.Logger.Debug("SetAllConnectionConfigs finished")
}
}()

log.Printf("[TRACE] SetAllConnectionConfigs setting %d configs", len(configs))

// if this plugin does not have dynamic config, we can share table map and schema
var exemplarSchema map[string]*proto.TableSchema
var exemplarTableMap map[string]*Table

var aggregators []*proto.ConnectionConfig
for _, config := range configs {
// NOTE: do not set connection config for aggregator connections
if len(config.ChildConnections) > 0 {
log.Printf("[TRACE] connection %s is an aggregator - handle separately", config.Connection)
aggregators = append(aggregators, config)
continue
}

connectionName := config.Connection

connectionConfigString := config.Config
if connectionName == "" {
log.Printf("[WARN] SetAllConnectionConfigs failed - ConnectionConfig contained empty connection name")
return fmt.Errorf("SetAllConnectionConfigs failed - ConnectionConfig contained empty connection name")
}

// create connection object
c := &Connection{Name: connectionName}

// if config was provided, parse it
if connectionConfigString != "" {
if p.ConnectionConfigSchema == nil {
return fmt.Errorf("connection config has been set for connection '%s', but plugin '%s' does not define connection config schema", connectionName, p.Name)
}
// ask plugin for a struct to deserialise the config into
config, err := p.ConnectionConfigSchema.Parse(connectionConfigString)
if err != nil {
return err
}
c.Config = config
}

schema := exemplarSchema
tableMap := exemplarTableMap
var err error

if tableMap == nil {
log.Printf("[TRACE] connection %s build schema and table map", connectionName)
// if the plugin defines a CreateTables func, call it now
ctx := context.WithValue(context.Background(), context_key.Logger, p.Logger)
tableMap, err = p.initialiseTables(ctx, c)
if err != nil {
return err
}

// populate the plugin schema
schema, err = p.buildSchema(tableMap)
if err != nil {
return err
}

if p.SchemaMode == SchemaModeStatic {
exemplarSchema = schema
exemplarTableMap = tableMap
}
}

// add to connection map
p.ConnectionMap[connectionName] = &ConnectionData{
TableMap: tableMap,
Connection: c,
Schema: schema,
}
}

for _, aggregatorConfig := range aggregators {
firstChild := p.ConnectionMap[aggregatorConfig.ChildConnections[0]]
// we do not currently support aggregator connections for dynamic schema
if p.SchemaMode == SchemaModeDynamic {
return fmt.Errorf("aggregator connections are not supported for dynamic plugins: connection '%s', plugin: '%s'", aggregatorConfig.Connection, aggregatorConfig.Plugin)
}

// add to connection map using the first child's schema
p.ConnectionMap[aggregatorConfig.Connection] = &ConnectionData{
TableMap: firstChild.TableMap,
Connection: &Connection{Name: aggregatorConfig.Connection},
Schema: firstChild.Schema,
}
}

// now create the query cache - do this AFTER setting the connection config so the cache can build
// the connection schema map
p.ensureCache(maxCacheSizeMb)

return nil
}

func (p *Plugin) UpdateConnectionConfigs(added []*proto.ConnectionConfig, deleted []*proto.ConnectionConfig, changed []*proto.ConnectionConfig) error {
log.Printf("[TRACE] UpdateConnectionConfigs added %v, deleted %v, changed %v", added, deleted, changed)

// if this plugin does not have dynamic config, we can share table map and schema
var exemplarSchema map[string]*proto.TableSchema
var exemplarTableMap map[string]*Table
if p.SchemaMode == SchemaModeStatic {
for _, connectionData := range p.ConnectionMap {
exemplarSchema = connectionData.Schema
exemplarTableMap = connectionData.TableMap
}
}

for _, addedConnection := range added {
schema := exemplarSchema
tableMap := exemplarTableMap
// create connection object
c := &Connection{
Name: addedConnection.Connection,
Config: addedConnection.Config,
}
if addedConnection.Config != "" {
if p.ConnectionConfigSchema == nil {
return fmt.Errorf("connection config has been set for connection '%s', but plugin '%s' does not define connection config schema", addedConnection, p.Name)
}
// ask plugin for a struct to deserialise the config into
config, err := p.ConnectionConfigSchema.Parse(addedConnection.Config)
if err != nil {
return err
}
c.Config = config
}

if p.SchemaMode == SchemaModeDynamic {
var err error
log.Printf("[TRACE] UpdateConnectionConfigs - connection %s build schema and table map", addedConnection.Connection)
ctx := context.WithValue(context.Background(), context_key.Logger, p.Logger)
tableMap, err = p.initialiseTables(ctx, c)
if err != nil {
return err
}

// populate the plugin schema
schema, err = p.buildSchema(tableMap)
if err != nil {
return err
}
}

p.ConnectionMap[addedConnection.Connection] = &ConnectionData{
TableMap: tableMap,
Connection: c,
Schema: schema,
}
}

// update the query cache schema map
connectionSchemaMap := p.buildConnectionSchemaMap()
p.queryCache.PluginSchemaMap = connectionSchemaMap

// Ignore deleted and updated for now

return nil
func (p *Plugin) newConnectionCache(connectionName string) *connection_manager.ConnectionCache {
connectionCache := connection_manager.NewConnectionCache(connectionName, p.connectionCacheStore)
// add to map of connection caches
p.connectionCacheMap[connectionName] = connectionCache
return connectionCache
}

// GetSchema is the handler function for the GetSchema grpc function
Expand Down Expand Up @@ -435,6 +276,23 @@ func (p *Plugin) Execute(req *proto.ExecuteRequest, stream proto.WrapperPlugin_E
return helpers.CombineErrors(errors...)
}

// ClearConnectionCache clears the connection cache for the given connection
func (p *Plugin) ClearConnectionCache(ctx context.Context, connectionName string) {
// get the connection cache for this connection
connectionCache, ok := p.connectionCacheMap[connectionName]
if !ok {
// not expected
log.Printf("[WARN] ClearConnectionCache failed - no connection cache found for connection %s", connectionName)
return
}
connectionCache.Clear(ctx)
}

// ClearQueryCache clears the query cache for the given connection
func (p *Plugin) ClearQueryCache(ctx context.Context, connectionName string) {
p.queryCache.ClearForConnection(ctx, connectionName)
}

func (p *Plugin) executeForConnection(ctx context.Context, req *proto.ExecuteRequest, connectionName string, executeData *proto.ExecuteConnectionData, outputChan chan *proto.ExecuteResponse) (err error) {
const rowBufferSize = 10
var rowChan = make(chan *proto.Row, rowBufferSize)
Expand Down
Loading

0 comments on commit 961db9c

Please sign in to comment.