Skip to content

Commit

Permalink
Merge pull request #1458 from ydb-platform/fix-tx
Browse files Browse the repository at this point in the history
* Added option `ydb.WithSessionPoolSessionIdleTimeToLive` for restric…
  • Loading branch information
asmyasnikov authored Sep 13, 2024
2 parents 200688b + d26f929 commit 8cb1931
Show file tree
Hide file tree
Showing 24 changed files with 748 additions and 301 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Added option `ydb.WithSessionPoolSessionIdleTimeToLive` for restrict idle time of query sessions
* Fixed bug with leak of query transactions
* Changed `ydb_go_sdk_ydb_driver_conn_requests` metrics splitted to `ydb_go_sdk_ydb_driver_conn_request_statuses` and `ydb_go_sdk_ydb_driver_conn_request_methods`
* Fixed metadata for operation service connection
* Fixed composing query traces in call `db.Query.Do[Tx]` using option `query.WithTrace`
Expand Down
32 changes: 16 additions & 16 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,18 @@ type (
Item
}
Config[PT ItemConstraint[T], T any] struct {
trace *Trace
clock clockwork.Clock
limit int
createTimeout time.Duration
createItem func(ctx context.Context) (PT, error)
closeTimeout time.Duration
closeItem func(ctx context.Context, item PT)
idleThreshold time.Duration
trace *Trace
clock clockwork.Clock
limit int
createTimeout time.Duration
createItem func(ctx context.Context) (PT, error)
closeTimeout time.Duration
closeItem func(ctx context.Context, item PT)
idleTimeToLive time.Duration
}
itemInfo[PT ItemConstraint[T], T any] struct {
idle *xlist.Element[PT]
touched time.Time
idle *xlist.Element[PT]
lastUsage time.Time
}
waitChPool[PT ItemConstraint[T], T any] interface {
GetOrNew() *chan PT
Expand Down Expand Up @@ -99,9 +99,9 @@ func WithTrace[PT ItemConstraint[T], T any](t *Trace) Option[PT, T] {
}
}

func WithIdleThreshold[PT ItemConstraint[T], T any](idleThreshold time.Duration) Option[PT, T] {
func WithIdleTimeToLive[PT ItemConstraint[T], T any](idleTTL time.Duration) Option[PT, T] {
return func(c *Config[PT, T]) {
c.idleThreshold = idleThreshold
c.idleTimeToLive = idleTTL
}
}

Expand Down Expand Up @@ -218,7 +218,7 @@ func makeAsyncCreateItemFunc[PT ItemConstraint[T], T any]( //nolint:funlen
if newItem != nil {
p.mu.WithLock(func() {
p.index[newItem] = itemInfo[PT, T]{
touched: p.config.clock.Now(),
lastUsage: p.config.clock.Now(),
}
})
}
Expand Down Expand Up @@ -461,7 +461,7 @@ func (p *Pool[PT, T]) peekFirstIdle() (item PT, touched time.Time) {
panic(fmt.Sprintf("inconsistent index: (%v, %+v, %+v)", has, el, info.idle))
}

return item, info.touched
return item, info.lastUsage
}

// removes first session from idle and resets the keepAliveCount
Expand Down Expand Up @@ -547,7 +547,7 @@ func (p *Pool[PT, T]) pushIdle(item PT, now time.Time) {
}

p.changeState(func() Stats {
info.touched = now
info.lastUsage = now
info.idle = p.idle.PushBack(item)
p.index[item] = info

Expand Down Expand Up @@ -595,7 +595,7 @@ func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) { /
return info
})

if p.config.idleThreshold > 0 && p.config.clock.Since(info.touched) > p.config.idleThreshold {
if p.config.idleTimeToLive > 0 && p.config.clock.Since(info.lastUsage) > p.config.idleTimeToLive {
p.closeItem(ctx, item)
p.mu.WithLock(func() {
p.changeState(func() Stats {
Expand Down
14 changes: 7 additions & 7 deletions internal/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,22 +394,22 @@ func TestPool(t *testing.T) {
// replace default async closer for sync testing
WithSyncCloseItem[*testItem, testItem](),
WithClock[*testItem, testItem](fakeClock),
WithIdleThreshold[*testItem, testItem](idleThreshold),
WithIdleTimeToLive[*testItem, testItem](idleThreshold),
WithTrace[*testItem, testItem](defaultTrace),
)

s1 := mustGetItem(t, p)
s2 := mustGetItem(t, p)

// Put both items at the absolutely same time.
// That is, both items must be updated their touched timestamp.
// That is, both items must be updated their lastUsage timestamp.
mustPutItem(t, p, s1)
mustPutItem(t, p, s2)

require.Len(t, p.index, 2)
require.Equal(t, 2, p.idle.Len())

// Move clock to longer than idleThreshold
// Move clock to longer than idleTimeToLive
fakeClock.Advance(idleThreshold + time.Nanosecond)

// on get item from idle list the pool must check the item idle timestamp
Expand All @@ -423,15 +423,15 @@ func TestPool(t *testing.T) {
t.Fatal("unexpected number of closed items")
}

// Move time to idleThreshold / 2 - this emulate a "spent" some time working within item.
// Move time to idleTimeToLive / 2 - this emulate a "spent" some time working within item.
fakeClock.Advance(idleThreshold / 2)

// Now put that item back
// pool must update a touched timestamp of item
// pool must update a lastUsage timestamp of item
mustPutItem(t, p, s3)

// Move time to idleThreshold / 2
// Total time since last updating touched timestampe is more than idleThreshold
// Move time to idleTimeToLive / 2
// Total time since last updating lastUsage timestampe is more than idleTimeToLive
fakeClock.Advance(idleThreshold/2 + time.Nanosecond)

require.Len(t, p.index, 1)
Expand Down
37 changes: 18 additions & 19 deletions internal/query/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,7 @@ func do(

err := op(ctx, s)
if err != nil {
if xerrors.IsOperationError(err) {
s.SetStatus(session.StatusClosed)
}
s.SetStatus(session.StatusError)

return xerrors.WithStackTrace(err)
}
Expand Down Expand Up @@ -263,27 +261,27 @@ func doTx(
txSettings tx.Settings,
opts ...retry.Option,
) (finalErr error) {
err := do(ctx, pool, func(ctx context.Context, s *Session) (err error) {
err := do(ctx, pool, func(ctx context.Context, s *Session) (opErr error) {
tx, err := s.Begin(ctx, txSettings)
if err != nil {
return xerrors.WithStackTrace(err)
}
err = op(ctx, tx)
if err != nil {
errRollback := tx.Rollback(ctx)
if errRollback != nil {
return xerrors.WithStackTrace(xerrors.Join(err, errRollback))

defer func() {
_ = tx.Rollback(ctx)

if opErr != nil {
s.SetStatus(session.StatusError)
}
}()

err = op(ctx, tx)
if err != nil {
return xerrors.WithStackTrace(err)
}

err = tx.CommitTx(ctx)
if err != nil {
errRollback := tx.Rollback(ctx)
if errRollback != nil {
return xerrors.WithStackTrace(xerrors.Join(err, errRollback))
}

return xerrors.WithStackTrace(err)
}

Expand Down Expand Up @@ -338,12 +336,12 @@ func (c *Client) QueryRow(ctx context.Context, q string, opts ...options.Execute
func clientExec(ctx context.Context, pool sessionPool, q string, opts ...options.Execute) (finalErr error) {
settings := options.ExecuteSettings(opts...)
err := do(ctx, pool, func(ctx context.Context, s *Session) (err error) {
_, r, err := execute(ctx, s.ID(), s.client, q, settings, withTrace(s.trace))
streamResult, err := execute(ctx, s.ID(), s.client, q, settings, withTrace(s.trace))
if err != nil {
return xerrors.WithStackTrace(err)
}

err = readAll(ctx, r)
err = readAll(ctx, streamResult)
if err != nil {
return xerrors.WithStackTrace(err)
}
Expand Down Expand Up @@ -382,7 +380,7 @@ func clientQuery(ctx context.Context, pool sessionPool, q string, opts ...option
) {
settings := options.ExecuteSettings(opts...)
err = do(ctx, pool, func(ctx context.Context, s *Session) (err error) {
_, streamResult, err := execute(ctx, s.ID(), s.client, q,
streamResult, err := execute(ctx, s.ID(), s.client, q,
options.ExecuteSettings(opts...), withTrace(s.trace),
)
if err != nil {
Expand Down Expand Up @@ -434,12 +432,12 @@ func clientQueryResultSet(
ctx context.Context, pool sessionPool, q string, settings executeSettings, resultOpts ...resultOption,
) (rs result.ClosableResultSet, finalErr error) {
err := do(ctx, pool, func(ctx context.Context, s *Session) error {
_, r, err := execute(ctx, s.ID(), s.client, q, settings, resultOpts...)
streamResult, err := execute(ctx, s.ID(), s.client, q, settings, resultOpts...)
if err != nil {
return xerrors.WithStackTrace(err)
}

rs, err = readMaterializedResultSet(ctx, r)
rs, err = readMaterializedResultSet(ctx, streamResult)
if err != nil {
return xerrors.WithStackTrace(err)
}
Expand Down Expand Up @@ -530,6 +528,7 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, cfg *config.Config) *
pool.WithTrace[*Session, Session](poolTrace(cfg.Trace())),
pool.WithCreateItemTimeout[*Session, Session](cfg.SessionCreateTimeout()),
pool.WithCloseItemTimeout[*Session, Session](cfg.SessionDeleteTimeout()),
pool.WithIdleTimeToLive[*Session, Session](cfg.SessionIdleTimeToLive()),
pool.WithCreateItemFunc(func(ctx context.Context) (_ *Session, err error) {
var (
createCtx context.Context
Expand Down
Loading

0 comments on commit 8cb1931

Please sign in to comment.