Fix indexing in PII Modifier (NVIDIA#55)
* Fix pii index issue

Signed-off-by: Ryan Wolf <[email protected]>

* Add sequential wrapper

Signed-off-by: Ryan Wolf <[email protected]>

* Fix pii tests

Signed-off-by: Ryan Wolf <[email protected]>

---------

Signed-off-by: Ryan Wolf <[email protected]>
Signed-off-by: Nicole Luo <[email protected]>
ryantwolf authored and nicoleeeluo committed May 20, 2024
1 parent 0bab063 commit 9849164
Showing 6 changed files with 119 additions and 5 deletions.
27 changes: 27 additions & 0 deletions docs/user-guide/QualityFiltering.rst
@@ -153,6 +153,33 @@ Here is the ``WordCountFilter`` rewritten to use batches in the ``keep_document``
        pass_max = score <= self._max_words
        return pass_min & pass_max
When you use the ``batched`` decorator, the series returned from the function must have the same index as the series that was passed in.
The index may not be contiguous, since earlier filters in the pipeline may have already dropped rows.
In the code above, the index is preserved automatically, so no change is required.
However, when a function transforms the series into a different structure, such as a list, special care is needed to restore the index.
The following example demonstrates what this error looks like and how to fix it.

.. code-block:: python

    class BuggyLengthFilter(DocumentFilter):

        @batched
        def score_document(self, documents: pd.Series):
            scores = []
            for document in documents:
                scores.append(len(document))
            return pd.Series(scores)  # Bad! Does not preserve the index


    class CorrectLengthFilter(DocumentFilter):

        @batched
        def score_document(self, documents: pd.Series):
            scores = []
            for document in documents:
                scores.append(len(document))
            return pd.Series(scores, index=documents.index)  # Good! Preserves the index
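
The practical failure mode is easiest to see on a gapped index. Below is a minimal, self-contained sketch (not part of this commit; the series values and gapped index are illustrative assumptions) showing how an earlier filter leaves a hole in the index and how the buggy version then misaligns scores:

.. code-block:: python

    import pandas as pd

    # An earlier filter dropped row 1, so the surviving index has a gap.
    documents = pd.Series(["a", "bbb", "cc"], index=[0, 2, 3])

    buggy = pd.Series([len(d) for d in documents])                         # index 0, 1, 2
    fixed = pd.Series([len(d) for d in documents], index=documents.index)  # index 0, 2, 3

    # Alignment happens by index: the buggy scores land on the wrong rows
    # (and yield NaN at index 3), while the fixed scores line up correctly.
    print(pd.DataFrame({"text": documents, "buggy": buggy, "fixed": fixed}))

This misalignment is exactly what the ``PiiModifier`` fix below guards against by passing ``text.index`` when rebuilding the output series.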
-----------------------------------------
6 changes: 3 additions & 3 deletions nemo_curator/filters/classifier_filter.py
@@ -37,7 +37,7 @@ def __init__(self, model_path=None, label="__label__hq", alpha=3, seed=42):
        self._name = "fasttext_quality_filter"

    @batched
-    def score_document(self, df):
+    def score_document(self, df: pd.Series):
        model_attr = f"{self._name}_{self._model_path}"
        try:
            model = load_object_on_worker(model_attr, self._load_model, {})
@@ -56,7 +56,7 @@ def _score_document(text):
        return df.apply(_score_document)

    @batched
-    def keep_document(self, df):
+    def keep_document(self, df: pd.Series):
        return np.random.pareto(self._alpha, size=len(df)) > 1 - df

    def _load_model(self):
@@ -82,7 +82,7 @@ def __init__(self, model_path=None, min_langid_score=0.3):
        dask.config.set({"dataframe.convert-string": False})

    @batched
-    def score_document(self, df):
+    def score_document(self, df: pd.Series):
        model_attr = f"{self._name}_{self._model_path}"
        try:
            model = load_object_on_worker(model_attr, self._load_model, {})
2 changes: 2 additions & 0 deletions nemo_curator/modifiers/__init__.py
@@ -15,11 +15,13 @@
from .c4 import BoilerPlateStringModifier
from .doc_modifier import DocumentModifier
from .fasttext import FastTextLabelModifier
+from .pii_modifier import PiiModifier
from .unicode_reformatter import UnicodeReformatter

__all__ = [
    "DocumentModifier",
    "BoilerPlateStringModifier",
    "FastTextLabelModifier",
    "UnicodeReformatter",
+    "PiiModifier",
]
4 changes: 2 additions & 2 deletions nemo_curator/modifiers/pii_modifier.py
@@ -85,8 +85,8 @@ def modify_document(self, text: pd.Series, partition_info: Dict = None):
            logging.error(
                f"Encountered error {str(e)} in partition {partition_info['number']}"
            )
-            return pd.Series([True])
-        output: pd.Series = pd.Series(output)
+            return pd.Series([True], index=text.index)
+        output: pd.Series = pd.Series(output, text.index)
        return output

    def load_deidentifier(self):
17 changes: 17 additions & 0 deletions tests/test_filters.py
@@ -282,6 +282,23 @@ def test_score_type(self, letter_count_data):
            expected_scores == scores.compute()
        ), f"Expected {expected_scores} but got {scores}"

    def test_chain_filter(self, letter_count_data):
        letter_count_filter = LetterCountFilter(min_count=4)
        length_filter = BatchedLengthFilter(min_length=8, max_length=11)
        filters = Sequential(
            [
                ScoreFilter(letter_count_filter, text_field="documents"),
                ScoreFilter(length_filter, text_field="documents"),
            ]
        )
        filtered_data = filters(letter_count_data)

        expected_indices = [2]
        expected_data = DocumentDataset(letter_count_data.df.loc[expected_indices])
        assert all_equal(
            expected_data, filtered_data
        ), f"Expected {expected_data} but got {filtered_data}"


class TestHeuristicFilters:
    def test_nonalpha(self):
68 changes: 68 additions & 0 deletions tests/test_pii_accuracy.py
@@ -16,9 +16,17 @@
import re
from pathlib import Path

import pandas as pd
import pytest
from dask import dataframe as dd
from dask.distributed import Client, LocalCluster

import nemo_curator as nc
from nemo_curator.datasets import DocumentDataset
from nemo_curator.filters import DocumentFilter
from nemo_curator.modifiers import PiiModifier
from nemo_curator.pii.algorithm import PiiDeidentifier
from nemo_curator.utils.decorators import batched

LOGGER = logging.getLogger(__name__)

@@ -118,3 +126,63 @@ def test_batch_accuracy(self):
        match = all(compare_outputs(x, y) for x, y in zip(outputs, targets))
        print("Matches:", "No" if not match else "Yes")
        assert match == True


class BatchedLengthFilter(DocumentFilter):
    """
    Keeps documents of a given length
    """

    def __init__(self, min_length=5, max_length=10):
        super().__init__()
        self.min_length = min_length
        self.max_length = max_length

    @batched
    def score_document(self, df):
        return df.str.len()

    @batched
    def keep_document(self, scores):
        min_threshold = self.min_length <= scores
        max_threshold = scores <= self.max_length
        return min_threshold & max_threshold


class TestPIIModule:
    def test_filter_chain(self):
        inputs = [
            "Alice goes on a walk",
            "Bob goes on a walk",
            "Someone named Charlie goes on a walk",
            "A human walking is David",
            "A human walking is Eliza",
        ]
        targets = [
            "***** goes on a walk",
            "*** goes on a walk",
            "A human walking is *****",
            "A human walking is *****",
        ]
        input_df = pd.DataFrame({"text": inputs})
        target_df = pd.DataFrame({"text": targets})
        with LocalCluster(n_workers=1, threads_per_worker=1) as cluster:
            with Client(cluster):
                input_dataset = DocumentDataset(dd.from_pandas(input_df, npartitions=1))
                pipeline = nc.Sequential(
                    [
                        nc.ScoreFilter(
                            BatchedLengthFilter(min_length=0, max_length=25)
                        ),
                        nc.Modify(
                            PiiModifier(
                                language="en", anonymize_action="mask", device="cpu"
                            )
                        ),
                    ]
                )
                output_dataset = pipeline(input_dataset)

                output_df = output_dataset.df.compute().reset_index(drop=True)
                match = all(output_df["text"] == target_df["text"])
                assert match
