Skip to content

Commit

Permalink
Add wrapper class of job scheduler lock service (opensearch-project#290)
Browse files Browse the repository at this point in the history
Signed-off-by: Heemin Kim <[email protected]>
  • Loading branch information
heemin32 committed Jul 21, 2023
1 parent 932e1e1 commit fde10e1
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo.common;

import static org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension.JOB_INDEX_NAME;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.opensearch.action.ActionListener;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.jobscheduler.spi.LockModel;
import org.opensearch.jobscheduler.spi.utils.LockService;

/**
* A wrapper of job scheduler's lock service for datasource
*/
public class Ip2GeoLockService {
private final ClusterService clusterService;
private final Client client;
private final LockService lockService;

/**
* Constructor
*
* @param clusterService the cluster service
* @param client the client
*/
@Inject
public Ip2GeoLockService(final ClusterService clusterService, final Client client) {
this.clusterService = clusterService;
this.client = client;
this.lockService = new LockService(client, clusterService);
}

/**
* Wrapper method of LockService#acquireLockWithId
*
* Datasource use its name as doc id in job scheduler. Therefore, we can use datasource name to acquire
* a lock on a datasource.
*
* @param datasourceName datasourceName to acquire lock on
* @param lockDurationSeconds the lock duration in seconds
* @param listener the listener
*/
public void acquireLock(final String datasourceName, final Long lockDurationSeconds, final ActionListener<LockModel> listener) {
lockService.acquireLockWithId(JOB_INDEX_NAME, lockDurationSeconds, datasourceName, listener);
}

/**
* Wrapper method of LockService#release
*
* @param lockModel the lock model
* @param listener the listener
*/
public void releaseLock(final LockModel lockModel, final ActionListener<Boolean> listener) {
lockService.release(lockModel, listener);
}

/**
* Synchronous method of LockService#renewLock
*
* @param lockModel lock to renew
* @param timeout timeout in milliseconds precise
* @return renewed lock if renew succeed and null otherwise
*/
public LockModel renewLock(final LockModel lockModel, final TimeValue timeout) {
AtomicReference<LockModel> lockReference = new AtomicReference();
CountDownLatch countDownLatch = new CountDownLatch(1);
lockService.renewLock(lockModel, new ActionListener<>() {
@Override
public void onResponse(final LockModel lockModel) {
lockReference.set(lockModel);
countDownLatch.countDown();
}

@Override
public void onFailure(final Exception e) {
lockReference.set(null);
countDownLatch.countDown();
}
});

try {
countDownLatch.await(timeout.getMillis(), TimeUnit.MILLISECONDS);
return lockReference.get();
} catch (InterruptedException e) {
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo.common;

import static org.mockito.Mockito.mock;

import java.time.Instant;

import org.junit.Before;
import org.opensearch.action.ActionListener;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.geospatial.GeospatialTestHelper;
import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase;

public class Ip2GeoLockServiceTests extends Ip2GeoTestCase {
private Ip2GeoLockService ip2GeoLockService;

@Before
public void init() {
ip2GeoLockService = new Ip2GeoLockService(clusterService, client);
}

public void testAcquireLock_whenValidInput_thenSucceed() {
// Cannot test because LockService is final class
// Simply calling method to increase coverage
ip2GeoLockService.acquireLock(GeospatialTestHelper.randomLowerCaseString(), randomPositiveLong(), mock(ActionListener.class));
}

public void testReleaseLock_whenValidInput_thenSucceed() {
// Cannot test because LockService is final class
// Simply calling method to increase coverage
ip2GeoLockService.releaseLock(null, mock(ActionListener.class));
}

public void testRenewLock_whenCalled_thenNotBlocked() {
long timeoutInMillis = 10000;
long expectedDurationInMillis = 1000;
Instant before = Instant.now();
assertNull(ip2GeoLockService.renewLock(null, TimeValue.timeValueMillis(timeoutInMillis)));
Instant after = Instant.now();
assertTrue(after.toEpochMilli() - before.toEpochMilli() < expectedDurationInMillis);
}
}

0 comments on commit fde10e1

Please sign in to comment.