Skip to content

Commit

Permalink
[controller] Fix store creation and handle schema evolution for push …
Browse files Browse the repository at this point in the history
…job details and heartbeat system stores
  • Loading branch information
nisargthakkar committed Jul 13, 2023
1 parent fabfe89 commit e6bc2fe
Show file tree
Hide file tree
Showing 56 changed files with 1,007 additions and 813 deletions.
16 changes: 0 additions & 16 deletions .github/workflows/gh-ci-pulsar-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,6 @@ name: PulsarVeniceIntegrationCI
on: [push, pull_request, workflow_dispatch]

jobs:
ValidateGradleWrapper:
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest]
runs-on: ${{ matrix.os }}
timeout-minutes: 5
steps:
- uses: actions/checkout@v3
with:
fetch-depth: 0
- name: Validate Gradle wrapper
uses: gradle/wrapper-validation-action@v1
- name: Set up Docker
uses: crazy-max/ghaction-setup-docker@v1

PulsarVeniceIntegrationTests:
strategy:
fail-fast: false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ public PushJobHeartbeatSender createHeartbeatSender(
StoreInfo storeInfo = heartBeatStoreResponse.getStore();
PartitionerConfig partitionerConfig = storeInfo.getPartitionerConfig();
int partitionNum = storeInfo.getPartitionCount();
LOGGER.info("Got [heartbeat store: {}] Store Info: {}", heartbeatStoreName, storeInfo);
String heartbeatKafkaTopicName = Version.composeRealTimeTopic(heartbeatStoreName);
VeniceWriter<byte[], byte[], byte[]> veniceWriter = getVeniceWriter(
heartbeatKafkaTopicName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,23 @@ public static void executeUnderConditionalLock(Runnable action, BooleanSupplier
}
}
}

public static void executeUnderConditionalLock(
Runnable action,
Runnable orElse,
BooleanSupplier lockCondition,
Object lock) {
if (lockCondition.getAsBoolean()) {
synchronized (lock) {
// Check it again
if (lockCondition.getAsBoolean()) {
action.run();
} else {
orElse.run();
}
}
} else {
orElse.run();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,74 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;


public class ConcurrencyUtilsTest {
@Test
public void testExecuteUnderConditionalLock() throws InterruptedException {
int numRunnables = 1000;
AtomicReference<Boolean> globalState = new AtomicReference<>(true);
AtomicInteger actionCount = new AtomicInteger();

Runnable action = Mockito.mock(Runnable.class);
Mockito.doAnswer(invocation -> {
globalState.set(false);
actionCount.incrementAndGet();
return null;
}).when(action).run();

BooleanSupplier lockConditionOnGlobalState = Mockito.mock(BooleanSupplier.class);
doAnswer(invocation -> globalState.get()).when(lockConditionOnGlobalState).getAsBoolean();
doAnswer(invocation -> actionCount.get() == 0).when(lockConditionOnGlobalState).getAsBoolean();

runTest(numRunnables, action, lockConditionOnGlobalState);

Mockito.verify(action, times(1)).run();
Assert.assertEquals(actionCount.get(), 1);
Mockito.verify(lockConditionOnGlobalState, atLeast(numRunnables + 1)).getAsBoolean();
Mockito.verify(lockConditionOnGlobalState, atMost(2 * numRunnables)).getAsBoolean();

// Lock will always return false now. The action should never get executed and the condition should only be checked
// in the outer block
Mockito.reset(action);
Mockito.reset(lockConditionOnGlobalState);

runTest(numRunnables, action, lockConditionOnGlobalState);
Mockito.verify(action, never()).run();
Mockito.verify(lockConditionOnGlobalState, times(numRunnables)).getAsBoolean();
}

@Test
public void testExecuteUnderConditionalLockWithElse() throws InterruptedException {
int numRunnables = 1000;
AtomicInteger actionCount = new AtomicInteger();
AtomicInteger orElseActionCount = new AtomicInteger();

Runnable action = Mockito.mock(Runnable.class);
Mockito.doAnswer(invocation -> {
actionCount.incrementAndGet();
return null;
}).when(action).run();

Runnable orElseAction = () -> orElseActionCount.incrementAndGet();

Runnable orElseAction = Mockito.mock(Runnable.class);
Mockito.doAnswer(invocation -> {
orElseActionCount.incrementAndGet();
return null;
}).when(orElseAction).run();

BooleanSupplier lockConditionOnGlobalState = Mockito.mock(BooleanSupplier.class);
doAnswer(invocation -> actionCount.get() == 0).when(lockConditionOnGlobalState).getAsBoolean();

runTest(numRunnables, action, orElseAction, lockConditionOnGlobalState);

Mockito.verify(action, times(1)).run();
Assert.assertEquals(actionCount.get(), 1);
Mockito.verify(orElseAction, times(numRunnables - 1)).run();
Assert.assertEquals(orElseActionCount.get(), numRunnables - 1);
Mockito.verify(lockConditionOnGlobalState, atLeast(numRunnables + 1)).getAsBoolean();
Mockito.verify(lockConditionOnGlobalState, atMost(2 * numRunnables)).getAsBoolean();

Expand Down Expand Up @@ -60,4 +104,20 @@ private void runTest(int numRunnables, Runnable action, BooleanSupplier lockCond
latch.await();
executorService.shutdownNow();
}

private void runTest(int numRunnables, Runnable action, Runnable orElseAction, BooleanSupplier lockCondition)
throws InterruptedException {
int threadPoolSize = 10;
ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize);
CountDownLatch latch = new CountDownLatch(numRunnables);
Object lockObject = new Object();
for (int i = 0; i < numRunnables; i++) {
executorService.submit(() -> {
ConcurrencyUtils.executeUnderConditionalLock(action, orElseAction, lockCondition, lockObject);
latch.countDown();
});
}
latch.await();
executorService.shutdownNow();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public void createTopic(
* Create a topic, and block until the topic is created, with a default timeout of
* {@value #DEFAULT_KAFKA_OPERATION_TIMEOUT_MS}, after which this function will throw a VeniceException.
*
* @param topicName Name for the new topic
* @param pubSubTopic Name for the new topic
* @param numPartitions number of partitions
* @param replication replication factor
* @param retentionTimeMs Retention time, in ms, for the topic
Expand All @@ -244,7 +244,7 @@ public void createTopic(
* if true, a much shorter timeout will be used to make topic creation non-blocking.
*/
public void createTopic(
PubSubTopic topicName,
PubSubTopic pubSubTopic,
int numPartitions,
int replication,
long retentionTimeMs,
Expand All @@ -259,36 +259,37 @@ public void createTopic(
new PubSubTopicConfiguration(Optional.of(retentionTimeMs), logCompaction, minIsr, topicMinLogCompactionLagMs);
logger.info(
"Creating topic: {} partitions: {} replication: {}, configuration: {}",
topicName,
pubSubTopic,
numPartitions,
replication,
pubSubTopicConfiguration);

try {
RetryUtils.executeWithMaxAttemptAndExponentialBackoff(
() -> kafkaWriteOnlyAdmin.get().createTopic(topicName, numPartitions, replication, pubSubTopicConfiguration),
() -> kafkaWriteOnlyAdmin.get()
.createTopic(pubSubTopic, numPartitions, replication, pubSubTopicConfiguration),
10,
Duration.ofMillis(200),
Duration.ofSeconds(1),
Duration.ofMillis(useFastKafkaOperationTimeout ? FAST_KAFKA_OPERATION_TIMEOUT_MS : kafkaOperationTimeoutMs),
CREATE_TOPIC_RETRIABLE_EXCEPTIONS);
} catch (Exception e) {
if (ExceptionUtils.recursiveClassEquals(e, TopicExistsException.class)) {
logger.info("Topic: {} already exists, will update retention policy.", topicName);
waitUntilTopicCreated(topicName, numPartitions, deadlineMs);
updateTopicRetention(topicName, retentionTimeMs);
logger.info("Updated retention policy to be {}ms for topic: {}", retentionTimeMs, topicName);
logger.info("Topic: {} already exists, will update retention policy.", pubSubTopic);
waitUntilTopicCreated(pubSubTopic, numPartitions, deadlineMs);
updateTopicRetention(pubSubTopic, retentionTimeMs);
logger.info("Updated retention policy to be {}ms for topic: {}", retentionTimeMs, pubSubTopic);
return;
} else {
throw new VeniceOperationAgainstKafkaTimedOut(
"Timeout while creating topic: " + topicName + ". Topic still does not exist after "
"Timeout while creating topic: " + pubSubTopic + ". Topic still does not exist after "
+ (deadlineMs - startTime) + "ms.",
e);
}
}
waitUntilTopicCreated(topicName, numPartitions, deadlineMs);
waitUntilTopicCreated(pubSubTopic, numPartitions, deadlineMs);
boolean eternal = retentionTimeMs == ETERNAL_TOPIC_RETENTION_POLICY_MS;
logger.info("Successfully created {}topic: {}", eternal ? "eternal " : "", topicName);
logger.info("Successfully created {}topic: {}", eternal ? "eternal " : "", pubSubTopic);
}

protected void waitUntilTopicCreated(PubSubTopic topicName, int partitionCount, long deadlineMs) {
Expand All @@ -309,7 +310,6 @@ protected void waitUntilTopicCreated(PubSubTopic topicName, int partitionCount,
* @param topicName
*/
private Future<Void> ensureTopicIsDeletedAsync(PubSubTopic topicName) {
// TODO: Stop using Kafka APIs which depend on ZK.
logger.info("Deleting topic: {}", topicName);
return kafkaWriteOnlyAdmin.get().deleteTopic(topicName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.metadata.response.MetadataResponseRecord;
import com.linkedin.venice.participant.protocol.ParticipantMessageValue;
import com.linkedin.venice.pubsub.api.PubSubPositionWireFormat;
import com.linkedin.venice.pushstatus.PushStatusKey;
import com.linkedin.venice.pushstatus.PushStatusValue;
Expand Down Expand Up @@ -147,6 +148,11 @@ public enum AvroProtocolDefinition {
*/
PUSH_STATUS_SYSTEM_SCHEMA_STORE(1, PushStatusValue.class),

/**
* Value schema for participant system stores.
*/
PARTICIPANT_MESSAGE_SYSTEM_STORE_VALUE(1, ParticipantMessageValue.class),

/**
* Response record for admin request v1.
*/
Expand Down Expand Up @@ -230,7 +236,7 @@ private static Set<Byte> validateMagicBytes() {
* extra header prepended in front. These are either un-evolvable or have their
* schema ID stored out of band from the record.
*
* For example, everything that goes inside of the Put of a {@link KafkaMessageEnvelope}
* For example, everything that goes inside the Put message of a {@link KafkaMessageEnvelope}
* and uses the {@link com.linkedin.venice.kafka.protocol.Put#schemaId} to store
* the protocol version.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public class VeniceWriter<K, V, U> extends AbstractVeniceWriter<K, V, U> {
private final VeniceKafkaSerializer<K> keySerializer;
private final VeniceKafkaSerializer<V> valueSerializer;
private final VeniceKafkaSerializer<U> writeComputeSerializer;
private final PubSubProducerAdapter producerAdapter;
public final PubSubProducerAdapter producerAdapter;
private final GUID producerGUID;
private final Time time;
private final VenicePartitioner partitioner;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.linkedin.venice.consumer;

import com.linkedin.venice.controller.VeniceHelixAdmin;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.MultiSchemaResponse;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
Expand Down Expand Up @@ -32,13 +31,15 @@ void extraBeforeClassSetUp(VeniceClusterWrapper cluster, ControllerClient contro
AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getCurrentProtocolVersion());
});

((VeniceHelixAdmin) cluster.getRandomVeniceController().getVeniceAdmin()).addValueSchema(
cluster.getClusterName(),
systemStoreName,
NEW_PROTOCOL_SCHEMA.toString(),
NEW_PROTOCOL_VERSION,
DirectionalSchemaCompatibilityType.NONE,
false);
cluster.getRandomVeniceController()
.getVeniceAdmin()
.addValueSchemaInternal(
cluster.getClusterName(),
systemStoreName,
NEW_PROTOCOL_SCHEMA.toString(),
NEW_PROTOCOL_VERSION,
DirectionalSchemaCompatibilityType.NONE,
false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -56,7 +55,7 @@ public void setUp() {
2,
2,
1,
Optional.of(new VeniceProperties(parentControllerProperties)),
Optional.of(parentControllerProperties),
Optional.empty(),
Optional.empty(),
false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
Expand Down Expand Up @@ -63,7 +62,7 @@ public void setUp() {
1,
Optional.empty(),
Optional.of(childControllerProperties),
Optional.of(new VeniceProperties(serverProperties)),
Optional.of(serverProperties),
false);

childDatacenters = multiRegionMultiClusterWrapper.getChildRegions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -71,7 +70,7 @@ public void setUp() {
1,
1,
1,
Optional.of(new VeniceProperties(controllerProps)),
Optional.of(controllerProps),
Optional.of(controllerProps),
Optional.empty());

Expand Down
Loading

0 comments on commit e6bc2fe

Please sign in to comment.