Skip to content

Commit

Permalink
[router][venice-thin-client] Add safeguard against invalid schema ID (l…
Browse files Browse the repository at this point in the history
…inkedin#1178)

If schema repo returns invalid schema id, that fails client requests. This PR adds safeguard against such invalid schemas in meta data handler and client side schema reader.
---------
Co-authored-by: Sourav Maji <[email protected]>
  • Loading branch information
majisourav99 authored and m-nagarajan committed Sep 17, 2024
1 parent a82b684 commit 7263e5c
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -324,9 +324,13 @@ private void updateAllValueSchemas(boolean forceRefresh) {
valueSchemaIdSet = fetchAllValueSchemaIdsFromRouter();
} catch (Exception e) {
LOGGER.warn(
"Caught exception when trying to fetch all value schema IDs from router, will fetch all value schema entries instead.");
"Caught exception when trying to fetch all value schema IDs from router, will fetch all value schema entries instead.",
e);
// Fall back to fetch all value schema.
for (SchemaEntry valueSchemaEntry: fetchAllValueSchemaEntriesFromRouter()) {
if (!isValidSchemaEntry(valueSchemaEntry)) {
continue;
}
valueSchemaEntryMap.put(valueSchemaEntry.getId(), valueSchemaEntry);
cacheValueAndCanonicalSchemas(valueSchemaEntry.getSchema(), valueSchemaEntry.getId());
}
Expand Down Expand Up @@ -441,7 +445,7 @@ private SchemaEntry maybeFetchLatestValueSchemaEntry() {
* one active value schema.
*/
synchronized (this) {
if (latest != null && !shouldRefreshLatestValueSchemaEntry.get()) {
if (latest != null && !shouldRefreshLatestValueSchemaEntry.get() && isValidSchemaEntry(latest)) {
return latest;
}
updateAllValueSchemaEntriesAndLatestValueSchemaEntry(false);
Expand All @@ -450,6 +454,9 @@ private SchemaEntry maybeFetchLatestValueSchemaEntry() {
latest = latestValueSchemaEntry.get();
}
}
if (latest == null || !isValidSchemaEntry(latest)) {
throw new VeniceClientException("Failed to get latest value schema for store: " + storeName);
}
return latest;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.linkedin.venice.compute.protocol.request.DotProduct;
import com.linkedin.venice.compute.protocol.request.HadamardProduct;
import com.linkedin.venice.compute.protocol.request.enums.ComputeOperationType;
import com.linkedin.venice.schema.SchemaData;
import com.linkedin.venice.schema.SchemaReader;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
Expand Down Expand Up @@ -80,6 +81,9 @@ public abstract class AbstractAvroComputeRequestBuilder<K> implements ComputeReq

public AbstractAvroComputeRequestBuilder(AvroGenericReadComputeStoreClient storeClient, SchemaReader schemaReader) {
this.latestValueSchemaId = schemaReader.getLatestValueSchemaId();
if (latestValueSchemaId == SchemaData.INVALID_VALUE_SCHEMA_ID) {
throw new VeniceClientException("Invalid value schema ID: " + latestValueSchemaId);
}
this.latestValueSchema = schemaReader.getValueSchema(latestValueSchemaId);
if (latestValueSchema.getType() != Schema.Type.RECORD) {
throw new VeniceClientException("Only value schema with 'RECORD' type is supported");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ public void testGetLatestValueSchemaWhenNoValueSchema()
0);

try (SchemaReader schemaReader = new RouterBackedSchemaReader(() -> mockClient)) {
Assert.assertNull(schemaReader.getLatestValueSchema());
Assert.assertThrows(VeniceClientException.class, () -> schemaReader.getLatestValueSchema());
Mockito.verify(mockClient, Mockito.timeout(TIMEOUT).times(1)).getRaw(Mockito.anyString());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,11 +311,19 @@ private void handleValueSchemaLookup(ChannelHandlerContext ctx, VenicePathParser
responseObject.setSuperSetSchemaId(superSetSchemaId);
}
Collection<SchemaEntry> valueSchemaEntries = schemaRepo.getValueSchemas(storeName);
int schemaNum = valueSchemaEntries.size();
int schemaNum = (int) valueSchemaEntries.stream().filter(schemaEntry -> schemaEntry.getId() > 0).count();
MultiSchemaResponse.Schema[] schemas = new MultiSchemaResponse.Schema[schemaNum];
int index = 0;
for (SchemaEntry entry: valueSchemaEntries) {
int schemaId = entry.getId();
if (schemaId < 1) {
LOGGER.warn(
"Got an invalid schema id ({}) for store {} in handleValueSchemaLookup; will not include this in the {}.",
entry.getId(),
storeName,
responseObject.getClass().getSimpleName());
continue;
}
schemas[index] = new MultiSchemaResponse.Schema();
schemas[index].setId(schemaId);
schemas[index].setSchemaStr(entry.getSchema().toString());
Expand Down Expand Up @@ -385,6 +393,14 @@ private void handleValueSchemaIdsLookup(ChannelHandlerContext ctx, VenicePathPar
}
Set<Integer> schemaIdSet = new HashSet<>();
for (SchemaEntry entry: schemaRepo.getValueSchemas(storeName)) {
if (entry.getId() < 1) {
LOGGER.warn(
"Got an invalid schema id ({}) for store {} in handleValueSchemaIdsLookup; will not include this in the {}.",
entry.getId(),
storeName,
responseObject.getClass().getSimpleName());
continue;
}
schemaIdSet.add(entry.getId());
}
responseObject.setSchemaIdSet(schemaIdSet);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,10 @@ public void testAllValueSchemaIdLookup() throws IOException {
ReadOnlySchemaRepository schemaRepo = Mockito.mock(ReadOnlySchemaRepository.class);
SchemaEntry valueSchemaEntry1 = new SchemaEntry(valueSchemaId1, valueSchemaStr1);
SchemaEntry valueSchemaEntry2 = new SchemaEntry(valueSchemaId2, valueSchemaStr2);
Mockito.doReturn(Arrays.asList(valueSchemaEntry1, valueSchemaEntry2)).when(schemaRepo).getValueSchemas(storeName);
SchemaEntry valueSchemaEntry3 = new SchemaEntry(-1, valueSchemaStr2);
Mockito.doReturn(Arrays.asList(valueSchemaEntry1, valueSchemaEntry2, valueSchemaEntry3))
.when(schemaRepo)
.getValueSchemas(storeName);
FullHttpResponse response = passRequestToMetadataHandler(
"http://myRouterHost:4567/all_value_schema_ids/" + storeName,
null,
Expand Down Expand Up @@ -412,7 +415,8 @@ public void testAllValueSchemaLookup() throws IOException {
SchemaEntry valueSchemaEntry1 = new SchemaEntry(valueSchemaId1, valueSchemaStr1);
SchemaEntry valueSchemaEntry2 = new SchemaEntry(valueSchemaId2, valueSchemaStr2);
SchemaEntry valueSchemaEntry3 = new SchemaEntry(valueSchemaId3, valueSchemaStr3);
Mockito.doReturn(Arrays.asList(valueSchemaEntry1, valueSchemaEntry2, valueSchemaEntry3))
SchemaEntry valueSchemaEntry4 = new SchemaEntry(-1, valueSchemaStr3);
Mockito.doReturn(Arrays.asList(valueSchemaEntry1, valueSchemaEntry2, valueSchemaEntry3, valueSchemaEntry4))
.when(schemaRepo)
.getValueSchemas(storeName);

Expand Down

0 comments on commit 7263e5c

Please sign in to comment.