Skip to content

Commit

Permalink
[controller] Improve update store workflow and reduce Zk metadata upd…
Browse files Browse the repository at this point in the history
…ates
  • Loading branch information
nisargthakkar committed Jul 8, 2024
1 parent 1894f95 commit 817f0fc
Show file tree
Hide file tree
Showing 99 changed files with 5,416 additions and 5,027 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -471,12 +471,6 @@ public static void main(String[] args) throws Exception {
case REMOVE_FROM_STORE_ACL:
removeFromStoreAcl(cmd);
break;
case ENABLE_NATIVE_REPLICATION_FOR_CLUSTER:
enableNativeReplicationForCluster(cmd);
break;
case DISABLE_NATIVE_REPLICATION_FOR_CLUSTER:
disableNativeReplicationForCluster(cmd);
break;
case ENABLE_ACTIVE_ACTIVE_REPLICATION_FOR_CLUSTER:
enableActiveActiveReplicationForCluster(cmd);
break;
Expand Down Expand Up @@ -1115,7 +1109,11 @@ static UpdateStoreQueryParams getUpdateStoreQueryParams(CommandLine cmd) {
booleanParam(cmd, Arg.RMD_CHUNKING_ENABLED, p -> params.setRmdChunkingEnabled(p), argSet);
integerParam(cmd, Arg.BATCH_GET_LIMIT, p -> params.setBatchGetLimit(p), argSet);
integerParam(cmd, Arg.NUM_VERSIONS_TO_PRESERVE, p -> params.setNumVersionsToPreserve(p), argSet);
booleanParam(cmd, Arg.INCREMENTAL_PUSH_ENABLED, p -> params.setIncrementalPushEnabled(p), argSet);
booleanParam(cmd, Arg.INCREMENTAL_PUSH_ENABLED, p -> {
System.out.println(
"Setting incremental push config is deprecated. Please set the appropriate source-of-truth configs.");
params.setIncrementalPushEnabled(p);
}, argSet);
booleanParam(cmd, Arg.WRITE_COMPUTATION_ENABLED, p -> params.setWriteComputationEnabled(p), argSet);
booleanParam(cmd, Arg.READ_COMPUTATION_ENABLED, p -> params.setReadComputationEnabled(p), argSet);
integerParam(
Expand Down Expand Up @@ -1807,7 +1805,7 @@ public static void checkMigrationStatus(
printSystemStoreMigrationStatus(destControllerClient, storeName, printFunction);
} else {
// This is a parent controller
System.err.println("\n=================== Parent Controllers ====================");
printFunction.apply("\n=================== Parent Controllers ====================");
printMigrationStatus(srcControllerClient, storeName, printFunction);
printMigrationStatus(destControllerClient, storeName, printFunction);

Expand All @@ -1818,7 +1816,7 @@ public static void checkMigrationStatus(
Map<String, ControllerClient> destChildControllerClientMap = getControllerClientMap(destClusterName, response);

for (Map.Entry<String, ControllerClient> entry: srcChildControllerClientMap.entrySet()) {
System.err.println("\n\n=================== Child Datacenter " + entry.getKey() + " ====================");
printFunction.apply("\n\n=================== Child Datacenter " + entry.getKey() + " ====================");

ControllerClient srcChildController = entry.getValue();
ControllerClient destChildController = destChildControllerClientMap.get(entry.getKey());
Expand Down Expand Up @@ -2583,34 +2581,6 @@ private static void removeFromStoreAcl(CommandLine cmd) throws Exception {
}
}

/**
 * Handles the "enable native replication for cluster" admin command.
 *
 * Reads the required store type plus the optional source fabric and regions filter from the
 * command line, asks the controller to turn native replication on for the matching stores, and
 * prints the controller's response.
 *
 * @param cmd parsed command line holding the arguments for this command
 */
private static void enableNativeReplicationForCluster(CommandLine cmd) {
  final String requestedStoreType = getRequiredArgument(cmd, Arg.STORE_TYPE);

  // A source fabric that StringUtils reports as empty is forwarded as "not specified".
  final String rawSourceRegion = getOptionalArgument(cmd, Arg.NATIVE_REPLICATION_SOURCE_FABRIC);
  final Optional<String> sourceRegionOpt;
  if (StringUtils.isEmpty(rawSourceRegion)) {
    sourceRegionOpt = Optional.empty();
  } else {
    sourceRegionOpt = Optional.of(rawSourceRegion);
  }

  // Same treatment for the optional regions filter.
  final String rawRegionsFilter = getOptionalArgument(cmd, Arg.REGIONS_FILTER);
  final Optional<String> regionsFilterOpt;
  if (StringUtils.isEmpty(rawRegionsFilter)) {
    regionsFilterOpt = Optional.empty();
  } else {
    regionsFilterOpt = Optional.of(rawRegionsFilter);
  }

  ControllerResponse enableResponse = controllerClient
      .configureNativeReplicationForCluster(true, requestedStoreType, sourceRegionOpt, regionsFilterOpt);
  printObject(enableResponse);
}

/**
 * Handles the "disable native replication for cluster" admin command.
 *
 * Reads the required store type plus the optional source fabric and regions filter from the
 * command line, asks the controller to turn native replication off for the matching stores, and
 * prints the controller's response.
 *
 * @param cmd parsed command line holding the arguments for this command
 */
private static void disableNativeReplicationForCluster(CommandLine cmd) {
  final String requestedStoreType = getRequiredArgument(cmd, Arg.STORE_TYPE);

  // A source fabric that StringUtils reports as empty is forwarded as "not specified".
  final String rawSourceFabric = getOptionalArgument(cmd, Arg.NATIVE_REPLICATION_SOURCE_FABRIC);
  final Optional<String> sourceFabricOpt;
  if (StringUtils.isEmpty(rawSourceFabric)) {
    sourceFabricOpt = Optional.empty();
  } else {
    sourceFabricOpt = Optional.of(rawSourceFabric);
  }

  // Same treatment for the optional regions filter.
  final String rawRegionsFilter = getOptionalArgument(cmd, Arg.REGIONS_FILTER);
  final Optional<String> regionsFilterOpt;
  if (StringUtils.isEmpty(rawRegionsFilter)) {
    regionsFilterOpt = Optional.empty();
  } else {
    regionsFilterOpt = Optional.of(rawRegionsFilter);
  }

  ControllerResponse disableResponse = controllerClient
      .configureNativeReplicationForCluster(false, requestedStoreType, sourceFabricOpt, regionsFilterOpt);
  printObject(disableResponse);
}

private static void enableActiveActiveReplicationForCluster(CommandLine cmd) {
String storeType = getRequiredArgument(cmd, Arg.STORE_TYPE);
String regionsFilterParam = getOptionalArgument(cmd, Arg.REGIONS_FILTER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,16 +384,6 @@ public enum Command {
"remove-from-store-acl", "Remove a principal from ACL's for an existing store",
new Arg[] { URL, STORE, PRINCIPAL }, new Arg[] { CLUSTER, READABILITY, WRITEABILITY }
),
ENABLE_NATIVE_REPLICATION_FOR_CLUSTER(
"enable-native-replication-for-cluster",
"enable native replication for certain stores (batch-only, hybrid-only, incremental-push, hybrid-or-incremental, all) in a cluster",
new Arg[] { URL, STORE_TYPE }, new Arg[] { CLUSTER, REGIONS_FILTER, NATIVE_REPLICATION_SOURCE_FABRIC }
),
DISABLE_NATIVE_REPLICATION_FOR_CLUSTER(
"disable-native-replication-for-cluster",
"disable native replication for certain stores (batch-only, hybrid-only, incremental-push, hybrid-or-incremental, all) in a cluster",
new Arg[] { URL, CLUSTER, STORE_TYPE }, new Arg[] { REGIONS_FILTER, NATIVE_REPLICATION_SOURCE_FABRIC }
),
ENABLE_ACTIVE_ACTIVE_REPLICATION_FOR_CLUSTER(
"enable-active-active-replication-for-cluster",
"enable active active replication for certain stores (batch-only, hybrid-only, incremental-push, hybrid-or-incremental, all) in a cluster",
Expand Down
2 changes: 1 addition & 1 deletion clients/venice-pulsar/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ cd ~/src/venice
```shell
java -jar bin/venice-admin-tool-all.jar --new-store --url http://venice-controller:5555 --cluster venice-cluster0 --store t1_n1_s1 --key-schema-file /tmp/key.asvc --value-schema-file /tmp/value.asvc

java -jar bin/venice-admin-tool-all.jar --update-store --url http://venice-controller:5555 --cluster venice-cluster0 --store t1_n1_s1 --storage-quota -1 --incremental-push-enabled true
java -jar bin/venice-admin-tool-all.jar --update-store --url http://venice-controller:5555 --cluster venice-cluster0 --store t1_n1_s1 --storage-quota -1 --hybrid-rewind-seconds 86400 --hybrid-offset-lag 1000 --hybrid-data-replication-policy NONE
java -jar bin/venice-admin-tool-all.jar --update-store --url http://venice-controller:5555 --cluster venice-cluster0 --store t1_n1_s1 --read-quota 1000000

java -jar bin/venice-admin-tool-all.jar --empty-push --url http://venice-controller:5555 --cluster venice-cluster0 --store t1_n1_s1 --push-id init --store-size 1000
Expand Down
4 changes: 2 additions & 2 deletions docker/venice-client/create-store.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ jar=/opt/venice/bin/venice-admin-tool-all.jar
# create store
java -jar $jar --new-store --url $url --cluster $clusterName --store $storeName --key-schema-file $keySchema --value-schema-file $valueSchema

# update quota and enabled incremental push
java -jar $jar --update-store --url $url --cluster $clusterName --store $storeName --storage-quota -1 --incremental-push-enabled true
# update quota and enable hybrid to allow incremental push
java -jar $jar --update-store --url $url --cluster $clusterName --store $storeName --storage-quota -1 --hybrid-rewind-seconds 86400 --hybrid-offset-lag 1000 --hybrid-data-replication-policy NONE
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,6 @@ public class VeniceConstants {

public static final String SYSTEM_PROPERTY_FOR_APP_RUNNING_REGION = "com.linkedin.app.env";

// public static final String TIMESTAMP_FIELD_NAME = "timestamp"; //
//
// public static final String REPLICATION_CHECKPOINT_VECTOR_FIELD = "replication_checkpoint_vector";

/**
* This is a sentinel value to be used in TopicSwitch message rewindStartTimestamp field between controller and server.
* When controller specifies this, Leader server nodes will calculate the rewind start time itself.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,39 +144,6 @@ private ConfigKeys() {
*/
public static final String KAFKA_REPLICATION_FACTOR_RT_TOPICS = "kafka.replication.factor.rt.topics";

/**
* TODO: the following 3 configs will be deprecated after the native replication migration is changed to a two-step
* process: 1. Turn on the cluster level config that takes care of newly created stores; 2. Run admin command
* to convert existing stores to native replication.
*/
/**
* Cluster-level config to enable native replication for all batch-only stores.
*/
public static final String ENABLE_NATIVE_REPLICATION_FOR_BATCH_ONLY = "enable.native.replication.for.batch.only";

/**
* Cluster-level config to enable native replication for all hybrid stores.
*/
public static final String ENABLE_NATIVE_REPLICATION_FOR_HYBRID = "enable.native.replication.for.hybrid";

/**
* Cluster-level config to enable native replication for new batch-only stores.
*/
public static final String ENABLE_NATIVE_REPLICATION_AS_DEFAULT_FOR_BATCH_ONLY =
"enable.native.replication.as.default.for.batch.only";

/**
* Cluster-level config to enable native replication for new hybrid stores.
*/
public static final String ENABLE_NATIVE_REPLICATION_AS_DEFAULT_FOR_HYBRID =
"enable.native.replication.as.default.for.hybrid";

/**
* Cluster-level config to enable active-active replication for new batch-only stores.
*/
public static final String ENABLE_ACTIVE_ACTIVE_REPLICATION_AS_DEFAULT_FOR_BATCH_ONLY_STORE =
"enable.active.active.replication.as.default.for.batch.only.store";

/**
* Cluster-level config to enable active-active replication for new hybrid stores.
*/
Expand All @@ -189,7 +156,7 @@ private ConfigKeys() {
public static final String ENABLE_BLOB_TRANSFER = "enable.blob.transfer";

/**
* Sets the default for whether or not do schema validation for all stores
* Sets the default for whether to do schema validation or not for all stores
*/
public static final String CONTROLLER_SCHEMA_VALIDATION_ENABLED = "controller.schema.validation.enabled";

Expand Down Expand Up @@ -304,17 +271,18 @@ private ConfigKeys() {
public static final String CONTROLLER_ENFORCE_SSL = "controller.enforce.ssl";

/**
* Whether child controllers will directly consume the source admin topic in the parent Kafka cluster.
* This config specifies whether Venice is deployed in multi-region mode.
*/
public static final String ADMIN_TOPIC_REMOTE_CONSUMPTION_ENABLED = "admin.topic.remote.consumption.enabled";
public static final String MULTI_REGION = "multi.region";

/**
* This config defines the source region name of the admin topic
*/
public static final String ADMIN_TOPIC_SOURCE_REGION = "admin.topic.source.region";

/**
* This following config defines whether admin consumption should be enabled or not, and this config will only control the behavior in Child Controller.
* The following config defines whether admin consumption should be enabled or not. It only controls the behavior
* in the Child Controller. This is used for store migration.
*/
public static final String CHILD_CONTROLLER_ADMIN_TOPIC_CONSUMPTION_ENABLED =
"child.controller.admin.topic.consumption.enabled";
Expand Down Expand Up @@ -359,8 +327,15 @@ private ConfigKeys() {
"controller.store.graveyard.cleanup.sleep.interval.between.list.fetch.minutes";

/**
* Whether the superset schema generation in Parent Controller should be done via passed callback or not.
* Whether the superset schema generation in Primary Controller should be done via passed callback or not.
*/
public static final String CONTROLLER_EXTERNAL_SUPERSET_SCHEMA_GENERATION_ENABLED =
"controller.external.superset.schema.generation.enabled";

/**
* Whether the superset schema generation in Primary Controller should be done via passed callback or not.
* @deprecated Use {@link #CONTROLLER_EXTERNAL_SUPERSET_SCHEMA_GENERATION_ENABLED} instead.
*/
@Deprecated
public static final String CONTROLLER_PARENT_EXTERNAL_SUPERSET_SCHEMA_GENERATION_ENABLED =
"controller.parent.external.superset.schema.generation.enabled";

Expand Down Expand Up @@ -1182,12 +1157,6 @@ private ConfigKeys() {
public static final String NATIVE_REPLICATION_SOURCE_FABRIC_AS_DEFAULT_FOR_HYBRID_STORES =
"native.replication.source.fabric.as.default.for.hybrid.stores";

/**
* We will use this config to determine whether we should enable incremental push for hybrid active-active user stores.
* If this config is set to true, we will enable incremental push for hybrid active-active user stores.
*/
public static final String ENABLE_INCREMENTAL_PUSH_FOR_HYBRID_ACTIVE_ACTIVE_USER_STORES =
"enable.incremental.push.for.hybrid.active.active.user.stores";
/**
* We will use this config to determine whether we should enable partial update for hybrid active-active user stores.
* If this config is set to true, we will enable partial update for hybrid active-active user stores whose latest value
Expand All @@ -1211,7 +1180,7 @@ private ConfigKeys() {

// go/inclusivecode deprecated(alias="child.cluster.allowlist")
@Deprecated
public static final String CHILD_CLUSTER_WHITELIST = "child.cluster.whitelist";
public static final String CHILD_CLUSTER_ALLOWLIST_LEGACY = "child.cluster.whitelist";

/**
* Only required when controller.parent.mode=true
Expand All @@ -1236,7 +1205,7 @@ private ConfigKeys() {

// go/inclusivecode deprecated(alias="native.replication.fabric.allowlist")
@Deprecated
public static final String NATIVE_REPLICATION_FABRIC_WHITELIST = "native.replication.fabric.whitelist";
public static final String NATIVE_REPLICATION_FABRIC_ALLOWLIST_LEGACY = "native.replication.fabric.whitelist";

/**
* Previously {@link #CHILD_CLUSTER_ALLOWLIST} is used to also represent the allowlist of source fabrics
Expand All @@ -1255,15 +1224,9 @@ private ConfigKeys() {
public static final String PARENT_KAFKA_CLUSTER_FABRIC_LIST = "parent.kafka.cluster.fabric.list";

/**
* Whether A/A is enabled on the controller. When it is true, all A/A required config (e.g.
* {@link #ACTIVE_ACTIVE_REAL_TIME_SOURCE_FABRIC_LIST}) must be set.
*/
public static final String ACTIVE_ACTIVE_ENABLED_ON_CONTROLLER = "active.active.enabled.on.controller";

/**
* A list of fabrics that are source(s) of the active active real time replication. When active-active replication
* is enabled on the controller {@link #ACTIVE_ACTIVE_ENABLED_ON_CONTROLLER} is true, this list should contain fabrics
* where the Venice server should consume from when it accepts the TS (TopicSwitch) message.
* A list of regions that are the source(s) of Active/Active real-time replication. When running in multi-region
* mode, this list should contain the names of the regions that the Venice server should consume from when it
* accepts the TS (TopicSwitch) message.
* Example value of this config: "dc-0, dc-1, dc-2".
*/
public static final String ACTIVE_ACTIVE_REAL_TIME_SOURCE_FABRIC_LIST = "active.active.real.time.source.fabric.list";
Expand Down Expand Up @@ -1561,16 +1524,6 @@ private ConfigKeys() {
*/
public static final String CONTROLLER_HAAS_SUPER_CLUSTER_NAME = "controller.haas.super.cluster.name";

/**
* Whether to enable batch push (including GF job) from Admin in Child Controller.
* In theory, we should disable batch push in Child Controller no matter what, but the fact is that today there are
* many tests, which are doing batch pushes to an individual cluster setup (only Child Controller), so disabling batch push from Admin
* in Child Controller will require a lot of refactoring.
* So the current strategy is to enable it by default, but disable it in EI and PROD.
*/
public static final String CONTROLLER_ENABLE_BATCH_PUSH_FROM_ADMIN_IN_CHILD =
"controller.enable.batch.push.from.admin.in.child";

/**
* A config that turns the key/value profiling stats on and off. This config can be placed in both Router and SNs and it
* is off by default. When switching it on, We will emit a fine grained histogram that reflects the distribution of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public class ControllerApiConstants {

public static final String UNUSED_SCHEMA_DELETION_ENABLED = "unused_schema_deletion_enabled";

public static final String BLOB_TRANSFER_ENABLED = "blob.transfer.enabled";
public static final String BLOB_TRANSFER_ENABLED = "blob_transfer_enabled";

public static final String HEARTBEAT_TIMESTAMP = "heartbeat_timestamp";
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import static com.linkedin.venice.controllerapi.ControllerApiConstants.LOCKED_NODE_ID_LIST_SEPARATOR;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.LOCKED_STORAGE_NODE_IDS;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.NAME;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.NATIVE_REPLICATION_SOURCE_FABRIC;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.OFFSET;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.OPERATION;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.OWNER;
Expand Down Expand Up @@ -1101,19 +1100,6 @@ public SystemStoreHeartbeatResponse getHeartbeatFromSystemStore(String storeName
SystemStoreHeartbeatResponse.class);
}

/**
 * Sends a request to the {@code CONFIGURE_NATIVE_REPLICATION_FOR_CLUSTER} controller route.
 *
 * The store type is validated up front by resolving its upper-cased form through
 * {@code VeniceUserStoreType.valueOf}; the value sent to the controller is the caller's original
 * string, unchanged.
 *
 * @param enableNativeReplication value sent as the {@code STATUS} query parameter
 * @param storeType store type name; matched case-insensitively against {@code VeniceUserStoreType}
 * @param sourceFabric when present, sent as the {@code NATIVE_REPLICATION_SOURCE_FABRIC} parameter
 * @param regionsFilter when present, sent as the {@code REGIONS_FILTER} parameter
 * @return the controller's response to the request
 */
public ControllerResponse configureNativeReplicationForCluster(
    boolean enableNativeReplication,
    String storeType,
    Optional<String> sourceFabric,
    Optional<String> regionsFilter) {
  // Verify the input storeType is valid. Upper-case with Locale.ROOT so the result does not depend
  // on the JVM's default locale (e.g. under a Turkish locale "i" would become a dotted capital I
  // and a valid store type would fail to resolve).
  VeniceUserStoreType.valueOf(storeType.toUpperCase(java.util.Locale.ROOT));
  QueryParams params = newParams().add(STATUS, enableNativeReplication).add(STORE_TYPE, storeType);
  sourceFabric.ifPresent(s -> params.add(NATIVE_REPLICATION_SOURCE_FABRIC, s));
  regionsFilter.ifPresent(f -> params.add(REGIONS_FILTER, f));
  return request(ControllerRoute.CONFIGURE_NATIVE_REPLICATION_FOR_CLUSTER, params, ControllerResponse.class);
}

public ControllerResponse configureActiveActiveReplicationForCluster(
boolean enableActiveActiveReplication,
String storeType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,6 @@ public enum ControllerRoute {
UPDATE_ACL("/update_acl", HttpMethod.POST, Arrays.asList(CLUSTER, NAME, ACCESS_PERMISSION)),
GET_ACL("/get_acl", HttpMethod.GET, Arrays.asList(CLUSTER, NAME)),
DELETE_ACL("/delete_acl", HttpMethod.GET, Arrays.asList(CLUSTER, NAME)),
CONFIGURE_NATIVE_REPLICATION_FOR_CLUSTER(
"/configure_native_replication_for_cluster", HttpMethod.POST, Arrays.asList(CLUSTER, STORE_TYPE, STATUS)
),
CONFIGURE_ACTIVE_ACTIVE_REPLICATION_FOR_CLUSTER(
"/configure_active_active_replication_for_cluster", HttpMethod.POST, Arrays.asList(CLUSTER, STORE_TYPE, STATUS)
), GET_DELETABLE_STORE_TOPICS("/get_deletable_store_topics", HttpMethod.GET, Collections.emptyList()),
Expand Down
Loading

0 comments on commit 817f0fc

Please sign in to comment.