Skip to content

Commit

Permalink
Merge pull request #4 from MISBMS/main
Browse files Browse the repository at this point in the history
Upgraded test containers & introduced duration for supplying seconds
  • Loading branch information
tshan10 authored Oct 8, 2024
2 parents 9bf3c7b + 490bf54 commit a9c3ea9
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 53 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ its MVCC (MultiVersion Concurrency Control) capability.
```xml
<dependency>
<groupId>com.phonepe</groupId>
<artifactId>distributed-lock-manager</artifactId>
<version>3.0.6</version>
<artifactId>DLM</artifactId>
<version>1.0.0</version>
</dependency>
```

Expand Down
69 changes: 39 additions & 30 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
<modelVersion>4.0.0</modelVersion>

<groupId>com.phonepe</groupId>
<artifactId>distributed-lock-manager</artifactId>
<version>3.0.7-SNAPSHOT</version>
<artifactId>DLM</artifactId>
<version>1.0.0-SNAPSHOT</version>

<url>https://github.com/PhonePe/DLM</url>
<description>Distributed Lock Manager</description>
Expand Down Expand Up @@ -95,7 +95,7 @@

<!-- Test related properties -->
<mockito.version>4.3.1</mockito.version>
<junit.testcontainer.version>1.0.6</junit.testcontainer.version>
<junit.testcontainer.version>1.0.14</junit.testcontainer.version>
<jna.version>5.7.0</jna.version>
<awaitility.version>4.1.1</awaitility.version>
<junit.version>4.12</junit.version>
Expand Down Expand Up @@ -174,6 +174,12 @@
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.appform.testcontainer</groupId>
<artifactId>junit-testcontainer-commons</artifactId>
<version>${junit.testcontainer.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.appform.testcontainer</groupId>
<artifactId>junit-testcontainer-aerospike</artifactId>
Expand Down Expand Up @@ -259,33 +265,36 @@

<profiles>
<profile>
<id>coverage</id>
<build>
<plugins>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>${jacoco.maven.plugin.version}</version>
<executions>
<execution>
<goals>
<goal>prepare-agent</goal>
</goals>
</execution>
<execution>
<id>report</id>
<phase>test</phase>
<goals>
<goal>report</goal>
</goals>
</execution>
</executions>
<configuration>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<id>coverage</id>
<build>
<plugins>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>${jacoco.maven.plugin.version}</version>
<executions>
<execution>
<id>prepare-agent</id>
<goals>
<goal>prepare-agent</goal>
</goals>
</execution>
<execution>
<id>report</id>
<goals>
<goal>report</goal>
</goals>
<configuration>
<formats>
<format>XML</format>
</formats>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>release</id>
<build>
Expand Down
9 changes: 6 additions & 3 deletions src/main/java/com/phonepe/dlm/DistributedLockManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import com.phonepe.dlm.lock.Lock;
import com.phonepe.dlm.lock.base.LockBase;
import com.phonepe.dlm.lock.level.LockLevel;

import java.time.Duration;

import com.phonepe.dlm.exception.DLMException;
import lombok.AllArgsConstructor;
import lombok.Builder;
Expand Down Expand Up @@ -56,7 +59,7 @@ public void tryAcquireLock(final Lock lock) {
* @param duration The lock duration in seconds for which lock will be held
* @throws DLMException with {@link ErrorCode#LOCK_UNAVAILABLE} if lock is already acquired
*/
public void tryAcquireLock(final Lock lock, final int duration) {
public void tryAcquireLock(final Lock lock, final Duration duration) {
lockBase.tryAcquireLock(lock, duration);
}

Expand Down Expand Up @@ -86,7 +89,7 @@ public void acquireLock(final Lock lock) {
* @param duration The lock duration in seconds for which lock will be held
* @throws DLMException with {@link ErrorCode#LOCK_UNAVAILABLE} if lock is not available even after the timeout
*/
public void acquireLock(final Lock lock, final int duration) {
public void acquireLock(final Lock lock, final Duration duration) {
lockBase.acquireLock(lock, duration);
}

Expand All @@ -101,7 +104,7 @@ public void acquireLock(final Lock lock, final int duration) {
* @param timeout The timeout(wait duration in seconds) for a lock to become available
* @throws DLMException with {@link ErrorCode#LOCK_UNAVAILABLE} if lock is not available even after the timeout
*/
public void acquireLock(final Lock lock, final int duration, final int timeout) {
public void acquireLock(final Lock lock, final Duration duration, final Duration timeout) {
lockBase.acquireLock(lock, duration, timeout);
}

Expand Down
9 changes: 6 additions & 3 deletions src/main/java/com/phonepe/dlm/lock/ILockable.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

import com.phonepe.dlm.exception.ErrorCode;
import com.phonepe.dlm.lock.base.LockBase;

import java.time.Duration;

import com.phonepe.dlm.exception.DLMException;

public interface ILockable {
Expand All @@ -41,7 +44,7 @@ public interface ILockable {
* @param duration The lock duration in seconds for which lock will be held
* @throws DLMException with {@link ErrorCode#LOCK_UNAVAILABLE} if lock is already acquired
*/
void tryAcquireLock(final Lock lock, final int duration);
void tryAcquireLock(final Lock lock, final Duration duration);

/**
* This method tries to acquire the lock, and if the lock is currently held by another thread,
Expand All @@ -67,7 +70,7 @@ public interface ILockable {
* @param duration The lock duration in seconds for which lock will be held
* @throws DLMException with {@link ErrorCode#LOCK_UNAVAILABLE} if lock is not available even after the timeout
*/
void acquireLock(final Lock lock, final int duration);
void acquireLock(final Lock lock, final Duration duration);

/**
* This method attempts to acquire the lock and waits for a limited time for the lock to become available.
Expand All @@ -80,7 +83,7 @@ public interface ILockable {
* @param timeout The timeout(wait duration in seconds) for a lock to become available
* @throws DLMException with {@link ErrorCode#LOCK_UNAVAILABLE} if lock is not available even after the timeout
*/
void acquireLock(final Lock lock, final int duration, final int timeout);
void acquireLock(final Lock lock, final Duration duration, final Duration timeout);

/**
* This method releases the acquired lock, allowing other threads to acquire it.
Expand Down
15 changes: 8 additions & 7 deletions src/main/java/com/phonepe/dlm/lock/base/LockBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,16 @@
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

@Slf4j
@AllArgsConstructor
@Builder
@Getter
public class LockBase implements ILockable {
public static final int DEFAULT_LOCK_TTL_SECONDS = 90;
public static final int DEFAULT_WAIT_FOR_LOCK_IN_SECONDS = 90;
public static final Duration DEFAULT_LOCK_TTL_SECONDS = Duration.ofSeconds(90);
public static final Duration DEFAULT_WAIT_FOR_LOCK_IN_SECONDS = Duration.ofSeconds(90);
public static final int WAIT_TIME_FOR_NEXT_RETRY = 1000; // 1 second

private final ILockStore lockStore;
Expand All @@ -49,7 +50,7 @@ public void tryAcquireLock(final Lock lock) {
}

@Override
public void tryAcquireLock(final Lock lock, final int duration) {
public void tryAcquireLock(final Lock lock, final Duration duration) {
writeToStore(lock, duration);
}

Expand All @@ -59,14 +60,14 @@ public void acquireLock(final Lock lock) {
}

@Override
public void acquireLock(final Lock lock, final int duration) {
public void acquireLock(final Lock lock, final Duration duration) {
acquireLock(lock, duration, DEFAULT_WAIT_FOR_LOCK_IN_SECONDS);
}

@SneakyThrows
@Override
public void acquireLock(final Lock lock, final int duration, final int timeout) {
final Timer timer = new Timer(System.currentTimeMillis(), timeout);
public void acquireLock(final Lock lock, final Duration duration, final Duration timeout) {
final Timer timer = new Timer(System.currentTimeMillis(), timeout.getSeconds());
final AtomicBoolean success = new AtomicBoolean(false);
do {
try {
Expand Down Expand Up @@ -96,7 +97,7 @@ public boolean releaseLock(final Lock lock) {
return false;
}

private void writeToStore(final Lock lock, final int ttlSeconds) {
private void writeToStore(final Lock lock, final Duration ttlSeconds) {
lockStore.write(lock.getLockId(), lock.getLockLevel(), lock.getFarmId(), ttlSeconds);
lock.getAcquiredStatus().compareAndSet(false, true);
}
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/com/phonepe/dlm/lock/storage/ILockStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@

package com.phonepe.dlm.lock.storage;

import java.time.Duration;

import com.phonepe.dlm.lock.level.LockLevel;

public interface ILockStore {
void initialize();

void write(String lockId, LockLevel lockLevel, String farmId, int ttlSeconds);
void write(String lockId, LockLevel lockLevel, String farmId, Duration ttlSeconds);

void remove(String lockId, LockLevel lockLevel, String farmId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import lombok.Builder;
import lombok.Data;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
Expand All @@ -52,11 +53,11 @@ public void initialize() {
}

@Override
public void write(String lockId, LockLevel lockLevel, String farmId, int ttlSeconds) {
public void write(String lockId, LockLevel lockLevel, String farmId, Duration ttlSeconds) {
final WritePolicy writePolicy = new WritePolicy(aerospikeClient.getWritePolicyDefault());
writePolicy.generationPolicy = GenerationPolicy.EXPECT_GEN_EQUAL;
writePolicy.generation = 0;
writePolicy.expiration = ttlSeconds;
writePolicy.expiration = Long.valueOf(ttlSeconds.getSeconds()).intValue(); // as only int is supported
writePolicy.commitLevel = CommitLevel.COMMIT_MASTER; // Committing to master only, as there is no read required so there is no chance of dirty reads.
try {
final List<Bin> binList = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.time.Duration;

@Data
@Builder
Expand Down Expand Up @@ -70,14 +71,14 @@ public void initialize() {
}

@Override
public void write(String lockId, LockLevel lockLevel, String farmId, int ttlSeconds) {
public void write(String lockId, LockLevel lockLevel, String farmId, Duration ttlSeconds) {
final byte[] normalisedRowKey = getNormalisedRowKey(lockId, lockLevel, farmId);

try (final Table table = getTable()) {
final boolean result = table.checkAndMutate(normalisedRowKey, COLUMN_FAMILY)
.qualifier(COLUMN_NAME)
.ifNotExists()
.thenPut(new Put(normalisedRowKey, System.currentTimeMillis()).setTTL(ttlSeconds * 1_000L)
.thenPut(new Put(normalisedRowKey, System.currentTimeMillis()).setTTL(ttlSeconds.getSeconds() * 1_000L)
.addColumn(COLUMN_FAMILY, COLUMN_NAME, COLUMN_DATA));
if (!result) {
throw DLMException.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.junit.Before;
import org.junit.Test;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -109,7 +110,7 @@ public void lockTestPositiveSiloDC() {
@Test
public void lockTestPositiveXDC() {
final Lock lock = lockManager.getLockInstance("LOCK_ID", LockLevel.XDC);
lockManager.tryAcquireLock(lock, 90);
lockManager.tryAcquireLock(lock, Duration.ofSeconds(90));
Assert.assertTrue(lock.getAcquiredStatus()
.get());

Expand All @@ -127,7 +128,7 @@ public void lockTestPositiveXDC() {
@Test
public void testAcquireLockWithWait() {
final Lock lock = lockManager.getLockInstance("NEW_LOCK_ID", LockLevel.DC);
lockManager.acquireLock(lock, 2); // Lock acquired for 1 seconds
lockManager.acquireLock(lock, Duration.ofSeconds(2)); // Lock acquired for 2 seconds
Assert.assertTrue(lock.getAcquiredStatus().get());

try {
Expand All @@ -140,7 +141,7 @@ public void testAcquireLockWithWait() {
Assert.assertTrue(lock.getAcquiredStatus().get());

try {
lockManager.acquireLock(lock, 2, 2); // Wait for 2 seconds only for acquiring the lock
lockManager.acquireLock(lock, Duration.ofSeconds(2), Duration.ofSeconds(2)); // Wait for 2 seconds only for acquiring the lock
Assert.fail("Flow should not have reached here");
} catch (DLMException e) {
Assert.assertEquals(ErrorCode.LOCK_UNAVAILABLE, e.getErrorCode()); // As it won't be released for next 90 secs default
Expand Down

0 comments on commit a9c3ea9

Please sign in to comment.