Skip to content

Commit

Permalink
CosmosDB iterator to use dynamic batch size after the first list (#7892)
Browse files Browse the repository at this point in the history
  • Loading branch information
itaiad200 authored Jun 20, 2024
1 parent 7261276 commit 28de850
Showing 1 changed file with 43 additions and 16 deletions.
59 changes: 43 additions & 16 deletions pkg/kv/cosmosdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ type Store struct {

const (
// DriverName is the string identifier of the CosmosDB store implementation.
// NOTE(review): presumably the name under which this driver is registered/selected
// in the kv package — confirm against the registration site.
DriverName = "cosmosdb"

// dynamicPageSize is the special page-size value ('-1') used by the cosmosdb client
// to request a dynamic page size. The iterator stores it in its batchSize field,
// which is passed to the query as PageSizeHint.
dynamicPageSize = -1
)

// encoding is the encoding used to encode the partition keys, ids and values.
Expand Down Expand Up @@ -358,8 +361,9 @@ func (s *Store) Scan(ctx context.Context, partitionKey []byte, options kv.ScanOp
startKey: options.KeyStart,
queryCtx: ctx,
encoding: encoding,
batchSize: options.BatchSize,
}
if err := it.runQuery(options.BatchSize); err != nil {
if err := it.runQuery(true); err != nil {
return nil, convertError(err)
}
return it, nil
Expand All @@ -372,6 +376,7 @@ type EntriesIterator struct {
store *Store
partitionKey []byte
startKey []byte
batchSize int

entry *kv.Entry
err error
Expand Down Expand Up @@ -414,18 +419,25 @@ func (e *EntriesIterator) Next() bool {
if !e.queryPager.More() {
return false
}
var err error
e.currPage, err = e.queryPager.NextPage(e.queryCtx)
if err != nil {
e.err = fmt.Errorf("getting next page: %w", convertError(err))
return false
}
if len(e.currPage.Items) == 0 {
// returned page is empty, no more items
return false
if e.batchSize != dynamicPageSize {
if err := e.handleBatchSizeChange(); err != nil {
e.err = convertError(err)
return false
}
} else {
var err error
e.currPage, err = e.queryPager.NextPage(e.queryCtx)
if err != nil {
e.err = fmt.Errorf("getting next page: %w", convertError(err))
return false
}
if len(e.currPage.Items) == 0 {
// returned page is empty, no more items
return false
}
e.currPageSeekedKey = nil
e.currEntryIdx = -1
}
e.currPageSeekedKey = nil
e.currEntryIdx = -1
}
e.currEntryIdx++
key, value := e.getKeyValue(e.currEntryIdx)
Expand All @@ -440,11 +452,20 @@ func (e *EntriesIterator) Next() bool {
return true
}

// handleBatchSizeChange handles running the query again after the first query ran with a
// limited batch size. It switches the iterator to the dynamic page size and re-issues the
// query so that the remaining pages are fetched without the caller-supplied size limit.
// The reason we switch the batch size is to avoid issues like https://github.com/treeverse/lakeFS/issues/7864
//
// If an entry has already been yielded, the new query resumes strictly after that entry's
// key (runQuery(false) uses the ">" operator). If no entry has been yielded yet, there is
// no key to resume from, so the query is re-run inclusively from the current startKey
// instead of dereferencing a nil entry.
//
// It returns the error, if any, reported by runQuery.
func (e *EntriesIterator) handleBatchSizeChange() error {
	e.batchSize = dynamicPageSize
	if e.entry == nil {
		// Nothing was consumed from the size-limited query: keep the original,
		// inclusive start position so no key is skipped.
		return e.runQuery(true)
	}
	// Resume after the last entry handed to the caller to avoid returning it twice.
	e.startKey = e.entry.Key
	return e.runQuery(false)
}

func (e *EntriesIterator) SeekGE(key []byte) {
e.startKey = key
if !e.isInRange() {
// '-1' Used for dynamic page size.
if err := e.runQuery(-1); err != nil {
e.batchSize = dynamicPageSize
if err := e.runQuery(true); err != nil {
e.err = convertError(err)
}
return
Expand Down Expand Up @@ -475,11 +496,17 @@ func (e *EntriesIterator) Close() {
e.err = kv.ErrClosedEntries
}

func (e *EntriesIterator) runQuery(limit int) error {
func (e *EntriesIterator) runQuery(includeStartKey bool) error {
operator := ">="
if !includeStartKey {
operator = ">"
}
query := fmt.Sprintf("select * from c where c.key %s @start order by c.key", operator)

pk := azcosmos.NewPartitionKeyString(encoding.EncodeToString(e.partitionKey))
e.queryPager = e.store.containerClient.NewQueryItemsPager("select * from c where c.key >= @start order by c.key", pk, &azcosmos.QueryOptions{
e.queryPager = e.store.containerClient.NewQueryItemsPager(query, pk, &azcosmos.QueryOptions{
ConsistencyLevel: e.store.consistencyLevel.ToPtr(),
PageSizeHint: int32(limit),
PageSizeHint: int32(e.batchSize),
QueryParameters: []azcosmos.QueryParameter{{
Name: "@start",
Value: encoding.EncodeToString(e.startKey),
Expand Down

0 comments on commit 28de850

Please sign in to comment.