Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: inject schema using schema id for cas #8441

Merged
merged 7 commits into from
Dec 8, 2021

Conversation

lihaosky
Copy link
Member

@lihaosky lihaosky commented Dec 2, 2021

Description

  • Inject schema for cas statement if schema id is provided.
  • Update QTT test to use SchemaRegisterInjector as well.
  • Disabled a few failing QTT tests as it also fails in current master branch (Decimal not working with schema registry #8465)

Testing done

  • Unit test
  • QTT test

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

* <ul>
* <li>The statement is a CT/CS.</li>
* <li>The statement does not defined any key columns.</li>
* <li>The statement does not defined any key columns or has KEY_SCHEMA_ID property set</li>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If KEY_SCHEMA_ID is set, then no key columns must be defined either, right? Could you update the line?

* <ul>
* <li>The statement is a CT/CS.</li>
* <li>The statement does not defined any key columns.</li>
* <li>The statement does not defined any key columns or has KEY_SCHEMA_ID property set</li>
* <li>The key format of the statement supports schema inference.</li>
* </ul>
* And similarly for value columns.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

VALUE_SCHEMA_ID is the property in the case of value columns, right? Can you update the doc?

id = srClient.getLatestSchemaMetadata(subject).getId();
}

final ParsedSchema schema = srClient.getSchemaBySubjectAndId(subject, id);
return fromParsedSchema(topicName, id, schema, expectedFormat, serdeFeatures, isKey,
schemaId.isPresent());
return fromParsedSchema(topicName, id, schemaId, schema, expectedFormat, serdeFeatures,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

id and schemaId are the same. Can this become into one parameter?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will take a look how to refactor

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually schemaId is optional and is used to indicate whether schemaId is provided by user to fetched from latest schema. Maybe I can refactor to call it isIdProvided

Comment on lines 239 to 240
containsString("Error serializing message to topic: bob. "
+ "Failed to serialize Protobuf data from topic bob"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a way to know what's the real error? Is this a partial error btw?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually dependent PR's code and removed. Let me rebase this PR...

@lihaosky lihaosky force-pushed the schema-id-cas branch 4 times, most recently from 5412f7e to 14c925e Compare December 7, 2021 20:20
PersistenceSchema.from(schema, serdeFeatures)
);
int id = SchemaRegistryUtil.registerSchema(srClient, parsedSchema, topic, subject, isKey);
System.out.print("h");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this print an accident?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will remove

final ParsedSchema parsedSchema = translator.toParsedSchema(
PersistenceSchema.from(schema, serdeFeatures)
);
int id = SchemaRegistryUtil.registerSchema(srClient, parsedSchema, topic, subject, isKey);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do you do with the id?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will remove

Comment on lines 289 to 290
int id = srClient.register(KsqlConstants.getSRSubject(topic.getName(), true), schema);
System.out.print("h");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is the id use? What about the print call?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will remove

this.testCase = requireNonNull(testCase, "testCase");
}

@Test
public void shouldBuildAndExecuteQueries() {
if (!testCase.getName().contains("DECIMAL - key - inference - default precision")) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this usd for debugging?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. Will remove

@@ -7,7 +7,7 @@
"CREATE STREAM INPUT (K STRING KEY, foo INT) WITH (WRAP_SINGLE_VALUE=true, kafka_topic='input', value_format='KAFKA');"
],
"expectedException": {
"type": "io.confluent.ksql.util.KsqlStatementException",
"type": "io.confluent.ksql.util.KsqlException",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the KsqlStatementException stay? There is a check at the end of the KsqlResource.handleKsqlStatements() method that returns a different value depending on the exception.

This question applies for the rest of the tests that changes to this new exception.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Will catch KsqlException and rethrow

Copy link
Member

@spena spena left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good

@lihaosky lihaosky merged commit a60bb21 into confluentinc:master Dec 8, 2021
@lihaosky lihaosky deleted the schema-id-cas branch December 8, 2021 16:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants