Skip to content

Commit

Permalink
feat(format): add AdbcStatementExecuteSchema
Browse files Browse the repository at this point in the history
Fixes apache#318.
  • Loading branch information
lidavidm committed May 17, 2023
1 parent d379f84 commit 85e1f51
Show file tree
Hide file tree
Showing 13 changed files with 268 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ repos:
- "--linelength=90"
- "--verbose=2"
- repo: https://github.com/golangci/golangci-lint
rev: v1.49.0
rev: v1.52.2
hooks:
- id: golangci-lint
entry: bash -c 'cd go/adbc && golangci-lint run --fix --timeout 5m'
Expand Down
21 changes: 21 additions & 0 deletions adbc.h
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,9 @@ struct ADBC_EXPORT AdbcDriver {
/// worrying about multiple definitions of the same symbol.
struct ADBC_EXPORT AdbcDriver110 {
struct AdbcDriver base;

AdbcStatusCode (*StatementExecuteSchema)(struct AdbcStatement*, struct ArrowSchema*,
struct AdbcError*);
};

/// @}
Expand Down Expand Up @@ -1072,6 +1075,24 @@ AdbcStatusCode AdbcStatementExecuteQuery(struct AdbcStatement* statement,
struct ArrowArrayStream* out,
int64_t* rows_affected, struct AdbcError* error);

/// \brief Get the schema of the result set of a query without
/// executing it.
///
/// This invalidates any prior result sets.
///
/// \since ADBC API revision 1.1.0
///
/// \param[in] statement The statement to execute.
/// \param[out] out The result schema.
/// \param[out] error An optional location to return an error
/// message if necessary.
///
/// \return ADBC_STATUS_NOT_IMPLEMENTED if the driver does not support this.
ADBC_EXPORT
AdbcStatusCode AdbcStatementExecuteSchema(struct AdbcStatement* statement,
struct ArrowSchema* schema,
struct AdbcError* error);

/// \brief Turn this statement into a prepared statement to be
/// executed multiple times.
///
Expand Down
18 changes: 17 additions & 1 deletion c/driver_manager/adbc_driver_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,12 @@ AdbcStatusCode StatementExecutePartitions(struct AdbcStatement* statement,
return ADBC_STATUS_NOT_IMPLEMENTED;
}

AdbcStatusCode StatementExecuteSchema(struct AdbcStatement* statement,
struct ArrowSchema* schema,
struct AdbcError* error) {
return ADBC_STATUS_NOT_IMPLEMENTED;
}

AdbcStatusCode StatementGetParameterSchema(struct AdbcStatement* statement,
struct ArrowSchema* schema,
struct AdbcError* error) {
Expand Down Expand Up @@ -560,6 +566,16 @@ AdbcStatusCode AdbcStatementExecuteQuery(struct AdbcStatement* statement,
->base.StatementExecuteQuery(statement, out, rows_affected, error);
}

AdbcStatusCode AdbcStatementExecuteSchema(struct AdbcStatement* statement,
struct ArrowSchema* schema,
struct AdbcError* error) {
if (!statement->private_driver) {
return ADBC_STATUS_INVALID_STATE;
}
return static_cast<struct AdbcDriver110*>(statement->private_driver)
->StatementExecuteSchema(statement, schema, error);
}

AdbcStatusCode AdbcStatementGetParameterSchema(struct AdbcStatement* statement,
struct ArrowSchema* schema,
struct AdbcError* error) {
Expand Down Expand Up @@ -852,7 +868,7 @@ AdbcStatusCode PolyfillDriver100(AdbcDriver* driver, AdbcError* error) {
}

AdbcStatusCode PolyfillDriver110(AdbcDriver110* driver, AdbcError* error) {
// No new functions yet
FILL_DEFAULT(driver, StatementExecuteSchema);
return PolyfillDriver100(&driver->base, error);
}

Expand Down
13 changes: 13 additions & 0 deletions go/adbc/adbc.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,3 +536,16 @@ type Statement interface {
// an error with a StatusNotImplemented code.
ExecutePartitions(context.Context) (*arrow.Schema, Partitions, int64, error)
}

// Statement110 is an extension interface for methods added to Statement in
// ADBC API revision 1.1.0.
type Statement110 interface {
Statement

// ExecuteSchema returns the schema of the result set of a query without
// executing it.
//
// If the driver does not support this, this will return an error with a
// StatusNotImplemented code.
ExecuteSchema(context.Context) (*arrow.Schema, error)
}
16 changes: 16 additions & 0 deletions go/adbc/driver/flightsql/flightsql_adbc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1352,6 +1352,22 @@ func (c *cnxn) executeSubstraitUpdate(ctx context.Context, plan flightsql.Substr
return c.cl.ExecuteSubstraitUpdate(ctx, plan, opts...)
}

func (c *cnxn) getExecuteSchema(ctx context.Context, query string, opts ...grpc.CallOption) (schemaResult *flight.SchemaResult, err error) {
if c.txn != nil {
return c.txn.GetExecuteSchema(ctx, query, opts...)
}

return c.cl.GetExecuteSchema(ctx, query, opts...)
}

func (c *cnxn) getExecuteSubstraitSchema(ctx context.Context, plan flightsql.SubstraitPlan, opts ...grpc.CallOption) (schemaResult *flight.SchemaResult, err error) {
if c.txn != nil {
return c.txn.GetExecuteSubstraitSchema(ctx, plan, opts...)
}

return c.cl.GetExecuteSubstraitSchema(ctx, plan, opts...)
}

func (c *cnxn) prepare(ctx context.Context, query string, opts ...grpc.CallOption) (*flightsql.PreparedStatement, error) {
if c.txn != nil {
return c.txn.Prepare(ctx, query, opts...)
Expand Down
1 change: 1 addition & 0 deletions go/adbc/driver/flightsql/flightsql_adbc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ func (s *FlightSQLQuirks) DropTable(cnxn adbc.Connection, tblname string) error
func (s *FlightSQLQuirks) Alloc() memory.Allocator { return s.mem }
func (s *FlightSQLQuirks) BindParameter(_ int) string { return "?" }
func (s *FlightSQLQuirks) SupportsConcurrentStatements() bool { return true }
func (s *FlightSQLQuirks) SupportsExecuteSchema() bool { return true }
func (s *FlightSQLQuirks) SupportsPartitionedData() bool { return true }
func (s *FlightSQLQuirks) SupportsTransactions() bool { return true }
func (s *FlightSQLQuirks) SupportsGetParameterSchema() bool { return false }
Expand Down
55 changes: 55 additions & 0 deletions go/adbc/driver/flightsql/flightsql_statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,35 @@ func (s *sqlOrSubstrait) executeUpdate(ctx context.Context, cnxn *cnxn, opts ...
}
}

func (s *sqlOrSubstrait) getExecuteSchema(ctx context.Context, alloc memory.Allocator, cnxn *cnxn, opts ...grpc.CallOption) (*arrow.Schema, error) {
var result *flight.SchemaResult
var err error
if s.sqlQuery != "" {
result, err = cnxn.getExecuteSchema(ctx, s.sqlQuery, opts...)
} else if s.substraitPlan != nil {
result, err = cnxn.getExecuteSubstraitSchema(ctx, flightsql.SubstraitPlan{Plan: s.substraitPlan, Version: s.substraitVersion}, opts...)
} else {
return nil, adbc.Error{
Code: adbc.StatusInvalidState,
Msg: "[Flight SQL Statement] cannot call ExecuteSchema without a query or prepared statement",
}
}

if err != nil {
return nil, err
}

schema, err := flight.DeserializeSchema(result.GetSchema(), alloc)
if err != nil {
return nil, adbc.Error{
Code: adbc.StatusInternal,
Msg: "[Flight SQL Statement] server returned invalid schema",
}
}

return schema, nil
}

func (s *sqlOrSubstrait) prepare(ctx context.Context, cnxn *cnxn, opts ...grpc.CallOption) (*flightsql.PreparedStatement, error) {
if s.sqlQuery != "" {
return cnxn.prepare(ctx, s.sqlQuery, opts...)
Expand Down Expand Up @@ -247,6 +276,32 @@ func (s *statement) ExecuteQuery(ctx context.Context) (rdr array.RecordReader, n
return
}

// ExecuteSchema returns the schema of the result set of a query without
// executing it.
//
// If the driver does not support this, this will return an error with a
// StatusNotImplemented code.
func (s *statement) ExecuteSchema(ctx context.Context) (schema *arrow.Schema, err error) {
ctx = metadata.NewOutgoingContext(ctx, s.hdrs)

if s.prepared != nil {
schema = s.prepared.DatasetSchema()
if schema == nil {
err = adbc.Error{
Msg: "[Flight SQL Statement] server did not provide schema for prepared statement",
Code: adbc.StatusNotImplemented}
}
} else {
schema, err = s.query.getExecuteSchema(ctx, s.alloc, s.cnxn, s.timeouts)
}

if err != nil {
err = adbcFromFlightStatus(err)
}

return
}

// ExecuteUpdate executes a statement that does not generate a result
// set. It returns the number of rows affected if known, otherwise -1.
func (s *statement) ExecuteUpdate(ctx context.Context) (n int64, err error) {
Expand Down
46 changes: 46 additions & 0 deletions go/adbc/validation/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type DriverQuirks interface {
BindParameter(index int) string
// Whether two statements can be used at the same time on a single connection
SupportsConcurrentStatements() bool
// Whether retrieving the schema of a query is supported
SupportsExecuteSchema() bool
// Whether AdbcStatementExecutePartitions should work
SupportsPartitionedData() bool
// Whether transactions are supported (Commit/Rollback on connection)
Expand Down Expand Up @@ -407,6 +409,28 @@ func (s *StatementTests) TestNewStatement() {
s.Equal(adbc.StatusInvalidState, adbcError.Code)
}

func (s *StatementTests) TestSQLExecuteSchema() {
stmt, err := s.Cnxn.NewStatement()
s.NoError(err)
defer stmt.Close()

query := "SELECT 1"
s.NoError(stmt.SetSqlQuery(query))

sc, err := stmt.(adbc.Statement110).ExecuteSchema(s.ctx)
if !s.Quirks.SupportsExecuteSchema() {
var adbcError adbc.Error
s.ErrorAs(err, &adbcError)
s.Equal(adbc.StatusNotImplemented, adbcError.Code)
return
}
// TODO: check for NotImplemented here, too, since even if the
// driver supports it the server may not
s.NoError(err)

s.Len(sc.Fields(), 1)
}

func (s *StatementTests) TestSqlPartitionedInts() {
stmt, err := s.Cnxn.NewStatement()
s.Require().NoError(err)
Expand Down Expand Up @@ -460,6 +484,28 @@ func (s *StatementTests) TestSqlPartitionedInts() {
s.False(rdr.Next())
}

func (s *StatementTests) TestSQLPrepareExecuteSchema() {
stmt, err := s.Cnxn.NewStatement()
s.NoError(err)
defer stmt.Close()

query := "SELECT 1"
s.NoError(stmt.SetSqlQuery(query))
s.NoError(stmt.Prepare(s.ctx))

// TODO: move new validation tests into new file?
sc, err := stmt.(adbc.Statement110).ExecuteSchema(s.ctx)
if !s.Quirks.SupportsExecuteSchema() {
var adbcError adbc.Error
s.ErrorAs(err, &adbcError)
s.Equal(adbc.StatusNotImplemented, adbcError.Code)
return
}
s.NoError(err)

s.Len(sc.Fields(), 1)
}

func (s *StatementTests) TestSQLPrepareGetParameterSchema() {
stmt, err := s.Cnxn.NewStatement()
s.NoError(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,17 @@ default void bind(VectorSchemaRoot root) throws AdbcException {
*/
UpdateResult executeUpdate() throws AdbcException;

/**
* Get the schema of the result set of a query without executing it.
*
* @since ADBC API revision 1.1.0
* @throws AdbcException with status {@link AdbcStatusCode#NOT_IMPLEMENTED} if the driver does not
* support this.
*/
default Schema executeSchema() throws AdbcException {
throw AdbcException.notImplemented("Statement does not support executeSchema");
}

/**
* Execute a result set-generating query and get a list of partitions of the result set.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,12 @@ public static void beforeAll() {
@Override
@Disabled("Requires spec clarification")
public void prepareQueryWithParameters() {}

@Override
@Disabled("Not supported by the SQLite test server")
public void executeSchema() {}

@Override
@Disabled("Not supported by the SQLite test server")
public void executeSchemaPrepared() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,18 @@ public QueryResult executeQuery() throws AdbcException {
new FlightInfoReader(allocator, client, clientCache, info.getEndpoints()));
}

@Override
public Schema executeSchema() throws AdbcException {
if (bulkOperation != null) {
throw AdbcException.invalidState("[Flight SQL] Must executeUpdate() for bulk ingestion");
} else if (sqlQuery == null) {
throw AdbcException.invalidState("[Flight SQL] Must setSqlQuery() before execute");
}
return execute(
FlightSqlClient.PreparedStatement::getResultSetSchema,
(client) -> client.getExecuteSchema(sqlQuery).getSchema());
}

@Override
public UpdateResult executeUpdate() throws AdbcException {
if (bulkOperation != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,18 @@
import java.sql.ParameterMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.LongStream;
import org.apache.arrow.adapter.jdbc.JdbcFieldInfo;
import org.apache.arrow.adapter.jdbc.JdbcParameterBinder;
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig;
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder;
import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
import org.apache.arrow.adbc.core.AdbcException;
import org.apache.arrow.adbc.core.AdbcStatement;
Expand Down Expand Up @@ -199,6 +203,37 @@ private void invalidatePriorQuery() throws AdbcException {
}
}

@Override
public Schema executeSchema() throws AdbcException {
if (bulkOperation != null) {
throw AdbcException.invalidState("[JDBC] Ingestion operations have no schema");
} else if (sqlQuery == null) {
throw AdbcException.invalidState("[JDBC] Must setSqlQuery() first");
}

try (final PreparedStatement preparedStatement =
connection.prepareStatement(
sqlQuery, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY)) {
final ResultSetMetaData rsmd = preparedStatement.getMetaData();
final JdbcToArrowConfig config =
new JdbcToArrowConfigBuilder()
.setAllocator(allocator)
.setCalendar(JdbcToArrowUtils.getUtcCalendar())
.build();
try {
return JdbcToArrowUtils.jdbcToArrowSchema(rsmd, config);
} catch (SQLException e) {
throw JdbcDriverUtil.fromSqlException("Failed to convert JDBC schema to Arrow schema:", e);
}
} catch (SQLFeatureNotSupportedException e) {
throw AdbcException.notImplemented(
"[JDBC] Driver does not support getting a result set schema")
.withCause(e);
} catch (SQLException e) {
throw JdbcDriverUtil.fromSqlException("Failed to prepare statement:", e);
}
}

@Override
public UpdateResult executeUpdate() throws AdbcException {
if (bulkOperation != null) {
Expand Down
Loading

0 comments on commit 85e1f51

Please sign in to comment.