
Master #339

Merged 27 commits on Oct 6, 2017

Changes from all 27 commits
5f225a9 - Merge remote-tracking branch 'origin/0.1.x' (Aug 22, 2017)
2b14467 - Yes, this is intentional. Cleaning up after someone merged wrong dire… (Aug 22, 2017)
d4e08a3 - Merge remote-tracking branch 'origin/0.1.x' (Aug 22, 2017)
7c43fa4 - Merge remote-tracking branch 'origin/0.1.x' (Aug 22, 2017)
141aca7 - Merge remote-tracking branch 'origin/0.1.x' (Aug 22, 2017)
919130d - bump build-tools to 4.0.0-SNAPSHOT (Aug 22, 2017)
4caf925 - Merge remote-tracking branch 'origin/0.1.x' (Aug 24, 2017)
546500b - Merge remote-tracking branch 'origin/0.1.x' (Aug 24, 2017)
509d94d - Merge remote-tracking branch 'origin/0.1.x' (Aug 24, 2017)
bf5a549 - Merge remote-tracking branch 'origin/0.1.x' (Aug 24, 2017)
4550b1e - Merge remote-tracking branch 'origin/0.1.x' (Aug 24, 2017)
452df42 - Merge remote-tracking branch 'origin/0.1.x' (Aug 24, 2017)
989e6d3 - Merge remote-tracking branch 'origin/0.1.x' (Aug 24, 2017)
b16b6c5 - Merge remote-tracking branch 'origin/0.1.x' (Aug 25, 2017)
c3184d3 - Merge remote-tracking branch 'origin/0.1.x' (Aug 25, 2017)
0988e8d - Merge remote-tracking branch 'origin/0.1.x' (Aug 25, 2017)
dd0b8dc - Merge remote-tracking branch 'origin/0.1.x' (Aug 26, 2017)
9c52487 - Merge remote-tracking branch 'origin/0.1.x' (Aug 29, 2017)
6b896d7 - Fixed misspelled words (Sep 5, 2017)
1e5496b - Merge pull request #280 from juneng603/feature-fix-misspelled-words (hjafarpour, Sep 5, 2017)
34c5f9f - Merge remote-tracking branch 'origin/0.1.x' (ewencp, Sep 15, 2017)
c6d7773 - small-code-quality-improvements: Added code quality improvements. (TheRealHaui, Sep 24, 2017)
22b898e - small-code-quality-improvements: Added code quality improvements. (TheRealHaui, Sep 24, 2017)
bf90854 - Merge pull request #313 from TheRealHaui/small-code-quality-improvements (logscape, Sep 27, 2017)
117d760 - Merge branch '0.1.x' (hjafarpour, Oct 5, 2017)
46f349e - Merge branch 'master' of https://github.com/confluentinc/ksql (hjafarpour, Oct 5, 2017)
243c00f - Merge remote-tracking branch 'origin/0.1.x' (Oct 5, 2017)
25 changes: 25 additions & 0 deletions build-tools/pom.xml
@@ -0,0 +1,25 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--~
~ Copyright 2017 Confluent Inc.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.confluent</groupId>
<artifactId>build-tools</artifactId>
<version>4.0.0-SNAPSHOT</version>
<name>Build Tools</name>
</project>
25 changes: 11 additions & 14 deletions docs/quickstart/README.md
@@ -38,33 +38,30 @@ Because KSQL queries data in a Kafka cluster, you will need to bring up a Kafka
Copyright 2017 Confluent Inc.

CLI v0.1, Server v0.1 located at http://localhost:9098

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>
```

3. KSQL provides a structured query language to query Kafka data, so you need some data to query. For this quick start, you will produce mock streams to the Kafka cluster.

* If you are using our Docker Compose files, a Docker container is already running with a data generator that is continuously producing Kafka messages to the Kafka cluster. No further action is required.
* If you are not using our Docker environment, then follow these [instructions](quickstart-non-docker.md#produce-topic-data) to generate data to the Kafka cluster.
* If you are using our Docker Compose files, a Docker container is already running with a data generator that is continuously producing Kafka messages to the Kafka cluster. No further action is required
* If you are not using our Docker environment, then follow these [instructions](quickstart-non-docker.md#produce-topic-data) to generate data to the Kafka cluster

## Create a Stream and Table

This KSQL quick start shows examples querying data from Kafka topics called `pageviews` and `users` using the following schemas:

![image](/docs/quickstart/ksql-quickstart-schemas.jpg)

Before proceeding, please check:

> **Before proceeding, please confirm:**
>
> * In the terminal window where you started KSQL, you see the `ksql>` prompt.
> * If you *are not* using Docker, you must have manually run the data generator to produce topics called `pageviews`
> and `users`. If you haven't done this, please follow these
> [instructions to manually generate data](/docs/quickstart/quickstart-non-docker.md#produce-topic-data).
> (If you *are* using Docker this is done automatically for you.)
* In the terminal window where you started KSQL, you see the `ksql>` prompt
* If you are not using Docker, you must manually have run the data generator to produce topics called `pageviews` and `users`. If you haven't done this, please follow these [instructions](/docs/quickstart/quickstart-non-docker.md#produce-topic-data) to generate data. (Docker compose file automatically runs the data generator)


1. Create a STREAM `pageviews_original` from the Kafka topic `pageviews`, specifying the `value_format` of `DELIMITED`. Then `DESCRIBE` the new STREAM. Notice that KSQL created additional columns called `ROWTIME`, which corresponds to the Kafka message timestamp, and `ROWKEY`, which corresponds to the Kafka message key.
1. Create a STREAM `pageviews_original` from the Kafka topic `pageviews`, specifying the `value_format` of `DELIMITED`. Describe the new STREAM. Notice that KSQL created additional columns called `ROWTIME`, which corresponds to the Kafka message timestamp, and `ROWKEY`, which corresponds to the Kafka message key.

```bash
ksql> CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews', value_format='DELIMITED');
@@ -80,7 +77,7 @@ This KSQL quick start shows examples querying data from Kafka topics called `pag
PAGEID | VARCHAR(STRING)
```

2. Create a TABLE `users_original` from the Kafka topic `users`, specifying the `value_format` of `JSON`. Then `DESCRIBE` the new TABLE.
2. Create a TABLE `users_original` from the Kafka topic `users`, specifying the `value_format` of `JSON`. Describe the new TABLE.

```bash
ksql> CREATE TABLE users_original (registertime bigint, gender varchar, regionid varchar, userid varchar) WITH (kafka_topic='users', value_format='JSON');
@@ -146,7 +143,7 @@ This KSQL quick start shows examples querying data from Kafka topics called `pag
GENDER | VARCHAR(STRING)
```

3. Use `SELECT` to view query results as they come in. To stop viewing the query results, press `<ctrl-c>`. This stops printing to the console but it does not terminate the actual query. The query continues to run in the underyling KSQL application.
3. Use `SELECT` to view query results as they come in. To stop viewing the query results, press `<ctrl-c>`. This stops printing to the console but it does not terminate the actual query. The query continues to run in the underlying KSQL application.

```bash
ksql> SELECT * FROM pageviews_female;
8 changes: 4 additions & 4 deletions docs/quickstart/docker-compose.yml
@@ -68,9 +68,9 @@ services:
cub sr-ready schema-registry 8081 20 && \
echo Waiting a few seconds for topic creation to finish... && \
sleep 2 && \
java -jar /usr/share/java/ksql-examples/ksql-examples-0.1-SNAPSHOT-standalone.jar
java -jar /usr/share/java/ksql-examples/ksql-examples-4.0.0-SNAPSHOT-standalone.jar
quickstart=pageviews format=delimited topic=pageviews bootstrap-server=kafka:29092 maxInterval=100 iterations=1000 && \
java -jar /usr/share/java/ksql-examples/ksql-examples-0.1-SNAPSHOT-standalone.jar
java -jar /usr/share/java/ksql-examples/ksql-examples-4.0.0-SNAPSHOT-standalone.jar
quickstart=pageviews format=delimited topic=pageviews bootstrap-server=kafka:29092 maxInterval=1000'"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
@@ -98,9 +98,9 @@ services:
cub sr-ready schema-registry 8081 20 && \
echo Waiting a few seconds for topic creation to finish... && \
sleep 2 && \
java -jar /usr/share/java/ksql-examples/ksql-examples-0.1-SNAPSHOT-standalone.jar
java -jar /usr/share/java/ksql-examples/ksql-examples-4.0.0-SNAPSHOT-standalone.jar
quickstart=users format=json topic=users bootstrap-server=kafka:29092 maxInterval=100 iterations=1000 && \
java -jar /usr/share/java/ksql-examples/ksql-examples-0.1-SNAPSHOT-standalone.jar
java -jar /usr/share/java/ksql-examples/ksql-examples-4.0.0-SNAPSHOT-standalone.jar
quickstart=users format=json topic=users bootstrap-server=kafka:29092 maxInterval=1000'"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
4 changes: 2 additions & 2 deletions ksql-cli/pom.xml
@@ -80,7 +80,7 @@
<classifier>test</classifier>
<scope>test</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -205,4 +205,4 @@
</build>
</profile>
</profiles>
</project>
</project>
2 changes: 1 addition & 1 deletion ksql-common/pom.xml
@@ -160,4 +160,4 @@
</resource>
</resources>
</build>
</project>
</project>
@@ -67,7 +67,7 @@ public boolean equals(Object o) {

if (columns.size() != that.columns.size()) return false;

// For now string matching is used to compare the rows as double comparision will cause issues
// For now string matching is used to compare the rows as double comparison will cause issues
return this.toString().equals(that.toString());
}

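The comment in this hunk notes that rows are compared via string matching because direct `double` comparison causes issues. A minimal standalone sketch of that pitfall (class and variable names here are illustrative, not from the KSQL codebase):

```java
public class DoubleCompareSketch {
    public static void main(String[] args) {
        double a = 0.1 + 0.2;
        double b = 0.3;
        // Binary floating point cannot represent 0.1 or 0.2 exactly,
        // so the sum differs from 0.3 by one unit in the last place.
        System.out.println(a == b);                  // false
        System.out.println(Math.abs(a - b) < 1e-9);  // true: epsilon comparison
        System.out.println(a);                       // 0.30000000000000004
    }
}
```

Comparing `toString()` output, as the hunk does, trades per-column epsilon handling for exact textual equality: simpler, but it treats `0.3` and `0.30000000000000004` as different rows.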
@@ -104,7 +104,7 @@ public static int getFieldIndexByName(final Schema schema, final String fieldNam
}
for (int i = 0; i < schema.fields().size(); i++) {
Field field = schema.fields().get(i);
int dotIndex = field.name().indexOf(".");
int dotIndex = field.name().indexOf('.');
if (dotIndex == -1) {
if (field.name().equals(fieldName)) {
return i;
@@ -189,7 +189,7 @@ public static synchronized Schema removeImplicitRowTimeRowKeyFromSchema(Schema s
SchemaBuilder schemaBuilder = SchemaBuilder.struct();
for (Field field: schema.fields()) {
String fieldName = field.name();
fieldName = fieldName.substring(fieldName.indexOf(".") + 1);
fieldName = fieldName.substring(fieldName.indexOf('.') + 1);
if (!fieldName.equalsIgnoreCase(SchemaUtil.ROWTIME_NAME)
&& !fieldName.equalsIgnoreCase(SchemaUtil.ROWKEY_NAME)) {
schemaBuilder.field(fieldName, field.schema());
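Both hunks in this file rely on a property of `indexOf`: when the delimiter is absent it returns -1, so `substring(indexOf('.') + 1)` leaves an unqualified name untouched. A self-contained sketch of the idiom (the field names are invented for illustration):

```java
public class QualifiedNameSketch {
    // Strips an alias prefix such as "PAGEVIEWS." if one is present.
    static String stripQualifier(String fieldName) {
        // indexOf('.') is -1 when no dot exists, so -1 + 1 == 0
        // and substring(0) returns the name unchanged.
        return fieldName.substring(fieldName.indexOf('.') + 1);
    }

    public static void main(String[] args) {
        System.out.println(stripQualifier("PAGEVIEWS.USERID")); // USERID
        System.out.println(stripQualifier("USERID"));           // USERID
    }
}
```

The change from `indexOf(".")` to `indexOf('.')` swaps the String overload for the char overload; behavior is identical for a single character, the char form simply avoids the string-matching machinery.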
2 changes: 1 addition & 1 deletion ksql-engine/pom.xml
@@ -198,4 +198,4 @@
</resource>
</resources>
</build>
</project>
</project>
58 changes: 29 additions & 29 deletions ksql-engine/src/main/java/io/confluent/ksql/QueryEngine.java
@@ -240,14 +240,14 @@ public void buildQueryPhysicalPlan(final List<QueryMetadata> physicalPlans,
if (isBareQuery) {

physicalPlans.add(buildPlanForBareQuery(addUniqueTimeSuffix, statementPlanPair, overriddenStreamsProperties,
builder, ksqlConfigClone, (QueuedSchemaKStream) schemaKStream, (KsqlBareOutputNode) outputNode,
serviceId, transientQueryPrefix));
builder, ksqlConfigClone, (QueuedSchemaKStream) schemaKStream, (KsqlBareOutputNode) outputNode,
serviceId, transientQueryPrefix));

} else if (outputNode instanceof KsqlStructuredDataOutputNode) {

physicalPlans.add(buildPlanForStructuredOutputNode(addUniqueTimeSuffix, statementPlanPair,
overriddenStreamsProperties, updateMetastore, builder, ksqlConfigClone, schemaKStream,
(KsqlStructuredDataOutputNode) outputNode, serviceId, persistanceQueryPrefix));
overriddenStreamsProperties, updateMetastore, builder, ksqlConfigClone, schemaKStream,
(KsqlStructuredDataOutputNode) outputNode, serviceId, persistanceQueryPrefix));

} else {
throw new KsqlException("Sink data source is not correct.");
@@ -284,13 +284,13 @@ private QueryMetadata buildPlanForBareQuery(boolean addUniqueTimeSuffix,
SchemaKStream sourceSchemaKstream = schemaKStream.getSourceSchemaKStreams().get(0);

return new QueuedQueryMetadata(
statementPlanPair.getLeft(),
streams,
bareOutputNode,
schemaKStream.getExecutionPlan(""),
schemaKStream.getQueue(),
(sourceSchemaKstream instanceof SchemaKTable) ?
DataSource.DataSourceType.KTABLE : DataSource.DataSourceType.KSTREAM
statementPlanPair.getLeft(),
streams,
bareOutputNode,
schemaKStream.getExecutionPlan(""),
schemaKStream.getQueue(),
(sourceSchemaKstream instanceof SchemaKTable) ?
DataSource.DataSourceType.KTABLE : DataSource.DataSourceType.KSTREAM
);
}

@@ -326,21 +326,21 @@ private QueryMetadata buildPlanForStructuredOutputNode(boolean addUniqueTimeSuff
if (schemaKStream instanceof SchemaKTable) {
SchemaKTable schemaKTable = (SchemaKTable) schemaKStream;
sinkDataSource =
new KsqlTable(outputNode.getId().toString(),
outputNode.getSchema(),
schemaKStream.getKeyField(),
outputNode.getTimestampField(),
outputNode.getKsqlTopic(),
outputNode.getId().toString() +
ksqlEngine.getKsqlConfig().get(KsqlConfig.KSQL_TABLE_STATESTORE_NAME_SUFFIX_CONFIG),
schemaKTable.isWindowed());
new KsqlTable(outputNode.getId().toString(),
outputNode.getSchema(),
schemaKStream.getKeyField(),
outputNode.getTimestampField(),
outputNode.getKsqlTopic(),
outputNode.getId().toString() +
ksqlEngine.getKsqlConfig().get(KsqlConfig.KSQL_TABLE_STATESTORE_NAME_SUFFIX_CONFIG),
schemaKTable.isWindowed());
} else {
sinkDataSource =
new KsqlStream(outputNode.getId().toString(),
outputNode.getSchema(),
schemaKStream.getKeyField(),
outputNode.getTimestampField(),
outputNode.getKsqlTopic());
new KsqlStream(outputNode.getId().toString(),
outputNode.getSchema(),
schemaKStream.getKeyField(),
outputNode.getTimestampField(),
outputNode.getKsqlTopic());
}

if (updateMetastore) {
@@ -349,10 +349,10 @@
KafkaStreams streams = buildStreams(builder, applicationId, ksqlConfigClone, overriddenStreamsProperties);

return new PersistentQueryMetadata(statementPlanPair.getLeft(),
streams, outputNode, schemaKStream
.getExecutionPlan(""), queryId,
(schemaKStream instanceof SchemaKTable) ? DataSource
.DataSourceType.KTABLE : DataSource.DataSourceType.KSTREAM);
streams, outputNode, schemaKStream
.getExecutionPlan(""), queryId,
(schemaKStream instanceof SchemaKTable) ? DataSource
.DataSourceType.KTABLE : DataSource.DataSourceType.KSTREAM);
}


@@ -443,7 +443,7 @@ private long getNextQueryId() {
// TODO: This should probably be changed
private String getBareQueryApplicationId(String serviceId, String transientQueryPrefix) {
return serviceId + transientQueryPrefix +
Math.abs(ThreadLocalRandom.current().nextLong());
Math.abs(ThreadLocalRandom.current().nextLong());
}

private String addTimeSuffix(String original) {
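The `getBareQueryApplicationId` hunk builds transient application IDs from `Math.abs` of a random long. A sketch of the idiom and its one subtle edge case (method and argument names are paraphrased from the diff, not exact):

```java
import java.util.concurrent.ThreadLocalRandom;

public class AppIdSketch {
    static String bareQueryApplicationId(String serviceId, String transientQueryPrefix) {
        return serviceId + transientQueryPrefix
                + Math.abs(ThreadLocalRandom.current().nextLong());
    }

    public static void main(String[] args) {
        System.out.println(bareQueryApplicationId("ksql_", "transient_"));
        // Edge case: |Long.MIN_VALUE| does not fit in a long, so Math.abs
        // returns the negative input unchanged (roughly a 1-in-2^64 event here).
        System.out.println(Math.abs(Long.MIN_VALUE)); // -9223372036854775808
    }
}
```

For an ID suffix the occasional negative value is mostly cosmetic, but `nextLong() & Long.MAX_VALUE` is a common way to guarantee a non-negative result.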
@@ -111,7 +111,7 @@ public AbstractCreateStreamCommand(final AbstractStreamCreateStatement statement
}

private void checkTopicNameNotNull(Map<String, Expression> properties) {
// TODO: move the check to grammer
// TODO: move the check to grammar
KsqlPreconditions.checkNotNull(
properties.get(DdlConfig.TOPIC_NAME_PROPERTY),
"Topic name should be set in WITH clause.");
@@ -17,7 +17,6 @@
package io.confluent.ksql.function;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.streams.kstream.Merger;

import java.util.List;

@@ -169,10 +169,7 @@ public static void addFunction(KsqlFunction ksqlFunction) {
}

public static boolean isAnAggregateFunction(String functionName) {
if (ksqlAggregateFunctionMap.get(functionName) != null) {
return true;
}
return false;
return ksqlAggregateFunctionMap.get(functionName) != null;
}

public static KsqlAggregateFunction getAggregateFunction(String functionName, List<Expression>
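The hunk above collapses an `if (x != null) return true; return false;` into a single return. The same refactor in isolation (the registry contents are invented for the demo):

```java
import java.util.HashMap;
import java.util.Map;

public class AggregateLookupSketch {
    static final Map<String, Object> functionMap = new HashMap<>();

    // Equivalent to:
    //   if (functionMap.get(name) != null) { return true; }
    //   return false;
    static boolean isAnAggregateFunction(String name) {
        return functionMap.get(name) != null;
    }

    public static void main(String[] args) {
        functionMap.put("SUM", new Object());
        System.out.println(isAnAggregateFunction("SUM"));  // true
        System.out.println(isAnAggregateFunction("TRIM")); // false
    }
}
```

`containsKey` would read slightly clearer still, but `get(...) != null` preserves the original behavior exactly, including treating a key mapped to null as absent.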
@@ -51,7 +51,7 @@ public class KsqlConfig extends AbstractConfig {
KSQL_SERVICE_ID_IMPORTANCE = ConfigDef.Importance.MEDIUM;
public static final String
KSQL_SERVICE_ID_DOC =
"Indicates the ID of the ksql service. It will be used as prefix for all KSQL queires in "
"Indicates the ID of the ksql service. It will be used as prefix for all KSQL queries in "
+ "this service.";

public static final String
Expand Down
@@ -21,7 +21,7 @@
public class StringUtil {

public static String cleanQuotes(final String stringWithQuotes) {
// TODO: move check to grammer
// TODO: move check to grammar
if (stringWithQuotes.startsWith("'") && stringWithQuotes.endsWith("'")) {
return stringWithQuotes.substring(1, stringWithQuotes.length() - 1);
} else {
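`cleanQuotes` strips a matching pair of single quotes. A standalone sketch of the behavior; note that the length guard below is our addition, not part of the diff:

```java
public class CleanQuotesSketch {
    static String cleanQuotes(String stringWithQuotes) {
        // length() > 1 guards the one-character input "'", which would
        // otherwise satisfy both startsWith and endsWith and make
        // substring(1, 0) throw. The original code has no such guard.
        if (stringWithQuotes.length() > 1
                && stringWithQuotes.startsWith("'")
                && stringWithQuotes.endsWith("'")) {
            return stringWithQuotes.substring(1, stringWithQuotes.length() - 1);
        }
        return stringWithQuotes;
    }

    public static void main(String[] args) {
        System.out.println(cleanQuotes("'pageviews'")); // pageviews
        System.out.println(cleanQuotes("pageviews"));   // pageviews
        System.out.println(cleanQuotes("'"));           // '
    }
}
```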
6 changes: 3 additions & 3 deletions ksql-examples/pom.xml
@@ -18,7 +18,7 @@
<cli.skip-execute>false</cli.skip-execute>
<cli.main-class>${main-class}</cli.main-class>
</properties>

<dependencies>
<dependency>
<groupId>io.confluent.ksql</groupId>
@@ -44,7 +44,7 @@
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
@@ -156,4 +156,4 @@
</build>
</profile>
</profiles>
</project>
</project>
2 changes: 1 addition & 1 deletion ksql-metastore/pom.xml
@@ -167,4 +167,4 @@
</resource>
</resources>
</build>
</project>
</project>
@@ -155,8 +155,6 @@ private String getKsqlTypeInJson(final Schema schemaType) {
return "STRING";
} else if (schemaType == Schema.FLOAT64_SCHEMA) {
return "DOUBLE";
} else if (schemaType == Schema.INT64_SCHEMA) {
return "INTEGER";
} else if (schemaType == Schema.BOOLEAN_SCHEMA) {
return "BOOL";
}
@@ -277,8 +275,7 @@ public void writeMetastoreToFile(String filePath, MetaStore metaStore) {
addSchemas(stringBuilder, metaStore.getAllStructuredDataSources());
stringBuilder.append("}");

try {
RandomAccessFile raf = new RandomAccessFile(filePath, "rw");
try (RandomAccessFile raf = new RandomAccessFile(filePath, "rw")) {
raf.writeBytes(stringBuilder.toString());
raf.close();
} catch (IOException e) {
@@ -302,8 +299,7 @@ private String getAvroSchema(final String schemaFilePath) throws IOException {

public void writeAvroSchemaFile(final String avroSchema, final String filePath) {

try {
RandomAccessFile randomAccessFile = new RandomAccessFile(filePath, "rw");
try (RandomAccessFile randomAccessFile = new RandomAccessFile(filePath, "rw")) {
randomAccessFile.writeBytes(avroSchema);
randomAccessFile.close();
} catch (IOException e) {
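Both hunks in this file move `RandomAccessFile` into try-with-resources. A minimal sketch of the idiom (the file name, payload, and method name are invented):

```java
import java.io.IOException;
import java.io.RandomAccessFile;

public class TryWithResourcesSketch {
    static void writeFile(String filePath, String contents) {
        // The resource is closed automatically on every exit path,
        // including exceptions, so no explicit close() call is needed.
        try (RandomAccessFile raf = new RandomAccessFile(filePath, "rw")) {
            raf.writeBytes(contents);
        } catch (IOException e) {
            throw new RuntimeException("Could not write " + filePath, e);
        }
    }

    public static void main(String[] args) {
        writeFile("metastore-demo.json", "{}");
        System.out.println("wrote metastore-demo.json");
    }
}
```

The diff keeps the explicit `raf.close()` inside the try block; with try-with-resources that call is redundant and could be dropped in a follow-up.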
2 changes: 1 addition & 1 deletion ksql-parser/pom.xml
@@ -191,4 +191,4 @@
</resource>
</resources>
</build>
</project>
</project>
2 changes: 1 addition & 1 deletion ksql-rest-app/pom.xml
@@ -145,4 +145,4 @@
</build>
</profile>
</profiles>
</project>
</project>
2 changes: 1 addition & 1 deletion ksql-serde/pom.xml
@@ -164,4 +164,4 @@
</resource>
</resources>
</build>
</project>
</project>