diff --git a/opencga-app/src/main/java/org/opencb/opencga/app/migrations/v2/v2_12_6/SyncCohortsAndSamplesMigration.java b/opencga-app/src/main/java/org/opencb/opencga/app/migrations/v2/v2_12_6/SyncCohortsAndSamplesMigration.java
new file mode 100644
index 00000000000..6f6f5cbf18d
--- /dev/null
+++ b/opencga-app/src/main/java/org/opencb/opencga/app/migrations/v2/v2_12_6/SyncCohortsAndSamplesMigration.java
@@ -0,0 +1,96 @@
+package org.opencb.opencga.app.migrations.v2.v2_12_6;
+
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.Filters;
+import com.mongodb.client.model.Projections;
+import com.mongodb.client.model.Updates;
+import org.apache.commons.collections4.CollectionUtils;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.opencb.opencga.catalog.db.api.CohortDBAdaptor;
+import org.opencb.opencga.catalog.db.api.SampleDBAdaptor;
+import org.opencb.opencga.catalog.db.mongodb.MongoDBAdaptor;
+import org.opencb.opencga.catalog.db.mongodb.OrganizationMongoDBAdaptorFactory;
+import org.opencb.opencga.catalog.migration.Migration;
+import org.opencb.opencga.catalog.migration.MigrationTool;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Re-aligns, for every cohort, the cohort's {@code samples} array with the {@code cohortIds} array stored in the
+ * sample documents: samples listed by the cohort gain the reference, samples no longer listed lose it.
+ */
+@Migration(id = "syncCohortsAndSamplesMigration" ,
+        description = "Sync array of samples from cohort with array of cohortIds from Sample",
+        version = "2.12.6",
+        domain = Migration.MigrationDomain.CATALOG,
+        language = Migration.MigrationLanguage.JAVA,
+        date = 20240621
+)
+public class SyncCohortsAndSamplesMigration extends MigrationTool {
+
+    @Override
+    protected void run() throws Exception {
+        MongoCollection<Document> sampleCollection = getMongoCollection(OrganizationMongoDBAdaptorFactory.SAMPLE_COLLECTION);
+        MongoCollection<Document> sampleArchiveCollection =
+                getMongoCollection(OrganizationMongoDBAdaptorFactory.SAMPLE_ARCHIVE_COLLECTION);
+
+        // The study uid is projected as well because the sample filters are scoped to the cohort's study
+        queryMongo(OrganizationMongoDBAdaptorFactory.COHORT_COLLECTION, new Document(),
+                Projections.include(CohortDBAdaptor.QueryParams.ID.key(), CohortDBAdaptor.QueryParams.STUDY_UID.key(),
+                        CohortDBAdaptor.QueryParams.SAMPLES.key()),
+                cohortDoc -> syncCohort(cohortDoc, sampleCollection, sampleArchiveCollection));
+    }
+
+    /**
+     * Synchronises the {@code cohortIds} of the sample documents with the sample list of a single cohort.
+     *
+     * @param cohortDoc               Cohort document containing, at least, id, studyUid and samples.
+     * @param sampleCollection        Main sample collection.
+     * @param sampleArchiveCollection Sample archive collection; receives the same updates as the main one.
+     */
+    private void syncCohort(Document cohortDoc, MongoCollection<Document> sampleCollection,
+                            MongoCollection<Document> sampleArchiveCollection) {
+        String cohortId = cohortDoc.getString(CohortDBAdaptor.QueryParams.ID.key());
+        long studyUid = cohortDoc.get(CohortDBAdaptor.QueryParams.STUDY_UID.key(), Number.class).longValue();
+        List<Document> samples = cohortDoc.getList(CohortDBAdaptor.QueryParams.SAMPLES.key(), Document.class);
+        List<Long> sampleUids = CollectionUtils.isEmpty(samples)
+                ? Collections.<Long>emptyList()
+                : samples.stream()
+                        .map(s -> s.get(SampleDBAdaptor.QueryParams.UID.key(), Number.class).longValue())
+                        .collect(Collectors.toList());
+
+        // NOTE(review): cohort ids are assumed to be unique only within a study, so every filter is restricted to the
+        // cohort's study. Otherwise the pull below would strip the id from samples of same-named cohorts elsewhere.
+        Bson sameStudy = Filters.eq(SampleDBAdaptor.QueryParams.STUDY_UID.key(), studyUid);
+        Bson lastVersion = Filters.eq(MongoDBAdaptor.LAST_OF_VERSION, true);
+        String cohortIdsKey = SampleDBAdaptor.QueryParams.COHORT_IDS.key();
+
+        long addedMissingCohort = 0;
+        if (!sampleUids.isEmpty()) {
+            // Ensure all those samples have a reference to the cohortId
+            Bson query = Filters.and(sameStudy, Filters.in(SampleDBAdaptor.QueryParams.UID.key(), sampleUids), lastVersion);
+            Bson update = Updates.addToSet(cohortIdsKey, cohortId);
+            addedMissingCohort = sampleCollection.updateMany(query, update).getModifiedCount();
+            sampleArchiveCollection.updateMany(query, update);
+        }
+
+        // Ensure there aren't any samples pointing to this cohort that are not in the samples array. This also runs for
+        // cohorts without samples, so stale references to an emptied cohort are removed too.
+        Bson query = Filters.and(sameStudy, Filters.nin(SampleDBAdaptor.QueryParams.UID.key(), sampleUids),
+                Filters.eq(cohortIdsKey, cohortId), lastVersion);
+        Bson update = Updates.pull(cohortIdsKey, cohortId);
+        long removedNonAssociatedCohort = sampleCollection.updateMany(query, update).getModifiedCount();
+        sampleArchiveCollection.updateMany(query, update);
+
+        if (addedMissingCohort > 0 || removedNonAssociatedCohort > 0) {
+            logger.info("Fixed cohort '{}' references. "
+                            + "Added missing reference to {} samples. "
+                            + "Removed non-associated reference from {} samples.",
+                    cohortId, addedMissingCohort, removedNonAssociatedCohort);
+        }
+    }
+
+}
diff --git a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/CohortMongoDBAdaptor.java b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/CohortMongoDBAdaptor.java
index f34f5ced849..7f25b1ee7b9 100644
--- a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/CohortMongoDBAdaptor.java
+++ b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/CohortMongoDBAdaptor.java
@@ -235,22 +235,21 @@ public OpenCGAResult update(long cohortId, ObjectMap parameters, QueryOptions qu
     @Override
     public OpenCGAResult update(long cohortUid, ObjectMap parameters, List variableSetList, QueryOptions queryOptions)
             throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException {
-        Query query = new Query(QueryParams.UID.key(), cohortUid);
-        QueryOptions options = new QueryOptions(QueryOptions.INCLUDE,
-                Arrays.asList(QueryParams.ID.key(), QueryParams.UID.key(), QueryParams.STUDY_UID.key(),
-                        QueryParams.SAMPLES.key() + "." + QueryParams.ID.key()));
-        OpenCGAResult documentResult = get(query, options);
-        if (documentResult.getNumResults() == 0) {
-            throw new CatalogDBException("Could not update cohort. 
Cohort uid '" + cohortUid + "' not found.");
-        }
-        String cohortId = documentResult.first().getId();
-
         try {
-            return runTransaction(clientSession -> transactionalUpdate(clientSession, documentResult.first(), parameters, variableSetList,
-                    queryOptions));
-        } catch (CatalogDBException e) {
-            logger.error("Could not update cohort {}: {}", cohortId, e.getMessage(), e);
-            throw new CatalogDBException("Could not update cohort " + cohortId + ": " + e.getMessage(), e.getCause());
+            return runTransaction(clientSession -> {
+                Query query = new Query(QueryParams.UID.key(), cohortUid);
+                QueryOptions options = new QueryOptions(QueryOptions.INCLUDE,
+                        Arrays.asList(QueryParams.ID.key(), QueryParams.UID.key(), QueryParams.STUDY_UID.key(),
+                                QueryParams.SAMPLES.key() + "." + QueryParams.ID.key()));
+                OpenCGAResult<Cohort> documentResult = get(clientSession, query, options); // read through the transaction session
+                if (documentResult.getNumResults() == 0) {
+                    throw new CatalogDBException("Could not update cohort. Cohort uid '" + cohortUid + "' not found.");
+                }
+                return transactionalUpdate(clientSession, documentResult.first(), parameters, variableSetList, queryOptions);
+            });
+        } catch (CatalogDBException | RuntimeException e) { // parameter/authorization errors propagate with their declared type
+            logger.error("Could not update cohort {}: {}", cohortUid, e.getMessage(), e);
+            throw new CatalogDBException("Could not update cohort " + cohortUid + ": " + e.getMessage(), e);
         }
     }
 
diff --git a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/managers/ProjectManager.java b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/managers/ProjectManager.java
index a782ea23ed2..38a03bbf24c 100644
--- a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/managers/ProjectManager.java
+++ b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/managers/ProjectManager.java
@@ -221,7 +221,8 @@ private void validateProjectForCreation(String organizationId, Project project)
         }
         try {
             CellBaseConfiguration cellBaseConfiguration = ParamUtils.defaultObject(project.getCellbase(),
-                    new 
CellBaseConfiguration(ParamConstants.CELLBASE_URL, ParamConstants.CELLBASE_VERSION));
+                    new CellBaseConfiguration(ParamConstants.CELLBASE_URL, ParamConstants.CELLBASE_VERSION,
+                            ParamConstants.CELLBASE_DATA_RELEASE, ParamConstants.CELLBASE_APIKEY));
             cellBaseConfiguration = CellBaseValidator.validate(cellBaseConfiguration, project.getOrganism().getScientificName(),
                     project.getOrganism().getAssembly(), true);
             project.setCellbase(cellBaseConfiguration);
diff --git a/opencga-catalog/src/test/java/org/opencb/opencga/catalog/managers/CatalogManagerTest.java b/opencga-catalog/src/test/java/org/opencb/opencga/catalog/managers/CatalogManagerTest.java
index 70e59390b7b..73cab79045b 100644
--- a/opencga-catalog/src/test/java/org/opencb/opencga/catalog/managers/CatalogManagerTest.java
+++ b/opencga-catalog/src/test/java/org/opencb/opencga/catalog/managers/CatalogManagerTest.java
@@ -17,9 +17,11 @@
 package org.opencb.opencga.catalog.managers;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.StopWatch;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -66,6 +68,9 @@
 import javax.naming.NamingException;
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.hamcrest.CoreMatchers.allOf;
@@ -1770,6 +1775,94 @@ public void updateSampleCohortTest() throws Exception {
         }
     }
 
+    @Test
+    public void updateSampleCohortWithThreadsTest() throws Exception {
+        Sample sampleId1 = catalogManager.getSampleManager().create(studyFqn, new Sample().setId("SAMPLE_1"), INCLUDE_RESULT, ownerToken).first();
+        Sample sampleId2 = catalogManager.getSampleManager().create(studyFqn, new Sample().setId("SAMPLE_2"), INCLUDE_RESULT, ownerToken).first();
+        Sample sampleId3 = catalogManager.getSampleManager().create(studyFqn, new Sample().setId("SAMPLE_3"), INCLUDE_RESULT, ownerToken).first();
+        catalogManager.getCohortManager().create(studyFqn, new Cohort().setId("MyCohort1")
+                .setSamples(Arrays.asList(sampleId1)), null, ownerToken).first();
+        catalogManager.getCohortManager().create(studyFqn, new Cohort().setId("MyCohort2")
+                .setSamples(Arrays.asList(sampleId2, sampleId3)), null, ownerToken).first();
+
+        // Worker failures are collected here: an exception thrown inside a submitted task is otherwise captured by the
+        // discarded Future and the test would pass even if nothing had been created or updated.
+        List<Throwable> failures = Collections.synchronizedList(new ArrayList<>());
+
+        ExecutorService executorService = Executors.newFixedThreadPool(10,
+                new ThreadFactoryBuilder()
+                        .setNameFormat("executor-service-%d")
+                        .build());
+
+        StopWatch stopWatch = StopWatch.createStarted();
+        List<List<String>> sampleIds = new ArrayList<>(5);
+        List<String> innerArray = new ArrayList<>(50);
+        for (int i = 0; i < 250; i++) {
+            if (i % 50 == 0) {
+                System.out.println("i = " + i);
+            }
+
+            String sampleId = "SAMPLE_AUTO_" + i;
+            executorService.submit(() -> {
+                try {
+                    catalogManager.getSampleManager().create(studyFqn, new Sample().setId(sampleId), QueryOptions.empty(), ownerToken);
+                } catch (CatalogException | RuntimeException e) {
+                    failures.add(e);
+                }
+            });
+            if (innerArray.size() == 50) {
+                sampleIds.add(new ArrayList<>(innerArray));
+                innerArray.clear();
+            }
+            innerArray.add(sampleId);
+        }
+        sampleIds.add(new ArrayList<>(innerArray));
+        executorService.shutdown();
+        assertTrue("Sample creation did not finish in time", executorService.awaitTermination(1, TimeUnit.MINUTES));
+        assertTrue("Sample creation failed: " + failures, failures.isEmpty());
+
+        System.out.println("Creating 250 samples took " + stopWatch.getTime(TimeUnit.SECONDS) + " seconds");
+
+        stopWatch.stop();
+        stopWatch.reset();
+        stopWatch.start();
+        executorService = Executors.newFixedThreadPool(3);
+        int execution = 0;
+        Map<String, Object> actionMap = new HashMap<>();
+        actionMap.put(CohortDBAdaptor.QueryParams.SAMPLES.key(), ParamUtils.BasicUpdateAction.SET);
+        QueryOptions queryOptions = new QueryOptions();
+        queryOptions.put(Constants.ACTIONS, actionMap);
+        for (List<String> innerSampleIds : sampleIds) {
+            Cohort myCohort1 = catalogManager.getCohortManager().get(studyFqn, "MyCohort1", null, ownerToken).first();
+            List<SampleReferenceParam> sampleReferenceParamList = new ArrayList<>(myCohort1.getNumSamples() + innerSampleIds.size());
+            sampleReferenceParamList.addAll(myCohort1.getSamples().stream().map(s -> new SampleReferenceParam().setId(s.getId())).collect(Collectors.toList()));
+            sampleReferenceParamList.addAll(innerSampleIds.stream().map(s -> new SampleReferenceParam().setId(s)).collect(Collectors.toList()));
+            int executionId = execution++;
+            executorService.submit(() -> {
+                try {
+                    catalogManager.getCohortManager().update(studyFqn, "MyCohort1",
+                            new CohortUpdateParams().setSamples(sampleReferenceParamList),
+                            queryOptions, ownerToken);
+                    System.out.println("Execution: " + executionId);
+                } catch (CatalogException | RuntimeException e) {
+                    failures.add(e);
+                }
+            });
+        }
+        executorService.shutdown();
+        assertTrue("Cohort updates did not finish in time", executorService.awaitTermination(1, TimeUnit.MINUTES));
+        assertTrue("Cohort updates failed: " + failures, failures.isEmpty());
+        System.out.println("Attaching 250 samples took " + stopWatch.getTime(TimeUnit.SECONDS) + " seconds");
+
+        // Ensure persistence
+        Query sampleQuery = new Query(SampleDBAdaptor.QueryParams.COHORT_IDS.key(), "MyCohort1");
+        OpenCGAResult<Sample> search = catalogManager.getSampleManager().search(studyFqn, sampleQuery, SampleManager.INCLUDE_SAMPLE_IDS, ownerToken);
+        Cohort myCohort1 = catalogManager.getCohortManager().get(studyFqn, "MyCohort1", null, ownerToken).first();
+        assertEquals(search.getNumResults(), myCohort1.getNumSamples());
+        Set<String> sampleIdSet = search.getResults().stream().map(Sample::getId).collect(Collectors.toSet());
+        assertTrue(myCohort1.getSamples().stream().map(Sample::getId).collect(Collectors.toSet()).containsAll(sampleIdSet));
+    }
+
     @Test
     public void deleteSampleCohortTest() throws Exception {
         Sample sampleId1 = catalogManager.getSampleManager().create(studyFqn, new Sample().setId("SAMPLE_1"), INCLUDE_RESULT, ownerToken).first();
diff --git 
a/opencga-core/src/main/java/org/opencb/opencga/core/api/ParamConstants.java b/opencga-core/src/main/java/org/opencb/opencga/core/api/ParamConstants.java index c616de15653..a6929d12d79 100644 --- a/opencga-core/src/main/java/org/opencb/opencga/core/api/ParamConstants.java +++ b/opencga-core/src/main/java/org/opencb/opencga/core/api/ParamConstants.java @@ -80,8 +80,8 @@ public class ParamConstants { private static final String UP_TO_100 = " up to a maximum of 100"; public static final String CELLBASE_URL = "https://ws.zettagenomics.com/cellbase"; - public static final String CELLBASE_VERSION = "v5.2"; - public static final String CELLBASE_DATA_RELEASE = "3"; + public static final String CELLBASE_VERSION = "v5.8"; + public static final String CELLBASE_DATA_RELEASE = "7"; public static final String CELLBASE_APIKEY = ""; public static final String POP_FREQ_1000G_CB_V4 = "1kG_phase3"; diff --git a/opencga-core/src/main/java/org/opencb/opencga/core/config/storage/CellBaseConfiguration.java b/opencga-core/src/main/java/org/opencb/opencga/core/config/storage/CellBaseConfiguration.java index 2ef6a77cca4..101a822562c 100644 --- a/opencga-core/src/main/java/org/opencb/opencga/core/config/storage/CellBaseConfiguration.java +++ b/opencga-core/src/main/java/org/opencb/opencga/core/config/storage/CellBaseConfiguration.java @@ -46,7 +46,8 @@ public class CellBaseConfiguration { private String apiKey; public CellBaseConfiguration() { - this(ParamConstants.CELLBASE_URL, ParamConstants.CELLBASE_VERSION); + this(ParamConstants.CELLBASE_URL, ParamConstants.CELLBASE_VERSION, ParamConstants.CELLBASE_DATA_RELEASE, + ParamConstants.CELLBASE_APIKEY); } public CellBaseConfiguration(String url, String version) { diff --git a/opencga-storage/opencga-storage-core/src/main/resources/storage-configuration.yml b/opencga-storage/opencga-storage-core/src/main/resources/storage-configuration.yml index bd6ec5875b8..b9970d18eaf 100644 --- 
a/opencga-storage/opencga-storage-core/src/main/resources/storage-configuration.yml +++ b/opencga-storage/opencga-storage-core/src/main/resources/storage-configuration.yml @@ -11,7 +11,7 @@ cellbase: ## URL host to annotate variants, for example: https://uk.ws.zettagenomics.com/cellbase/ url: "${OPENCGA.CELLBASE.REST.HOST}" version: "${OPENCGA.CELLBASE.VERSION}" - dataRelease: "2" + dataRelease: "7" ## Storage Query Server configuration. When CLI is launched in 'server' mode a RESTful web server ## is launched in the specified port. diff --git a/pom.xml b/pom.xml index ad4c098cd93..e5500bbadd5 100644 --- a/pom.xml +++ b/pom.xml @@ -1429,7 +1429,7 @@ https://uk.ws.zettagenomics.com/cellbase/ - v5.2 + v5.8