Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle QualifiedNameReference when selecting stream/table keys #1697

Merged
merged 6 commits into from
Aug 10, 2018

Conversation

rodesai
Copy link
Contributor

@rodesai rodesai commented Aug 4, 2018

Description

SchemaKStream.select may get a QualifiedNameReference as the expression
in a select call. The logic that looks for and preserves the key field in
the resulting SchemaKStream needs to handle expressions of this type. It
also needs to handle cases where the key field name is fully qualified, as
happens with joins.

Fixes #1686. Fixes #1550 .

Testing done

Describe the testing stategy. Unit and integration tests are expected for any behavior changes.

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@rodesai rodesai requested a review from a team August 4, 2018 00:41
@apurvam
Copy link
Contributor

apurvam commented Aug 8, 2018

Is this a fix for #1686 , or just a step toward a fix?

Copy link
Contributor

@apurvam apurvam left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the patch @rodesai . I left a couple of comments.

SchemaKStream projectedSchemaKStream = initialSchemaKStream.select(projectNameExpressionPairList);
assertThat(
projectedSchemaKStream.getKeyField(),
equalTo(new Field("COL0", 2, Schema.OPTIONAL_INT64_SCHEMA)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why isn't this just ksqlStream.getKeyField()? That should be the key of the derived stream, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah in this case that's true. I'll change it.

"CREATE TABLE TEST_TABLE (ID bigint, F1 varchar, F2 bigint) WITH (kafka_topic='right_topic', value_format='{FORMAT}', key='ID');",
"CREATE TABLE TEST_TABLE_2 (ID bigint, F3 varchar) WITH (kafka_topic='right_topic_2', value_format='{FORMAT}', key='ID');",
"CREATE TABLE INNER_JOIN WITH (PARTITIONS=1) as SELECT t.id, name, value, f1, f2 FROM test t join TEST_TABLE tt on t.id = tt.id;",
"CREATE TABLE INNER_JOIN_2 AS SELECT t_id, name, f1, f3 FROM inner_join tt join TEST_TABLE_2 t ON t.id = tt.t_id;"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So basically the key field for the join stream is the of the form leftTable_leftKeyField? Is this documented anywhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the format for columns selected using the '.' operator.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok.. is this documented? Because it is important to do joins downstream, isn't it? If it isn't, we should file an issue to update the documentation.

Copy link
Contributor

@apurvam apurvam left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, once the comments are addressed.

@rodesai
Copy link
Contributor Author

rodesai commented Aug 8, 2018

@apurvam this is a fix for #1686

Copy link
Contributor

@hjafarpour hjafarpour left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

@@ -312,6 +312,27 @@
{"topic": "INNER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "foo", "VALUE": 100, "F1": "zero", "F2": 0}, "timestamp": 10000},
{"topic": "INNER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "bar", "VALUE": 99, "F1": "a", "F2": 10}, "timestamp": 15000}
]
},
{
"name": "multi-way join",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe rename this since multi-way join would resemble multiple joins in one query, like 3-way join. How about join-pipeline?

SchemaKStream.select may get a QualifiedNameReference as the expression
in a select call. The logic that looks for and preserves the key field in
the resulting SchemaKStream needs to handle expressions of this type. It
also needs to handle cases where the key field name is fully qualified, as
happens with joins.
@rodesai rodesai merged commit eaa723f into confluentinc:5.0.x Aug 10, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants