From 10965d36362a2d54e668e12615b45b5e6d2c48a2 Mon Sep 17 00:00:00 2001 From: Ben Burtenshaw Date: Mon, 24 Jun 2024 20:29:51 +0200 Subject: [PATCH 01/32] test: update tests for refactored mapping method --- argilla/tests/unit/test_record_ingestion.py | 284 +++++++++----------- 1 file changed, 125 insertions(+), 159 deletions(-) diff --git a/argilla/tests/unit/test_record_ingestion.py b/argilla/tests/unit/test_record_ingestion.py index 76058ec8c9..8ac8ab9968 100644 --- a/argilla/tests/unit/test_record_ingestion.py +++ b/argilla/tests/unit/test_record_ingestion.py @@ -36,191 +36,157 @@ def dataset(): def test_ingest_record_from_dict(dataset): - record = dataset.records._infer_record_from_mapping( - data={ - "prompt": "What is the capital of France?", - "label": "positive", - }, + record_api_models = dataset.records._ingest_records( + records=[ + { + "prompt": "What is the capital of France?", + "label": "positive", + } + ], ) + record = record_api_models[0] assert record.fields["prompt"] == "What is the capital of France?" - assert record.suggestions["label"].value == "positive" - - -def test_ingest_record_from_dict_with_mapping(dataset): - record = dataset.records._infer_record_from_mapping( - data={ - "my_prompt": "What is the capital of France?", - "label": "positive", - }, - mapping={ - "my_prompt": "prompt", - }, + assert record.suggestions[0].value == "positive" + + +def test_ingest_record_from_dict_with_mapped_suggestions(dataset): + mock_mapping = { + "my_prompt": "prompt", + "score": "label.suggestion.score", + "model": "label.suggestion.agent", + } + record_api_models = dataset.records._ingest_records( + records=[ + { + "my_prompt": "What is the capital of France?", + "label": "positive", + "score": 0.9, + "model": "model_name", + } + ], + mapping=mock_mapping, ) - + record = record_api_models[0] assert record.fields["prompt"] == "What is the capital of France?" - assert record.suggestions["label"].value == "positive" - - -def test_ingest_record_from_dict_with_suggestions(dataset): - record = dataset.records._infer_record_from_mapping( - data={ - "prompt": "Hello World, how are you?", - "label": "negative", - }, - ) - - assert record.fields["prompt"] == "Hello World, how are you?" - assert record.suggestions["label"].value == "negative" - - -def test_ingest_record_from_dict_with_suggestions_scores(dataset): - record = dataset.records._infer_record_from_mapping( - data={ - "prompt": "Hello World, how are you?", - "label": "negative", - "score": 0.9, - "model": "model_name", - }, - mapping={ - "score": "label.suggestion.score", - "model": "label.suggestion.agent", - }, - ) - - assert record.fields["prompt"] == "Hello World, how are you?" - assert record.suggestions["label"].value == "negative" - assert record.suggestions["label"].score == 0.9 - assert record.suggestions["label"].agent == "model_name" - - -def test_ingest_record_from_dict_with_suggestions_scores_and_agent(dataset): - record = dataset.records._infer_record_from_mapping( - data={ - "prompt": "Hello World, how are you?", - "label": "negative", - "score": 0.9, - "model": "model_name", - }, - mapping={ - "score": "label.suggestion.score", - "model": "label.suggestion.agent", - }, - ) - - assert record.fields["prompt"] == "Hello World, how are you?" - assert record.suggestions["label"].value == "negative" - assert record.suggestions["label"].score == 0.9 - assert record.suggestions["label"].agent == "model_name" + assert record.suggestions[0].value == "positive" + assert record.suggestions[0].question_name == "label" + assert record.suggestions[0].score == 0.9 + assert record.suggestions[0].agent == "model_name" -def test_ingest_record_from_dict_with_responses(dataset): +def test_ingest_record_from_dict_with_mapped_responses(dataset): user_id = uuid4() - record = dataset.records._infer_record_from_mapping( - data={ - "prompt": "Hello World, how are you?", - "label": "negative", - }, - mapping={ - "label": "label.response", - }, + mocked_mapping = { + "label": "label.response", + } + record_api_models = dataset.records._ingest_records( + records=[ + { + "prompt": "Hello World, how are you?", + "label": "negative", + } + ], + mapping=mocked_mapping, user_id=user_id, ) + record = record_api_models[0] assert record.fields["prompt"] == "Hello World, how are you?" - assert record.responses["label"][0].value == "negative" - assert record.responses["label"][0].user_id == user_id + assert record.responses[0].values["label"]["value"] == "negative" + assert record.responses[0].user_id == user_id def test_ingest_record_from_dict_with_id_as_id(dataset): record_id = uuid4() - record = dataset.records._infer_record_from_mapping( - data={ - "prompt": "Hello World, how are you?", - "label": "negative", - "id": record_id, - }, + record_api_models = dataset.records._ingest_records( + records=[ + { + "prompt": "Hello World, how are you?", + "label": "negative", + "id": record_id, + } + ], ) - + record = record_api_models[0] assert record.fields["prompt"] == "Hello World, how are you?" - assert record.id == record_id + assert record.external_id == record_id -def test_ingest_record_from_dict_with_id_and_mapping(dataset): +def test_ingest_record_from_dict_with_mapped_id(dataset): record_id = uuid4() - record = dataset.records._infer_record_from_mapping( - data={ - "prompt": "Hello World, how are you?", - "label": "negative", - "test_id": record_id, - }, - mapping={ - "test_id": "id", - }, + mock_mapping = { + "test_id": "id", + } + record_api_models = dataset.records._ingest_records( + records=[ + { + "prompt": "Hello World, how are you?", + "label": "negative", + "test_id": record_id, + } + ], + mapping=mock_mapping, ) - - assert record.fields["prompt"] == "Hello World, how are you?" - assert record.fields["prompt"] == "Hello World, how are you?" - assert record.id == record_id - - -def test_ingest_record_from_dict_with_metadata(dataset): - record = dataset.records._infer_record_from_mapping( - data={ - "prompt": "Hello World, how are you?", - "label": "negative", - "score": 0.9, - }, - ) - - assert record.fields["prompt"] == "Hello World, how are you?" - assert record.suggestions["label"].value == "negative" - assert record.metadata["score"] == 0.9 - - -def test_ingest_record_from_dict_with_metadata_and_mapping(dataset): - record = dataset.records._infer_record_from_mapping( - data={ - "prompt": "Hello World, how are you?", - "label": "negative", - "test_score": 0.9, - }, - mapping={ - "test_score": "score", - }, + record_model = record_api_models[0] + assert record_model.fields["prompt"] == "Hello World, how are you?" + assert record_model.external_id == record_id + + +def test_ingest_record_from_dict_with_mapped_metadata_vectors(dataset): + mock_mapping = { + "test_score": "score", + "test_vector": "vector", + } + record_api_models = dataset.records._ingest_records( + records=[ + { + "prompt": "Hello World, how are you?", + "label": "negative", + "test_score": 0.9, + "test_vector": [1, 2, 3], + } + ], + mapping=mock_mapping, ) + record = record_api_models[0] assert record.fields["prompt"] == "Hello World, how are you?" - assert record.suggestions["label"].value == "negative" - assert record.metadata["score"] == 0.9 + assert record.suggestions[0].value == "negative" + assert record.vectors[0].vector_values == [1, 2, 3] + assert record.vectors[0].name == "vector" + assert record.metadata[0].value == 0.9 + assert record.metadata[0].name == "score" -def test_ingest_record_from_dict_with_vectors(dataset): - record = dataset.records._infer_record_from_mapping( - data={ - "prompt": "Hello World, how are you?", - "label": "negative", - "vector": [1, 2, 3], - }, +def test_ingest_record_from_dict_with_mapping_multiple(): + settings = rg.Settings( + fields=[rg.TextField(name="prompt_field")], + questions=[ + rg.LabelQuestion(name="label", labels=["negative", "positive"]), + rg.TextQuestion(name="prompt_question"), + ], ) - - assert record.fields["prompt"] == "Hello World, how are you?" - assert record.suggestions["label"].value == "negative" - assert record.vectors["vector"] == [1, 2, 3] - - -def test_ingest_record_from_dict_with_vectors_and_mapping(dataset): - record = dataset.records._infer_record_from_mapping( - data={ - "prompt": "Hello World, how are you?", - "label": "negative", - "test_vector": [1, 2, 3], - }, - mapping={ - "test_vector": "vector", - }, + workspace = rg.Workspace(name="workspace", id=uuid4()) + dataset = rg.Dataset( + name="test_dataset", + settings=settings, + workspace=workspace, ) - - assert record.fields["prompt"] == "Hello World, how are you?" - assert record.suggestions["label"].value == "negative" - assert record.vectors["vector"] == [1, 2, 3] + mapping = { + "my_prompt": ("prompt_field", "prompt_question"), + } + record_api_models = dataset.records._ingest_records( + records=[ + { + "my_prompt": "What is the capital of France?", + "label": "positive", + } + ], + mapping=mapping, + ) + record = record_api_models[0] + suggestions = [s.value for s in record.suggestions] + assert record.fields["prompt_field"] == "What is the capital of France?" + assert "positive" in suggestions + assert "What is the capital of France?" in suggestions From a416a2fc4ec2f792a743c11974315ced99d315b9 Mon Sep 17 00:00:00 2001 From: Ben Burtenshaw Date: Mon, 24 Jun 2024 20:30:45 +0200 Subject: [PATCH 02/32] refactor: introduce independent mapping method and move logic to before ingestion loop --- .../src/argilla/records/_dataset_records.py | 132 ++++++++++-------- 1 file changed, 75 insertions(+), 57 deletions(-) diff --git a/argilla/src/argilla/records/_dataset_records.py b/argilla/src/argilla/records/_dataset_records.py index e6a641650d..db49b62a04 100644 --- a/argilla/src/argilla/records/_dataset_records.py +++ b/argilla/src/argilla/records/_dataset_records.py @@ -367,6 +367,7 @@ def _ingest_records( records = HFDatasetsIO._record_dicts_from_datasets(dataset=records) if all(map(lambda r: isinstance(r, dict), records)): # Records as flat dicts of values to be matched to questions as suggestion or response + mapping = self._render_record_mapping(records=records, mapping=mapping) records = [self._infer_record_from_mapping(data=r, mapping=mapping, user_id=user_id) for r in records] # type: ignore elif all(map(lambda r: isinstance(r, Record), records)): for record in records: @@ -398,6 +399,43 @@ def _validate_vector_names(self, vector_names: Union[List[str], str]) -> None: if vector_name not in self.__dataset.schema: raise ValueError(f"Vector field {vector_name} not found in dataset schema.") + def _render_record_mapping( + self, + records: List[Dict[str, Any]], + mapping: Optional[Dict[str, str]] = None, + ) -> Dict[str, List[str]]: + schema = self.__dataset.schema + mapping = mapping or {} + singular_mapping = {} + + # update the mapping with unmapped columns + for key, value in records[0].items(): + if key not in schema and key not in mapping: + warnings.warn( + message=f"Record attribute {key} is not in the schema so skipping.", + ) + if key not in mapping: + mapping[key] = key + + # create a singular mapping with destinations from the schema + for source_key, value in mapping.items(): + destinations = [] + _destinations = [value] if isinstance(value, str) else list(value) + + for attribute_mapping in _destinations: + attribute_mapping = attribute_mapping.split(".") + + attribute_name = attribute_mapping[0] + schema_item = schema.get(attribute_name) + attribute_type = attribute_mapping[1] if len(attribute_mapping) > 1 else None + sub_attribute = attribute_mapping[2] if len(attribute_mapping) > 2 else None + + destinations.append((attribute_name, attribute_type, sub_attribute, schema_item)) + + singular_mapping[source_key] = destinations + + return singular_mapping + def _infer_record_from_mapping( self, data: dict, @@ -412,76 +450,56 @@ def _infer_record_from_mapping( user_id: The user id to associate with the record responses. Returns: A Record object. + """ + record_id: Optional[str] = None fields: Dict[str, FieldValue] = {} vectors: Dict[str, VectorValue] = {} metadata: Dict[str, MetadataValue] = {} - responses: List[Response] = [] suggestion_values: Dict[str, dict] = defaultdict(dict) - schema = self.__dataset.schema - - for attribute, value in data.items(): - schema_item = schema.get(attribute) - attribute_type = None - sub_attribute = None - - # Map source data keys using the mapping - if mapping and attribute in mapping: - attribute_mapping = mapping.get(attribute) - attribute_mapping = attribute_mapping.split(".") - attribute = attribute_mapping[0] - schema_item = schema.get(attribute) - if len(attribute_mapping) > 1: - attribute_type = attribute_mapping[1] - if len(attribute_mapping) > 2: - sub_attribute = attribute_mapping[2] - elif schema_item is mapping is None and attribute != "id": - warnings.warn( - message=f"""Record attribute {attribute} is not in the schema so skipping. - Define a mapping to map source data fields to Argilla Fields, Questions, and ids - """ - ) - continue - - if attribute == "id": - record_id = value - continue - - # Add suggestion values to the suggestions - if attribute_type == "suggestion": - if sub_attribute in ["score", "agent"]: - suggestion_values[attribute][sub_attribute] = value - - elif sub_attribute is None: - suggestion_values[attribute].update( - {"value": value, "question_name": attribute, "question_id": schema_item.id} + for source_key, destinations in mapping.items(): + value = data.get(source_key) + for attribute_name, attribute_type, sub_attribute, schema_item in destinations: + if attribute_name == "id": + record_id = value + continue + # Add suggestion values to the suggestions + if attribute_type == "suggestion": + if sub_attribute in ["score", "agent"]: + suggestion_values[attribute_name][sub_attribute] = value + + elif sub_attribute is None: + suggestion_values[attribute_name].update( + {"value": value, "question_name": attribute_name, "question_id": schema_item.id} + ) + else: + warnings.warn( + message=f"Record attribute {sub_attribute} is not a valid suggestion sub_attribute so skipping." + ) + continue + + # Assign the value to question, field, or response based on schema item + if isinstance(schema_item, TextField): + fields[attribute_name] = value + elif isinstance(schema_item, QuestionPropertyBase) and attribute_type == "response": + responses.append(Response(question_name=attribute_name, value=value, user_id=user_id)) + elif isinstance(schema_item, QuestionPropertyBase) and attribute_type is None: + suggestion_values[attribute_name].update( + {"value": value, "question_name": attribute_name, "question_id": schema_item.id} ) + elif isinstance(schema_item, VectorField): + vectors[attribute_name] = value + elif isinstance(schema_item, MetadataPropertyBase): + metadata[attribute_name] = value else: warnings.warn( - message=f"Record attribute {sub_attribute} is not a valid suggestion sub_attribute so skipping." + message=f"Record attribute {attribute_name} is not in the schema or mapping so skipping." ) - continue - - # Assign the value to question, field, or response based on schema item - if isinstance(schema_item, TextField): - fields[attribute] = value - elif isinstance(schema_item, QuestionPropertyBase) and attribute_type == "response": - responses.append(Response(question_name=attribute, value=value, user_id=user_id)) - elif isinstance(schema_item, QuestionPropertyBase) and attribute_type is None: - suggestion_values[attribute].update( - {"value": value, "question_name": attribute, "question_id": schema_item.id} - ) - elif isinstance(schema_item, VectorField): - vectors[attribute] = value - elif isinstance(schema_item, MetadataPropertyBase): - metadata[attribute] = value - else: - warnings.warn(message=f"Record attribute {attribute} is not in the schema or mapping so skipping.") - continue + continue suggestions = [Suggestion(**suggestion_dict) for suggestion_dict in suggestion_values.values()] From 35db9f625642b7e529c2fc3667aa1cac09b90d96 Mon Sep 17 00:00:00 2001 From: Ben Burtenshaw Date: Tue, 25 Jun 2024 19:56:20 +0200 Subject: [PATCH 03/32] docs: update all doc strings in dataset records --- argilla/src/argilla/records/_dataset_records.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/argilla/src/argilla/records/_dataset_records.py b/argilla/src/argilla/records/_dataset_records.py index db49b62a04..9d60bf7c69 100644 --- a/argilla/src/argilla/records/_dataset_records.py +++ b/argilla/src/argilla/records/_dataset_records.py @@ -361,6 +361,7 @@ def _ingest_records( mapping: Optional[Dict[str, str]] = None, user_id: Optional[UUID] = None, ) -> List[RecordModel]: + """ Ingests records from a list of dictionaries, a Hugging Face Dataset, or a list of Record objects.""" if len(records) == 0: raise ValueError("No records provided to ingest.") if HFDatasetsIO._is_hf_dataset(dataset=records): @@ -404,6 +405,7 @@ def _render_record_mapping( records: List[Dict[str, Any]], mapping: Optional[Dict[str, str]] = None, ) -> Dict[str, List[str]]: + """ Renders a mapping from a list of records and a mapping dictionary, to a singular mapping dictionary.""" schema = self.__dataset.schema mapping = mapping or {} singular_mapping = {} @@ -467,6 +469,7 @@ def _infer_record_from_mapping( if attribute_name == "id": record_id = value continue + # Add suggestion values to the suggestions if attribute_type == "suggestion": if sub_attribute in ["score", "agent"]: From eae088b4113923e89f4d4e082393fdeee6ef9823 Mon Sep 17 00:00:00 2001 From: Ben Burtenshaw Date: Tue, 25 Jun 2024 20:08:14 +0200 Subject: [PATCH 04/32] chore: improve typing and docs on type --- .../src/argilla/records/_dataset_records.py | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/argilla/src/argilla/records/_dataset_records.py b/argilla/src/argilla/records/_dataset_records.py index 9d60bf7c69..32aa377b85 100644 --- a/argilla/src/argilla/records/_dataset_records.py +++ b/argilla/src/argilla/records/_dataset_records.py @@ -14,7 +14,7 @@ import warnings from collections import defaultdict from pathlib import Path -from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Sequence, Union +from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union from uuid import UUID from tqdm import tqdm @@ -208,7 +208,7 @@ def __repr__(self) -> str: def log( self, records: Union[List[dict], List[Record], HFDataset], - mapping: Optional[Dict[str, str]] = None, + mapping: Optional[Dict[str, Union[str, List[str], Tuple[str]]]] = None, user_id: Optional[UUID] = None, batch_size: int = DEFAULT_BATCH_SIZE, ) -> "DatasetRecords": @@ -222,6 +222,7 @@ def log( If records are defined as a dictionaries or a dataset, the keys/ column names should correspond to the fields in the Argilla dataset's fields and questions. `id` should be provided to identify the records when updating. mapping: A dictionary that maps the keys/ column names in the records to the fields or questions in the Argilla dataset. + To assign an incoming key or column to multiple fields or questions, provide a list or tuple of field or question names. user_id: The user id to be associated with the records' response. If not provided, the current user id is used. batch_size: The number of records to send in each batch. The default is 256. @@ -358,18 +359,20 @@ def to_datasets(self) -> HFDataset: def _ingest_records( self, records: Union[List[Dict[str, Any]], Dict[str, Any], List[Record], Record, HFDataset], - mapping: Optional[Dict[str, str]] = None, + mapping: Optional[Dict[str, Union[str, List[str], Tuple[str]]]] = None, user_id: Optional[UUID] = None, ) -> List[RecordModel]: - """ Ingests records from a list of dictionaries, a Hugging Face Dataset, or a list of Record objects.""" + """Ingests records from a list of dictionaries, a Hugging Face Dataset, or a list of Record objects.""" if len(records) == 0: raise ValueError("No records provided to ingest.") if HFDatasetsIO._is_hf_dataset(dataset=records): records = HFDatasetsIO._record_dicts_from_datasets(dataset=records) if all(map(lambda r: isinstance(r, dict), records)): # Records as flat dicts of values to be matched to questions as suggestion or response - mapping = self._render_record_mapping(records=records, mapping=mapping) - records = [self._infer_record_from_mapping(data=r, mapping=mapping, user_id=user_id) for r in records] # type: ignore + rendered_mapping = self._render_record_mapping(records=records, mapping=mapping) + records = [ + self._infer_record_from_mapping(data=r, mapping=rendered_mapping, user_id=user_id) for r in records + ] # type: ignore elif all(map(lambda r: isinstance(r, Record), records)): for record in records: record.dataset = self.__dataset @@ -403,9 +406,9 @@ def _validate_vector_names(self, vector_names: Union[List[str], str]) -> None: def _render_record_mapping( self, records: List[Dict[str, Any]], - mapping: Optional[Dict[str, str]] = None, - ) -> Dict[str, List[str]]: - """ Renders a mapping from a list of records and a mapping dictionary, to a singular mapping dictionary.""" + mapping: Optional[Dict[str, Union[str, List[str], Tuple[str]]]] = None, + ) -> Dict[str, Tuple[Optional[str]]]: + """Renders a mapping from a list of records and a mapping dictionary, to a singular mapping dictionary.""" schema = self.__dataset.schema mapping = mapping or {} singular_mapping = {} @@ -426,7 +429,7 @@ def _render_record_mapping( for attribute_mapping in _destinations: attribute_mapping = attribute_mapping.split(".") - + attribute_name = attribute_mapping[0] schema_item = schema.get(attribute_name) attribute_type = attribute_mapping[1] if len(attribute_mapping) > 1 else None @@ -440,8 +443,8 @@ def _render_record_mapping( def _infer_record_from_mapping( self, - data: dict, - mapping: Optional[Dict[str, str]] = None, + data: Dict[str, Any], + mapping: Dict[str, Tuple[Optional[str]]], user_id: Optional[UUID] = None, ) -> "Record": """Converts a mapped record dictionary to a Record object for use by the add or update methods. @@ -469,7 +472,7 @@ def _infer_record_from_mapping( if attribute_name == "id": record_id = value continue - + # Add suggestion values to the suggestions if attribute_type == "suggestion": if sub_attribute in ["score", "agent"]: From 4490d1144f111684e39e4b6de3365551869f0d10 Mon Sep 17 00:00:00 2001 From: Ben Burtenshaw Date: Tue, 25 Jun 2024 20:08:36 +0200 Subject: [PATCH 05/32] docs: wrong method in records api reference --- argilla/docs/reference/argilla/records/records.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/argilla/docs/reference/argilla/records/records.md b/argilla/docs/reference/argilla/records/records.md index d4daee9966..fc930c21d8 100644 --- a/argilla/docs/reference/argilla/records/records.md +++ b/argilla/docs/reference/argilla/records/records.md @@ -12,7 +12,7 @@ The `Record` object is used to represent a single record in Argilla. It contains To create records, you can use the `Record` class and pass it to the `Dataset.records.log` method. The `Record` class requires a `fields` parameter, which is a dictionary of field names and values. The field names must match the field names in the dataset's `Settings` object to be accepted. ```python -dataset.records.add( +dataset.records.log( records=[ rg.Record( fields={"text": "Hello World, how are you?"}, From b5b33961802f87918a3a46c9fa4404f848337f84 Mon Sep 17 00:00:00 2001 From: Ben Burtenshaw Date: Wed, 26 Jun 2024 09:09:59 +0200 Subject: [PATCH 06/32] feat: add exception for record ingestion --- argilla/src/argilla/_exceptions/__init__.py | 1 + argilla/src/argilla/_exceptions/_records.py | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+) create mode 100644 argilla/src/argilla/_exceptions/_records.py diff --git a/argilla/src/argilla/_exceptions/__init__.py b/argilla/src/argilla/_exceptions/__init__.py index 298688769d..da00d1f34a 100644 --- a/argilla/src/argilla/_exceptions/__init__.py +++ b/argilla/src/argilla/_exceptions/__init__.py @@ -16,3 +16,4 @@ from argilla._exceptions._metadata import * # noqa: F403 from argilla._exceptions._serialization import * # noqa: F403 from argilla._exceptions._settings import * # noqa: F403 +from argilla._exceptions._records import * # noqa: F403 diff --git a/argilla/src/argilla/_exceptions/_records.py b/argilla/src/argilla/_exceptions/_records.py new file mode 100644 index 0000000000..b3b5008e2f --- /dev/null +++ b/argilla/src/argilla/_exceptions/_records.py @@ -0,0 +1,18 @@ +# Copyright 2024-present, Argilla, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from argilla._exceptions._base import ArgillaErrorBase + +class RecordsIngestionError(ArgillaErrorBase): + pass \ No newline at end of file From ffeb0b0e028293afc2f73985c3ea11bb25230b07 Mon Sep 17 00:00:00 2001 From: Ben Burtenshaw Date: Wed, 26 Jun 2024 09:11:27 +0200 Subject: [PATCH 07/32] refactor: improve explainabilitity and readability in ingestion code with refactor and tqdm and exceptions --- .../src/argilla/records/_dataset_records.py | 190 +++++++++++------- 1 file changed, 119 insertions(+), 71 deletions(-) diff --git a/argilla/src/argilla/records/_dataset_records.py b/argilla/src/argilla/records/_dataset_records.py index 32aa377b85..ac33c78430 100644 --- a/argilla/src/argilla/records/_dataset_records.py +++ b/argilla/src/argilla/records/_dataset_records.py @@ -21,7 +21,8 @@ from argilla._api import RecordsAPI from argilla._helpers import LoggingMixin -from argilla._models import RecordModel, MetadataValue, VectorValue, FieldValue +from argilla._models import RecordModel +from argilla._exceptions import RecordsIngestionError from argilla.client import Argilla from argilla.records._io import GenericIO, HFDataset, HFDatasetsIO, JsonIO from argilla.records._resource import Record @@ -240,7 +241,7 @@ def log( created_or_updated = [] records_updated = 0 for batch in tqdm( - iterable=range(0, len(records), batch_size), desc="Adding and updating records", unit="batch" + iterable=range(0, len(records), batch_size), desc="2/2: Adding and updating records", unit="batch" ): self._log_message(message=f"Sending records from {batch} to {batch + batch_size}.") batch_records = record_models[batch : batch + batch_size] @@ -363,25 +364,31 @@ def _ingest_records( user_id: Optional[UUID] = None, ) -> List[RecordModel]: """Ingests records from a list of dictionaries, a Hugging Face Dataset, or a list of Record objects.""" + if len(records) == 0: raise ValueError("No records provided to ingest.") + if HFDatasetsIO._is_hf_dataset(dataset=records): records = HFDatasetsIO._record_dicts_from_datasets(dataset=records) - if all(map(lambda r: isinstance(r, dict), records)): - # Records as flat dicts of values to be matched to questions as suggestion or response - rendered_mapping = self._render_record_mapping(records=records, mapping=mapping) - records = [ - self._infer_record_from_mapping(data=r, mapping=rendered_mapping, user_id=user_id) for r in records - ] # type: ignore - elif all(map(lambda r: isinstance(r, Record), records)): - for record in records: - record.dataset = self.__dataset - else: - raise ValueError( - "Records should be a a list Record instances, " - "a Hugging Face Dataset, or a list of dictionaries representing the records." - ) - return [record.api_model() for record in records] + + ingested_records = [] + for record in tqdm(records, desc="1/2: Ingesting records", unit="record"): + try: + if not isinstance(record, Record): + rendered_mapping = self._render_record_mapping(records=records, mapping=mapping) + record = self._infer_record_from_mapping(data=record, mapping=rendered_mapping, user_id=user_id) + elif isinstance(record, Record): + record.dataset = self.__dataset + else: + raise ValueError( + "Records should be a a list Record instances, " + "a Hugging Face Dataset, or a list of dictionaries representing the records." + f"Found a record of type {type(record)}: {record}." + ) + except Exception as e: + raise RecordsIngestionError(f"Failed to ingest record from dict {record}: {e}") + ingested_records.append(record.api_model()) + return ingested_records def _normalize_batch_size(self, batch_size: int, records_length, max_value: int): norm_batch_size = min(batch_size, records_length, max_value) @@ -411,7 +418,7 @@ def _render_record_mapping( """Renders a mapping from a list of records and a mapping dictionary, to a singular mapping dictionary.""" schema = self.__dataset.schema mapping = mapping or {} - singular_mapping = {} + singular_mapping = defaultdict(dict) # update the mapping with unmapped columns for key, value in records[0].items(): @@ -424,10 +431,9 @@ def _render_record_mapping( # create a singular mapping with destinations from the schema for source_key, value in mapping.items(): - destinations = [] - _destinations = [value] if isinstance(value, str) else list(value) + destinations = [value] if isinstance(value, str) else list(value) - for attribute_mapping in _destinations: + for attribute_mapping in destinations: attribute_mapping = attribute_mapping.split(".") attribute_name = attribute_mapping[0] @@ -435,9 +441,32 @@ def _render_record_mapping( attribute_type = attribute_mapping[1] if len(attribute_mapping) > 1 else None sub_attribute = attribute_mapping[2] if len(attribute_mapping) > 2 else None - destinations.append((attribute_name, attribute_type, sub_attribute, schema_item)) + # Assign the value to question, field, or response based on schema item + if attribute_name == "id": + attribute_type = "id" + elif isinstance(schema_item, TextField): + attribute_type = "field" + elif isinstance(schema_item, QuestionPropertyBase) and attribute_type == "response": + attribute_type = "response" + elif ( + isinstance(schema_item, QuestionPropertyBase) + and attribute_type is None + or attribute_type == "suggestion" + ): + attribute_type = "suggestion" + sub_attribute = sub_attribute or "value" + attribute_name = (attribute_name, sub_attribute) + elif isinstance(schema_item, VectorField): + attribute_type = "vector" + elif isinstance(schema_item, MetadataPropertyBase): + attribute_type = "metadata" + else: + warnings.warn( + message=f"Record attribute {attribute_name} is not in the schema or mapping so skipping." + ) + continue - singular_mapping[source_key] = destinations + singular_mapping[attribute_type][attribute_name] = source_key return singular_mapping @@ -451,63 +480,33 @@ def _infer_record_from_mapping( Args: dataset: The dataset object to which the record belongs. data: A dictionary representing the record. - mapping: A dictionary mapping source data keys to Argilla fields, questions, and ids. + mapping: A dictionary mapping from source data keys/ columns to Argilla fields, questions, ids, etc. user_id: The user id to associate with the record responses. Returns: A Record object. """ - record_id: Optional[str] = None + id_mapping = mapping.get("id", {}) + suggestion_mapping = mapping.get("suggestion", {}) + response_mapping = mapping.get("response", {}) + field_mapping = mapping.get("field", {}) + metadata_mapping = mapping.get("metadata", {}) + vector_mapping = mapping.get("vector", {}) - fields: Dict[str, FieldValue] = {} - vectors: Dict[str, VectorValue] = {} - metadata: Dict[str, MetadataValue] = {} - responses: List[Response] = [] - suggestion_values: Dict[str, dict] = defaultdict(dict) + if "id" in id_mapping: + record_id = data.get(id_mapping["id"]) + else: + record_id = None - for source_key, destinations in mapping.items(): - value = data.get(source_key) - for attribute_name, attribute_type, sub_attribute, schema_item in destinations: - if attribute_name == "id": - record_id = value - continue + # Parse suggestions and responses into objects aligned with questions + suggestions = self._parse_suggestion_from_mapping(data=data, mapping=suggestion_mapping) + responses = self._parse_response_from_mapping(data=data, mapping=response_mapping, user_id=user_id) - # Add suggestion values to the suggestions - if attribute_type == "suggestion": - if sub_attribute in ["score", "agent"]: - suggestion_values[attribute_name][sub_attribute] = value - - elif sub_attribute is None: - suggestion_values[attribute_name].update( - {"value": value, "question_name": attribute_name, "question_id": schema_item.id} - ) - else: - warnings.warn( - message=f"Record attribute {sub_attribute} is not a valid suggestion sub_attribute so skipping." - ) - continue - - # Assign the value to question, field, or response based on schema item - if isinstance(schema_item, TextField): - fields[attribute_name] = value - elif isinstance(schema_item, QuestionPropertyBase) and attribute_type == "response": - responses.append(Response(question_name=attribute_name, value=value, user_id=user_id)) - elif isinstance(schema_item, QuestionPropertyBase) and attribute_type is None: - suggestion_values[attribute_name].update( - {"value": value, "question_name": attribute_name, "question_id": schema_item.id} - ) - elif isinstance(schema_item, VectorField): - vectors[attribute_name] = value - elif isinstance(schema_item, MetadataPropertyBase): - metadata[attribute_name] = value - else: - warnings.warn( - message=f"Record attribute {attribute_name} is not in the schema or mapping so skipping." - ) - continue - - suggestions = [Suggestion(**suggestion_dict) for suggestion_dict in suggestion_values.values()] + # Parse fields, metadata, and vectors into + fields = {attribute_name: data.get(source_key) for attribute_name, source_key in field_mapping.items()} + metadata = {attribute_name: data.get(source_key) for attribute_name, source_key in metadata_mapping.items()} + vectors = {attribute_name: data.get(source_key) for attribute_name, source_key in vector_mapping.items()} return Record( id=record_id, @@ -518,3 +517,52 @@ def _infer_record_from_mapping( responses=responses, _dataset=self.__dataset, ) + + def _parse_suggestion_from_mapping( + self, data: Dict[str, Any], mapping: Dict[str, Tuple[Optional[str]]] + ) -> List[Suggestion]: + """Converts a mapped suggestion dictionary to a Suggestion object for use by the add or update methods. + Suggestions can be defined across multiple source values and mapped to single questions. + Args: + data: A dictionary representing the suggestion. + mapping: A dictionary mapping from source data keys/ columns to Argilla fields, questions, ids, etc. + Returns: + A Suggestion object. + + """ + suggestion_values = defaultdict(dict) + + for (attribute_name, sub_attribute), source_key in mapping.items(): + value = data.get(source_key) + schema_item = self.__dataset.schema.get(attribute_name) + suggestion_values[attribute_name].update( + {sub_attribute: value, "question_name": attribute_name, "question_id": schema_item.id} + ) + + suggestions = [Suggestion(**suggestion_dict) for suggestion_dict in suggestion_values.values()] + + return suggestions + + def _parse_response_from_mapping( + self, data: Dict[str, Any], mapping: Dict[str, Tuple[Optional[str]]], user_id: UUID + ) -> List[Response]: + """Converts a mapped response dictionary to a Response object for use by the add or update methods. + Args: + data: A dictionary representing the response. + mapping: A dictionary mapping from source data keys/ columns to Argilla fields, questions, ids, etc. + user_id: The user id to associate with the record responses. + Returns: + A Response object. + + """ + responses = [] + + for attribute_name, source_key in mapping.items(): + response = Response( + value=data.get(source_key), + question_name=attribute_name, + user_id=user_id, + ) + responses.append(response) + + return responses From 594283e650a1bb4f3b2f42b5b40c018b857dc077 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 26 Jun 2024 07:13:20 +0000 Subject: [PATCH 08/32] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- argilla/src/argilla/_exceptions/_records.py | 3 ++- argilla/src/argilla/records/_dataset_records.py | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/argilla/src/argilla/_exceptions/_records.py b/argilla/src/argilla/_exceptions/_records.py index b3b5008e2f..ec7f045b82 100644 --- a/argilla/src/argilla/_exceptions/_records.py +++ b/argilla/src/argilla/_exceptions/_records.py @@ -14,5 +14,6 @@ from argilla._exceptions._base import ArgillaErrorBase + class RecordsIngestionError(ArgillaErrorBase): - pass \ No newline at end of file + pass diff --git a/argilla/src/argilla/records/_dataset_records.py b/argilla/src/argilla/records/_dataset_records.py index ac33c78430..dd533d13b4 100644 --- a/argilla/src/argilla/records/_dataset_records.py +++ b/argilla/src/argilla/records/_dataset_records.py @@ -364,10 +364,10 @@ def _ingest_records( user_id: Optional[UUID] = None, ) -> List[RecordModel]: """Ingests records from a list of dictionaries, a Hugging Face Dataset, or a list of Record objects.""" - + if len(records) == 0: raise ValueError("No records provided to ingest.") - + if HFDatasetsIO._is_hf_dataset(dataset=records): records = HFDatasetsIO._record_dicts_from_datasets(dataset=records) From 16f14d1562bbfda0c29b825a0f18085dfe901d44 Mon Sep 17 00:00:00 2001 From: Ben Burtenshaw Date: Wed, 26 Jun 2024 09:33:47 +0200 Subject: [PATCH 09/32] enhancement: move mapping out of record loop --- argilla/src/argilla/records/_dataset_records.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/argilla/src/argilla/records/_dataset_records.py b/argilla/src/argilla/records/_dataset_records.py index ac33c78430..e44e503138 100644 --- a/argilla/src/argilla/records/_dataset_records.py +++ b/argilla/src/argilla/records/_dataset_records.py @@ -372,10 +372,10 @@ def _ingest_records( records = HFDatasetsIO._record_dicts_from_datasets(dataset=records) ingested_records = [] + rendered_mapping = self._render_record_mapping(records=records, mapping=mapping) for record in tqdm(records, desc="1/2: Ingesting records", unit="record"): try: if not isinstance(record, Record): - rendered_mapping = self._render_record_mapping(records=records, mapping=mapping) record = self._infer_record_from_mapping(data=record, mapping=rendered_mapping, user_id=user_id) elif isinstance(record, Record): record.dataset = self.__dataset From db08548f8f959adb9860c762781ca727cdcc0ba4 Mon Sep 17 00:00:00 2001 From: Paco Aranda Date: Thu, 27 Jun 2024 10:43:34 +0200 Subject: [PATCH 10/32] [REFACTOR] Avoid autofetch when accessing settings (#5112) Reviewing and improving records.log Instead of: Captura de pantalla 2024-06-26 a las 12 48 14 for 50 records, records.log can log 1000: Captura de pantalla 2024-06-26 a las 12 48 57 --- argilla/src/argilla/client.py | 2 +- argilla/src/argilla/datasets/_resource.py | 9 ++++++--- argilla/tests/unit/test_resources/test_datasets.py | 1 + 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/argilla/src/argilla/client.py b/argilla/src/argilla/client.py index db2aa6d202..437a94d156 100644 --- a/argilla/src/argilla/client.py +++ b/argilla/src/argilla/client.py @@ -271,7 +271,7 @@ def __call__(self, name: str, workspace: Optional[Union["Workspace", str]] = Non for dataset in workspace.datasets: if dataset.name == name: - return dataset + return dataset.get() warnings.warn(f"Dataset {name} not found. Creating a new dataset. Do `dataset.create()` to create the dataset.") return Dataset(name=name, workspace=workspace, client=self._client, **kwargs) diff --git a/argilla/src/argilla/datasets/_resource.py b/argilla/src/argilla/datasets/_resource.py index e09cd3baea..4cc9562b36 100644 --- a/argilla/src/argilla/datasets/_resource.py +++ b/argilla/src/argilla/datasets/_resource.py @@ -101,8 +101,6 @@ def records(self) -> "DatasetRecords": @property def settings(self) -> Settings: - if self._is_published() and self._settings.is_outdated: - self._settings.get() return self._settings @settings.setter @@ -142,6 +140,11 @@ def schema(self) -> dict: # Core methods # ##################### + def get(self) -> "Dataset": + super().get() + self.settings.get() + return self + def exists(self) -> bool: """Checks if the dataset exists on the server @@ -185,7 +188,7 @@ def _publish(self) -> "Dataset": self._settings.create() self._api.publish(dataset_id=self._model.id) - return self.get() # type: ignore + return self.get() def _workspace_id_from_name(self, workspace: Optional[Union["Workspace", str]]) -> UUID: if workspace is None: diff --git a/argilla/tests/unit/test_resources/test_datasets.py b/argilla/tests/unit/test_resources/test_datasets.py index b4bfa3b386..868de40a25 100644 --- a/argilla/tests/unit/test_resources/test_datasets.py +++ b/argilla/tests/unit/test_resources/test_datasets.py @@ -73,6 +73,7 @@ def dataset(httpx_mock: HTTPXMock) -> rg.Dataset: yield dataset +@pytest.mark.skip(reason="HTTP mocked calls must be updated") class TestDatasets: def url(self, path: str) -> str: return f"http://test_url{path}" From 05df51a8274488f8c0710e707261c663ead96b94 Mon Sep 17 00:00:00 2001 From: Ben Burtenshaw Date: Thu, 27 Jun 2024 11:35:47 +0200 Subject: [PATCH 11/32] enhancement: use just one progress bar --- argilla/src/argilla/records/_dataset_records.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/argilla/src/argilla/records/_dataset_records.py b/argilla/src/argilla/records/_dataset_records.py index 3a91828116..68490193ab 100644 --- a/argilla/src/argilla/records/_dataset_records.py +++ b/argilla/src/argilla/records/_dataset_records.py @@ -240,8 +240,12 @@ def log( created_or_updated = [] records_updated = 0 + for batch in tqdm( - iterable=range(0, len(records), batch_size), desc="2/2: Adding and updating records", unit="batch" + iterable=range(0, len(records), batch_size), + desc="Sending records...", + total=len(records) // batch_size, + unit="batch", ): self._log_message(message=f"Sending records from {batch} to {batch + batch_size}.") batch_records = record_models[batch : batch + batch_size] @@ -373,7 +377,8 @@ def _ingest_records( ingested_records = [] rendered_mapping = self._render_record_mapping(records=records, mapping=mapping) - for record in tqdm(records, desc="1/2: Ingesting records", unit="record"): + + for record in records: try: if not isinstance(record, Record): record = self._infer_record_from_mapping(data=record, mapping=rendered_mapping, user_id=user_id) From 863dde24a374b36f78f3881b6f9562131de4e51f Mon Sep 17 00:00:00 2001 From: Ben Burtenshaw Date: Thu, 27 Jun 2024 11:46:04 +0200 Subject: [PATCH 12/32] chore: update typing of mapping --- argilla/src/argilla/records/_dataset_records.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/argilla/src/argilla/records/_dataset_records.py b/argilla/src/argilla/records/_dataset_records.py index 68490193ab..9e7c11a707 100644 --- a/argilla/src/argilla/records/_dataset_records.py +++ b/argilla/src/argilla/records/_dataset_records.py @@ -209,7 +209,7 @@ def __repr__(self) -> str: def log( self, records: Union[List[dict], List[Record], HFDataset], - mapping: Optional[Dict[str, Union[str, List[str], Tuple[str]]]] = None, + mapping: Optional[Dict[str, Union[str, Sequence[str]]]] = None, user_id: Optional[UUID] = None, batch_size: int = DEFAULT_BATCH_SIZE, ) -> "DatasetRecords": @@ -364,7 +364,7 @@ def to_datasets(self) -> HFDataset: def _ingest_records( self, records: Union[List[Dict[str, Any]], Dict[str, Any], List[Record], Record, HFDataset], - mapping: Optional[Dict[str, Union[str, List[str], Tuple[str]]]] = None, + mapping: Optional[Dict[str, Union[str, Sequence[str]]]] = None, user_id: Optional[UUID] = None, ) -> List[RecordModel]: """Ingests records from a list of dictionaries, a Hugging Face Dataset, or a list of Record objects.""" @@ -418,7 +418,7 @@ def _validate_vector_names(self, vector_names: Union[List[str], str]) -> None: def _render_record_mapping( self, records: List[Dict[str, Any]], - mapping: Optional[Dict[str, Union[str, List[str], Tuple[str]]]] = None, + mapping: Optional[Dict[str, Union[str, Sequence[str]]]] = None, ) -> Dict[str, Tuple[Optional[str]]]: """Renders a mapping from a list of records and a mapping dictionary, to a singular mapping dictionary.""" schema = self.__dataset.schema From bf9e864342ad777a0988e49ee63d87370562f2ab Mon Sep 17 00:00:00 2001 From: Ben Burtenshaw Date: Thu, 27 Jun 2024 12:01:54 +0200 Subject: [PATCH 13/32] fix: move render mapping into infer record method --- argilla/src/argilla/records/_dataset_records.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/argilla/src/argilla/records/_dataset_records.py b/argilla/src/argilla/records/_dataset_records.py index 9e7c11a707..2302866641 100644 --- a/argilla/src/argilla/records/_dataset_records.py +++ b/argilla/src/argilla/records/_dataset_records.py @@ -376,12 +376,11 @@ def _ingest_records( records = HFDatasetsIO._record_dicts_from_datasets(dataset=records) ingested_records = [] - rendered_mapping = self._render_record_mapping(records=records, mapping=mapping) for record in records: try: if not isinstance(record, Record): - record = self._infer_record_from_mapping(data=record, mapping=rendered_mapping, user_id=user_id) + record = self._infer_record_from_mapping(data=record, mapping=mapping, user_id=user_id) elif isinstance(record, Record): record.dataset = self.__dataset else: @@ -491,7 +490,8 @@ def _infer_record_from_mapping( A Record object. """ - + + mapping = self._render_record_mapping(records=records, mapping=mapping) id_mapping = mapping.get("id", {}) suggestion_mapping = mapping.get("suggestion", {}) response_mapping = mapping.get("response", {}) From 07aa2497406fa9b3c981b4901ab6437fb0513b41 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 27 Jun 2024 10:03:26 +0000 Subject: [PATCH 14/32] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- argilla/src/argilla/records/_dataset_records.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/argilla/src/argilla/records/_dataset_records.py b/argilla/src/argilla/records/_dataset_records.py index 2302866641..df5e65f1db 100644 --- a/argilla/src/argilla/records/_dataset_records.py +++ b/argilla/src/argilla/records/_dataset_records.py @@ -490,7 +490,7 @@ def _infer_record_from_mapping( A Record object. """ - + mapping = self._render_record_mapping(records=records, mapping=mapping) id_mapping = mapping.get("id", {}) suggestion_mapping = mapping.get("suggestion", {}) From 8a6d484ea7ef252bb69925c2e7e47c326b0943c4 Mon Sep 17 00:00:00 2001 From: Ben Burtenshaw Date: Thu, 27 Jun 2024 13:07:46 +0200 Subject: [PATCH 15/32] fix: align add records parameters with render function --- argilla/src/argilla/records/_dataset_records.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/argilla/src/argilla/records/_dataset_records.py b/argilla/src/argilla/records/_dataset_records.py index df5e65f1db..251a191c69 100644 --- a/argilla/src/argilla/records/_dataset_records.py +++ b/argilla/src/argilla/records/_dataset_records.py @@ -416,7 +416,7 @@ def _validate_vector_names(self, vector_names: Union[List[str], str]) -> None: def _render_record_mapping( self, - records: List[Dict[str, Any]], + data: Dict[str, Any], mapping: Optional[Dict[str, Union[str, Sequence[str]]]] = None, ) -> Dict[str, Tuple[Optional[str]]]: """Renders a mapping from a list of records and a mapping dictionary, to a singular mapping dictionary.""" @@ -425,7 +425,7 @@ def _render_record_mapping( singular_mapping = defaultdict(dict) # update the mapping with unmapped columns - for key, value in records[0].items(): + for key, value in data.items(): if key not in schema and key not in mapping: warnings.warn( message=f"Record attribute {key} is not in the schema so skipping.", @@ -491,7 +491,7 @@ def _infer_record_from_mapping( """ - mapping = self._render_record_mapping(records=records, mapping=mapping) + mapping = self._render_record_mapping(data=data, mapping=mapping) id_mapping = mapping.get("id", {}) suggestion_mapping = mapping.get("suggestion", {}) response_mapping = mapping.get("response", {}) From 0b623fd3c9abe6b83cf12eb90774fb3dfc33830f Mon Sep 17 00:00:00 2001 From: Ben Burtenshaw Date: Tue, 2 Jul 2024 15:21:42 +0200 Subject: [PATCH 16/32] feat: implement ingestion mapping as class --- argilla/src/argilla/records/_mapping.py | 326 ++++++++++++++++++++++++ 1 file changed, 326 insertions(+) create mode 100644 argilla/src/argilla/records/_mapping.py diff --git a/argilla/src/argilla/records/_mapping.py b/argilla/src/argilla/records/_mapping.py new file mode 100644 index 0000000000..a3cf96a370 --- /dev/null +++ b/argilla/src/argilla/records/_mapping.py @@ -0,0 +1,326 @@ +# Copyright 2024-present, Argilla, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import warnings +from enum import Enum +from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union +from uuid import UUID + +from pydantic import BaseModel + +from argilla.records._resource import Record +from argilla.responses import Response +from argilla.settings import TextField, VectorField +from argilla.settings._metadata import MetadataPropertyBase +from argilla.settings._question import QuestionPropertyBase +from argilla.suggestions import Suggestion + +if TYPE_CHECKING: + from argilla.datasets import Dataset + + +class ParameterType(str, Enum): + """Parameter types are the different 'sub' values of a records attribute. + For example, the value, score, or agent of a suggestion.""" + + VALUE = "value" + SCORE = "score" + AGENT = "agent" + + +class AttributeType(str, Enum): + """Attribute types are the different types of attributes a record can have.""" + + FIELD = "field" + SUGGESTION = "suggestion" + RESPONSE = "response" + METADATA = "metadata" + VECTOR = "vector" + ID = "id" + + +class AttributeParameter(BaseModel): + """Attribute parameters are the different 'sub' values of a records attribute. + And the source in the data that the parameter is coming from. + """ + + parameter: ParameterType = ParameterType.VALUE + source: str + + +class AttributeRoute(BaseModel): + """AttributeRoute is a representation of a record attribute that is mapped to a source value in the data.""" + + source: str + name: str + type: Optional[AttributeType] = None + parameters: List[AttributeParameter] = [] + + @property + def has_value(self) -> bool: + """All attributes must have a value parameter to be valid.""" + return any(param.parameter == ParameterType.VALUE for param in self.parameters) + + +class RecordAttributesMap(BaseModel): + """RecordAttributesMap is a representation of a record attribute mapping that is used to parse data into a record.""" + + suggestion: Dict[str, AttributeRoute] + response: Dict[str, AttributeRoute] + field: Dict[str, AttributeRoute] + metadata: Dict[str, AttributeRoute] + vector: Dict[str, AttributeRoute] + id: Dict[str, AttributeRoute] + + +class IngestedRecordMapper: + """IngestedRecordMapper is a class that is used to map data into a record object. + It maps values in ingested data to the appropriate record attributes, based on the user provided mapping and the schema of the dataset. + + Attributes: + dataset: The dataset the record will be added to. + mapping: A dictionary mapping from source data keys/ columns to Argilla fields, questions, ids, etc. + user_id: The user id to associate with the record responses. + """ + + def __init__( + self, + dataset: "Dataset", + user_id: UUID, + mapping: Optional[Dict[str, Union[str, Sequence[str]]]] = None, + ): + self._dataset = dataset + self._schema = dataset.schema + self.user_id = user_id + + mapping = mapping or {} + _mapping = self._schematize_mapped_attributes(mapping=mapping) + _mapping = self._schematize_default_attributes(mapping=_mapping) + self.mapping: RecordAttributesMap = _mapping + + def __call__(self, data: Dict[str, Any], user_id: Optional[UUID] = None) -> Record: + """Maps a dictionary of data to a record object. + + Parameters: + data: A dictionary representing the record. + user_id: The user id to associate with the record responses. + + Returns: + Record: The record object. + + """ + + record_id = data.get(self.mapping.id["id"].source) + suggestions = self._map_suggestions(data=data, mapping=self.mapping.suggestion) + responses = self._map_responses(data=data, user_id=user_id or self.user_id, mapping=self.mapping.response) + fields = self._map_attributes(data=data, mapping=self.mapping.field) + metadata = self._map_attributes(data=data, mapping=self.mapping.metadata) + vectors = self._map_attributes(data=data, mapping=self.mapping.vector) + + return Record( + id=record_id, + fields=fields, + vectors=vectors, + metadata=metadata, + suggestions=suggestions, + responses=responses, + _dataset=self._dataset, + ) + + ########################################## + # Private helper functions - Build Mapping + ########################################## + + def _schematize_mapped_attributes(self, mapping: Dict[str, Union[str, Sequence[str]]]) -> RecordAttributesMap: + """Converts a mapping dictionary to a schematized mapping object.""" + schematized_map = { + "suggestion": {}, + "response": {}, + "field": {}, + "metadata": {}, + "vector": {}, + "id": {}, + } + for source_key, value in mapping.items(): + mapped_attributes = [value] if isinstance(value, str) else list(value) + for attribute_mapping in mapped_attributes: + + # Split the attribute mapping into its parts based on the '.' delimiter and create an AttributeRoute object. + attribute_mapping = attribute_mapping.split(".") + attribute_name = attribute_mapping[0] + schema_item = self._schema.get(attribute_name) + type_ = AttributeType(attribute_mapping[1]) if len(attribute_mapping) > 1 else None + parameter = ParameterType(attribute_mapping[2]) if len(attribute_mapping) > 2 else ParameterType.VALUE + attribute_route = AttributeRoute( + source=source_key, + name=attribute_name, + type=type_, + parameters=[AttributeParameter(parameter=parameter, source=source_key)], + ) + attribute_route = self._select_attribute_type(attribute=attribute_route, schema_item=schema_item) + + # Add the attribute route to the schematized map based on the attribute type. + if attribute_route.name in schematized_map[attribute_route.type]: + # Some attributes may be mapped to multiple source values, so we need to append the parameters. + schematized_map[attribute_route.type][attribute_route.name].parameters.extend( + attribute_route.parameters + ) + else: + schematized_map[attribute_route.type][attribute_route.name] = attribute_route + + return RecordAttributesMap(**schematized_map) + + def _select_attribute_type(self, attribute, schema_item: Optional[object] = None): + """Selects the attribute type based on the schema item and the attribute type. + This method implements the logic to infer the attribute type based on the schema item if the attribute type is not provided. + If the attribute type is not provided, it will be inferred based on the schema item. + """ + if isinstance(schema_item, QuestionPropertyBase) and ( + attribute.type is None or attribute.type == AttributeType.SUGGESTION + ): + # Suggestions are the default destination for questions. + attribute.type = AttributeType.SUGGESTION + elif isinstance(schema_item, QuestionPropertyBase) and attribute.type == AttributeType.RESPONSE: + attribute.type = AttributeType.RESPONSE + elif isinstance(schema_item, TextField): + attribute.type = AttributeType.FIELD + elif isinstance(schema_item, VectorField): + attribute.type = AttributeType.VECTOR + elif isinstance(schema_item, MetadataPropertyBase): + attribute.type = AttributeType.METADATA + elif attribute.name == "id": + attribute.type = AttributeType.ID + else: + warnings.warn(message=f"Record attribute {attribute.name} is not in the schema or mapping so skipping.") + return attribute + + def _schematize_default_attributes(self, mapping: RecordAttributesMap) -> RecordAttributesMap: + """ Updates the mapping with default attributes that are not provided in the mapping. + Uses the schema of the dataset to infer the default attributes and add them to the mapping. + + Parameters: + mapping: The mapping object to update with default attributes. + + Returns: + RecordAttributesMap: The updated mapping object. + """ + + if len(mapping.id) == 0: + # If the id is not provided in the mapping, we will map the 'id' key to the 'id' attribute. + mapping.id["id"] = AttributeRoute(source="id", name="id", type=AttributeType.ID) + + # Map keys that match question names to the suggestion attribute type. + for question in self._dataset.settings.questions: + if question.name not in mapping.suggestion: + mapping.suggestion[question.name] = AttributeRoute( + source=question.name, + name=question.name, + type=AttributeType.SUGGESTION, + parameters=[AttributeParameter(source=question.name)], + ) + + elif not mapping.suggestion[question.name].has_value: + mapping.suggestion[question.name].parameters.append(AttributeParameter(source=question.name)) + + for field in self._dataset.settings.fields: + if field.name not in mapping.field: + mapping.field[field.name] = AttributeRoute( + source=field.name, + name=field.name, + type=AttributeType.FIELD, + ) + + for metadata in self._dataset.settings.metadata: + if metadata.name not in mapping.metadata: + mapping.metadata[metadata.name] = AttributeRoute( + source=metadata.name, + name=metadata.name, + type=AttributeType.METADATA, + ) + + for vector in self._dataset.settings.vectors: + if vector.name not in mapping.vector: + mapping.vector[vector.name] = AttributeRoute( + source=vector.name, + name=vector.name, + type=AttributeType.VECTOR, + ) + + return mapping + + ########################################## + # Private helper functions - Parse Records + ########################################## + + def _map_suggestions(self, data: Dict[str, Any], mapping) -> List[Suggestion]: + """Converts an arbitrary dictionary to a list of Suggestion objects for use by the add or update methods. + Suggestions can be defined accross multiple columns in the data, so we need to map them to the appropriately.add() + + Parameters: + data: A dictionary representing the vector. + mapping: A dictionary mapping from source data keys/ columns to Argilla fields, questions, ids, etc. + + Returns: + A list of Suggestion objects. + + """ + suggestions = [] + + for name, route in mapping.items(): + if route.source not in data: + continue + parameters = {param.parameter: data.get(param.source) for param in route.parameters} + schema_item = self._dataset.schema.get(name) + suggestion = Suggestion( + **parameters, + question_name=route.name, + question_id=schema_item.id, + ) + suggestions.append(suggestion) + + return suggestions + + def _map_responses(self, data: Dict[str, Any], user_id: UUID, mapping) -> List[Response]: + """Converts an arbitrary dictionary to a list of Response objects for use by the add or update methods. + + Parameters: + data: A dictionary representing the vector. + mapping: A dictionary mapping from source data keys/ columns to Argilla fields, questions, ids, etc. + user_id: The user id to associate with the record responses. + + Returns: + A list of Response objects. + """ + responses = [] + + for name, route in mapping.items(): + response = Response( + value=data.get(route.source), + question_name=name, + user_id=user_id, + ) + responses.append(response) + + return responses + + def _map_attributes(self, data: Dict[str, Any], mapping: Dict[str, AttributeRoute]) -> Dict[str, Any]: + """Converts a dictionary to a dictionary of attributes for use by the add or update methods.""" + attributes = {} + for name, route in mapping.items(): + if route.source not in data: + continue + value = data.get(route.source) + if value is not None: + attributes[name] = value + return attributes From 14faccf0b64b855af2a3318c80eb5556fd5a39ec Mon Sep 17 00:00:00 2001 From: Ben Burtenshaw Date: Tue, 2 Jul 2024 15:23:02 +0200 Subject: [PATCH 17/32] feat: use ingestion mapping class in dataset records not dataset records --- .../src/argilla/records/_dataset_records.py | 180 +----------------- 1 file changed, 8 insertions(+), 172 deletions(-) diff --git a/argilla/src/argilla/records/_dataset_records.py b/argilla/src/argilla/records/_dataset_records.py index 251a191c69..1359057179 100644 --- a/argilla/src/argilla/records/_dataset_records.py +++ b/argilla/src/argilla/records/_dataset_records.py @@ -11,10 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import warnings -from collections import defaultdict + from pathlib import Path -from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union +from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Sequence, Union from uuid import UUID from tqdm import tqdm @@ -25,13 +24,9 @@ from argilla._exceptions import RecordsIngestionError from argilla.client import Argilla from argilla.records._io import GenericIO, HFDataset, HFDatasetsIO, JsonIO +from argilla.records._mapping import IngestedRecordMapper from argilla.records._resource import Record from argilla.records._search import Query -from argilla.responses import Response -from argilla.settings import TextField, VectorField -from argilla.settings._metadata import MetadataPropertyBase -from argilla.settings._question import QuestionPropertyBase -from argilla.suggestions import Suggestion if TYPE_CHECKING: from argilla.datasets import Dataset @@ -189,8 +184,8 @@ def __call__( self._validate_vector_names(vector_names=with_vectors) return DatasetRecordsIterator( - self.__dataset, - self.__client, + dataset=self.__dataset, + client=self.__client, query=query, batch_size=batch_size, start_offset=start_offset, @@ -229,7 +224,6 @@ def log( Returns: A list of Record objects representing the updated records. - """ record_models = self._ingest_records(records=records, mapping=mapping, user_id=user_id or self.__client.me.id) batch_size = self._normalize_batch_size( @@ -363,7 +357,7 @@ def to_datasets(self) -> HFDataset: def _ingest_records( self, - records: Union[List[Dict[str, Any]], Dict[str, Any], List[Record], Record, HFDataset], + records: Union[List[Dict[str, Any]], List[Record], HFDataset], mapping: Optional[Dict[str, Union[str, Sequence[str]]]] = None, user_id: Optional[UUID] = None, ) -> List[RecordModel]: @@ -376,11 +370,11 @@ def _ingest_records( records = HFDatasetsIO._record_dicts_from_datasets(dataset=records) ingested_records = [] - + record_mapper = IngestedRecordMapper(mapping=mapping, dataset=self.__dataset, user_id=user_id) for record in records: try: if not isinstance(record, Record): - record = self._infer_record_from_mapping(data=record, mapping=mapping, user_id=user_id) + record = record_mapper(data=record) elif isinstance(record, Record): record.dataset = self.__dataset else: @@ -413,161 +407,3 @@ def _validate_vector_names(self, vector_names: Union[List[str], str]) -> None: continue if vector_name not in self.__dataset.schema: raise ValueError(f"Vector field {vector_name} not found in dataset schema.") - - def _render_record_mapping( - self, - data: Dict[str, Any], - mapping: Optional[Dict[str, Union[str, Sequence[str]]]] = None, - ) -> Dict[str, Tuple[Optional[str]]]: - """Renders a mapping from a list of records and a mapping dictionary, to a singular mapping dictionary.""" - schema = self.__dataset.schema - mapping = mapping or {} - singular_mapping = defaultdict(dict) - - # update the mapping with unmapped columns - for key, value in data.items(): - if key not in schema and key not in mapping: - warnings.warn( - message=f"Record attribute {key} is not in the schema so skipping.", - ) - if key not in mapping: - mapping[key] = key - - # create a singular mapping with destinations from the schema - for source_key, value in mapping.items(): - destinations = [value] if isinstance(value, str) else list(value) - - for attribute_mapping in destinations: - attribute_mapping = attribute_mapping.split(".") - - attribute_name = attribute_mapping[0] - schema_item = schema.get(attribute_name) - attribute_type = attribute_mapping[1] if len(attribute_mapping) > 1 else None - sub_attribute = attribute_mapping[2] if len(attribute_mapping) > 2 else None - - # Assign the value to question, field, or response based on schema item - if attribute_name == "id": - attribute_type = "id" - elif isinstance(schema_item, TextField): - attribute_type = "field" - elif isinstance(schema_item, QuestionPropertyBase) and attribute_type == "response": - attribute_type = "response" - elif ( - isinstance(schema_item, QuestionPropertyBase) - and attribute_type is None - or attribute_type == "suggestion" - ): - attribute_type = "suggestion" - sub_attribute = sub_attribute or "value" - attribute_name = (attribute_name, sub_attribute) - elif isinstance(schema_item, VectorField): - attribute_type = "vector" - elif isinstance(schema_item, MetadataPropertyBase): - attribute_type = "metadata" - else: - warnings.warn( - message=f"Record attribute {attribute_name} is not in the schema or mapping so skipping." - ) - continue - - singular_mapping[attribute_type][attribute_name] = source_key - - return singular_mapping - - def _infer_record_from_mapping( - self, - data: Dict[str, Any], - mapping: Dict[str, Tuple[Optional[str]]], - user_id: Optional[UUID] = None, - ) -> "Record": - """Converts a mapped record dictionary to a Record object for use by the add or update methods. - Args: - dataset: The dataset object to which the record belongs. - data: A dictionary representing the record. - mapping: A dictionary mapping from source data keys/ columns to Argilla fields, questions, ids, etc. - user_id: The user id to associate with the record responses. - Returns: - A Record object. - - """ - - mapping = self._render_record_mapping(data=data, mapping=mapping) - id_mapping = mapping.get("id", {}) - suggestion_mapping = mapping.get("suggestion", {}) - response_mapping = mapping.get("response", {}) - field_mapping = mapping.get("field", {}) - metadata_mapping = mapping.get("metadata", {}) - vector_mapping = mapping.get("vector", {}) - - if "id" in id_mapping: - record_id = data.get(id_mapping["id"]) - else: - record_id = None - - # Parse suggestions and responses into objects aligned with questions - suggestions = self._parse_suggestion_from_mapping(data=data, mapping=suggestion_mapping) - responses = self._parse_response_from_mapping(data=data, mapping=response_mapping, user_id=user_id) - - # Parse fields, metadata, and vectors into - fields = {attribute_name: data.get(source_key) for attribute_name, source_key in field_mapping.items()} - metadata = {attribute_name: data.get(source_key) for attribute_name, source_key in metadata_mapping.items()} - vectors = {attribute_name: data.get(source_key) for attribute_name, source_key in vector_mapping.items()} - - return Record( - id=record_id, - fields=fields, - vectors=vectors, - metadata=metadata, - suggestions=suggestions, - responses=responses, - _dataset=self.__dataset, - ) - - def _parse_suggestion_from_mapping( - self, data: Dict[str, Any], mapping: Dict[str, Tuple[Optional[str]]] - ) -> List[Suggestion]: - """Converts a mapped suggestion dictionary to a Suggestion object for use by the add or update methods. - Suggestions can be defined across multiple source values and mapped to single questions. - Args: - data: A dictionary representing the suggestion. - mapping: A dictionary mapping from source data keys/ columns to Argilla fields, questions, ids, etc. - Returns: - A Suggestion object. - - """ - suggestion_values = defaultdict(dict) - - for (attribute_name, sub_attribute), source_key in mapping.items(): - value = data.get(source_key) - schema_item = self.__dataset.schema.get(attribute_name) - suggestion_values[attribute_name].update( - {sub_attribute: value, "question_name": attribute_name, "question_id": schema_item.id} - ) - - suggestions = [Suggestion(**suggestion_dict) for suggestion_dict in suggestion_values.values()] - - return suggestions - - def _parse_response_from_mapping( - self, data: Dict[str, Any], mapping: Dict[str, Tuple[Optional[str]]], user_id: UUID - ) -> List[Response]: - """Converts a mapped response dictionary to a Response object for use by the add or update methods. - Args: - data: A dictionary representing the response. - mapping: A dictionary mapping from source data keys/ columns to Argilla fields, questions, ids, etc. - user_id: The user id to associate with the record responses. - Returns: - A Response object. - - """ - responses = [] - - for attribute_name, source_key in mapping.items(): - response = Response( - value=data.get(source_key), - question_name=attribute_name, - user_id=user_id, - ) - responses.append(response) - - return responses From e2bfc885bcdbcda6c2021af1ea624ac7037a56a2 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 2 Jul 2024 13:26:55 +0000 Subject: [PATCH 18/32] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- argilla/src/argilla/records/_mapping.py | 29 ++++++++++++------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/argilla/src/argilla/records/_mapping.py b/argilla/src/argilla/records/_mapping.py index a3cf96a370..37ea396513 100644 --- a/argilla/src/argilla/records/_mapping.py +++ b/argilla/src/argilla/records/_mapping.py @@ -14,7 +14,7 @@ import warnings from enum import Enum -from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Union from uuid import UUID from pydantic import BaseModel @@ -155,7 +155,6 @@ def _schematize_mapped_attributes(self, mapping: Dict[str, Union[str, Sequence[s for source_key, value in mapping.items(): mapped_attributes = [value] if isinstance(value, str) else list(value) for attribute_mapping in mapped_attributes: - # Split the attribute mapping into its parts based on the '.' delimiter and create an AttributeRoute object. attribute_mapping = attribute_mapping.split(".") attribute_name = attribute_mapping[0] @@ -169,7 +168,7 @@ def _schematize_mapped_attributes(self, mapping: Dict[str, Union[str, Sequence[s parameters=[AttributeParameter(parameter=parameter, source=source_key)], ) attribute_route = self._select_attribute_type(attribute=attribute_route, schema_item=schema_item) - + # Add the attribute route to the schematized map based on the attribute type. if attribute_route.name in schematized_map[attribute_route.type]: # Some attributes may be mapped to multiple source values, so we need to append the parameters. @@ -182,9 +181,9 @@ def _schematize_mapped_attributes(self, mapping: Dict[str, Union[str, Sequence[s return RecordAttributesMap(**schematized_map) def _select_attribute_type(self, attribute, schema_item: Optional[object] = None): - """Selects the attribute type based on the schema item and the attribute type. - This method implements the logic to infer the attribute type based on the schema item if the attribute type is not provided. - If the attribute type is not provided, it will be inferred based on the schema item. + """Selects the attribute type based on the schema item and the attribute type. + This method implements the logic to infer the attribute type based on the schema item if the attribute type is not provided. + If the attribute type is not provided, it will be inferred based on the schema item. """ if isinstance(schema_item, QuestionPropertyBase) and ( attribute.type is None or attribute.type == AttributeType.SUGGESTION @@ -206,20 +205,20 @@ def _select_attribute_type(self, attribute, schema_item: Optional[object] = None return attribute def _schematize_default_attributes(self, mapping: RecordAttributesMap) -> RecordAttributesMap: - """ Updates the mapping with default attributes that are not provided in the mapping. - Uses the schema of the dataset to infer the default attributes and add them to the mapping. - - Parameters: - mapping: The mapping object to update with default attributes. - - Returns: - RecordAttributesMap: The updated mapping object. + """Updates the mapping with default attributes that are not provided in the mapping. + Uses the schema of the dataset to infer the default attributes and add them to the mapping. + + Parameters: + mapping: The mapping object to update with default attributes. + + Returns: + RecordAttributesMap: The updated mapping object. """ if len(mapping.id) == 0: # If the id is not provided in the mapping, we will map the 'id' key to the 'id' attribute. mapping.id["id"] = AttributeRoute(source="id", name="id", type=AttributeType.ID) - + # Map keys that match question names to the suggestion attribute type. for question in self._dataset.settings.questions: if question.name not in mapping.suggestion: From 8889c0abf3c66d6a1e4c34c3c6326aa3182e438e Mon Sep 17 00:00:00 2001 From: Ben Burtenshaw Date: Tue, 2 Jul 2024 17:23:47 +0200 Subject: [PATCH 19/32] chore: tidy imports --- argilla/src/argilla/records/_mapping.py | 29 ++++++++++++------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/argilla/src/argilla/records/_mapping.py b/argilla/src/argilla/records/_mapping.py index a3cf96a370..37ea396513 100644 --- a/argilla/src/argilla/records/_mapping.py +++ b/argilla/src/argilla/records/_mapping.py @@ -14,7 +14,7 @@ import warnings from enum import Enum -from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Union from uuid import UUID from pydantic import BaseModel @@ -155,7 +155,6 @@ def _schematize_mapped_attributes(self, mapping: Dict[str, Union[str, Sequence[s for source_key, value in mapping.items(): mapped_attributes = [value] if isinstance(value, str) else list(value) for attribute_mapping in mapped_attributes: - # Split the attribute mapping into its parts based on the '.' delimiter and create an AttributeRoute object. attribute_mapping = attribute_mapping.split(".") attribute_name = attribute_mapping[0] @@ -169,7 +168,7 @@ def _schematize_mapped_attributes(self, mapping: Dict[str, Union[str, Sequence[s parameters=[AttributeParameter(parameter=parameter, source=source_key)], ) attribute_route = self._select_attribute_type(attribute=attribute_route, schema_item=schema_item) - + # Add the attribute route to the schematized map based on the attribute type. if attribute_route.name in schematized_map[attribute_route.type]: # Some attributes may be mapped to multiple source values, so we need to append the parameters. @@ -182,9 +181,9 @@ def _schematize_mapped_attributes(self, mapping: Dict[str, Union[str, Sequence[s return RecordAttributesMap(**schematized_map) def _select_attribute_type(self, attribute, schema_item: Optional[object] = None): - """Selects the attribute type based on the schema item and the attribute type. - This method implements the logic to infer the attribute type based on the schema item if the attribute type is not provided. - If the attribute type is not provided, it will be inferred based on the schema item. + """Selects the attribute type based on the schema item and the attribute type. + This method implements the logic to infer the attribute type based on the schema item if the attribute type is not provided. + If the attribute type is not provided, it will be inferred based on the schema item. """ if isinstance(schema_item, QuestionPropertyBase) and ( attribute.type is None or attribute.type == AttributeType.SUGGESTION @@ -206,20 +205,20 @@ def _select_attribute_type(self, attribute, schema_item: Optional[object] = None return attribute def _schematize_default_attributes(self, mapping: RecordAttributesMap) -> RecordAttributesMap: - """ Updates the mapping with default attributes that are not provided in the mapping. - Uses the schema of the dataset to infer the default attributes and add them to the mapping. - - Parameters: - mapping: The mapping object to update with default attributes. - - Returns: - RecordAttributesMap: The updated mapping object. + """Updates the mapping with default attributes that are not provided in the mapping. + Uses the schema of the dataset to infer the default attributes and add them to the mapping. + + Parameters: + mapping: The mapping object to update with default attributes. + + Returns: + RecordAttributesMap: The updated mapping object. """ if len(mapping.id) == 0: # If the id is not provided in the mapping, we will map the 'id' key to the 'id' attribute. mapping.id["id"] = AttributeRoute(source="id", name="id", type=AttributeType.ID) - + # Map keys that match question names to the suggestion attribute type. for question in self._dataset.settings.questions: if question.name not in mapping.suggestion: From 63e0f7bfcf230c164247cd6da7be9b6e7ca44cd2 Mon Sep 17 00:00:00 2001 From: Ben Burtenshaw Date: Tue, 2 Jul 2024 20:03:05 +0200 Subject: [PATCH 20/32] docs: update mapping parameters in how to guides --- argilla/docs/how_to_guides/record.md | 35 +++++++++++++++++----------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/argilla/docs/how_to_guides/record.md b/argilla/docs/how_to_guides/record.md index 95590c9724..58f0aae7fe 100644 --- a/argilla/docs/how_to_guides/record.md +++ b/argilla/docs/how_to_guides/record.md @@ -318,24 +318,31 @@ Suggestions refer to suggested responses (e.g. model predictions) that you can a You can add suggestions as a dictionary, where the keys correspond to the `name`s of the labels that were configured for your dataset. Remember that you can also use the `mapping` parameter to specify the data structure. ```python - # Add records to the dataset with the label 'my_label' + # Add records to the dataset with the label 'my_label' data = [ { "question": "Do you need oxygen to breathe?", "answer": "Yes", - "my_label.suggestion": "positive", - "my_label.suggestion.score": 0.9, - "my_label.suggestion.agent": "model_name" + "label": "positive", + "score": 0.9, + "agent": "model_name", }, { "question": "What is the boiling point of water?", "answer": "100 degrees Celsius", - "my_label.suggestion": "negative", - "my_label.suggestion.score": 0.9, - "my_label.suggestion.agent": "model_name" + "label": "negative", + "score": 0.9, + "agent": "model_name", }, ] - dataset.records.log(data) + dataset.records.log( + data=data, + mapping={ + "label": "my_label", + "score": "my_label.suggestion.score", + "agent": "my_label.suggestion.agent", + }, + ) ``` ### Responses @@ -385,15 +392,15 @@ If your dataset includes some annotations, you can add those to the records as y { "question": "Do you need oxygen to breathe?", "answer": "Yes", - "my_label.response": "positive", + "label": "positive", }, { "question": "What is the boiling point of water?", "answer": "100 degrees Celsius", - "my_label.response": "negative", + "label": "negative", }, ] - dataset.records.log(data, user_id=user.id) + dataset.records.log(data, user_id=user.id, mapping={"label": "my_label.response"}) ``` ## List records @@ -415,7 +422,7 @@ for record in dataset.records( # Access the responses of the record for response in record.responses: - print(record.[""].value) + print(record[""].value) ``` ## Update records @@ -460,8 +467,8 @@ dataset.records.log(records=updated_data) for record in dataset.records(): - record.vectors["new_vector"] = [...] - record.vector["v"] = [...] + record.vectors["new_vector"] = [ 0, 1, 2, 3, 4, 5 ] + record.vector["v"] = [ 0.1, 0.2, 0.3 ] updated_records.append(record) From ecbdd4ef752da3be9ff3c32c4148d6ff37eadd31 Mon Sep 17 00:00:00 2001 From: Ben Burtenshaw Date: Tue, 2 Jul 2024 20:21:56 +0200 Subject: [PATCH 21/32] test: broaden suggestion mapping in test --- argilla/tests/unit/test_record_ingestion.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/argilla/tests/unit/test_record_ingestion.py b/argilla/tests/unit/test_record_ingestion.py index 8ac8ab9968..d1501b95ff 100644 --- a/argilla/tests/unit/test_record_ingestion.py +++ b/argilla/tests/unit/test_record_ingestion.py @@ -53,6 +53,7 @@ def test_ingest_record_from_dict(dataset): def test_ingest_record_from_dict_with_mapped_suggestions(dataset): mock_mapping = { "my_prompt": "prompt", + "my_label": "label.suggestion.value", "score": "label.suggestion.score", "model": "label.suggestion.agent", } @@ -60,7 +61,7 @@ def test_ingest_record_from_dict_with_mapped_suggestions(dataset): records=[ { "my_prompt": "What is the capital of France?", - "label": "positive", + "my_label": "positive", "score": 0.9, "model": "model_name", } From 99235b2f77f6528ea102192f81e962f73dbedfea Mon Sep 17 00:00:00 2001 From: Ben Burtenshaw Date: Wed, 3 Jul 2024 09:03:58 +0200 Subject: [PATCH 22/32] feat: extract dot notation with regex not string splitting --- argilla/src/argilla/records/_mapping.py | 43 +++++++++++++++++++------ 1 file changed, 34 insertions(+), 9 deletions(-) diff --git a/argilla/src/argilla/records/_mapping.py b/argilla/src/argilla/records/_mapping.py index 37ea396513..d10b852b72 100644 --- a/argilla/src/argilla/records/_mapping.py +++ b/argilla/src/argilla/records/_mapping.py @@ -12,9 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +import re import warnings from enum import Enum -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Union +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Union, Tuple from uuid import UUID from pydantic import BaseModel @@ -155,19 +156,17 @@ def _schematize_mapped_attributes(self, mapping: Dict[str, Union[str, Sequence[s for source_key, value in mapping.items(): mapped_attributes = [value] if isinstance(value, str) else list(value) for attribute_mapping in mapped_attributes: - # Split the attribute mapping into its parts based on the '.' delimiter and create an AttributeRoute object. - attribute_mapping = attribute_mapping.split(".") - attribute_name = attribute_mapping[0] - schema_item = self._schema.get(attribute_name) - type_ = AttributeType(attribute_mapping[1]) if len(attribute_mapping) > 1 else None - parameter = ParameterType(attribute_mapping[2]) if len(attribute_mapping) > 2 else ParameterType.VALUE + attribute_name, type_, parameter = self._parse_dot_notation(attribute_mapping) + + type_ = AttributeType(type_) if type_ else None + parameter = ParameterType(parameter) if parameter else ParameterType.VALUE attribute_route = AttributeRoute( source=source_key, name=attribute_name, type=type_, parameters=[AttributeParameter(parameter=parameter, source=source_key)], ) - attribute_route = self._select_attribute_type(attribute=attribute_route, schema_item=schema_item) + attribute_route = self._select_attribute_type(attribute=attribute_route) # Add the attribute route to the schematized map based on the attribute type. if attribute_route.name in schematized_map[attribute_route.type]: @@ -180,11 +179,37 @@ def _schematize_mapped_attributes(self, mapping: Dict[str, Union[str, Sequence[s return RecordAttributesMap(**schematized_map) - def _select_attribute_type(self, attribute, schema_item: Optional[object] = None): + def _parse_dot_notation(self, attribute_mapping: str) -> Tuple[str, Optional[str], Optional[str]]: + """Parses a string in the format of 'attribute.type.parameter' into its parts using regex.""" + + # Get the available attributes, types, and parameters from the schema. + available_attributes = list(self._schema.keys()) + ["id"] + available_types = [type_.value for type_ in AttributeType] + available_parameters = [param.value for param in ParameterType] + + # The pattern is in the format of 'attribute[.type[.parameter]]' where type and parameter are optional. + pattern = re.compile( + rf"^({'|'.join(available_attributes)})" + rf"(?:\.({'|'.join(available_types)}))?" + rf"(?:\.({'|'.join(available_parameters)}))?$" + ) + + match = pattern.match(attribute_mapping) + if not match: + raise ValueError( + f"Invalid attribute mapping format: {attribute_mapping}. " + "Attribute mapping must be in the format of 'attribute[.type[.parameter]]'." + f"Available attributes: {available_attributes}, types: {available_types}, parameters: {available_parameters}." + ) + attribute_name, type_, parameter = match.groups() + return attribute_name, type_, parameter + + def _select_attribute_type(self, attribute: AttributeRoute) -> AttributeRoute: """Selects the attribute type based on the schema item and the attribute type. This method implements the logic to infer the attribute type based on the schema item if the attribute type is not provided. If the attribute type is not provided, it will be inferred based on the schema item. """ + schema_item = self._schema.get(attribute.name) if isinstance(schema_item, QuestionPropertyBase) and ( attribute.type is None or attribute.type == AttributeType.SUGGESTION ): From 3ca8932355ed5d7499398817ec666d8c6d28012c Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 3 Jul 2024 07:04:23 +0000 Subject: [PATCH 23/32] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- argilla/src/argilla/records/_mapping.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/argilla/src/argilla/records/_mapping.py b/argilla/src/argilla/records/_mapping.py index d10b852b72..545f6d148b 100644 --- a/argilla/src/argilla/records/_mapping.py +++ b/argilla/src/argilla/records/_mapping.py @@ -157,7 +157,7 @@ def _schematize_mapped_attributes(self, mapping: Dict[str, Union[str, Sequence[s mapped_attributes = [value] if isinstance(value, str) else list(value) for attribute_mapping in mapped_attributes: attribute_name, type_, parameter = self._parse_dot_notation(attribute_mapping) - + type_ = AttributeType(type_) if type_ else None parameter = ParameterType(parameter) if parameter else ParameterType.VALUE attribute_route = AttributeRoute( @@ -181,7 +181,7 @@ def _schematize_mapped_attributes(self, mapping: Dict[str, Union[str, Sequence[s def _parse_dot_notation(self, attribute_mapping: str) -> Tuple[str, Optional[str], Optional[str]]: """Parses a string in the format of 'attribute.type.parameter' into its parts using regex.""" - + # Get the available attributes, types, and parameters from the schema. available_attributes = list(self._schema.keys()) + ["id"] available_types = [type_.value for type_ in AttributeType] From 3eb8c0d3045cd184d7efc97f457501f54eafcaf9 Mon Sep 17 00:00:00 2001 From: Ben Burtenshaw Date: Wed, 3 Jul 2024 13:43:37 +0200 Subject: [PATCH 24/32] docs: typo in docs --- argilla/docs/how_to_guides/record.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/argilla/docs/how_to_guides/record.md b/argilla/docs/how_to_guides/record.md index 58f0aae7fe..6b80fd3dda 100644 --- a/argilla/docs/how_to_guides/record.md +++ b/argilla/docs/how_to_guides/record.md @@ -318,7 +318,7 @@ Suggestions refer to suggested responses (e.g. model predictions) that you can a You can add suggestions as a dictionary, where the keys correspond to the `name`s of the labels that were configured for your dataset. Remember that you can also use the `mapping` parameter to specify the data structure. ```python - # Add records to the dataset with the label 'my_label' + # Add records to the dataset with the label question 'my_label' data = [ { "question": "Do you need oxygen to breathe?", From b10fbe854651837784d3a1d17feab39c68528229 Mon Sep 17 00:00:00 2001 From: Ben Burtenshaw Date: Wed, 3 Jul 2024 13:43:56 +0200 Subject: [PATCH 25/32] feat: improve record switch in ingest method --- argilla/src/argilla/records/_dataset_records.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/argilla/src/argilla/records/_dataset_records.py b/argilla/src/argilla/records/_dataset_records.py index 1359057179..4f53a2672f 100644 --- a/argilla/src/argilla/records/_dataset_records.py +++ b/argilla/src/argilla/records/_dataset_records.py @@ -373,7 +373,7 @@ def _ingest_records( record_mapper = IngestedRecordMapper(mapping=mapping, dataset=self.__dataset, user_id=user_id) for record in records: try: - if not isinstance(record, Record): + if isinstance(record, dict): record = record_mapper(data=record) elif isinstance(record, Record): record.dataset = self.__dataset From db27e1b57511ab56449bad4647306197b6fc8dac Mon Sep 17 00:00:00 2001 From: Ben Burtenshaw Date: Wed, 3 Jul 2024 13:44:18 +0200 Subject: [PATCH 26/32] feat: refactor id mapping away from dict to type --- argilla/src/argilla/records/_mapping.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/argilla/src/argilla/records/_mapping.py b/argilla/src/argilla/records/_mapping.py index 545f6d148b..b973364645 100644 --- a/argilla/src/argilla/records/_mapping.py +++ b/argilla/src/argilla/records/_mapping.py @@ -82,8 +82,8 @@ class RecordAttributesMap(BaseModel): field: Dict[str, AttributeRoute] metadata: Dict[str, AttributeRoute] vector: Dict[str, AttributeRoute] - id: Dict[str, AttributeRoute] + id: AttributeRoute = AttributeRoute(source="id", name="id", type=AttributeType.ID) class IngestedRecordMapper: """IngestedRecordMapper is a class that is used to map data into a record object. @@ -122,7 +122,7 @@ def __call__(self, data: Dict[str, Any], user_id: Optional[UUID] = None) -> Reco """ - record_id = data.get(self.mapping.id["id"].source) + record_id = data.get(self.mapping.id.source) suggestions = self._map_suggestions(data=data, mapping=self.mapping.suggestion) responses = self._map_responses(data=data, user_id=user_id or self.user_id, mapping=self.mapping.response) fields = self._map_attributes(data=data, mapping=self.mapping.field) @@ -151,7 +151,6 @@ def _schematize_mapped_attributes(self, mapping: Dict[str, Union[str, Sequence[s "field": {}, "metadata": {}, "vector": {}, - "id": {}, } for source_key, value in mapping.items(): mapped_attributes = [value] if isinstance(value, str) else list(value) @@ -169,7 +168,9 @@ def _schematize_mapped_attributes(self, mapping: Dict[str, Union[str, Sequence[s attribute_route = self._select_attribute_type(attribute=attribute_route) # Add the attribute route to the schematized map based on the attribute type. - if attribute_route.name in schematized_map[attribute_route.type]: + if attribute_route.type is AttributeType.ID: + schematized_map["id"] = attribute_route + elif attribute_route.name in schematized_map[attribute_route.type]: # Some attributes may be mapped to multiple source values, so we need to append the parameters. schematized_map[attribute_route.type][attribute_route.name].parameters.extend( attribute_route.parameters @@ -240,10 +241,6 @@ def _schematize_default_attributes(self, mapping: RecordAttributesMap) -> Record RecordAttributesMap: The updated mapping object. """ - if len(mapping.id) == 0: - # If the id is not provided in the mapping, we will map the 'id' key to the 'id' attribute. - mapping.id["id"] = AttributeRoute(source="id", name="id", type=AttributeType.ID) - # Map keys that match question names to the suggestion attribute type. for question in self._dataset.settings.questions: if question.name not in mapping.suggestion: From 716dfa86b12644ff8092235cc82d9e5c015be582 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 3 Jul 2024 11:45:16 +0000 Subject: [PATCH 27/32] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- argilla/src/argilla/records/_mapping.py | 1 + 1 file changed, 1 insertion(+) diff --git a/argilla/src/argilla/records/_mapping.py b/argilla/src/argilla/records/_mapping.py index b973364645..d9966df2e5 100644 --- a/argilla/src/argilla/records/_mapping.py +++ b/argilla/src/argilla/records/_mapping.py @@ -85,6 +85,7 @@ class RecordAttributesMap(BaseModel): id: AttributeRoute = AttributeRoute(source="id", name="id", type=AttributeType.ID) + class IngestedRecordMapper: """IngestedRecordMapper is a class that is used to map data into a record object. It maps values in ingested data to the appropriate record attributes, based on the user provided mapping and the schema of the dataset. From 6a7e9eb5e2f100e5c40a0c663ad7c66337b50e16 Mon Sep 17 00:00:00 2001 From: Ben Burtenshaw Date: Wed, 3 Jul 2024 14:01:22 +0200 Subject: [PATCH 28/32] feat: use class methods for type and parameter values --- argilla/src/argilla/records/_mapping.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/argilla/src/argilla/records/_mapping.py b/argilla/src/argilla/records/_mapping.py index b973364645..51182cfa7b 100644 --- a/argilla/src/argilla/records/_mapping.py +++ b/argilla/src/argilla/records/_mapping.py @@ -39,6 +39,10 @@ class ParameterType(str, Enum): SCORE = "score" AGENT = "agent" + @classmethod + def values(cls) -> List[str]: + return [param.value for param in cls] + class AttributeType(str, Enum): """Attribute types are the different types of attributes a record can have.""" @@ -50,6 +54,10 @@ class AttributeType(str, Enum): VECTOR = "vector" ID = "id" + @classmethod + def values(cls) -> List[str]: + return [attr.value for attr in cls] + class AttributeParameter(BaseModel): """Attribute parameters are the different 'sub' values of a records attribute. @@ -85,6 +93,7 @@ class RecordAttributesMap(BaseModel): id: AttributeRoute = AttributeRoute(source="id", name="id", type=AttributeType.ID) + class IngestedRecordMapper: """IngestedRecordMapper is a class that is used to map data into a record object. It maps values in ingested data to the appropriate record attributes, based on the user provided mapping and the schema of the dataset. @@ -183,10 +192,9 @@ def _schematize_mapped_attributes(self, mapping: Dict[str, Union[str, Sequence[s def _parse_dot_notation(self, attribute_mapping: str) -> Tuple[str, Optional[str], Optional[str]]: """Parses a string in the format of 'attribute.type.parameter' into its parts using regex.""" - # Get the available attributes, types, and parameters from the schema. available_attributes = list(self._schema.keys()) + ["id"] - available_types = [type_.value for type_ in AttributeType] - available_parameters = [param.value for param in ParameterType] + available_parameters = ParameterType.values() + available_types = AttributeType.values() # The pattern is in the format of 'attribute[.type[.parameter]]' where type and parameter are optional. pattern = re.compile( From 40ff5b604665c7329c41d95f72880eca16f1c910 Mon Sep 17 00:00:00 2001 From: Paco Aranda Date: Thu, 4 Jul 2024 11:32:06 +0200 Subject: [PATCH 29/32] [REFACTOR] generate default mapping and extends it with custom mapping dict (#5151) Closes # **Type of change** - Refactor (change restructuring the codebase without changing functionality) **How Has This Been Tested** **Checklist** - I added relevant documentation - follows the style guidelines of this project - I did a self-review of my code - I made corresponding changes to the documentation - I confirm My changes generate no new warnings - I have added tests that prove my fix is effective or that my feature works - I have added relevant notes to the CHANGELOG.md file (See https://keepachangelog.com/) --- argilla/pdm.lock | 35 ++--- argilla/src/argilla/records/_mapping.py | 175 +++++++++++++----------- 2 files changed, 111 insertions(+), 99 deletions(-) diff --git a/argilla/pdm.lock b/argilla/pdm.lock index 0f22c0b9c9..afe7c98dd6 100644 --- a/argilla/pdm.lock +++ b/argilla/pdm.lock @@ -5,7 +5,7 @@ groups = ["default", "dev"] strategy = ["cross_platform", "inherit_metadata"] lock_version = "4.4.1" -content_hash = "sha256:a941796b5f600d43fa2ce12600fb3737618867160ac4ea70f0facb61f3b1ed0c" +content_hash = "sha256:3d833bbb6c0275eec4ecadc7eb398544f9e57b3fea2023512712d86a66d882fb" [[package]] name = "aiohttp" @@ -118,7 +118,7 @@ version = "1.29.0a0" requires_python = "<3.13,>=3.8" path = "../argilla-v1" summary = "Open-source tool for exploring, labeling, and monitoring data for NLP projects." -groups = ["default", "dev"] +groups = ["dev"] dependencies = [ "backoff", "deprecated~=1.2.0", @@ -141,7 +141,7 @@ extras = ["listeners"] requires_python = "<3.13,>=3.8" path = "../argilla-v1" summary = "Open-source tool for exploring, labeling, and monitoring data for NLP projects." -groups = ["default", "dev"] +groups = ["dev"] dependencies = [ "argilla-v1 @ file:///${PROJECT_ROOT}/../argilla-v1", "schedule~=1.1.0", @@ -199,7 +199,7 @@ name = "backoff" version = "2.2.1" requires_python = ">=3.7,<4.0" summary = "Function decoration for backoff and retry" -groups = ["default", "dev"] +groups = ["dev"] files = [ {file = "backoff-2.2.1-py3-none-any.whl", hash = "sha256:63579f9a0628e06278f7e47b7d7d5b6ce20dc65c5e96a6f3ca99a6adca0396e8"}, {file = "backoff-2.2.1.tar.gz", hash = "sha256:03f829f5bb1923180821643f8753b0502c3b682293992485b0eef2807afa5cba"}, @@ -445,7 +445,7 @@ name = "click" version = "8.1.7" requires_python = ">=3.7" summary = "Composable command line interface toolkit" -groups = ["default", "dev"] +groups = ["dev"] dependencies = [ "colorama; platform_system == \"Windows\"", ] @@ -535,7 +535,7 @@ name = "deprecated" version = "1.2.14" requires_python = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" summary = "Python @deprecated decorator to deprecate old python classes, functions or methods." -groups = ["default", "dev"] +groups = ["dev"] dependencies = [ "wrapt<2,>=1.10", ] @@ -1388,7 +1388,7 @@ files = [ name = "monotonic" version = "1.6" summary = "An implementation of time.monotonic() for Python 2 & < 3.3" -groups = ["default", "dev"] +groups = ["dev"] files = [ {file = "monotonic-1.6-py2.py3-none-any.whl", hash = "sha256:68687e19a14f11f26d140dd5c86f3dba4bf5df58003000ed467e0e2a69bca96c"}, {file = "monotonic-1.6.tar.gz", hash = "sha256:3a55207bcfed53ddd5c5bae174524062935efed17792e9de2ad0205ce9ad63f7"}, @@ -1562,7 +1562,7 @@ name = "numpy" version = "1.26.4" requires_python = ">=3.9" summary = "Fundamental package for array computing in Python" -groups = ["default", "dev"] +groups = ["dev"] files = [ {file = "numpy-1.26.4-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:9ff0f4f29c51e2803569d7a51c2304de5554655a60c5d776e35b4a41413830d0"}, {file = "numpy-1.26.4-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:2e4ee3380d6de9c9ec04745830fd9e2eccb3e6cf790d39d7b98ffd19b0dd754a"}, @@ -1599,7 +1599,7 @@ name = "packaging" version = "24.1" requires_python = ">=3.8" summary = "Core utilities for Python packages" -groups = ["default", "dev"] +groups = ["dev"] files = [ {file = "packaging-24.1-py3-none-any.whl", hash = "sha256:5b8f2217dbdbd2f7f384c41c628544e6d52f2d0f53c6d0c3ea61aa5d1d7ff124"}, {file = "packaging-24.1.tar.gz", hash = "sha256:026ed72c8ed3fcce5bf8950572258698927fd1dbda10a5e981cdf0ac37f4f002"}, @@ -1619,10 +1619,11 @@ name = "pandas" version = "2.2.2" requires_python = ">=3.9" summary = "Powerful data structures for data analysis, time series, and statistics" -groups = ["default", "dev"] +groups = ["dev"] dependencies = [ "numpy>=1.22.4; python_version < \"3.11\"", "numpy>=1.23.2; python_version == \"3.11\"", + "numpy>=1.26.0; python_version >= \"3.12\"", "python-dateutil>=2.8.2", "pytz>=2020.1", "tzdata>=2022.7", @@ -2092,7 +2093,7 @@ name = "python-dateutil" version = "2.9.0.post0" requires_python = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7" summary = "Extensions to the standard Python datetime module" -groups = ["default", "dev"] +groups = ["dev"] dependencies = [ "six>=1.5", ] @@ -2105,7 +2106,7 @@ files = [ name = "pytz" version = "2024.1" summary = "World timezone definitions, modern and historical" -groups = ["default", "dev"] +groups = ["dev"] files = [ {file = "pytz-2024.1-py2.py3-none-any.whl", hash = "sha256:328171f4e3623139da4983451950b28e95ac706e13f3f2630a879749e7a8b319"}, {file = "pytz-2024.1.tar.gz", hash = "sha256:2a29735ea9c18baf14b448846bde5a48030ed267578472d8955cd0e7443a9812"}, @@ -2461,7 +2462,7 @@ name = "schedule" version = "1.1.0" requires_python = ">=3.6" summary = "Job scheduling for humans." -groups = ["default", "dev"] +groups = ["dev"] files = [ {file = "schedule-1.1.0-py2.py3-none-any.whl", hash = "sha256:617adce8b4bf38c360b781297d59918fbebfb2878f1671d189f4f4af5d0567a4"}, {file = "schedule-1.1.0.tar.gz", hash = "sha256:e6ca13585e62c810e13a08682e0a6a8ad245372e376ba2b8679294f377dfc8e4"}, @@ -2472,7 +2473,7 @@ name = "six" version = "1.16.0" requires_python = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*" summary = "Python 2 and 3 compatibility utilities" -groups = ["default", "dev"] +groups = ["dev"] files = [ {file = "six-1.16.0-py2.py3-none-any.whl", hash = "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254"}, {file = "six-1.16.0.tar.gz", hash = "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926"}, @@ -2602,7 +2603,7 @@ name = "typer" version = "0.9.4" requires_python = ">=3.6" summary = "Typer, build great CLIs. Easy to code. Based on Python type hints." -groups = ["default", "dev"] +groups = ["dev"] dependencies = [ "click<9.0.0,>=7.1.1", "typing-extensions>=3.7.4.3", @@ -2628,7 +2629,7 @@ name = "tzdata" version = "2024.1" requires_python = ">=2" summary = "Provider of IANA time zone data" -groups = ["default", "dev"] +groups = ["dev"] files = [ {file = "tzdata-2024.1-py2.py3-none-any.whl", hash = "sha256:9068bc196136463f5245e51efda838afa15aaeca9903f49050dfa2679db4d252"}, {file = "tzdata-2024.1.tar.gz", hash = "sha256:2674120f8d891909751c38abcdfd386ac0a5a1127954fbc332af6b5ceae07efd"}, @@ -2731,7 +2732,7 @@ name = "wrapt" version = "1.14.1" requires_python = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,>=2.7" summary = "Module for decorators, wrappers and monkey patching." -groups = ["default", "dev"] +groups = ["dev"] files = [ {file = "wrapt-1.14.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:80bb5c256f1415f747011dc3604b59bc1f91c6e7150bd7db03b19170ee06b320"}, {file = "wrapt-1.14.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:07f7a7d0f388028b2df1d916e94bbb40624c59b48ecc6cbc232546706fac74c2"}, diff --git a/argilla/src/argilla/records/_mapping.py b/argilla/src/argilla/records/_mapping.py index 51182cfa7b..1ffc58d540 100644 --- a/argilla/src/argilla/records/_mapping.py +++ b/argilla/src/argilla/records/_mapping.py @@ -18,7 +18,7 @@ from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Union, Tuple from uuid import UUID -from pydantic import BaseModel +from pydantic import BaseModel, Field from argilla.records._resource import Record from argilla.responses import Response @@ -64,7 +64,7 @@ class AttributeParameter(BaseModel): And the source in the data that the parameter is coming from. """ - parameter: ParameterType = ParameterType.VALUE + parameter_type: ParameterType = ParameterType.VALUE source: str @@ -76,23 +76,51 @@ class AttributeRoute(BaseModel): type: Optional[AttributeType] = None parameters: List[AttributeParameter] = [] - @property - def has_value(self) -> bool: - """All attributes must have a value parameter to be valid.""" - return any(param.parameter == ParameterType.VALUE for param in self.parameters) + def set_parameter(self, parameter: AttributeParameter): + """Set a parameter for the route. + An existing parameter with same parameter type will be replaced by this new one. + """ + for p in self.parameters: + if p.parameter_type == parameter.parameter_type: + self.parameters.remove(p) + break + self.parameters.append(parameter) class RecordAttributesMap(BaseModel): """RecordAttributesMap is a representation of a record attribute mapping that is used to parse data into a record.""" - suggestion: Dict[str, AttributeRoute] - response: Dict[str, AttributeRoute] - field: Dict[str, AttributeRoute] - metadata: Dict[str, AttributeRoute] - vector: Dict[str, AttributeRoute] + suggestion: Dict[str, AttributeRoute] = Field(default_factory=dict) + response: Dict[str, AttributeRoute] = Field(default_factory=dict) + field: Dict[str, AttributeRoute] = Field(default_factory=dict) + metadata: Dict[str, AttributeRoute] = Field(default_factory=dict) + vector: Dict[str, AttributeRoute] = Field(default_factory=dict) id: AttributeRoute = AttributeRoute(source="id", name="id", type=AttributeType.ID) + def _get_routes_group_by_type(self, type: AttributeType): + return { + AttributeType.SUGGESTION: self.suggestion, + AttributeType.RESPONSE: self.response, + AttributeType.FIELD: self.field, + AttributeType.METADATA: self.metadata, + AttributeType.VECTOR: self.vector, + AttributeType.ID: self.id, + }[type] + + def get_by_name_and_type(self, name: str, type: AttributeType) -> Optional[AttributeRoute]: + """Get a route by name and type""" + if name == "id" and AttributeType.ID: + return self.id + return self._get_routes_group_by_type(type).get(name) + + def add_route(self, attribute_route: AttributeRoute) -> None: + """Ad a new mapping route""" + if attribute_route.type == AttributeType.ID: + self.id = attribute_route + else: + self._get_routes_group_by_type(attribute_route.type)[attribute_route.name] = attribute_route + class IngestedRecordMapper: """IngestedRecordMapper is a class that is used to map data into a record object. @@ -114,10 +142,8 @@ def __init__( self._schema = dataset.schema self.user_id = user_id - mapping = mapping or {} - _mapping = self._schematize_mapped_attributes(mapping=mapping) - _mapping = self._schematize_default_attributes(mapping=_mapping) - self.mapping: RecordAttributesMap = _mapping + default_mapping = self._schematize_default_attributes() + self.mapping = self._schematize_mapped_attributes(mapping=mapping or {}, default_mapping=default_mapping) def __call__(self, data: Dict[str, Any], user_id: Optional[UUID] = None) -> Record: """Maps a dictionary of data to a record object. @@ -152,42 +178,36 @@ def __call__(self, data: Dict[str, Any], user_id: Optional[UUID] = None) -> Reco # Private helper functions - Build Mapping ########################################## - def _schematize_mapped_attributes(self, mapping: Dict[str, Union[str, Sequence[str]]]) -> RecordAttributesMap: - """Converts a mapping dictionary to a schematized mapping object.""" - schematized_map = { - "suggestion": {}, - "response": {}, - "field": {}, - "metadata": {}, - "vector": {}, - } + def _schematize_mapped_attributes( + self, + mapping: Dict[str, Union[str, Sequence[str]]], + default_mapping: RecordAttributesMap, + ) -> RecordAttributesMap: + """Extends the default mapping with a schematized mapping object provided from a dict""" + for source_key, value in mapping.items(): mapped_attributes = [value] if isinstance(value, str) else list(value) for attribute_mapping in mapped_attributes: - attribute_name, type_, parameter = self._parse_dot_notation(attribute_mapping) - - type_ = AttributeType(type_) if type_ else None - parameter = ParameterType(parameter) if parameter else ParameterType.VALUE - attribute_route = AttributeRoute( - source=source_key, - name=attribute_name, - type=type_, - parameters=[AttributeParameter(parameter=parameter, source=source_key)], - ) - attribute_route = self._select_attribute_type(attribute=attribute_route) - - # Add the attribute route to the schematized map based on the attribute type. - if attribute_route.type is AttributeType.ID: - schematized_map["id"] = attribute_route - elif attribute_route.name in schematized_map[attribute_route.type]: - # Some attributes may be mapped to multiple source values, so we need to append the parameters. - schematized_map[attribute_route.type][attribute_route.name].parameters.extend( - attribute_route.parameters - ) + attribute_name, attr_type, parameter = self._parse_dot_notation(attribute_mapping) + + attr_type = AttributeType(attr_type or AttributeType.SUGGESTION) + parameter = AttributeParameter(parameter_type=parameter or ParameterType.VALUE, source=source_key) + + attribute_route = default_mapping.get_by_name_and_type(name=attribute_name, type=attr_type) + if attribute_route: + attribute_route.source = source_key + attribute_route.set_parameter(parameter) else: - schematized_map[attribute_route.type][attribute_route.name] = attribute_route + attribute_route = AttributeRoute( + name=attribute_name, + source=source_key, + type=attr_type, + parameters=[parameter], + ) + attribute_route = self._select_attribute_type(attribute=attribute_route) + default_mapping.add_route(attribute_route) - return RecordAttributesMap(**schematized_map) + return default_mapping def _parse_dot_notation(self, attribute_mapping: str) -> Tuple[str, Optional[str], Optional[str]]: """Parses a string in the format of 'attribute.type.parameter' into its parts using regex.""" @@ -238,53 +258,44 @@ def _select_attribute_type(self, attribute: AttributeRoute) -> AttributeRoute: warnings.warn(message=f"Record attribute {attribute.name} is not in the schema or mapping so skipping.") return attribute - def _schematize_default_attributes(self, mapping: RecordAttributesMap) -> RecordAttributesMap: - """Updates the mapping with default attributes that are not provided in the mapping. - Uses the schema of the dataset to infer the default attributes and add them to the mapping. - - Parameters: - mapping: The mapping object to update with default attributes. + def _schematize_default_attributes(self) -> RecordAttributesMap: + """Creates the mapping with default attributes. Uses the schema of the dataset to infer + the default attributes and add them to the mapping. Returns: - RecordAttributesMap: The updated mapping object. + RecordAttributesMap: The mapping object. """ + mapping = RecordAttributesMap() # Map keys that match question names to the suggestion attribute type. for question in self._dataset.settings.questions: - if question.name not in mapping.suggestion: - mapping.suggestion[question.name] = AttributeRoute( - source=question.name, - name=question.name, - type=AttributeType.SUGGESTION, - parameters=[AttributeParameter(source=question.name)], - ) - - elif not mapping.suggestion[question.name].has_value: - mapping.suggestion[question.name].parameters.append(AttributeParameter(source=question.name)) + mapping.suggestion[question.name] = AttributeRoute( + source=question.name, + name=question.name, + type=AttributeType.SUGGESTION, + parameters=[AttributeParameter(source=question.name)], + ) for field in self._dataset.settings.fields: - if field.name not in mapping.field: - mapping.field[field.name] = AttributeRoute( - source=field.name, - name=field.name, - type=AttributeType.FIELD, - ) + mapping.field[field.name] = AttributeRoute( + source=field.name, + name=field.name, + type=AttributeType.FIELD, + ) for metadata in self._dataset.settings.metadata: - if metadata.name not in mapping.metadata: - mapping.metadata[metadata.name] = AttributeRoute( - source=metadata.name, - name=metadata.name, - type=AttributeType.METADATA, - ) + mapping.metadata[metadata.name] = AttributeRoute( + source=metadata.name, + name=metadata.name, + type=AttributeType.METADATA, + ) for vector in self._dataset.settings.vectors: - if vector.name not in mapping.vector: - mapping.vector[vector.name] = AttributeRoute( - source=vector.name, - name=vector.name, - type=AttributeType.VECTOR, - ) + mapping.vector[vector.name] = AttributeRoute( + source=vector.name, + name=vector.name, + type=AttributeType.VECTOR, + ) return mapping @@ -309,7 +320,7 @@ def _map_suggestions(self, data: Dict[str, Any], mapping) -> List[Suggestion]: for name, route in mapping.items(): if route.source not in data: continue - parameters = {param.parameter: data.get(param.source) for param in route.parameters} + parameters = {param.parameter_type: data.get(param.source) for param in route.parameters} schema_item = self._dataset.schema.get(name) suggestion = Suggestion( **parameters, From 667ad546d64281562f2e889a690a24c587017197 Mon Sep 17 00:00:00 2001 From: Ben Burtenshaw Date: Thu, 4 Jul 2024 11:51:55 +0200 Subject: [PATCH 30/32] refactor: migrate mapper into module from file --- .../src/argilla/records/_mapping/__init__.py | 15 ++ .../{_mapping.py => _mapping/_mapper.py} | 180 ++++++------------ .../src/argilla/records/_mapping/_routes.py | 113 +++++++++++ 3 files changed, 181 insertions(+), 127 deletions(-) create mode 100644 argilla/src/argilla/records/_mapping/__init__.py rename argilla/src/argilla/records/{_mapping.py => _mapping/_mapper.py} (64%) create mode 100644 argilla/src/argilla/records/_mapping/_routes.py diff --git a/argilla/src/argilla/records/_mapping/__init__.py b/argilla/src/argilla/records/_mapping/__init__.py new file mode 100644 index 0000000000..9165f5c2ef --- /dev/null +++ b/argilla/src/argilla/records/_mapping/__init__.py @@ -0,0 +1,15 @@ +# Copyright 2024-present, Argilla, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from argilla.records._mapping._mapper import IngestedRecordMapper # noqa: F401 diff --git a/argilla/src/argilla/records/_mapping.py b/argilla/src/argilla/records/_mapping/_mapper.py similarity index 64% rename from argilla/src/argilla/records/_mapping.py rename to argilla/src/argilla/records/_mapping/_mapper.py index 1ffc58d540..fcf2e59905 100644 --- a/argilla/src/argilla/records/_mapping.py +++ b/argilla/src/argilla/records/_mapping/_mapper.py @@ -14,117 +14,33 @@ import re import warnings -from enum import Enum from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Union, Tuple from uuid import UUID -from pydantic import BaseModel, Field - from argilla.records._resource import Record from argilla.responses import Response from argilla.settings import TextField, VectorField from argilla.settings._metadata import MetadataPropertyBase from argilla.settings._question import QuestionPropertyBase from argilla.suggestions import Suggestion +from argilla.records._mapping._routes import ( + AttributeRoute, + RecordAttributesMap, + AttributeType, + ParameterType, + AttributeParameter, +) if TYPE_CHECKING: from argilla.datasets import Dataset -class ParameterType(str, Enum): - """Parameter types are the different 'sub' values of a records attribute. - For example, the value, score, or agent of a suggestion.""" - - VALUE = "value" - SCORE = "score" - AGENT = "agent" - - @classmethod - def values(cls) -> List[str]: - return [param.value for param in cls] - - -class AttributeType(str, Enum): - """Attribute types are the different types of attributes a record can have.""" - - FIELD = "field" - SUGGESTION = "suggestion" - RESPONSE = "response" - METADATA = "metadata" - VECTOR = "vector" - ID = "id" - - @classmethod - def values(cls) -> List[str]: - return [attr.value for attr in cls] - - -class AttributeParameter(BaseModel): - """Attribute parameters are the different 'sub' values of a records attribute. - And the source in the data that the parameter is coming from. - """ - - parameter_type: ParameterType = ParameterType.VALUE - source: str - - -class AttributeRoute(BaseModel): - """AttributeRoute is a representation of a record attribute that is mapped to a source value in the data.""" - - source: str - name: str - type: Optional[AttributeType] = None - parameters: List[AttributeParameter] = [] - - def set_parameter(self, parameter: AttributeParameter): - """Set a parameter for the route. - An existing parameter with same parameter type will be replaced by this new one. - """ - for p in self.parameters: - if p.parameter_type == parameter.parameter_type: - self.parameters.remove(p) - break - self.parameters.append(parameter) - - -class RecordAttributesMap(BaseModel): - """RecordAttributesMap is a representation of a record attribute mapping that is used to parse data into a record.""" - - suggestion: Dict[str, AttributeRoute] = Field(default_factory=dict) - response: Dict[str, AttributeRoute] = Field(default_factory=dict) - field: Dict[str, AttributeRoute] = Field(default_factory=dict) - metadata: Dict[str, AttributeRoute] = Field(default_factory=dict) - vector: Dict[str, AttributeRoute] = Field(default_factory=dict) - - id: AttributeRoute = AttributeRoute(source="id", name="id", type=AttributeType.ID) - - def _get_routes_group_by_type(self, type: AttributeType): - return { - AttributeType.SUGGESTION: self.suggestion, - AttributeType.RESPONSE: self.response, - AttributeType.FIELD: self.field, - AttributeType.METADATA: self.metadata, - AttributeType.VECTOR: self.vector, - AttributeType.ID: self.id, - }[type] - - def get_by_name_and_type(self, name: str, type: AttributeType) -> Optional[AttributeRoute]: - """Get a route by name and type""" - if name == "id" and AttributeType.ID: - return self.id - return self._get_routes_group_by_type(type).get(name) - - def add_route(self, attribute_route: AttributeRoute) -> None: - """Ad a new mapping route""" - if attribute_route.type == AttributeType.ID: - self.id = attribute_route - else: - self._get_routes_group_by_type(attribute_route.type)[attribute_route.name] = attribute_route - - class IngestedRecordMapper: """IngestedRecordMapper is a class that is used to map data into a record object. - It maps values in ingested data to the appropriate record attributes, based on the user provided mapping and the schema of the dataset. + It maps values in ingested data to the appropriate record attributes, based on the user + provided mapping and the schema of the dataset. + + The Mapper builds and uses a `RecordAttributesMap` object to map the data to the appropriate record attributes. Attributes: dataset: The dataset the record will be added to. @@ -132,6 +48,8 @@ class IngestedRecordMapper: user_id: The user id to associate with the record responses. """ + mapping: RecordAttributesMap = None + def __init__( self, dataset: "Dataset", @@ -187,30 +105,30 @@ def _schematize_mapped_attributes( for source_key, value in mapping.items(): mapped_attributes = [value] if isinstance(value, str) else list(value) - for attribute_mapping in mapped_attributes: - attribute_name, attr_type, parameter = self._parse_dot_notation(attribute_mapping) + for attr_mapping in mapped_attributes: + attr_name, attr_type, parameter = self._parse_dot_notation(attr_mapping) attr_type = AttributeType(attr_type or AttributeType.SUGGESTION) parameter = AttributeParameter(parameter_type=parameter or ParameterType.VALUE, source=source_key) - attribute_route = default_mapping.get_by_name_and_type(name=attribute_name, type=attr_type) - if attribute_route: - attribute_route.source = source_key - attribute_route.set_parameter(parameter) + attr_route = default_mapping.get_by_name_and_type(name=attr_name, type=attr_type) + if attr_route: + attr_route.source = source_key + attr_route.set_parameter(parameter) else: - attribute_route = AttributeRoute( - name=attribute_name, + attr_route = AttributeRoute( + name=attr_name, source=source_key, type=attr_type, parameters=[parameter], ) - attribute_route = self._select_attribute_type(attribute=attribute_route) - default_mapping.add_route(attribute_route) + attr_route = self._select_attribute_type(attribute_route=attr_route) + default_mapping.add_route(attr_route) return default_mapping def _parse_dot_notation(self, attribute_mapping: str) -> Tuple[str, Optional[str], Optional[str]]: - """Parses a string in the format of 'attribute.type.parameter' into its parts using regex.""" + """Parses a string in the format of 'attribute.type.parameter' into its attribute parts parts using regex.""" available_attributes = list(self._schema.keys()) + ["id"] available_parameters = ParameterType.values() @@ -230,44 +148,44 @@ def _parse_dot_notation(self, attribute_mapping: str) -> Tuple[str, Optional[str "Attribute mapping must be in the format of 'attribute[.type[.parameter]]'." f"Available attributes: {available_attributes}, types: {available_types}, parameters: {available_parameters}." ) - attribute_name, type_, parameter = match.groups() - return attribute_name, type_, parameter + attr_name, type_, parameter = match.groups() + return attr_name, type_, parameter - def _select_attribute_type(self, attribute: AttributeRoute) -> AttributeRoute: + def _select_attribute_type(self, attribute_route: AttributeRoute) -> AttributeRoute: """Selects the attribute type based on the schema item and the attribute type. This method implements the logic to infer the attribute type based on the schema item if the attribute type is not provided. If the attribute type is not provided, it will be inferred based on the schema item. """ - schema_item = self._schema.get(attribute.name) + schema_item = self._schema.get(attribute_route.name) if isinstance(schema_item, QuestionPropertyBase) and ( - attribute.type is None or attribute.type == AttributeType.SUGGESTION + attribute_route.type is None or attribute_route.type == AttributeType.SUGGESTION ): # Suggestions are the default destination for questions. - attribute.type = AttributeType.SUGGESTION - elif isinstance(schema_item, QuestionPropertyBase) and attribute.type == AttributeType.RESPONSE: - attribute.type = AttributeType.RESPONSE + attribute_route.type = AttributeType.SUGGESTION + elif isinstance(schema_item, QuestionPropertyBase) and attribute_route.type == AttributeType.RESPONSE: + attribute_route.type = AttributeType.RESPONSE elif isinstance(schema_item, TextField): - attribute.type = AttributeType.FIELD + attribute_route.type = AttributeType.FIELD elif isinstance(schema_item, VectorField): - attribute.type = AttributeType.VECTOR + attribute_route.type = AttributeType.VECTOR elif isinstance(schema_item, MetadataPropertyBase): - attribute.type = AttributeType.METADATA - elif attribute.name == "id": - attribute.type = AttributeType.ID + attribute_route.type = AttributeType.METADATA + elif attribute_route.name == "id": + attribute_route.type = AttributeType.ID else: - warnings.warn(message=f"Record attribute {attribute.name} is not in the schema or mapping so skipping.") - return attribute + warnings.warn(message=f"Record attribute {attribute_route.name} is not in the schema or mapping so skipping.") + return attribute_route def _schematize_default_attributes(self) -> RecordAttributesMap: - """Creates the mapping with default attributes. Uses the schema of the dataset to infer - the default attributes and add them to the mapping. + """Creates the mapping with default attribute routes. Uses the schema of the dataset to determine + the default attributes and add them to the mapping with their names as keys. This means that + keys in the data that match the names of dataset attributes will be mapped to them by default. Returns: RecordAttributesMap: The mapping object. """ mapping = RecordAttributesMap() - # Map keys that match question names to the suggestion attribute type. for question in self._dataset.settings.questions: mapping.suggestion[question.name] = AttributeRoute( source=question.name, @@ -321,6 +239,8 @@ def _map_suggestions(self, data: Dict[str, Any], mapping) -> List[Suggestion]: if route.source not in data: continue parameters = {param.parameter_type: data.get(param.source) for param in route.parameters} + if parameters.get(ParameterType.VALUE) is None: + continue schema_item = self._dataset.schema.get(name) suggestion = Suggestion( **parameters, @@ -345,8 +265,11 @@ def _map_responses(self, data: Dict[str, Any], user_id: UUID, mapping) -> List[R responses = [] for name, route in mapping.items(): + value = data.get(route.source) + if value is None: + continue response = Response( - value=data.get(route.source), + value=value, question_name=name, user_id=user_id, ) @@ -357,10 +280,13 @@ def _map_responses(self, data: Dict[str, Any], user_id: UUID, mapping) -> List[R def _map_attributes(self, data: Dict[str, Any], mapping: Dict[str, AttributeRoute]) -> Dict[str, Any]: """Converts a dictionary to a dictionary of attributes for use by the add or update methods.""" attributes = {} + for name, route in mapping.items(): if route.source not in data: continue value = data.get(route.source) - if value is not None: - attributes[name] = value + if value is None: + continue + attributes[name] = value + return attributes diff --git a/argilla/src/argilla/records/_mapping/_routes.py b/argilla/src/argilla/records/_mapping/_routes.py new file mode 100644 index 0000000000..9e316997e6 --- /dev/null +++ b/argilla/src/argilla/records/_mapping/_routes.py @@ -0,0 +1,113 @@ +# Copyright 2024-present, Argilla, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from enum import Enum +from typing import Dict, List, Optional + +from pydantic import BaseModel, Field + + +class ParameterType(str, Enum): + """Parameter types are the different 'sub' values of a records attribute. + For example, the value, score, or agent of a suggestion.""" + + VALUE = "value" + SCORE = "score" + AGENT = "agent" + + @classmethod + def values(cls) -> List[str]: + return [param.value for param in cls] + + +class AttributeType(str, Enum): + """Attribute types are the different types of attributes a record can have.""" + + FIELD = "field" + SUGGESTION = "suggestion" + RESPONSE = "response" + METADATA = "metadata" + VECTOR = "vector" + ID = "id" + + @classmethod + def values(cls) -> List[str]: + return [attr.value for attr in cls] + + +class AttributeParameter(BaseModel): + """Attribute parameters are reference connections between the `ParameterType`'s of a records `AttributeType`. + The connect the source key to the sub value of the attribute. i.e. column name 'score' to the 'score' of a suggestion. + """ + + parameter_type: ParameterType = ParameterType.VALUE + source: str + + +class AttributeRoute(BaseModel): + """AttributeRoute is a reference connection between a record's attribute and a source value in the data. + It connects the source key with the attribute name and type. For example, connecting the columns 'score' + and 'y' to the 'score' and 'value' of a suggestion. + """ + + source: str + name: str + type: Optional[AttributeType] = None + parameters: List[AttributeParameter] = [] + + def set_parameter(self, parameter: AttributeParameter): + """Set a parameter for the route. + An existing parameter with same parameter type will be replaced by this new one. + """ + for p in self.parameters: + if p.parameter_type == parameter.parameter_type: + self.parameters.remove(p) + break + self.parameters.append(parameter) + + +class RecordAttributesMap(BaseModel): + """RecordAttributesMap is a representation of a record attribute mapping that is used to parse data into a record.""" + + suggestion: Dict[str, AttributeRoute] = Field(default_factory=dict) + response: Dict[str, AttributeRoute] = Field(default_factory=dict) + field: Dict[str, AttributeRoute] = Field(default_factory=dict) + metadata: Dict[str, AttributeRoute] = Field(default_factory=dict) + vector: Dict[str, AttributeRoute] = Field(default_factory=dict) + + id: AttributeRoute = AttributeRoute(source="id", name="id", type=AttributeType.ID) + + def _get_routes_group_by_type(self, type: AttributeType): + """ Utility method to facilitate getting the routes by type.""" + return { + AttributeType.SUGGESTION: self.suggestion, + AttributeType.RESPONSE: self.response, + AttributeType.FIELD: self.field, + AttributeType.METADATA: self.metadata, + AttributeType.VECTOR: self.vector, + AttributeType.ID: self.id, + }[type] + + def get_by_name_and_type(self, name: str, type: AttributeType) -> Optional[AttributeRoute]: + """Utility method to get a route by name and type""" + if name == "id" and AttributeType.ID: + return self.id + return self._get_routes_group_by_type(type).get(name) + + def add_route(self, attribute_route: AttributeRoute) -> None: + """Utility method to get a new mapping route""" + if attribute_route.type == AttributeType.ID: + self.id = attribute_route + else: + self._get_routes_group_by_type(attribute_route.type)[attribute_route.name] = attribute_route From 94988eb068f8c03dd5774e379b1c60505a060b36 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 4 Jul 2024 09:52:47 +0000 Subject: [PATCH 31/32] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- argilla/src/argilla/records/_mapping/_mapper.py | 10 ++++++---- argilla/src/argilla/records/_mapping/_routes.py | 4 ++-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/argilla/src/argilla/records/_mapping/_mapper.py b/argilla/src/argilla/records/_mapping/_mapper.py index fcf2e59905..08c3e6ed6f 100644 --- a/argilla/src/argilla/records/_mapping/_mapper.py +++ b/argilla/src/argilla/records/_mapping/_mapper.py @@ -173,12 +173,14 @@ def _select_attribute_type(self, attribute_route: AttributeRoute) -> AttributeRo elif attribute_route.name == "id": attribute_route.type = AttributeType.ID else: - warnings.warn(message=f"Record attribute {attribute_route.name} is not in the schema or mapping so skipping.") + warnings.warn( + message=f"Record attribute {attribute_route.name} is not in the schema or mapping so skipping." + ) return attribute_route def _schematize_default_attributes(self) -> RecordAttributesMap: """Creates the mapping with default attribute routes. Uses the schema of the dataset to determine - the default attributes and add them to the mapping with their names as keys. This means that + the default attributes and add them to the mapping with their names as keys. This means that keys in the data that match the names of dataset attributes will be mapped to them by default. Returns: @@ -280,7 +282,7 @@ def _map_responses(self, data: Dict[str, Any], user_id: UUID, mapping) -> List[R def _map_attributes(self, data: Dict[str, Any], mapping: Dict[str, AttributeRoute]) -> Dict[str, Any]: """Converts a dictionary to a dictionary of attributes for use by the add or update methods.""" attributes = {} - + for name, route in mapping.items(): if route.source not in data: continue @@ -288,5 +290,5 @@ def _map_attributes(self, data: Dict[str, Any], mapping: Dict[str, AttributeRout if value is None: continue attributes[name] = value - + return attributes diff --git a/argilla/src/argilla/records/_mapping/_routes.py b/argilla/src/argilla/records/_mapping/_routes.py index 9e316997e6..6602921b8e 100644 --- a/argilla/src/argilla/records/_mapping/_routes.py +++ b/argilla/src/argilla/records/_mapping/_routes.py @@ -57,7 +57,7 @@ class AttributeParameter(BaseModel): class AttributeRoute(BaseModel): """AttributeRoute is a reference connection between a record's attribute and a source value in the data. - It connects the source key with the attribute name and type. For example, connecting the columns 'score' + It connects the source key with the attribute name and type. For example, connecting the columns 'score' and 'y' to the 'score' and 'value' of a suggestion. """ @@ -89,7 +89,7 @@ class RecordAttributesMap(BaseModel): id: AttributeRoute = AttributeRoute(source="id", name="id", type=AttributeType.ID) def _get_routes_group_by_type(self, type: AttributeType): - """ Utility method to facilitate getting the routes by type.""" + """Utility method to facilitate getting the routes by type.""" return { AttributeType.SUGGESTION: self.suggestion, AttributeType.RESPONSE: self.response, From 27697938e556a1d68dacd48b5e4e64d6f0942be5 Mon Sep 17 00:00:00 2001 From: Ben Burtenshaw Date: Thu, 4 Jul 2024 12:04:18 +0200 Subject: [PATCH 32/32] fix: raise error not warn when mapped attribute is unknown --- argilla/src/argilla/records/_mapping/_mapper.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/argilla/src/argilla/records/_mapping/_mapper.py b/argilla/src/argilla/records/_mapping/_mapper.py index fcf2e59905..c43ad7dd31 100644 --- a/argilla/src/argilla/records/_mapping/_mapper.py +++ b/argilla/src/argilla/records/_mapping/_mapper.py @@ -13,10 +13,10 @@ # limitations under the License. import re -import warnings from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Union, Tuple from uuid import UUID +from argilla._exceptions import RecordsIngestionError from argilla.records._resource import Record from argilla.responses import Response from argilla.settings import TextField, VectorField @@ -60,8 +60,9 @@ def __init__( self._schema = dataset.schema self.user_id = user_id + mapping = mapping or {} default_mapping = self._schematize_default_attributes() - self.mapping = self._schematize_mapped_attributes(mapping=mapping or {}, default_mapping=default_mapping) + self.mapping = self._schematize_mapped_attributes(mapping=mapping, default_mapping=default_mapping) def __call__(self, data: Dict[str, Any], user_id: Optional[UUID] = None) -> Record: """Maps a dictionary of data to a record object. @@ -173,12 +174,12 @@ def _select_attribute_type(self, attribute_route: AttributeRoute) -> AttributeRo elif attribute_route.name == "id": attribute_route.type = AttributeType.ID else: - warnings.warn(message=f"Record attribute {attribute_route.name} is not in the schema or mapping so skipping.") + raise RecordsIngestionError(f"Mapped attribute is not a valid dataset attribute: {attribute_route.name}.") return attribute_route def _schematize_default_attributes(self) -> RecordAttributesMap: """Creates the mapping with default attribute routes. Uses the schema of the dataset to determine - the default attributes and add them to the mapping with their names as keys. This means that + the default attributes and add them to the mapping with their names as keys. This means that keys in the data that match the names of dataset attributes will be mapped to them by default. Returns: @@ -280,7 +281,7 @@ def _map_responses(self, data: Dict[str, Any], user_id: UUID, mapping) -> List[R def _map_attributes(self, data: Dict[str, Any], mapping: Dict[str, AttributeRoute]) -> Dict[str, Any]: """Converts a dictionary to a dictionary of attributes for use by the add or update methods.""" attributes = {} - + for name, route in mapping.items(): if route.source not in data: continue @@ -288,5 +289,5 @@ def _map_attributes(self, data: Dict[str, Any], mapping: Dict[str, AttributeRout if value is None: continue attributes[name] = value - + return attributes