diff --git a/app/src/main/java/io/apicurio/registry/storage/RegistryStorageProducer.java b/app/src/main/java/io/apicurio/registry/storage/RegistryStorageProducer.java index 7ef07c6347..a383d4192c 100644 --- a/app/src/main/java/io/apicurio/registry/storage/RegistryStorageProducer.java +++ b/app/src/main/java/io/apicurio/registry/storage/RegistryStorageProducer.java @@ -38,10 +38,18 @@ public class RegistryStorageProducer { @Inject Instance storages; + @Inject + Instance provider; + @Produces @ApplicationScoped @Current public RegistryStorage realImpl() { + + if (provider.isResolvable()) { + return provider.get().storage(); + } + List list = storages.stream().collect(Collectors.toList()); RegistryStorage impl = null; if (list.size() == 1) { diff --git a/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/TestTypes.java b/app/src/main/java/io/apicurio/registry/storage/RegistryStorageProvider.java similarity index 62% rename from storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/TestTypes.java rename to app/src/main/java/io/apicurio/registry/storage/RegistryStorageProvider.java index 4ba1ca71c9..7cedb52193 100644 --- a/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/TestTypes.java +++ b/app/src/main/java/io/apicurio/registry/storage/RegistryStorageProvider.java @@ -13,19 +13,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - -package io.apicurio.registry.storage.impl.ksql; +package io.apicurio.registry.storage; /** - * @author eric.wittmann@gmail.com + * @author Fabian Martinez */ -public class TestTypes { - - public static void main(String[] args) { - System.out.println(long.class); - System.out.println(Long.class); - System.out.println(Long.class == long.class); - System.out.println(Long.class.equals(long.class)); - } +public interface RegistryStorageProvider { + + RegistryStorage storage(); } diff --git a/perftest/pom.xml b/perftest/pom.xml index fe82468d86..02c6a68bbd 100644 --- a/perftest/pom.xml +++ b/perftest/pom.xml @@ -14,6 +14,7 @@ io.gatling.highcharts gatling-charts-highcharts + 3.4.1 test diff --git a/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/JournalRecord.java b/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/JournalRecord.java deleted file mode 100644 index 516c69d196..0000000000 --- a/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/JournalRecord.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Copyright 2020 Red Hat - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.apicurio.registry.storage.impl.ksql; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.UUID; - -import org.apache.commons.codec.binary.Base64; - -import com.fasterxml.jackson.databind.ObjectMapper; - -import io.apicurio.registry.content.ContentHandle; - -/** - * @author eric.wittmann@gmail.com - */ -public class JournalRecord { - - private static ObjectMapper mapper = new ObjectMapper(); - - /** - * Creates a journal entry for the given method name and arguments. - * @param methodName - * @param arguments - */ - public static JournalRecord create(UUID uuid, String methodName, Object ...arguments) { - JournalRecord record = new JournalRecord(); - record.setUuid(uuid); - record.setMethod(methodName); - if (arguments != null && arguments.length > 0) { - List recordArgs = new ArrayList<>(arguments.length); - for (Object argument : arguments) { - JournalRecordArgument jra = new JournalRecordArgument(); - // Special handling for ContentHandle - if (argument instanceof ContentHandle) { - byte[] bytes = ((ContentHandle) argument).bytes(); - String b64 = Base64.encodeBase64String(bytes); - jra.setClassName(ContentHandle.class.getName()); - jra.setValue(mapper.valueToTree(b64)); - } else { - jra.setClassName(argument.getClass().getName()); - jra.setValue(mapper.valueToTree(argument)); - } - recordArgs.add(jra); - } - record.setArguments(recordArgs); - } else { - record.setArguments(Collections.emptyList()); - } - return record; - } - - private UUID uuid; - private String method; - private List arguments; - - /** - * Constructor. - */ - public JournalRecord() { - } - - /** - * @return the method - */ - public String getMethod() { - return method; - } - - /** - * @param method the method to set - */ - public void setMethod(String method) { - this.method = method; - } - - /** - * @return the arguments - */ - public List getArguments() { - return arguments; - } - - /** - * @param arguments the arguments to set - */ - public void setArguments(List arguments) { - this.arguments = arguments; - } - - /** - * @return the uuid - */ - public UUID getUuid() { - return uuid; - } - - /** - * @param uuid the uuid to set - */ - public void setUuid(UUID uuid) { - this.uuid = uuid; - } - -} diff --git a/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/JournalRecordArgument.java b/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/JournalRecordArgument.java deleted file mode 100644 index 3f52fb16d9..0000000000 --- a/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/JournalRecordArgument.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright 2020 Red Hat - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.apicurio.registry.storage.impl.ksql; - -import com.fasterxml.jackson.databind.JsonNode; - -/** - * @author eric.wittmann@gmail.com - */ -public class JournalRecordArgument { - - private String className; - private JsonNode value; - - /** - * Constructor. - */ - public JournalRecordArgument() { - } - - /** - * @return the value - */ - public JsonNode getValue() { - return value; - } - - /** - * @param value the value to set - */ - public void setValue(JsonNode value) { - this.value = value; - } - - /** - * @return the className - */ - public String getClassName() { - return className; - } - - /** - * @param className the className to set - */ - public void setClassName(String className) { - this.className = className; - } - -} diff --git a/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/JournalRecordDeserializer.java b/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/JournalRecordDeserializer.java deleted file mode 100644 index ee8317d891..0000000000 --- a/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/JournalRecordDeserializer.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright 2020 Red Hat - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.apicurio.registry.storage.impl.ksql; - -import java.io.IOException; - -import org.apache.kafka.common.serialization.Deserializer; - -import com.fasterxml.jackson.databind.ObjectMapper; - -/** - * @author eric.wittmann@gmail.com - */ -public class JournalRecordDeserializer implements Deserializer { - - private ObjectMapper mapper = new ObjectMapper(); - - /** - * @see org.apache.kafka.common.serialization.Deserializer#deserialize(java.lang.String, byte[]) - */ - @Override - public JournalRecord deserialize(String topic, byte[] data) { - try { - return mapper.readValue(data, JournalRecord.class); - } catch (IOException e) { - // TODO figure something out here - e.printStackTrace(); - return null; - } - } - -} diff --git a/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/JournalRecordSerializer.java b/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/JournalRecordSerializer.java deleted file mode 100644 index 9cb15abae1..0000000000 --- a/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/JournalRecordSerializer.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright 2020 Red Hat - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.apicurio.registry.storage.impl.ksql; - -import org.apache.kafka.common.serialization.Serializer; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; - -/** - * @author eric.wittmann@gmail.com - */ -public class JournalRecordSerializer implements Serializer { - - private ObjectMapper mapper = new ObjectMapper(); - - /** - * @see org.apache.kafka.common.serialization.Serializer#serialize(java.lang.String, java.lang.Object) - */ - @Override - public byte[] serialize(String topic, JournalRecord data) { - try { - return mapper.writeValueAsBytes(data); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } - -} diff --git a/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/KafkaSqlCoordinator.java b/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/KafkaSqlCoordinator.java index 1d6c6a50ea..13a28c4820 100644 --- a/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/KafkaSqlCoordinator.java +++ b/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/KafkaSqlCoordinator.java @@ -24,16 +24,18 @@ import javax.enterprise.context.ApplicationScoped; +import io.apicurio.registry.types.RegistryException; + /** * Coordinates "write" responses across threads in the Kafka-SQL storage implementation. Basically this is used * to communicate between the Kafka consumer thread and the waiting HTTP/API thread, where the HTTP thread is * waiting for an operation to be completed by the Kafka consumer thread. - * + * * @author eric.wittmann@gmail.com */ @ApplicationScoped public class KafkaSqlCoordinator { - + private static final Object NULL = new Object(); private Map latches = new ConcurrentHashMap<>(); private Map returnValues = new ConcurrentHashMap<>(); @@ -49,21 +51,28 @@ public UUID createUUID() { /** * Waits for a response to the operation with the given UUID. There is a countdown latch for each operation. The - * caller waiting for the response will wait for the countdown to happen and then proceed. We also remove + * caller waiting for the response will wait for the countdown to happen and then proceed. We also remove * the latch from the Map here since it's not needed anymore. - * + * * @param uuid - * @throws InterruptedException + * @throws InterruptedException */ - public Object waitForResponse(UUID uuid) throws InterruptedException { + public Object waitForResponse(UUID uuid) { // TODO timeout should be configurable - latches.get(uuid).await(30, TimeUnit.SECONDS); - latches.remove(uuid); - Object rval = returnValues.remove(uuid); - if (rval == NULL) { - return null; - } else { + try { + latches.get(uuid).await(30, TimeUnit.SECONDS); + + Object rval = returnValues.remove(uuid); + if (rval == NULL) { + return null; + } else if (rval instanceof RegistryException) { + throw (RegistryException) rval; + } return rval; + } catch (InterruptedException e) { + throw new RegistryException(e); + } finally { + latches.remove(uuid); } } @@ -71,9 +80,15 @@ public Object waitForResponse(UUID uuid) throws InterruptedException { * Countdown the latch for the given UUID. This will wake up the thread waiting for the response * so that it can proceed. * @param uuid - * @param returnValue + * @param returnValue */ public void notifyResponse(UUID uuid, Object returnValue) { + + //we are re-using the topic from a streams based registry instance + if (uuid == null) { + return; + } + // If there is no countdown latch, then there is no HTTP thread waiting for // a response. This means one of two possible things: // 1) We're in a cluster and the HTTP thread is on another node diff --git a/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/KafkaSqlDispatcher.java b/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/KafkaSqlDispatcher.java deleted file mode 100644 index 2f4fae977b..0000000000 --- a/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/KafkaSqlDispatcher.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright 2020 Red Hat - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.apicurio.registry.storage.impl.ksql; - -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.List; - -import javax.enterprise.context.ApplicationScoped; - -import org.apache.commons.codec.binary.Base64; - -import com.fasterxml.jackson.databind.ObjectMapper; - -import io.apicurio.registry.content.ContentHandle; -import io.apicurio.registry.storage.RegistryStorage; -import io.apicurio.registry.storage.RegistryStorageException; -import io.apicurio.registry.types.RegistryException; - -/** - * Responsible for dispatching a {@link JournalRecord} to the storage. This basically means analyzing the {@link JournalRecord} - * to determine which method on the storage should be invoked, unmarshalling the arguments, and then invoking the correct - * storage method. The proper value (or exception) should then be returned. - * @author eric.wittmann@gmail.com - */ -@ApplicationScoped -public class KafkaSqlDispatcher { - - private static final ObjectMapper mapper = new ObjectMapper(); - - /** - * Dispatch the given journal record back to the storage. - * @param record - * @param storage - */ - public Object dispatchTo(JournalRecord record, RegistryStorage storage) { - try { - Class [] types = new Class [record.getArguments().size()]; - Object [] args = new Object[record.getArguments().size()]; - List arguments = record.getArguments(); - int idx = 0; - for (JournalRecordArgument argument : arguments) { - Class c = Class.forName(argument.getClassName()); - // Hack: the storage has methods that take a long but not a Long. But when we serialized - // the arguments, auto-boxing converted from long to Long. Reverse that here. - if (c.equals(Long.class)) { - c = long.class; - } - types[idx] = c; - if (c.equals(ContentHandle.class)) { - String b64Data = mapper.treeToValue(argument.getValue(), String.class); - byte[] data = Base64.decodeBase64(b64Data); - args[idx] = ContentHandle.create(data); - } else { - args[idx] = mapper.treeToValue(argument.getValue(), c); - } - idx++; - } - Method method = storage.getClass().getMethod(record.getMethod(), types); - return method.invoke(storage, args); - } catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException e) { - return new RegistryException(e); - } catch (InvocationTargetException e) { - return unwrapInvocationTargetException(e); - } catch (Throwable t) { - return new RegistryException(t); - } - } - - private static RegistryException unwrapInvocationTargetException(InvocationTargetException e) { - Throwable cause = e; - while (!(cause instanceof RegistryException) && cause != null) { - cause = e.getCause(); - } - if (cause != null) { - return (RegistryException) cause; - } - return new RegistryStorageException(e.getCause()); - } - -} diff --git a/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/KafkaSqlRegistryStorage.java b/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/KafkaSqlRegistryStorage.java index c49cd14d7f..0b9ead3f4f 100644 --- a/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/KafkaSqlRegistryStorage.java +++ b/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/KafkaSqlRegistryStorage.java @@ -28,24 +28,30 @@ import java.time.Duration; import java.util.Collection; import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.Optional; import java.util.Properties; +import java.util.Set; import java.util.SortedSet; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; - -import javax.annotation.PostConstruct; +import java.util.function.Function; import javax.annotation.PreDestroy; import javax.enterprise.context.ApplicationScoped; +import javax.enterprise.event.Observes; import javax.inject.Inject; +import javax.inject.Named; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.serialization.Serdes; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.eclipse.microprofile.metrics.annotation.ConcurrentGauge; import org.eclipse.microprofile.metrics.annotation.Counted; @@ -57,26 +63,42 @@ import io.apicurio.registry.logging.Logged; import io.apicurio.registry.metrics.PersistenceExceptionLivenessApply; import io.apicurio.registry.metrics.PersistenceTimeoutReadinessApply; +import io.apicurio.registry.rest.beans.ArtifactSearchResults; +import io.apicurio.registry.rest.beans.SearchOver; +import io.apicurio.registry.rest.beans.SortOrder; +import io.apicurio.registry.rest.beans.VersionSearchResults; import io.apicurio.registry.storage.ArtifactAlreadyExistsException; import io.apicurio.registry.storage.ArtifactMetaDataDto; import io.apicurio.registry.storage.ArtifactNotFoundException; +import io.apicurio.registry.storage.ArtifactStateExt; +import io.apicurio.registry.storage.ArtifactVersionMetaDataDto; import io.apicurio.registry.storage.EditableArtifactMetaDataDto; +import io.apicurio.registry.storage.RegistryStorage; import io.apicurio.registry.storage.RegistryStorageException; import io.apicurio.registry.storage.RuleAlreadyExistsException; import io.apicurio.registry.storage.RuleConfigurationDto; import io.apicurio.registry.storage.RuleNotFoundException; +import io.apicurio.registry.storage.StoredArtifact; import io.apicurio.registry.storage.VersionNotFoundException; -import io.apicurio.registry.storage.impl.sql.AbstractSqlRegistryStorage; +import io.apicurio.registry.storage.impl.AbstractRegistryStorage; +import io.apicurio.registry.storage.impl.ksql.sql.KafkaSQLSink; +import io.apicurio.registry.storage.proto.Str; import io.apicurio.registry.types.ArtifactState; import io.apicurio.registry.types.ArtifactType; -import io.apicurio.registry.types.RegistryException; import io.apicurio.registry.types.RuleType; +import io.apicurio.registry.util.DtoUtil; +import io.apicurio.registry.utils.ConcurrentUtil; +import io.apicurio.registry.utils.kafka.AsyncProducer; +import io.apicurio.registry.utils.kafka.ProducerActions; +import io.apicurio.registry.utils.kafka.ProtoSerde; +import io.apicurio.registry.utils.kafka.Submitter; +import io.quarkus.runtime.StartupEvent; /** * An implementation of a registry storage that extends the basic SQL storage but federates 'write' operations - * to other nodes in a cluster using a Kafka topic. As a result, all reads are performed locally but all + * to other nodes in a cluster using a Kafka topic. As a result, all reads are performed locally but all * writes are published to a topic for consumption by all nodes. - * + * * @author eric.wittmann@gmail.com */ @ApplicationScoped @@ -87,20 +109,33 @@ @Timed(name = STORAGE_OPERATION_TIME, description = STORAGE_OPERATION_TIME_DESC, tags = {"group=" + STORAGE_GROUP_TAG, "metric=" + STORAGE_OPERATION_TIME}, unit = MILLISECONDS) @Logged @SuppressWarnings("unchecked") -public class KafkaSqlRegistryStorage extends AbstractSqlRegistryStorage { +public class KafkaSqlRegistryStorage extends AbstractRegistryStorage { private static final Logger log = LoggerFactory.getLogger(KafkaSqlRegistryStorage.class); + /* Fake global rules as an artifact */ + public static final String GLOBAL_RULES_ID = "__GLOBAL_RULES__"; + + @Inject + KafkaSqlCoordinator coordinator; + + @Inject + KafkaSQLSink kafkaSqlSink; + + @Inject + @Named("SQLRegistryStorage") + RegistryStorage sqlStorage; + @Inject @ConfigProperty(name = "registry.ksql.globalRuleKey", defaultValue = "__global_rule") String globalRuleKey; - + @Inject @ConfigProperty(name = "registry.ksql.bootstrap.servers") String bootstrapServers; @Inject - @ConfigProperty(name = "registry.ksql.topic", defaultValue = "ksql-journal") + @ConfigProperty(name = "registry.ksql.topic", defaultValue = "storage-topic") String topic; @Inject @@ -110,44 +145,47 @@ public class KafkaSqlRegistryStorage extends AbstractSqlRegistryStorage { @Inject @ConfigProperty(name = "registry.ksql.consumer.poll.timeout", defaultValue = "1000") Integer pollTimeout; - - @Inject - KafkaSqlCoordinator coordinator; - @Inject - KafkaSqlDispatcher dispatcher; - private boolean stopped = true; - private KafkaProducer producer; - private KafkaConsumer consumer; - private ThreadLocal applying = ThreadLocal.withInitial(() -> Boolean.FALSE); + private ProducerActions storageProducer; + private KafkaConsumer consumer; + private Submitter submitter; - @PostConstruct - void onConstruct() { + void onConstruct(@Observes StartupEvent ev) { log.info("Using Kafka-SQL storage."); // Start the Kafka Consumer thread consumer = createKafkaConsumer(); startConsumerThread(consumer); - - producer = createKafkaProducer(); + + storageProducer = createKafkaProducer(); + submitter = new Submitter(this::send); } - + @PreDestroy void onDestroy() { - stop(); - } - - public void stop() { stopped = true; } + private CompletableFuture send(Str.StorageValue value) { + UUID requestId = coordinator.createUUID(); + RecordHeader h = new RecordHeader("req", requestId.toString().getBytes()); + ProducerRecord record = new ProducerRecord<>( + topic, + 0, + value.getArtifactId(), // MUST be set + value, + Collections.singletonList(h) + ); + return storageProducer.apply(record).thenApply(rm -> requestId); + } + /** - * Start the KSQL Kafka consumer thread which is responsible for subscribing to the kafka topic, + * Start the KSQL Kafka consumer thread which is responsible for subscribing to the kafka topic, * consuming JournalRecord entries found on that topic, and applying those journal entries to * the internal data model. * @param consumer */ - private void startConsumerThread(final KafkaConsumer consumer) { + private void startConsumerThread(final KafkaConsumer consumer) { log.info("Starting KSQL consumer thread on topic: {}", topic); log.info("Bootstrap servers: " + bootstrapServers); Runnable runner = () -> { @@ -165,13 +203,26 @@ private void startConsumerThread(final KafkaConsumer cons // Main consumer loop while (!stopped) { - final ConsumerRecords records = consumer.poll(Duration.ofMillis(pollTimeout)); + final ConsumerRecords records = consumer.poll(Duration.ofMillis(pollTimeout)); if (records != null && !records.isEmpty()) { log.debug("Consuming {} journal records.", records.count()); records.forEach(record -> { - JournalRecord journalRecord = record.value(); - // TODO instead of processing the journal record directly on the consumer thread, instead queue them and have *another* thread process the queue? - processJournalRecord(journalRecord); + + UUID req = Optional.ofNullable(record.headers().headers("req")) + .map(Iterable::iterator) + .map(it -> { + return it.hasNext() ? it.next() : null; + }) + .map(Header::value) + .map(String::new) + .map(UUID::fromString) + .orElse(null); + + String artifactId = record.key(); + Str.StorageValue storageAction = record.value(); + + // TODO instead of processing the journal record directly on the consumer thread, instead queue them and have *another* thread process the queue + kafkaSqlSink.processStorageAction(req, artifactId, storageAction); }); } } @@ -186,43 +237,27 @@ private void startConsumerThread(final KafkaConsumer cons thread.start(); } - /** - * Process a single journal record found on the Kafka topic. - * @param journalRecord - */ - private void processJournalRecord(JournalRecord journalRecord) { - log.debug("[{}] Processing journal record of type {}", journalRecord.getUuid(), journalRecord.getMethod()); - applying.set(Boolean.TRUE); - try { - Object returnValueOrException = dispatcher.dispatchTo(journalRecord, this); - coordinator.notifyResponse(journalRecord.getUuid(), returnValueOrException); - } finally { - applying.set(Boolean.FALSE); - } - } - /** * Creates the Kafka producer. */ - private KafkaProducer createKafkaProducer() { + private ProducerActions createKafkaProducer() { Properties props = new Properties(); // Configure kafka settings props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-" + topic); props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all"); - props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JournalRecordSerializer.class.getName()); // Create the Kafka producer - KafkaProducer producer = new KafkaProducer<>(props); - return producer; + return new AsyncProducer(props, + Serdes.String().serializer(), + ProtoSerde.parsedWith(Str.StorageValue.parser())); } /** * Creates the Kafka consumer. */ - private KafkaConsumer createKafkaConsumer() { + private KafkaConsumer createKafkaConsumer() { Properties props = new Properties(); props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); @@ -230,289 +265,330 @@ private KafkaConsumer createKafkaConsumer() { props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JournalRecordDeserializer.class.getName()); // Create the Kafka Consumer - KafkaConsumer consumer = new KafkaConsumer<>(props); + KafkaConsumer consumer = new KafkaConsumer<>(props, + Serdes.String().deserializer(), + ProtoSerde.parsedWith(Str.StorageValue.parser())); return consumer; } - private boolean isApplying() { - return applying.get(); + private void updateArtifactState(ArtifactState currentState, String artifactId, Integer version, ArtifactState state) { + ArtifactStateExt.applyState( + s -> { + UUID reqId = ConcurrentUtil.get(submitter.submitState(artifactId, version.longValue(), s)); + coordinator.waitForResponse(reqId); + }, + currentState, + state + ); } - /** - * Create a journal record and publish it to the Kafka topic. - * @param journalKey - * @param methodName - * @param arguments - * @throws RegistryException - */ - private Object journalAndWait(String journalKey, String methodName, Object ...arguments) throws RegistryException { - UUID uuid = coordinator.createUUID(); - log.debug("[{}] Publishing journal record of type {}", uuid, methodName); - JournalRecord record = JournalRecord.create(uuid, methodName, arguments); - ProducerRecord producerRecord = new ProducerRecord(topic, journalKey, record); - producer.send(producerRecord); - Object rval; + //TODO implement is Ready and is alive checking if the state is fully updated + + @Override + public void updateArtifactState(String artifactId, ArtifactState state) { + ArtifactMetaDataDto metadata = sqlStorage.getArtifactMetaData(artifactId); + updateArtifactState(metadata.getState(), artifactId, metadata.getVersion(), state); + } + + @Override + public void updateArtifactState(String artifactId, ArtifactState state, Integer version) { + ArtifactVersionMetaDataDto metadata = sqlStorage.getArtifactVersionMetaData(artifactId, version); + updateArtifactState(metadata.getState(), artifactId, version, state); + } + + @Override + public CompletionStage createArtifact(String artifactId, ArtifactType artifactType, ContentHandle content) throws ArtifactAlreadyExistsException, RegistryStorageException { + try { - log.debug("[{}] Waiting for journal record response for type {}", uuid, methodName); - rval = coordinator.waitForResponse(uuid); - } catch (InterruptedException e) { - throw new RegistryStorageException(e); - } - if (rval instanceof RegistryException) { - throw (RegistryException) rval; - } else { - return rval; + sqlStorage.getArtifactMetaData(artifactId); + throw new ArtifactAlreadyExistsException(artifactId); + } catch (ArtifactNotFoundException e) { + //ignored + //artifact does not exist, we can create it } + + return submitter + .submitArtifact(Str.ActionType.CREATE, artifactId, -1, artifactType, content.bytes()) + .thenCompose(reqId -> { + return (CompletionStage) coordinator.waitForResponse(reqId); + }); } - /** - * @see io.apicurio.registry.storage.impl.sql.AbstractSqlRegistryStorage#createGlobalRule(io.apicurio.registry.types.RuleType, io.apicurio.registry.storage.RuleConfigurationDto) - */ @Override - public void createGlobalRule(RuleType rule, RuleConfigurationDto config) - throws RuleAlreadyExistsException, RegistryStorageException { - if (isApplying()) { - super.createGlobalRule(rule, config); - } else { - journalAndWait(globalRuleKey, "createGlobalRule", rule, config); - } + public CompletionStage createArtifactWithMetadata(String artifactId, ArtifactType artifactType, ContentHandle content, EditableArtifactMetaDataDto metaData) throws ArtifactAlreadyExistsException, RegistryStorageException { + return createArtifact(artifactId, artifactType, content) + .thenCompose(amdd -> submitter.submitMetadata(Str.ActionType.UPDATE, artifactId, -1, metaData.getName(), metaData.getDescription(), metaData.getLabels(), metaData.getProperties()) + .thenApply(v -> DtoUtil.setEditableMetaDataInArtifact(amdd, metaData))); } - - /** - * @see io.apicurio.registry.storage.impl.sql.AbstractSqlRegistryStorage#updateGlobalRule(io.apicurio.registry.types.RuleType, io.apicurio.registry.storage.RuleConfigurationDto) - */ + @Override - public void updateGlobalRule(RuleType rule, RuleConfigurationDto config) - throws RuleNotFoundException, RegistryStorageException { - if (isApplying()) { - super.updateGlobalRule(rule, config); - } else { - journalAndWait(globalRuleKey, "updateGlobalRule", rule, config); - } + public SortedSet deleteArtifact(String artifactId) throws ArtifactNotFoundException, RegistryStorageException { + + //to verify artifact exists + //TODO implement a low level storage api that provides methods like, exists, ... + sqlStorage.getArtifactMetaData(artifactId); + + UUID reqId = ConcurrentUtil.get(submitter.submitArtifact(Str.ActionType.DELETE, artifactId, -1, null, null)); + SortedSet versionIds = (SortedSet) coordinator.waitForResponse(reqId); + + return versionIds; } - - /** - * @see io.apicurio.registry.storage.impl.sql.AbstractSqlRegistryStorage#deleteGlobalRules() - */ + @Override - public void deleteGlobalRules() throws RegistryStorageException { - if (isApplying()) { - super.deleteGlobalRules(); - } else { - journalAndWait(globalRuleKey, "deleteGlobalRules"); - } + public StoredArtifact getArtifact(String artifactId) throws ArtifactNotFoundException, RegistryStorageException { + return sqlStorage.getArtifact(artifactId); } - - /** - * @see io.apicurio.registry.storage.impl.sql.AbstractSqlRegistryStorage#deleteGlobalRule(io.apicurio.registry.types.RuleType) - */ + @Override - public void deleteGlobalRule(RuleType rule) throws RuleNotFoundException, RegistryStorageException { - if (isApplying()) { - super.deleteGlobalRule(rule); - } else { - journalAndWait(globalRuleKey, "deleteGlobalRule", rule); + public CompletionStage updateArtifact(String artifactId, ArtifactType artifactType, ContentHandle content) throws ArtifactNotFoundException, RegistryStorageException { + + try { + sqlStorage.getArtifactMetaData(artifactId); + } catch (ArtifactNotFoundException e) { + throw e; } + + return submitter + .submitArtifact(Str.ActionType.UPDATE, artifactId, -1, artifactType, content.bytes()) + .thenCompose(reqId -> (CompletionStage) coordinator.waitForResponse(reqId)); + } - - /** - * @see io.apicurio.registry.storage.impl.sql.AbstractSqlRegistryStorage#deleteArtifact(java.lang.String) - */ + + @Override - public SortedSet deleteArtifact(String artifactId) - throws ArtifactNotFoundException, RegistryStorageException { - if (isApplying()) { - return super.deleteArtifact(artifactId); - } else { - return (SortedSet) journalAndWait(artifactId, "deleteArtifact", artifactId); - } + public CompletionStage updateArtifactWithMetadata(String artifactId, ArtifactType artifactType, ContentHandle content, EditableArtifactMetaDataDto metaData) throws ArtifactAlreadyExistsException, RegistryStorageException { + return updateArtifact(artifactId, artifactType, content) + .thenCompose(amdd -> submitter.submitMetadata(Str.ActionType.UPDATE, artifactId, -1, metaData.getName(), metaData.getDescription(), metaData.getLabels(), metaData.getProperties()) + .thenApply(v -> DtoUtil.setEditableMetaDataInArtifact(amdd, metaData))); } - - /** - * @see io.apicurio.registry.storage.impl.sql.AbstractSqlRegistryStorage#deleteArtifactRule(java.lang.String, io.apicurio.registry.types.RuleType) - */ + + @Override - public void deleteArtifactRule(String artifactId, RuleType rule) - throws ArtifactNotFoundException, RuleNotFoundException, RegistryStorageException { - if (isApplying()) { - super.deleteArtifactRule(artifactId, rule); - } else { - journalAndWait(artifactId, "deleteArtifactRule", artifactId, rule); - } + public Set getArtifactIds(Integer limit) { + return sqlStorage.getArtifactIds(limit); } - - /** - * @see io.apicurio.registry.storage.impl.sql.AbstractSqlRegistryStorage#deleteArtifactRules(java.lang.String) - */ + @Override - public void deleteArtifactRules(String artifactId) - throws ArtifactNotFoundException, RegistryStorageException { - if (isApplying()) { - super.deleteArtifactRules(artifactId); - } else { - journalAndWait(artifactId, "deleteArtifactRules", artifactId); - } + public ArtifactSearchResults searchArtifacts(String search, int offset, int limit, SearchOver searchOver, SortOrder sortOrder) { + return sqlStorage.searchArtifacts(search, offset, limit, searchOver, sortOrder); } - - /** - * @see io.apicurio.registry.storage.impl.sql.AbstractSqlRegistryStorage#deleteArtifactVersion(java.lang.String, long) - */ + @Override - public void deleteArtifactVersion(String artifactId, long version) - throws ArtifactNotFoundException, VersionNotFoundException, RegistryStorageException { - if (isApplying()) { - super.deleteArtifactVersion(artifactId, version); - } else { - journalAndWait(artifactId, "deleteArtifactVersion", artifactId, version); - } + public ArtifactMetaDataDto getArtifactMetaData(String artifactId) throws ArtifactNotFoundException, RegistryStorageException { + return sqlStorage.getArtifactMetaData(artifactId); } - + /** - * @see io.apicurio.registry.storage.impl.sql.AbstractSqlRegistryStorage#deleteArtifactVersionMetaData(java.lang.String, long) + * @see io.apicurio.registry.storage.RegistryStorage#getArtifactVersionMetaData(java.lang.String, boolean, io.apicurio.registry.content.ContentHandle) */ @Override - public void deleteArtifactVersionMetaData(String artifactId, long version) - throws ArtifactNotFoundException, VersionNotFoundException, RegistryStorageException { - if (isApplying()) { - super.deleteArtifactVersionMetaData(artifactId, version); - } else { - journalAndWait(artifactId, "deleteArtifactVersionMetaData", artifactId, version); - } + public ArtifactVersionMetaDataDto getArtifactVersionMetaData(String artifactId, boolean canonical, + ContentHandle content) throws ArtifactNotFoundException, RegistryStorageException { + return sqlStorage.getArtifactVersionMetaData(artifactId, canonical, content); } - /** - * @see io.apicurio.registry.storage.impl.sql.AbstractSqlRegistryStorage#createArtifact(java.lang.String, io.apicurio.registry.types.ArtifactType, io.apicurio.registry.content.ContentHandle) - */ @Override - public CompletionStage createArtifact(String artifactId, ArtifactType artifactType, - ContentHandle content) throws ArtifactAlreadyExistsException, RegistryStorageException { - if (isApplying()) { - return super.createArtifact(artifactId, artifactType, content); - } else { - return (CompletionStage) journalAndWait(artifactId, "createArtifact", artifactId, artifactType, content); - } + public ArtifactMetaDataDto getArtifactMetaData(long id) throws ArtifactNotFoundException, RegistryStorageException { + return sqlStorage.getArtifactMetaData(id); } - /** - * @see io.apicurio.registry.storage.impl.sql.AbstractSqlRegistryStorage#createArtifactWithMetadata(java.lang.String, io.apicurio.registry.types.ArtifactType, io.apicurio.registry.content.ContentHandle, io.apicurio.registry.storage.EditableArtifactMetaDataDto) - */ @Override - public CompletionStage createArtifactWithMetadata(String artifactId, - ArtifactType artifactType, ContentHandle content, EditableArtifactMetaDataDto metaData) - throws ArtifactAlreadyExistsException, RegistryStorageException { - if (isApplying()) { - return super.createArtifactWithMetadata(artifactId, artifactType, content, metaData); - } else { - return (CompletionStage) journalAndWait(artifactId, "createArtifactWithMetadata", artifactId, artifactType, content, metaData); + public void updateArtifactMetaData(String artifactId, EditableArtifactMetaDataDto metaData) throws ArtifactNotFoundException, RegistryStorageException { + + try { + sqlStorage.getArtifactMetaData(artifactId); + } catch (ArtifactNotFoundException e) { + throw e; } + + UUID reqId = ConcurrentUtil.get(submitter + .submitMetadata(Str.ActionType.UPDATE, artifactId, -1, metaData.getName(), metaData.getDescription(), metaData.getLabels(), metaData.getProperties())); + coordinator.waitForResponse(reqId); } - /** - * @see io.apicurio.registry.storage.impl.sql.AbstractSqlRegistryStorage#createArtifactRuleAsync(java.lang.String, io.apicurio.registry.types.RuleType, io.apicurio.registry.storage.RuleConfigurationDto) - */ @Override - public CompletionStage createArtifactRuleAsync(String artifactId, RuleType rule, - RuleConfigurationDto config) - throws ArtifactNotFoundException, RuleAlreadyExistsException, RegistryStorageException { - if (isApplying()) { - return super.createArtifactRuleAsync(artifactId, rule, config); - } else { - return (CompletionStage) journalAndWait(artifactId, "createArtifactRuleAsync", artifactId, rule, config); - } + public List getArtifactRules(String artifactId) throws ArtifactNotFoundException, RegistryStorageException { + return sqlStorage.getArtifactRules(artifactId); } - - /** - * @see io.apicurio.registry.storage.impl.sql.AbstractSqlRegistryStorage#updateArtifact(java.lang.String, io.apicurio.registry.types.ArtifactType, io.apicurio.registry.content.ContentHandle) - */ + @Override - public CompletionStage updateArtifact(String artifactId, ArtifactType artifactType, - ContentHandle content) throws ArtifactNotFoundException, RegistryStorageException { - if (isApplying()) { - return super.updateArtifact(artifactId, artifactType, content); - } else { - return (CompletionStage) journalAndWait(artifactId, "updateArtifact", artifactId, artifactType, content); + public CompletionStage createArtifactRuleAsync(String artifactId, RuleType rule, RuleConfigurationDto config) throws ArtifactNotFoundException, RuleAlreadyExistsException, RegistryStorageException { + + try { + sqlStorage.getArtifactRule(artifactId, rule); + throw new RuleAlreadyExistsException(rule); + } catch (RuleNotFoundException e) { + //rule does not exist, we can create it } + + return submitter + .submitRule(Str.ActionType.CREATE, artifactId, rule, config.getConfiguration()) + .thenCompose(reqId -> (CompletionStage) coordinator.waitForResponse(reqId)); } - - /** - * @see io.apicurio.registry.storage.impl.sql.AbstractSqlRegistryStorage#updateArtifactMetaData(java.lang.String, io.apicurio.registry.storage.EditableArtifactMetaDataDto) - */ + @Override - public void updateArtifactMetaData(String artifactId, EditableArtifactMetaDataDto metaData) - throws ArtifactNotFoundException, RegistryStorageException { - if (isApplying()) { - super.updateArtifactMetaData(artifactId, metaData); - } else { - journalAndWait(artifactId, "updateArtifactMetaData", artifactId, metaData); + public void deleteArtifactRules(String artifactId) throws ArtifactNotFoundException, RegistryStorageException { + try { + sqlStorage.getArtifactMetaData(artifactId); + } catch (ArtifactNotFoundException e) { + throw e; } + + deleteArtifactRulesInternal(artifactId); } - - /** - * @see io.apicurio.registry.storage.impl.sql.AbstractSqlRegistryStorage#updateArtifactRule(java.lang.String, io.apicurio.registry.types.RuleType, io.apicurio.registry.storage.RuleConfigurationDto) - */ + + private void deleteArtifactRulesInternal(String artifactId) { + UUID reqId = ConcurrentUtil.get(submitter.submitRule(Str.ActionType.DELETE, artifactId, null, null)); + coordinator.waitForResponse(reqId); + } + @Override - public void updateArtifactRule(String artifactId, RuleType rule, RuleConfigurationDto config) - throws ArtifactNotFoundException, RuleNotFoundException, RegistryStorageException { - if (isApplying()) { - super.updateArtifactRule(artifactId, rule, config); - } else { - journalAndWait(artifactId, "updateArtifactRule", artifactId, rule, config); - } + public RuleConfigurationDto getArtifactRule(String artifactId, RuleType rule) throws ArtifactNotFoundException, RuleNotFoundException, RegistryStorageException { + return sqlStorage.getArtifactRule(artifactId, rule); } - - /** - * @see io.apicurio.registry.storage.impl.sql.AbstractSqlRegistryStorage#updateArtifactState(java.lang.String, io.apicurio.registry.types.ArtifactState) - */ + @Override - public void updateArtifactState(String artifactId, ArtifactState state) { - if (isApplying()) { - super.updateArtifactState(artifactId, state); - } else { - journalAndWait(artifactId, "updateArtifactState", artifactId, state); + public void updateArtifactRule(String artifactId, RuleType rule, RuleConfigurationDto config) throws ArtifactNotFoundException, RuleNotFoundException, RegistryStorageException { + + try { + sqlStorage.getArtifactRule(artifactId, rule); + } catch (RuleNotFoundException e) { + throw e; } + + UUID reqId = ConcurrentUtil.get(submitter.submitRule(Str.ActionType.UPDATE, artifactId, rule, config.getConfiguration())); + coordinator.waitForResponse(reqId); } - - /** - * @see io.apicurio.registry.storage.impl.sql.AbstractSqlRegistryStorage#updateArtifactState(java.lang.String, io.apicurio.registry.types.ArtifactState, java.lang.Integer) - */ + @Override - public void updateArtifactState(String artifactId, ArtifactState state, Integer version) { - if (isApplying()) { - super.updateArtifactState(artifactId, state, version); - } else { - journalAndWait(artifactId, "updateArtifactState", artifactId, state, version); + public void deleteArtifactRule(String artifactId, RuleType rule) throws ArtifactNotFoundException, RuleNotFoundException, RegistryStorageException { + try { + sqlStorage.getArtifactRule(artifactId, rule); + } catch (RuleNotFoundException e) { + throw e; } + UUID reqId = ConcurrentUtil.get(submitter.submitRule(Str.ActionType.DELETE, artifactId, rule, null)); + coordinator.waitForResponse(reqId); } - - /** - * @see io.apicurio.registry.storage.impl.sql.AbstractSqlRegistryStorage#updateArtifactVersionMetaData(java.lang.String, long, io.apicurio.registry.storage.EditableArtifactMetaDataDto) - */ + + @Override + public SortedSet getArtifactVersions(String artifactId) throws ArtifactNotFoundException, RegistryStorageException { + return sqlStorage.getArtifactVersions(artifactId); + } + + @Override + public VersionSearchResults searchVersions(String artifactId, int offset, int limit) { + return sqlStorage.searchVersions(artifactId, offset, limit); + } + @Override - public void updateArtifactVersionMetaData(String artifactId, long version, EditableArtifactMetaDataDto metaData) - throws ArtifactNotFoundException, VersionNotFoundException, RegistryStorageException { - if (isApplying()) { - super.updateArtifactVersionMetaData(artifactId, version, metaData); - } else { - journalAndWait(artifactId, "updateArtifactVersionMetaData", artifactId, version, metaData); + public StoredArtifact getArtifactVersion(long id) throws ArtifactNotFoundException, RegistryStorageException { + return sqlStorage.getArtifactVersion(id); + } + + @Override + public StoredArtifact getArtifactVersion(String artifactId, long version) throws ArtifactNotFoundException, VersionNotFoundException, RegistryStorageException { + return sqlStorage.getArtifactVersion(artifactId, version); + } + + @Override + public ArtifactVersionMetaDataDto getArtifactVersionMetaData(String artifactId, long version) throws ArtifactNotFoundException, VersionNotFoundException, RegistryStorageException { + return sqlStorage.getArtifactVersionMetaData(artifactId, version); + } + + @Override + public void deleteArtifactVersion(String artifactId, long version) throws ArtifactNotFoundException, VersionNotFoundException, RegistryStorageException { + handleVersion(artifactId, version, null, value -> { + UUID reqId = ConcurrentUtil.get(submitter.submitArtifact(Str.ActionType.DELETE, artifactId, version, null, null)); + coordinator.waitForResponse(reqId); + return null; + }); + } + + @Override + public void updateArtifactVersionMetaData(String artifactId, long version, EditableArtifactMetaDataDto metaData) throws ArtifactNotFoundException, VersionNotFoundException, RegistryStorageException { + handleVersion( + artifactId, + version, + ArtifactStateExt.ACTIVE_STATES, + value -> { + UUID reqId = ConcurrentUtil.get(submitter + .submitMetadata(Str.ActionType.UPDATE, artifactId, version, metaData.getName(), metaData.getDescription(), metaData.getLabels(), metaData.getProperties())); + coordinator.waitForResponse(reqId); + return null; + } + ); + } + + @Override + public void deleteArtifactVersionMetaData(String artifactId, long version) throws ArtifactNotFoundException, VersionNotFoundException, RegistryStorageException { + handleVersion( + artifactId, + version, + null, + value -> { + UUID reqId = ConcurrentUtil.get(submitter.submitMetadata(Str.ActionType.DELETE, artifactId, version, null, null, Collections.emptyList(), Collections.emptyMap())); + coordinator.waitForResponse(reqId); + return null; + } + ); + } + + private T handleVersion(String artifactId, long version, EnumSet states, Function handler) throws ArtifactNotFoundException, RegistryStorageException { + ArtifactVersionMetaDataDto metadata = sqlStorage.getArtifactVersionMetaData(artifactId, version); + + ArtifactState state = metadata.getState(); + ArtifactStateExt.validateState(states, state, artifactId, version); + return handler.apply(metadata); + } + + @Override + public List getGlobalRules() throws RegistryStorageException { + return sqlStorage.getGlobalRules(); + } + + @Override + public void createGlobalRule(RuleType rule, RuleConfigurationDto config) throws RuleAlreadyExistsException, RegistryStorageException { + UUID reqId = ConcurrentUtil.get(submitter.submitRule(Str.ActionType.CREATE, GLOBAL_RULES_ID, rule, config.getConfiguration())); + coordinator.waitForResponse(reqId); + } + + @Override + public void deleteGlobalRules() throws RegistryStorageException { + UUID reqId = ConcurrentUtil.get(submitter.submitRule(Str.ActionType.DELETE, GLOBAL_RULES_ID, null, null)); + coordinator.waitForResponse(reqId); + + } + + @Override + public RuleConfigurationDto getGlobalRule(RuleType rule) throws RuleNotFoundException, RegistryStorageException { + return sqlStorage.getGlobalRule(rule); + } + + @Override + public void updateGlobalRule(RuleType rule, RuleConfigurationDto config) throws RuleNotFoundException, RegistryStorageException { + try { + sqlStorage.getGlobalRule(rule); + } catch (RuleNotFoundException e) { + throw e; } + + UUID reqId = ConcurrentUtil.get(submitter.submitRule(Str.ActionType.UPDATE, GLOBAL_RULES_ID, rule, config.getConfiguration())); + coordinator.waitForResponse(reqId); + } - /** - * @see io.apicurio.registry.storage.impl.sql.AbstractSqlRegistryStorage#updateArtifactWithMetadata(java.lang.String, io.apicurio.registry.types.ArtifactType, io.apicurio.registry.content.ContentHandle, io.apicurio.registry.storage.EditableArtifactMetaDataDto) - */ @Override - public CompletionStage updateArtifactWithMetadata(String artifactId, - ArtifactType artifactType, ContentHandle content, EditableArtifactMetaDataDto metaData) - throws ArtifactNotFoundException, RegistryStorageException { - if (isApplying()) { - return super.updateArtifactWithMetadata(artifactId, artifactType, content, metaData); - } else { - return (CompletionStage) journalAndWait(artifactId, "updateArtifactWithMetadata", artifactId, artifactType, content, metaData); + public void deleteGlobalRule(RuleType rule) throws RuleNotFoundException, RegistryStorageException { + try { + sqlStorage.getGlobalRule(rule); + } catch (RuleNotFoundException e) { + throw e; } + UUID reqId = ConcurrentUtil.get(submitter.submitRule(Str.ActionType.DELETE, GLOBAL_RULES_ID, rule, null)); + coordinator.waitForResponse(reqId); } } diff --git a/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/KafkaSqlStorageProvider.java b/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/KafkaSqlStorageProvider.java new file mode 100644 index 0000000000..fe25e02606 --- /dev/null +++ b/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/KafkaSqlStorageProvider.java @@ -0,0 +1,20 @@ +package io.apicurio.registry.storage.impl.ksql; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import io.apicurio.registry.storage.RegistryStorage; +import io.apicurio.registry.storage.RegistryStorageProvider; + +@ApplicationScoped +public class KafkaSqlStorageProvider implements RegistryStorageProvider { + + @Inject + KafkaSqlRegistryStorage storage; + + @Override + public RegistryStorage storage() { + return storage; + } + +} diff --git a/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/sql/KafkaSQLSink.java b/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/sql/KafkaSQLSink.java new file mode 100644 index 0000000000..1924bf6099 --- /dev/null +++ b/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/sql/KafkaSQLSink.java @@ -0,0 +1,233 @@ +package io.apicurio.registry.storage.impl.ksql.sql; + +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; +import javax.inject.Named; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.apicurio.registry.content.ContentHandle; +import io.apicurio.registry.storage.EditableArtifactMetaDataDto; +import io.apicurio.registry.storage.RuleConfigurationDto; +import io.apicurio.registry.storage.impl.ksql.KafkaSqlCoordinator; +import io.apicurio.registry.storage.impl.ksql.KafkaSqlRegistryStorage; +import io.apicurio.registry.storage.proto.Str; +import io.apicurio.registry.storage.proto.Str.ActionType; +import io.apicurio.registry.storage.proto.Str.ArtifactValue; +import io.apicurio.registry.storage.proto.Str.MetaDataValue; +import io.apicurio.registry.storage.proto.Str.RuleValue; +import io.apicurio.registry.storage.proto.Str.StorageValue; +import io.apicurio.registry.storage.proto.Str.StorageValue.ValueCase; +import io.apicurio.registry.types.ArtifactState; +import io.apicurio.registry.types.ArtifactType; +import io.apicurio.registry.types.RegistryException; +import io.apicurio.registry.types.RuleType; + +//ArtifactValue artifact = 5; +//MetaDataValue metadata = 6; +//RuleValue rule = 7; +//SnapshotValue snapshot = 8; +//ArtifactState state = 9; + +/** + * @author Fabian Martinez + */ +@ApplicationScoped +public class KafkaSQLSink { + + private final Logger log = LoggerFactory.getLogger(this.getClass().getName()); + + @Inject + KafkaSqlCoordinator coordinator; + + @Inject + @Named("SQLRegistryStorage") + SQLRegistryStorage sqlStorage; + + public void processStorageAction(UUID requestId, String artifactId, Str.StorageValue storageAction) { + try { + Object result = internalProcessStorageAction(artifactId, storageAction); + coordinator.notifyResponse(requestId, result); + } catch (RegistryException e ) { + coordinator.notifyResponse(requestId, e); + } catch (RuntimeException e ) { + coordinator.notifyResponse(requestId, new RegistryException(e)); + } + } + + public Object internalProcessStorageAction(String artifactId, Str.StorageValue storageAction) { + ValueCase valueCase = storageAction.getValueCase(); + switch (valueCase) { + case ARTIFACT: + return handleArtifact(artifactId, storageAction); + case METADATA: + return handleMetadata(artifactId, storageAction); + case RULE: + return handleRule(artifactId, storageAction); + case STATE: + return handleArtifactState(artifactId, storageAction); + default : + log.warn("Unrecognized value artifact: %s", artifactId); + return null; + } + } + + private Object handleArtifact(String artifactId, StorageValue storageAction) { + ArtifactValue artifactValue = storageAction.getArtifact(); + + ArtifactType artifactType = ArtifactType.values()[artifactValue.getArtifactType()]; + + + ActionType actionType = storageAction.getType(); + switch (actionType) { + case CREATE: + return sqlStorage.createArtifact(artifactId, artifactType, ContentHandle.create(artifactValue.getContent().toByteArray())); + case UPDATE: + return sqlStorage.updateArtifact(artifactId, artifactType, ContentHandle.create(artifactValue.getContent().toByteArray())); + case DELETE: + return sqlStorage.deleteArtifact(artifactId); + case READ: + log.warn("Read storage action, nothing to do. artifact: %s", artifactId); + break; + case UNDEFINED: + case UNRECOGNIZED: + log.warn("Unrecognized storage action artifact: %s", artifactId); + break; + } + return null; + } + + private Object handleMetadata(String artifactId, StorageValue storageAction) { + MetaDataValue metadata = storageAction.getMetadata(); + + long artifactVersion = storageAction.getVersion(); + + ActionType actionType = storageAction.getType(); + switch (actionType) { + case CREATE: + log.warn("Create metadata storage action, this action does not exist. nothing to do. artifact: %s", artifactId); + break; + case UPDATE: + List labels = Optional.ofNullable(metadata.getLabels()) + .map(l -> l.split(",")) + .map(l -> Stream.of(l)) + .orElseGet(Stream::empty) + .collect(Collectors.toList()); + EditableArtifactMetaDataDto editablemeta = new EditableArtifactMetaDataDto(metadata.getName(), metadata.getDescription(), labels, metadata.getPropertiesMap()); + if (artifactVersion == -1) { + sqlStorage.updateArtifactMetaData(artifactId, editablemeta); + } else { + sqlStorage.updateArtifactVersionMetaData(artifactId, artifactVersion, editablemeta); + } + break; + case DELETE: + if (artifactVersion == -1) { + log.warn("Delete artifact metadata storage action, this action does not exist. nothing to do. artifact: %s", artifactId); + } else { + sqlStorage.deleteArtifactVersionMetaData(artifactId, artifactVersion); + } + break; + case READ: + log.warn("Read storage action, nothing to do. artifact: %s", artifactId); + break; + case UNDEFINED: + case UNRECOGNIZED: + log.warn("Unrecognized storage action artifact: %s", artifactId); + break; + } + return null; + } + + private Object handleArtifactState(String artifactId, StorageValue storageAction) { + Str.ArtifactState state = storageAction.getState(); + ArtifactState newState = ArtifactState.valueOf(state.name()); + + long version = storageAction.getVersion(); + + ActionType actionType = storageAction.getType(); + switch (actionType) { + case CREATE: + log.warn("Create artifact state storage action, this action does not exist. nothing to do. artifact: %s", artifactId); + break; + case UPDATE: + sqlStorage.updateArtifactState(artifactId, newState, Long.valueOf(version).intValue()); + break; + case DELETE: + log.warn("Delete artifact state storage action, this action does not exist. nothing to do. artifact: %s", artifactId); + break; + case READ: + log.warn("Read storage action, nothing to do. artifact: %s", artifactId); + break; + case UNDEFINED: + case UNRECOGNIZED: + log.warn("Unrecognized storage action artifact: %s", artifactId); + break; + } + return null; + } + + private Object handleRule(String artifactId, StorageValue storageAction) { + RuleValue ruleValue = storageAction.getRule(); + + Str.RuleType rtv = ruleValue.getType(); + RuleType rule = (rtv != null && rtv != Str.RuleType.__NONE) ? RuleType.valueOf(rtv.name()) : null; + + RuleConfigurationDto config = null; + if (ruleValue.getConfiguration() != null) { + config = new RuleConfigurationDto(ruleValue.getConfiguration()); + } + + ActionType actionType = storageAction.getType(); + switch (actionType) { + case CREATE: + if (isGlobalRules(artifactId)) { + sqlStorage.createGlobalRule(rule, config); + } else { + return sqlStorage.createArtifactRuleAsync(artifactId, rule, config); + } + break; + case UPDATE: + if (isGlobalRules(artifactId)) { + sqlStorage.updateGlobalRule(rule, config); + } else { + sqlStorage.updateArtifactRule(artifactId, rule, config); + } + break; + case DELETE: + if (isGlobalRules(artifactId)) { + if (rule == null) { + sqlStorage.deleteGlobalRules(); + } else { + sqlStorage.deleteGlobalRule(rule); + } + } else { + if (rule == null) { + sqlStorage.deleteArtifactRules(artifactId); + } else { + sqlStorage.deleteArtifactRule(artifactId, rule); + } + } + break; + case READ: + log.warn("Read storage action, nothing to do. artifact: %s", artifactId); + break; + case UNDEFINED: + case UNRECOGNIZED: + log.warn("Unrecognized storage action artifact: %s", artifactId); + break; + } + return null; + } + + private boolean isGlobalRules(String artifactId) { + return artifactId.equals(KafkaSqlRegistryStorage.GLOBAL_RULES_ID); + } + +} diff --git a/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/sql/SQLRegistryStorage.java b/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/sql/SQLRegistryStorage.java new file mode 100644 index 0000000000..5747b6e43c --- /dev/null +++ b/storage/ksql/src/main/java/io/apicurio/registry/storage/impl/ksql/sql/SQLRegistryStorage.java @@ -0,0 +1,22 @@ +package io.apicurio.registry.storage.impl.ksql.sql; + +import javax.annotation.PostConstruct; +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Named; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.apicurio.registry.storage.impl.sql.AbstractSqlRegistryStorage; + +@ApplicationScoped +@Named("SQLRegistryStorage") +public class SQLRegistryStorage extends AbstractSqlRegistryStorage{ + + private static final Logger log = LoggerFactory.getLogger(SQLRegistryStorage.class); + + @PostConstruct + void onConstruct() { + log.info("Using internal SQL storage."); + } + +} diff --git a/storage/ksql/src/main/resources/overlay.properties b/storage/ksql/src/main/resources/overlay.properties index 45e70145b0..55f116c864 100644 --- a/storage/ksql/src/main/resources/overlay.properties +++ b/storage/ksql/src/main/resources/overlay.properties @@ -13,7 +13,7 @@ %prod.quarkus.datasource.username=${REGISTRY_DATASOURCE_USERNAME:sa} %prod.quarkus.datasource.password=${REGISTRY_DATASOURCE_PASSWORD:sa} -%prod.registry.ksql.bootstrap.servers=localhost:9092 +%prod.registry.ksql.bootstrap.servers=${KAFKA_BOOTSTRAP_SERVERS:localhost:9092} %prod.registry.ksql.topic=ksql-journal %prod.registry.ksql.consumer.startupLag=100 %prod.registry.ksql.consumer.poll.timeout=100 \ No newline at end of file diff --git a/storage/streams/src/main/java/io/apicurio/registry/streams/StreamsRegistryStorage.java b/storage/streams/src/main/java/io/apicurio/registry/streams/StreamsRegistryStorage.java index 1b129b1eb9..f0bb30ba3b 100644 --- a/storage/streams/src/main/java/io/apicurio/registry/streams/StreamsRegistryStorage.java +++ b/storage/streams/src/main/java/io/apicurio/registry/streams/StreamsRegistryStorage.java @@ -142,7 +142,7 @@ public class StreamsRegistryStorage extends AbstractRegistryStorage { @Inject ArtifactTypeUtilProviderFactory factory; - private final Submitter submitter = new Submitter(this::send); + private final Submitter submitter = new Submitter<>(this::send); private CompletableFuture send(Str.StorageValue value) { ProducerRecord record = new ProducerRecord<>( diff --git a/tests/pom.xml b/tests/pom.xml index c42ab09a50..da0470d8e9 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -422,6 +422,13 @@ + + ksql + + ksql + + + infinispan diff --git a/tests/src/main/java/io/apicurio/tests/KafkaFacade.java b/tests/src/main/java/io/apicurio/tests/KafkaFacade.java index 955eb2eb14..43cc87dce2 100644 --- a/tests/src/main/java/io/apicurio/tests/KafkaFacade.java +++ b/tests/src/main/java/io/apicurio/tests/KafkaFacade.java @@ -63,7 +63,7 @@ public String bootstrapServers() { public void startIfNeeded() { if (!TestUtils.isExternalRegistry() && - (RegistryUtils.REGISTRY_STORAGE == RegistryStorageType.kafka || RegistryUtils.REGISTRY_STORAGE == RegistryStorageType.streams) && + (RegistryUtils.REGISTRY_STORAGE == RegistryStorageType.kafka || RegistryUtils.REGISTRY_STORAGE == RegistryStorageType.streams || RegistryUtils.REGISTRY_STORAGE == RegistryStorageType.ksql) && kafkaContainer != null) { LOGGER.info("Skipping deployment of kafka, because it's already deployed as registry storage"); } else { diff --git a/tests/src/main/java/io/apicurio/tests/RegistryFacade.java b/tests/src/main/java/io/apicurio/tests/RegistryFacade.java index 765f5833fa..3ed1c41bed 100644 --- a/tests/src/main/java/io/apicurio/tests/RegistryFacade.java +++ b/tests/src/main/java/io/apicurio/tests/RegistryFacade.java @@ -103,6 +103,9 @@ public void start() throws Exception { case sql: setupSQLStorage(appEnv); break; + case ksql: + setupKafkaStorage(appEnv); + break; } runRegistry(path, appEnv); diff --git a/tests/src/main/java/io/apicurio/tests/RegistryStorageType.java b/tests/src/main/java/io/apicurio/tests/RegistryStorageType.java index 214667d2f9..a45c4e10cc 100644 --- a/tests/src/main/java/io/apicurio/tests/RegistryStorageType.java +++ b/tests/src/main/java/io/apicurio/tests/RegistryStorageType.java @@ -20,5 +20,6 @@ public enum RegistryStorageType { kafka, streams, sql, - infinispan; + infinispan, + ksql; } diff --git a/utils/kafka/src/main/java/io/apicurio/registry/utils/kafka/Submitter.java b/utils/kafka/src/main/java/io/apicurio/registry/utils/kafka/Submitter.java index 150994cb89..b68fee432c 100644 --- a/utils/kafka/src/main/java/io/apicurio/registry/utils/kafka/Submitter.java +++ b/utils/kafka/src/main/java/io/apicurio/registry/utils/kafka/Submitter.java @@ -31,16 +31,15 @@ /** * @author Ales Justin */ -public class Submitter { - private Function> submitFn; +public class Submitter { + private Function> submitFn; - public Submitter(Function> submitFn) { + public Submitter(Function> submitFn) { this.submitFn = submitFn; } - private CompletableFuture submit(Str.StorageValue value) { - //noinspection unchecked - return (CompletableFuture) submitFn.apply(value); + private CompletableFuture submit(Str.StorageValue value) { + return submitFn.apply(value); } private Str.StorageValue.Builder getRVBuilder(Str.ValueType vt, Str.ActionType actionType, String artifactId, long version) { @@ -55,7 +54,7 @@ private Str.StorageValue.Builder getRVBuilder(Str.ValueType vt, Str.ActionType a return builder; } - public CompletableFuture submitArtifact(Str.ActionType actionType, String artifactId, long version, ArtifactType artifactType, byte[] content) { + public CompletableFuture submitArtifact(Str.ActionType actionType, String artifactId, long version, ArtifactType artifactType, byte[] content) { Str.ArtifactValue.Builder builder = Str.ArtifactValue.newBuilder(); if (artifactType != null) { builder.setArtifactType(artifactType.ordinal()); @@ -68,7 +67,7 @@ public CompletableFuture submitArtifact(Str.ActionType actionType, String return submit(rvb.build()); } - public CompletableFuture submitMetadata(Str.ActionType actionType, String artifactId, long version, String name, String description, List labels, Map properties) { + public CompletableFuture submitMetadata(Str.ActionType actionType, String artifactId, long version, String name, String description, List labels, Map properties) { Str.MetaDataValue.Builder builder = Str.MetaDataValue.newBuilder(); if (name != null) { builder.setName(name); @@ -89,7 +88,7 @@ public CompletableFuture submitMetadata(Str.ActionType actionType, String return submit(rvb.build()); } - public CompletableFuture submitRule(Str.ActionType actionType, String artifactId, RuleType type, String configuration) { + public CompletableFuture submitRule(Str.ActionType actionType, String artifactId, RuleType type, String configuration) { Str.RuleValue.Builder builder = Str.RuleValue.newBuilder(); if (type != null) { builder.setType(Str.RuleType.valueOf(type.name())); @@ -102,13 +101,13 @@ public CompletableFuture submitRule(Str.ActionType actionType, String art return submit(rvb.build()); } - public CompletableFuture submitSnapshot(long timestamp) { + public CompletableFuture submitSnapshot(long timestamp) { Str.SnapshotValue.Builder builder = Str.SnapshotValue.newBuilder().setTimestamp(timestamp); Str.StorageValue.Builder rvb = getRVBuilder(Str.ValueType.SNAPSHOT, Str.ActionType.CREATE, null, -1).setSnapshot(builder); return submit(rvb.build()); } - public CompletableFuture submitState(String artifactId, Long version, ArtifactState state) { + public CompletableFuture submitState(String artifactId, Long version, ArtifactState state) { Str.StorageValue.Builder rvb = getRVBuilder(Str.ValueType.STATE, Str.ActionType.UPDATE, artifactId, version != null ? version : -1L)