Skip to content

Commit

Permalink
Ensure that any event is sent only once
Browse files Browse the repository at this point in the history
  • Loading branch information
carlesarnal committed Oct 9, 2024
1 parent b20387a commit afb1b36
Show file tree
Hide file tree
Showing 9 changed files with 264 additions and 138 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.apicurio.registry.storage.impl.kafkasql;

import io.apicurio.registry.storage.dto.OutboxEvent;
import io.apicurio.registry.utils.ConcurrentUtil;
import io.apicurio.registry.utils.kafka.ProducerActions;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Collections;

@ApplicationScoped
public class KafkaSqlEventsProcessor {

@Inject
KafkaSqlConfiguration configuration;

@Inject
@Named("KafkaSqlEventsProducer")
ProducerActions<String, String> eventsProducer;

public void processEvent(@Observes KafkaSqlOutboxEvent event) {
OutboxEvent outboxEvent = event.getOutboxEvent();
ProducerRecord<String, String> record = new ProducerRecord<>(configuration.eventsTopic(), 0,
outboxEvent.getAggregateId(), outboxEvent.getPayload().toString(), Collections.emptyList());
ConcurrentUtil.get(eventsProducer.apply(record));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.apicurio.registry.storage.impl.kafkasql;

import io.apicurio.registry.storage.dto.OutboxEvent;

public class KafkaSqlOutboxEvent {

private final OutboxEvent outboxEvent;

private KafkaSqlOutboxEvent(OutboxEvent outboxEvent) {
this.outboxEvent = outboxEvent;
}

public static KafkaSqlOutboxEvent of(OutboxEvent outboxEvent) {
return new KafkaSqlOutboxEvent(outboxEvent);
}

public OutboxEvent getOutboxEvent() {
return outboxEvent;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,18 +108,14 @@ public class KafkaSqlRegistryStorage extends RegistryStorageDecoratorReadOnlyBas
@Named("KafkaSqlSnapshotsProducer")
ProducerActions<String, String> snapshotsProducer;

@Inject
@Named("KafkaSqlEventsProducer")
ProducerActions<String, String> eventsProducer;

@Inject
KafkaSqlSubmitter submitter;

@Inject
Event<StorageEvent> storageEvent;

@Inject
Event<OutboxEvent> outboxEvent;
Event<KafkaSqlOutboxEvent> outboxEvent;

private volatile boolean bootstrapped = false;
private volatile boolean stopped = true;
Expand Down Expand Up @@ -401,7 +397,8 @@ public Pair<ArtifactMetaDataDto, ArtifactVersionMetaDataDto> createArtifact(Stri
Pair<ArtifactMetaDataDto, ArtifactVersionMetaDataDto> createdArtifact = (Pair<ArtifactMetaDataDto, ArtifactVersionMetaDataDto>) coordinator
.waitForResponse(uuid);

outboxEvent.fire(ArtifactCreated.of(createdArtifact.getLeft()));
outboxEvent.fire(KafkaSqlOutboxEvent.of(ArtifactCreated.of(createdArtifact.getLeft())));
outboxEvent.fire(KafkaSqlOutboxEvent.of(ArtifactVersionCreated.of(createdArtifact.getRight())));
return createdArtifact;
}

Expand All @@ -415,7 +412,7 @@ public List<String> deleteArtifact(String groupId, String artifactId)
var message = new DeleteArtifact2Message(groupId, artifactId);
var uuid = ConcurrentUtil.get(submitter.submitMessage(message));
List<String> versions = (List<String>) coordinator.waitForResponse(uuid);
outboxEvent.fire(ArtifactDeleted.of(groupId, artifactId));
outboxEvent.fire(KafkaSqlOutboxEvent.of(ArtifactDeleted.of(groupId, artifactId)));
return versions;
}

Expand All @@ -441,7 +438,7 @@ public ArtifactVersionMetaDataDto createArtifactVersion(String groupId, String a
var uuid = ConcurrentUtil.get(submitter.submitMessage(message));
ArtifactVersionMetaDataDto versionMetaDataDto = (ArtifactVersionMetaDataDto) coordinator
.waitForResponse(uuid);
outboxEvent.fire(ArtifactVersionCreated.of(versionMetaDataDto));
outboxEvent.fire(KafkaSqlOutboxEvent.of(ArtifactVersionCreated.of(versionMetaDataDto)));
return versionMetaDataDto;
}

Expand All @@ -455,7 +452,7 @@ public void updateArtifactMetaData(String groupId, String artifactId,
var message = new UpdateArtifactMetaData3Message(groupId, artifactId, metaData);
var uuid = ConcurrentUtil.get(submitter.submitMessage(message));
coordinator.waitForResponse(uuid);
outboxEvent.fire(ArtifactMetadataUpdated.of(groupId, artifactId, metaData));
outboxEvent.fire(KafkaSqlOutboxEvent.of(ArtifactMetadataUpdated.of(groupId, artifactId, metaData)));
}

@Override
Expand All @@ -464,7 +461,8 @@ public void createArtifactRule(String groupId, String artifactId, RuleType rule,
var message = new CreateArtifactRule4Message(groupId, artifactId, rule, config);
var uuid = ConcurrentUtil.get(submitter.submitMessage(message));
coordinator.waitForResponse(uuid);
outboxEvent.fire(ArtifactRuleConfigured.of(groupId, artifactId, rule, config));
outboxEvent
.fire(KafkaSqlOutboxEvent.of(ArtifactRuleConfigured.of(groupId, artifactId, rule, config)));
}

/**
Expand All @@ -491,7 +489,8 @@ public void updateArtifactRule(String groupId, String artifactId, RuleType rule,
var message = new UpdateArtifactRule4Message(groupId, artifactId, rule, config);
var uuid = ConcurrentUtil.get(submitter.submitMessage(message));
coordinator.waitForResponse(uuid);
outboxEvent.fire(ArtifactRuleConfigured.of(groupId, artifactId, rule, config));
outboxEvent
.fire(KafkaSqlOutboxEvent.of(ArtifactRuleConfigured.of(groupId, artifactId, rule, config)));
}

/**
Expand All @@ -506,12 +505,15 @@ public void deleteArtifactRule(String groupId, String artifactId, RuleType rule)
coordinator.waitForResponse(uuid);

switch (rule) {
case VALIDITY -> outboxEvent.fire(ArtifactRuleConfigured.of(groupId, artifactId, rule,
RuleConfigurationDto.builder().configuration(ValidityLevel.NONE.name()).build()));
case COMPATIBILITY -> outboxEvent.fire(ArtifactRuleConfigured.of(groupId, artifactId, rule,
RuleConfigurationDto.builder().configuration(CompatibilityLevel.NONE.name()).build()));
case INTEGRITY -> outboxEvent.fire(ArtifactRuleConfigured.of(groupId, artifactId, rule,
RuleConfigurationDto.builder().configuration(IntegrityLevel.NONE.name()).build()));
case VALIDITY ->
outboxEvent.fire(KafkaSqlOutboxEvent.of(ArtifactRuleConfigured.of(groupId, artifactId, rule,
RuleConfigurationDto.builder().configuration(ValidityLevel.NONE.name()).build())));
case COMPATIBILITY -> outboxEvent.fire(KafkaSqlOutboxEvent.of(ArtifactRuleConfigured.of(groupId,
artifactId, rule,
RuleConfigurationDto.builder().configuration(CompatibilityLevel.NONE.name()).build())));
case INTEGRITY ->
outboxEvent.fire(KafkaSqlOutboxEvent.of(ArtifactRuleConfigured.of(groupId, artifactId, rule,
RuleConfigurationDto.builder().configuration(IntegrityLevel.NONE.name()).build())));
}
}

Expand All @@ -521,7 +523,7 @@ public void createGroupRule(String groupId, RuleType rule, RuleConfigurationDto
var message = new CreateGroupRule3Message(groupId, rule, config);
var uuid = ConcurrentUtil.get(submitter.submitMessage(message));
coordinator.waitForResponse(uuid);
outboxEvent.fire(GroupRuleConfigured.of(groupId, rule, config));
outboxEvent.fire(KafkaSqlOutboxEvent.of(GroupRuleConfigured.of(groupId, rule, config)));
}

@Override
Expand All @@ -530,7 +532,7 @@ public void updateGroupRule(String groupId, RuleType rule, RuleConfigurationDto
var message = new UpdateGroupRule3Message(groupId, rule, config);
var uuid = ConcurrentUtil.get(submitter.submitMessage(message));
coordinator.waitForResponse(uuid);
outboxEvent.fire(GroupRuleConfigured.of(groupId, rule, config));
outboxEvent.fire(KafkaSqlOutboxEvent.of(GroupRuleConfigured.of(groupId, rule, config)));
}

@Override
Expand All @@ -539,12 +541,13 @@ public void deleteGroupRule(String groupId, RuleType rule) throws RegistryStorag
var uuid = ConcurrentUtil.get(submitter.submitMessage(message));
coordinator.waitForResponse(uuid);
switch (rule) {
case VALIDITY -> outboxEvent.fire(GroupRuleConfigured.of(groupId, rule,
RuleConfigurationDto.builder().configuration(ValidityLevel.NONE.name()).build()));
case COMPATIBILITY -> outboxEvent.fire(GroupRuleConfigured.of(groupId, rule,
RuleConfigurationDto.builder().configuration(CompatibilityLevel.NONE.name()).build()));
case INTEGRITY -> outboxEvent.fire(GroupRuleConfigured.of(groupId, rule,
RuleConfigurationDto.builder().configuration(IntegrityLevel.NONE.name()).build()));
case VALIDITY -> outboxEvent.fire(KafkaSqlOutboxEvent.of(GroupRuleConfigured.of(groupId, rule,
RuleConfigurationDto.builder().configuration(ValidityLevel.NONE.name()).build())));
case COMPATIBILITY -> outboxEvent.fire(KafkaSqlOutboxEvent.of(GroupRuleConfigured.of(groupId,
rule,
RuleConfigurationDto.builder().configuration(CompatibilityLevel.NONE.name()).build())));
case INTEGRITY -> outboxEvent.fire(KafkaSqlOutboxEvent.of(GroupRuleConfigured.of(groupId, rule,
RuleConfigurationDto.builder().configuration(IntegrityLevel.NONE.name()).build())));
}
}

Expand All @@ -565,7 +568,7 @@ public void deleteArtifactVersion(String groupId, String artifactId, String vers
var message = new DeleteArtifactVersion3Message(groupId, artifactId, version);
var uuid = ConcurrentUtil.get(submitter.submitMessage(message));
coordinator.waitForResponse(uuid);
outboxEvent.fire(ArtifactVersionDeleted.of(groupId, artifactId, version));
outboxEvent.fire(KafkaSqlOutboxEvent.of(ArtifactVersionDeleted.of(groupId, artifactId, version)));
}

/**
Expand All @@ -579,7 +582,8 @@ public void updateArtifactVersionMetaData(String groupId, String artifactId, Str
var message = new UpdateArtifactVersionMetaData4Message(groupId, artifactId, version, metaData);
var uuid = ConcurrentUtil.get(submitter.submitMessage(message));
coordinator.waitForResponse(uuid);
outboxEvent.fire(ArtifactVersionMetadataUpdated.of(groupId, artifactId, version, metaData));
outboxEvent.fire(KafkaSqlOutboxEvent
.of(ArtifactVersionMetadataUpdated.of(groupId, artifactId, version, metaData)));
}

/**
Expand All @@ -592,7 +596,7 @@ public void createGlobalRule(RuleType rule, RuleConfigurationDto config)
var message = new CreateGlobalRule2Message(rule, config);
var uuid = ConcurrentUtil.get(submitter.submitMessage(message));
coordinator.waitForResponse(uuid);
outboxEvent.fire(GlobalRuleConfigured.of(rule, config));
outboxEvent.fire(KafkaSqlOutboxEvent.of(GlobalRuleConfigured.of(rule, config)));
}

/**
Expand All @@ -615,7 +619,7 @@ public void updateGlobalRule(RuleType rule, RuleConfigurationDto config)
var message = new UpdateGlobalRule2Message(rule, config);
var uuid = ConcurrentUtil.get(submitter.submitMessage(message));
coordinator.waitForResponse(uuid);
outboxEvent.fire(GlobalRuleConfigured.of(rule, config));
outboxEvent.fire(KafkaSqlOutboxEvent.of(GlobalRuleConfigured.of(rule, config)));
}

/**
Expand All @@ -628,12 +632,12 @@ public void deleteGlobalRule(RuleType rule) throws RuleNotFoundException, Regist
coordinator.waitForResponse(uuid);

switch (rule) {
case VALIDITY -> outboxEvent.fire(GlobalRuleConfigured.of(rule,
RuleConfigurationDto.builder().configuration(ValidityLevel.NONE.name()).build()));
case COMPATIBILITY -> outboxEvent.fire(GlobalRuleConfigured.of(rule,
RuleConfigurationDto.builder().configuration(CompatibilityLevel.NONE.name()).build()));
case INTEGRITY -> outboxEvent.fire(GlobalRuleConfigured.of(rule,
RuleConfigurationDto.builder().configuration(IntegrityLevel.NONE.name()).build()));
case VALIDITY -> outboxEvent.fire(KafkaSqlOutboxEvent.of(GlobalRuleConfigured.of(rule,
RuleConfigurationDto.builder().configuration(ValidityLevel.NONE.name()).build())));
case COMPATIBILITY -> outboxEvent.fire(KafkaSqlOutboxEvent.of(GlobalRuleConfigured.of(rule,
RuleConfigurationDto.builder().configuration(CompatibilityLevel.NONE.name()).build())));
case INTEGRITY -> outboxEvent.fire(KafkaSqlOutboxEvent.of(GlobalRuleConfigured.of(rule,
RuleConfigurationDto.builder().configuration(IntegrityLevel.NONE.name()).build())));
}
}

Expand All @@ -647,7 +651,7 @@ public void createGroup(GroupMetaDataDto group)
var uuid = ConcurrentUtil.get(submitter.submitMessage(message));
coordinator.waitForResponse(uuid);

outboxEvent.fire(GroupCreated.of(group));
outboxEvent.fire(KafkaSqlOutboxEvent.of(GroupCreated.of(group)));
}

/**
Expand All @@ -659,7 +663,7 @@ public void deleteGroup(String groupId) throws GroupNotFoundException, RegistryS
var uuid = ConcurrentUtil.get(submitter.submitMessage(message));
coordinator.waitForResponse(uuid);

outboxEvent.fire(GroupDeleted.of(groupId));
outboxEvent.fire(KafkaSqlOutboxEvent.of(GroupDeleted.of(groupId)));
}

/**
Expand All @@ -671,8 +675,7 @@ public void updateGroupMetaData(String groupId, EditableGroupMetaDataDto dto) {
var message = new UpdateGroupMetaData2Message(groupId, dto);
var uuid = ConcurrentUtil.get(submitter.submitMessage(message));
coordinator.waitForResponse(uuid);

outboxEvent.fire(GroupMetadataUpdated.of(groupId, dto));
outboxEvent.fire(KafkaSqlOutboxEvent.of(GroupMetadataUpdated.of(groupId, dto)));
}

/**
Expand Down Expand Up @@ -1068,9 +1071,7 @@ public String createSnapshot(String snapshotLocation) throws RegistryStorageExce

@Override
public String createEvent(OutboxEvent event) {
ProducerRecord<String, String> record = new ProducerRecord<>(configuration.eventsTopic(), 0,
event.getAggregateId(), event.getPayload().toString(), Collections.emptyList());
ConcurrentUtil.get(eventsProducer.apply(record));
// No op, the event is created by the event processor.
return event.getId();
}
}
Loading

0 comments on commit afb1b36

Please sign in to comment.