Skip to content

Commit

Permalink
[controller] Fix store creation and handle schema evolution for push …
Browse files Browse the repository at this point in the history
…job details and heartbeat system stores
  • Loading branch information
nisargthakkar committed Jul 10, 2023
1 parent 6ef7955 commit 1ab791e
Show file tree
Hide file tree
Showing 46 changed files with 854 additions and 863 deletions.
16 changes: 0 additions & 16 deletions .github/workflows/gh-ci-pulsar-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,6 @@ name: PulsarVeniceIntegrationCI
on: [push, pull_request, workflow_dispatch]

jobs:
ValidateGradleWrapper:
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest]
runs-on: ${{ matrix.os }}
timeout-minutes: 5
steps:
- uses: actions/checkout@v3
with:
fetch-depth: 0
- name: Validate Gradle wrapper
uses: gradle/wrapper-validation-action@v1
- name: Set up Docker
uses: crazy-max/ghaction-setup-docker@v1

PulsarVeniceIntegrationTests:
strategy:
fail-fast: false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.metadata.response.MetadataResponseRecord;
import com.linkedin.venice.participant.protocol.ParticipantMessageValue;
import com.linkedin.venice.pubsub.api.PubSubPositionWireFormat;
import com.linkedin.venice.pushstatus.PushStatusKey;
import com.linkedin.venice.pushstatus.PushStatusValue;
Expand Down Expand Up @@ -147,6 +148,11 @@ public enum AvroProtocolDefinition {
*/
PUSH_STATUS_SYSTEM_SCHEMA_STORE(1, PushStatusValue.class),

/**
* Value schema for participant system stores.
*/
PARTICIPANT_MESSAGE_SYSTEM_STORE_VALUE(1, ParticipantMessageValue.class),

/**
* Response record for admin request v1.
*/
Expand Down Expand Up @@ -230,7 +236,7 @@ private static Set<Byte> validateMagicBytes() {
* extra header prepended in front. These are either un-evolvable or have their
* schema ID stored out of band from the record.
*
* For example, everything that goes inside of the Put of a {@link KafkaMessageEnvelope}
* For example, everything that goes inside the Put message of a {@link KafkaMessageEnvelope}
* and uses the {@link com.linkedin.venice.kafka.protocol.Put#schemaId} to store
* the protocol version.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.linkedin.venice.consumer;

import com.linkedin.venice.controller.VeniceHelixAdmin;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.MultiSchemaResponse;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
Expand Down Expand Up @@ -32,13 +31,15 @@ void extraBeforeClassSetUp(VeniceClusterWrapper cluster, ControllerClient contro
AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getCurrentProtocolVersion());
});

((VeniceHelixAdmin) cluster.getRandomVeniceController().getVeniceAdmin()).addValueSchema(
cluster.getClusterName(),
systemStoreName,
NEW_PROTOCOL_SCHEMA.toString(),
NEW_PROTOCOL_VERSION,
DirectionalSchemaCompatibilityType.NONE,
false);
cluster.getRandomVeniceController()
.getVeniceAdmin()
.addValueSchemaInternal(
cluster.getClusterName(),
systemStoreName,
NEW_PROTOCOL_SCHEMA.toString(),
NEW_PROTOCOL_VERSION,
DirectionalSchemaCompatibilityType.NONE,
false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -56,7 +55,7 @@ public void setUp() {
2,
2,
1,
Optional.of(new VeniceProperties(parentControllerProperties)),
Optional.of(parentControllerProperties),
Optional.empty(),
Optional.empty(),
false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
Expand Down Expand Up @@ -63,7 +62,7 @@ public void setUp() {
1,
Optional.empty(),
Optional.of(childControllerProperties),
Optional.of(new VeniceProperties(serverProperties)),
Optional.of(serverProperties),
false);

childDatacenters = multiRegionMultiClusterWrapper.getChildRegions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -71,7 +70,7 @@ public void setUp() {
1,
1,
1,
Optional.of(new VeniceProperties(controllerProps)),
Optional.of(controllerProps),
Optional.of(controllerProps),
Optional.empty());

Expand Down
Loading

0 comments on commit 1ab791e

Please sign in to comment.