Skip to content

Commit

Permalink
Merge PR branches #143 + #141 for release candidate
Browse files Browse the repository at this point in the history
  • Loading branch information
ePaul committed Nov 26, 2021
2 parents 9e587fb + 908a03a commit c3ffd72
Show file tree
Hide file tree
Showing 12 changed files with 485 additions and 127 deletions.
33 changes: 32 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,10 @@ If you do not use the STUPS Tokens library, you can implement token retrieval yo

The typical use case for this library is to publish events like creating or updating of some objects.

In order to store events you can autowire the [`EventLogWriter`](src/main/java/org/zalando/nakadiproducer/eventlog/EventLogWriter.java) service and use its methods: `fireCreateEvent`, `fireUpdateEvent`, `fireDeleteEvent`, `fireSnapshotEvent` or `fireBusinessEvent`.
In order to store events you can autowire the [`EventLogWriter`](src/main/java/org/zalando/nakadiproducer/eventlog/EventLogWriter.java)
service and use its methods: `fireCreateEvent`, `fireUpdateEvent`, `fireDeleteEvent`, `fireSnapshotEvent` or `fireBusinessEvent`.

To store events in bulk the methods `fireCreateEvents`, `fireUpdateEvents`, `fireDeleteEvents`, `fireSnapshotEvents` or `fireBusinessEvents` can be used.

You normally don't need to call `fireSnapshotEvent` directly, see below for [snapshot creation](#event-snapshots-optional).

Expand Down Expand Up @@ -203,6 +206,34 @@ For business events, you have just two parameters, the **eventType** and the eve
You usually should fire those also in the same transaction as you are storing the results of the
process step the event is reporting.

Example of using `fireCreateEvents`:

```java
@Service
public class SomeYourService {
@Autowired
private EventLogWriter eventLogWriter;
@Autowired
private WarehouseRepository repository;
@Transactional
public void createObjects(Collections<Warehouse> data) {
// here we store an object in a database table
repository.saveAll(data);
// then we group the data by dataType
Map<String, Collection<Object>> groupedData = Map.of("wholesale:warehouse", data);
// and then in the same transaction we save the events about the object creation
eventLogWriter.fireCreateEvents("wholesale.warehouse-change-event", groupedData);
}
}
```



### Event snapshots (optional)

Expand Down
4 changes: 2 additions & 2 deletions nakadi-producer-loadtest/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<parent>
<groupId>org.zalando</groupId>
<artifactId>nakadi-producer-reactor</artifactId>
<version>21.0.0</version>
<version>21.0.0-RC1</version>
</parent>

<dependencies>
Expand Down Expand Up @@ -61,4 +61,4 @@
</dependency>
</dependencies>

</project>
</project>
4 changes: 2 additions & 2 deletions nakadi-producer-spring-boot-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<parent>
<groupId>org.zalando</groupId>
<artifactId>nakadi-producer-reactor</artifactId>
<version>21.0.0</version>
<version>21.0.0-RC1</version>
</parent>

<artifactId>nakadi-producer-spring-boot-starter</artifactId>
Expand Down Expand Up @@ -203,4 +203,4 @@
</license>
</licenses>

</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
import java.sql.Timestamp;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.support.GeneratedKeyHolder;

public class EventLogRepositoryImpl implements EventLogRepository {
private NamedParameterJdbcTemplate jdbcTemplate;
Expand Down Expand Up @@ -55,27 +55,34 @@ public void delete(EventLog eventLog) {

@Override
public void persist(EventLog eventLog) {
Timestamp now = toSqlTimestamp(Instant.now());
MapSqlParameterSource namedParameterMap = new MapSqlParameterSource();
namedParameterMap.addValue("eventType", eventLog.getEventType());
namedParameterMap.addValue("eventBodyData", eventLog.getEventBodyData());
namedParameterMap.addValue("flowId", eventLog.getFlowId());
namedParameterMap.addValue("created", now);
namedParameterMap.addValue("lastModified", now);
namedParameterMap.addValue("lockedBy", eventLog.getLockedBy());
namedParameterMap.addValue("lockedUntil", eventLog.getLockedUntil());
GeneratedKeyHolder generatedKeyHolder = new GeneratedKeyHolder();
jdbcTemplate.update(
"INSERT INTO " +
" nakadi_events.event_log " +
" (event_type, event_body_data, flow_id, created, last_modified, locked_by, locked_until) " +
"VALUES " +
" (:eventType, :eventBodyData, :flowId, :created, :lastModified, :lockedBy, :lockedUntil)",
namedParameterMap,
generatedKeyHolder
);
persist(Collections.singleton(eventLog));
}

@Override
public void persist(Collection<EventLog> eventLogs) {
MapSqlParameterSource[] namedParameterMaps = eventLogs.stream()
.map(eventLog -> {
Timestamp now = toSqlTimestamp(Instant.now());
MapSqlParameterSource namedParameterMap = new MapSqlParameterSource();
namedParameterMap.addValue("eventType", eventLog.getEventType());
namedParameterMap.addValue("eventBodyData", eventLog.getEventBodyData());
namedParameterMap.addValue("flowId", eventLog.getFlowId());
namedParameterMap.addValue("created", now);
namedParameterMap.addValue("lastModified", now);
namedParameterMap.addValue("lockedBy", eventLog.getLockedBy());
namedParameterMap.addValue("lockedUntil", eventLog.getLockedUntil());
return namedParameterMap;
})
.toArray(MapSqlParameterSource[]::new);

eventLog.setId((Integer) generatedKeyHolder.getKeys().get("id"));
jdbcTemplate.batchUpdate(
"INSERT INTO " +
" nakadi_events.event_log " +
" (event_type, event_body_data, flow_id, created, last_modified, locked_by, locked_until) " +
"VALUES " +
" (:eventType, :eventBodyData, :flowId, :created, :lastModified, :lockedBy, :lockedUntil)",
namedParameterMaps
);
}

private Timestamp toSqlTimestamp(Instant now) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.zalando.nakadiproducer.BaseMockedExternalCommunicationIT;

@Transactional
Expand All @@ -16,11 +17,16 @@ public class EventLogRepositoryIT extends BaseMockedExternalCommunicationIT {
@Autowired
private EventLogRepository eventLogRepository;

@Autowired
private JdbcTemplate jdbcTemplate;

private static final String WAREHOUSE_EVENT_BODY_DATA = (
"{'self':'http://WAREHOUSE_DOMAIN',"
"{"
+ " 'self':'http://WAREHOUSE_DOMAIN',"
+ " 'code':'WH-DE-EF',"
+ " 'name':'Erfurt',"
+ " 'address':{'name':'Zalando Logistics SE & Co.KG',"
+ " 'address':{"
+ " 'name':'Zalando Logistics SE & Co.KG',"
+ " 'street':'In der Hochstedter Ecke 1',"
+ " 'city':'Erfurt',"
+ " 'zip':'99098',"
Expand All @@ -44,11 +50,13 @@ public void setUp() throws Exception {
.eventType(WAREHOUSE_EVENT_TYPE)
.flowId("FLOW_ID").build();
eventLogRepository.persist(eventLog);
id = eventLog.getId();
}

@Test
public void findEventRepositoryId() {
Integer id = jdbcTemplate.queryForObject(
"SELECT id FROM nakadi_events.event_log WHERE flow_id = 'FLOW_ID'",
Integer.class);
final EventLog eventLog = eventLogRepository.findOne(id);
compareWithPersistedEvent(eventLog);
}
Expand Down
4 changes: 2 additions & 2 deletions nakadi-producer-starter-spring-boot-2-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<parent>
<groupId>org.zalando</groupId>
<artifactId>nakadi-producer-reactor</artifactId>
<version>21.0.0</version>
<version>21.0.0-RC1</version>
</parent>

<dependencies>
Expand Down Expand Up @@ -92,4 +92,4 @@
</plugins>
</build>

</project>
</project>
4 changes: 2 additions & 2 deletions nakadi-producer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<parent>
<groupId>org.zalando</groupId>
<artifactId>nakadi-producer-reactor</artifactId>
<version>21.0.0</version>
<version>21.0.0-RC1</version>
</parent>

<artifactId>nakadi-producer</artifactId>
Expand Down Expand Up @@ -176,4 +176,4 @@
</license>
</licenses>

</project>
</project>
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.zalando.nakadiproducer.eventlog;

import java.util.Collection;
import java.util.Map;
import javax.transaction.Transactional;

import org.zalando.nakadiproducer.snapshots.SnapshotEventGenerator;
Expand Down Expand Up @@ -42,6 +44,24 @@ public interface EventLogWriter {
@Transactional
void fireCreateEvent(String eventType, String dataType, Object data);

/**
* Fires data change events about the creation of some resources (objects), see
* {@link #fireCreateEvent(String, String, Object) fireCreateEvent} for more details.
*
* @param eventType
* the Nakadi event type of the event. This is roughly equivalent
* to an event channel or topic.
*
* @param dataTypeToData
* the content of the {@code data_type} field of the Nakadi
* event mapped to some POJOs that can be serialized into JSON (required
* parameter). This is meant to be a representation of the
* current state of the resource. It will be used as content of
* the {@code data} field of the Nakadi event.
*/
@Transactional
void fireCreateEvents(String eventType, Map<String, Collection<Object>> dataTypeToData);

/**
* Fires a data change event about an update of some resource (object).
*
Expand All @@ -62,6 +82,24 @@ public interface EventLogWriter {
@Transactional
void fireUpdateEvent(String eventType, String dataType, Object data);

/**
* Fires data change events about the update of some resources (objects), see
* {@link #fireUpdateEvent(String, String, Object) fireUpdateEvent} for more details.
*
* @param eventType
* the Nakadi event type of the event. This is roughly equivalent
* to an event channel or topic.
*
* @param dataTypeToData
* the content of the {@code data_type} field of the Nakadi
* event mapped to some POJOs that can be serialized into JSON (required
* parameter). This is meant to be a representation of the
* current state of the resource. It will be used as content of
* the {@code data} field of the Nakadi event.
*/
@Transactional
void fireUpdateEvents(String eventType, Map<String, Collection<Object>> dataTypeToData);

/**
* Fires a data change event about the deletion of some resource (object).
*
Expand All @@ -83,6 +121,24 @@ public interface EventLogWriter {
@Transactional
void fireDeleteEvent(String eventType, String dataType, Object data);

/**
* Fires data change events about the deletion of some resources (objects), see
* {@link #fireDeleteEvent(String, String, Object) fireDeleteEvent} for more details.
*
* @param eventType
* the Nakadi event type of the event. This is roughly equivalent
* to an event channel or topic.
*
* @param dataTypeToData
* the content of the {@code data_type} field of the Nakadi
* event mapped to some POJOs that can be serialized into JSON (required
* parameter). This is meant to be a representation of the
* current state of the resource. It will be used as content of
* the {@code data} field of the Nakadi event.
*/
@Transactional
void fireDeleteEvents(String eventType, Map<String, Collection<Object>> dataTypeToData);

/**
* Fires a data change event with a snapshot of some resource (object).
* <p>
Expand Down Expand Up @@ -115,6 +171,24 @@ public interface EventLogWriter {
@Transactional
void fireSnapshotEvent(String eventType, String dataType, Object data);

/**
* Fires data change events, see {@link #fireSnapshotEvent(String, String, Object)
* fireSnapshotEvent} for more details.
*
* @param eventType
* the Nakadi event type of the event. This is roughly equivalent
* to an event channel or topic.
*
* @param dataTypeToData
* the content of the {@code data_type} field of the Nakadi
* event mapped to some POJOs that can be serialized into JSON (required
* parameter). This is meant to be a representation of the
* current state of the resource. It will be used as content of
* the {@code data} field of the Nakadi event.
*/
@Transactional
void fireSnapshotEvents(String eventType, Map<String, Collection<Object>> dataTypeToData);

/**
* Fires a business event, i.e. an event communicating the fact that some
* business process step happened. The payload object will be used as the
Expand All @@ -132,4 +206,19 @@ public interface EventLogWriter {
*/
@Transactional
void fireBusinessEvent(String eventType, Object payload);

/**
* Fires business events, see {@link #fireBusinessEvent(String, Object) fireBusinessEvent} for
* more details
*
* @param eventType
* the Nakadi event type of the event. This is roughly equivalent
* to an event channel or topic.
*
* @param payloads
* some POJOs that can be serialized into JSON (required
* parameter)
*/
@Transactional
void fireBusinessEvents(String eventType, Collection<Object> payloads);
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ public interface EventLogRepository {

void persist(EventLog eventLog);

default void persist(Collection<EventLog> eventLogs) {
for (EventLog eventLog : eventLogs) {
persist(eventLog);
}
}

void deleteAll();

EventLog findOne(Integer id);
Expand Down
Loading

0 comments on commit c3ffd72

Please sign in to comment.