diff --git a/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/schema/RouterBackedSchemaReader.java b/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/schema/RouterBackedSchemaReader.java index 51d18e3fb6..9ca49460a8 100644 --- a/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/schema/RouterBackedSchemaReader.java +++ b/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/schema/RouterBackedSchemaReader.java @@ -324,9 +324,13 @@ private void updateAllValueSchemas(boolean forceRefresh) { valueSchemaIdSet = fetchAllValueSchemaIdsFromRouter(); } catch (Exception e) { LOGGER.warn( - "Caught exception when trying to fetch all value schema IDs from router, will fetch all value schema entries instead."); + "Caught exception when trying to fetch all value schema IDs from router, will fetch all value schema entries instead.", + e); // Fall back to fetch all value schema. for (SchemaEntry valueSchemaEntry: fetchAllValueSchemaEntriesFromRouter()) { + if (!isValidSchemaEntry(valueSchemaEntry)) { + continue; + } valueSchemaEntryMap.put(valueSchemaEntry.getId(), valueSchemaEntry); cacheValueAndCanonicalSchemas(valueSchemaEntry.getSchema(), valueSchemaEntry.getId()); } @@ -441,7 +445,7 @@ private SchemaEntry maybeFetchLatestValueSchemaEntry() { * one active value schema. */ synchronized (this) { - if (latest != null && !shouldRefreshLatestValueSchemaEntry.get()) { + if (latest != null && !shouldRefreshLatestValueSchemaEntry.get() && isValidSchemaEntry(latest)) { return latest; } updateAllValueSchemaEntriesAndLatestValueSchemaEntry(false); @@ -450,6 +454,9 @@ private SchemaEntry maybeFetchLatestValueSchemaEntry() { latest = latestValueSchemaEntry.get(); } } + if (latest == null || !isValidSchemaEntry(latest)) { + throw new VeniceClientException("Failed to get latest value schema for store: " + storeName); + } return latest; } diff --git a/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/store/AbstractAvroComputeRequestBuilder.java b/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/store/AbstractAvroComputeRequestBuilder.java index be265614cd..ca8ef1cbb9 100644 --- a/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/store/AbstractAvroComputeRequestBuilder.java +++ b/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/store/AbstractAvroComputeRequestBuilder.java @@ -21,6 +21,7 @@ import com.linkedin.venice.compute.protocol.request.DotProduct; import com.linkedin.venice.compute.protocol.request.HadamardProduct; import com.linkedin.venice.compute.protocol.request.enums.ComputeOperationType; +import com.linkedin.venice.schema.SchemaData; import com.linkedin.venice.schema.SchemaReader; import com.linkedin.venice.utils.Pair; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; @@ -80,6 +81,9 @@ public abstract class AbstractAvroComputeRequestBuilder implements ComputeReq public AbstractAvroComputeRequestBuilder(AvroGenericReadComputeStoreClient storeClient, SchemaReader schemaReader) { this.latestValueSchemaId = schemaReader.getLatestValueSchemaId(); + if (latestValueSchemaId == SchemaData.INVALID_VALUE_SCHEMA_ID) { + throw new VeniceClientException("Invalid value schema ID: " + latestValueSchemaId); + } this.latestValueSchema = schemaReader.getValueSchema(latestValueSchemaId); if (latestValueSchema.getType() != Schema.Type.RECORD) { throw new VeniceClientException("Only value schema with 'RECORD' type is supported"); diff --git a/clients/venice-thin-client/src/test/java/com/linkedin/venice/client/schema/RouterBackedSchemaReaderTest.java b/clients/venice-thin-client/src/test/java/com/linkedin/venice/client/schema/RouterBackedSchemaReaderTest.java index 07d545319d..9ce30dc5d4 100644 --- a/clients/venice-thin-client/src/test/java/com/linkedin/venice/client/schema/RouterBackedSchemaReaderTest.java +++ b/clients/venice-thin-client/src/test/java/com/linkedin/venice/client/schema/RouterBackedSchemaReaderTest.java @@ -386,7 +386,7 @@ public void testGetLatestValueSchemaWhenNoValueSchema() 0); try (SchemaReader schemaReader = new RouterBackedSchemaReader(() -> mockClient)) { - Assert.assertNull(schemaReader.getLatestValueSchema()); + Assert.assertThrows(VeniceClientException.class, () -> schemaReader.getLatestValueSchema()); Mockito.verify(mockClient, Mockito.timeout(TIMEOUT).times(1)).getRaw(Mockito.anyString()); } } diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/MetaDataHandler.java b/services/venice-router/src/main/java/com/linkedin/venice/router/MetaDataHandler.java index 55a3465025..4c52b1e821 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/MetaDataHandler.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/MetaDataHandler.java @@ -311,11 +311,19 @@ private void handleValueSchemaLookup(ChannelHandlerContext ctx, VenicePathParser responseObject.setSuperSetSchemaId(superSetSchemaId); } Collection valueSchemaEntries = schemaRepo.getValueSchemas(storeName); - int schemaNum = valueSchemaEntries.size(); + int schemaNum = (int) valueSchemaEntries.stream().filter(schemaEntry -> schemaEntry.getId() > 0).count(); MultiSchemaResponse.Schema[] schemas = new MultiSchemaResponse.Schema[schemaNum]; int index = 0; for (SchemaEntry entry: valueSchemaEntries) { int schemaId = entry.getId(); + if (schemaId < 1) { + LOGGER.warn( + "Got an invalid schema id ({}) for store {} in handleValueSchemaLookup; will not include this in the {}.", + entry.getId(), + storeName, + responseObject.getClass().getSimpleName()); + continue; + } schemas[index] = new MultiSchemaResponse.Schema(); schemas[index].setId(schemaId); schemas[index].setSchemaStr(entry.getSchema().toString()); @@ -385,6 +393,14 @@ private void handleValueSchemaIdsLookup(ChannelHandlerContext ctx, VenicePathPar } Set schemaIdSet = new HashSet<>(); for (SchemaEntry entry: schemaRepo.getValueSchemas(storeName)) { + if (entry.getId() < 1) { + LOGGER.warn( + "Got an invalid schema id ({}) for store {} in handleValueSchemaIdsLookup; will not include this in the {}.", + entry.getId(), + storeName, + responseObject.getClass().getSimpleName()); + continue; + } schemaIdSet.add(entry.getId()); } responseObject.setSchemaIdSet(schemaIdSet); diff --git a/services/venice-router/src/test/java/com/linkedin/venice/router/TestMetaDataHandler.java b/services/venice-router/src/test/java/com/linkedin/venice/router/TestMetaDataHandler.java index fb3276826b..23cbf5c173 100644 --- a/services/venice-router/src/test/java/com/linkedin/venice/router/TestMetaDataHandler.java +++ b/services/venice-router/src/test/java/com/linkedin/venice/router/TestMetaDataHandler.java @@ -348,7 +348,10 @@ public void testAllValueSchemaIdLookup() throws IOException { ReadOnlySchemaRepository schemaRepo = Mockito.mock(ReadOnlySchemaRepository.class); SchemaEntry valueSchemaEntry1 = new SchemaEntry(valueSchemaId1, valueSchemaStr1); SchemaEntry valueSchemaEntry2 = new SchemaEntry(valueSchemaId2, valueSchemaStr2); - Mockito.doReturn(Arrays.asList(valueSchemaEntry1, valueSchemaEntry2)).when(schemaRepo).getValueSchemas(storeName); + SchemaEntry valueSchemaEntry3 = new SchemaEntry(-1, valueSchemaStr2); + Mockito.doReturn(Arrays.asList(valueSchemaEntry1, valueSchemaEntry2, valueSchemaEntry3)) + .when(schemaRepo) + .getValueSchemas(storeName); FullHttpResponse response = passRequestToMetadataHandler( "http://myRouterHost:4567/all_value_schema_ids/" + storeName, null, @@ -412,7 +415,8 @@ public void testAllValueSchemaLookup() throws IOException { SchemaEntry valueSchemaEntry1 = new SchemaEntry(valueSchemaId1, valueSchemaStr1); SchemaEntry valueSchemaEntry2 = new SchemaEntry(valueSchemaId2, valueSchemaStr2); SchemaEntry valueSchemaEntry3 = new SchemaEntry(valueSchemaId3, valueSchemaStr3); - Mockito.doReturn(Arrays.asList(valueSchemaEntry1, valueSchemaEntry2, valueSchemaEntry3)) + SchemaEntry valueSchemaEntry4 = new SchemaEntry(-1, valueSchemaStr3); + Mockito.doReturn(Arrays.asList(valueSchemaEntry1, valueSchemaEntry2, valueSchemaEntry3, valueSchemaEntry4)) .when(schemaRepo) .getValueSchemas(storeName);