Skip to content

Commit

Permalink
feat: Column and value matchers based on SPLADE
Browse files Browse the repository at this point in the history
  • Loading branch information
aecio committed Jul 19, 2024
1 parent 2f1a5d9 commit 3c20e91
Show file tree
Hide file tree
Showing 9 changed files with 458 additions and 13 deletions.
33 changes: 29 additions & 4 deletions bdikit/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
GPTSchemaMatcher,
ContrastiveLearningSchemaMatcher,
TwoPhaseSchemaMatcher,
SpladeSchemaMatcher,
)
from bdikit.mapping_algorithms.value_mapping.value_mappers import ValueMapper
from bdikit.models.contrastive_learning.cl_api import (
DEFAULT_CL_MODEL,
)
from bdikit.mapping_algorithms.column_mapping.topk_matchers import (
TopkColumnMatcher,
CLTopkColumnMatcher,
SpladeMaxSimTopkColumnMatcher,
)
from bdikit.mapping_algorithms.value_mapping.algorithms import (
ValueMatch,
Expand All @@ -33,6 +33,7 @@
EmbeddingValueMatcher,
AutoFuzzyJoinValueMatcher,
FastTextValueMatcher,
SpladeValueMatcher,
)
from bdikit.mapping_algorithms.value_mapping.value_mappers import (
ValueMapper,
Expand All @@ -55,6 +56,7 @@ class SchemaMatchers(Enum):
JACCARD_DISTANCE = ("jaccard_distance", JaccardSchemaMatcher)
GPT = ("gpt", GPTSchemaMatcher)
CT_LEARGNING = ("ct_learning", ContrastiveLearningSchemaMatcher)
SPLADE = ("splade", SpladeSchemaMatcher)
TWO_PHASE = ("two_phase", TwoPhaseSchemaMatcher)

def __init__(self, method_name: str, method_class: Type[BaseSchemaMatcher]):
Expand Down Expand Up @@ -126,11 +128,33 @@ def _load_table_for_standard(name: str) -> pd.DataFrame:
raise ValueError(f"The {name} standard is not supported")


class TopkMatchers(Enum):
    """Registry of top-k column matching algorithms, keyed by method name.

    Each member holds a ``(method_name, method_class)`` pair; use
    :meth:`get_instance` to resolve a string name into a matcher instance.
    """

    # NOTE(review): "LEARGNING" is a typo for "LEARNING", kept as-is because
    # renaming the member would break callers using TopkMatchers.CT_LEARGNING.
    CT_LEARGNING = ("ct_learning", CLTopkColumnMatcher)
    SPLADE_MAX_SIM = ("splade_max_sim", SpladeMaxSimTopkColumnMatcher)

    def __init__(self, method_name: str, method_class: Type[TopkColumnMatcher]):
        self.method_name = method_name
        self.method_class = method_class

    @staticmethod
    def get_instance(method_name: str) -> TopkColumnMatcher:
        """Instantiate the matcher registered under ``method_name``.

        Raises:
            ValueError: if ``method_name`` is not a supported algorithm.
        """
        methods = {method.method_name: method.method_class for method in TopkMatchers}
        try:
            return methods[method_name]()
        except KeyError:
            names = ", ".join(methods.keys())
            # "from None" suppresses the internal KeyError so callers see a
            # clean ValueError instead of a chained-exception traceback.
            raise ValueError(
                f"The {method_name} algorithm is not supported. "
                f"Supported algorithms are: {names}"
            ) from None


def top_matches(
source: pd.DataFrame,
columns: Optional[List[str]] = None,
target: Union[str, pd.DataFrame] = "gdc",
top_k: int = 10,
method: str = "ct_learning",
) -> pd.DataFrame:
"""
Returns the top-k matches between the source and target tables.
Expand All @@ -155,7 +179,7 @@ def top_matches(
else:
selected_columns = source

topk_matcher = CLTopkColumnMatcher(model_name=DEFAULT_CL_MODEL)
topk_matcher = TopkMatchers.get_instance(method_name=method)
top_k_matches = topk_matcher.get_recommendations(
selected_columns, target=target_table, top_k=top_k
)
Expand All @@ -177,6 +201,7 @@ class ValueMatchers(Enum):
AUTOFJ = ("auto_fuzzy_join", AutoFuzzyJoinValueMatcher)
FASTTEXT = ("fasttext", FastTextValueMatcher)
GPT = ("gpt", GPTValueMatcher)
SPLADE = ("splade", SpladeValueMatcher)

def __init__(self, method_name: str, method_class: Type[BaseValueMatcher]):
self.method_name = method_name
Expand Down
17 changes: 17 additions & 0 deletions bdikit/mapping_algorithms/column_mapping/algorithms.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from bdikit.mapping_algorithms.column_mapping.topk_matchers import (
TopkColumnMatcher,
CLTopkColumnMatcher,
SpladeTopkColumnMatcher,
)


Expand Down Expand Up @@ -137,6 +138,22 @@ def map(self, dataset: pd.DataFrame, global_table: pd.DataFrame):
return self._fill_missing_matches(dataset, matches)


class SpladeSchemaMatcher(BaseSchemaMatcher):
    """Schema matcher that maps each dataset column to the single
    best-ranked column of the global table according to SPLADE embeddings."""

    def __init__(self, model_name: str = "naver/splade-cocondenser-ensembledistil"):
        self.topk_matcher = SpladeTopkColumnMatcher(model_name=model_name)

    def map(self, dataset: pd.DataFrame, global_table: pd.DataFrame):
        """Return a mapping from dataset columns to global-table columns,
        delegating unmatched columns to ``_fill_missing_matches``."""
        recommendations = self.topk_matcher.get_recommendations(
            dataset, global_table, top_k=1
        )
        matches = {}
        for source_column, recommendation in zip(dataset.columns, recommendations):
            best_candidate, _score = recommendation["top_k_columns"][0]
            # Only accept candidates that actually exist in the target table.
            if best_candidate in global_table.columns:
                matches[source_column] = best_candidate
        return self._fill_missing_matches(dataset, matches)


class TwoPhaseSchemaMatcher(BaseSchemaMatcher):

def __init__(
Expand Down
106 changes: 98 additions & 8 deletions bdikit/mapping_algorithms/column_mapping/topk_matchers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from abc import ABCMeta, abstractmethod
from typing import List, NamedTuple, TypedDict
import pandas as pd
import numpy as np
Expand All @@ -6,6 +7,8 @@
ContrastiveLearningAPI,
DEFAULT_CL_MODEL,
)
from bdikit.models import ColumnEmbedder
from bdikit.models.splade import SpladeEmbedder


class ColumnScore(NamedTuple):
Expand All @@ -18,19 +21,17 @@ class TopkMatching(TypedDict):
top_k_columns: List[ColumnScore]


class TopkColumnMatcher:

class TopkColumnMatcher(metaclass=ABCMeta):
    """Interface for algorithms that rank, for every column of a source
    table, the top-k most similar columns of a target table."""

    @abstractmethod
    def get_recommendations(
        self, source: pd.DataFrame, target: pd.DataFrame, top_k: int
    ) -> List[TopkMatching]:
        """Return one ``TopkMatching`` entry per column of ``source``,
        each holding up to ``top_k`` scored target-column candidates."""
        pass


class CLTopkColumnMatcher(TopkColumnMatcher):
def __init__(self, model_name: str = DEFAULT_CL_MODEL):
# TODO: we can generalize this api to accept any embedding model
# and not just our contrastive learning model
self.api = ContrastiveLearningAPI(model_name=model_name)
class EmbeddingSimilarityTopkColumnMatcher(TopkColumnMatcher):
    """Top-k matcher generic over any ``ColumnEmbedder`` implementation
    (e.g. contrastive learning or SPLADE)."""

    def __init__(self, column_embedder: ColumnEmbedder):
        # The embedder produces one dense vector per table column; the
        # attribute keeps the historical name `api` used by subclasses.
        self.api = column_embedder

def get_recommendations(
self, source: pd.DataFrame, target: pd.DataFrame, top_k: int = 10
Expand Down Expand Up @@ -59,3 +60,92 @@ def get_recommendations(
)

return top_k_results


class CLTopkColumnMatcher(EmbeddingSimilarityTopkColumnMatcher):
    """Top-k column matcher backed by the contrastive-learning embedder."""

    def __init__(self, model_name: str = DEFAULT_CL_MODEL):
        super().__init__(column_embedder=ContrastiveLearningAPI(model_name=model_name))


class SpladeTopkColumnMatcher(EmbeddingSimilarityTopkColumnMatcher):
    """Top-k column matcher backed by SPLADE sparse embeddings."""

    def __init__(self, model_name: str = "naver/splade-cocondenser-ensembledistil"):
        super().__init__(column_embedder=SpladeEmbedder(model_id=model_name))


class SpladeMaxSimTopkColumnMatcher(CLTopkColumnMatcher):
    """Two-stage top-k matcher: candidates are first retrieved with the
    contrastive-learning embedder (parent class), then reranked by a
    SPLADE-based score mixing header similarity and value-level MaxSim."""

    def __init__(self):
        super().__init__()
        self.splade = SpladeEmbedder(model_id="naver/splade-cocondenser-ensembledistil")

    def unique_string_values(self, column: pd.Series) -> pd.Series:
        """Return the unique values of ``column`` as a Series of strings,
        preserving the column name (used as the header text)."""
        if pd.api.types.is_string_dtype(column):
            return pd.Series(column.unique(), name=column.name)
        else:
            return pd.Series(column.unique().astype(str), name=column.name)

    def get_recommendations(
        self, source: pd.DataFrame, target: pd.DataFrame, top_k: int = 10
    ) -> List[TopkMatching]:
        """
        Returns the top-k matching columns in the target table for each column
        in the source table. Candidates come from the contrastive-learning
        ranking and are reranked by :meth:`similarity`.
        """
        candidate_lists = super().get_recommendations(source, target, top_k)
        top_k_results = []
        for column_topk in candidate_lists:
            # Rerank the candidate columns with the SPLADE-based score.
            source_column = self.unique_string_values(
                source[column_topk["source_column"]]
            )
            new_scores: List[ColumnScore] = []
            for target_score in column_topk["top_k_columns"]:
                target_column = self.unique_string_values(
                    target[target_score.column_name]
                )
                score = self.similarity(source_column, target_column)
                new_scores.append(
                    ColumnScore(column_name=target_score.column_name, score=score)
                )

            new_scores = sorted(new_scores, key=lambda x: x.score, reverse=True)
            top_k_results.append(
                {
                    "source_column": column_topk["source_column"],
                    "top_k_columns": new_scores[:top_k],
                }
            )

        return top_k_results

    def similarity(self, source: pd.Series, target: pd.Series) -> float:
        """
        Score two columns as the average of (1) the cosine similarity of the
        SPLADE embeddings of their headers and (2) the mean MaxSim over the
        source values (each source value paired with its most similar target
        value).
        """
        assert isinstance(
            source.name, str
        ), f"Column header must be a string but was: {source.name}"
        assert isinstance(
            target.name, str
        ), f"Column header must be a string but was: {target.name}"
        source_header_emb = self.splade.embed_values([source.name]).column
        target_header_emb = self.splade.embed_values([target.name]).column
        header_sim = cosine_similarity([source_header_emb], [target_header_emb])[0][0]

        source_embeddings = self.splade.embed_values(
            source.to_list(), embed_values=True
        )
        target_embeddings = self.splade.embed_values(
            target.to_list(), embed_values=True
        )

        max_similarities = []
        for source_value_vec in source_embeddings.values.values():
            # Best-matching target value for this source value.
            # BUG FIX: the original appended `sim` (the last similarity
            # computed in the inner loop) instead of the maximum.
            max_sim = max(
                (
                    cosine_similarity([source_value_vec], [target_value_vec])[0][0]
                    for target_value_vec in target_embeddings.values.values()
                ),
                default=0.0,
            )
            max_similarities.append(max_sim)
        # Guard against empty columns: np.mean([]) would be NaN.
        values_sim = float(np.mean(max_similarities)) if max_similarities else 0.0

        return 0.5 * header_sim + 0.5 * values_sim
60 changes: 60 additions & 0 deletions bdikit/mapping_algorithms/value_mapping/algorithms.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@
from autofj import AutoFJ
from Levenshtein import ratio
import pandas as pd
import numpy as np
import flair
import torch
from sklearn.metrics.pairwise import cosine_similarity
from bdikit.config import get_device
from bdikit.models.splade import SpladeEmbedder


flair.device = torch.device(get_device())

Expand Down Expand Up @@ -203,3 +207,59 @@ def match(
except Exception as e:
return matches
return matches


class SpladeValueMatcher(BaseValueMatcher):
    """Value matcher based on SPLADE embeddings: each source value is paired
    with its most similar target value by cosine similarity, keeping only
    pairs whose similarity reaches ``threshold``."""

    def __init__(self, model_id: str = "naver/splade-cocondenser-ensembledistil"):
        self.splade_embedder = SpladeEmbedder(model_id)

    def match(
        self,
        current_values: List[str],
        target_values: List[str],
        threshold: float = 0.25,
    ) -> List[ValueMatch]:
        """Return (source_value, target_value, similarity) matches with
        similarity >= ``threshold``, one per source value at most."""

        source_embs = self.splade_embedder.embed_values(
            current_values, embed_values=True
        )
        target_embs = self.splade_embedder.embed_values(
            target_values, embed_values=True
        )

        assert (
            source_embs.values is not None
        ), "Embeddings for individual values were not computed"
        assert (
            target_embs.values is not None
        ), "Embeddings for individual values were not computed"

        # .values maps value text -> embedding vector; split into parallel lists.
        l_source_values = list(source_embs.values.keys())
        l_source_embeddings = list(source_embs.values.values())
        r_target_value = list(target_embs.values.keys())
        r_target_embeddings = list(target_embs.values.values())

        cosine_sim = cosine_similarity(l_source_embeddings, r_target_embeddings)  # type: ignore

        matches = []
        for index, similarities in enumerate(cosine_sim):
            # Hoisted: the original computed np.argmax(similarities) twice.
            best_idx = int(np.argmax(similarities))
            similarity = float(similarities[best_idx])
            if similarity >= threshold:
                # NOTE(review): appends a plain tuple; presumably compatible
                # with ValueMatch's tuple shape — confirm against its definition.
                matches.append(
                    (
                        l_source_values[index],
                        r_target_value[best_idx],
                        similarity,
                    )
                )

        return matches
20 changes: 20 additions & 0 deletions bdikit/models/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from abc import ABCMeta, abstractmethod
from typing import List
import pandas as pd
import numpy as np


class ColumnEmbedder(metaclass=ABCMeta):
    """
    Base class for column embedding algorithms. Implementations of this class
    must create embeddings for each of the columns of a table.
    """

    @abstractmethod
    def get_embeddings(self, table: pd.DataFrame) -> List[np.ndarray]:
        """
        Must compute a vector embedding for each column in the table.
        The vectors must be represented as dense np.ndarray vectors and
        appear in the same order they appear in the DataFrame `table`
        (i.e. the i-th vector embeds the i-th column).
        """
        pass
4 changes: 3 additions & 1 deletion bdikit/models/contrastive_learning/cl_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@
from sklearn.metrics.pairwise import cosine_similarity
from tqdm import tqdm
from bdikit.download import get_cached_model_or_download
from bdikit.models import ColumnEmbedder


dir_path = os.path.dirname(os.path.realpath(__file__))
GDC_TABLE_PATH = os.path.join(dir_path, "../../../../resource/gdc_table.csv")
DEFAULT_CL_MODEL = "bdi-cl-v0.2"


class ContrastiveLearningAPI:
class ContrastiveLearningAPI(ColumnEmbedder):
def __init__(
self,
model_path: Optional[str] = None,
Expand Down
Loading

0 comments on commit 3c20e91

Please sign in to comment.