Skip to content

Commit

Permalink
Merge pull request #152 from fbrns/issues/112-do-not-lock-all-availab…
Browse files Browse the repository at this point in the history
…le-events

#112 Do not lock all available events
  • Loading branch information
fbrns authored Nov 16, 2022
2 parents 48974d4 + 46982da commit dc01695
Show file tree
Hide file tree
Showing 10 changed files with 102 additions and 14 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -335,10 +335,15 @@ until the lock expires. The default is currently 600 seconds but might change in
buffer is included. During the last x seconds before the expiration of the lock the events are not considered for
transmission. The default is currently 60 seconds but might change in future releases.

* **lock-size**: Defines the maximum amount of events which are loaded into memory and published in one run
(in one submission per event type). By default, all events are loaded into memory. In future releases, this
property will become mandatory.

```yaml
nakadi-producer:
lock-duration: 600
lock-duration-buffer: 60
lock-size: 5000
```

## Contributing
Expand Down
4 changes: 2 additions & 2 deletions nakadi-producer-loadtest/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<parent>
<groupId>org.zalando</groupId>
<artifactId>nakadi-producer-reactor</artifactId>
<version>20.3.1</version>
<version>20.4.0</version>
</parent>

<dependencies>
Expand Down Expand Up @@ -62,4 +62,4 @@
</dependency>
</dependencies>

</project>
</project>
4 changes: 2 additions & 2 deletions nakadi-producer-spring-boot-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<parent>
<groupId>org.zalando</groupId>
<artifactId>nakadi-producer-reactor</artifactId>
<version>20.3.1</version>
<version>20.4.0</version>
</parent>

<artifactId>nakadi-producer-spring-boot-starter</artifactId>
Expand Down Expand Up @@ -194,4 +194,4 @@
</license>
</licenses>

</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,9 @@ public EventLogWriter eventLogWriter(EventLogRepository eventLogRepository, Obje
}

@Bean
public EventLogRepository eventLogRepository(NamedParameterJdbcTemplate namedParameterJdbcTemplate) {
return new EventLogRepositoryImpl(namedParameterJdbcTemplate);
public EventLogRepository eventLogRepository(NamedParameterJdbcTemplate namedParameterJdbcTemplate,
@Value("${nakadi-producer.lock-size:0}") int lockSize) {
return new EventLogRepositoryImpl(namedParameterJdbcTemplate, lockSize);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
import org.springframework.jdbc.support.GeneratedKeyHolder;

public class EventLogRepositoryImpl implements EventLogRepository {

private NamedParameterJdbcTemplate jdbcTemplate;
private int lockSize;

public EventLogRepositoryImpl(NamedParameterJdbcTemplate jdbcTemplate) {
public EventLogRepositoryImpl(NamedParameterJdbcTemplate jdbcTemplate, int lockSize) {
this.jdbcTemplate = jdbcTemplate;
this.lockSize = lockSize;
}

@Override
Expand All @@ -37,8 +40,21 @@ public void lockSomeMessages(String lockId, Instant now, Instant lockExpires) {
namedParameterMap.put("lockId", lockId);
namedParameterMap.put("now", toSqlTimestamp(now));
namedParameterMap.put("lockExpires", toSqlTimestamp(lockExpires));

StringBuilder optionalLockSizeClause = new StringBuilder();
if (lockSize > 0) {
optionalLockSizeClause.append("LIMIT :lockSize");
namedParameterMap.put("lockSize", lockSize);
}

jdbcTemplate.update(
"UPDATE nakadi_events.event_log SET locked_by = :lockId, locked_until = :lockExpires where locked_until is null or locked_until < :now",
"UPDATE nakadi_events.event_log "
+ "SET locked_by = :lockId, locked_until = :lockExpires "
+ "WHERE id IN (SELECT id "
+ " FROM nakadi_events.event_log "
+ " WHERE locked_until IS null OR locked_until < :now "
+ optionalLockSizeClause
+ " FOR UPDATE SKIP LOCKED) ",
namedParameterMap
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package org.zalando.nakadiproducer;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;

import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.zalando.nakadiproducer.eventlog.EventLogWriter;
import org.zalando.nakadiproducer.transmission.impl.EventTransmissionService;
import org.zalando.nakadiproducer.util.Fixture;

@SpringBootTest(
properties = {"nakadi-producer.lock-size=3"}
)
public class EventLockSizeConfiguredIT extends BaseMockedExternalCommunicationIT {

@Autowired
private EventLogWriter eventLogWriter;

@Autowired
private EventTransmissionService eventTransmissionService;

@Test
public void eventLockSizeIsRespected() {

for (int i = 1; i <= 8; i++) {
eventLogWriter.fireBusinessEvent( "myEventType", Fixture.mockPayload(i, "code123"));
}

assertThat(eventTransmissionService.lockSomeEvents(), hasSize(3));
assertThat(eventTransmissionService.lockSomeEvents(), hasSize(3));
assertThat(eventTransmissionService.lockSomeEvents(), hasSize(2));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.zalando.nakadiproducer;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;

import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.zalando.nakadiproducer.eventlog.EventLogWriter;
import org.zalando.nakadiproducer.transmission.impl.EventTransmissionService;
import org.zalando.nakadiproducer.util.Fixture;

public class EventLockSizeDefaultIT extends BaseMockedExternalCommunicationIT {

@Autowired
private EventLogWriter eventLogWriter;

@Autowired
private EventTransmissionService eventTransmissionService;

@Test
public void defaultEventLockSizeIsUsed() {

for (int i = 1; i <= 8; i++) {
eventLogWriter.fireBusinessEvent("myEventType", Fixture.mockPayload(i, "code123"));
}

assertThat(eventTransmissionService.lockSomeEvents(), hasSize(8));
}

}
4 changes: 2 additions & 2 deletions nakadi-producer-starter-spring-boot-2-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<parent>
<groupId>org.zalando</groupId>
<artifactId>nakadi-producer-reactor</artifactId>
<version>20.3.1</version>
<version>20.4.0</version>
</parent>

<dependencies>
Expand Down Expand Up @@ -92,4 +92,4 @@
</plugins>
</build>

</project>
</project>
4 changes: 2 additions & 2 deletions nakadi-producer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<parent>
<groupId>org.zalando</groupId>
<artifactId>nakadi-producer-reactor</artifactId>
<version>20.3.1</version>
<version>20.4.0</version>
</parent>

<artifactId>nakadi-producer</artifactId>
Expand Down Expand Up @@ -161,4 +161,4 @@
</license>
</licenses>

</project>
</project>
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

<artifactId>nakadi-producer-reactor</artifactId>
<groupId>org.zalando</groupId>
<version>20.3.1</version>
<version>20.4.0</version>
<packaging>pom</packaging>
<name>Nakadi Event Producer Reactor</name>

Expand Down Expand Up @@ -78,4 +78,4 @@
</repository>
</distributionManagement>

</project>
</project>

0 comments on commit dc01695

Please sign in to comment.