Skip to content

Commit

Permalink
Re-add rowDataChan buffering to avoid Get call stalling. Closes #382
Browse files Browse the repository at this point in the history
  • Loading branch information
kaidaguerre authored Aug 22, 2022
1 parent dccf629 commit a712d9d
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 7 deletions.
6 changes: 3 additions & 3 deletions plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,9 +575,9 @@ func (p *Plugin) executeForConnection(ctx context.Context, req *proto.ExecuteReq
}

// asyncronously fetch items
log.Printf("[TRACE] calling fetchItemsAsync, table: %s, matrixItem: %v, limit: %d, connectionCallId: %s\"", table.Name, queryData.Matrix, limit, connectionCallId)
if err := table.fetchItemsAsync(ctx, queryData); err != nil {
log.Printf("[WARN] fetchItemsAsync returned an error, table: %s, error: %v", table.Name, err)
log.Printf("[TRACE] calling fetchItems, table: %s, matrixItem: %v, limit: %d, connectionCallId: %s\"", table.Name, queryData.Matrix, limit, connectionCallId)
if err := table.fetchItems(ctx, queryData); err != nil {
log.Printf("[WARN] fetchItems returned an error, table: %s, error: %v", table.Name, err)
return err

}
Expand Down
7 changes: 5 additions & 2 deletions plugin/query_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"github.com/turbot/steampipe-plugin-sdk/v4/telemetry"
)

// how may rows do we cache in the rowdata channel
const rowDataBufferSize = 100

// NOTE - any field added here must also be added to ShallowCopy

type QueryData struct {
Expand Down Expand Up @@ -117,8 +120,8 @@ func newQueryData(connectionCallId string, plugin *Plugin, queryContext *QueryCo

// asyncronously read items using the 'get' or 'list' API
// items are streamed on rowDataChan, errors returned on errorChan
rowDataChan: make(chan *RowData),
errorChan: make(chan error),
rowDataChan: make(chan *RowData, rowDataBufferSize),
errorChan: make(chan error, 1),
outputChan: outputChan,
listWg: &wg,

Expand Down
10 changes: 8 additions & 2 deletions plugin/table_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ When executing for each matrix item, the matrix item is put into the context, av
*/

// call either 'get' or 'list'.
func (t *Table) fetchItemsAsync(ctx context.Context, queryData *QueryData) error {
ctx, span := telemetry.StartSpan(ctx, t.Plugin.Name, "Table.fetchItemsAsync (%s)", t.Name)
func (t *Table) fetchItems(ctx context.Context, queryData *QueryData) error {
ctx, span := telemetry.StartSpan(ctx, t.Plugin.Name, "Table.fetchItems (%s)", t.Name)

defer span.End()

Expand Down Expand Up @@ -162,6 +162,12 @@ func (t *Table) doGetForQualValues(ctx context.Context, queryData *QueryData, ke
var getWg sync.WaitGroup
var errorChan = make(chan (error), len(qualValueList.Values))

// NOTE: ensure QueryData.rowDataChan can buffer sufficient items
// (we normally expect it would be sufficient)
if len(qualValueList.Values) > rowDataBufferSize {
queryData.rowDataChan = make(chan *RowData, len(qualValueList.Values))
}

// we will make a copy of queryData and update KeyColumnQuals to replace the list value with a single qual value
for _, qv := range qualValueList.Values {
// make a shallow copy of the query data and modify the quals
Expand Down

0 comments on commit a712d9d

Please sign in to comment.