Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AnVIL: allow partial project loading #4479

Open
wants to merge 4 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions seqr/utils/search/add_data_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def format_email(sample_summary, project_link, num_new_samples):

def prepare_data_loading_request(projects: list[Project], sample_type: str, dataset_type: str, genome_version: str,
data_path: str, user: User, pedigree_dir: str, raise_pedigree_error: bool = False,
individual_ids: list[str] = None, skip_validation: bool = False):
individual_ids: list[int] = None, skip_validation: bool = False):
project_guids = sorted([p.guid for p in projects])
variables = {
'projects_to_run': project_guids,
Expand All @@ -137,7 +137,7 @@ def _dag_dataset_type(sample_type: str, dataset_type: str):
else dataset_type


def _upload_data_loading_files(projects: list[Project], user: User, file_path: str, individual_ids: list[str], raise_error: bool):
def _upload_data_loading_files(projects: list[Project], user: User, file_path: str, individual_ids: list[int], raise_error: bool):
file_annotations = OrderedDict({
'Project_GUID': F('family__project__guid'), 'Family_GUID': F('family__guid'),
'Family_ID': F('family__family_id'),
Expand Down
52 changes: 31 additions & 21 deletions seqr/views/apis/anvil_workspace_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def create_project_from_workspace(request, namespace, name):
error = 'Field(s) "{}" are required'.format(', '.join(missing_fields))
return create_json_response({'error': error}, status=400, reason=error)

pedigree_records = _parse_uploaded_pedigree(request_json)
pedigree_records, _ = _parse_uploaded_pedigree(request_json)

# Create a new Project in seqr
project_args = {
Expand Down Expand Up @@ -229,27 +229,36 @@ def add_workspace_data(request, project_guid):
error = 'Field(s) "{}" are required'.format(', '.join(missing_fields))
return create_json_response({'error': error}, status=400, reason=error)

pedigree_records = _parse_uploaded_pedigree(request_json, project=project)
pedigree_records, records_by_family = _parse_uploaded_pedigree(request_json, project=project)

previous_samples = get_search_samples([project]).filter(
dataset_type=Sample.DATASET_TYPE_VARIANT_CALLS).prefetch_related('individual')
if not previous_samples:
previous_samples = get_search_samples([project]).filter(dataset_type=Sample.DATASET_TYPE_VARIANT_CALLS)
sample = previous_samples.first()
if not sample:
return create_json_response({
'error': 'New data cannot be added to this project until the previously requested data is loaded',
}, status=400)

previous_loaded_individuals = {s.individual.individual_id for s in previous_samples}
missing_loaded_samples = [individual_id for individual_id in previous_loaded_individuals if
individual_id not in request_json['vcfSamples']]
if missing_loaded_samples:
sample_type = sample.sample_type

previous_loaded_individuals = previous_samples.filter(
individual__family__family_id__in=records_by_family,
).values_list('individual_id', 'individual__individual_id', 'individual__family__family_id')
missing_samples_by_family = defaultdict(list)
for _, individual_id, family_id in previous_loaded_individuals:
if individual_id not in request_json['vcfSamples']:
missing_samples_by_family[family_id].append(individual_id)
if missing_samples_by_family:
missing_family_sample_messages = [
f'Family {family_id}: {", ".join(sorted(individual_ids))}'
for family_id, individual_ids in missing_samples_by_family.items()
]
return create_json_response({
'error': 'In order to add new data to this project, new samples must be joint called in a single VCF with all previously loaded samples.'
' The following samples were previously loaded in this project but are missing from the VCF: {}'.format(
', '.join(sorted(missing_loaded_samples)))}, status=400)
'error': 'In order to load data for families with previously loaded data, new family samples must be joint called in a single VCF with all previously loaded samples.'
' The following samples were previously loaded in this project but are missing from the VCF:\n{}'.format(
'\n'.join(sorted(missing_family_sample_messages)))}, status=400)

pedigree_json = _trigger_add_workspace_data(
project, pedigree_records, request.user, request_json['fullDataPath'], previous_samples.first().sample_type,
previous_loaded_ids=previous_loaded_individuals, get_pedigree_json=True)
project, pedigree_records, request.user, request_json['fullDataPath'], sample_type,
previous_loaded_ids=[i[0] for i in previous_loaded_individuals], get_pedigree_json=True)

return create_json_response(pedigree_json)

Expand Down Expand Up @@ -285,17 +294,17 @@ def _parse_uploaded_pedigree(request_json, project=None):
if errors:
raise ErrorsWarningsException(errors, [])

return pedigree_records
return pedigree_records, records_by_family


def _trigger_add_workspace_data(project, pedigree_records, user, data_path, sample_type, previous_loaded_ids=None, get_pedigree_json=False):
# add families and individuals according to the uploaded individual records
pedigree_json, sample_ids = add_or_update_individuals_and_families(
project, individual_records=pedigree_records, user=user, get_update_json=get_pedigree_json, get_updated_individual_ids=True,
pedigree_json, individual_ids = add_or_update_individuals_and_families(
project, individual_records=pedigree_records, user=user, get_update_json=get_pedigree_json, get_updated_individual_db_ids=True,
allow_features_update=True,
)
num_updated_individuals = len(sample_ids)
sample_ids.update(previous_loaded_ids or [])
num_updated_individuals = len(individual_ids)
individual_ids.update(previous_loaded_ids or [])

# use airflow api to trigger AnVIL dags
reload_summary = f' and {len(previous_loaded_ids)} re-loaded' if previous_loaded_ids else ''
Expand All @@ -305,14 +314,15 @@ def _trigger_add_workspace_data(project, pedigree_records, user, data_path, samp
trigger_success = trigger_airflow_data_loading(
[project], sample_type, Sample.DATASET_TYPE_VARIANT_CALLS, project.genome_version, data_path, user=user, success_message=success_message,
success_slack_channel=SEQR_SLACK_ANVIL_DATA_LOADING_CHANNEL, error_message=f'ERROR triggering AnVIL loading for project {project.guid}',
individual_ids=individual_ids,
)
AirtableSession(user, base=AirtableSession.ANVIL_BASE).safe_create_records(
ANVIL_REQUEST_TRACKING_TABLE, [{
'Requester Name': user.get_full_name(),
'Requester Email': user.email,
'AnVIL Project URL': _get_seqr_project_url(project),
'Initial Request Date': datetime.now().strftime('%Y-%m-%d'),
'Number of Samples': len(sample_ids),
'Number of Samples': len(individual_ids),
'Status': 'Loading' if trigger_success else 'Loading Requested'
}])

Expand Down
Loading
Loading