[Bug]: The bulk writer cannot convert nullable data to a CSV file. #2313

Open
1 task done
zhuwenxing opened this issue Oct 25, 2024 · 5 comments
Assignees
Labels
kind/bug Something isn't working

Comments

@zhuwenxing
Contributor

Is there an existing issue for this?

  • I have searched the existing issues

Describe the bug

[2024-10-25 19:42:28 - ERROR - local_bulk_writer]: Failed to fulsh, error: int() argument must be a string, a bytes-like object or a number, not 'NoneType' (local_bulk_writer.py:142)

[2024-10-25 19:42:27 - INFO - ci_test]: ################################################################################ (conftest.py:232)
[2024-10-25 19:42:27 - INFO - ci_test]: [initialize_milvus] Log cleaned up, start testing... (conftest.py:233)
[2024-10-25 19:42:27 - INFO - ci_test]: [setup_class] Start setup class... (client_base.py:40)
[2024-10-25 19:42:27 - INFO - ci_test]: *********************************** setup *********************************** (client_base.py:46)
[2024-10-25 19:42:27 - INFO - ci_test]: [setup_method] Start setup test case test_with_all_field_csv_with_bulk_writer. (client_base.py:47)
-------------------------------- live log call ---------------------------------
[2024-10-25 19:42:28 - INFO - local_bulk_writer]: Data path created: /Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pycharm/bulk_writer (local_bulk_writer.py:77)
[2024-10-25 19:42:28 - INFO - local_bulk_writer]: Data path created: /Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pycharm/bulk_writer/6c82cb06-dff3-4dcf-ba6e-e2a48c89f262 (local_bulk_writer.py:82)
[2024-10-25 19:42:28 - INFO - remote_bulk_writer]: Minio/S3 blob storage client successfully initialized (remote_bulk_writer.py:157)
[2024-10-25 19:42:28 - INFO - remote_bulk_writer]: Remote buffer writer initialized, target path: /bulk_data/6c82cb06-dff3-4dcf-ba6e-e2a48c89f262 (remote_bulk_writer.py:123)
[2024-10-25 19:42:28 - INFO - local_bulk_writer]: Prepare to flush buffer, row_count: 1000, size: 1360576 (local_bulk_writer.py:109)
[2024-10-25 19:42:28 - INFO - local_bulk_writer]: Flush thread begin, name: Thread-10 (local_bulk_writer.py:116)
[2024-10-25 19:42:28 - ERROR - local_bulk_writer]: Failed to fulsh, error: int() argument must be a string, a bytes-like object or a number, not 'NoneType' (local_bulk_writer.py:142)
[2024-10-25 19:42:28 - INFO - local_bulk_writer]: Wait flush to finish (local_bulk_writer.py:120)
[2024-10-25 19:42:28 - INFO - local_bulk_writer]: Flush thread finished, name: Thread-10 (local_bulk_writer.py:146)
[2024-10-25 19:42:28 - INFO - local_bulk_writer]: Commit done with async=False (local_bulk_writer.py:124)
[2024-10-25 19:42:28 - INFO - ci_test]: bulk insert files: [] (test_bulk_insert.py:1874)
[2024-10-25 19:42:28 - INFO - local_bulk_writer]: Delete local directory '/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pycharm/bulk_writer/6c82cb06-dff3-4dcf-ba6e-e2a48c89f262' (local_bulk_writer.py:88)
[2024-10-25 19:42:29 - INFO - ci_test]:  collection entities: 0 (test_bulk_insert.py:1889)
FAILED
testcases/test_bulk_insert.py:1806 (TestBulkInsert.test_with_all_field_csv_with_bulk_writer[doc-True-True-1000-128-True])
0 != 1000

Expected :1000
Actual   :0

self = <test_bulk_insert.TestBulkInsert object at 0x130eae7c0>, auto_id = True
dim = 128, entities = 1000, enable_dynamic_field = True, sparse_format = 'doc'
nullable = True

    @pytest.mark.tags(CaseLabel.L3)
    @pytest.mark.parametrize("auto_id", [True, False])
    @pytest.mark.parametrize("dim", [128])  # 128
    @pytest.mark.parametrize("entities", [1000])  # 1000
    @pytest.mark.parametrize("enable_dynamic_field", [True, False])
    @pytest.mark.parametrize("nullable", [True, False])
    @pytest.mark.parametrize("sparse_format", ["doc", "coo"])
    def test_with_all_field_csv_with_bulk_writer(self, auto_id, dim, entities, enable_dynamic_field, sparse_format, nullable):
        """
        """
        self._connect()
        fields = [
            cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
            cf.gen_int64_field(name=df.int_field, nullable=nullable),
            cf.gen_float_field(name=df.float_field, nullable=nullable),
            cf.gen_string_field(name=df.string_field, nullable=nullable),
            cf.gen_json_field(name=df.json_field, nullable=nullable),
            cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT64, nullable=nullable),
            cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT, nullable=nullable),
            cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR, max_length=100, nullable=nullable),
            cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL, nullable=nullable),
            cf.gen_float_vec_field(name=df.float_vec_field, dim=dim),
            cf.gen_float16_vec_field(name=df.fp16_vec_field, dim=dim),
            cf.gen_bfloat16_vec_field(name=df.bf16_vec_field, dim=dim),
            cf.gen_sparse_vec_field(name=df.sparse_vec_field),
        ]
        c_name = cf.gen_unique_str("bulk_insert")
        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
        self.collection_wrap.init_collection(c_name, schema=schema)
        with RemoteBulkWriter(
            schema=schema,
            remote_path="bulk_data",
            connect_param=RemoteBulkWriter.ConnectParam(
                bucket_name=self.bucket_name,
                endpoint=self.minio_endpoint,
                access_key="minioadmin",
                secret_key="minioadmin",
            ),
            file_type=BulkFileType.CSV,
        ) as remote_writer:
            json_value = [
                {"key": "value"},
            ]
            for i in range(entities):
                row = {
                    df.pk_field: i,
                    df.int_field: 1 if not (nullable and random.random() < 0.5) else None,
                    df.float_field: 1.0 if not (nullable and random.random() < 0.5) else None,
                    df.string_field: "string" if not (nullable and random.random() < 0.5) else None,
                    df.json_field: json_value[i%len(json_value)] if not (nullable and random.random() < 0.5) else None,
                    df.array_int_field: [1, 2] if not (nullable and random.random() < 0.5) else None,
                    df.array_float_field: [1.0, 2.0] if not (nullable and random.random() < 0.5) else None,
                    df.array_string_field: ["string1", "string2"] if not (nullable and random.random() < 0.5) else None,
                    df.array_bool_field: [True, False] if not (nullable and random.random() < 0.5) else None,
                    df.float_vec_field: cf.gen_vectors(1, dim)[0],
                    df.fp16_vec_field: cf.gen_vectors(1, dim, vector_data_type="FLOAT16_VECTOR")[0],
                    df.bf16_vec_field: cf.gen_vectors(1, dim, vector_data_type="BFLOAT16_VECTOR")[0],
                    df.sparse_vec_field: cf.gen_sparse_vectors(1, dim, sparse_format=sparse_format)[0]
                }
                if auto_id:
                    row.pop(df.pk_field)
                if enable_dynamic_field:
                    row["name"] = fake.name()
                    row["address"] = fake.address()
                remote_writer.append_row(row)
            remote_writer.commit()
            files = remote_writer.batch_files
            log.info(f"bulk insert files: {files}")
        # import data
        for f in files:
            t0 = time.time()
            task_id, _ = self.utility_wrap.do_bulk_insert(
                collection_name=c_name, files=f
            )
            logging.info(f"bulk insert task ids:{task_id}")
            success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
                task_ids=[task_id], timeout=300
            )
            tt = time.time() - t0
            log.info(f"bulk insert state:{success} in {tt} with states:{states}")
            assert success
        num_entities = self.collection_wrap.num_entities
        log.info(f" collection entities: {num_entities}")
>       assert num_entities == entities
E       assert 0 == 1000

test_bulk_insert.py:1890: AssertionError
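
For reference, a minimal standalone sketch that should hit the same flush failure outside the CI harness. This is an assumption-laden illustration, not a verified reproduction: it assumes a pymilvus build with nullable field support and the `pymilvus.bulk_writer` module exposing `LocalBulkWriter`/`BulkFileType`; the field names and local path are made up.

```python
# Hypothetical minimal repro (assumes pymilvus with nullable field support;
# field names and local_path below are illustrative only).
from pymilvus import CollectionSchema, FieldSchema, DataType
from pymilvus.bulk_writer import LocalBulkWriter, BulkFileType

schema = CollectionSchema(fields=[
    FieldSchema(name="pk", dtype=DataType.INT64, is_primary=True, auto_id=False),
    FieldSchema(name="int_field", dtype=DataType.INT64, nullable=True),
    FieldSchema(name="vec", dtype=DataType.FLOAT_VECTOR, dim=8),
])

writer = LocalBulkWriter(
    schema=schema,
    local_path="/tmp/bulk_writer_csv_repro",
    file_type=BulkFileType.CSV,
)

# Appending None for the nullable INT64 field is expected to fail at flush time
# with "int() argument must be ... not 'NoneType'", so commit() produces no files.
writer.append_row({"pk": 1, "int_field": None, "vec": [0.0] * 8})
writer.commit()
```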

Expected Behavior

No response

Steps/Code To Reproduce behavior

No response

Environment details

- Hardware/Software conditions (OS, CPU, GPU, Memory):
- Method of installation (Docker, or from source):
- Milvus version (v0.3.1, or v0.4.0):
- Milvus configuration (Settings you made in `server_config.yaml`):

Anything else?

No response

@zhuwenxing zhuwenxing added the kind/bug Something isn't working label Oct 25, 2024
@binbinlv

Currently the bulk writer does not support nullable fields. @smellthemoon, could you please confirm? Thanks.

@binbinlv

/assign @smellthemoon

@smellthemoon
Contributor

Seems related to #2281.

@smellthemoon
Contributor

Could you help verify it, @zhuwenxing?

@smellthemoon
Contributor

/assign @zhuwenxing
