From 5f1029a6419951ec51f6806734439f239a1f2dec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jacobo=20Coll=20Morag=C3=B3n?= Date: Mon, 27 May 2024 11:29:23 +0100 Subject: [PATCH 1/3] storage: Fix VariantSearchTest. #TASK-6136 --- .../core/variant/search/VariantSearchTest.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/opencga-storage/opencga-storage-core/src/test/java/org/opencb/opencga/storage/core/variant/search/VariantSearchTest.java b/opencga-storage/opencga-storage-core/src/test/java/org/opencb/opencga/storage/core/variant/search/VariantSearchTest.java index 56588d25863..17d7d073c51 100644 --- a/opencga-storage/opencga-storage-core/src/test/java/org/opencb/opencga/storage/core/variant/search/VariantSearchTest.java +++ b/opencga-storage/opencga-storage-core/src/test/java/org/opencb/opencga/storage/core/variant/search/VariantSearchTest.java @@ -187,15 +187,16 @@ public void testSpecialCharacter() throws Exception { List variants = getVariants(limit); List annotatedVariants = annotatedVariants(variants); - String studyId = "abyu12"; - String fileId = "a.vcf"; + String study = "abyu12"; + String file = "a.vcf"; - variants.get(0).getStudies().get(0).getFiles().get(0).setFileId(fileId); + variants.get(0).getStudies().get(0).getFiles().get(0).setFileId(file); System.out.println(variants.get(0).getStudies().get(0).getFiles().get(0).getFileId()); //System.exit(-1); - scm.createStudy(studyId); - + int studyId = scm.createStudy(study).getId(); + int fileId = scm.registerFile(studyId, file, Arrays.asList("A-A", "B", "C", "D")); + scm.addIndexedFiles(studyId, Collections.singletonList(fileId)); String collection = solr.coreName; variantSearchManager.create(collection); @@ -204,13 +205,13 @@ public void testSpecialCharacter() throws Exception { samplePosition.put("B", 1); samplePosition.put("C", 2); samplePosition.put("D", 3); - annotatedVariants.get(0).getStudies().get(0).setStudyId(studyId).setSortedSamplesPosition(samplePosition); + annotatedVariants.get(0).getStudies().get(0).setStudyId(study).setSortedSamplesPosition(samplePosition); variantSearchManager.insert(collection, annotatedVariants); Query query = new Query(); - query.put(VariantQueryParam.STUDY.key(), studyId); + query.put(VariantQueryParam.STUDY.key(), study); // query.put(VariantQueryParam.SAMPLE.key(), samplePosition.keySet().toArray()[0]); - query.put(VariantQueryParam.FILE.key(), fileId); + query.put(VariantQueryParam.FILE.key(), file); query.put(VariantQueryParam.FILTER.key(), "PASS"); query.put(VariantQueryParam.ANNOT_CLINICAL_SIGNIFICANCE.key(), "benign"); VariantQueryResult results = variantSearchManager.query(collection, variantStorageEngine.parseQuery(query, From 9f46a7cf2c3c95bfb6a10bd7d834e4b7c4d58b50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jacobo=20Coll=20Morag=C3=B3n?= Date: Tue, 28 May 2024 17:20:02 +0100 Subject: [PATCH 2/3] storage: Fix tests from HadoopVariantStorageEngineSplitDataTest #TASK-6136 --- ...adoopVariantStorageEngineSplitDataTest.java | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageEngineSplitDataTest.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageEngineSplitDataTest.java index e4f1f2e14b4..1401a7b5ad9 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageEngineSplitDataTest.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageEngineSplitDataTest.java @@ -224,6 +224,10 @@ public void testMultiChromosomeSplitData() throws Exception { } private void failAtLoadingFile(String x, String file1, URI outputUri) throws Exception { + failAtLoadingFile(x, file1, outputUri, 1); + } + + private void failAtLoadingFile(String x, String file1, URI outputUri, int expectedRunningTasks) throws Exception { try { VariantStorageEngine engine = getMockedStorageEngine(new ObjectMap(VariantStorageOptions.STUDY.key(), STUDY_NAME)); engine.index(Collections.singletonList(getResourceUri(x + file1)), outputUri); @@ -236,11 +240,13 @@ private void failAtLoadingFile(String x, String file1, URI outputUri) throws Exc assertEquals(TaskMetadata.Status.NONE, fileMetadata.getIndexStatus()); List runningTasks = new ArrayList<>(); metadataManager.getRunningTasks(studyId).forEach(runningTasks::add); - assertEquals(1, runningTasks.size()); - assertEquals(TaskMetadata.Type.LOAD, runningTasks.get(0).getType()); - assertEquals(TaskMetadata.Status.RUNNING, runningTasks.get(0).currentStatus()); - assertEquals(Arrays.asList(fileMetadata.getId()), runningTasks.get(0).getFileIds()); + assertEquals(expectedRunningTasks, runningTasks.size()); + TaskMetadata taskMetadata = runningTasks.get(runningTasks.size() - 1); + assertEquals(TaskMetadata.Type.LOAD, taskMetadata.getType()); + assertEquals(TaskMetadata.Status.RUNNING, taskMetadata.currentStatus()); + assertEquals(Arrays.asList(fileMetadata.getId()), taskMetadata.getFileIds()); } catch (AssertionError error) { + error.addSuppressed(e); e.printStackTrace(); throw error; } @@ -594,7 +600,7 @@ public void testLoadMultiFileDataConcurrencyDeleteMany() throws Exception { String file2 = "1K.end.platinum-genomes-vcf-NA12878_S1.vcf.gz"; failAtLoadingFile(resourceDir, file1, outDir); - failAtLoadingFile(resourceDir, file2, outDir); + failAtLoadingFile(resourceDir, file2, outDir, 2); // try { // getMockedStorageEngine().index(Collections.singletonList(getResourceUri(resourceDir + file1)), outDir); // fail("Should have thrown an exception"); @@ -656,7 +662,7 @@ public void testLoadMultiFileDataConcurrencyFail() throws Exception { variantStorageEngine.getOptions().put(VariantStorageOptions.LOAD_MULTI_FILE_DATA.key(), true); variantStorageEngine.getOptions().put(VariantStorageOptions.RESUME.key(), true); - variantStorageEngine.index(Collections.singletonList(getResourceUri(file1)), outDir); + variantStorageEngine.index(Collections.singletonList(getResourceUri(resourceDir + file1)), outDir); } From 1c7db628514e40d960c97fdea2b1a782e67b698a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jacobo=20Coll=20Morag=C3=B3n?= Date: Fri, 31 May 2024 14:48:15 +0100 Subject: [PATCH 3/3] storage: Improve testing SampleIndexAggregation of intergenic queries. #TASK-6136 --- .../index/sample/SampleIndexQueryParser.java | 90 ++++++++++++------- .../sample/SampleIndexQueryParserTest.java | 18 +++- .../variant/index/sample/SampleIndexTest.java | 44 ++++++--- 3 files changed, 109 insertions(+), 43 deletions(-) diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexQueryParser.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexQueryParser.java index 6137c5bd6c1..803fa75fea5 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexQueryParser.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexQueryParser.java @@ -1216,18 +1216,7 @@ protected SampleAnnotationIndexQuery parseAnnotationIndexQuery(SampleIndexSchema CtBtFtCombinationIndexSchema.Filter ctBtTfFilter = schema.getCtBtTfIndex().getField().noOpFilter(); IndexFilter clinicalFilter = schema.getClinicalIndexSchema().noOpFilter(); - Boolean intergenic = null; - - ParsedVariantQuery.VariantQueryXref variantQueryXref = VariantQueryParser.parseXrefs(query); - if (!isValidParam(query, REGION)) { - if (!variantQueryXref.getGenes().isEmpty() - && variantQueryXref.getIds().isEmpty() - && variantQueryXref.getOtherXrefs().isEmpty() - && variantQueryXref.getVariants().isEmpty()) { - // If only filtering by genes, is not intergenic. - intergenic = false; - } - } + final Boolean intergenic = isIntergenicQuery(query); // BiotypeConsquenceTypeFlagCombination combination = BiotypeConsquenceTypeFlagCombination // .fromQuery(query, Arrays.asList(schema.getTranscriptFlagIndexSchema().getField().getConfiguration().getValues())); @@ -1237,18 +1226,10 @@ protected SampleAnnotationIndexQuery parseAnnotationIndexQuery(SampleIndexSchema boolean tfCovered = false; if (isValidParam(query, ANNOT_CONSEQUENCE_TYPE)) { - List soNames = query.getAsStringList(VariantQueryParam.ANNOT_CONSEQUENCE_TYPE.key()); - soNames = soNames.stream() + List soNames = query.getAsStringList(VariantQueryParam.ANNOT_CONSEQUENCE_TYPE.key()) + .stream() .map(ct -> ConsequenceTypeMappings.accessionToTerm.get(VariantQueryUtils.parseConsequenceType(ct))) .collect(Collectors.toList()); - if (!soNames.contains(VariantAnnotationConstants.INTERGENIC_VARIANT) - && !soNames.contains(VariantAnnotationConstants.REGULATORY_REGION_VARIANT) - && !soNames.contains(VariantAnnotationConstants.TF_BINDING_SITE_VARIANT)) { - // All ct values but "intergenic_variant" and "regulatory_region_variant" are in genes (i.e. non-intergenic) - intergenic = false; - } else if (soNames.size() == 1 && soNames.contains(VariantAnnotationConstants.INTERGENIC_VARIANT)) { - intergenic = true; - } // else, leave undefined : intergenic = null boolean ctFilterCoveredBySummary = false; boolean ctBtCombinationCoveredBySummary = false; if (SampleIndexSchema.CUSTOM_LOF.containsAll(soNames)) { @@ -1295,14 +1276,17 @@ protected SampleAnnotationIndexQuery parseAnnotationIndexQuery(SampleIndexSchema } } - // Do not use ctIndex if the CT filter is covered by the summary - // Use the ctIndex if: + // Do not use ctIndex for intergenic queries (intergenic == true) + // or queries that might return intergenic variants (intergenic == null) + // + // Use the ctIndex if any of: // - The CtFilter is not covered by the summary // - The query has the combination CT+BT , and it is not covered by the summary // - The query has the combination CT+TF - boolean useCtIndexFilter = !ctFilterCoveredBySummary - || (!ctBtCombinationCoveredBySummary && combination.isBiotype()) - || combination.isFlag(); + boolean useCtIndexFilter = + intergenic == Boolean.FALSE && (!ctFilterCoveredBySummary + || (!ctBtCombinationCoveredBySummary && combination.isBiotype()) + || combination.isFlag()); if (useCtIndexFilter) { ctCovered = completeIndex; consequenceTypeFilter = schema.getCtIndex().getField().buildFilter(new OpValue<>("=", soNames)); @@ -1317,8 +1301,6 @@ protected SampleAnnotationIndexQuery parseAnnotationIndexQuery(SampleIndexSchema } if (isValidParam(query, ANNOT_BIOTYPE)) { - // All biotype values are in genes (i.e. non-intergenic) - intergenic = false; boolean biotypeFilterCoveredBySummary = false; List biotypes = query.getAsStringList(VariantQueryParam.ANNOT_BIOTYPE.key()); if (BIOTYPE_SET.containsAll(biotypes)) { @@ -1350,8 +1332,6 @@ protected SampleAnnotationIndexQuery parseAnnotationIndexQuery(SampleIndexSchema List transcriptFlags = query.getAsStringList(ANNOT_TRANSCRIPT_FLAG.key()); tfFilter = schema.getTranscriptFlagIndexSchema().getField().buildFilter(new OpValue<>("=", transcriptFlags)); tfCovered = completeIndex & tfFilter.isExactFilter(); - // Transcript flags are in transcripts/genes. (i.e. non-intergenic) - intergenic = false; // TranscriptFlag filter is covered by index if (tfCovered) { if (!isValidParam(query, GENE) && simpleCombination(combination)) { @@ -1538,12 +1518,60 @@ protected SampleAnnotationIndexQuery parseAnnotationIndexQuery(SampleIndexSchema // If intergenic is undefined, or true, CT and BT filters can not be used. biotypeFilter = schema.getBiotypeIndex().getField().noOpFilter(); consequenceTypeFilter = schema.getCtIndex().getField().noOpFilter(); + if (!biotypeFilter.isNoOp()) { + throw new IllegalStateException("Unexpected BT filter for intergenic=" + intergenic); + } + if (!consequenceTypeFilter.isNoOp()) { + throw new IllegalStateException("Unexpected CT filter for intergenic=" + intergenic); + } } return new SampleAnnotationIndexQuery(new byte[]{annotationIndexMask, annotationIndex}, consequenceTypeFilter, biotypeFilter, tfFilter, ctBtTfFilter, clinicalFilter, populationFrequencyFilter); } + private Boolean isIntergenicQuery(Query query) { + ParsedVariantQuery.VariantQueryXref variantQueryXref = VariantQueryParser.parseXrefs(query); + if (!isValidParam(query, REGION)) { + if (!variantQueryXref.getGenes().isEmpty() + && variantQueryXref.getIds().isEmpty() + && variantQueryXref.getOtherXrefs().isEmpty() + && variantQueryXref.getVariants().isEmpty()) { + // If only filtering by genes, is not intergenic. + return false; + } + } + + if (isValidParam(query, ANNOT_BIOTYPE)) { + // All biotype values are in genes (i.e. non-intergenic) + return false; + } + if (isValidParam(query, ANNOT_BIOTYPE)) { + // All biotype values are in genes (i.e. non-intergenic) + return false; + } + if (isValidParam(query, ANNOT_TRANSCRIPT_FLAG)) { + // Transcript flags are in transcripts/genes. (i.e. non-intergenic) + return false; + } + if (isValidParam(query, ANNOT_CONSEQUENCE_TYPE)) { + List soNames = query.getAsStringList(VariantQueryParam.ANNOT_CONSEQUENCE_TYPE.key()); + soNames = soNames.stream() + .map(ct -> ConsequenceTypeMappings.accessionToTerm.get(VariantQueryUtils.parseConsequenceType(ct))) + .collect(Collectors.toList()); + if (!soNames.contains(VariantAnnotationConstants.INTERGENIC_VARIANT) + && !soNames.contains(VariantAnnotationConstants.REGULATORY_REGION_VARIANT) + && !soNames.contains(VariantAnnotationConstants.TF_BINDING_SITE_VARIANT)) { + // All ct values but "intergenic_variant" and "regulatory_region_variant" are in genes (i.e. non-intergenic) + return false; + } else if (soNames.size() == 1 && soNames.contains(VariantAnnotationConstants.INTERGENIC_VARIANT)) { + return true; + } // else, leave undefined : intergenic = null + } + // Unable to determine if the query is intergenic or not. Return null for uncertain. + return null; + } + private boolean simpleCombination(BiotypeConsquenceTypeFlagCombination combination) { return combination.numParams() == 1; } diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexQueryParserTest.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexQueryParserTest.java index 7ae4e36f3ec..212b0521b26 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexQueryParserTest.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexQueryParserTest.java @@ -1384,7 +1384,10 @@ public void parseIntergenicTest() { checkIntergenic(true, new Query(ANNOT_CONSEQUENCE_TYPE.key(), "intergenic_variant")); checkIntergenic(null, new Query(ANNOT_CONSEQUENCE_TYPE.key(), "missense_variant,intergenic_variant")); checkIntergenic(null, new Query(ANNOT_CONSEQUENCE_TYPE.key(), "intergenic_variant,missense_variant")); - + checkIntergenic(null, new Query(ANNOT_CONSEQUENCE_TYPE.key(), VariantAnnotationConstants.REGULATORY_REGION_VARIANT)); + checkIntergenic(false, new Query(ANNOT_CONSEQUENCE_TYPE.key(), VariantAnnotationConstants.REGULATORY_REGION_VARIANT) + .append(ANNOT_BIOTYPE.key(), "protein_coding")); + // Nonsense combination checkIntergenic(false, new Query(ANNOT_CONSEQUENCE_TYPE.key(), "intergenic_variant").append(ANNOT_BIOTYPE.key(), "protein_coding")); } @@ -1570,6 +1573,19 @@ public void testCoveredQuery_ct() { parseAnnotationIndexQuery(query, true); assertTrue(query.isEmpty()); + query = new Query().append(ANNOT_CONSEQUENCE_TYPE.key(), String.join(OR, VariantAnnotationConstants.REGULATORY_REGION_VARIANT)); + parseAnnotationIndexQuery(query, true); + indexQuery = parseAnnotationIndexQuery(query, true); + assertTrue(indexQuery.getConsequenceTypeFilter().isNoOp()); + assertFalse(query.isEmpty()); // regulatory_region_variant can't be used for CT filter alone + + query = new Query().append(ANNOT_CONSEQUENCE_TYPE.key(), String.join(OR, VariantAnnotationConstants.REGULATORY_REGION_VARIANT)) + .append(ANNOT_BIOTYPE.key(), "protein_coding"); + indexQuery = parseAnnotationIndexQuery(query, true); + assertFalse(indexQuery.getConsequenceTypeFilter().isNoOp()); + assertFalse(indexQuery.getBiotypeFilter().isNoOp()); + assertTrue(query.isEmpty()); // regulatory_region_variant can be used together with biotype + query = new Query().append(ANNOT_CONSEQUENCE_TYPE.key(), String.join(OR, VariantAnnotationConstants.STOP_LOST)); parseAnnotationIndexQuery(query, false); indexQuery = parseAnnotationIndexQuery(query, false); diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexTest.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexTest.java index e15cb6ae4bb..9ee362f8724 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexTest.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/index/sample/SampleIndexTest.java @@ -897,29 +897,55 @@ public void testAggregationCorrectnessCt() throws Exception { @Test public void testAggregationCorrectnessTFBS() throws Exception { - testAggregationCorrectness(TF_BINDING_SITE_VARIANT, true); + // Special scenario. This CT might include intergenic values, so can't be used alone + testAggregationCorrectness(new Query(ANNOT_BIOTYPE.key(), "protein_coding"), TF_BINDING_SITE_VARIANT); } @Test public void testAggregationCorrectnessRegulatoryRegionVariant() throws Exception { - testAggregationCorrectness(REGULATORY_REGION_VARIANT); + // Special scenario. This CT might include intergenic values, so can't be used alone + testAggregationCorrectness(new Query(ANNOT_BIOTYPE.key(), "protein_coding"), + REGULATORY_REGION_VARIANT); + } + + @Test + public void testAggregationByIntergenicQuery() throws Exception { + SampleIndexVariantAggregationExecutor executor = new SampleIndexVariantAggregationExecutor(metadataManager, sampleIndexDBAdaptor); + + Query baseQuery = new Query(STUDY.key(), STUDY_NAME_3) + .append(SAMPLE.key(), "NA12877"); + + assertFalse(executor.canUseThisExecutor(new Query(baseQuery) + .append(ANNOT_CONSEQUENCE_TYPE.key(), REGULATORY_REGION_VARIANT), new QueryOptions(QueryOptions.FACET, "consequenceType"))); + assertFalse(executor.canUseThisExecutor(new Query(baseQuery) + .append(ANNOT_CONSEQUENCE_TYPE.key(), TF_BINDING_SITE_VARIANT), new QueryOptions(QueryOptions.FACET, "consequenceType"))); + + assertTrue(executor.canUseThisExecutor(new Query(baseQuery) + .append(ANNOT_CONSEQUENCE_TYPE.key(), REGULATORY_REGION_VARIANT) + .append(ANNOT_BIOTYPE.key(), "protein_coding"), + new QueryOptions(QueryOptions.FACET, "consequenceType"))); + assertTrue(executor.canUseThisExecutor(new Query(baseQuery) + .append(ANNOT_CONSEQUENCE_TYPE.key(), TF_BINDING_SITE_VARIANT) + .append(ANNOT_BIOTYPE.key(), "protein_coding"), + new QueryOptions(QueryOptions.FACET, "consequenceType"))); } private void testAggregationCorrectness(String ct) throws Exception { - testAggregationCorrectness(ct, false); + testAggregationCorrectness(new Query(), ct); } - private void testAggregationCorrectness(String ct, boolean sampleIndexMightBeMoreAccurate) throws Exception { + private void testAggregationCorrectness(Query baseQuery, String ct) throws Exception { SampleIndexVariantAggregationExecutor executor = new SampleIndexVariantAggregationExecutor(metadataManager, sampleIndexDBAdaptor); - Query query = new Query(STUDY.key(), STUDY_NAME_3) + Query query = new Query(baseQuery) + .append(STUDY.key(), STUDY_NAME_3) .append(SAMPLE.key(), "NA12877") .append(ANNOT_CONSEQUENCE_TYPE.key(), ct); assertTrue(executor.canUseThisExecutor(query, new QueryOptions(QueryOptions.FACET, "consequenceType"))); AtomicInteger count = new AtomicInteger(0); sampleIndexDBAdaptor.iterator(new Query(query), new QueryOptions()).forEachRemaining(v -> count.incrementAndGet()); - FacetField facet = executor.aggregation(query, new QueryOptions(QueryOptions.FACET, "consequenceType")).first(); + FacetField facet = executor.aggregation(new Query(query), new QueryOptions(QueryOptions.FACET, "consequenceType")).first(); assertEquals(count.get(), facet.getCount()); FacetField.Bucket bucket = facet.getBuckets().stream().filter(b -> b.getValue().equals(ct)).findFirst().orElse(null); @@ -934,11 +960,7 @@ private void testAggregationCorrectness(String ct, boolean sampleIndexMightBeMor } } else { assertNotNull(msg, bucket); - if (sampleIndexMightBeMoreAccurate) { - assertThat(msg, count.get(), gte(bucket.getCount())); - } else { - assertEquals(msg, count.get(), bucket.getCount()); - } + assertEquals(msg, count.get(), bucket.getCount()); } }