From ec042aabe1dcf5f68d7511d30c795501f3aec965 Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Wed, 19 Jun 2024 00:10:47 +0530 Subject: [PATCH] Add TestCapturingListener Signed-off-by: Shivansh Arora --- .../RemoteGlobalMetadataManagerTests.java | 334 +++++++++--------- .../gateway/remote/RemoteStateTestUtil.java | 28 ++ 2 files changed, 188 insertions(+), 174 deletions(-) create mode 100644 server/src/test/java/org/opensearch/gateway/remote/RemoteStateTestUtil.java diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java index 61baf93d63bc9..1b361e7818bf0 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java @@ -20,7 +20,6 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.Metadata.XContentContext; import org.opensearch.cluster.metadata.TemplatesMetadata; -import org.opensearch.common.CheckedRunnable; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.network.NetworkModule; import org.opensearch.common.settings.ClusterSettings; @@ -54,14 +53,14 @@ import java.util.EnumSet; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.function.Function; import java.util.stream.Stream; -import org.mockito.ArgumentCaptor; - import static java.util.stream.Collectors.toList; import static org.opensearch.cluster.metadata.Metadata.isGlobalStateEquals; import static org.opensearch.common.blobstore.stream.write.WritePriority.URGENT; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CUSTOM_DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.FORMAT_PARAMS; @@ -92,8 +91,6 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class RemoteGlobalMetadataManagerTests extends OpenSearchTestCase { @@ -171,19 +168,20 @@ public void testGetReadMetadataAsyncAction_CoordinationMetadata() throws Excepti when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( COORDINATION_METADATA_FORMAT.serialize(coordinationMetadata, fileName, compressor, FORMAT_PARAMS).streamInput() ); - LatchedActionListener listener = mock(LatchedActionListener.class); + RemoteStateTestUtil.TestCapturingListener listener = new RemoteStateTestUtil.TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); - CheckedRunnable runnable = remoteGlobalMetadataManager.getAsyncMetadataReadAction( + remoteGlobalMetadataManager.getAsyncMetadataReadAction( coordinationMetadataForDownload, COORDINATION_METADATA, - listener - ); - runnable.run(); - ArgumentCaptor savedResult = ArgumentCaptor.forClass(RemoteReadResult.class); - assertBusy(() -> verify(listener, times(1)).onResponse(savedResult.capture())); - assertEquals(coordinationMetadata, savedResult.getValue().getObj()); - assertEquals(COORDINATION_METADATA, savedResult.getValue().getComponent()); - assertEquals(COORDINATION_METADATA, savedResult.getValue().getComponentName()); + new LatchedActionListener<>(listener, latch) + ).run(); + latch.await(); + assertNull(listener.failure); + assertNotNull(listener.result); + assertEquals(coordinationMetadata, listener.result.getObj()); + assertEquals(COORDINATION_METADATA, listener.result.getComponent()); + assertEquals(COORDINATION_METADATA, listener.result.getComponentName()); } public void testGetAsyncMetadataWriteAction_CoordinationMetadata() throws Exception { @@ -200,27 +198,22 @@ public void testGetAsyncMetadataWriteAction_CoordinationMetadata() throws Except return null; }).when(blobStoreTransferService) .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); - LatchedActionListener listener = mock(LatchedActionListener.class); - - CheckedRunnable runnable = remoteGlobalMetadataManager.getAsyncMetadataWriteAction( - remoteCoordinationMetadata, - listener - ); - runnable.run(); - - ArgumentCaptor savedResult = ArgumentCaptor.forClass( - ClusterMetadataManifest.UploadedMetadata.class - ); - assertBusy(() -> verify(listener, times(1)).onResponse(savedResult.capture())); - - ClusterMetadataManifest.UploadedMetadata uploadedMetadata = savedResult.getValue(); - assertNotNull(uploadedMetadata); + RemoteStateTestUtil.TestCapturingListener listener = + new RemoteStateTestUtil.TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + + remoteGlobalMetadataManager.getAsyncMetadataWriteAction(remoteCoordinationMetadata, new LatchedActionListener<>(listener, latch)) + .run(); + latch.await(); + assertNull(listener.failure); + assertNotNull(listener.result); + ClusterMetadataManifest.UploadedMetadata uploadedMetadata = listener.result; assertEquals(COORDINATION_METADATA, uploadedMetadata.getComponent()); String uploadedFileName = uploadedMetadata.getUploadedFilename(); String[] pathTokens = uploadedFileName.split(PATH_DELIMITER); assertEquals(5, pathTokens.length); assertEquals(RemoteClusterStateUtils.encodeString(CLUSTER_NAME), pathTokens[0]); - assertEquals("cluster-state", pathTokens[1]); + assertEquals(CLUSTER_STATE_PATH_TOKEN, pathTokens[1]); assertEquals(CLUSTER_UUID, pathTokens[2]); assertEquals(GLOBAL_METADATA_PATH_TOKEN, pathTokens[3]); String[] splitFileName = pathTokens[4].split(DELIMITER); @@ -243,19 +236,20 @@ public void testGetReadMetadataAsyncAction_PersistentSettings() throws Exception RemotePersistentSettingsMetadata.SETTINGS_METADATA_FORMAT.serialize(settingsMetadata, fileName, compressor, FORMAT_PARAMS) .streamInput() ); - LatchedActionListener listener = mock(LatchedActionListener.class); + RemoteStateTestUtil.TestCapturingListener listener = new RemoteStateTestUtil.TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); - CheckedRunnable runnable = remoteGlobalMetadataManager.getAsyncMetadataReadAction( + remoteGlobalMetadataManager.getAsyncMetadataReadAction( persistentSettings, SETTING_METADATA, - listener - ); - runnable.run(); - ArgumentCaptor savedResult = ArgumentCaptor.forClass(RemoteReadResult.class); - assertBusy(() -> verify(listener, times(1)).onResponse(savedResult.capture())); - assertEquals(settingsMetadata, savedResult.getValue().getObj()); - assertEquals(SETTING_METADATA, savedResult.getValue().getComponent()); - assertEquals(SETTING_METADATA, savedResult.getValue().getComponentName()); + new LatchedActionListener<>(listener, latch) + ).run(); + latch.await(); + assertNull(listener.failure); + assertNotNull(listener.result); + assertEquals(settingsMetadata, listener.result.getObj()); + assertEquals(SETTING_METADATA, listener.result.getComponent()); + assertEquals(SETTING_METADATA, listener.result.getComponentName()); } public void testGetAsyncMetadataWriteAction_PersistentSettings() throws Exception { @@ -272,23 +266,21 @@ public void testGetAsyncMetadataWriteAction_PersistentSettings() throws Exceptio return null; }).when(blobStoreTransferService) .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); - LatchedActionListener listener = mock(LatchedActionListener.class); - CheckedRunnable runnable = remoteGlobalMetadataManager.getAsyncMetadataWriteAction(persistentSettings, listener); - runnable.run(); - - ArgumentCaptor savedResult = ArgumentCaptor.forClass( - ClusterMetadataManifest.UploadedMetadata.class - ); - assertBusy(() -> verify(listener, times(1)).onResponse(savedResult.capture())); - - ClusterMetadataManifest.UploadedMetadata uploadedMetadata = savedResult.getValue(); - assertNotNull(uploadedMetadata); + RemoteStateTestUtil.TestCapturingListener listener = + new RemoteStateTestUtil.TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + remoteGlobalMetadataManager.getAsyncMetadataWriteAction(persistentSettings, new LatchedActionListener<>(listener, latch)).run(); + + latch.await(); + assertNull(listener.failure); + assertNotNull(listener.result); + ClusterMetadataManifest.UploadedMetadata uploadedMetadata = listener.result; assertEquals(SETTING_METADATA, uploadedMetadata.getComponent()); String uploadedFileName = uploadedMetadata.getUploadedFilename(); String[] pathTokens = uploadedFileName.split(PATH_DELIMITER); assertEquals(5, pathTokens.length); assertEquals(RemoteClusterStateUtils.encodeString(CLUSTER_NAME), pathTokens[0]); - assertEquals("cluster-state", pathTokens[1]); + assertEquals(CLUSTER_STATE_PATH_TOKEN, pathTokens[1]); assertEquals(CLUSTER_UUID, pathTokens[2]); assertEquals(GLOBAL_METADATA_PATH_TOKEN, pathTokens[3]); String[] splitFileName = pathTokens[4].split(DELIMITER); @@ -311,19 +303,20 @@ public void testGetReadMetadataAsyncAction_TransientSettings() throws Exception RemoteTransientSettingsMetadata.SETTINGS_METADATA_FORMAT.serialize(settingsMetadata, fileName, compressor, FORMAT_PARAMS) .streamInput() ); - LatchedActionListener listener = mock(LatchedActionListener.class); + RemoteStateTestUtil.TestCapturingListener listener = new RemoteStateTestUtil.TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); - CheckedRunnable runnable = remoteGlobalMetadataManager.getAsyncMetadataReadAction( + remoteGlobalMetadataManager.getAsyncMetadataReadAction( transientSettings, TRANSIENT_SETTING_METADATA, - listener - ); - runnable.run(); - ArgumentCaptor savedResult = ArgumentCaptor.forClass(RemoteReadResult.class); - assertBusy(() -> verify(listener, times(1)).onResponse(savedResult.capture())); - assertEquals(settingsMetadata, savedResult.getValue().getObj()); - assertEquals(TRANSIENT_SETTING_METADATA, savedResult.getValue().getComponent()); - assertEquals(TRANSIENT_SETTING_METADATA, savedResult.getValue().getComponentName()); + new LatchedActionListener<>(listener, latch) + ).run(); + latch.await(); + assertNull(listener.failure); + assertNotNull(listener.result); + assertEquals(settingsMetadata, listener.result.getObj()); + assertEquals(TRANSIENT_SETTING_METADATA, listener.result.getComponent()); + assertEquals(TRANSIENT_SETTING_METADATA, listener.result.getComponentName()); } public void testGetAsyncMetadataWriteAction_TransientSettings() throws Exception { @@ -340,22 +333,20 @@ public void testGetAsyncMetadataWriteAction_TransientSettings() throws Exception return null; }).when(blobStoreTransferService) .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); - LatchedActionListener listener = mock(LatchedActionListener.class); - CheckedRunnable runnable = remoteGlobalMetadataManager.getAsyncMetadataWriteAction(transientSettings, listener); - runnable.run(); - - ArgumentCaptor savedResult = ArgumentCaptor.forClass( - ClusterMetadataManifest.UploadedMetadata.class - ); - assertBusy(() -> verify(listener, times(1)).onResponse(savedResult.capture())); - ClusterMetadataManifest.UploadedMetadata uploadedMetadata = savedResult.getValue(); - assertNotNull(uploadedMetadata); + RemoteStateTestUtil.TestCapturingListener listener = + new RemoteStateTestUtil.TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + remoteGlobalMetadataManager.getAsyncMetadataWriteAction(transientSettings, new LatchedActionListener<>(listener, latch)).run(); + latch.await(); + assertNull(listener.failure); + assertNotNull(listener.result); + ClusterMetadataManifest.UploadedMetadata uploadedMetadata = listener.result; assertEquals(TRANSIENT_SETTING_METADATA, uploadedMetadata.getComponent()); String uploadedFileName = uploadedMetadata.getUploadedFilename(); String[] pathTokens = uploadedFileName.split(PATH_DELIMITER); assertEquals(5, pathTokens.length); assertEquals(RemoteClusterStateUtils.encodeString(CLUSTER_NAME), pathTokens[0]); - assertEquals("cluster-state", pathTokens[1]); + assertEquals(CLUSTER_STATE_PATH_TOKEN, pathTokens[1]); assertEquals(CLUSTER_UUID, pathTokens[2]); assertEquals(GLOBAL_METADATA_PATH_TOKEN, pathTokens[3]); String[] splitFileName = pathTokens[4].split(DELIMITER); @@ -376,19 +367,20 @@ public void testGetReadMetadataAsyncAction_HashesOfConsistentSettings() throws E when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( HASHES_OF_CONSISTENT_SETTINGS_FORMAT.serialize(hashesOfConsistentSettings, fileName, compressor).streamInput() ); - LatchedActionListener listener = mock(LatchedActionListener.class); + RemoteStateTestUtil.TestCapturingListener listener = new RemoteStateTestUtil.TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); - CheckedRunnable runnable = remoteGlobalMetadataManager.getAsyncMetadataReadAction( + remoteGlobalMetadataManager.getAsyncMetadataReadAction( hashesOfConsistentSettingsForDownload, HASHES_OF_CONSISTENT_SETTINGS, - listener - ); - runnable.run(); - ArgumentCaptor savedResult = ArgumentCaptor.forClass(RemoteReadResult.class); - assertBusy(() -> verify(listener, times(1)).onResponse(savedResult.capture())); - assertEquals(hashesOfConsistentSettings, savedResult.getValue().getObj()); - assertEquals(HASHES_OF_CONSISTENT_SETTINGS, savedResult.getValue().getComponent()); - assertEquals(HASHES_OF_CONSISTENT_SETTINGS, savedResult.getValue().getComponentName()); + new LatchedActionListener<>(listener, latch) + ).run(); + latch.await(); + assertNull(listener.failure); + assertNotNull(listener.result); + assertEquals(hashesOfConsistentSettings, listener.result.getObj()); + assertEquals(HASHES_OF_CONSISTENT_SETTINGS, listener.result.getComponent()); + assertEquals(HASHES_OF_CONSISTENT_SETTINGS, listener.result.getComponentName()); } public void testGetAsyncMetadataWriteAction_HashesOfConsistentSettings() throws Exception { @@ -404,25 +396,23 @@ public void testGetAsyncMetadataWriteAction_HashesOfConsistentSettings() throws return null; }).when(blobStoreTransferService) .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); - LatchedActionListener listener = mock(LatchedActionListener.class); - CheckedRunnable runnable = remoteGlobalMetadataManager.getAsyncMetadataWriteAction( + RemoteStateTestUtil.TestCapturingListener listener = + new RemoteStateTestUtil.TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + remoteGlobalMetadataManager.getAsyncMetadataWriteAction( hashesOfConsistentSettingsForUpload, - listener - ); - runnable.run(); - - ArgumentCaptor savedResult = ArgumentCaptor.forClass( - ClusterMetadataManifest.UploadedMetadata.class - ); - assertBusy(() -> verify(listener, times(1)).onResponse(savedResult.capture())); - ClusterMetadataManifest.UploadedMetadata uploadedMetadata = savedResult.getValue(); - assertNotNull(uploadedMetadata); + new LatchedActionListener<>(listener, latch) + ).run(); + latch.await(); + assertNull(listener.failure); + assertNotNull(listener.result); + ClusterMetadataManifest.UploadedMetadata uploadedMetadata = listener.result; assertEquals(HASHES_OF_CONSISTENT_SETTINGS, uploadedMetadata.getComponent()); String uploadedFileName = uploadedMetadata.getUploadedFilename(); String[] pathTokens = uploadedFileName.split(PATH_DELIMITER); assertEquals(5, pathTokens.length); assertEquals(RemoteClusterStateUtils.encodeString(CLUSTER_NAME), pathTokens[0]); - assertEquals("cluster-state", pathTokens[1]); + assertEquals(CLUSTER_STATE_PATH_TOKEN, pathTokens[1]); assertEquals(CLUSTER_UUID, pathTokens[2]); assertEquals(GLOBAL_METADATA_PATH_TOKEN, pathTokens[3]); String[] splitFileName = pathTokens[4].split(DELIMITER); @@ -444,19 +434,19 @@ public void testGetReadMetadataAsyncAction_TemplatesMetadata() throws Exception when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( TEMPLATES_METADATA_FORMAT.serialize(templatesMetadata, fileName, compressor, FORMAT_PARAMS).streamInput() ); - LatchedActionListener listener = mock(LatchedActionListener.class); - CheckedRunnable runnable = remoteGlobalMetadataManager.getAsyncMetadataReadAction( + RemoteStateTestUtil.TestCapturingListener listener = new RemoteStateTestUtil.TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + remoteGlobalMetadataManager.getAsyncMetadataReadAction( templatesMetadataForDownload, TEMPLATES_METADATA, - listener - ); - runnable.run(); - - ArgumentCaptor savedResult = ArgumentCaptor.forClass(RemoteReadResult.class); - assertBusy(() -> verify(listener, times(1)).onResponse(savedResult.capture())); - assertEquals(templatesMetadata, savedResult.getValue().getObj()); - assertEquals(TEMPLATES_METADATA, savedResult.getValue().getComponent()); - assertEquals(TEMPLATES_METADATA, savedResult.getValue().getComponentName()); + new LatchedActionListener<>(listener, latch) + ).run(); + latch.await(); + assertNull(listener.failure); + assertNotNull(listener.result); + assertEquals(templatesMetadata, listener.result.getObj()); + assertEquals(TEMPLATES_METADATA, listener.result.getComponent()); + assertEquals(TEMPLATES_METADATA, listener.result.getComponentName()); } public void testGetAsyncMetadataWriteAction_TemplatesMetadata() throws Exception { @@ -473,25 +463,21 @@ public void testGetAsyncMetadataWriteAction_TemplatesMetadata() throws Exception return null; }).when(blobStoreTransferService) .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); - LatchedActionListener listener = mock(LatchedActionListener.class); - CheckedRunnable runnable = remoteGlobalMetadataManager.getAsyncMetadataWriteAction( - templateMetadataForUpload, - listener - ); - runnable.run(); - - ArgumentCaptor savedResult = ArgumentCaptor.forClass( - ClusterMetadataManifest.UploadedMetadata.class - ); - assertBusy(() -> verify(listener, times(1)).onResponse(savedResult.capture())); - ClusterMetadataManifest.UploadedMetadata uploadedMetadata = savedResult.getValue(); - assertNotNull(uploadedMetadata); + RemoteStateTestUtil.TestCapturingListener listener = + new RemoteStateTestUtil.TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + remoteGlobalMetadataManager.getAsyncMetadataWriteAction(templateMetadataForUpload, new LatchedActionListener<>(listener, latch)) + .run(); + latch.await(); + assertNull(listener.failure); + assertNotNull(listener.result); + ClusterMetadataManifest.UploadedMetadata uploadedMetadata = listener.result; assertEquals(TEMPLATES_METADATA, uploadedMetadata.getComponent()); String uploadedFileName = uploadedMetadata.getUploadedFilename(); String[] pathTokens = uploadedFileName.split(PATH_DELIMITER); assertEquals(5, pathTokens.length); assertEquals(RemoteClusterStateUtils.encodeString(CLUSTER_NAME), pathTokens[0]); - assertEquals("cluster-state", pathTokens[1]); + assertEquals(CLUSTER_STATE_PATH_TOKEN, pathTokens[1]); assertEquals(CLUSTER_UUID, pathTokens[2]); assertEquals(GLOBAL_METADATA_PATH_TOKEN, pathTokens[3]); String[] splitFileName = pathTokens[4].split(DELIMITER); @@ -514,19 +500,19 @@ public void testGetReadMetadataAsyncAction_CustomMetadata() throws Exception { when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( customMetadataForDownload.customBlobStoreFormat.serialize(customMetadata, fileName, compressor).streamInput() ); - LatchedActionListener listener = mock(LatchedActionListener.class); - CheckedRunnable runnable = remoteGlobalMetadataManager.getAsyncMetadataReadAction( + RemoteStateTestUtil.TestCapturingListener listener = new RemoteStateTestUtil.TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + remoteGlobalMetadataManager.getAsyncMetadataReadAction( customMetadataForDownload, IndexGraveyard.TYPE, - listener - ); - runnable.run(); - - ArgumentCaptor savedResult = ArgumentCaptor.forClass(RemoteReadResult.class); - assertBusy(() -> verify(listener, times(1)).onResponse(savedResult.capture())); - assertEquals(customMetadata, savedResult.getValue().getObj()); - assertEquals(CUSTOM_METADATA, savedResult.getValue().getComponent()); - assertEquals(IndexGraveyard.TYPE, savedResult.getValue().getComponentName()); + new LatchedActionListener<>(listener, latch) + ).run(); + latch.await(); + assertNull(listener.failure); + assertNotNull(listener.result); + assertEquals(customMetadata, listener.result.getObj()); + assertEquals(CUSTOM_METADATA, listener.result.getComponent()); + assertEquals(IndexGraveyard.TYPE, listener.result.getComponentName()); } public void testGetAsyncMetadataWriteAction_CustomMetadata() throws Exception { @@ -544,22 +530,21 @@ public void testGetAsyncMetadataWriteAction_CustomMetadata() throws Exception { return null; }).when(blobStoreTransferService) .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); - LatchedActionListener listener = mock(LatchedActionListener.class); - CheckedRunnable runnable = remoteGlobalMetadataManager.getAsyncMetadataWriteAction(customMetadataForUpload, listener); - runnable.run(); - - ArgumentCaptor savedResult = ArgumentCaptor.forClass( - ClusterMetadataManifest.UploadedMetadata.class - ); - assertBusy(() -> verify(listener, times(1)).onResponse(savedResult.capture())); - ClusterMetadataManifest.UploadedMetadata uploadedMetadata = savedResult.getValue(); - assertNotNull(uploadedMetadata); + RemoteStateTestUtil.TestCapturingListener listener = + new RemoteStateTestUtil.TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + remoteGlobalMetadataManager.getAsyncMetadataWriteAction(customMetadataForUpload, new LatchedActionListener<>(listener, latch)) + .run(); + latch.await(); + assertNull(listener.failure); + assertNotNull(listener.result); + ClusterMetadataManifest.UploadedMetadata uploadedMetadata = listener.result; assertEquals(String.join(CUSTOM_DELIMITER, CUSTOM_METADATA, IndexGraveyard.TYPE), uploadedMetadata.getComponent()); String uploadedFileName = uploadedMetadata.getUploadedFilename(); String[] pathTokens = uploadedFileName.split(PATH_DELIMITER); assertEquals(5, pathTokens.length); assertEquals(RemoteClusterStateUtils.encodeString(CLUSTER_NAME), pathTokens[0]); - assertEquals("cluster-state", pathTokens[1]); + assertEquals(CLUSTER_STATE_PATH_TOKEN, pathTokens[1]); assertEquals(CLUSTER_UUID, pathTokens[2]); assertEquals(GLOBAL_METADATA_PATH_TOKEN, pathTokens[3]); String[] splitFileName = pathTokens[4].split(DELIMITER); @@ -576,19 +561,19 @@ public void testGetReadMetadataAsyncAction_GlobalMetadata() throws Exception { when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( GLOBAL_METADATA_FORMAT.serialize(metadata, fileName, compressor, FORMAT_PARAMS).streamInput() ); - LatchedActionListener listener = mock(LatchedActionListener.class); - CheckedRunnable runnable = remoteGlobalMetadataManager.getAsyncMetadataReadAction( + RemoteStateTestUtil.TestCapturingListener listener = new RemoteStateTestUtil.TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + remoteGlobalMetadataManager.getAsyncMetadataReadAction( globalMetadataForDownload, GLOBAL_METADATA, - listener - ); - runnable.run(); - - ArgumentCaptor savedResult = ArgumentCaptor.forClass(RemoteReadResult.class); - assertBusy(() -> verify(listener, times(1)).onResponse(savedResult.capture())); - assertTrue(isGlobalStateEquals(metadata, (Metadata) savedResult.getValue().getObj())); - assertEquals(GLOBAL_METADATA, savedResult.getValue().getComponent()); - assertEquals(GLOBAL_METADATA, savedResult.getValue().getComponentName()); + new LatchedActionListener<>(listener, latch) + ).run(); + latch.await(); + assertNull(listener.failure); + assertNotNull(listener.result); + assertTrue(isGlobalStateEquals(metadata, (Metadata) listener.result.getObj())); + assertEquals(GLOBAL_METADATA, listener.result.getComponent()); + assertEquals(GLOBAL_METADATA, listener.result.getComponentName()); } public void testGetReadMetadataAsyncAction_IOException() throws Exception { @@ -601,16 +586,17 @@ public void testGetReadMetadataAsyncAction_IOException() throws Exception { ); IOException ioException = new IOException("mock test exception"); when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenThrow(ioException); - LatchedActionListener listener = mock(LatchedActionListener.class); - CheckedRunnable runnable = remoteGlobalMetadataManager.getAsyncMetadataReadAction( + RemoteStateTestUtil.TestCapturingListener listener = new RemoteStateTestUtil.TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + remoteGlobalMetadataManager.getAsyncMetadataReadAction( coordinationMetadataForDownload, COORDINATION_METADATA, - listener - ); - runnable.run(); - ArgumentCaptor savedException = ArgumentCaptor.forClass(Exception.class); - assertBusy(() -> verify(listener, times(1)).onFailure(savedException.capture())); - assertEquals(ioException, savedException.getValue()); + new LatchedActionListener<>(listener, latch) + ).run(); + latch.await(); + assertNull(listener.result); + assertNotNull(listener.failure); + assertEquals(ioException, listener.failure); } public void testGetAsyncMetadataWriteAction_IOException() throws Exception { @@ -622,22 +608,22 @@ public void testGetAsyncMetadataWriteAction_IOException() throws Exception { compressor, xContentRegistry ); + IOException ioException = new IOException("mock test exception"); doAnswer(invocationOnMock -> { - invocationOnMock.getArgument(4, ActionListener.class).onFailure(new IOException("mock test exception")); + invocationOnMock.getArgument(4, ActionListener.class).onFailure(ioException); return null; }).when(blobStoreTransferService) .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); - LatchedActionListener listener = mock(LatchedActionListener.class); - CheckedRunnable runnable = remoteGlobalMetadataManager.getAsyncMetadataWriteAction( - remoteCoordinationMetadata, - listener - ); - runnable.run(); - - ArgumentCaptor savedException = ArgumentCaptor.forClass(Exception.class); - assertBusy(() -> verify(listener, times(1)).onFailure(savedException.capture())); - assertTrue(savedException.getValue() instanceof RemoteStateTransferException); + RemoteStateTestUtil.TestCapturingListener listener = + new RemoteStateTestUtil.TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + remoteGlobalMetadataManager.getAsyncMetadataWriteAction(remoteCoordinationMetadata, new LatchedActionListener<>(listener, latch)) + .run(); + assertNull(listener.result); + assertNotNull(listener.failure); + assertTrue(listener.failure instanceof RemoteStateTransferException); + assertEquals(ioException, listener.failure.getCause()); } public void testGetUpdatedCustoms() { diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteStateTestUtil.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteStateTestUtil.java new file mode 100644 index 0000000000000..50d17651602bf --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteStateTestUtil.java @@ -0,0 +1,28 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.opensearch.core.action.ActionListener; + +public class RemoteStateTestUtil { + public static class TestCapturingListener implements ActionListener { + T result; + Exception failure; + + @Override + public void onResponse(T result) { + this.result = result; + } + + @Override + public void onFailure(Exception e) { + this.failure = e; + } + } +}