Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated Kafka + SQL storage variant, re-using existing kafka storage utils/data model #1026

Merged
merged 19 commits into from
Nov 24, 2020
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
20d2060
Added a Kafka+SQL storage variant
EricWittmann Nov 10, 2020
e76c3d8
introduced overlays for application.properties to avoid putting all p…
EricWittmann Nov 11, 2020
31f2098
Fixed the ksql tests - they all pass!
EricWittmann Nov 11, 2020
5863a97
added some logging to the merge properties mojo
EricWittmann Nov 11, 2020
402db21
push the UUID into the payload and make the kafka message key the art…
EricWittmann Nov 13, 2020
1b874f3
fix selectArtifactMetaDataByGlobalId query bug and add reproducer test
Nov 15, 2020
f4240cb
Merge pull request #10 from famartinrh/learning/kafka-sql/fix-sql-bug
EricWittmann Nov 16, 2020
371a49d
some tweaks based on perf testing
EricWittmann Nov 23, 2020
66105cf
kafka + sql storage variant, reusing streams variant datamodel (#1012)
famarting Nov 23, 2020
7054b23
Merge branch 'learning/kafka-sql' of github.com:Apicurio/apicurio-reg…
EricWittmann Nov 23, 2020
004edd3
Merged changes from master
EricWittmann Nov 23, 2020
163a77d
fixed some bugs in the ksql modified impl, and modified some tests to…
EricWittmann Nov 23, 2020
df0c072
minor TODO
EricWittmann Nov 23, 2020
8e43971
run integration tests and fix storage bug (#1028)
famarting Nov 24, 2020
3ea5801
fix ksql storage - error create/update artifact with metadata (#1029)
famarting Nov 24, 2020
463252b
update after some PR feedback
EricWittmann Nov 24, 2020
9d01a5b
remove some debug methods
EricWittmann Nov 24, 2020
2b2621e
Merge remote-tracking branch 'upstream/master' into learning/kafka-sql
EricWittmann Nov 24, 2020
dda622a
updated the perftest readme
EricWittmann Nov 24, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/integration-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@ jobs:
restore-keys: |
${{ runner.os }}-maven-
- name: Build All Variants
run: mvn clean install -Pprod -Psql -Pinfinispan -Pstreams -Pasyncmem -DskipTests -pl !tests
run: mvn clean install -Pprod -Psql -Pinfinispan -Pstreams -Pasyncmem -Pksql -DskipTests -pl !tests
- name: Run Integration Tests - streams
run: mvn verify -Pacceptance -Pstreams -pl tests -Dmaven.javadoc.skip=true --no-transfer-progress -PdisableSerdesTest
- name: Run Integration Tests - sql
run: mvn verify -Pacceptance -Psql -pl tests -Dmaven.javadoc.skip=true --no-transfer-progress -PdisableSerdesTest
- name: Run Integration Tests - infinispan
run: mvn verify -Pacceptance -Pinfinispan -pl tests -Dmaven.javadoc.skip=true --no-transfer-progress -PdisableSerdesTest
- name: Run Integration Tests - ksql
run: mvn verify -Pacceptance -Pksql -pl tests -Dmaven.javadoc.skip=true --no-transfer-progress -PdisableSerdesTest
- name: Collect logs
if: failure()
run: ./.github/scripts/collect_logs.sh
Expand Down
4 changes: 3 additions & 1 deletion .github/workflows/verify.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
restore-keys: |
${{ runner.os }}-maven-
- name: Build All Variants
run: mvn clean install -Pprod -Psql -Pinfinispan -Pstreams -Pasyncmem -pl !tests
run: mvn clean install -Pprod -Psql -Pinfinispan -Pstreams -Pasyncmem -Pksql -pl !tests
- name: Login to DockerHub Registry
if: github.event_name == 'push' && github.ref == 'refs/heads/master'
run: echo ${{ secrets.DOCKERHUB_PASSWORD }} | docker login -u ${{ secrets.DOCKERHUB_USERNAME }} --password-stdin
Expand All @@ -51,6 +51,7 @@ jobs:
mvn package -Pprod -Pinfinispan -DskipTests -Ddocker -Ddocker.tag.name=latest-snapshot
mvn package -Pprod -Pstreams -DskipTests -Ddocker -Ddocker.tag.name=latest-snapshot
mvn package -Pprod -Pasyncmem -DskipTests -Ddocker -Ddocker.tag.name=latest-snapshot
mvn package -Pprod -Pksql -DskipTests -Ddocker -Ddocker.tag.name=latest-snapshot
- name: Push The Tagged Docker Images
if: github.event_name == 'push' && github.ref == 'refs/heads/master'
run: |
Expand All @@ -59,6 +60,7 @@ jobs:
docker push apicurio/apicurio-registry-infinispan:latest-snapshot
docker push apicurio/apicurio-registry-streams:latest-snapshot
docker push apicurio/apicurio-registry-asyncmem:latest-snapshot
docker push apicurio/apicurio-registry-ksql:latest-snapshot

kubernetes-tests:
name: Kubernetes Tests
Expand Down
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ The following parameters are available for executable files:

|Option|Command argument|Env. variable|
|---|---|---|
|Data Source URL|`-Dquarkus.datasource.jdbc.url`|`QUARKUS_DATASOURCE_URL`|
|DS Username|`-Dquarkus.datasource.username`|`QUARKUS_DATASOURCE_USERNAME`|
|DS Password|`-Dquarkus.datasource.password`|`QUARKUS_DATASOURCE_PASSWORD`|
|Data Source URL|`-Dquarkus.datasource.jdbc.url`|`REGISTRY_DATASOURCE_URL`|
|DS Username|`-Dquarkus.datasource.username`|`REGISTRY_DATASOURCE_USERNAME`|
|DS Password|`-Dquarkus.datasource.password`|`REGISTRY_DATASOURCE_PASSWORD`|

To see additional options, visit:
- [Data Source config](https://quarkus.io/guides/datasource)
Expand Down Expand Up @@ -141,9 +141,9 @@ services:
ports:
- 8080:8080
environment:
QUARKUS_DATASOURCE_URL: 'jdbc:postgresql://postgres/apicurio-registry'
QUARKUS_DATASOURCE_USERNAME: apicurio-registry
QUARKUS_DATASOURCE_PASSWORD: password
REGISTRY_DATASOURCE_URL: 'jdbc:postgresql://postgres/apicurio-registry'
REGISTRY_DATASOURCE_USERNAME: apicurio-registry
REGISTRY_DATASOURCE_PASSWORD: password
```
- Run `docker-compose -f test.yml up`

Expand Down
2 changes: 2 additions & 0 deletions app/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/.apt_generated/
/.apt_generated_tests/
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,24 @@ public Object logMethodEntry(InvocationContext context) throws Exception {
} catch (Throwable t) {
}

logger.debug("ENTERING method [{}] with {} parameters", context.getMethod().getName(), context.getParameters().length);
logEnter(context, logger);
Object rval = context.proceed();
logger.debug("LEAVING method [{}]", context.getMethod().getName());
logLeave(context, logger);
return rval;
}

private void logEnter(InvocationContext context, Logger logger) {
if (context != null && context.getMethod() != null && context.getMethod().getName() != null) {
logger.debug("ENTERING method [{}] with {} parameters", context.getMethod().getName(), context.getParameters().length);
}
}

private void logLeave(InvocationContext context, Logger logger) {
if (context != null && context.getMethod() != null && context.getMethod().getName() != null) {
logger.debug("LEAVING method [{}]", context.getMethod().getName());
}
}

/**
* Gets a logger for the given target class.
* @param targetClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@ public class ArtifactNotFoundException extends NotFoundException {

private String artifactId;

public ArtifactNotFoundException(Throwable cause) {
super(cause);
}

/**
* Constructor.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,25 +40,35 @@ public class RegistryStorageProducer {
@Inject
Instance<RegistryStorage> storages;

@Inject
Instance<RegistryStorageProvider> provider;

@Inject
EventsService eventsService;

@Produces
@ApplicationScoped
@Current
public RegistryStorage realImpl() {
List<RegistryStorage> list = storages.stream().collect(Collectors.toList());

RegistryStorage impl = null;
if (list.size() == 1) {
impl = list.get(0);

if (provider.isResolvable()) {
impl= provider.get().storage();
} else {
for (RegistryStorage rs : list) {
if (rs instanceof InMemoryRegistryStorage == false) {
impl = rs;
break;
List<RegistryStorage> list = storages.stream().collect(Collectors.toList());
if (list.size() == 1) {
impl = list.get(0);
} else {
for (RegistryStorage rs : list) {
if (rs instanceof InMemoryRegistryStorage == false) {
impl = rs;
break;
}
}
}
}

if (impl != null) {
log.info(String.format("Using RegistryStore: %s", impl.getClass().getName()));
if (eventsService.isConfigured()) {
Expand All @@ -67,6 +77,7 @@ public RegistryStorage realImpl() {
return impl;
}
}
throw new IllegalStateException("Should not be here ... ?!");

throw new IllegalStateException("No RegistryStorage available on the classpath!");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2020 Red Hat
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.apicurio.registry.storage;

/**
* @author Fabian Martinez
*/
public interface RegistryStorageProvider {

RegistryStorage storage();

}
96 changes: 3 additions & 93 deletions app/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,54 +5,9 @@ quarkus.index-dependency.jaxrs.artifact-id=jboss-jaxrs-api_2.1_spec

# === Dev profile - see README
%dev.quarkus.http.port=${HTTP_PORT:8080}
%dev.quarkus.log.level=${LOG_LEVEL:DEBUG}
%dev.quarkus.log.level=${LOG_LEVEL:INFO}
%dev.quarkus.log.category."io.apicurio".level=${REGISTRY_LOG_LEVEL:DEBUG}
%dev.quarkus.log.console.enable=true
%dev.quarkus.datasource.db-kind=postgresql
%dev.quarkus.datasource.jdbc.url=${QUARKUS_DATASOURCE_URL:jdbc:postgresql://localhost:5432/registry}
%dev.quarkus.datasource.username=${QUARKUS_DATASOURCE_USERNAME:postgres}
%dev.quarkus.datasource.password=${QUARKUS_DATASOURCE_PASSWORD:postgres}
%dev.quarkus.datasource.jdbc.initial-size=20
%dev.quarkus.datasource.jdbc.min-size=20
%dev.quarkus.datasource.jdbc.max-size=100
%dev.registry.sql.init=true

# Kafka -- when used
%dev.registry.kafka.common.bootstrap.servers=${bootstrap.servers:localhost:9092}
# Kafka - Registry producer
%dev.registry.kafka.storage-producer.enable.idempotence=true
#%dev.registry.kafka.storage-producer.max.in.flight.requests.per.connection=5
%dev.registry.kafka.storage-producer.retries=3
%dev.registry.kafka.storage-producer.acks=all
# Kafka - Snapshot handling
%dev.registry.kafka.snapshot-producer.enable.idempotence=true
%dev.registry.kafka.snapshot-producer.retries=3
%dev.registry.kafka.snapshot-producer.acks=all
%dev.registry.kafka.snapshot.requests=1000

# Streams -- when used
%dev.registry.streams.common.bootstrap.servers=${bootstrap.servers:localhost:9092}
%dev.registry.streams.topology.application.id=apicurio-registry
%dev.registry.streams.topology.application.server=localhost:9000
%dev.registry.streams.topology.num.stream.threads=2
%dev.registry.streams.topology.num.standby.replicas=1
%dev.registry.streams.topology.processing.guarantee=exactly_once
%dev.registry.streams.topology.replication.factor=1
%dev.registry.streams.topology.global.id.topic=global-id-topic
%dev.registry.streams.topology.storage.topic=storage-topic
%dev.registry.streams.storage-producer.enable.idempotence=true
#%dev.registry.streams.storage-producer.max.in.flight.requests.per.connection=5
%dev.registry.streams.storage-producer.retries=3
%dev.registry.streams.storage-producer.acks=all
%dev.registry.streams.events.enable.idempotence=true
%dev.registry.streams.events.retries=3
%dev.registry.streams.events.acks=all

# Asyncmem -- when used
%dev.registry.asyncmem.delays.create=${ASYNC_DELAYS_CREATE:1000}
%dev.registry.asyncmem.delays.update=${ASYNC_DELAYS_UPDATE:1000}

# Infinispan
%dev.registry.infinispan.cluster.name=${INFINISPAN_CLUSTER_NAME:apicurio-registry}

# === Prod profile - see README
# HTTP Port
Expand All @@ -61,52 +16,7 @@ quarkus.index-dependency.jaxrs.artifact-id=jboss-jaxrs-api_2.1_spec
# Log
%prod.quarkus.log.level=${LOG_LEVEL:INFO}
%prod.quarkus.log.console.enable=true

# SQL
%prod.quarkus.datasource.db-kind=postgresql
%prod.quarkus.datasource.jdbc.url=${QUARKUS_DATASOURCE_URL}
%prod.quarkus.datasource.username=${QUARKUS_DATASOURCE_USERNAME:postgres}
%prod.quarkus.datasource.password=${QUARKUS_DATASOURCE_PASSWORD:postgres}
%prod.quarkus.datasource.jdbc.initial-size=20
%prod.quarkus.datasource.jdbc.min-size=20
%prod.quarkus.datasource.jdbc.max-size=100
%prod.registry.sql.init=true

# Kafka
%prod.registry.kafka.common.bootstrap.servers=${KAFKA_BOOTSTRAP_SERVERS}
# Kafka - Registry producer
%prod.registry.kafka.storage-producer.enable.idempotence=true
#%prod.registry.kafka.storage-producer.max.in.flight.requests.per.connection=5
%prod.registry.kafka.storage-producer.retries=3
%prod.registry.kafka.storage-producer.acks=all

# Kafka - Snapshot handling
%prod.registry.kafka.snapshot-producer.enable.idempotence=true
%prod.registry.kafka.snapshot-producer.retries=3
%prod.registry.kafka.snapshot-producer.acks=all
%prod.registry.kafka.snapshot.requests=1000

# Streams -- when used
%prod.registry.streams.common.bootstrap.servers=${KAFKA_BOOTSTRAP_SERVERS}
%prod.registry.streams.topology.application.id=${APPLICATION_ID}
%prod.registry.streams.topology.application.server=${APPLICATION_SERVER_HOST:localhost}:${APPLICATION_SERVER_PORT:9000}
%prod.registry.streams.topology.num.stream.threads=2
%prod.registry.streams.topology.num.standby.replicas=1
%prod.registry.streams.topology.processing.guarantee=exactly_once
%prod.registry.streams.topology.replication.factor=1
%prod.registry.streams.topology.global.id.topic=global-id-topic
%prod.registry.streams.topology.storage.topic=storage-topic
%prod.registry.streams.storage-producer.enable.idempotence=true
#%prod.registry.streams.storage-producer.max.in.flight.requests.per.connection=5
%prod.registry.streams.storage-producer.retries=3
%prod.registry.streams.storage-producer.acks=all

# Asyncmem -- when used
%prod.registry.asyncmem.delays.create=${ASYNC_DELAYS_CREATE:1000}
%prod.registry.asyncmem.delays.update=${ASYNC_DELAYS_UPDATE:1000}

# Infinispan
%prod.registry.infinispan.cluster.name=${INFINISPAN_CLUSTER_NAME:apicurio-registry}
%prod.quarkus.log.category."io.apicurio".level=${REGISTRY_LOG_LEVEL:INFO}

# Search
# This currently disables search --> noop, remove once search is enabled
Expand Down
15 changes: 5 additions & 10 deletions app/src/test/java/io/apicurio/registry/RegistryClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public void testAsyncCRUD(RegistryRestClient restClient) throws Exception {

@RegistryRestClientTest
void testSearchArtifact(RegistryRestClient restClient) throws Exception {
try {
// warm-up
restClient.listArtifacts();

Expand All @@ -106,11 +107,6 @@ void testSearchArtifact(RegistryRestClient restClient) throws Exception {

this.waitForGlobalId(id);

retry(() -> {
ArtifactMetaData artifactMetaData = restClient.getArtifactMetaDataByGlobalId(id);
Assertions.assertNotNull(artifactMetaData);
});

ArtifactSearchResults results = restClient.searchArtifacts(name, SearchOver.name, SortOrder.asc, 0, 2);
Assertions.assertNotNull(results);
Assertions.assertEquals(1, results.getCount());
Expand All @@ -121,6 +117,10 @@ void testSearchArtifact(RegistryRestClient restClient) throws Exception {
results = restClient.searchArtifacts(null, null, null, null, null);
Assertions.assertNotNull(results);
Assertions.assertTrue(results.getCount() > 0);
} catch (Exception e) {
e.printStackTrace();
throw e;
}
}

@RegistryRestClientTest
Expand Down Expand Up @@ -293,13 +293,8 @@ public void testLabels(RegistryRestClient restClient) throws Exception {
});

retry((() -> {
System.out.println("==============================");
ArtifactSearchResults results = client
.searchArtifacts("registry-client-test-testLabels", SearchOver.labels, SortOrder.asc, 0, 2);
results.getArtifacts().forEach(arty -> {
System.out.println(arty);
});
System.out.println("==============================");
Assertions.assertNotNull(results);
Assertions.assertEquals(1, results.getCount());
Assertions.assertEquals(1, results.getArtifacts().size());
Expand Down
Loading