Skip to content

Commit

Permalink
[improve] [auto-recovery] [branch-3.1] Migrate the replication testin…
Browse files Browse the repository at this point in the history
…g from BookKeeper to Pulsar. (apache#21340)

There is no testing for AutoRecovery replication in Pulsar's current test suite, and we need to cover it. So migrate the replication testing from BookKeeper to Pulsar.
  • Loading branch information
horizonzy authored and srinath-ctds committed Dec 20, 2023
1 parent e290abb commit 8811d5b
Show file tree
Hide file tree
Showing 21 changed files with 6,982 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,9 @@
*/
package org.apache.pulsar.metadata.bookkeeper;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import lombok.AccessLevel;
import lombok.Getter;
import org.apache.bookkeeper.bookie.BookieException;
Expand All @@ -33,8 +30,7 @@
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;


class PulsarLayoutManager implements LayoutManager {
public class PulsarLayoutManager implements LayoutManager {

@Getter(AccessLevel.PACKAGE)
private final MetadataStoreExtended store;
Expand All @@ -44,7 +40,7 @@ class PulsarLayoutManager implements LayoutManager {

private final String layoutPath;

PulsarLayoutManager(MetadataStoreExtended store, String ledgersRootPath) {
public PulsarLayoutManager(MetadataStoreExtended store, String ledgersRootPath) {
this.ledgersRootPath = ledgersRootPath;
this.store = store;
this.layoutPath = ledgersRootPath + "/" + BookKeeperConstants.LAYOUT_ZNODE;
Expand All @@ -53,14 +49,14 @@ class PulsarLayoutManager implements LayoutManager {
@Override
public LedgerLayout readLedgerLayout() throws IOException {
try {
byte[] layoutData = store.get(layoutPath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)
byte[] layoutData = store.get(layoutPath).get()
.orElseThrow(() -> new BookieException.MetadataStoreException("Layout node not found"))
.getValue();
return LedgerLayout.parseLayout(layoutData);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
} catch (BookieException | ExecutionException | TimeoutException e) {
} catch (BookieException | ExecutionException e) {
throw new IOException(e);
}
}
Expand All @@ -70,13 +66,10 @@ public void storeLedgerLayout(LedgerLayout ledgerLayout) throws IOException {
try {
byte[] layoutData = ledgerLayout.serialize();

store.put(layoutPath, layoutData, Optional.of(-1L))
.get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
store.put(layoutPath, layoutData, Optional.of(-1L)).get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
} catch (TimeoutException e) {
throw new IOException(e);
} catch (ExecutionException e) {
if (e.getCause() instanceof MetadataStoreException.BadVersionException) {
throw new LedgerLayoutExistsException(e);
Expand All @@ -89,12 +82,11 @@ public void storeLedgerLayout(LedgerLayout ledgerLayout) throws IOException {
@Override
public void deleteLedgerLayout() throws IOException {
try {
store.delete(layoutPath, Optional.empty())
.get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
store.delete(layoutPath, Optional.empty()).get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
} catch (ExecutionException | TimeoutException e) {
} catch (ExecutionException e) {
throw new IOException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl;

@Slf4j
class PulsarLedgerAuditorManager implements LedgerAuditorManager {
public class PulsarLedgerAuditorManager implements LedgerAuditorManager {

private static final String ELECTION_PATH = "leader";
public static final String ELECTION_PATH = "leader";

private final CoordinationService coordinationService;
private final LeaderElection<String> leaderElection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ public void close() throws IOException {
}
}

private String getLedgerPath(long ledgerId) {
public String getLedgerPath(long ledgerId) {
return this.ledgerRootPath + StringUtils.getHybridHierarchicalLedgerPath(ledgerId);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.replication;

import static org.apache.bookkeeper.replication.ReplicationStats.AUDITOR_SCOPE;
import static org.mockito.ArgumentMatchers.anyCollection;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.AssertJUnit.assertTrue;
import com.beust.jcommander.internal.Lists;
import com.beust.jcommander.internal.Sets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.test.TestStatsProvider;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Versioned;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
* Unit test {@link AuditorBookieCheckTask}.
*/
public class AuditorBookieCheckTaskTest {

private AuditorStats auditorStats;
private BookKeeperAdmin admin;
private LedgerManager ledgerManager;
private LedgerUnderreplicationManager underreplicationManager;
private BookieLedgerIndexer ledgerIndexer;
private AuditorBookieCheckTask bookieCheckTask;
private final AtomicBoolean shutdownCompleted = new AtomicBoolean(false);
private final AuditorTask.ShutdownTaskHandler shutdownTaskHandler = () -> shutdownCompleted.set(true);
private long startLedgerId = 0;

@BeforeMethod
public void setup() {
ServerConfiguration conf = mock(ServerConfiguration.class);
TestStatsProvider statsProvider = new TestStatsProvider();
TestStatsProvider.TestStatsLogger statsLogger = statsProvider.getStatsLogger(AUDITOR_SCOPE);
final AuditorStats auditorStats = new AuditorStats(statsLogger);
this.auditorStats = spy(auditorStats);
admin = mock(BookKeeperAdmin.class);
ledgerManager = mock(LedgerManager.class);
underreplicationManager = mock(LedgerUnderreplicationManager.class);
ledgerIndexer = mock(BookieLedgerIndexer.class);
AuditorBookieCheckTask bookieCheckTask1 = new AuditorBookieCheckTask(
conf, this.auditorStats, admin, ledgerManager, underreplicationManager,
shutdownTaskHandler, ledgerIndexer, null, null);
bookieCheckTask = spy(bookieCheckTask1);
}

@Test
public void testShutdownAuditBookiesException()
throws BKException, ReplicationException.BKAuditException, InterruptedException {
doThrow(new ReplicationException.BKAuditException("test failed"))
.when(bookieCheckTask)
.auditBookies();
bookieCheckTask.startAudit(true);

assertTrue("shutdownTaskHandler should be execute.", shutdownCompleted.get());
}

@Test
public void testAuditBookies()
throws ReplicationException.UnavailableException, ReplicationException.BKAuditException, BKException {
final String bookieId1 = "127.0.0.1:1000";
final String bookieId2 = "127.0.0.1:1001";
final long bookie1LedgersCount = 10;
final long bookie2LedgersCount = 20;

final Map<String, Set<Long>> bookiesAndLedgers = new HashMap<>();
bookiesAndLedgers.put(bookieId1, getLedgers(bookie1LedgersCount));
bookiesAndLedgers.put(bookieId2, getLedgers(bookie2LedgersCount));
when(ledgerIndexer.getBookieToLedgerIndex()).thenReturn(bookiesAndLedgers);
when(underreplicationManager.isLedgerReplicationEnabled()).thenReturn(true);

CompletableFuture<Versioned<LedgerMetadata>> metaPromise = new CompletableFuture<>();
final LongVersion version = mock(LongVersion.class);
final LedgerMetadata metadata = mock(LedgerMetadata.class);
metaPromise.complete(new Versioned<>(metadata, version));
when(ledgerManager.readLedgerMetadata(anyLong())).thenReturn(metaPromise);

CompletableFuture<Void> markPromise = new CompletableFuture<>();
markPromise.complete(null);
when(underreplicationManager.markLedgerUnderreplicatedAsync(anyLong(), anyCollection()))
.thenReturn(markPromise);

OpStatsLogger numUnderReplicatedLedgerStats = mock(OpStatsLogger.class);
when(auditorStats.getNumUnderReplicatedLedger()).thenReturn(numUnderReplicatedLedgerStats);

final List<BookieId> availableBookies = Lists.newArrayList();
final List<BookieId> readOnlyBookies = Lists.newArrayList();
// test bookie1 lost
availableBookies.add(BookieId.parse(bookieId2));
when(admin.getAvailableBookies()).thenReturn(availableBookies);
when(admin.getReadOnlyBookies()).thenReturn(readOnlyBookies);
bookieCheckTask.startAudit(true);
verify(numUnderReplicatedLedgerStats, times(1))
.registerSuccessfulValue(eq(bookie1LedgersCount));

// test bookie2 lost
numUnderReplicatedLedgerStats = mock(OpStatsLogger.class);
when(auditorStats.getNumUnderReplicatedLedger()).thenReturn(numUnderReplicatedLedgerStats);
availableBookies.clear();
availableBookies.add(BookieId.parse(bookieId1));
bookieCheckTask.startAudit(true);
verify(numUnderReplicatedLedgerStats, times(1))
.registerSuccessfulValue(eq(bookie2LedgersCount));

}

private Set<Long> getLedgers(long count) {
final Set<Long> ledgers = Sets.newHashSet();
for (int i = 0; i < count; i++) {
ledgers.add(i + startLedgerId++);
}
return ledgers;
}
}
Loading

0 comments on commit 8811d5b

Please sign in to comment.