Skip to content

Commit

Permalink
GH-452 - Introduce EventPublicationRepository.findCompletedPublicatio…
Browse files Browse the repository at this point in the history
…ns().

The default EventPublicationRegistry previously erroneously called find*In*completePublications(), i.e. returned incomplete publications when it was supposed to look up the completed ones. To be able to do the latter we need to extend EventPublicationRepository and introduce a query execution for all completed publications.

As we'd normally only chance API in major versions, but the bugfix needing to go into a bugfix release, the new method is introduced as default method rejecting the execution in case a currently existing repository implementation does not implement it. This allows us to adapt the implementations we ship to support the bugfix, but will require other implementations to ship an adapted version. This means we can ship a release that's not breaking assuming one of the official store implementations is used.
  • Loading branch information
odrotbohm committed Jan 16, 2024
1 parent e3dbd02 commit 6afc00c
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationListener;
import org.springframework.modulith.events.CompletedEventPublications;
import org.springframework.modulith.events.EventPublication;
import org.springframework.transaction.annotation.Propagation;
Expand Down Expand Up @@ -132,7 +131,7 @@ public void deleteCompletedPublicationsOlderThan(Duration duration) {
*/
@Override
public Collection<? extends TargetEventPublication> findAll() {
return findIncompletePublications();
return events.findCompletedPublications();
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,17 @@ default void markCompleted(TargetEventPublication publication, Instant completio
Optional<TargetEventPublication> findIncompletePublicationsByEventAndTargetIdentifier( //
Object event, PublicationTargetIdentifier targetIdentifier);

/**
* Returns all completed event publications currently found in the system.
*
* @return will never be {@literal null}.
* @since 1.1.2
*/
default List<TargetEventPublication> findCompletedPublications() {
throw new UnsupportedOperationException(
"Your store implementation does not support looking up completed publications!");
}

/**
* Deletes all publications with the given identifiers.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.jdbc.core.ResultSetExtractor;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.lang.Nullable;
import org.springframework.modulith.events.core.EventPublicationRepository;
import org.springframework.modulith.events.core.EventSerializer;
Expand All @@ -56,6 +54,13 @@ INSERT INTO EVENT_PUBLICATION (ID, EVENT_TYPE, LISTENER_ID, PUBLICATION_DATE, SE
VALUES (?, ?, ?, ?, ?)
""";

private static final String SQL_STATEMENT_FIND_COMPLETED = """
SELECT ID, COMPLETION_DATE, EVENT_TYPE, LISTENER_ID, PUBLICATION_DATE, SERIALIZED_EVENT
FROM EVENT_PUBLICATION
WHERE COMPLETION_DATE IS NOT NULL
ORDER BY PUBLICATION_DATE ASC
""";

private static final String SQL_STATEMENT_FIND_UNCOMPLETED = """
SELECT ID, COMPLETION_DATE, EVENT_TYPE, LISTENER_ID, PUBLICATION_DATE, SERIALIZED_EVENT
FROM EVENT_PUBLICATION
Expand Down Expand Up @@ -189,6 +194,18 @@ public Optional<TargetEventPublication> findIncompletePublicationsByEventAndTarg
return result == null ? Optional.empty() : result.stream().findFirst();
}

/*
* (non-Javadoc)
* @see org.springframework.modulith.events.core.EventPublicationRepository#findCompletedPublications()
*/
@Override
public List<TargetEventPublication> findCompletedPublications() {

var result = operations.query(SQL_STATEMENT_FIND_COMPLETED, this::resultSetToPublications);

return result == null ? Collections.emptyList() : result;
}

@Override
@Transactional(readOnly = true)
@SuppressWarnings("null")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,21 @@ void findsPublicationsOlderThanReference() throws Exception {
.element(0).extracting(TargetEventPublication::getIdentifier).isEqualTo(first.getIdentifier());
}

@Test // GH-452
void findsCompletedPublications() {

var event = new TestEvent("first");
var publication = createPublication(event);

repository.markCompleted(publication, Instant.now());

assertThat(repository.findCompletedPublications())
.hasSize(1)
.element(0)
.extracting(TargetEventPublication::getEvent)
.isEqualTo(event);
}

private TargetEventPublication createPublication(Object event) {

var token = event.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ class JpaEventPublicationRepository implements EventPublicationRepository {
and p.completionDate is null
""";

private static String COMPLETE = """
select p
from JpaEventPublication p
where
p.completionDate is not null
order by
p.publicationDate asc
""";

private static String INCOMPLETE = """
select p
from JpaEventPublication p
Expand Down Expand Up @@ -186,6 +195,20 @@ public Optional<TargetEventPublication> findIncompletePublicationsByEventAndTarg
.map(this::entityToDomain);
}

/*
* (non-Javadoc)
* @see org.springframework.modulith.events.core.EventPublicationRepository#findCompletedPublications()
*/
@Override
public List<TargetEventPublication> findCompletedPublications() {

return entityManager.createQuery(COMPLETE, JpaEventPublication.class)
.getResultList()
.stream()
.map(this::entityToDomain)
.toList();
}

/*
* (non-Javadoc)
* @see org.springframework.modulith.events.core.EventPublicationRepository#deletePublications(java.util.List)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,21 @@ void findsPublicationsOlderThanReference() throws Exception {
.element(0).extracting(TargetEventPublication::getIdentifier).isEqualTo(first.getIdentifier());
}

@Test // GH-452
void findsCompletedPublications() {

var event = new TestEvent("first");
var publication = createPublication(event);

repository.markCompleted(publication, Instant.now());

assertThat(repository.findCompletedPublications())
.hasSize(1)
.element(0)
.extracting(TargetEventPublication::getEvent)
.isEqualTo(event);
}

private TargetEventPublication createPublication(Object event) {

var token = event.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,15 @@ public Optional<TargetEventPublication> findIncompletePublicationsByEventAndTarg
return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
}

/*
* (non-Javadoc)
* @see org.springframework.modulith.events.core.EventPublicationRepository#findCompletedPublications()
*/
@Override
public List<TargetEventPublication> findCompletedPublications() {
return readMapped(defaultQuery(where(COMPLETION_DATE).ne(null)));
}

/*
* (non-Javadoc)
* @see org.springframework.modulith.events.core.EventPublicationRepository#deletePublications(java.util.List)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,21 @@ void findsPublicationsOlderThanReference() throws Exception {
.element(0).extracting(TargetEventPublication::getIdentifier).isEqualTo(first.getIdentifier());
}

@Test // GH-452
void findsCompletedPublications() {

var event = new TestEvent("first");
var publication = createPublication(event);

repository.markCompleted(publication, Instant.now());

assertThat(repository.findCompletedPublications())
.hasSize(1)
.element(0)
.extracting(TargetEventPublication::getEvent)
.isEqualTo(event);
}

private TargetEventPublication createPublication(Object event) {
return createPublication(event, TARGET_IDENTIFIER);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.time.Instant;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -45,6 +46,7 @@
* A {@link Neo4jClient} based implementation of {@link EventPublicationRepository}.
*
* @author Gerrit Meier
* @author Oliver Drotbohm
* @since 1.1
*/
@Transactional
Expand Down Expand Up @@ -114,6 +116,12 @@ class Neo4jEventPublicationRepository implements EventPublicationRepository {
.orderBy(EVENT_PUBLICATION_NODE.property(PUBLICATION_DATE))
.build();

private static final ResultStatement ALL_COMPLETED_STATEMENT = Cypher.match(EVENT_PUBLICATION_NODE)
.where(EVENT_PUBLICATION_NODE.property(COMPLETION_DATE).isNotNull())
.returning(EVENT_PUBLICATION_NODE)
.orderBy(EVENT_PUBLICATION_NODE.property(PUBLICATION_DATE))
.build();

private final Neo4jClient neo4jClient;
private final Renderer renderer;
private final EventSerializer eventSerializer;
Expand Down Expand Up @@ -225,6 +233,19 @@ public Optional<TargetEventPublication> findIncompletePublicationsByEventAndTarg
.one();
}

/*
* (non-Javadoc)
* @see org.springframework.modulith.events.core.EventPublicationRepository#findCompletedPublications()
*/
@Override
public List<TargetEventPublication> findCompletedPublications() {

return new ArrayList<>(neo4jClient.query(renderer.render(ALL_COMPLETED_STATEMENT))
.fetchAs(TargetEventPublication.class)
.mappedBy(this::mapRecordToPublication)
.all());
}

/*
* (non-Javadoc)
* @see org.springframework.modulith.events.core.EventPublicationRepository#deletePublications(java.util.List)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,31 @@ void deleteCompletedPublicationsBefore() throws Exception {
}
}

@Test // GH-452
void findsCompletedPublications() {

var event = new TestEvent("first");
var publication = createPublication(event);

repository.markCompleted(publication, Instant.now());

assertThat(repository.findCompletedPublications())
.hasSize(1)
.element(0)
.extracting(TargetEventPublication::getEvent)
.isEqualTo(event);
}

private TargetEventPublication createPublication(Object event) {

var token = event.toString();

doReturn(token).when(eventSerializer).serialize(event);
doReturn(event).when(eventSerializer).deserialize(token, event.getClass());

return repository.create(TargetEventPublication.of(event, TARGET_IDENTIFIER));
}

@Value
static class TestEvent {
String eventId;
Expand Down

0 comments on commit 6afc00c

Please sign in to comment.