Data integration API implementation #53

Merged
merged 24 commits on Jun 27, 2024
Changes from 5 commits
Commits
24 commits
d3ff7f0
feat(api): Add new function bdi.match_columns()
aecio Jun 6, 2024
ca8f0b8
feat(api): Add new function bdi.top_matches()
aecio Jun 6, 2024
ee9c09f
feat(api): Added bdi.materialize_mapping() and basic value mappers
aecio Jun 7, 2024
49ab313
Merge branch 'devel' into new_api
aecio Jun 7, 2024
adae6d7
Change column matching API to be stateless
aecio Jun 7, 2024
bc79764
Adding TwoPhase ColumnMatch algorithm based on CTLearning
EduardoPena Jun 10, 2024
ed4d42c
Formatting with black, small fix for columns with no matches
EduardoPena Jun 10, 2024
5e1f147
Formatting using black variation
EduardoPena Jun 10, 2024
c85de2b
Moving model_name, top_k to constructor parameters for better class reuse
EduardoPena Jun 11, 2024
176fd14
Merge pull request #55 from VIDA-NYU/new_api_column_matching_algs
EduardoPena Jun 11, 2024
838f7e6
feat(api): add match_values() and preview_value_mappings()
aecio Jun 12, 2024
1e71758
test(api): Add end-to-end API integration test
aecio Jun 20, 2024
3fc6b94
feat(api): Add bdi.preview_domains()
aecio Jun 21, 2024
ebe528b
refactor(api): Make API inputs more compatible
aecio Jun 22, 2024
c1d3812
feat(api): Support an object as a method in match_columns()
aecio Jun 24, 2024
c84023a
Cast dictionary key to string
roquelopez Jun 25, 2024
718b3d6
Fix #57
roquelopez Jun 25, 2024
e55f070
Change default algorithm for column mapping
roquelopez Jun 25, 2024
b4f8fb1
Make EditAlgorithm return values between 0 and 1
roquelopez Jun 25, 2024
39fdfd9
Update threshold to fix test
roquelopez Jun 25, 2024
be7c1bd
refactor: Extract TopkColumnMatcher from ContrastiveLearningAPI
aecio Jun 26, 2024
154f5e9
Additional documentation and implementation improvements
aecio Jun 27, 2024
5f65b62
Rename notebook to doc_gdc_harmonization.ipynb
aecio Jun 27, 2024
732c83d
Fix API documentation
roquelopez Jun 27, 2024
1 change: 1 addition & 0 deletions bdikit/__init__.py
@@ -1,3 +1,4 @@
 __version__ = "0.2.0.dev0"
 # To shortcut the import path
 from bdikit.api import APIManager
+from bdikit.functional_api import *
138 changes: 138 additions & 0 deletions bdikit/functional_api.py
@@ -0,0 +1,138 @@
from enum import Enum
from os.path import join, dirname
from typing import Union, Type, List, Optional
import pandas as pd
from bdikit.download import get_cached_model_or_download
from bdikit.mapping_algorithms.column_mapping.algorithms import (
    BaseColumnMappingAlgorithm,
    SimFloodAlgorithm,
    ComaAlgorithm,
    CupidAlgorithm,
    DistributionBasedAlgorithm,
    JaccardDistanceAlgorithm,
    GPTAlgorithm,
)
from bdikit.mapping_algorithms.value_mapping.value_mappers import ValueMapper
from bdikit.mapping_algorithms.scope_reducing._algorithms.contrastive_learning.cl_api import (
    ContrastiveLearningAPI,
)

GDC_DATA_PATH = join(dirname(__file__), "./resource/gdc_table.csv")


class ColumnMappingMethod(Enum):
    SIMFLOOD = ("similarity_flooding", SimFloodAlgorithm)
    COMA = ("coma", ComaAlgorithm)
    CUPID = ("cupid", CupidAlgorithm)
    DISTRIBUTION_BASED = ("distribution_based", DistributionBasedAlgorithm)
    JACCARD_DISTANCE = ("jaccard_distance", JaccardDistanceAlgorithm)
    GPT = ("gpt", GPTAlgorithm)

    def __init__(
        self, method_name: str, method_class: Type[BaseColumnMappingAlgorithm]
    ):
        self.method_name = method_name
        self.method_class = method_class

    @staticmethod
    def get_instance(method_name: str) -> BaseColumnMappingAlgorithm:
        methods = {
            method.method_name: method.method_class for method in ColumnMappingMethod
        }
        try:
            return methods[method_name]()
        except KeyError:
            names = ", ".join(list(methods.keys()))
            raise ValueError(
                f"The {method_name} algorithm is not supported. "
                f"Supported algorithms are: {names}"
            )


def match_columns(
    source: pd.DataFrame,
    target: Union[str, pd.DataFrame] = "gdc",
    method: str = ColumnMappingMethod.SIMFLOOD.method_name,
) -> pd.DataFrame:
    """
    Performs schema mapping between the source table and the given target. The
    target is either a DataFrame or a string representing a standard data vocabulary.
    """
    if isinstance(target, str):
        target_table = _load_table_for_standard(target)
    else:
        target_table = target

    matcher_instance = ColumnMappingMethod.get_instance(method)
    matches = matcher_instance.map(source, target_table)

    return pd.DataFrame(matches.items(), columns=["source", "target"])


def _load_table_for_standard(name: str) -> pd.DataFrame:
    """
    Load the table for the given standard data vocabulary. Currently, only the
    GDC standard is supported.
    """
    if name == "gdc":
        return pd.read_csv(GDC_DATA_PATH)
    else:
        raise ValueError(f"The {name} standard is not supported")


def top_matches(
    source: pd.DataFrame,
    columns: Optional[List[str]] = None,
    target: Union[str, pd.DataFrame] = "gdc",
    top_k: int = 10,
) -> pd.DataFrame:
    """
    Returns the top-k matches between the source and target tables.
    """

    if isinstance(target, str):
        target_table = _load_table_for_standard(target)
    else:
        target_table = target

    if columns is not None and len(columns) > 0:
        selected_columns = source[columns]
    else:
        selected_columns = source

    model_path = get_cached_model_or_download("cl-reducer-v0.1")
    api = ContrastiveLearningAPI(model_path=model_path, top_k=top_k)
    _, scopes_json = api.get_recommendations(selected_columns, target=target_table)

    dfs = []
    for scope in scopes_json:
        matches = pd.DataFrame(
            scope["Top k columns"], columns=["matches", "similarity"]
        )
        matches["source"] = scope["Candidate column"]
        matches = matches[["source", "matches", "similarity"]]
        dfs.append(matches.sort_values(by="similarity", ascending=False))

    return pd.concat(dfs, ignore_index=True)


def materialize_mapping(
    input_dataframe: pd.DataFrame, target: List[dict]
) -> pd.DataFrame:
    output_dataframe = pd.DataFrame()
    for mapping_spec in target:
        from_column_name = mapping_spec["from"]
        to_column_name = mapping_spec["to"]
        value_mapper = mapping_spec["mapper"]
        output_dataframe[to_column_name] = map_column_values(
            input_dataframe[from_column_name], to_column_name, value_mapper
        )
    return output_dataframe


def map_column_values(
    input_column: pd.Series, target: str, value_mapper: ValueMapper
) -> pd.Series:
    new_column = value_mapper.map(input_column)
    new_column.name = target
    return new_column
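The functions above make up the core of the new functional API. The following is a rough usage sketch based only on the signatures in this file; the source CSV file, its column names, and the value-mapping dictionary are hypothetical:

import pandas as pd
import bdikit as bdi
from bdikit.mapping_algorithms.value_mapping.value_mappers import DictionaryMapper

# Hypothetical source table.
source = pd.read_csv("source.csv")

# Map source columns to the GDC vocabulary with an explicitly chosen matcher.
column_mappings = bdi.match_columns(source, target="gdc", method="jaccard_distance")

# Inspect the top-10 candidate target columns for a single source column.
candidates = bdi.top_matches(source, columns=["Gender"], target="gdc", top_k=10)

# Materialize a harmonized table from an explicit mapping specification.
harmonized = bdi.materialize_mapping(
    source,
    [
        {
            "from": "Gender",
            "to": "gender",
            "mapper": DictionaryMapper({"M": "male", "F": "female"}),
        }
    ],
)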
46 changes: 19 additions & 27 deletions bdikit/mapping_algorithms/column_mapping/algorithms.py
@@ -14,23 +14,16 @@
 
 
 class BaseColumnMappingAlgorithm:
-    def __init__(self, dataset, global_table):
-        self._dataset = dataset
-        self._global_table = global_table
-
-    def map(self) -> Dict[str, str]:
+    def map(self, dataset: pd.DataFrame, global_table: pd.DataFrame) -> Dict[str, str]:
         raise NotImplementedError("Subclasses must implement this method")
 
 
 class ValentineColumnMappingAlgorithm(BaseColumnMappingAlgorithm):
-    def __init__(self, dataset, global_table, matcher: BaseMatcher):
-        super().__init__(dataset, global_table)
+    def __init__(self, matcher: BaseMatcher):
         self.matcher = matcher
 
-    def map(self) -> Dict[str, str]:
-        matches: MatcherResults = valentine_match(
-            self._dataset, self._global_table, self.matcher
-        )
+    def map(self, dataset: pd.DataFrame, global_table: pd.DataFrame) -> Dict[str, str]:
+        matches: MatcherResults = valentine_match(dataset, global_table, self.matcher)
         mappings = {}
         for match in matches.one_to_one():
             dataset_candidate = match[0][1]
@@ -40,42 +33,41 @@ def map(self) -> Dict[str, str]:
 
 
 class SimFloodAlgorithm(ValentineColumnMappingAlgorithm):
-    def __init__(self, dataset, global_table):
-        super().__init__(dataset, global_table, SimilarityFlooding())
+    def __init__(self):
+        super().__init__(SimilarityFlooding())
 
 
 class ComaAlgorithm(ValentineColumnMappingAlgorithm):
-    def __init__(self, dataset, global_table):
-        super().__init__(dataset, global_table, Coma())
+    def __init__(self):
+        super().__init__(Coma())
 
 
 class CupidAlgorithm(ValentineColumnMappingAlgorithm):
-    def __init__(self, dataset, global_table):
-        super().__init__(dataset, global_table, Cupid())
+    def __init__(self):
+        super().__init__(Cupid())
 
 
 class DistributionBasedAlgorithm(ValentineColumnMappingAlgorithm):
-    def __init__(self, dataset, global_table):
-        super().__init__(dataset, global_table, DistributionBased())
+    def __init__(self):
+        super().__init__(DistributionBased())
 
 
 class JaccardDistanceAlgorithm(ValentineColumnMappingAlgorithm):
-    def __init__(self, dataset, global_table):
-        super().__init__(dataset, global_table, JaccardDistanceMatcher())
+    def __init__(self):
+        super().__init__(JaccardDistanceMatcher())
 
 
 class GPTAlgorithm(BaseColumnMappingAlgorithm):
-    def __init__(self, dataset, global_table):
-        super().__init__(dataset, global_table)
+    def __init__(self):
         self.client = OpenAI()
 
-    def map(self):
-        global_columns = self._global_table.columns
+    def map(self, dataset: pd.DataFrame, global_table: pd.DataFrame):
+        global_columns = global_table.columns
         labels = ", ".join(global_columns)
-        candidate_columns = self._dataset.columns
+        candidate_columns = dataset.columns
         mappings = {}
         for column in candidate_columns:
-            col = self._dataset[column]
+            col = dataset[column]
             values = col.drop_duplicates().dropna()
             if len(values) > 15:
                 rows = values.sample(15).tolist()
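After this refactoring the matchers are stateless: they are constructed without any data and receive both tables through map(). A minimal sketch of the resulting call pattern, with the two toy tables invented for illustration:

import pandas as pd
from bdikit.mapping_algorithms.column_mapping.algorithms import JaccardDistanceAlgorithm

# Hypothetical toy tables; any two DataFrames with named columns work.
source = pd.DataFrame({"patient_gender": ["M", "F"], "age_at_dx": [61, 54]})
target = pd.DataFrame({"gender": ["male", "female"], "age_at_diagnosis": [50, 70]})

# The matcher holds no table state, so one instance can be reused across tables.
matcher = JaccardDistanceAlgorithm()
mappings = matcher.map(source, target)  # dict of source column -> target column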
bdikit/mapping_algorithms/scope_reducing/_algorithms/contrastive_learning/cl_api.py
@@ -1,5 +1,5 @@
 import os
-from typing import List
+from typing import List, Optional, Union
 
 import numpy as np
 import pandas as pd
@@ -37,8 +37,16 @@ def load_checkpoint(self, lm="roberta"):
 
         return model
 
-    def get_recommendations(self, table: pd.DataFrame):
-        gdc_ds = pd.read_csv(GDC_TABLE_PATH)
+    def get_recommendations(
+        self, table: pd.DataFrame, target: Optional[Union[str, pd.DataFrame]] = None
+    ):
+        if target is None or (isinstance(target, str) and target == "gdc"):
+            gdc_ds = pd.read_csv(GDC_TABLE_PATH)
+        elif isinstance(target, pd.DataFrame):
+            gdc_ds = target
+        else:
+            raise ValueError("Target must be a DataFrame or 'gdc'")
+
         l_features = self._load_table_tokens(table)
         r_features = self._load_table_tokens(gdc_ds)
         cosine_sim = cosine_similarity(l_features, r_features)
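The new target parameter lets the contrastive-learning recommender rank columns against an arbitrary DataFrame instead of only the bundled GDC table. A rough sketch of a direct call, mirroring how top_matches() in functional_api.py uses this class; the tables are hypothetical and the model name is the one referenced there:

import pandas as pd
from bdikit.download import get_cached_model_or_download
from bdikit.mapping_algorithms.scope_reducing._algorithms.contrastive_learning.cl_api import (
    ContrastiveLearningAPI,
)

source = pd.DataFrame({"Gender": ["M", "F"]})        # hypothetical source table
custom_target = pd.DataFrame({"gender": ["male"]})   # hypothetical target vocabulary

model_path = get_cached_model_or_download("cl-reducer-v0.1")
api = ContrastiveLearningAPI(model_path=model_path, top_k=5)

# Passing a DataFrame ranks against it; passing "gdc" or nothing uses the GDC table.
_, scopes_json = api.get_recommendations(source, target=custom_target)
for scope in scopes_json:
    print(scope["Candidate column"], scope["Top k columns"])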
62 changes: 62 additions & 0 deletions bdikit/mapping_algorithms/value_mapping/value_mappers.py
@@ -0,0 +1,62 @@
import pandas as pd


class ValueMapper:
    """
    A ValueMapper represents objects that transform the values in an input
    column into the values of a new output column.
    """

    def map(self, input_column: pd.Series) -> pd.Series:
        """
        Every concrete ValueMapper should implement this method, which takes a
        pandas Series as input and returns a new pandas Series with transformed
        values.
        """
        pass


class IdentityValueMapper(ValueMapper):
    """
    A column mapper that maps each value in the input column to itself.
    """

    def map(self, input_column: pd.Series) -> pd.Series:
        """
        Simply copies the values in input_column to the output column.
        """
        return input_column.copy()


class FunctionValueMapper(ValueMapper):
    """
    A column mapper that transforms each value in the input column using the
    provided custom function.
    """

    def __init__(self, function):
        self.function = function

    def map(self, input_column: pd.Series) -> pd.Series:
        """
        Applies the given function to each value in input_column to generate
        the output column.
        """
        return input_column.map(self.function)


class DictionaryMapper(ValueMapper):
    """
    A column mapper that transforms each value in the input column using the
    values stored in the provided dictionary.
    """

    def __init__(self, dictionary: dict):
        self.dictionary = dictionary

    def map(self, input_column: pd.Series) -> pd.Series:
        """
        Transforms the values in input_column into the values specified in
        the dictionary provided to the object constructor.
        """
        return input_column.map(self.dictionary)
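A short sketch of how these three mappers might be applied to a column; the input Series and the mapping dictionary are invented for illustration:

import pandas as pd
from bdikit.mapping_algorithms.value_mapping.value_mappers import (
    IdentityValueMapper,
    FunctionValueMapper,
    DictionaryMapper,
)

gender = pd.Series(["M", "F", "M"], name="Gender")  # hypothetical input column

IdentityValueMapper().map(gender)                           # copies values unchanged
FunctionValueMapper(str.lower).map(gender)                  # "m", "f", "m"
DictionaryMapper({"M": "male", "F": "female"}).map(gender)  # "male", "female", "male"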