Skip to content

Commit

Permalink
Merge pull request #84 from ydb-platform/stream-result
Browse files Browse the repository at this point in the history
drop creation of goroutine on each stream call - stream.Recv() called…
  • Loading branch information
asmyasnikov authored Jan 17, 2022
2 parents 685a635 + 0b24cd2 commit b5971c6
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 147 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* Added `retry.RetryableError()` for returns user-defined error which must be retryed
* Renamed internal type `internal.errors.OperationCompleted` to `internal.errors.OperationStatus`
* Added `String()` method to `table.KeyRange` and `table.Value` types
* Replaced creation of goroutine on each stream call to explicit call stream.Recv() on NextResultSet()

## 3.6.2
* Refactored table retry helpers
Expand Down
3 changes: 2 additions & 1 deletion internal/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,12 +335,12 @@ func (c *conn) NewStream(

var s grpc.ClientStream
s, err = cc.NewStream(ctx, desc, method, append(opts, grpc.MaxCallRecvMsgSize(50*1024*1024))...)
c.release(ctx)
if err != nil {
err = errors.MapGRPCError(err)
if errors.MustPessimizeEndpoint(err) {
c.pessimize(ctx, err)
}
c.release(ctx)
return nil, err
}

Expand All @@ -349,6 +349,7 @@ func (c *conn) NewStream(
s: s,
onDone: func(ctx context.Context) {
cancel()
c.release(ctx)
},
recv: streamRecv,
}, nil
Expand Down
54 changes: 20 additions & 34 deletions internal/scripting/scripting.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ package scheme

import (
"context"
"io"

"google.golang.org/grpc"
"google.golang.org/protobuf/proto"

"github.com/ydb-platform/ydb-go-genproto/Ydb_Scripting_V1"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Scripting"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_TableStats"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/errors"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/table/scanner"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/value"
"github.com/ydb-platform/ydb-go-sdk/v3/scripting"
Expand Down Expand Up @@ -107,43 +107,29 @@ func (c *client) StreamExecute(
return nil, err
}

r := scanner.NewStream()
go func() {
var (
response *Ydb_Scripting.ExecuteYqlPartialResponse
err error
)
defer func() {
cancel()
r.Close()
}()
for {
return scanner.NewStream(
func(ctx context.Context) (
set *Ydb.ResultSet,
stats *Ydb_TableStats.QueryStats,
err error,
) {
select {
case <-ctx.Done():
err = ctx.Err()
r.SetErr(err)
return
return nil, nil, ctx.Err()
default:
if response, err = stream.Recv(); err != nil {
if !errors.Is(err, io.EOF) {
r.SetErr(err)
// nolint:ineffassign
err = nil
}
return
}
if result := response.GetResult(); result != nil {
if resultSet := result.GetResultSet(); resultSet != nil {
r.Append(resultSet)
}
if stats := result.GetQueryStats(); stats != nil {
r.UpdateStats(stats)
}
response, err := stream.Recv()
result := response.GetResult()
if result == nil || err != nil {
return nil, nil, err
}
return result.GetResultSet(), result.GetQueryStats(), nil
}
}
}()
return r, nil
},
func(err error) error {
cancel()
return err
},
), nil
}

func (c *client) Close(context.Context) error {
Expand Down
60 changes: 28 additions & 32 deletions internal/table/scanner/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,23 @@ package scanner

import (
"context"
"io"
"sync"
"time"

"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_TableStats"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/errors"
public "github.com/ydb-platform/ydb-go-sdk/v3/table/result"
"github.com/ydb-platform/ydb-go-sdk/v3/table/stats"
)

type result struct {
scanner

stats *Ydb_TableStats.QueryStats
statsMtx sync.RWMutex
stats *Ydb_TableStats.QueryStats

closedMtx sync.RWMutex
closed bool
Expand All @@ -24,7 +27,8 @@ type result struct {
type streamResult struct {
result

ch chan *Ydb.ResultSet
recv func(ctx context.Context) (*Ydb.ResultSet, *Ydb_TableStats.QueryStats, error)
close func(error) error
}

type unaryResult struct {
Expand Down Expand Up @@ -55,19 +59,6 @@ func (r *result) isClosed() bool {
return r.closed
}

func (r *result) UpdateStats(stats *Ydb_TableStats.QueryStats) {
r.stats = stats
}

func (r *streamResult) Append(set *Ydb.ResultSet) {
r.closedMtx.RLock()
defer r.closedMtx.RUnlock()
if r.closed {
return
}
r.ch <- set
}

type resultWithError interface {
SetErr(err error)
}
Expand All @@ -80,14 +71,15 @@ type UnaryResult interface {
type StreamResult interface {
public.StreamResult
resultWithError

Append(set *Ydb.ResultSet)
UpdateStats(stats *Ydb_TableStats.QueryStats)
}

func NewStream() StreamResult {
func NewStream(
recv func(ctx context.Context) (*Ydb.ResultSet, *Ydb_TableStats.QueryStats, error),
onClose func(error) error,
) StreamResult {
r := &streamResult{
ch: make(chan *Ydb.ResultSet, 1),
recv: recv,
close: onClose,
}
return r
}
Expand Down Expand Up @@ -128,15 +120,11 @@ func (r *streamResult) NextResultSet(ctx context.Context, columns ...string) boo
if r.inactive() {
return false
}
select {
case s, ok := <-r.ch:
if !ok || s == nil {
return false
}
r.Reset(s, columns...)
return true

case <-ctx.Done():
s, stats, err := r.recv(ctx)
if errors.Is(err, io.EOF) {
return false
}
if err != nil {
r.errMtx.Lock()
if r.err == nil {
r.err = ctx.Err()
Expand All @@ -145,6 +133,13 @@ func (r *streamResult) NextResultSet(ctx context.Context, columns ...string) boo
r.Reset(nil)
return false
}
r.Reset(s, columns...)
if stats != nil {
r.statsMtx.Lock()
r.stats = stats
r.statsMtx.Unlock()
}
return true
}

// CurrentResultSet get current result set
Expand All @@ -155,8 +150,10 @@ func (r *result) CurrentResultSet() public.Set {
// Stats returns query execution queryStats.
func (r *result) Stats() stats.QueryStats {
var s queryStats
r.statsMtx.RLock()
s.stats = r.stats
s.processCPUTime = time.Microsecond * time.Duration(r.stats.GetProcessCpuTimeUs())
r.statsMtx.RUnlock()
s.processCPUTime = time.Microsecond * time.Duration(s.stats.GetProcessCpuTimeUs())
s.pos = 0
return &s
}
Expand All @@ -169,8 +166,7 @@ func (r *streamResult) Close() (err error) {
return nil
}
r.closed = true
close(r.ch)
return nil
return r.close(r.Err())
}

func (r *result) inactive() bool {
Expand Down
104 changes: 41 additions & 63 deletions internal/table/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package table

import (
"context"
"io"
"net/url"
"strconv"
"sync"
Expand All @@ -14,6 +13,7 @@ import (
"github.com/ydb-platform/ydb-go-genproto/Ydb_Table_V1"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Table"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_TableStats"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/cluster"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/errors"
Expand Down Expand Up @@ -700,43 +700,34 @@ func (s *session) StreamReadTable(
onDone := trace.TableOnSessionQueryStreamRead(s.trace, &ctx, s)
if err != nil {
cancel()
onDone(nil, err)
onDone(err)
return nil, err
}

r := scanner.NewStream()
go func() {
var (
response *Ydb_Table.ReadTableResponse
err error
)
defer func() {
cancel()
onDone(r, errors.HideEOF(err))
r.Close()
}()
for {
return scanner.NewStream(
func(ctx context.Context) (
set *Ydb.ResultSet,
stats *Ydb_TableStats.QueryStats,
err error,
) {
select {
case <-ctx.Done():
err = ctx.Err()
r.SetErr(err)
return
return nil, nil, ctx.Err()
default:
if response, err = stream.Recv(); err != nil {
if !errors.Is(err, io.EOF) {
r.SetErr(err)
}
return
}
if result := response.GetResult(); result != nil {
if resultSet := result.GetResultSet(); resultSet != nil {
r.Append(resultSet)
}
response, err := stream.Recv()
result := response.GetResult()
if result == nil || err != nil {
return nil, nil, err
}
return result.GetResultSet(), nil, nil
}
}
}()
return r, nil
},
func(err error) error {
cancel()
onDone(err)
return err
},
), nil
}

// StreamExecuteScanQuery scan-reads table at given path with given options.
Expand Down Expand Up @@ -771,47 +762,34 @@ func (s *session) StreamExecuteScanQuery(
onDone := trace.TableOnSessionQueryStreamExecute(s.trace, &ctx, s, q, params)
if err != nil {
cancel()
onDone(nil, err)
onDone(err)
return nil, err
}

r := scanner.NewStream()
go func() {
var (
response *Ydb_Table.ExecuteScanQueryPartialResponse
err error
)
defer func() {
cancel()
onDone(r, errors.HideEOF(err))
r.Close()
}()
for {
return scanner.NewStream(
func(ctx context.Context) (
set *Ydb.ResultSet,
stats *Ydb_TableStats.QueryStats,
err error,
) {
select {
case <-ctx.Done():
err = ctx.Err()
r.SetErr(err)
return
return nil, nil, ctx.Err()
default:
if response, err = stream.Recv(); err != nil {
if !errors.Is(err, io.EOF) {
r.SetErr(err)
err = nil
}
return
}
if result := response.GetResult(); result != nil {
if resultSet := result.GetResultSet(); resultSet != nil {
r.Append(resultSet)
}
if stats := result.GetQueryStats(); stats != nil {
r.UpdateStats(stats)
}
response, err := stream.Recv()
result := response.GetResult()
if result == nil || err != nil {
return nil, nil, err
}
return result.GetResultSet(), result.GetQueryStats(), nil
}
}
}()
return r, nil
},
func(err error) error {
cancel()
onDone(err)
return err
},
), nil
}

// BulkUpsert uploads given list of ydb struct values to the table.
Expand Down
Loading

0 comments on commit b5971c6

Please sign in to comment.