From cf0e029be2189a950846434fe8048c81b721b048 Mon Sep 17 00:00:00 2001
From: brimoor
Date: Mon, 3 Oct 2022 12:42:33 -0400
Subject: [PATCH 1/3] refactor merge_samples() to safely clean up in case of
 errors

---
 fiftyone/core/dataset.py | 161 +++++++++++++++++++++++----------------
 1 file changed, 94 insertions(+), 67 deletions(-)

diff --git a/fiftyone/core/dataset.py b/fiftyone/core/dataset.py
index 6b502c35d3..bbdd41cdd7 100644
--- a/fiftyone/core/dataset.py
+++ b/fiftyone/core/dataset.py
@@ -6299,6 +6299,13 @@ def _cleanup_index(dataset, db_field, new_index, dropped_index):
         coll.create_index(db_field)
 
 
+def _cleanup_frame_index(dataset, index):
+    coll = dataset._frame_collection
+
+    if index:
+        coll.drop_index(index)
+
+
 def _get_single_index_map(coll):
     # db_field -> (name, unique)
     return {
@@ -6526,10 +6533,6 @@ def _merge_samples_pipeline(
     merge_lists=True,
     overwrite=True,
 ):
-    #
-    # Prepare for merge
-    #
-
     in_key_field = key_field
     db_fields_map = src_collection._get_db_fields_map()
     key_field = db_fields_map.get(key_field, key_field)
@@ -6556,13 +6559,6 @@ def _merge_samples_pipeline(
 
     src_dataset = src_collection._dataset
 
-    new_src_index, dropped_src_index = _ensure_index(
-        src_dataset, key_field, unique=True
-    )
-    new_dst_index, dropped_dst_index = _ensure_index(
-        dst_dataset, key_field, unique=True
-    )
-
     if contains_videos:
         frame_fields = None
         omit_frame_fields = None
@@ -6593,43 +6589,7 @@ def _merge_samples_pipeline(
         omit_frame_fields = None
 
     #
-    # The implementation of merging video frames is currently a bit complex.
-    # It may be possible to simplify this...
-    #
-    # The trouble is that the `_sample_id` of the frame documents need to match
-    # the `_id` of the sample documents after merging. There may be a more
-    # clever way to make this happen via `$lookup` than what is implemented
-    # here, but here's the current workflow:
-    #
-    # - Store the `key_field` value on each frame document in both the source
-    #   and destination collections corresopnding to its parent sample in a
-    #   temporary `frame_key_field` field
-    # - Merge the sample documents without frames attached
-    # - Merge the frame documents on `[frame_key_field, frame_number]` with
-    #   their old `_sample_id`s unset
-    # - Generate a `key_field` -> `_id` mapping for the post-merge sample docs,
-    #   then make a pass over the frame documents and set
-    #   their `_sample_id` to the corresponding value from this mapping
-    # - The merge is complete, so delete `frame_key_field` from both frame
-    #   collections
-    #
-
-    if contains_videos:
-        frame_key_field = "_merge_key"
-        _index_frames(dst_videos, key_field, frame_key_field)
-        _index_frames(src_videos, key_field, frame_key_field)
-
-        # Must create unique indexes in order to use `$merge`
-        frame_index_spec = [(frame_key_field, 1), ("frame_number", 1)]
-        dst_frame_index = dst_dataset._frame_collection.create_index(
-            frame_index_spec, unique=True
-        )
-        src_frame_index = src_dataset._frame_collection.create_index(
-            frame_index_spec, unique=True
-        )
-
-    #
-    # Merge samples
+    # Prepare samples merge pipeline
     #
 
     default_fields = set(
@@ -6725,18 +6685,33 @@ def _merge_samples_pipeline(
         }
     )
 
-    # Merge samples
-    src_dataset._aggregate(pipeline=sample_pipeline, manual_group_select=True)
-
-    # Cleanup indexes
-    _cleanup_index(src_dataset, key_field, new_src_index, dropped_src_index)
-    _cleanup_index(dst_dataset, key_field, new_dst_index, dropped_dst_index)
-
     #
-    # Merge frames
+    # Prepare frames merge pipeline
+    #
+    # The implementation of merging video frames is currently a bit complex.
+    # It may be possible to simplify this...
+    #
+    # The trouble is that the `_sample_id` of the frame documents needs to match
+    # the `_id` of the sample documents after merging. There may be a more
+    # clever way to make this happen via `$lookup` than what is implemented
+    # here, but here's the current workflow:
+    #
+    # - Store the `key_field` value on each frame document in both the source
+    #   and destination collections corresponding to its parent sample in a
+    #   temporary `frame_key_field` field
+    # - Merge the sample documents without frames attached
+    # - Merge the frame documents on `[frame_key_field, frame_number]` with
+    #   their old `_sample_id`s unset
+    # - Generate a `key_field` -> `_id` mapping for the post-merge sample docs,
+    #   then make a pass over the frame documents and set
+    #   their `_sample_id` to the corresponding value from this mapping
+    # - The merge is complete, so delete `frame_key_field` from both frame
+    #   collections
     #
 
     if contains_videos:
+        frame_key_field = "_merge_key"
+
         # @todo is there a cleaner way to avoid this? we have to be sure that
         # `frame_key_field` is not excluded by a user's view here...
         _src_videos = _always_select_field(
@@ -6796,22 +6771,74 @@ def _merge_samples_pipeline(
         }
     )
 
-        # Merge frames
+    #
+    # Perform the merge(s)
+    #
+    # We wrap this in a try-finally because we need to ensure that temporary
+    # data and collection indexes are deleted if something goes wrong during
+    # the actual merges
+    #
+
+    new_src_index = None
+    dropped_src_index = None
+    new_dst_index = None
+    dropped_dst_index = None
+    dst_frame_index = None
+    src_frame_index = None
+
+    try:
+        # Create unique index on merge key, if necessary
+        new_src_index, dropped_src_index = _ensure_index(
+            src_dataset, key_field, unique=True
+        )
+        new_dst_index, dropped_dst_index = _ensure_index(
+            dst_dataset, key_field, unique=True
+        )
+
+        if contains_videos:
+            _index_frames(dst_videos, key_field, frame_key_field)
+            _index_frames(src_videos, key_field, frame_key_field)
+
+            # Create unique index on frame merge key
+            frame_index_spec = [(frame_key_field, 1), ("frame_number", 1)]
+            dst_frame_index = dst_dataset._frame_collection.create_index(
+                frame_index_spec, unique=True
+            )
+            src_frame_index = src_dataset._frame_collection.create_index(
+                frame_index_spec, unique=True
+            )
+
+        # Merge samples
         src_dataset._aggregate(
-            pipeline=frame_pipeline, manual_group_select=True
+            pipeline=sample_pipeline, manual_group_select=True
         )
 
-        # Drop indexes
-        dst_dataset._frame_collection.drop_index(dst_frame_index)
-        src_dataset._frame_collection.drop_index(src_frame_index)
+        if contains_videos:
+            # Merge frames
+            src_dataset._aggregate(
+                pipeline=frame_pipeline, manual_group_select=True
+            )
 
-        # Finalize IDs
-        _finalize_frames(dst_videos, key_field, frame_key_field)
+            # Finalize IDs
+            _finalize_frames(dst_videos, key_field, frame_key_field)
+    finally:
+        # Cleanup indexes
+        _cleanup_index(
+            src_dataset, key_field, new_src_index, dropped_src_index
+        )
+        _cleanup_index(
+            dst_dataset, key_field, new_dst_index, dropped_dst_index
+        )
 
-        # Cleanup merge key
-        cleanup_op = {"$unset": {frame_key_field: ""}}
-        src_dataset._frame_collection.update_many({}, cleanup_op)
-        dst_dataset._frame_collection.update_many({}, cleanup_op)
+        if contains_videos:
+            # Cleanup indexes
+            _cleanup_frame_index(dst_dataset, dst_frame_index)
+            _cleanup_frame_index(src_dataset, src_frame_index)
+
+            # Cleanup merge key
+            cleanup_op = {"$unset": {frame_key_field: ""}}
+            src_dataset._frame_collection.update_many({}, cleanup_op)
+            dst_dataset._frame_collection.update_many({}, cleanup_op)
 
     # Reload docs
     fos.Sample._reload_docs(dst_dataset._sample_collection_name)

From 2fae95b53f30654314ad0b5af317849c5e8bff9b Mon Sep 17 00:00:00 2001
From: brimoor
Date: Thu, 20 Oct 2022 16:17:25 -0400
Subject: [PATCH 2/3] safer cleanup via finally blocks

---
 fiftyone/core/clips.py           | 134 ++++++++++++++++---------------
 fiftyone/core/dataset.py         |  34 ++++----
 fiftyone/utils/beam.py           |  20 ++---
 fiftyone/utils/data/importers.py |  55 +++++++------
 4 files changed, 127 insertions(+), 116 deletions(-)

diff --git a/fiftyone/core/clips.py b/fiftyone/core/clips.py
index 95a48f931c..097020f473 100644
--- a/fiftyone/core/clips.py
+++ b/fiftyone/core/clips.py
@@ -647,46 +647,47 @@ def _write_trajectories(dataset, src_collection, field, other_fields=None):
         _allow_missing=True,
     )
 
-    src_collection = fod._always_select_field(src_collection, _tmp_field)
-
-    id_field = "_id" if not src_dataset._is_clips else "_sample_id"
-
-    project = {
-        "_id": False,
-        "_sample_id": "$" + id_field,
-        _tmp_field: True,
-        "_media_type": True,
-        "filepath": True,
-        "metadata": True,
-        "tags": True,
-        field: True,
-    }
-
-    if other_fields:
-        project.update({f: True for f in other_fields})
-
-    src_collection._aggregate(
-        post_pipeline=[
-            {"$project": project},
-            {"$unwind": "$" + _tmp_field},
-            {
-                "$set": {
-                    "support": {"$slice": ["$" + _tmp_field, 2, 2]},
-                    field: {
-                        "_cls": "Label",
-                        "label": {"$arrayElemAt": ["$" + _tmp_field, 0]},
-                        "index": {"$arrayElemAt": ["$" + _tmp_field, 1]},
-                    },
-                    "_rand": {"$rand": {}},
-                },
-            },
-            {"$unset": _tmp_field},
-            {"$out": dataset._sample_collection_name},
-        ]
-    )
-
-    cleanup_op = {"$unset": {_tmp_field: ""}}
-    src_dataset._sample_collection.update_many({}, cleanup_op)
+    try:
+        src_collection = fod._always_select_field(src_collection, _tmp_field)
+
+        id_field = "_id" if not src_dataset._is_clips else "_sample_id"
+
+        project = {
+            "_id": False,
+            "_sample_id": "$" + id_field,
+            _tmp_field: True,
+            "_media_type": True,
+            "filepath": True,
+            "metadata": True,
+            "tags": True,
+            field: True,
+        }
+
+        if other_fields:
+            project.update({f: True for f in other_fields})
+
+        src_collection._aggregate(
+            post_pipeline=[
+                {"$project": project},
+                {"$unwind": "$" + _tmp_field},
+                {
+                    "$set": {
+                        "support": {"$slice": ["$" + _tmp_field, 2, 2]},
+                        field: {
+                            "_cls": "Label",
+                            "label": {"$arrayElemAt": ["$" + _tmp_field, 0]},
+                            "index": {"$arrayElemAt": ["$" + _tmp_field, 1]},
+                        },
+                        "_rand": {"$rand": {}},
+                    },
+                },
+                {"$unset": _tmp_field},
+                {"$out": dataset._sample_collection_name},
+            ]
+        )
+    finally:
+        cleanup_op = {"$unset": {_tmp_field: ""}}
+        src_dataset._sample_collection.update_many({}, cleanup_op)
 
 
 def _write_expr_clips(
@@ -725,34 +726,35 @@ def _write_manual_clips(dataset, src_collection, clips, other_fields=None):
         _allow_missing=True,
     )
 
-    src_collection = fod._always_select_field(src_collection, _tmp_field)
-
-    id_field = "_id" if not src_dataset._is_clips else "_sample_id"
-
-    project = {
-        "_id": False,
-        "_sample_id": "$" + id_field,
-        "_media_type": True,
-        "filepath": True,
-        "support": "$" + _tmp_field,
-        "metadata": True,
-        "tags": True,
-    }
-
-    if other_fields:
-        project.update({f: True for f in other_fields})
-
-    src_collection._aggregate(
-        post_pipeline=[
-            {"$project": project},
-            {"$unwind": "$support"},
-            {"$set": {"_rand": {"$rand": {}}}},
-            {"$out": dataset._sample_collection_name},
-        ]
-    )
-
-    cleanup_op = {"$unset": {_tmp_field: ""}}
-    src_dataset._sample_collection.update_many({}, cleanup_op)
+    try:
+        src_collection = fod._always_select_field(src_collection, _tmp_field)
+
+        id_field = "_id" if not src_dataset._is_clips else "_sample_id"
+
+        project = {
+            "_id": False,
+            "_sample_id": "$" + id_field,
+            "_media_type": True,
+            "filepath": True,
+            "support": "$" + _tmp_field,
+            "metadata": True,
+            "tags": True,
+        }
+
+        if other_fields:
+            project.update({f: True for f in other_fields})
+
+        src_collection._aggregate(
+            post_pipeline=[
+                {"$project": project},
+                {"$unwind": "$support"},
+                {"$set": {"_rand": {"$rand": {}}}},
+                {"$out": dataset._sample_collection_name},
+            ]
+        )
+    finally:
+        cleanup_op = {"$unset": {_tmp_field: ""}}
+        src_dataset._sample_collection.update_many({}, cleanup_op)
 
 
 def _get_trajectories(sample_collection, frame_field):
diff --git a/fiftyone/core/dataset.py b/fiftyone/core/dataset.py
index 2b10f674d8..388a4b807a 100644
--- a/fiftyone/core/dataset.py
+++ b/fiftyone/core/dataset.py
@@ -2348,22 +2348,24 @@ def merge_samples(
         #
 
         if key_fcn is None:
-            tmp = Dataset()
-            tmp.add_samples(samples, num_samples=num_samples)
-
-            self.merge_samples(
-                tmp,
-                key_field=key_field,
-                skip_existing=skip_existing,
-                insert_new=insert_new,
-                fields=fields,
-                omit_fields=omit_fields,
-                merge_lists=merge_lists,
-                overwrite=overwrite,
-                expand_schema=expand_schema,
-                include_info=False,
-            )
-            tmp.delete()
+            try:
+                tmp = Dataset()
+                tmp.add_samples(samples, num_samples=num_samples)
+
+                self.merge_samples(
+                    tmp,
+                    key_field=key_field,
+                    skip_existing=skip_existing,
+                    insert_new=insert_new,
+                    fields=fields,
+                    omit_fields=omit_fields,
+                    merge_lists=merge_lists,
+                    overwrite=overwrite,
+                    expand_schema=expand_schema,
+                    include_info=False,
+                )
+            finally:
+                tmp.delete()
 
             return
 
diff --git a/fiftyone/utils/beam.py b/fiftyone/utils/beam.py
index e8bb16d4b4..97c41123a7 100644
--- a/fiftyone/utils/beam.py
+++ b/fiftyone/utils/beam.py
@@ -186,16 +186,18 @@ def beam_merge(
 
     if kwargs.get("key_fcn", None) is None:
         tmp_dataset = fod.Dataset()
-        beam_import(
-            tmp_dataset,
-            samples,
-            parse_fcn=parse_fcn,
-            options=options,
-            verbose=verbose,
-        )
 
-        dataset.merge_samples(tmp_dataset, **kwargs)
-        tmp_dataset.delete()
+        try:
+            beam_import(
+                tmp_dataset,
+                samples,
+                parse_fcn=parse_fcn,
+                options=options,
+                verbose=verbose,
+            )
+
+            dataset.merge_samples(tmp_dataset, **kwargs)
+        finally:
+            tmp_dataset.delete()
 
         return
 
diff --git a/fiftyone/utils/data/importers.py b/fiftyone/utils/data/importers.py
index 5af2cabaf0..879886b1f1 100644
--- a/fiftyone/utils/data/importers.py
+++ b/fiftyone/utils/data/importers.py
@@ -263,26 +263,27 @@ def merge_samples(
     #
 
     if isinstance(dataset_importer, BatchDatasetImporter):
-        tmp = fod.Dataset()
-        with dataset_importer:
-            dataset_importer.import_samples(tmp, tags=tags)
-
-        dataset.merge_samples(
-            tmp,
-            key_field=key_field,
-            key_fcn=key_fcn,
-            skip_existing=skip_existing,
-            insert_new=insert_new,
-            fields=fields,
-            omit_fields=omit_fields,
-            merge_lists=merge_lists,
-            overwrite=overwrite,
-            expand_schema=expand_schema,
-            include_info=add_info,
-            overwrite_info=True,
-        )
-
-        tmp.delete()
+        try:
+            tmp = fod.Dataset()
+            with dataset_importer:
+                dataset_importer.import_samples(tmp, tags=tags)
+
+            dataset.merge_samples(
+                tmp,
+                key_field=key_field,
+                key_fcn=key_fcn,
+                skip_existing=skip_existing,
+                insert_new=insert_new,
+                fields=fields,
+                omit_fields=omit_fields,
+                merge_lists=merge_lists,
+                overwrite=overwrite,
+                expand_schema=expand_schema,
+                include_info=add_info,
+                overwrite_info=True,
+            )
+        finally:
+            tmp.delete()
 
         return
 
@@ -1639,11 +1640,15 @@ def import_samples(self, dataset, tags=None):
             # into a temporary dataset, perform the migration, and then merge
             # into the destination dataset
             tmp_dataset = fod.Dataset()
-            sample_ids = self._import_samples(
-                tmp_dataset, dataset_dict, tags=tags
-            )
-            dataset.add_collection(tmp_dataset)
-            tmp_dataset.delete()
+
+            try:
+                sample_ids = self._import_samples(
+                    tmp_dataset, dataset_dict, tags=tags
+                )
+                dataset.add_collection(tmp_dataset)
+            finally:
+                tmp_dataset.delete()
+
             return sample_ids
 
         return self._import_samples(dataset, dataset_dict, tags=tags)

From ff6985b18033c6ad58bf0eacec5546953f18bfed Mon Sep 17 00:00:00 2001
From: brimoor
Date: Thu, 20 Oct 2022 16:19:30 -0400
Subject: [PATCH 3/3] init outside try

---
 fiftyone/core/dataset.py         | 3 ++-
 fiftyone/utils/data/importers.py | 3 ++-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/fiftyone/core/dataset.py b/fiftyone/core/dataset.py
index 388a4b807a..c155e9b6e3 100644
--- a/fiftyone/core/dataset.py
+++ b/fiftyone/core/dataset.py
@@ -2348,8 +2348,9 @@ def merge_samples(
         #
 
         if key_fcn is None:
+            tmp = Dataset()
+
             try:
-                tmp = Dataset()
                 tmp.add_samples(samples, num_samples=num_samples)
 
                 self.merge_samples(
diff --git a/fiftyone/utils/data/importers.py b/fiftyone/utils/data/importers.py
index 879886b1f1..7295761d7f 100644
--- a/fiftyone/utils/data/importers.py
+++ b/fiftyone/utils/data/importers.py
@@ -263,8 +263,9 @@ def merge_samples(
     #
 
     if isinstance(dataset_importer, BatchDatasetImporter):
+        tmp = fod.Dataset()
+
         try:
-            tmp = fod.Dataset()
             with dataset_importer:
                 dataset_importer.import_samples(tmp, tags=tags)
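
Taken together, the three patches converge on one resource-safety pattern: construct or null-initialize every cleanup handle before entering `try`, do the fallible work inside it, and release everything in `finally`, guarding any handle that may never have been created. Below is a minimal, self-contained sketch of that pattern; every name in it (`TempDataset`, `create_index`, `do_merge`) is a hypothetical placeholder, not a FiftyOne API.

# Minimal sketch (placeholder names, not FiftyOne APIs) of the cleanup
# pattern applied throughout this series


class TempDataset(object):
    """Stand-in for the temporary datasets the patches create and delete"""

    def delete(self):
        print("temp dataset deleted")


def create_index(spec):
    print("created index %s" % spec)
    return spec  # handle is recorded only if creation succeeded


def drop_index(name):
    print("dropped index %s" % name)


def do_merge():
    raise RuntimeError("simulated failure mid-merge")


def merge_with_cleanup():
    # Handles start as None so `finally` can tell what was actually created
    index_name = None

    # Constructed *before* the try block (the PATCH 3/3 fix), so `finally`
    # never references an unbound name if construction itself fails
    tmp = TempDataset()

    try:
        index_name = create_index("key_1")
        do_merge()
    finally:
        # Runs on success and on error alike
        if index_name is not None:
            drop_index(index_name)

        tmp.delete()


if __name__ == "__main__":
    try:
        merge_with_cleanup()
    except RuntimeError as e:
        print("merge failed (%s), but cleanup already ran" % e)

The `tmp = TempDataset()` line sits outside the `try` for the reason PATCH 3/3 exists: if the constructor raised inside the `try`, the `finally` block would call `tmp.delete()` on an unbound name, masking the original error with an `UnboundLocalError`.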