diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 5b7edf367853..8cb4cbc1bb1a 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -694,12 +694,12 @@ func (s *Server) GetBytesMonitor() *mon.BytesMonitor { // // Args: // args: The initial session parameters. They are validated by SetupConn -// and an error is returned if this validation fails. +// and an error is returned if this validation fails. // stmtBuf: The incoming statement for the new connExecutor. // clientComm: The interface through which the new connExecutor is going to -// produce results for the client. +// produce results for the client. // memMetrics: The metrics that statements executed on this connection will -// contribute to. +// contribute to. func (s *Server) SetupConn( ctx context.Context, args SessionArgs, @@ -1735,7 +1735,7 @@ func (ex *connExecutor) sessionData() *sessiondata.SessionData { // Args: // parentMon: The root monitor. // reserved: Memory reserved for the connection. The connExecutor takes -// ownership of this memory. +// ownership of this memory. func (ex *connExecutor) activate( ctx context.Context, parentMon *mon.BytesMonitor, reserved *mon.BoundAccount, ) { @@ -2362,8 +2362,6 @@ func isCopyToExternalStorage(cmd CopyIn) bool { func (ex *connExecutor) execCopyIn( ctx context.Context, cmd CopyIn, ) (_ fsm.Event, retPayload fsm.EventPayload, retErr error) { - logStatements := logStatementsExecuteEnabled.Get(ex.planner.execCfg.SV()) - ex.incrementStartedStmtCounter(cmd.Stmt) defer func() { if retErr == nil && !payloadHasError(retPayload) { @@ -2374,10 +2372,6 @@ func (ex *connExecutor) execCopyIn( } }() - if logStatements { - log.SqlExec.Infof(ctx, "executing %s", cmd) - } - // When we're done, unblock the network connection. defer cmd.CopyDone.Done() @@ -2432,40 +2426,41 @@ func (ex *connExecutor) execCopyIn( ex.initPlanner(ctx, p) ex.resetPlanner(ctx, p, txn, stmtTS) } - var cm copyMachineInterface - var err error - if isCopyToExternalStorage(cmd) { - cm, err = newFileUploadMachine(ctx, cmd.Conn, cmd.Stmt, txnOpt, ex.server.cfg, ex.state.mon) - } else { - // The planner will be prepared before use. - p := planner{execCfg: ex.server.cfg} - cm, err = newCopyMachine( - ctx, cmd.Conn, cmd.Stmt, &p, txnOpt, ex.state.mon, - // execInsertPlan - func(ctx context.Context, p *planner, res RestrictedCommandResult) error { - _, err := ex.execWithDistSQLEngine(ctx, p, tree.RowsAffected, res, DistributionTypeNone, nil /* progressAtomic */) - return err - }, - ) - } - if err != nil { - ev := eventNonRetriableErr{IsCommit: fsm.False} - payload := eventNonRetriableErrPayload{err: err} - return ev, payload, nil + + // These fields need to be set for logging purposes. + ex.planner.stmt = Statement{ + Statement: cmd.ParsedStmt, } - defer func() { - cm.Close(ctx) + ann := tree.MakeAnnotations(0) + ex.planner.extendedEvalCtx.Context.Annotations = &ann + ex.planner.extendedEvalCtx.Context.Placeholders = &tree.PlaceholderInfo{} + ex.planner.curPlan.stmt = &ex.planner.stmt + var cm copyMachineInterface + var copyErr error + // Log the query for sampling. + defer func() { + var numInsertedRows int + if cm != nil { + numInsertedRows = cm.numInsertedRows() + } // These fields are not available in COPY, so use the empty value. - var stmtFingerprintID roachpb.StmtFingerprintID + f := tree.NewFmtCtx(tree.FmtHideConstants) + f.FormatNode(cmd.Stmt) + stmtFingerprintID := roachpb.ConstructStatementFingerprintID( + f.CloseAndGetString(), + copyErr != nil, + ex.implicitTxn(), + ex.planner.CurrentDatabase(), + ) var stats topLevelQueryStats ex.planner.maybeLogStatement( ctx, ex.executorType, int(ex.state.mu.autoRetryCounter), ex.extraTxnState.txnCounter, - cm.numInsertedRows(), - retErr, + numInsertedRows, + copyErr, ex.statsCollector.PhaseTimes().GetSessionPhaseTime(sessionphase.SessionQueryReceived), &ex.extraTxnState.hasAdminRoleCache, ex.server.TelemetryLoggingMetrics, @@ -2474,9 +2469,30 @@ func (ex *connExecutor) execCopyIn( ) }() - if err := ex.execWithProfiling(ctx, cmd.Stmt, nil, func(ctx context.Context) error { + if isCopyToExternalStorage(cmd) { + cm, copyErr = newFileUploadMachine(ctx, cmd.Conn, cmd.Stmt, txnOpt, ex.server.cfg, ex.state.mon) + } else { + // The planner will be prepared before use. + p := planner{execCfg: ex.server.cfg} + cm, copyErr = newCopyMachine( + ctx, cmd.Conn, cmd.Stmt, &p, txnOpt, ex.state.mon, + // execInsertPlan + func(ctx context.Context, p *planner, res RestrictedCommandResult) error { + _, err := ex.execWithDistSQLEngine(ctx, p, tree.RowsAffected, res, DistributionTypeNone, nil /* progressAtomic */) + return err + }, + ) + } + if copyErr != nil { + ev := eventNonRetriableErr{IsCommit: fsm.False} + payload := eventNonRetriableErrPayload{err: copyErr} + return ev, payload, nil + } + defer cm.Close(ctx) + + if copyErr = ex.execWithProfiling(ctx, cmd.Stmt, nil, func(ctx context.Context) error { return cm.run(ctx) - }); err != nil { + }); copyErr != nil { // TODO(andrei): We don't have a retriable error story for the copy machine. // When running outside of a txn, the copyMachine should probably do retries // internally. When not, it's unclear what we should do. For now, we abort @@ -2485,7 +2501,7 @@ func (ex *connExecutor) execCopyIn( // should terminate the connection) from query errors. For now, we treat all // errors as query errors. ev := eventNonRetriableErr{IsCommit: fsm.False} - payload := eventNonRetriableErrPayload{err: err} + payload := eventNonRetriableErrPayload{err: copyErr} return ev, payload, nil } return nil, nil, nil diff --git a/pkg/sql/conn_io.go b/pkg/sql/conn_io.go index 40db4caf38ba..bc0fb4443e6f 100644 --- a/pkg/sql/conn_io.go +++ b/pkg/sql/conn_io.go @@ -321,7 +321,8 @@ var _ Command = Flush{} // CopyIn is the command for execution of the Copy-in pgwire subprotocol. type CopyIn struct { - Stmt *tree.CopyFrom + ParsedStmt parser.Statement + Stmt *tree.CopyFrom // Conn is the network connection. Execution of the CopyFrom statement takes // control of the connection. Conn pgwirebase.Conn diff --git a/pkg/sql/copy.go b/pkg/sql/copy.go index 8446076899fb..ecd876dc3e69 100644 --- a/pkg/sql/copy.go +++ b/pkg/sql/copy.go @@ -273,6 +273,9 @@ func newCopyMachine( } func (c *copyMachine) numInsertedRows() int { + if c == nil { + return 0 + } return c.insertedRows } diff --git a/pkg/sql/copy_file_upload.go b/pkg/sql/copy_file_upload.go index 4e3979f232d2..e87d27de1246 100644 --- a/pkg/sql/copy_file_upload.go +++ b/pkg/sql/copy_file_upload.go @@ -171,6 +171,9 @@ func CopyInFileStmt(destination, schema, table string) string { } func (f *fileUploadMachine) numInsertedRows() int { + if f == nil { + return 0 + } return f.c.numInsertedRows() } diff --git a/pkg/sql/copy_in_test.go b/pkg/sql/copy_in_test.go index 71b9299834df..d4d70ef78053 100644 --- a/pkg/sql/copy_in_test.go +++ b/pkg/sql/copy_in_test.go @@ -406,6 +406,80 @@ func TestCopyError(t *testing.T) { } } +// TestCopyTrace verifies copy works with tracing turned on. +func TestCopyTrace(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + for _, strings := range [][]string{ + {`SET CLUSTER SETTING sql.trace.log_statement_execute = true`}, + {`SET CLUSTER SETTING sql.telemetry.query_sampling.enabled = true`}, + {`SET CLUSTER SETTING sql.log.unstructured_entries.enabled = true`, `SET CLUSTER SETTING sql.trace.log_statement_execute = true`}, + } { + t.Run(strings[0], func(t *testing.T) { + params, _ := tests.CreateTestServerParams() + s, db, _ := serverutils.StartServer(t, params) + defer s.Stopper().Stop(context.Background()) + + _, err := db.Exec(` + CREATE TABLE t ( + i INT PRIMARY KEY + ); + `) + require.NoError(t, err) + + for _, str := range strings { + _, err = db.Exec(str) + require.NoError(t, err) + } + + t.Run("success", func(t *testing.T) { + txn, err := db.Begin() + const val = 2 + require.NoError(t, err) + { + stmt, err := txn.Prepare(pq.CopyIn("t", "i")) + require.NoError(t, err) + _, err = stmt.Exec(val) + require.NoError(t, err) + require.NoError(t, stmt.Close()) + } + require.NoError(t, txn.Commit()) + + var i int + require.NoError(t, db.QueryRow("SELECT i FROM t").Scan(&i)) + require.Equal(t, val, i) + }) + + t.Run("error in statement", func(t *testing.T) { + txn, err := db.Begin() + require.NoError(t, err) + { + _, err := txn.Prepare(pq.CopyIn("xxx", "yyy")) + require.Error(t, err) + require.ErrorContains(t, err, `relation "xxx" does not exist`) + } + require.NoError(t, txn.Rollback()) + }) + + t.Run("error during copy", func(t *testing.T) { + txn, err := db.Begin() + require.NoError(t, err) + { + stmt, err := txn.Prepare(pq.CopyIn("t", "i")) + require.NoError(t, err) + _, err = stmt.Exec("bob") + require.NoError(t, err) + err = stmt.Close() + require.Error(t, err) + require.ErrorContains(t, err, `could not parse "bob" as type int`) + } + require.NoError(t, txn.Rollback()) + }) + }) + } +} + // TestCopyTransaction verifies that COPY data can be used after it is done // within a transaction. func TestCopyTransaction(t *testing.T) { diff --git a/pkg/sql/event_log.go b/pkg/sql/event_log.go index 0397e39eccad..1a747ef18465 100644 --- a/pkg/sql/event_log.go +++ b/pkg/sql/event_log.go @@ -276,7 +276,9 @@ func logEventInternalForSQLStatements( ) error { // Inject the common fields into the payload provided by the caller. injectCommonFields := func(event logpb.EventPayload) error { - event.CommonDetails().Timestamp = txn.ReadTimestamp().WallTime + if txn != nil { + event.CommonDetails().Timestamp = txn.ReadTimestamp().WallTime + } sqlCommon, ok := event.(eventpb.EventWithCommonSQLPayload) if !ok { return errors.AssertionFailedf("unknown event type: %T", event) @@ -494,9 +496,9 @@ func InsertEventRecords( // for tests. // // Otherwise, an asynchronous task is spawned to do the write: -// - if there's at txn, after the txn commit time (i.e. we don't log -// if the txn ends up aborting), using a txn commit trigger. -// - otherwise (no txn), immediately. +// - if there's at txn, after the txn commit time (i.e. we don't log +// if the txn ends up aborting), using a txn commit trigger. +// - otherwise (no txn), immediately. func insertEventRecords( ctx context.Context, execCfg *ExecutorConfig, diff --git a/pkg/sql/pgwire/conn.go b/pkg/sql/pgwire/conn.go index b6ff8d18f85d..1a4a6fb480e1 100644 --- a/pkg/sql/pgwire/conn.go +++ b/pkg/sql/pgwire/conn.go @@ -615,14 +615,14 @@ func (c *conn) serveImpl( // // Args: // ac: An interface used by the authentication process to receive password data -// and to ultimately declare the authentication successful. +// and to ultimately declare the authentication successful. // reserved: Reserved memory. This method takes ownership and guarantees that it -// will be closed when this function returns. +// will be closed when this function returns. // cancelConn: A function to be called when this goroutine exits. Its goal is to -// cancel the connection's context, thus stopping the connection's goroutine. -// The returned channel is also closed before this goroutine dies, but the -// connection's goroutine is not expected to be reading from that channel -// (instead, it's expected to always be monitoring the network connection). +// cancel the connection's context, thus stopping the connection's goroutine. +// The returned channel is also closed before this goroutine dies, but the +// connection's goroutine is not expected to be reading from that channel +// (instead, it's expected to always be monitoring the network connection). func (c *conn) processCommandsAsync( ctx context.Context, authOpt authOptions, @@ -864,6 +864,7 @@ func (c *conn) handleSimpleQuery( ctx, sql.CopyIn{ Conn: c, + ParsedStmt: stmts[i], Stmt: cp, CopyDone: ©Done, TimeReceived: timeReceived,