Skip to content
This repository has been archived by the owner on Aug 12, 2022. It is now read-only.

Commit

Permalink
fix: Sub-resource partial fetch error handling (#163)
Browse files Browse the repository at this point in the history
Co-authored-by: Kemal Hadimli <[email protected]>
  • Loading branch information
disq and disq authored Jan 24, 2022
1 parent 134164d commit a0adcc0
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 13 deletions.
4 changes: 2 additions & 2 deletions provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (p *Provider) FetchResources(ctx context.Context, request *cqproto.FetchRes

g, gctx := errgroup.WithContext(ctx)
finishedResources := make(map[string]bool, len(resources))
l := sync.Mutex{}
l := &sync.Mutex{}
var totalResourceCount uint64 = 0
for _, resource := range resources {
table, ok := p.ResourceMap[resource]
Expand All @@ -202,9 +202,9 @@ func (p *Provider) FetchResources(ctx context.Context, request *cqproto.FetchRes
}
resourceCount, err := execData.ResolveTable(gctx, p.meta, nil)
l.Lock()
defer l.Unlock()
finishedResources[r] = true
atomic.AddUint64(&totalResourceCount, resourceCount)
defer l.Unlock()
if err != nil {
status := cqproto.ResourceFetchFailed
if err == context.Canceled {
Expand Down
26 changes: 15 additions & 11 deletions provider/schema/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ type ExecutionData struct {
partialFetchChan chan ResourceFetchError
// When the execution started
executionStart time.Time
// parent is the parent ExecutionData
parent *ExecutionData
}

// ResourceFetchError represents a single partial fetch failed resource
Expand Down Expand Up @@ -119,15 +121,16 @@ func (e *ExecutionData) ResolveTable(ctx context.Context, meta ClientMeta, paren
}
g, ctx := errgroup.WithContext(ctx)
// Start the partial fetch failure result channel routine
finishedPartialFetchChan := make(chan bool)
if e.partialFetch {
var finishedPartialFetchChan chan bool
if e.partialFetch && e.parent == nil {
finishedPartialFetchChan = make(chan bool)
e.partialFetchChan = make(chan ResourceFetchError, partialFetchFailureBufferLength)
go func() {
for fetchResourceFailure := range e.partialFetchChan {
meta.Logger().Debug("received failed partial fetch resource", "resource", fetchResourceFailure, "table", e.Table.Name)
e.PartialFetchFailureResult = append(e.PartialFetchFailureResult, fetchResourceFailure)
}
finishedPartialFetchChan <- true
close(finishedPartialFetchChan)
}()
}
var totalResources uint64
Expand All @@ -140,7 +143,7 @@ func (e *ExecutionData) ResolveTable(ctx context.Context, meta ClientMeta, paren
})
}
err := g.Wait()
if e.partialFetch {
if e.partialFetch && e.parent == nil {
close(e.partialFetchChan)
<-finishedPartialFetchChan
}
Expand All @@ -149,13 +152,14 @@ func (e *ExecutionData) ResolveTable(ctx context.Context, meta ClientMeta, paren

func (e *ExecutionData) WithTable(t *Table) *ExecutionData {
return &ExecutionData{
Table: t,
ResourceName: e.ResourceName,
Db: e.Db,
Logger: e.Logger,
extraFields: e.extraFields,
partialFetch: e.partialFetch,
PartialFetchFailureResult: []ResourceFetchError{},
Table: t,
ResourceName: e.ResourceName,
Db: e.Db,
Logger: e.Logger,
extraFields: e.extraFields,
partialFetch: e.partialFetch,
partialFetchChan: e.partialFetchChan,
parent: e,
}
}

Expand Down

0 comments on commit a0adcc0

Please sign in to comment.