
fix: INSERT VALUES fail when SR schema has a non-default name #8984

Merged

Conversation

@spena (Member) commented Apr 6, 2022

Description

Fixes #8952

This PR allows INSERT VALUES statements to insert data into a stream that was created with an SR schema that uses a non-default name.

[Issue]
When ksql infers a stream schema from SR, it ignores the SR schema name and uses a default name provided by the Connect format, such as ConnectDefault1 for Protobuf or KsqlDataSourceSchema for Avro. However, when inserting values into the stream, ksql expects the SR schema to use those default names. If other names are used, the INSERT fails.

# protobuf schema
syntax = "proto2";
package io.confluent.examples.proto;

message product {
  optional uint32 PRODUCT_ID = 1;
  optional string PRODUCT_NAME = 2;
  optional double PRODUCT_PRICE = 3;
}

ksql> create stream products with (kafka_topic='products', value_format='protobuf');

 Message        
----------------
 Stream created 
----------------

ksql> insert into products(product_id, product_name, product_price) values(5, 'p5', 5.99);
Failed to insert values into 'PRODUCTS'. Could not serialize value: [ 5 | 'p5' | 5.99 ]. Error serializing message to topic: products. Failed to serialize Protobuf data from topic products

The reason for the failure is that ksql and the Connect API look up the schema (with the default name) in SR for compatibility checks before serializing the value.

[Fix]
The fix starts in the InsertValuesExecutor. When an INSERT VALUES statement is executed, it fetches the schema from SR to extract the full schema name, and then uses the format property fullSchemaName to serialize the value to insert. Connect later checks with SR whether the schema used for serialization exists in SR and whether they are compatible.
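A rough sketch of that approach (not the exact diff; SchemaRegistryUtil.getParsedSchema and ConnectProperties.FULL_SCHEMA_NAME are taken from snippets quoted later in this review, while the surrounding wiring, helper name, and package paths are assumed):

import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.schema.registry.SchemaRegistryUtil;
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.connect.ConnectProperties;

static FormatInfo withFullSchemaName(
    final FormatInfo formatInfo,
    final SchemaRegistryClient srClient,
    final String topicName,
    final boolean isKey
) {
  // Respect an explicitly configured name; only fill in the missing property.
  if (formatInfo.getProperties().containsKey(ConnectProperties.FULL_SCHEMA_NAME)) {
    return formatInfo;
  }

  // Fetch the subject's schema from SR and, if one exists, propagate its full
  // name to the serializer via the FULL_SCHEMA_NAME format property.
  return SchemaRegistryUtil.getParsedSchema(srClient, topicName, isKey)
      .map(ParsedSchema::name)
      .map(fullSchemaName -> FormatInfo.of(
          formatInfo.getFormat(),
          ImmutableMap.<String, String>builder()
              .putAll(formatInfo.getProperties())
              .put(ConnectProperties.FULL_SCHEMA_NAME, fullSchemaName)
              .build()))
      .orElse(formatInfo);
}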

Testing done

Unit tests and manual tests

Reviewer checklist

  • Ensure docs are updated if necessary (e.g. if a user-visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@spena spena requested a review from a team as a code owner April 6, 2022 03:22
@CLAassistant
Copy link

CLAassistant commented Apr 6, 2022

CLA assistant check
All committers have signed the CLA.

@spena spena force-pushed the fix_insert_fails_with_sr_nondefault_name branch 2 times, most recently from fe1ba90 to f66934a on April 6, 2022 13:44
@lihaosky (Member) left a comment:

Thanks @spena! I suggest having others take a look as well, maybe @vcrfxia or @agavra. I couldn't think of other approaches to resolve this; maybe others can chime in on whether this is the best solution.

// Then:
verify(keySerdeFactory).create(
    FormatInfo.of(FormatFactory.AVRO.name(), ImmutableMap.of(
        AvroProperties.FULL_SCHEMA_NAME, "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema"
Member:

nit: is io.confluent.ksql.avro_schemas.KsqlDataSourceSchema the default name? If so, can you change it to something that is not the default?

Member Author:

Done


verify(valueSerdeFactory).create(
    FormatInfo.of(FormatFactory.AVRO.name(), ImmutableMap.of(
        AvroProperties.FULL_SCHEMA_NAME, "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema"
Member:

ditto

Member Author:

Done

@@ -65,14 +79,14 @@ public String name() {
   }
 }

-  private static String avroCompatibleFieldName(final Field field) {
+  private static String compatibleFieldName(final Field field) {
     // Currently the only incompatible field names expected are fully qualified
     // column identifiers. Once quoted identifier support is introduced we will
     // need to implement something more generic here.
     return field.name().replace(".", "_");
Member:

I'm not sure whether this replacement is specific to the Avro format. If Protobuf supports ".", we don't need to replace it? In general, I'm not sure how many of these conversions are also needed for Protobuf, and I'm concerned that we may convert too much and break existing Protobuf.

Also, for MAP_KEY_NAME and MAP_VALUE_NAME, I'm not sure Protobuf needs them.

Member Author:

It is also needed for Protobuf. See bug #9005.

Contributor:

Are there docs somewhere for schema name requirements for the different SR formats? I have the same question as @lihaosky above -- it looks like this method replaces periods with underscores and also contains logic for ensuring that maps don't reuse the same schema name. Is the map logic necessary for protobuf as well?

Also, are there any compatibility concerns with this replacement logic? In other words, after the changes in this PR, ksql will start to use a different schema for INSERT VALUES statements for streams of protobuf format. Will this new schema generate a new schema ID when registered to SR? Is that a concern?

Member Author:

There are docs for:

Avro: https://avro.apache.org/docs/current/spec.html#names

  Names

  Record, enums and fixed are named types. Each has a fullname that is composed of two parts; a name and a namespace. Equality of names is defined on the fullname.

  The name portion of a fullname, record field names, and enum symbols must:

    start with [A-Za-z_]
    subsequently contain only [A-Za-z0-9_]

Protobuf (lexical rules from the language specification):

  Letters and digits
    letter = "A" … "Z" | "a" … "z"
    decimalDigit = "0" … "9"

  Identifiers
    ident = letter { letter | decimalDigit | "_" }
    fieldName = ident

It shouldn't be a problem. There's still a bug for Protobuf, though, both before and after this PR, where CREATE STREAM does not accept column names containing "." (#9005). I am unsure if we should allow that yet, but this PR does not fix it.

As for the other logic, such as the Maps handling: this is the part where I agree I don't know whether we need it. I will figure out if I can get rid of it for Protobuf.

Comment on lines 42 to 43
this.protoCompatibleSchema = SchemaFullNameAppender.appendSchemaFullName(
schema, schemaFullName
Member:

Seems we only care about the schema name here; I'm not sure we need to call appendSchemaFullName, which applies other changes as well.

Member Author:

The appendSchemaFullName just adds the new name to the passed schema, right? It has other changes to replace "." with "_", since Avro and Protobuf don't support "." in names. I just found the Protobuf bug #9005.

Without this full name added to the schema, the INSERT fails too.

return null;
}

return replaceSchema(ksqlSchema, protoCompatibleRow);
Member:

These were for the Avro schema:

Responsible for converting the KSQL schema to a version ready for connect to convert to an
avro schema.
This includes ensuring field names are valid Avro field names and that nested types do not
have name clashes.

Do we need this for protobuf? I'm concerned we are changing too much which is not necessary.

Member Author:

Replacing the schema to add the full schema name is the main thing needed to make the INSERT work. The Connect serializer checks the row schema and tries to match it with SR. If it has a different name, then it fails.

I debugged this when toConnectRow is called, though. I haven't found a test that calls toKsqlRow to verify it. Any ideas?

@spena spena force-pushed the fix_insert_fails_with_sr_nondefault_name branch from 3445582 to 50b15d2 on April 8, 2022 20:35
@vcrfxia (Contributor) left a comment:

Thanks @spena -- some questions inline!

@@ -356,14 +367,23 @@ private static void ensureKeySchemasMatch(
return;
}

final SchemaRegistryClient schemaRegistryClient = serviceContext.getSchemaRegistryClient();

final FormatInfo formatInfo = addSerializerMissingFormatFields(
Contributor:

What's the current behavior for INSERT VALUES on Avro and Protobuf streams, prior to the changes in this PR? Do the INSERT statements not work at all, or is it only when the schema in SR has a non-default schema name?

Also, what's the benefit of fetching the schema name from the latest schema and injecting that into ksql's schema, versus simply using the schema in SR to serialize the record directly (i.e., don't use ksql's schema at all)?

Member Author:

Prior to this PR, inserts only worked when the SR schema used a default name. If a non-default name was used, then the INSERT failed.

The problem is that when calling the Connect API to serialize the row, the API looks at the row schema and checks with SR that the schema exists. If the row schema has a different name than the one in SR, then the API fails. If the row schema does not have a name, then the API uses a default one. The fix is to add the full name to the schema before calling the API.

I couldn't make the API use the SR schema to serialize the row. The API validates that the SR schema matches the row ksql intends to serialize.

Contributor:

IIUC, you're saying that the Connect APIs validate that the schema object we pass matches what's in SR and fail if it doesn't. If that's the case, then we have a problem beyond just figuring out the base schema name, right? AvroSchemas right now has logic for creating additional schema names (for maps and structs) in a predetermined way from the base schema name provided. If the user has a schema that doesn't fit this pattern, then even if we fetch the base schema name from SR, we won't have a fully matching schema afterwards and the insertion will still fail.

I think the only foolproof way to get around this is to fetch the entire schema from SR and use that for serialization (possibly converting the fetched schema into a Connect schema first, if that's required by the Connect APIs). If we don't want to fetch on every insertion (as @lihaosky pointed out in another comment), then we'll have to store the schema somehow. Either way, this seems like it might be a larger conversation with product implications -- we could take the approach in this PR and solve the issue for (hopefully) most use cases, but we won't have a complete solution unless we do something more drastic.

I wonder why the Connect APIs we use are so rigid. Does it make sense for us to consider relaxing requirements from their API (with a contribution ourselves) as another option around these challenges?


Comment on lines 83 to 85
// Currently the only incompatible field names expected are fully qualified
// column identifiers. Once quoted identifier support is introduced we will
// need to implement something more generic here.
Contributor:

This comment is concerning (separate from the changes in this PR) -- quoted identifiers have been implemented for quite some time now. The implication of this comment is that field names may now contain other unsafe characters besides periods, and that those need to be replaced as well; otherwise the logic in this method will not produce sanitized schema names in all cases.

@@ -57,8 +58,8 @@ public Schema toConnectSchema(final ParsedSchema schema) {

   @Override
   public ParsedSchema fromConnectSchema(final Schema schema) {
-    final Schema avroCompatibleSchema = AvroSchemas
-        .getAvroCompatibleConnectSchema(schema, formatProps.getFullSchemaName());
+    final Schema avroCompatibleSchema = SchemaFullNameAppender
Contributor:

Do we not need this change for JSON_SR as well? Does JSON_SR not have the same schema name requirements?

}

@Test
public void shouldThrowWhenNotAuthorizedToReadKeySchemaFromSR() throws Exception {
Contributor:

How is this test different from shouldThrowWhenNotAuthorizedToReadKeySchemaToSR() above?

Member Author:

Seems the same. I'll remove it. I might have gotten confused by some tests.

@spena spena force-pushed the fix_insert_fails_with_sr_nondefault_name branch from 50b15d2 to ab8636c on April 11, 2022 19:55
@spena (Member Author) commented Apr 11, 2022

@lihaosky @vcrfxia I decided to create a ProtobufSchemas class instead. I removed the old SchemaFullNameAppender and brought the old AvroSchemas back. The new ProtobufSchemas just adds the schema name to the parent protobuf schema. It does not have all the logic that AvroSchemas has. I didn't find an incompatible use case, but just in case, I will move slowly with these Proto changes.
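(For reference, the core of what a class like ProtobufSchemas needs to do is small. A minimal sketch, assuming the row schema is a top-level Connect STRUCT; the field copying is simplified and the class in the PR is the authoritative version:)

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

static Schema schemaWithName(final Schema schema, final String fullSchemaName) {
  // Rebuild the top-level struct schema under the full name fetched from SR,
  // so the Connect serializer's lookup against SR matches an existing schema.
  final SchemaBuilder builder = SchemaBuilder.struct().name(fullSchemaName);
  for (final Field field : schema.fields()) {
    builder.field(field.name(), field.schema());
  }
  if (schema.isOptional()) {
    builder.optional();
  }
  return builder.build();
}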

@lihaosky (Member) left a comment:

High-level question: can we set VALUE_SCHEMA_FULL_NAME or KEY_SCHEMA_FULL_NAME at schema inference time, so that we don't need to check SR and set it here?

Comment on lines 126 to 130
if (isAuthErrorCode(e)) {
final AclOperation deniedOperation = SchemaRegistryUtil.getDeniedOperation(e.getMessage());

if (deniedOperation != AclOperation.UNKNOWN) {
throw new KsqlSchemaAuthorizationException(
deniedOperation,
subject
);
}
}

throw new KsqlException(
"Could not get schema for subject " + subject + " and id " + schemaId, e);
}
Member:

nit: refactor this block into a function, since the same code is used below in getLatestSchema?

Member Author:

Done


// The FULL_SCHEMA_NAME allows the serializer to choose the schema definition
if (supportedProperties.contains(ConnectProperties.FULL_SCHEMA_NAME)) {
if (!formatInfo.getProperties().containsKey(ConnectProperties.FULL_SCHEMA_NAME)) {
Member:

I think you should construct ConnectProperties instances and call getFullSchemaName. For example, for Avro, it always has a default full name: https://github.com/confluentinc/ksql/blob/master/ksqldb-serde/src/main/java/io/confluent/ksql/serde/avro/AvroProperties.java#L44-L47

This can save us from calling Schema Registry for the Avro format.
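(A minimal sketch of this suggestion; AvroProperties and getFullSchemaName() come from the linked source, while the helper itself is illustrative:)

import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.avro.AvroProperties;

static String resolveAvroFullSchemaName(final FormatInfo formatInfo) {
  // For Avro, getFullSchemaName() falls back to the built-in default
  // (io.confluent.ksql.avro_schemas.KsqlDataSourceSchema) when the property
  // is unset, so no Schema Registry round trip is needed.
  return new AvroProperties(formatInfo.getProperties()).getFullSchemaName();
}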

Comment on lines +979 to +980
when(srClient.getSchemaById(1))
.thenReturn(new AvroSchema(RAW_SCHEMA));
Member:

I think if you call ConnectProperties.getFullSchemaName() in InsertValuesExecutor.java, you don't need to mock this

Comment on lines +1014 to +1015
when(srClient.getSchemaById(1))
.thenReturn(new AvroSchema(RAW_SCHEMA));
Member:

ditto

Comment on lines +498 to +501
SchemaRegistryUtil.getParsedSchema(srClient, topicName, isKey).map(ParsedSchema::name)
.ifPresent(schemaName ->
propertiesBuilder.put(ConnectProperties.FULL_SCHEMA_NAME, schemaName));
}
Member:

I might have forgotten the reason: why can't we ask the user to recreate their data source specifying the matching schema name? Then it would be available in ConnectProperties and we wouldn't need to fetch from Schema Registry every time. Or we should honor the schema name when doing schema inference (look at the full schema name after getting the schema from SR, and set VALUE_SCHEMA_FULL_NAME or KEY_SCHEMA_FULL_NAME).

I feel it's not good to fetch the schema from Schema Registry every time we do an insertion because:

  1. It hurts performance.
  2. The schema name should be consistent between ksqlDB and Schema Registry. We should fix that problem instead?

Member Author:

I made an improvement in the latest commit. I obtain the key/value full schema name during creation and keep it in the metastore to avoid hurting performance. This works only for new streams and tables, though. When users upgrade their ksql, older streams will check SR to get the key/value full name. It's a better user experience not to make users re-create their streams. Btw, ksql uses a cached SR client, so consecutive INSERT calls won't see bad performance.

@colinhicks any thoughts?

Contributor:

I think it's fine for pre-existing objects to have reduced insert performance. We don't promote INSERT ... VALUES as an optimized write path.

Comment on lines 36 to 46
case ARRAY:
builder = SchemaBuilder.array(schema.valueSchema());
break;
case MAP:
builder = SchemaBuilder.map(
schema.keySchema(),
schema.valueSchema()
);
break;
default:
builder = new SchemaBuilder(schema.type());
Member:

Curious: in what situation is the root schema not a struct? Should we just deal with structs?

Member Author:

Done

return ksqlMap;

case STRUCT:
return convertStruct((Struct) object, schema);
Member:

Seems the purpose for now is just setting the struct name? Can we just do that? I think for Avro it's doing more because the compatible schema is more complex.

Member Author:

True. I modified it to only add the schema name to the parent struct.

* <p>This includes ensuring field names are valid Avro field names and that nested types do not
* have name clashes.
*/
public class AvroSRSchemaDataTranslator extends ConnectSRSchemaDataTranslator {
Member:

Now I remember: the reason I added this just for Avro might be that Protobuf doesn't support optional fields and JSON_SR's fields are always optional. So it's different, I think. Can we avoid this refactor and just fix the schema name problem?

Member Author:

Done. I brought back this class and others.

@spena spena force-pushed the fix_insert_fails_with_sr_nondefault_name branch from 2eedfa8 to 1c3c8b2 on April 12, 2022 03:02
@lihaosky (Member) left a comment:

LGTM! Thanks @spena ! We can revert the changes in DefaultSchemaInjector and create an issue to track it.

@@ -24,12 +24,12 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

-final class AvroSchemas {
+public final class AvroSchemas {
Member:

Revert this change?

Member Author:

Done

@spena spena force-pushed the fix_insert_fails_with_sr_nondefault_name branch from d24935d to 2e0cb8f on April 12, 2022 21:03
Successfully merging this pull request may close these issues:

Need better error messaging for "Failed to serialize Avro data from topic ..."