-
Notifications
You must be signed in to change notification settings - Fork 2
Data Preparer and PreparerSteps base class #98
Changes from all commits
2647236
5c2033e
58fdb8b
80bd55a
75decdf
cd13449
dd910b2
ed30660
c7d14d0
d64a524
5f78dd9
a67d197
2e67a71
d4bfeae
9b5018d
ef73579
b2e409a
9576014
85633dc
a8f85ab
de2b0ff
0971f4c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,10 @@ | ||
"""Cleaner module for handling the cleaning and shaping of data.""" | ||
|
||
from foreshadow.cleaners.data_cleaner import ( | ||
DataCleaner, | ||
SmartCleaner, | ||
SmartFlatten, | ||
) | ||
|
||
|
||
__all__ = ["SmartCleaner", "DataCleaner", "SmartFlatten"] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,26 +1,323 @@ | ||
"""Cleaner module for cleaning data as step in Foreshadow workflow.""" | ||
from collections import namedtuple | ||
|
||
import pandas as pd | ||
from sklearn.base import BaseEstimator, TransformerMixin | ||
|
||
from foreshadow.core.preparerstep import PreparerStep | ||
from foreshadow.exceptions import InvalidDataFrame | ||
from foreshadow.metrics.internals import avg_col_regex, regex_rows | ||
from foreshadow.transformers.core import SmartTransformer | ||
from foreshadow.transformers.core.notransform import NoTransform | ||
from foreshadow.utils.testing import dynamic_import | ||
from foreshadow.utils.validation import check_df | ||
|
||
|
||
# Result of a single BaseCleaner.transform_row call:
#   row        -- the transformed row text (or the default value on failure)
#   match_lens -- number of characters matched by each transformation step
CleanerReturn = namedtuple("CleanerReturn", ["row", "match_lens"])
|
||
|
||
class DataCleaner(PreparerStep):
    """Determine and perform best data cleaning step."""

    def __init__(self, *args, **kwargs):
        """Define the single step for DataCleaner, using SmartCleaner.

        Args:
            *args: args to PreparerStep constructor.
            **kwargs: kwargs to PreparerStep constructor.

        """
        super().__init__(*args, use_single_pipeline=True, **kwargs)

    def get_mapping(self, X):
        """Return the mapping of transformations for the DataCleaner step.

        Args:
            X: input DataFrame.

        Returns:
            Mapping in accordance with super.

        """
        # One [SmartFlatten, SmartCleaner] pipeline per column of X; the
        # column itself is not needed to build the pipeline, hence `_`.
        return self.separate_cols(
            transformers=[
                [
                    SmartFlatten(column_sharer=self.column_sharer),
                    SmartCleaner(column_sharer=self.column_sharer),
                ]
                for _ in X
            ],
            X=X,
        )
|
||
|
||
class SmartCleaner(SmartTransformer):
    """Intelligently decide which cleaning function should be applied."""

    def __init__(self, **kwargs):
        """Pass through to SmartTransformer.

        Args:
            **kwargs: kwargs to SmartTransformer constructor.

        """
        super().__init__(**kwargs)

    def pick_transformer(self, X, y=None, **fit_params):
        """Get best transformer for a given column.

        Args:
            X: input DataFrame
            y: input labels
            **fit_params: fit_params

        Returns:
            Best data cleaning transformer.

        """
        # NOTE(review): imported lazily inside the method — presumably to
        # avoid a circular import between this module and internals; confirm.
        from foreshadow.cleaners.internals import __all__ as cleaners

        # Candidate cleaners are the exported names containing "cleaner".
        cleaners = [
            (dynamic_import(cleaner, "foreshadow.cleaners.internals"), cleaner)
            for cleaner in cleaners
            if cleaner.lower().find("cleaner") != -1
        ]
        # One-vs-rest selection: highest metric_score wins; scores of 0 (or
        # below) never win, so an unmatched column falls through to NoTransform.
        best_score = 0
        best_cleaner = None
        for cleaner, name in cleaners:
            cleaner = cleaner(column_sharer=self.column_sharer, name=name)
            score = cleaner.metric_score(X)
            if score > best_score:
                best_score = score
                best_cleaner = cleaner

        if best_cleaner is None:
            return NoTransform(column_sharer=self.column_sharer)
        return best_cleaner
|
||
def fit(self, X, y=None, **fit_params): | ||
"""Stub fit method. | ||
|
||
class SmartFlatten(SmartTransformer):
    """Smartly determine how to flatten an input DataFrame."""

    def __init__(self, **kwargs):
        """Pass through to SmartTransformer.

        Args:
            **kwargs: kwargs to SmartTransformer constructor.

        """
        super().__init__(**kwargs)

    def pick_transformer(self, X, y=None, **fit_params):
        """Get best transformer for a given column.

        Args:
            X: input DataFrame
            y: input labels
            **fit_params: fit_params

        Returns:
            Best data flattening transformer

        """
        # NOTE(review): imported lazily inside the method — presumably to
        # avoid a circular import between this module and internals; confirm.
        from foreshadow.cleaners.internals import __all__ as flatteners

        # Candidate flatteners are the exported names containing "flatten".
        flatteners = [
            (
                dynamic_import(flattener, "foreshadow.cleaners.internals"),
                flattener,
            )
            for flattener in flatteners
            if flattener.lower().find("flatten") != -1
        ]

        # One-vs-rest selection: highest metric_score wins; scores of 0 (or
        # below) never win, so an unmatched column falls through to NoTransform.
        best_score = 0
        best_flattener = None
        for flattener, name in flatteners:
            flattener = flattener(column_sharer=self.column_sharer, name=name)
            score = flattener.metric_score(X)
            if score > best_score:
                best_score = score
                best_flattener = flattener

        if best_flattener is None:
            return NoTransform(column_sharer=self.column_sharer)
        return best_flattener
|
||
|
||
class BaseCleaner(BaseEstimator, TransformerMixin):
    """Base class for any Cleaner Transformer."""

    def __init__(
        self,
        transformations,
        output_columns=None,
        confidence_computation=None,
        default=lambda x: x,
    ):
        """Construct any cleaner/flattener.

        Args:
            transformations: a callable that takes a string and returns a
                tuple with the length of the transformed characters and then
                transformed string.
            output_columns: If none, any lists returned by the transformations
                are assumed to be separate columns in the new DataFrame.
                Otherwise, pass the names for each desired output
                column to be used.
            confidence_computation: The dict of {metric: weight} for the
                subclass's metric computation. This implies an OVR model.
            default: Function that returns the default value for a row if
                the transformation failed. Accepts the row as input.

        Raises:
            ValueError: If not a list, int, or None specifying expected
                output columns.

        """
        if not isinstance(output_columns, (int, list, type(None))):
            raise ValueError("output columns not a valid type")

        self.default = default
        self.output_columns = output_columns
        self.transformations = transformations
        # Default OVR confidence weighting: mostly row-level regex matches,
        # with a small weight for average per-column regex matches.
        self.confidence_computation = {regex_rows: 0.8, avg_col_regex: 0.2}
        if confidence_computation is not None:
            self.confidence_computation = confidence_computation

    def metric_score(self, X):
        """Compute the score for this cleaner using confidence_computation.

        confidence_computation is passed through init for each subclass.
        The confidence determines which cleaner/flattener is picked in an
        OVR fashion.

        Args:
            X: input DataFrame.

        Returns:
            float: confidence value.

        """
        return sum(
            metric_fn(X, cleaner=self.transform_row) * weight
            for metric_fn, weight in self.confidence_computation.items()
        )

    def transform_row(self, row_of_feature, return_tuple=True):
        """Perform clean operations on text, that is a row of feature.

        Uses self.transformations determined at init time by the child class
        and performs the transformations sequentially.

        Args:
            row_of_feature: one row of one column
            return_tuple: return named_tuple object instead of just the row.
                This will often be set to False when passing this method to an
                external function (non source code) that will expect the
                output to only be the transformed row, such as
                DataFrame.apply.

        Returns:
            :obj:`CleanerReturn` with:

            .row
                the text in row_of_feature transformed by transformations. If
                a step matched nothing, the default value for the row.
            .match_lens
                the number of characters from original text at each step that
                was transformed.

        """
        matched_lengths = []  # this does not play nice with creating new
        # columns
        # BUGFIX: previously `row` was reset to row_of_feature on every
        # iteration, so only the last transformation's output survived (and
        # an empty transformation list raised NameError). Chain the output of
        # each step into the next, as the docstring promises.
        row = row_of_feature
        for transform in self.transformations:
            row, match_len = transform(row)
            if match_len == 0:
                # A step matched nothing: record it, fall back to the
                # default value for the original row, and stop.
                matched_lengths.append(0)
                row = self.default(row_of_feature)
                break
            matched_lengths.append(match_len)
        if return_tuple:
            return CleanerReturn(row, matched_lengths)
        else:
            return row

    def fit(self, X, y=None):
        """Empty fit.

        Args:
            X: input observations
            y: input labels

        Returns:
            self

        """
        return self

    def transform(self, X, y=None):
        """Clean string columns.

        Here, we assume that any list output means that these are desired
        to be new columns in our dataset. Contractually, this could change
        to be that a boolean flag is passed to indicate when this is
        desired, as of right now, there should be no need to return a list
        for any case other than this case of desiring new column.

        The same is assumed for dicts, where the key is the new column name,
        the value is the value for that row in that column. NaNs
        are automatically put into the columns that don't exist for given rows.

        Args:
            X (:obj:`pandas.Series`): X data
            y: input labels

        Returns:
            :obj:`pandas.DataFrame`: Transformed data

        Raises:
            InvalidDataFrame: If unexpected output returned that was not
                handled correctly. This happens if the output specified by the
                child does not match what is actually returned. The child
                should ensure it's implementation is consistent.

        """
        X = check_df(X, single_column=True)
        # Each transformation accepts the whole row so that we can inspect
        # how much of the text was matched (for determining if it should be
        # used); DataFrame.apply drives transform_row over the single column.
        # NOTE(review): .apply was reported to fail when a transformation
        # used pandas.Series.str.split internally — cause unknown; confirm.
        out = X[X.columns[0]].apply(self.transform_row, return_tuple=False)
        if any(
            isinstance(out[i], (list, tuple)) for i in range(out.shape[0])
        ):  # out are lists == new columns
            # Every row must produce the same number of new columns.
            # BUGFIX: previously this iterated range(len(out[0])) — the
            # length of the FIRST ROW'S LIST — instead of over all rows, so
            # mismatched rows beyond that count went undetected and the
            # error message listed the wrong rows.
            if not all(
                len(out[i]) == len(out[0]) for i in range(out.shape[0])
            ):
                raise InvalidDataFrame(
                    "length of lists: {}, returned not of same value.".format(
                        [out[i] for i in range(out.shape[0])]
                    )
                )
            columns = self.output_columns
            if columns is None:
                columns = [X.columns[0] + str(c) for c in range(len(out[0]))]
                # by default, pandas would have given a unique integer to
                # each column, instead, we keep the previous column name and
                # add that integer.
            X = pd.DataFrame([*out.values], columns=columns)
        elif any(
            isinstance(out[i], dict) for i in range(out.shape[0])
        ):  # out are dicts == named new columns
            all_keys = dict()
            for row in out:
                all_keys.update({key: True for key in row})  # get all columns
            columns = list(all_keys.keys())
            out = pd.DataFrame([*out.values], columns=columns)
            out.columns = [X.columns[0] + "_" + c for c in columns]
            X = out
            # by default, this will create a DataFrame where if a row
            # contains the value, it will be added, if not NaN is added.
        else:  # no lists, still 1 column output
            X[X.columns[0]] = out
        return X
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
"""Internal cleaners for handling the cleaning and shaping of data.""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would keep all concrete transformers in concrete so there is one place to look for concrete transformers. |
||
|
||
import glob | ||
import inspect | ||
import os | ||
|
||
from foreshadow.transformers.core import _get_modules | ||
|
||
|
||
def _get_classes():
    """Return list of classes found in cleaners directory.

    Returns:
        list of classes found in cleaners directory

    """
    # Every sibling .py module except __init__ is a candidate source of
    # cleaner/flattener classes.
    files = glob.glob(os.path.dirname(__file__) + "/*.py")
    imports = [
        os.path.basename(f)[:-3]
        for f in files
        if os.path.isfile(f) and not f.endswith("__init__.py")
    ]
    modules = [
        __import__(i, globals(), locals(), ["object"], 1) for i in imports
    ]
    # Exclude classes with "Base" in the name so only concrete
    # implementations are exported.
    # TODO(review): revisit once the serializer-mixin changes merge, or move
    # this helper into utilities.
    classes = [
        c[1]
        for m in modules
        for c in inspect.getmembers(m)
        if inspect.isclass(c[1]) and c[1].__name__.find("Base") == -1
    ]

    return classes


# Register the discovered classes in this module's namespace and export them.
classes = _get_modules(_get_classes(), globals(), __name__)
__all__ = classes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should probably inherit from ConcreteSerializerMixin