diff --git a/.buildkite/pipeline.ml.yml b/.buildkite/pipeline.ml.yml index e1af5264c41e..e617188bab45 100644 --- a/.buildkite/pipeline.ml.yml +++ b/.buildkite/pipeline.ml.yml @@ -284,7 +284,7 @@ - bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=-client python/ray/util/dask/... - label: ":potable_water: Dataset datasource integration tests (Python 3.7)" - conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_PYTHON_AFFECTED"] + conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_PYTHON_AFFECTED", "RAY_CI_DATA_AFFECTED"] commands: - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT - DATA_PROCESSING_TESTING=1 ARROW_VERSION=9.* ARROW_MONGO_VERSION=0.5.* ./ci/env/install-dependencies.sh diff --git a/python/ray/data/datasource/mongo_datasource.py b/python/ray/data/datasource/mongo_datasource.py index d44dec57a667..f1153271c532 100644 --- a/python/ray/data/datasource/mongo_datasource.py +++ b/python/ray/data/datasource/mongo_datasource.py @@ -95,6 +95,10 @@ def __init__( self._client = pymongo.MongoClient(uri) _validate_database_collection_exist(self._client, database, collection) + self._avg_obj_size = self._client[database].command("collstats", collection)[ + "avgObjSize" + ] + def estimate_inmemory_data_size(self) -> Optional[int]: # TODO(jian): Add memory size estimation to improve auto-tune of parallelism. return None @@ -154,7 +158,7 @@ def make_block( for i, partition in enumerate(partitions_ids): metadata = BlockMetadata( num_rows=partition["count"], - size_bytes=None, + size_bytes=partition["count"] * self._avg_obj_size, schema=None, input_files=None, exec_stats=None,