From 37c08a65d692171f17f4509ac7356f32006558d5 Mon Sep 17 00:00:00 2001 From: Wenjun Si Date: Mon, 6 Feb 2023 16:34:47 +0800 Subject: [PATCH] Resolve compatibility issue for kubernetes cases under CGroup2 (#3303) --- .github/workflows/benchmark-ci.yml | 2 +- .github/workflows/core-ci.yml | 2 +- .github/workflows/os-compat-ci.yml | 2 +- .github/workflows/platform-ci.yml | 12 ++- azure-pipelines.yml | 2 +- benchmarks/tpch/gen_data.py | 2 - mars/core/entity/executable.py | 4 +- mars/dataframe/base/shift.py | 4 +- mars/dataframe/base/to_numeric.py | 1 - mars/dataframe/datasource/from_tensor.py | 4 +- mars/dataframe/datasource/read_parquet.py | 2 +- mars/dataframe/datasource/read_sql.py | 37 +++++--- .../tests/test_datasource_execution.py | 2 +- .../tests/test_datastore_execution.py | 2 + mars/dataframe/datastore/to_sql.py | 2 + mars/dataframe/groupby/aggregation.py | 5 +- mars/dataframe/groupby/core.py | 4 +- mars/dataframe/indexing/setitem.py | 4 +- mars/dataframe/indexing/where.py | 4 +- mars/dataframe/utils.py | 20 ++++ .../kubernetes/tests/test_kubernetes.py | 4 +- mars/learn/cluster/_k_means_elkan_iter.py | 12 +-- mars/learn/cluster/_k_means_lloyd_iter.py | 12 +-- mars/learn/cluster/_kmeans.py | 1 - mars/learn/contrib/pytorch/dataset.py | 1 - mars/learn/contrib/pytorch/sampler.py | 1 - mars/learn/contrib/tensorflow/dataset.py | 1 - .../contrib/tensorflow/tests/tf_dataset.py | 1 - mars/learn/contrib/xgboost/dmatrix.py | 4 +- mars/learn/decomposition/tests/test_pca.py | 1 - mars/learn/ensemble/tests/test_bagging.py | 1 - mars/learn/glm/_logistic.py | 1 - mars/learn/linear_model/tests/test_base.py | 3 - mars/learn/metrics/tests/test_regression.py | 1 - mars/learn/neighbors/base.py | 1 - mars/learn/preprocessing/tests/test_data.py | 1 - .../simple_index/tests/test_simple_index.py | 1 - .../semi_supervised/_label_propagation.py | 1 - mars/learn/utils/validation.py | 1 - mars/lib/filesystem/local.py | 1 - mars/lib/sparse/array.py | 1 - mars/lib/version.py | 5 - mars/resource.py | 47 +++++++--- mars/services/cluster/uploader.py | 4 +- .../scheduling/worker/tests/test_execution.py | 8 +- mars/services/subtask/worker/processor.py | 4 +- mars/services/task/api/oscar.py | 1 - mars/services/task/api/web.py | 1 - .../task/supervisor/graph_visualizer.py | 1 - mars/tensor/base/repeat.py | 4 +- mars/tensor/datasource/diag.py | 4 +- mars/tensor/einsum/core.py | 6 +- mars/tensor/einsum/einsumfunc.py | 5 - mars/tensor/stats/ttest.py | 1 - mars/tests/test_resource.py | 93 +++++++++++++++---- setup.cfg | 3 +- 56 files changed, 204 insertions(+), 151 deletions(-) diff --git a/.github/workflows/benchmark-ci.yml b/.github/workflows/benchmark-ci.yml index 5dadca722e..0e0d44ee1e 100644 --- a/.github/workflows/benchmark-ci.yml +++ b/.github/workflows/benchmark-ci.yml @@ -35,7 +35,7 @@ jobs: shell: bash run: | source ./ci/install-conda.sh - python -m pip install --upgrade pip "setuptools<64" wheel coverage; + python -m pip install --upgrade pip setuptools wheel coverage; - name: Install dependencies id: build diff --git a/.github/workflows/core-ci.yml b/.github/workflows/core-ci.yml index e65651103b..7def1b3445 100644 --- a/.github/workflows/core-ci.yml +++ b/.github/workflows/core-ci.yml @@ -35,7 +35,7 @@ jobs: shell: bash run: | source ./ci/install-conda.sh - python -m pip install --upgrade pip "setuptools<64" wheel coverage; + python -m pip install --upgrade pip setuptools wheel coverage; - name: Install dependencies env: diff --git a/.github/workflows/os-compat-ci.yml b/.github/workflows/os-compat-ci.yml index 
cbf9ab8fa2..2a74130c97 100644 --- a/.github/workflows/os-compat-ci.yml +++ b/.github/workflows/os-compat-ci.yml @@ -32,7 +32,7 @@ jobs: shell: bash run: | source ./ci/install-conda.sh - python -m pip install --upgrade pip "setuptools<64" wheel coverage; + python -m pip install --upgrade pip setuptools wheel coverage; - name: Install dependencies env: diff --git a/.github/workflows/platform-ci.yml b/.github/workflows/platform-ci.yml index 3f96b64d84..5257e1f43e 100644 --- a/.github/workflows/platform-ci.yml +++ b/.github/workflows/platform-ci.yml @@ -18,9 +18,9 @@ jobs: fail-fast: false matrix: os: [ubuntu-latest] - python-version: [3.8-ray, 3.8-ray-deploy, 3.8-ray-dag, 3.8-vineyard, 3.8-dask] + python-version: [3.8-kubernetes, 3.8-ray, 3.8-ray-deploy, 3.8-ray-dag, 3.8-vineyard, 3.8-dask] include: - - { os: ubuntu-20.04, python-version: 3.8-kubernetes, no-common-tests: 1, + - { os: ubuntu-latest, python-version: 3.8-kubernetes, no-common-tests: 1, no-deploy: 1, with-kubernetes: "with Kubernetes" } - { os: ubuntu-20.04, python-version: 3.8-hadoop, no-common-tests: 1, no-deploy: 1, with-hadoop: "with hadoop" } @@ -47,13 +47,14 @@ jobs: shell: bash run: | source ./ci/install-conda.sh - python -m pip install --upgrade pip "setuptools<64" wheel coverage; + python -m pip install --upgrade pip setuptools wheel coverage - name: Start minikube if: ${{ matrix.with-kubernetes }} - with: - driver: none uses: medyagh/setup-minikube@master + with: + driver: docker + mount-path: '/home/runner:/home/runner' - name: Install dependencies env: @@ -125,6 +126,7 @@ jobs: RUN_DASK: ${{ matrix.run-dask }} NO_COMMON_TESTS: ${{ matrix.no-common-tests }} NUMPY_EXPERIMENTAL_ARRAY_FUNCTION: 1 + USE_MINIKUBE_DOCKER_ENV: true CHANGE_MINIKUBE_NONE_USER: true shell: bash run: | diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 4bafb6c90d..13841bbc42 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -94,7 +94,7 @@ jobs: # do compatibility test for earliest supported pandas release if [[ "$(mars.test.module)" == "dataframe" ]]; then - pip install numpy\<1.24.0 + pip install numpy\<1.24.0 sqlalchemy\<2.0 pip install -i https://pkgs.dev.azure.com/mars-project/mars/_packaging/pandas/pypi/simple/ pandas==1.0.5 pytest $PYTEST_CONFIG -m pd_compat mars/dataframe mv .coverage build/.coverage.pd_compat.file diff --git a/benchmarks/tpch/gen_data.py b/benchmarks/tpch/gen_data.py index 430484cb8b..4ebb77f831 100644 --- a/benchmarks/tpch/gen_data.py +++ b/benchmarks/tpch/gen_data.py @@ -102,7 +102,6 @@ def to_parquet(args): def generate( tables, SCALE_FACTOR, folder, upload_to_s3, validate_dataset, num_processes ): - if upload_to_s3: assert "AWS_ACCESS_KEY_ID" in os.environ, "AWS credentials not set" else: @@ -117,7 +116,6 @@ def generate( fs = s3fs.S3FileSystem() for table_name, (table_short, num_pieces, load_func) in tables.items(): - if upload_to_s3: output_prefix = f"s3://{folder}/{table_name}.pq" else: diff --git a/mars/core/entity/executable.py b/mars/core/entity/executable.py index e880681dce..f4a2a1f9f9 100644 --- a/mars/core/entity/executable.py +++ b/mars/core/entity/executable.py @@ -56,7 +56,9 @@ def _thread_body(self): fut.set_result(None) except (RuntimeError, ConnectionError, KeyError, ActorNotExist): fut.set_result(None) - except Exception as ex: # pragma: no cover # noqa: E722 # nosec # pylint: disable=bare-except + except ( + Exception + ) as ex: # pragma: no cover # noqa: E722 # nosec # pylint: disable=bare-except fut.set_exception(ex) finally: del session diff --git 
a/mars/dataframe/base/shift.py b/mars/dataframe/base/shift.py index 3bd02c6798..6239c2d2ba 100644 --- a/mars/dataframe/base/shift.py +++ b/mars/dataframe/base/shift.py @@ -299,9 +299,7 @@ def _tile_series(cls, op): to_concats = [c] left = abs(op.periods) while left > 0 and 0 <= prev_i < inp.chunk_shape[0]: - prev_chunk = inp.cix[ - prev_i, - ] + prev_chunk = inp.cix[prev_i,] size = min(left, prev_chunk.shape[0]) left -= size prev_i = prev_i - 1 if inc else prev_i + 1 diff --git a/mars/dataframe/base/to_numeric.py b/mars/dataframe/base/to_numeric.py index 94a0d49f1b..4dc02e28e6 100644 --- a/mars/dataframe/base/to_numeric.py +++ b/mars/dataframe/base/to_numeric.py @@ -23,7 +23,6 @@ class DataFrameToNumeric(DataFrameOperand, DataFrameOperandMixin): - errors = StringField("errors") downcast = StringField("downcast") diff --git a/mars/dataframe/datasource/from_tensor.py b/mars/dataframe/datasource/from_tensor.py index 8313c1211b..c8a7b8807b 100644 --- a/mars/dataframe/datasource/from_tensor.py +++ b/mars/dataframe/datasource/from_tensor.py @@ -310,9 +310,7 @@ def _tile_input_1d_tileables(cls, op: "DataFrameFromTensor"): index_value = parse_index(pd_index, store_data=True) else: assert op.index is not None - index_chunk = in_tensors[-1].cix[ - i, - ] + index_chunk = in_tensors[-1].cix[i,] index_value = parse_index( pd.Index([], dtype=index_chunk.dtype), index_chunk, diff --git a/mars/dataframe/datasource/read_parquet.py b/mars/dataframe/datasource/read_parquet.py index 47011f3d4c..7d94a7094c 100644 --- a/mars/dataframe/datasource/read_parquet.py +++ b/mars/dataframe/datasource/read_parquet.py @@ -277,7 +277,7 @@ def _tile_partitioned(cls, op: "DataFrameReadParquet"): out_df = op.outputs[0] shape = (np.nan, out_df.shape[1]) dtypes = cls._to_arrow_dtypes(out_df.dtypes, op) - dataset = pq.ParquetDataset(op.path) + dataset = pq.ParquetDataset(op.path, use_legacy_dataset=True) path_prefix = _parse_prefix(op.path) diff --git a/mars/dataframe/datasource/read_sql.py b/mars/dataframe/datasource/read_sql.py index 3086f407c9..9de76d28de 100644 --- a/mars/dataframe/datasource/read_sql.py +++ b/mars/dataframe/datasource/read_sql.py @@ -38,7 +38,12 @@ from ...tensor.utils import normalize_chunk_sizes from ...typing import OperandType, TileableType from ..arrays import ArrowStringDtype -from ..utils import parse_index, create_sa_connection, to_arrow_dtypes +from ..utils import ( + parse_index, + create_sa_connection, + to_arrow_dtypes, + patch_sa_engine_execute, +) from .core import ( IncrementalIndexDatasource, ColumnPruneSupportedDataSourceMixin, @@ -127,7 +132,6 @@ def _get_selectable(self, engine_or_conn, columns=None): selectable = sa.Table( self.table_or_sql, m, - autoload=True, autoload_with=engine_or_conn, schema=self.schema, ) @@ -141,12 +145,10 @@ def _get_selectable(self, engine_or_conn, columns=None): .alias(temp_name_2) ) else: - selectable = sql.select( - "*", - from_obj=sql.text( - f"({self.table_or_sql}) AS {temp_name_1}" - ), - ).alias(temp_name_2) + from_tb = sql.text(f"({self.table_or_sql}) AS {temp_name_1}") + selectable = ( + sql.select("*").select_from(from_tb).alias(temp_name_2) + ) self.selectable = selectable return selectable @@ -155,11 +157,15 @@ def _collect_info(self, engine_or_conn, selectable, columns, test_rows): # fetch test DataFrame if columns: - query = sql.select( - [sql.column(c) for c in columns], from_obj=selectable - ).limit(test_rows) + query = ( + sql.select(*[sql.column(c) for c in columns]) + .select_from(selectable) + .limit(test_rows) + ) else: - query = 
sql.select(selectable.columns, from_obj=selectable).limit(test_rows) + query = ( + sql.select(*selectable.columns).select_from(selectable).limit(test_rows) + ) test_df = pd.read_sql( query, engine_or_conn, @@ -178,7 +184,7 @@ def _collect_info(self, engine_or_conn, selectable, columns, test_rows): # fetch size size = list( engine_or_conn.execute( - sql.select([sql.func.count()]).select_from(selectable) + sql.select(sql.func.count()).select_from(selectable) ) )[0][0] shape = (size, test_df.shape[1]) @@ -352,7 +358,7 @@ def _tile_partition(cls, op: "DataFrameReadSQL"): try: part_col = selectable.columns[op.partition_col] range_results = engine.execute( - sql.select([sql.func.min(part_col), sql.func.max(part_col)]) + sql.select(sql.func.min(part_col), sql.func.max(part_col)) ) op.low_limit, op.high_limit = next(range_results) @@ -429,6 +435,7 @@ def _adapt_datetime(dt): out = op.outputs[0] + patch_sa_engine_execute() engine = sa.create_engine(op.con, **(op.engine_kwargs or dict())) try: selectable = op._get_selectable(engine) @@ -444,7 +451,7 @@ def _adapt_datetime(dt): op.low_limit = _adapt_datetime(op.low_limit) op.high_limit = _adapt_datetime(op.high_limit) - query = sa.sql.select(columns) + query = sa.sql.select(*columns) if op.method == "partition": part_col = selectable.columns[op.partition_col] if op.left_end: diff --git a/mars/dataframe/datasource/tests/test_datasource_execution.py b/mars/dataframe/datasource/tests/test_datasource_execution.py index 0a7c25a600..0923a1434e 100644 --- a/mars/dataframe/datasource/tests/test_datasource_execution.py +++ b/mars/dataframe/datasource/tests/test_datasource_execution.py @@ -835,7 +835,7 @@ def test_read_sql_execution(setup): result = r.execute().fetch() pd.testing.assert_frame_equal(result, expected) - table = sa.Table(table_name, m, autoload=True, autoload_with=engine) + table = sa.Table(table_name, m, autoload_with=engine) r = md.read_sql_table( table, engine, diff --git a/mars/dataframe/datastore/tests/test_datastore_execution.py b/mars/dataframe/datastore/tests/test_datastore_execution.py index 2e06794213..1699311fad 100644 --- a/mars/dataframe/datastore/tests/test_datastore_execution.py +++ b/mars/dataframe/datastore/tests/test_datastore_execution.py @@ -39,6 +39,7 @@ from .... import dataframe as md from ....tests.core import flaky from ... 
import DataFrame +from ...utils import patch_sa_engine_execute def test_to_csv_execution(setup): @@ -130,6 +131,7 @@ def test_to_sql(): index=index, ) + patch_sa_engine_execute() with tempfile.TemporaryDirectory() as d: table_name1 = "test_table" table_name2 = "test_table2" diff --git a/mars/dataframe/datastore/to_sql.py b/mars/dataframe/datastore/to_sql.py index 125b031a6c..749cdb52f4 100644 --- a/mars/dataframe/datastore/to_sql.py +++ b/mars/dataframe/datastore/to_sql.py @@ -32,6 +32,7 @@ build_empty_df, build_empty_series, create_sa_connection, + patch_sa_engine_execute, ) @@ -171,6 +172,7 @@ def execute(cls, ctx, op: "DataFrameToSQLTable"): import sqlalchemy as sa + patch_sa_engine_execute() engine = sa.create_engine(op.con, **(op.engine_kwargs or dict())) try: diff --git a/mars/dataframe/groupby/aggregation.py b/mars/dataframe/groupby/aggregation.py index 093e78764c..c844b6ce10 100644 --- a/mars/dataframe/groupby/aggregation.py +++ b/mars/dataframe/groupby/aggregation.py @@ -475,9 +475,7 @@ def _gen_map_chunks( by = [] for v in map_op.groupby_params["by"]: if isinstance(v, ENTITY_TYPE): - by_chunk = v.cix[ - chunk.index[0], - ] + by_chunk = v.cix[chunk.index[0],] chunk_inputs.append(by_chunk) by.append(by_chunk) else: @@ -554,7 +552,6 @@ def _gen_pivot_chunk( sample_chunks: List[ChunkType], agg_chunk_len: int, ): - properties = dict( by=op.groupby_params["by"], gpu=op.is_gpu(), diff --git a/mars/dataframe/groupby/core.py b/mars/dataframe/groupby/core.py index e130135096..e718a99867 100644 --- a/mars/dataframe/groupby/core.py +++ b/mars/dataframe/groupby/core.py @@ -277,9 +277,7 @@ def tile(cls, op): chunk_by = [] for k in by: if isinstance(k, SERIES_TYPE): - by_chunk = k.cix[ - chunk.index[0], - ] + by_chunk = k.cix[chunk.index[0],] chunk_by.append(by_chunk) chunk_inputs.append(by_chunk) else: diff --git a/mars/dataframe/indexing/setitem.py b/mars/dataframe/indexing/setitem.py index 3a805e1b40..234d7a005d 100644 --- a/mars/dataframe/indexing/setitem.py +++ b/mars/dataframe/indexing/setitem.py @@ -268,9 +268,7 @@ def tile(cls, op: "DataFrameSetitem"): value_chunks, shape=shape, dtypes=dtypes ) else: - value_chunk = value.cix[ - c.index[0], - ] + value_chunk = value.cix[c.index[0],] chunk_inputs = [c, value_chunk] diff --git a/mars/dataframe/indexing/where.py b/mars/dataframe/indexing/where.py index 0cde782a97..94931947df 100644 --- a/mars/dataframe/indexing/where.py +++ b/mars/dataframe/indexing/where.py @@ -174,9 +174,7 @@ def get_tiled_chunk(obj, index, axis=None): return obj.cix[index[0], index[1]] elif isinstance(obj, SERIES_TYPE): axis = axis if axis is not None else op.axis - return obj.cix[ - index[axis], - ] + return obj.cix[index[axis],] else: return obj diff --git a/mars/dataframe/utils.py b/mars/dataframe/utils.py index d9896c8177..0ecc9b1adb 100644 --- a/mars/dataframe/utils.py +++ b/mars/dataframe/utils.py @@ -1566,3 +1566,23 @@ def restore_func(ctx: Context, op): logger.info("%s func %s is restored.", op, op.func) else: op.func = cloudpickle.loads(op.func) + + +def patch_sa_engine_execute(): + """ + pandas has not yet resolved its compatibility issue with SQLAlchemy 2.0 + (see https://github.com/pandas-dev/pandas/issues/40686), so we patch the + Engine class in SQLAlchemy to keep our code working.
+ """ + try: + from sqlalchemy.engine import Engine + except ImportError: # pragma: no cover + return + + def execute(self, statement, *multiparams, **params): + connection = self.connect() + return connection.execute(statement, *multiparams, **params) + + if hasattr(Engine, "execute"): # pragma: no cover + return + Engine.execute = execute diff --git a/mars/deploy/kubernetes/tests/test_kubernetes.py b/mars/deploy/kubernetes/tests/test_kubernetes.py index f11730c316..992e23956a 100644 --- a/mars/deploy/kubernetes/tests/test_kubernetes.py +++ b/mars/deploy/kubernetes/tests/test_kubernetes.py @@ -129,7 +129,9 @@ def _remove_docker_image(image_name, raises=True): def _load_docker_env(): - if os.path.exists("/var/run/docker.sock") or not shutil.which("minikube"): + if "USE_MINIKUBE_DOCKER_ENV" not in os.environ and ( + os.path.exists("/var/run/docker.sock") or not shutil.which("minikube") + ): return proc = subprocess.Popen(["minikube", "docker-env"], stdout=subprocess.PIPE) diff --git a/mars/learn/cluster/_k_means_elkan_iter.py b/mars/learn/cluster/_k_means_elkan_iter.py index f01088d40c..55aa134df2 100644 --- a/mars/learn/cluster/_k_means_elkan_iter.py +++ b/mars/learn/cluster/_k_means_elkan_iter.py @@ -402,15 +402,9 @@ def tile(cls, op: "KMeansElkanUpdate"): out_chunks = [list() for _ in range(op.output_limit)] for i in range(x.chunk_shape[0]): x_chunk = x.cix[i, 0] - sample_weight_chunk = sample_weight.cix[ - i, - ] - labels_chunk = labels.cix[ - i, - ] - upper_bounds_chunk = upper_bounds.cix[ - i, - ] + sample_weight_chunk = sample_weight.cix[i,] + labels_chunk = labels.cix[i,] + upper_bounds_chunk = upper_bounds.cix[i,] lower_bounds_chunk = lower_bounds.cix[i, 0] chunk_op = op.copy().reset_key() chunk_op.stage = OperandStage.map diff --git a/mars/learn/cluster/_k_means_lloyd_iter.py b/mars/learn/cluster/_k_means_lloyd_iter.py index 755c97cb9a..40801684ae 100644 --- a/mars/learn/cluster/_k_means_lloyd_iter.py +++ b/mars/learn/cluster/_k_means_lloyd_iter.py @@ -148,15 +148,9 @@ def tile(cls, op: "KMeansLloydUpdate"): labels_chunks, centers_new_chunks, weight_in_clusters_chunks = [], [], [] for i in range(x.chunk_shape[0]): x_chunk = x.cix[i, 0] - sample_weight_chunk = sample_weight.cix[ - i, - ] - x_squared_norms_chunk = x_squared_norms.cix[ - i, - ] - labels_chunk = labels.cix[ - i, - ] + sample_weight_chunk = sample_weight.cix[i,] + x_squared_norms_chunk = x_squared_norms.cix[i,] + labels_chunk = labels.cix[i,] chunk_op = op.copy().reset_key() chunk_op.stage = OperandStage.map chunk_kws = [ diff --git a/mars/learn/cluster/_kmeans.py b/mars/learn/cluster/_kmeans.py index 148995bff4..6f6c196488 100644 --- a/mars/learn/cluster/_kmeans.py +++ b/mars/learn/cluster/_kmeans.py @@ -740,7 +740,6 @@ def __init__( oversampling_factor=2, init_iter=5, ): - self.n_clusters = n_clusters self.init = init self.max_iter = max_iter diff --git a/mars/learn/contrib/pytorch/dataset.py b/mars/learn/contrib/pytorch/dataset.py index 67e3fc4aca..5c1cd8e7b5 100644 --- a/mars/learn/contrib/pytorch/dataset.py +++ b/mars/learn/contrib/pytorch/dataset.py @@ -52,7 +52,6 @@ class MarsDataset(Dataset): """ def __init__(self, *tileables, fetch_kwargs=None): - self._context = get_context() self._tileables = tileables self._fetch_kwargs = fetch_kwargs or dict() diff --git a/mars/learn/contrib/pytorch/sampler.py b/mars/learn/contrib/pytorch/sampler.py index 160da7d730..d03d3e87ff 100644 --- a/mars/learn/contrib/pytorch/sampler.py +++ b/mars/learn/contrib/pytorch/sampler.py @@ -64,7 +64,6 @@ class RandomSampler(Sampler): def 
__init__( self, data_source, replacement=False, num_samples=None, generator=None ): - self.data_source = data_source self.replacement = replacement self._num_samples = num_samples diff --git a/mars/learn/contrib/tensorflow/dataset.py b/mars/learn/contrib/tensorflow/dataset.py index bd3c755ede..041c3ceb65 100644 --- a/mars/learn/contrib/tensorflow/dataset.py +++ b/mars/learn/contrib/tensorflow/dataset.py @@ -48,7 +48,6 @@ def __init__( output_types: Tuple[np.dtype, ...] = None, fetch_kwargs=None, ): - self._context = get_context() self._tensors = tensors self._output_shapes = output_shapes diff --git a/mars/learn/contrib/tensorflow/tests/tf_dataset.py b/mars/learn/contrib/tensorflow/tests/tf_dataset.py index 20f7ad88af..0ed051f111 100644 --- a/mars/learn/contrib/tensorflow/tests/tf_dataset.py +++ b/mars/learn/contrib/tensorflow/tests/tf_dataset.py @@ -48,7 +48,6 @@ def train(feature_data, labels): if __name__ == "__main__": - assert json.loads(os.environ["TF_CONFIG"])["task"]["index"] in {0, 1} assert len(sys.argv) == 2 assert sys.argv[1] == "multiple" diff --git a/mars/learn/contrib/xgboost/dmatrix.py b/mars/learn/contrib/xgboost/dmatrix.py index 7569f14275..770df7b705 100644 --- a/mars/learn/contrib/xgboost/dmatrix.py +++ b/mars/learn/contrib/xgboost/dmatrix.py @@ -117,9 +117,7 @@ def _get_collocated( for type_name, inp in zip(types[1:], [label, weight, base_margin]): if inp is None: continue - inp_chunk = inp.cix[ - i, - ] + inp_chunk = inp.cix[i,] setattr(chunk_op, type_name, inp_chunk) inps.append(inp_chunk) kw = cls._get_kw(inp_chunk) diff --git a/mars/learn/decomposition/tests/test_pca.py b/mars/learn/decomposition/tests/test_pca.py index 48c0489a75..a2cf5a9008 100644 --- a/mars/learn/decomposition/tests/test_pca.py +++ b/mars/learn/decomposition/tests/test_pca.py @@ -329,7 +329,6 @@ def test_pca_validation(setup): # We conduct the same test on X.T so that it is invariant to axis. 
for data in [X, X.T]: for n_components in [-1, 3]: - if solver == "auto": solver_reported = "full" else: diff --git a/mars/learn/ensemble/tests/test_bagging.py b/mars/learn/ensemble/tests/test_bagging.py index 920646e7d8..b613439663 100644 --- a/mars/learn/ensemble/tests/test_bagging.py +++ b/mars/learn/ensemble/tests/test_bagging.py @@ -289,7 +289,6 @@ def test_bagging_classifier( def test_bagging_regressor( setup, use_dataframe, max_samples, max_features, with_weights ): - rs = np.random.RandomState(0) raw_x, raw_y = make_regression( diff --git a/mars/learn/glm/_logistic.py b/mars/learn/glm/_logistic.py index bdf3b68062..172c503ff6 100644 --- a/mars/learn/glm/_logistic.py +++ b/mars/learn/glm/_logistic.py @@ -175,7 +175,6 @@ def __init__( multi_class="auto", verbose=0, ): - self.penalty = penalty self.fit_intercept = fit_intercept self.C = C diff --git a/mars/learn/linear_model/tests/test_base.py b/mars/learn/linear_model/tests/test_base.py index f581eb6a96..ff712d786b 100644 --- a/mars/learn/linear_model/tests/test_base.py +++ b/mars/learn/linear_model/tests/test_base.py @@ -101,13 +101,11 @@ def test_linear_regression_sample_weights(setup): # It would not work with under-determined systems for n_samples, n_features in ((6, 5),): - y = rng.randn(n_samples) X = rng.randn(n_samples, n_features) sample_weight = 1.0 + rng.rand(n_samples) for intercept in (True, False): - # LinearRegression with explicit sample_weight reg = LinearRegression(fit_intercept=intercept) reg.fit(X, y, sample_weight=sample_weight) @@ -557,7 +555,6 @@ def test_dtype_preprocess_data(setup): for fit_intercept in [True, False]: for normalize in [True, False]: - Xt_32, yt_32, X_mean_32, y_mean_32, X_norm_32 = _preprocess_data( X_32, y_32, diff --git a/mars/learn/metrics/tests/test_regression.py b/mars/learn/metrics/tests/test_regression.py index 52c47caec3..3e2c9369b1 100644 --- a/mars/learn/metrics/tests/test_regression.py +++ b/mars/learn/metrics/tests/test_regression.py @@ -39,7 +39,6 @@ def test__check_reg_targets(setup): ] for (type1, y1, n_out1), (type2, y2, n_out2) in product(EXAMPLES, repeat=2): - if type1 == type2 and n_out1 == n_out2: y_type, y_check1, y_check2, multioutput = _check_reg_targets(y1, y2, None) assert type1 == y_type diff --git a/mars/learn/neighbors/base.py b/mars/learn/neighbors/base.py index 702920540e..81f37f9b22 100644 --- a/mars/learn/neighbors/base.py +++ b/mars/learn/neighbors/base.py @@ -86,7 +86,6 @@ def __init__( metric_params=None, n_jobs=None, ): - self.n_neighbors = n_neighbors self.radius = radius self.algorithm = algorithm diff --git a/mars/learn/preprocessing/tests/test_data.py b/mars/learn/preprocessing/tests/test_data.py index d261322c35..b95f62351c 100644 --- a/mars/learn/preprocessing/tests/test_data.py +++ b/mars/learn/preprocessing/tests/test_data.py @@ -167,7 +167,6 @@ def test_min_max_scaler1d(setup): # Test scaling of dataset along single axis for X in [X_1row, X_1col, X_list_1row, X_list_1col]: - scaler = MinMaxScaler(copy=True) X_scaled = scaler.fit(X).transform(X) diff --git a/mars/learn/proxima/simple_index/tests/test_simple_index.py b/mars/learn/proxima/simple_index/tests/test_simple_index.py index d99b9f810c..0539b0a824 100644 --- a/mars/learn/proxima/simple_index/tests/test_simple_index.py +++ b/mars/learn/proxima/simple_index/tests/test_simple_index.py @@ -139,7 +139,6 @@ def build_and_query( index_reformer=None, index_reformer_params=None, ): - if measure_name is None: measure_name = "SquaredEuclidean" if dimension is None: diff --git 
a/mars/learn/semi_supervised/_label_propagation.py b/mars/learn/semi_supervised/_label_propagation.py index aebabb8e7e..c680ccbef5 100644 --- a/mars/learn/semi_supervised/_label_propagation.py +++ b/mars/learn/semi_supervised/_label_propagation.py @@ -59,7 +59,6 @@ class BaseLabelPropagation(ClassifierMixin, BaseEstimator, metaclass=ABCMeta): def __init__( self, kernel="rbf", gamma=20, n_neighbors=7, alpha=1, max_iter=30, tol=1e-3 ): - self.max_iter = max_iter self.tol = tol diff --git a/mars/learn/utils/validation.py b/mars/learn/utils/validation.py index 08839e888b..bbf7e7dce9 100644 --- a/mars/learn/utils/validation.py +++ b/mars/learn/utils/validation.py @@ -269,7 +269,6 @@ def check_array( ensure_min_features=1, estimator=None, ) -> Tensor: - """Input validation on a tensor, list, sparse matrix or similar. By default, the input is checked to be a non-empty 2D array containing diff --git a/mars/lib/filesystem/local.py b/mars/lib/filesystem/local.py index fae4793631..59bd09dcf6 100644 --- a/mars/lib/filesystem/local.py +++ b/mars/lib/filesystem/local.py @@ -22,7 +22,6 @@ class LocalFileSystem(FileSystem): - _instance = None @classmethod diff --git a/mars/lib/sparse/array.py b/mars/lib/sparse/array.py index 7b94c11470..ddd71d383a 100644 --- a/mars/lib/sparse/array.py +++ b/mars/lib/sparse/array.py @@ -34,7 +34,6 @@ class SparseNDArray: __array_priority__ = 21 def __new__(cls, *args, **kwargs): - shape = kwargs.get("shape", None) if shape is not None and len(shape) == 1: from .vector import SparseVector diff --git a/mars/lib/version.py b/mars/lib/version.py index 8e38b79d88..32457773a6 100644 --- a/mars/lib/version.py +++ b/mars/lib/version.py @@ -301,7 +301,6 @@ def _parse_version_parts(s: str) -> Iterator[str]: def _legacy_cmpkey(version: str) -> LegacyCmpKey: - # We hardcode an epoch of -1 here. A PEP 440 version can only have a epoch # greater than or equal to 0. This will effectively put the LegacyVersion, # which uses the defacto standard originally implemented by setuptools, @@ -362,11 +361,9 @@ def _legacy_cmpkey(version: str) -> LegacyCmpKey: class Version(_BaseVersion): - _regex = re.compile(r"^\s*" + VERSION_PATTERN + r"\s*$", re.VERBOSE | re.IGNORECASE) def __init__(self, version: str) -> None: - # Validate the version and parse it into pieces match = self._regex.search(version) if not match: @@ -500,7 +497,6 @@ def micro(self) -> int: def _parse_letter_version( letter: str, number: Union[str, bytes, SupportsInt] ) -> Optional[Tuple[str, int]]: - if letter: # We consider there to be an implicit 0 in a pre-release if there is # not a numeral associated with it. @@ -556,7 +552,6 @@ def _cmpkey( dev: Optional[Tuple[str, int]], local: Optional[Tuple[SubLocalType]], ) -> CmpKey: - # When we compare a release version, we want to compare it with all of the # trailing zeros removed. 
So we'll use a reverse the list, drop all the now # leading zeros until we come to something non zero, then take the rest diff --git a/mars/resource.py b/mars/resource.py index bf630cdecc..8fd33e3ace 100644 --- a/mars/resource.py +++ b/mars/resource.py @@ -33,8 +33,13 @@ logger = logging.getLogger(__name__) -CGROUP_CPU_STAT_FILE = "/sys/fs/cgroup/cpuacct/cpuacct.usage" -CGROUP_MEM_STAT_FILE = "/sys/fs/cgroup/memory/memory.stat" +CGROUP_V1_CPU_ACCT_FILE = "/sys/fs/cgroup/cpuacct/cpuacct.usage" +CGROUP_V1_MEM_STAT_FILE = "/sys/fs/cgroup/memory/memory.stat" +CGROUP_V2_CPU_STAT_FILE = "/sys/fs/cgroup/cpu.stat" +CGROUP_V2_MEM_CURRENT_FILE = "/sys/fs/cgroup/memory.current" +CGROUP_V2_MEM_MAX_FILE = "/sys/fs/cgroup/memory.max" + +_is_cgroup_v2 = os.path.exists(CGROUP_V2_CPU_STAT_FILE) _proc = psutil.Process() _timer = getattr(time, "monotonic", time.time) @@ -75,8 +80,8 @@ _shm_path = _shm_path[0] -def _read_cgroup_stat_file(): - with open(CGROUP_MEM_STAT_FILE, "r") as cg_file: +def _read_cgroup_stat_file(file_name: str): + with open(file_name, "r") as cg_file: contents = cg_file.read() kvs = dict() for line in contents.splitlines(): @@ -94,11 +99,21 @@ def virtual_memory() -> _virt_memory_stat: sys_mem = psutil.virtual_memory() if _mem_use_cgroup_stat: - # see section 5.5 in https://www.kernel.org/doc/Documentation/cgroup-v1/memory.txt - cgroup_mem_info = _read_cgroup_stat_file() - total = cgroup_mem_info["hierarchical_memory_limit"] - total = min(_mem_total or total, total) - used = cgroup_mem_info["rss"] + cgroup_mem_info.get("swap", 0) + max_mem = min(_mem_total or sys_mem.total, sys_mem.total) + if _is_cgroup_v2: + # see Memory section in https://www.kernel.org/doc/Documentation/cgroup-v2.txt + with open(CGROUP_V2_MEM_MAX_FILE, "r") as mem_max_file: + max_str = mem_max_file.read().strip() + total = max_mem if max_str == "max" else int(max_str) + with open(CGROUP_V2_MEM_CURRENT_FILE, "r") as mem_current_file: + used = int(mem_current_file.read().strip()) + else: + # see section 5.5 in https://www.kernel.org/doc/Documentation/cgroup-v1/memory.txt + cgroup_mem_info = _read_cgroup_stat_file(CGROUP_V1_MEM_STAT_FILE) + total = cgroup_mem_info["hierarchical_memory_limit"] + total = min(max_mem, total) + used = cgroup_mem_info["rss"] + cgroup_mem_info.get("swap", 0) + if _shm_path: shm_stats = psutil.disk_usage(_shm_path) used += shm_stats.used @@ -179,10 +194,16 @@ def _take_process_cpu_snapshot(): def cpu_percent(): global _last_cgroup_cpu_measure, _last_proc_cpu_measure, _last_cpu_percent, _last_psutil_measure if _cpu_use_cgroup_stat: - # see https://www.kernel.org/doc/Documentation/cgroup-v1/cpuacct.txt - with open(CGROUP_CPU_STAT_FILE, "r") as cgroup_file: - cpu_acct = int(cgroup_file.read()) - sample_time = _timer() + if _is_cgroup_v2: + # see CPU section in https://www.kernel.org/doc/Documentation/cgroup-v2.txt + cpu_content = _read_cgroup_stat_file(CGROUP_V2_CPU_STAT_FILE) + cpu_acct = cpu_content["usage_usec"] * 1000 + else: + # see https://www.kernel.org/doc/Documentation/cgroup-v1/cpuacct.txt + with open(CGROUP_V1_CPU_ACCT_FILE, "r") as cgroup_file: + cpu_acct = int(cgroup_file.read()) + sample_time = _timer() + if _last_cgroup_cpu_measure is None: _last_cgroup_cpu_measure = (cpu_acct, sample_time) return None diff --git a/mars/services/cluster/uploader.py b/mars/services/cluster/uploader.py index e34ec73594..e85407db0e 100644 --- a/mars/services/cluster/uploader.py +++ b/mars/services/cluster/uploader.py @@ -88,7 +88,9 @@ async def _periodical_upload_node_info(self): 
self._uploaded_future.set_result(None) except asyncio.CancelledError: # pragma: no cover break - except Exception as ex: # pragma: no cover # noqa: E722 # nosec # pylint: disable=bare-except + except ( + Exception + ) as ex: # pragma: no cover # noqa: E722 # nosec # pylint: disable=bare-except logger.error(f"Failed to upload node info: {ex}") if not self._uploaded_future.done(): self._uploaded_future.set_exception(ex) diff --git a/mars/services/scheduling/worker/tests/test_execution.py b/mars/services/scheduling/worker/tests/test_execution.py index abeb50ba7f..1bda5b3f80 100644 --- a/mars/services/scheduling/worker/tests/test_execution.py +++ b/mars/services/scheduling/worker/tests/test_execution.py @@ -436,10 +436,14 @@ def test_estimate_size(): index_value = parse_index(pd.Index([10, 20, 30], dtype=np.int64)) - input1 = DataFrameFetch(output_types=[OutputType.series],).new_chunk( + input1 = DataFrameFetch( + output_types=[OutputType.series], + ).new_chunk( [], _key="INPUT1", shape=(np.nan,), dtype=np.dtype("O"), index_value=index_value ) - input2 = DataFrameFetch(output_types=[OutputType.series],).new_chunk( + input2 = DataFrameFetch( + output_types=[OutputType.series], + ).new_chunk( [], _key="INPUT2", shape=(np.nan,), dtype=np.dtype("O"), index_value=index_value ) result_chunk = DataFrameAdd( diff --git a/mars/services/subtask/worker/processor.py b/mars/services/subtask/worker/processor.py index 5511084398..112b9b204c 100644 --- a/mars/services/subtask/worker/processor.py +++ b/mars/services/subtask/worker/processor.py @@ -494,7 +494,9 @@ async def run(self): self.result.status = SubtaskStatus.cancelled self.result.progress = 1.0 raise - except BaseException as ex: # noqa: E722 # nosec # pylint: disable=bare-except + except ( + BaseException + ) as ex: # noqa: E722 # nosec # pylint: disable=bare-except self.result.status = SubtaskStatus.errored self.result.progress = 1.0 if isinstance(ex, ExecutionError): diff --git a/mars/services/task/api/oscar.py b/mars/services/task/api/oscar.py index 2395edff49..ae4a7009b4 100644 --- a/mars/services/task/api/oscar.py +++ b/mars/services/task/api/oscar.py @@ -78,7 +78,6 @@ async def get_tileable_details(self, task_id: str): async def get_tileable_subtasks( self, task_id: str, tileable_id: str, with_input_output: bool ): - return await self._task_manager_ref.get_tileable_subtasks( task_id, tileable_id, with_input_output ) diff --git a/mars/services/task/api/web.py b/mars/services/task/api/web.py index 7d0aeaf122..21dffd8e0c 100644 --- a/mars/services/task/api/web.py +++ b/mars/services/task/api/web.py @@ -286,7 +286,6 @@ async def get_tileable_details(self, task_id: str): async def get_tileable_subtasks( self, task_id: str, tileable_id: str, with_input_output: bool ): - with_input_output = "true" if with_input_output else "false" path = f"{self._address}/api/session/{self._session_id}/task/{task_id}/{tileable_id}/subtask" params = { diff --git a/mars/services/task/supervisor/graph_visualizer.py b/mars/services/task/supervisor/graph_visualizer.py index 46850a3656..417bb38925 100644 --- a/mars/services/task/supervisor/graph_visualizer.py +++ b/mars/services/task/supervisor/graph_visualizer.py @@ -73,7 +73,6 @@ def _export_subtask_to_dot( chunk_key_to_subtask: Dict[str, List], trunc_key: int = 5, ): - chunk_graph = subtask.chunk_graph sio = StringIO() chunk_style = "[shape=box]" diff --git a/mars/tensor/base/repeat.py b/mars/tensor/base/repeat.py index dd8e53b610..2d2e4448ac 100644 --- a/mars/tensor/base/repeat.py +++ b/mars/tensor/base/repeat.py @@ 
-133,9 +133,7 @@ def tile(cls, op): rp = repeats[start:stop] size = int(rp.sum()) elif not isinstance(repeats, Integral): - rp = repeats.cix[ - ax_idx, - ] + rp = repeats.cix[ax_idx,] size = np.nan else: rp = repeats diff --git a/mars/tensor/datasource/diag.py b/mars/tensor/datasource/diag.py index 2030121d82..b5ca00a920 100644 --- a/mars/tensor/datasource/diag.py +++ b/mars/tensor/datasource/diag.py @@ -140,9 +140,7 @@ def _get_nsplits(cls, op): def _get_chunk(cls, op, chunk_k, chunk_shape, chunk_idx): assert chunk_shape[0] == chunk_shape[1] input_idx = chunk_idx[1] if op.k < 0 else chunk_idx[0] - input_chunk = op.inputs[0].cix[ - input_idx, - ] + input_chunk = op.inputs[0].cix[input_idx,] op = TensorDiag(k=chunk_k, dtype=op.dtype, gpu=op.gpu, sparse=op.sparse) return op.new_chunk([input_chunk], shape=chunk_shape, index=chunk_idx) diff --git a/mars/tensor/einsum/core.py b/mars/tensor/einsum/core.py index 2585440039..b7474d806c 100644 --- a/mars/tensor/einsum/core.py +++ b/mars/tensor/einsum/core.py @@ -86,11 +86,11 @@ def tile(cls, op): # rechunk to unify nsplits input_nsplits = defaultdict(list) - for (t, axes) in tensor_axes: + for t, axes in tensor_axes: for splits, ax in zip(t.nsplits, axes): input_nsplits[ax].append(splits) input_tensors = [] - for (t, axes) in tensor_axes: + for t, axes in tensor_axes: new_nsplits = tuple( decide_unify_split(*input_nsplits[ax]) if t.shape[j] > 1 @@ -120,7 +120,7 @@ def tile(cls, op): tensor_shape = [] for i, idx in enumerate(out_idx): tensor_shape.append(axes_splits[output_scripts[i]][idx]) - for (t_idx, axis) in output_axes[output_scripts[i]]: + for t_idx, axis in output_axes[output_scripts[i]]: if input_tensors[t_idx].shape[axis] == 1: all_indexes[t_idx][axis] = 0 else: diff --git a/mars/tensor/einsum/einsumfunc.py b/mars/tensor/einsum/einsumfunc.py index c59baedbaa..e7959be6f9 100644 --- a/mars/tensor/einsum/einsumfunc.py +++ b/mars/tensor/einsum/einsumfunc.py @@ -193,7 +193,6 @@ def _optimal_path(input_sets, output_set, idx_dict, memory_limit): for curr in full_results: cost, positions, remaining = curr for con in itertools.combinations(range(len(input_sets) - iteration), 2): - # Find the contraction cont = _find_contraction(con, remaining, output_set) new_result, new_input_sets, idx_removed, idx_contract = cont @@ -310,7 +309,6 @@ def _update_other_results(results, best): mod_results = [] for cost, (x, y), con_sets in results: - # Ignore results involving tensors just contracted if x in best_con or y in best_con: continue @@ -381,10 +379,8 @@ def _greedy_path(input_sets, output_set, idx_dict, memory_limit): path = [] for iteration in range(len(input_sets) - 1): - # Iterate over all pairs on first step, only previously found pairs on subsequent steps for positions in comb_iter: - # Always initially ignore outer products if input_sets[positions[0]].isdisjoint(input_sets[positions[1]]): continue @@ -403,7 +399,6 @@ def _greedy_path(input_sets, output_set, idx_dict, memory_limit): # If we do not have a inner contraction, rescan pairs including outer products if len(known_contractions) == 0: # pragma: no cover - # Then check the outer products for positions in itertools.combinations(range(len(input_sets)), 2): result = _parse_possible_contraction( diff --git a/mars/tensor/stats/ttest.py b/mars/tensor/stats/ttest.py index 087efe0a58..ff99c13644 100644 --- a/mars/tensor/stats/ttest.py +++ b/mars/tensor/stats/ttest.py @@ -61,7 +61,6 @@ def _unequal_var_ttest_denom(v1, n1, v2, n2): def _ttest_ind_from_stats(mean1, mean2, denom, df, alternative): - d = 
mean1 - mean2 with np.errstate(divide="ignore", invalid="ignore"): t = mt_divide(d, denom) diff --git a/mars/tests/test_resource.py b/mars/tests/test_resource.py index 28cc461a96..eaf94cbacc 100644 --- a/mars/tests/test_resource.py +++ b/mars/tests/test_resource.py @@ -16,13 +16,24 @@ import os import tempfile import time + +import pytest + from ..resource import Resource, ZeroResource -_cpu_stat_first = "8678870951786" -_cpu_stat_last = "8679429771672" +_v1_cpu_stat_first = "8678870951786" +_v1_cpu_stat_last = "8679429771672" + +# just a fragment of real cpu.stat +_v2_cpu_stat_first = """ +usage_usec 8678870951 +""" +_v2_cpu_stat_last = """ +usage_usec 8679429771 +""" # just a fragment of real memory.stat -_memory_stat_content = """ +_v1_memory_stat_content = """ cache 489275392 rss 218181632 mapped_file 486768640 @@ -34,6 +45,9 @@ hierarchical_memory_limit 1073741824 """ +_v2_memory_current_content = "218181632\n" +_v2_memory_max_content = "1073741824\n" + def test_stats(): from mars import resource @@ -105,31 +119,60 @@ def test_use_process_stats(): importlib.reload(resource) -def test_use_c_group_stats(): +@pytest.mark.parametrize("cgroup_ver", ["v1", "v2"]) +def test_use_c_group_stats(cgroup_ver): from mars import resource - fd, cpu_stat_path = tempfile.mkstemp(prefix="test-mars-res-cpu-") - with os.fdopen(fd, "w") as f: - f.write(_cpu_stat_first) - fd, mem_stat_path = tempfile.mkstemp(prefix="test-mars-res-mem-") - with os.fdopen(fd, "w") as f: - f.write(_memory_stat_content) + def write_tmp_text_file(prefix, content): + fd, file_name = tempfile.mkstemp(prefix) + with os.fdopen(fd, "w") as f: + f.write(content) + return file_name - old_cpu_stat_file = resource.CGROUP_CPU_STAT_FILE - old_mem_stat_file = resource.CGROUP_MEM_STAT_FILE + v1_cpu_acct_path = write_tmp_text_file( + "test-mars-res-cgroup-v1-cpu-", _v1_cpu_stat_first + ) + v1_mem_stat_path = write_tmp_text_file( + "test-mars-res-cgroup-v1-mem-", _v1_memory_stat_content + ) + v2_cpu_stat_path = write_tmp_text_file( + "test-mars-res-cgroup-v2-cpu-", _v2_cpu_stat_first + ) + v2_mem_cur_path = write_tmp_text_file( + "test-mars-res-cgroup-v2-cpu-", _v2_memory_current_content + ) + v2_mem_max_path = write_tmp_text_file( + "test-mars-res-cgroup-v2-cpu-", _v2_memory_max_content + ) + + old_is_cgroup_v2 = resource._is_cgroup_v2 + old_v1_cpu_acct_file = resource.CGROUP_V1_CPU_ACCT_FILE + old_v1_mem_stat_file = resource.CGROUP_V1_MEM_STAT_FILE + old_v2_cpu_stat_file = resource.CGROUP_V2_CPU_STAT_FILE + old_v2_mem_current_file = resource.CGROUP_V2_MEM_CURRENT_FILE + old_v2_mem_max_file = resource.CGROUP_V2_MEM_MAX_FILE old_shm_path = resource._shm_path try: os.environ["MARS_USE_CGROUP_STAT"] = "1" resource = importlib.reload(resource) - resource.CGROUP_CPU_STAT_FILE = cpu_stat_path - resource.CGROUP_MEM_STAT_FILE = mem_stat_path + if cgroup_ver == "v1": + resource.CGROUP_V1_CPU_ACCT_FILE = v1_cpu_acct_path + resource.CGROUP_V1_MEM_STAT_FILE = v1_mem_stat_path + resource._is_cgroup_v2 = False + else: + resource.CGROUP_V2_CPU_STAT_FILE = v2_cpu_stat_path + resource.CGROUP_V2_MEM_CURRENT_FILE = v2_mem_cur_path + resource.CGROUP_V2_MEM_MAX_FILE = v2_mem_max_path + resource._is_cgroup_v2 = True resource._shm_path = None assert resource.cpu_percent() is None time.sleep(0.5) - with open(cpu_stat_path, "w") as f: - f.write(_cpu_stat_last) + with open(v1_cpu_acct_path, "w") as f: + f.write(_v1_cpu_stat_last) + with open(v2_cpu_stat_path, "w") as f: + f.write(_v2_cpu_stat_last) assert resource.cpu_percent() > 50 assert resource.cpu_percent() < 150 
@@ -137,12 +180,22 @@ def test_use_c_group_stats(): assert mem_stats.total == 1073741824 assert mem_stats.used == 218181632 finally: - resource.CGROUP_CPU_STAT_FILE = old_cpu_stat_file - resource.CGROUP_MEM_STAT_FILE = old_mem_stat_file + resource._is_cgroup_v2 = old_is_cgroup_v2 resource._shm_path = old_shm_path + resource.CGROUP_V1_CPU_ACCT_FILE = old_v1_cpu_acct_file + resource.CGROUP_V1_MEM_STAT_FILE = old_v1_mem_stat_file + resource.CGROUP_V2_CPU_STAT_FILE = old_v2_cpu_stat_file + resource.CGROUP_V2_MEM_CURRENT_FILE = old_v2_mem_current_file + resource.CGROUP_V2_MEM_MAX_FILE = old_v2_mem_max_file + del os.environ["MARS_USE_CGROUP_STAT"] - os.unlink(cpu_stat_path) - os.unlink(mem_stat_path) + + os.unlink(v1_cpu_acct_path) + os.unlink(v1_mem_stat_path) + os.unlink(v2_cpu_stat_path) + os.unlink(v2_mem_cur_path) + os.unlink(v2_mem_max_path) + importlib.reload(resource) diff --git a/setup.cfg b/setup.cfg index f933995b50..fdb54c66ff 100644 --- a/setup.cfg +++ b/setup.cfg @@ -37,7 +37,8 @@ install_requires = pickle5; python_version<"3.8" shared-memory38>=0.1.1; python_version<"3.8" tornado>=6.0 - sqlalchemy>=1.2.0 + sqlalchemy>=1.2.0,<2.0; python_version<"3.8" + sqlalchemy>=1.2.0; python_version>="3.8" defusedxml>=0.5.0 tqdm>=4.1.0 uvloop>=0.14.0; sys.platform!="win32"
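
The cgroup handling added to mars/resource.py above is spread across several hunks. The following standalone Python sketch condenses the v2 detection-and-read strategy the patch adopts; the file paths and key names ("usage_usec", the "max" sentinel in memory.max) are those used in the patch, while the fallback to psutil's host total is a simplified stand-in for the patched code's _mem_total handling.

import os

import psutil

CGROUP_V2_CPU_STAT_FILE = "/sys/fs/cgroup/cpu.stat"
CGROUP_V2_MEM_CURRENT_FILE = "/sys/fs/cgroup/memory.current"
CGROUP_V2_MEM_MAX_FILE = "/sys/fs/cgroup/memory.max"

# Under the unified (v2) hierarchy, cpu.stat exists at the cgroup root,
# so its presence is enough to tell v2 apart from v1.
is_cgroup_v2 = os.path.exists(CGROUP_V2_CPU_STAT_FILE)


def read_cgroup_stat_file(file_name):
    # cgroup stat files are flat "<key> <value>" lines,
    # e.g. "usage_usec 8678870951" in cpu.stat.
    with open(file_name, "r") as cg_file:
        lines = cg_file.read().splitlines()
    return {
        parts[0]: int(parts[1])
        for parts in (line.split(" ", 1) for line in lines)
        if len(parts) == 2
    }


if is_cgroup_v2:
    # usage_usec is in microseconds; scale to nanoseconds so the value stays
    # comparable with the nanosecond counter in v1's cpuacct.usage.
    cpu_acct = read_cgroup_stat_file(CGROUP_V2_CPU_STAT_FILE)["usage_usec"] * 1000
    with open(CGROUP_V2_MEM_MAX_FILE, "r") as f:
        max_str = f.read().strip()
    # memory.max reads "max" when the cgroup imposes no memory limit.
    total = psutil.virtual_memory().total if max_str == "max" else int(max_str)
    with open(CGROUP_V2_MEM_CURRENT_FILE, "r") as f:
        used = int(f.read().strip())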
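
The read_sql.py hunks also migrate query construction from the SQLAlchemy 1.x calling convention (a column list plus from_obj=...) to the 1.4/2.0 style (positional columns plus .select_from(...)). A minimal before/after sketch, using a hypothetical table t with columns a and b:

import sqlalchemy as sa
from sqlalchemy import sql

engine = sa.create_engine("sqlite://")
m = sa.MetaData()
t = sa.Table("t", m, sa.Column("a", sa.Integer), sa.Column("b", sa.Integer))

# 1.x style, removed in SQLAlchemy 2.0:
#   query = sql.select([t.c.a, t.c.b], from_obj=t)
# 1.4/2.0 style used throughout the patch:
query = sql.select(t.c.a, t.c.b).select_from(t)
count_query = sql.select(sa.func.count()).select_from(t)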