[TT-9864] Optimize the creation and usage of AST documents #431

Merged
12 changes: 11 additions & 1 deletion pkg/ast/ast.go
@@ -7,6 +7,8 @@ package ast

const InvalidRef = -1

var astDocumentPool = newDocumentPool()

type Document struct {
Input Input
RootNodes []Node
@@ -55,7 +57,7 @@ type Document struct {
Index Index
}

func NewDocument() *Document {
func newDocumentWithPreAllocation() *Document {
return &Document{
RootNodes: make([]Node, 0, 48),
RootOperationTypeDefinitions: make([]RootOperationTypeDefinition, 0, 3),
@@ -106,6 +108,14 @@ func NewDocument() *Document {
}
}

func NewDocument() *Document {
return astDocumentPool.Get()
}

func (d *Document) Recycle() {
astDocumentPool.Put(d)
}

func (d *Document) Reset() {
d.RootNodes = d.RootNodes[:0]
d.SchemaDefinitions = d.SchemaDefinitions[:0]
26 changes: 26 additions & 0 deletions pkg/ast/document_pool.go
@@ -0,0 +1,26 @@
package ast

import "sync"

type documentPool struct {
p sync.Pool
}

func newDocumentPool() *documentPool {
return &documentPool{
p: sync.Pool{
New: func() interface{} {
return newDocumentWithPreAllocation()
},
},
}
}

func (p *documentPool) Put(b *Document) {
b.Reset()
p.p.Put(b)
}

func (p *documentPool) Get() *Document {
return p.p.Get().(*Document)
}
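
For readers unfamiliar with sync.Pool, here is a minimal sketch of the lifecycle these two files enable. The helper name and its placement inside package ast are illustrative only; NewDocument, Recycle, Reset, and Input.ResetInputString are the APIs shown in this PR.

package ast

// exampleDocumentPoolUsage is a hypothetical helper showing the pooled lifecycle:
// NewDocument hands out a pre-allocated *Document from astDocumentPool, and
// Recycle resets it and returns it to the pool for the next caller.
func exampleDocumentPoolUsage(query string) {
	doc := NewDocument()              // taken from the sync.Pool; pre-allocated on first use
	doc.Input.ResetInputString(query) // reuse the document's input buffers
	// ... parse, plan, and execute against doc ...
	doc.Recycle() // Put calls Reset before the document re-enters the pool
}
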
@@ -663,9 +663,10 @@ func (p *Planner) updateRepresentationsVariable(fieldConfig *plan.FieldConfigura
}

// "variables\":{\"representations\":[{\"upc\":\$$0$$\,\"__typename\":\"Product\"}]}}
parser := astparser.NewParser()
doc := ast.NewDocument()
doc.Input.ResetInputString(p.config.Federation.ServiceSDL)

parser := astparser.NewParser()
report := &operationreport.Report{}
parser.Parse(doc, report)
if report.HasErrors() {
9 changes: 5 additions & 4 deletions pkg/graphql/execution_engine_v2.go
@@ -289,7 +289,7 @@ func (e *ExecutionEngineV2) Setup(ctx context.Context, postProcessor *postproces
}

func (e *ExecutionEngineV2) Plan(postProcessor *postprocess.Processor, operation *Request, report *operationreport.Report) (plan.Plan, error) {
cachedPlan := e.getCachedPlan(postProcessor, &operation.document, &e.config.schema.document, operation.OperationName, report)
cachedPlan := e.getCachedPlan(postProcessor, operation, &e.config.schema.document, operation.OperationName, report)
if report.HasErrors() {
return nil, report
}
@@ -317,12 +317,12 @@ func (e *ExecutionEngineV2) Execute(ctx context.Context, operation *Request, wri
return e.customExecutionEngineExecutor.Execute(ctx, operation, writer, options...)
}

func (e *ExecutionEngineV2) getCachedPlan(postProcessor *postprocess.Processor, operation, definition *ast.Document, operationName string, report *operationreport.Report) plan.Plan {
func (e *ExecutionEngineV2) getCachedPlan(postProcessor *postprocess.Processor, request *Request, definition *ast.Document, operationName string, report *operationreport.Report) plan.Plan {

hash := pool.Hash64.Get()
hash.Reset()
defer pool.Hash64.Put(hash)
err := astprinter.Print(operation, definition, hash)
err := astprinter.Print(&request.document, definition, hash)
if err != nil {
report.AddInternalError(err)
return nil
@@ -332,13 +332,14 @@ func (e *ExecutionEngineV2) getCachedPlan(postProcessor *postprocess.Processor,

if cached, ok := e.executionPlanCache.Get(cacheKey); ok {
if p, ok := cached.(plan.Plan); ok {
request.isDocumentRecyclable = true
return p
}
}

e.plannerMu.Lock()
defer e.plannerMu.Unlock()
planResult := e.planner.Plan(operation, definition, operationName, report)
planResult := e.planner.Plan(&request.document, definition, operationName, report)
if report.HasErrors() {
return nil
}
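
Note that isDocumentRecyclable is only set on a cache hit, presumably because a freshly created plan may still reference the request's document, whereas a cached plan does not. A minimal, hypothetical sketch of the intended caller-side pattern follows; the helper name is invented for illustration, while Execute, Cleanup, Request, and EngineResultWriter are the types and methods touched by this PR.

package graphql

import "context"

// executeAndCleanup is a hypothetical helper: Cleanup only recycles the AST document
// when getCachedPlan marked it as recyclable, i.e. when the plan came from the cache.
func executeAndCleanup(ctx context.Context, engine *ExecutionEngineV2, req *Request, w *EngineResultWriter) error {
	err := engine.Execute(ctx, req, w)
	req.Cleanup() // no-op unless isDocumentRecyclable was set
	return err
}
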
74 changes: 70 additions & 4 deletions pkg/graphql/execution_engine_v2_test.go
@@ -2357,7 +2357,7 @@ func TestExecutionEngineV2_GetCachedPlan(t *testing.T) {
}

report := operationreport.Report{}
cachedPlan := engine.getCachedPlan(firstInternalExecCtx.postProcessor, &gqlRequest.document, &schema.document, gqlRequest.OperationName, &report)
cachedPlan := engine.getCachedPlan(firstInternalExecCtx.postProcessor, &gqlRequest, &schema.document, gqlRequest.OperationName, &report)
_, oldestCachedPlan, _ := engine.executionPlanCache.GetOldest()
assert.False(t, report.HasErrors())
assert.Equal(t, 1, engine.executionPlanCache.Len())
@@ -2368,7 +2368,7 @@ func TestExecutionEngineV2_GetCachedPlan(t *testing.T) {
http.CanonicalHeaderKey("Authorization"): []string{"123abc"},
}

cachedPlan = engine.getCachedPlan(secondInternalExecCtx.postProcessor, &gqlRequest.document, &schema.document, gqlRequest.OperationName, &report)
cachedPlan = engine.getCachedPlan(secondInternalExecCtx.postProcessor, &gqlRequest, &schema.document, gqlRequest.OperationName, &report)
_, oldestCachedPlan, _ = engine.executionPlanCache.GetOldest()
assert.False(t, report.HasErrors())
assert.Equal(t, 1, engine.executionPlanCache.Len())
@@ -2385,7 +2385,7 @@ func TestExecutionEngineV2_GetCachedPlan(t *testing.T) {
}

report := operationreport.Report{}
cachedPlan := engine.getCachedPlan(firstInternalExecCtx.postProcessor, &gqlRequest.document, &schema.document, gqlRequest.OperationName, &report)
cachedPlan := engine.getCachedPlan(firstInternalExecCtx.postProcessor, &gqlRequest, &schema.document, gqlRequest.OperationName, &report)
_, oldestCachedPlan, _ := engine.executionPlanCache.GetOldest()
assert.False(t, report.HasErrors())
assert.Equal(t, 1, engine.executionPlanCache.Len())
@@ -2396,7 +2396,7 @@ func TestExecutionEngineV2_GetCachedPlan(t *testing.T) {
http.CanonicalHeaderKey("Authorization"): []string{"xyz098"},
}

cachedPlan = engine.getCachedPlan(secondInternalExecCtx.postProcessor, &differentGqlRequest.document, &schema.document, differentGqlRequest.OperationName, &report)
cachedPlan = engine.getCachedPlan(secondInternalExecCtx.postProcessor, &differentGqlRequest, &schema.document, differentGqlRequest.OperationName, &report)
_, oldestCachedPlan, _ = engine.executionPlanCache.GetOldest()
assert.False(t, report.HasErrors())
assert.Equal(t, 2, engine.executionPlanCache.Len())
@@ -2865,3 +2865,69 @@ func BenchmarkExecutionEngineV2_Execute(b *testing.B) {
execCtxCancel()
}
}

func BenchmarkExecutionEngineV2_Execute_AstDocumentPool(b *testing.B) {
b.ReportAllocs() // report memory allocations per iteration

schemaString := `type Query {
hello: String!
}`
schema, _ := NewSchemaFromString(schemaString)

operation := Request{
OperationName: "",
Variables: nil,
Query: "{ hello }",
}

dataSources := []plan.DataSourceConfiguration{
{
RootNodes: []plan.TypeField{
{
TypeName: "Query",
FieldNames: []string{"hello"},
},
},
ChildNodes: []plan.TypeField{},
Factory: &rest_datasource.Factory{
Client: &http.Client{
Transport: benchmarkRoundTripper{},
},
},
Custom: rest_datasource.ConfigJSON(rest_datasource.Configuration{
Fetch: rest_datasource.FetchConfiguration{
URL: "https://example.com/",
Method: "GET",
},
}),
},
}
fields := []plan.FieldConfiguration{
{
TypeName: "Query",
FieldName: "hello",
Arguments: []plan.ArgumentConfiguration{},
},
}

engineConf := NewEngineV2Configuration(schema)
engineConf.SetDataSources(dataSources)
engineConf.SetFieldConfigurations(fields)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

engine, _ := NewExecutionEngineV2(ctx, abstractlogger.Noop{}, engineConf)
b.ResetTimer() // reset the timer so the setup above is excluded from the measurement
for n := 0; n < b.N; n++ {
resultWriter := NewEngineResultWriter()
execCtx, execCtxCancel := context.WithCancel(context.Background())
_ = engine.Execute(execCtx, &operation, &resultWriter)
execCtxCancel()

operation.Cleanup()
// Force the engine to parse and normalize the query again
operation.isParsed = false
operation.isNormalized = false
operation.isDocumentRecyclable = false
}
}
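
Assuming the standard Go toolchain, the new benchmark can be compared against the existing BenchmarkExecutionEngineV2_Execute with something like:

go test -run '^$' -bench 'BenchmarkExecutionEngineV2_Execute' -benchmem ./pkg/graphql/
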
20 changes: 15 additions & 5 deletions pkg/graphql/request.go
@@ -37,10 +37,11 @@ type Request struct {
Variables json.RawMessage `json:"variables,omitempty"`
Query string `json:"query"`

document ast.Document
isParsed bool
isNormalized bool
request resolve.Request
document ast.Document
isDocumentRecyclable bool
isParsed bool
isNormalized bool
request resolve.Request

validForSchema map[uint64]ValidationResult
}
@@ -96,7 +97,7 @@ func (r *Request) CalculateComplexity(complexityCalculator ComplexityCalculator,
return complexityCalculator.Calculate(&r.document, &schema.document)
}

func (r Request) Print(writer io.Writer) (n int, err error) {
func (r *Request) Print(writer io.Writer) (n int, err error) {
report := r.parseQueryOnce()
if report.HasErrors() {
return 0, report
@@ -105,6 +106,15 @@ func (r Request) Print(writer io.Writer) (n int, err error) {
return writer.Write(r.document.Input.RawBytes)
}

// Cleanup releases the resources allocated during query execution.
// Note that a Request instance is not reusable.
func (r *Request) Cleanup() {
if !r.isDocumentRecyclable {
return
}
r.document.Recycle()
}

func (r *Request) IsNormalized() bool {
return r.isNormalized
}