Skip to content

Commit

Permalink
Refactor bench_fw to support train, build & search in parallel
Browse files Browse the repository at this point in the history
Summary:
**Context**
Design Doc: [Faiss Benchmarking](https://docs.google.com/document/d/1c7zziITa4RD6jZsbG9_yOgyRjWdyueldSPH6QdZzL98/edit)

**In this diff**
1. Be able to reference codec and index from blobstore (bucket & path) outside the experiment
2. To support facebookresearch#1, naming is moved to descriptors.
3. Build index can be written as well.
4. You can run benchmark with train and then refer it in index built and then refer index built in knn search. Index serialization is optional. Although not yet exposed through index descriptor.
5. Benchmark can support index with different datasets sizes
6. Working with varying dataset now support multiple ground truth. There may be small fixes before we could use this.
7. Added targets for bench_fw_range, ivf, codecs and optimize.

Differential Revision: D57236543
  • Loading branch information
kuarora authored and facebook-github-bot committed Jun 18, 2024
1 parent e758973 commit 302e689
Show file tree
Hide file tree
Showing 9 changed files with 906 additions and 265 deletions.
763 changes: 598 additions & 165 deletions benchs/bench_fw/benchmark.py

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion benchs/bench_fw/benchmark_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def clone(self):
def __post_init__(self):
self.cached_ds = {}

# TODO(kuarora): rename it as get_local_file
def get_local_filename(self, filename):
if len(filename) > 184:
fn, ext = os.path.splitext(filename)
Expand All @@ -61,6 +62,9 @@ def get_local_filename(self, filename):
)
return os.path.join(self.path, filename)

def get_remote_filepath(self, filename) -> Optional[str]:
return None

def download_file_from_blobstore(
self,
filename: str,
Expand Down Expand Up @@ -219,7 +223,7 @@ def read_index(
fn = self.download_file_from_blobstore(filename, bucket, path)
logger.info(f"Loading index {fn}")
ext = os.path.splitext(fn)[1]
if ext in [".faiss", ".codec"]:
if ext in [".faiss", ".codec", ".index"]:
index = faiss.read_index(fn)
elif ext == ".pkl":
with open(fn, "rb") as model_file:
Expand Down
215 changes: 211 additions & 4 deletions benchs/bench_fw/descriptors.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,21 @@
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from dataclasses import dataclass
import logging
import os
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import faiss # @manual=//faiss/python:pyfaiss_gpu

from .benchmark_io import BenchmarkIO
from .utils import timer

logger = logging.getLogger(__name__)


@dataclass
class IndexDescriptor:
class IndexDescriptorClassic:
bucket: Optional[str] = None
# either path or factory should be set,
# but not both at the same time.
Expand Down Expand Up @@ -45,7 +48,6 @@ class IndexDescriptor:
def __hash__(self):
return hash(str(self))


@dataclass
class DatasetDescriptor:
# namespace possible values:
Expand Down Expand Up @@ -81,7 +83,7 @@ def __hash__(self):

def get_filename(
self,
prefix: str = None,
prefix: Optional[str] = None,
) -> str:
filename = ""
if prefix is not None:
Expand Down Expand Up @@ -116,3 +118,208 @@ def k_means(self, io, k, dry_run):
else:
t = io.read_json(meta_filename)["k_means_time"]
return kmeans_vectors, t, None

@dataclass
class IndexBaseDescriptor:
d: int
metric: str
desc_name: Optional[str] = None
flat_desc_name: Optional[str] = None
bucket: Optional[str] = None
path: Optional[str] = None
num_threads: int = 1

def get_name(self) -> str:
raise NotImplementedError()

def get_path(self, benchmark_io: BenchmarkIO) -> Optional[str]:
if self.path is not None:
return self.path
self.path = benchmark_io.get_remote_filepath(self.desc_name)
return self.path

@staticmethod
def param_dict_list_to_name(param_dict_list):
if not param_dict_list:
return ""
l = 0
n = ""
for param_dict in param_dict_list:
n += IndexBaseDescriptor.param_dict_to_name(param_dict, f"cp{l}")
l += 1
return n

@staticmethod
def param_dict_to_name(param_dict, prefix="sp"):
if not param_dict:
return ""
n = prefix
for name, val in param_dict.items():
if name == "snap":
continue
if name == "lsq_gpu" and val == 0:
continue
if name == "use_beam_LUT" and val == 0:
continue
n += f"_{name}_{val}"
if n == prefix:
return ""
n += "."
return n


@dataclass
class CodecDescriptor(IndexBaseDescriptor):
# either path or factory should be set,
# but not both at the same time.
factory: Optional[str] = None
construction_params: Optional[List[Dict[str, int]]] = None
training_vectors: Optional[DatasetDescriptor] = None

def __post_init__(self):
self.get_name()

def is_trained(self):
return self.factory is None and self.path is not None

def is_valid(self):
return self.factory is not None or self.path is not None

def get_name(self) -> str:
if self.desc_name is not None:
return self.desc_name
if self.factory is not None:
self.desc_name = self.name_from_factory()
return self.desc_name
if self.path is not None:
self.desc_name = self.name_from_path()
return self.desc_name
raise ValueError("name, factory or path must be set")

def flat_name(self) -> str:
if self.flat_desc_name is not None:
return self.flat_desc_name
self.flat_desc_name = f"Flat.d_{self.d}.{self.metric.upper()}."
return self.flat_desc_name

def path(self, benchmark_io) -> str:
if self.path is not None:
return self.path
return benchmark_io.get_remote_filepath(self.get_name())

def name_from_factory(self) -> str:
assert self.factory is not None
name = f"{self.factory.replace(',', '_')}."
assert self.d is not None
assert self.metric is not None
name += f"d_{self.d}.{self.metric.upper()}."
if self.factory != "Flat":
assert self.training_vectors is not None
name += self.training_vectors.get_filename("xt")
name += IndexBaseDescriptor.param_dict_list_to_name(self.construction_params)
return name

def name_from_path(self):
assert self.path is not None
filename = os.path.basename(self.path)
ext = filename.split(".")[-1]
if filename.endswith(ext):
name = filename[:-len(ext)]
else: # should never hit this rather raise value error
name = filename
return name

def alias(self, benchmark_io : BenchmarkIO):
if hasattr(benchmark_io, "bucket"):
return CodecDescriptor(desc_name=self.get_name(), bucket=benchmark_io.bucket, path=self.get_path(benchmark_io), d=self.d, metric=self.metric)
return CodecDescriptor(desc_name=self.get_name(), d=self.d, metric=self.metric)


@dataclass
class IndexDescriptor(IndexBaseDescriptor):
codec_desc: Optional[CodecDescriptor] = None
database_desc: Optional[DatasetDescriptor] = None

def __hash__(self):
return hash(str(self))

def __post_init__(self):
self.get_name()

def is_built(self):
return self.codec_desc is None and self.database_desc is None

def get_name(self) -> str:
if self.desc_name is None:
self.desc_name = self.codec_desc.get_name() + self.database_desc.get_filename(prefix="xb")

return self.desc_name

def flat_name(self):
if self.flat_desc_name is not None:
return self.flat_desc_name
self.flat_desc_name = self.codec_desc.flat_name() + self.database_desc.get_filename(prefix="xb")
return self.flat_desc_name

# alias is used to refer when index is uploaded to blobstore and refered again
def alias(self, benchmark_io: BenchmarkIO):
if hasattr(benchmark_io, "bucket"):
return IndexDescriptor(desc_name=self.get_name(), bucket=benchmark_io.bucket, path=self.get_path(benchmark_io), d=self.d, metric=self.metric)
return IndexDescriptor(desc_name=self.get_name(), d=self.d, metric=self.metric)

@dataclass
class KnnDescriptor(IndexBaseDescriptor):
index_desc: Optional[IndexDescriptor] = None
gt_index_desc: Optional[IndexDescriptor] = None
query_dataset: Optional[DatasetDescriptor] = None
search_params: Optional[Dict[str, int]] = None
reconstruct: bool = False
# range metric definitions
# key: name
# value: one of the following:
#
# radius
# [0..radius) -> 1
# [radius..inf) -> 0
#
# [[radius1, score1], ...]
# [0..radius1) -> score1
# [radius1..radius2) -> score2
#
# [[radius1_from, radius1_to, score1], ...]
# [radius1_from, radius1_to) -> score1,
# [radius2_from, radius2_to) -> score2
range_metrics: Optional[Dict[str, Any]] = None
radius: Optional[float] = None
k: int = 1

range_ref_index_desc: Optional[str] = None

def __hash__(self):
return hash(str(self))

def get_name(self):
name = self.index_desc.get_name()
name += IndexBaseDescriptor.param_dict_to_name(self.search_params)
name += self.query_dataset.get_filename("q")
name += f"k_{self.k}."
name += f"t_{self.num_threads}."
if self.reconstruct:
name += "rec."
else:
name += "knn."
return name

def flat_name(self):
if self.flat_desc_name is not None:
return self.flat_desc_name
name = self.index_desc.flat_name()
name += self.query_dataset.get_filename("q")
name += f"k_{self.k}."
name += f"t_{self.num_threads}."
if self.reconstruct:
name += "rec."
else:
name += "knn."
self.flat_desc_name = name
return name
Loading

0 comments on commit 302e689

Please sign in to comment.