Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix indexing in PII Modifier #55

Merged
merged 3 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions docs/user-guide/QualityFiltering.rst
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,33 @@ Here is the ``WordCountFilter`` rewritten to use batches in the ``keep_document`
pass_max = score <= self._max_words
return pass_min & pass_max

When you use the ``batched`` decorator, the index of the series returned from the function must remain the same as the index that was passed in.
The index may not be contiguous due to filters being applied prior to the current filter.
In the above code, the index will be the same automatically so no change is required.
However, when writing functions that transform the series into a different structure like a list, special care is needed.
The following code example demonstrates what this error may look like, and how to fix it.

.. code-block:: python

class BuggyLengthFilter(DocumentFilter):
    """Deliberately incorrect example: builds scores in a plain list and
    returns a Series with a fresh default index, losing the input's index."""

    @batched
    def score_document(self, documents: pd.Series):
        scores = []
        for document in documents:
            scores.append(len(document))

        return pd.Series(scores)  # Bad! Does not preserve the index

class CorrectLengthFilter(DocumentFilter):
    """Correct counterpart: passes the input's index to the Series
    constructor so scores stay aligned with their source documents."""

    @batched
    def score_document(self, documents: pd.Series):
        scores = []
        for document in documents:
            scores.append(len(document))

        return pd.Series(scores, index=documents.index)  # Good! Preserves the index


-----------------------------------------
Expand Down
6 changes: 3 additions & 3 deletions nemo_curator/filters/classifier_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def __init__(self, model_path=None, label="__label__hq", alpha=3, seed=42):
self._name = "fasttext_quality_filter"

@batched
def score_document(self, df):
def score_document(self, df: pd.Series):
model_attr = f"{self._name}_{self._model_path}"
try:
model = load_object_on_worker(model_attr, self._load_model, {})
Expand All @@ -56,7 +56,7 @@ def _score_document(text):
return df.apply(_score_document)

@batched
def keep_document(self, df):
def keep_document(self, df: pd.Series):
return np.random.pareto(self._alpha, size=len(df)) > 1 - df

def _load_model(self):
Expand All @@ -82,7 +82,7 @@ def __init__(self, model_path=None, min_langid_score=0.3):
dask.config.set({"dataframe.convert-string": False})

@batched
def score_document(self, df):
def score_document(self, df: pd.Series):
model_attr = f"{self._name}_{self._model_path}"
try:
model = load_object_on_worker(model_attr, self._load_model, {})
Expand Down
2 changes: 2 additions & 0 deletions nemo_curator/modifiers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
from .c4 import BoilerPlateStringModifier
from .doc_modifier import DocumentModifier
from .fasttext import FastTextLabelModifier
from .pii_modifier import PiiModifier
from .unicode_reformatter import UnicodeReformatter

__all__ = [
"DocumentModifier",
"BoilerPlateStringModifier",
"FastTextLabelModifier",
"UnicodeReformatter",
"PiiModifier",
]
4 changes: 2 additions & 2 deletions nemo_curator/modifiers/pii_modifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ def modify_document(self, text: pd.Series, partition_info: Dict = None):
logging.error(
f"Encountered error {str(e)} in partition {partition_info['number']}"
)
return pd.Series([True])
output: pd.Series = pd.Series(output)
return pd.Series([True], index=text.index)
output: pd.Series = pd.Series(output, text.index)
return output

def load_deidentifier(self):
Expand Down
17 changes: 17 additions & 0 deletions tests/test_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,23 @@ def test_score_type(self, letter_count_data):
expected_scores == scores.compute()
), f"Expected {expected_scores} but got {scores}"

def test_chain_filter(self, letter_count_data):
    """Chain two ScoreFilters; the second one must handle the
    non-contiguous index left behind by the first."""
    pipeline = Sequential(
        [
            ScoreFilter(LetterCountFilter(min_count=4), text_field="documents"),
            ScoreFilter(
                BatchedLengthFilter(min_length=8, max_length=11),
                text_field="documents",
            ),
        ]
    )
    result = pipeline(letter_count_data)

    # Only the document at index 2 survives both filters.
    surviving_indices = [2]
    expected = DocumentDataset(letter_count_data.df.loc[surviving_indices])
    assert all_equal(expected, result), f"Expected {expected} but got {result}"


class TestHeuristicFilters:
def test_nonalpha(self):
Expand Down
68 changes: 68 additions & 0 deletions tests/test_pii_accuracy.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,17 @@
import re
from pathlib import Path

import pandas as pd
import pytest
from dask import dataframe as dd
from dask.distributed import Client, LocalCluster

import nemo_curator as nc
from nemo_curator.datasets import DocumentDataset
from nemo_curator.filters import DocumentFilter
from nemo_curator.modifiers import PiiModifier
from nemo_curator.pii.algorithm import PiiDeidentifier
from nemo_curator.utils.decorators import batched

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -118,3 +126,63 @@ def test_batch_accuracy(self):
match = all(compare_outputs(x, y) for x, y in zip(outputs, targets))
print("Matches:", "No" if not match else "Yes")
assert match == True


class BatchedLengthFilter(DocumentFilter):
    """
    Keeps documents of a given length
    """

    def __init__(self, min_length=5, max_length=10):
        super().__init__()
        self.min_length = min_length
        self.max_length = max_length

    @batched
    def score_document(self, df):
        # Character count per document; the input Series' index carries
        # through the .str accessor unchanged.
        return df.str.len()

    @batched
    def keep_document(self, scores):
        # Keep documents whose length falls inside the inclusive
        # [min_length, max_length] window.
        return (self.min_length <= scores) & (scores <= self.max_length)


class TestPIIModule:
    def test_filter_chain(self):
        """End-to-end: a batched filter followed by the PII modifier.

        The filter drops one document, producing a non-contiguous index;
        the modifier must still align its output with the surviving rows.
        """
        documents = [
            "Alice goes on a walk",
            "Bob goes on a walk",
            "Someone named Charlie goes on a walk",
            "A human walking is David",
            "A human walking is Eliza",
        ]
        expected_texts = [
            "***** goes on a walk",
            "*** goes on a walk",
            "A human walking is *****",
            "A human walking is *****",
        ]
        source_df = pd.DataFrame({"text": documents})
        expected_df = pd.DataFrame({"text": expected_texts})

        with LocalCluster(n_workers=1, threads_per_worker=1) as cluster:
            with Client(cluster):
                dataset = DocumentDataset(dd.from_pandas(source_df, npartitions=1))
                pipeline = nc.Sequential(
                    [
                        nc.ScoreFilter(
                            BatchedLengthFilter(min_length=0, max_length=25)
                        ),
                        nc.Modify(
                            PiiModifier(
                                language="en", anonymize_action="mask", device="cpu"
                            )
                        ),
                    ]
                )
                result = pipeline(dataset)

                # Materialize and drop the gappy index before comparing.
                result_df = result.df.compute().reset_index(drop=True)
                assert all(result_df["text"] == expected_df["text"])