pgwire: implement cancellation protocol #34520

Closed · wants to merge 1 commit
6 changes: 6 additions & 0 deletions pkg/sql/cluster_wide_id.go
@@ -53,3 +53,9 @@ func BytesToClusterWideID(b []byte) ClusterWideID {
func (id ClusterWideID) GetNodeID() int32 {
	return int32(0xFFFFFFFF & id.Lo)
}

// GetPGWireCancelInfo extracts the timestamp of the cluster wide ID as 2
// int32s, for use by the pgwire cancellation protocol.
func (id ClusterWideID) GetPGWireCancelInfo() (int32, int32) {
	return int32(id.Hi >> 32), int32(0xFFFFFFFF & id.Hi)
Contributor

The original id is 128 bits. I'd like a comment here showing that using only 64 of the bits is secure. Presumably the 128 was chosen for a good reason and for some reason 64 wasn't good enough. So, why is it ok here?

Member Author

Well, the cluster-wide ID needs to be unique across the entire cluster. This ID only needs to be unique on a single node, so I figured you could just use the timestamp part of the cluster-wide ID. Maybe I'm wrong though, thoughts?

This commit wouldn't support canceling queries on node A by sending the message to node B, but judging by the lib/pq implementation that's not needed or expected anyway.

Contributor

Is it theoretically possible for two identical wall times to be used for two different connections on the same node? If so, one connection could incorrectly cancel another. I'm not sure exactly how the HLC timestamp is generated here, or whether this is possible, but it seems like it might be because of the logical part: if one node's clock gets too far ahead of or behind the others in the cluster (I'm not sure which), its HLC clock will likely stop advancing its wall time and just increment its logical component, right? In that case two connections opened during that period would have the same wall time and could hit this. If this can't happen, I'd like a comment explaining why.

Member
@tbg Apr 13, 2020

I'm very science dog here, but after the absence of this PR annoyed me for the millionth time, I looked. So we need 64 bits but our IDs are 128 bits, and that's making us uneasy, right?

https://github.com/cockroachdb/vendored/blob/223d2a8a5a0cc5fe1268e7942b4db86798bed955/github.com/lib/pq/conn_go18.go#L134-L137

We have to keep the NodeID for load-balanced cancellation to work. NodeIDs are int32 but they're always positive, so we have 16+32 bits left. If we fill the 48 bits with true pseudo-randomness (i.e. don't pick part of the timestamp here), wouldn't that be good enough? We can even boost it up if we dedicate the first bit to deciding whether the NodeID is uvarint-encoded or not. NodeIDs are almost certainly nowhere close to MaxInt32, which gives us extra bits in the expected case. But honestly, that seems over the top. 48 bits of randomness should be plenty and at the same time cheap enough to generate.

I haven't looked, but I'm guessing that we like that our ClusterWideIDs contain relevant data (?), so my suggestion would be to just tack these random 48 bits on locally (they're not shared with other nodes).

To handle a cancel() request, the receiving node first checks if the NodeID matches, in which case it cancels locally. Otherwise it proxies the cancel() over to the node that is supposed to handle it.
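
As a rough illustration of that dispatch, here is a minimal, self-contained Go sketch; all names in it (cancelKey, cancelRegistry, handleCancel) are hypothetical and not part of this PR:

```go
package main

import "fmt"

// cancelKey is a hypothetical decoding of the 64-bit BackendKeyData payload:
// the node that owns the session plus a per-session secret.
type cancelKey struct {
	nodeID int32
	secret uint64
}

// cancelRegistry is an illustrative stand-in for a per-node session registry.
type cancelRegistry struct {
	localNodeID int32
	local       map[uint64]func()                       // secret -> cancel func for local sessions
	proxy       func(nodeID int32, secret uint64) error // forward to the owning node
}

func (r *cancelRegistry) handleCancel(k cancelKey) error {
	if k.nodeID == r.localNodeID {
		// The session lives on this node: cancel it directly.
		if cancel, ok := r.local[k.secret]; ok {
			cancel()
		}
		return nil
	}
	// The request landed on the wrong node (e.g. via a load balancer):
	// proxy it to the node that owns the session.
	return r.proxy(k.nodeID, k.secret)
}

func main() {
	r := &cancelRegistry{
		localNodeID: 1,
		local:       map[uint64]func(){42: func() { fmt.Println("canceled local session") }},
		proxy:       func(nodeID int32, secret uint64) error { return nil },
	}
	_ = r.handleCancel(cancelKey{nodeID: 1, secret: 42})
}
```

In the real server the wiring would live in the SessionRegistry and node-dialer plumbing rather than a standalone type like this.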

Would love for this to get finished. We write this code way too often:

query := SET statement_timeout='5s'; SELECT

or worse, forget, and things just hang if they didn't work out the way they ought to.

Contributor

> NodeIDs are int32 but they're always positive, so we have 16+32 bits left.

Your math is off. The sign is just 1 bit. There's just 33 bits left, not 48.

The rest of the argument about randomness feels weird to me. It stands trivially if we are guaranteed that all queries terminate nearly instantaneously, but what happens when a query is long-running? The chance of conflict increases.

We can probably compute the probability based on a distribution of latencies and the frequency of new queries. Unfortunately I don't have the math off the top of my head. Do you remember how that works?
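
For what it's worth, the usual back-of-the-envelope tool here is the birthday bound: with n keys drawn uniformly from a space of 2^k values, P(collision) ≈ 1 - exp(-n*(n-1)/(2*2^k)). A small Go sketch; the session count and bit widths below are illustrative assumptions, not figures from this PR:

```go
package main

import (
	"fmt"
	"math"
)

// collisionProb returns the birthday-bound approximation for the probability
// that any two of n uniformly random k-bit keys collide:
// p ≈ 1 - exp(-n*(n-1) / (2 * 2^k)).
func collisionProb(n float64, bits uint) float64 {
	keyspace := math.Exp2(float64(bits))
	return 1 - math.Exp(-n*(n-1)/(2*keyspace))
}

func main() {
	// Illustrative only: 10,000 concurrent sessions on a single node,
	// with the key sizes discussed in this thread plus the full 64 bits.
	for _, bits := range []uint{33, 48, 64} {
		fmt.Printf("%2d random bits: p(collision) ≈ %.2g\n", bits, collisionProb(10000, bits))
	}
}
```

This only answers the static version of the question (n simultaneously live keys); folding in latency distributions and arrival rates would refine it, but the orders of magnitude are what matter.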

> the receiving node first checks if the NodeID matches, in which case it cancels locally. Otherwise it proxies the cancel() over to the node that is supposed to handle it.

Yes that bit is correct (and necessary when load balancers are involved).

Member

> we can use it to implement our own thing in tests.

I just looked and CustomCancel allows us to just call conn.Cancel() which is exactly what I think we'd want to do throughout all CRDB-related uses of this.

I don't have a strong case for implementing the cancel protocol with this option.

Member

It looks like the CustomCancel config was removed in the latest version of pgx, so we should avoid introducing a dependency on it. This is discussed in jackc/pgx#679, which is also a generally interesting discussion on PgBouncer's handling of the cancel protocol: jackc/pgx#679 (comment). @andreimatei might be interested in that.

Contributor

CustomCancel is removed in the latest version because that version now closes the connection upon context cancellation. That's great! It's even better and we could use that to fix our test behavior.

Member Author

Go isn't the only language that uses the cancel protocol though! Let's not make the mistake of blinding ourselves to other client behavior - even if Go's cancel works well for our purposes, other languages still use the cancel protocol.

See PGJDBC, the main Java driver: https://github.com/pgjdbc/pgjdbc/blob/14576f4bca3a2484fd4f81a0d8276ae5cab9a419/pgjdbc/src/main/java/org/postgresql/core/QueryExecutorBase.java#L164

Contributor

Jordan, my comment above is driven by context: the reason Tobias (and then I) got interested in this recently is that it's annoying our own test suite -- which is focused on Go, lib/pq, and hopefully soon pgx.

For the wider picture, unfortunately, as established above, we are still in a bind and there's no good solution in sight. This conclusion is further substantiated by the discussion on pgx's own repository; see the PR linked by Nathan above.

As per my comment at #34520 (comment), as well as Matt's at #34520 (comment), I personally wouldn't spend time on this beyond what's needed to advance our own testing agenda. The design of postgres' cancel protocol is just very bad, and trying to do something here would create a lot of complexity on our side (with unclear gain).

}
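
For readers following the wire format: the two int32s returned above become the two fields of the pgwire BackendKeyData message, and the registry later recombines them into the session ID's high 64 bits. A standalone sketch of that round trip (the value below is made up for illustration):

```go
package main

import "fmt"

// split mirrors GetPGWireCancelInfo: the high 64 bits of the session ID are
// sent to the client as two int32 BackendKeyData fields.
func split(hi uint64) (int32, int32) {
	return int32(hi >> 32), int32(0xFFFFFFFF & hi)
}

// recombine mirrors the registry's map key: (uint64(code)<<32)|uint64(secret).
func recombine(code, secret uint32) uint64 {
	return (uint64(code) << 32) | uint64(secret)
}

func main() {
	hi := uint64(0x1122334455667788) // illustrative value, not a real session ID
	code, secret := split(hi)
	fmt.Println("round trip ok:", recombine(uint32(code), uint32(secret)) == hi)
}
```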
18 changes: 17 additions & 1 deletion pkg/sql/conn_executor.go
@@ -461,6 +461,13 @@ func (h ConnectionHandler) GetParamStatus(ctx context.Context, varName string) s
	return defVal
}

// GetSessionID returns the ID of this connection's SQL session, for use
// by the pgwire cancellation protocol.
func (h ConnectionHandler) GetSessionID() ClusterWideID {
	return h.ex.sessionID
}

// ServeConn serves a client connection by reading commands from the stmtBuf
// embedded in the ConnHandler.
//
@@ -655,6 +662,7 @@ func (s *Server) newConnExecutor(
		settings: s.cfg.Settings,
	}
	ex.extraTxnState.txnRewindPos = -1
	ex.sessionID = ex.generateID()
	ex.mu.ActiveQueries = make(map[ClusterWideID]*queryMeta)
	ex.machine = fsm.MakeMachine(TxnStateTransitions, stateNoTxn{}, &ex.state)

@@ -1282,7 +1290,6 @@ func (ex *connExecutor) run(
	ex.ctxHolder.connCtx = ctx
	ex.onCancelSession = onCancel

	ex.server.cfg.SessionRegistry.register(ex.sessionID, ex)
	ex.planner.extendedEvalCtx.setSessionID(ex.sessionID)
	defer ex.server.cfg.SessionRegistry.deregister(ex.sessionID)
@@ -2209,6 +2216,15 @@ func (ex *connExecutor) cancelQuery(queryID ClusterWideID) bool {
	return false
}

// cancelCurrentQueries is part of the registrySession interface.
func (ex *connExecutor) cancelCurrentQueries() {
	ex.mu.Lock()
	defer ex.mu.Unlock()
	for _, queryMeta := range ex.mu.ActiveQueries {
		queryMeta.cancel()
	}
}

// cancelSession is part of the registrySession interface.
func (ex *connExecutor) cancelSession() {
	if ex.onCancelSession == nil {
20 changes: 18 additions & 2 deletions pkg/sql/exec_util.go
@@ -1098,36 +1098,52 @@ type SessionArgs struct {
// Use register() and deregister() to modify this registry.
type SessionRegistry struct {
	syncutil.Mutex
	sessions         map[ClusterWideID]registrySession
	sessionsByPGWire map[uint64]registrySession
}

// NewSessionRegistry creates a new SessionRegistry with an empty set
// of sessions.
func NewSessionRegistry() *SessionRegistry {
	return &SessionRegistry{
		sessions:         make(map[ClusterWideID]registrySession),
		sessionsByPGWire: make(map[uint64]registrySession),
	}
}

func (r *SessionRegistry) register(id ClusterWideID, s registrySession) {
	r.Lock()
	r.sessions[id] = s
	r.sessionsByPGWire[id.Hi] = s
	r.Unlock()
}

func (r *SessionRegistry) deregister(id ClusterWideID) {
	r.Lock()
	delete(r.sessions, id)
	delete(r.sessionsByPGWire, id.Hi)
	r.Unlock()
}

type registrySession interface {
	user() string
	cancelQuery(queryID ClusterWideID) bool
	cancelCurrentQueries()
	cancelSession()
	// serialize serializes a Session into a serverpb.Session
	// that can be served over RPC.
	serialize() serverpb.Session
}

// CancelQueryByPGWire looks up the session associated with the given cancel
// code and secret and cancels its active queries.
func (r *SessionRegistry) CancelQueryByPGWire(code, secret uint32) {
	r.Lock()
	defer r.Unlock()
	if session, ok := r.sessionsByPGWire[(uint64(code)<<32)|uint64(secret)]; ok {
		session.cancelCurrentQueries()
	}
}

// CancelQuery looks up the associated query in the session registry and cancels it.
func (r *SessionRegistry) CancelQuery(queryIDStr string, username string) (bool, error) {
	queryID, err := StringToClusterWideID(queryIDStr)
12 changes: 12 additions & 0 deletions pkg/sql/pgwire/conn.go
@@ -658,6 +658,18 @@ func (c *conn) sendInitialConnData(
		return sql.ConnectionHandler{}, err
	}

	// Send our cancel code and secret to the client, so they can cancel the
	// connection. The cancel code and secret are both int32s. The cancel code
	// is supposed to be the connection's PID, but since we don't have one of
	// those, we use the high 64 bits of the session ID instead.
	code, secret := connHandler.GetSessionID().GetPGWireCancelInfo()
	c.msgBuilder.initMsg(pgwirebase.ServerMsgBackendKeyData)
	c.msgBuilder.putInt32(code)
	c.msgBuilder.putInt32(secret)
	if err := c.msgBuilder.finishMsg(c.conn); err != nil {
		return sql.ConnectionHandler{}, err
	}

	// An initial readyForQuery message is part of the handshake.
	c.msgBuilder.initMsg(pgwirebase.ServerMsgReady)
	c.msgBuilder.writeByte(byte(sql.IdleTxnBlock))
1 change: 1 addition & 0 deletions pkg/sql/pgwire/pgwirebase/msg.go
@@ -37,6 +37,7 @@ const (
	ClientMsgTerminate ClientMessageType = 'X'

	ServerMsgAuth            ServerMessageType = 'R'
	ServerMsgBackendKeyData  ServerMessageType = 'K'
	ServerMsgBindComplete    ServerMessageType = '2'
	ServerMsgCommandComplete ServerMessageType = 'C'
	ServerMsgCloseComplete   ServerMessageType = '3'
11 changes: 9 additions & 2 deletions pkg/sql/pgwire/server.go
@@ -542,8 +542,15 @@ func (s *Server) ServeConn(ctx context.Context, conn net.Conn, socketType Socket
		// The client is issuing a cancel request. Read the cancel code and
		// secret from the message and hand them to the session registry,
		// which cancels the matching session's active queries.
		code, err := buf.GetUint32()
		if err != nil {
			return err
		}
		secret, err := buf.GetUint32()
		if err != nil {
			return err
		}
		s.execCfg.SessionRegistry.CancelQueryByPGWire(code, secret)
		return nil

	case version30: