[RFC] Introducing Loading/Writing Layer in Native KNN Engines #2033

Closed
0ctopus13prime opened this issue Sep 4, 2024 · 22 comments · Fixed by #2139
Labels
enhancement indexing-improvements This label should be attached to all the github issues which will help improving the indexing time. Roadmap:Vector Database/GenAI Project-wide roadmap label v2.19.0

0ctopus13prime (Contributor) commented Sep 4, 2024

Introducing Loading Layer in Native KNN Engines

1. Goal

FAISS and Nmslib, two native engines, have been integral to delivering advanced vector search capabilities in OpenSearch. Alongside the official Lucene vector format, these engines have played a significant role in meeting the growing vector search needs of customers, especially in scenarios where Lucene alone might not suffice.
However, the tight coupling in how vector indexes are loaded during search has made it difficult for OpenSearch to scale as a vector search solution across different Directory implementations. As of this writing, the native engines in OpenSearch only support FSDirectory, which rules out network-based Directory implementations such as those backed by S3 or NFS.

This document proposes a solution that removes this dependency, making OpenSearch compatible with multiple Directory implementations. The following sections explain why an abstract loading layer is needed within each native engine. This layer will enable transparent loading of vector indexes regardless of the Directory implementation used.

Related Issues

2. Scope

In this document, we focus exclusively on two types of native engines: FAISS and Nmslib. Lucene vector search is not covered here, as it is already integrated with Directory implementations.

Among the native engines, we will delve deeper into FAISS, while providing only high-level conceptual sketches for Nmslib. The primary reason is that, unlike FAISS, Nmslib lacks a loading interface (e.g., FAISS’s IOReader). However, the approach for Nmslib will closely mirror the work in FAISS: we first introduce a loading interface, then build a mediator that indirectly calls IndexInput to copy bytes from it.

As we are still in the proposal phase, detailed performance impacts will be addressed in the next phase, after benchmarks have been conducted and real data analyzed. In this initial phase, our focus is solely on creating a scalable interface that allows OpenSearch to integrate with multiple Directory implementations, while keeping the native engines unchanged. We will not be modifying any of the assumptions made by the native engines at this stage. Although further optimizations could be achieved by adjusting these assumptions, that will be the subject of future discussions and is beyond the scope of this document.

For example, FAISS loads all data into physical memory before performing a search, and the same is true for Nmslib; changing this is out of scope for now and is left as a future opportunity for improvement. Now imagine a user configures an S3-backed Directory in OpenSearch. Because of the way FAISS operates, the S3-backed Directory would need to download the entire requested vector index from S3. Since KNNWeight loads the index lazily (Code), query execution would be delayed until the whole vector index has been downloaded, which could significantly worsen p99 query latency.

3. Problem Definitions

3.1. Problem We Are Solving

To enable compatibility with various Directory implementations, we need to decouple from FSDirectory and make it extensible in OpenSearch.

The current implementation in OpenSearch assumes that a vector index exists in a normal file system (e.g., ext4) and passes the absolute path of the vector index to the underlying native engines.
For example, FAISS ends up invoking the Index* read_index(const char* fname, int io_flags = 0) method, and Nmslib eventually calls the void LoadIndex(const string& location) method. Both native engines then read the bytes and load the entire index into physical memory. Although better loading strategies may exist (lazy loading, mmap loading, etc.), as agreed in 2. Scope, we will not attempt to alter the engines' design.

FSDirectory is the only Directory supported in OpenSearch. Due to this tight coupling, many network-based implementations (such as S3-backed or NFS-based Directories) and potential future Directory implementations cannot be integrated with native vector search engines in OpenSearch.

3.2. [Optional] Lucene Directory Design

Let’s briefly review Lucene’s Directory design before we continue with the discussion. If you're already familiar with its design and functionalities, feel free to skip ahead to the next section — 4. Requirements

This overview is included here rather than in the appendix, as it provides essential background before delving into the proposal.

3.2.1. Directory

Directory represents a logical file system: a flat collection of named files (with no sub-directory hierarchy) that users can create, read, and delete through its APIs. An implementation must not only support creating and deleting files but also provide an IndexInput stream, enabling the caller to read content from a specific offset.

For now, think of IndexInput as a random access interface, which we will revisit in section 3.2.3. IndexInput.

/**
 * Opens a random-access read stream for the content of the file
 * with the given `name`.
 */
public abstract IndexInput openInput(String name, IOContext context) throws IOException;

/**
 * Creates a new, empty file in the directory and returns an IndexOutput instance
 * for appending data to this file.
 * This method must throw FileAlreadyExistsException if the file already exists.
 */
public abstract IndexOutput createOutput(String name, IOContext context) throws IOException;

Well-known Directory Implementations

  1. NIOFSDirectory
  2. MMapDirectory
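To make the Directory contract concrete, here is a minimal C++17 sketch of an in-memory stand-in. InMemoryDirectory and its methods are hypothetical illustrations that only mirror the semantics described above (a flat namespace of files, createOutput failing when the file already exists); they are not Lucene's API.

```cpp
#include <cassert>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical in-memory stand-in for Lucene's Directory contract:
// a flat namespace mapping file names to their byte contents.
class InMemoryDirectory {
 public:
  // Mirrors createOutput(): creating a file that already exists is an error.
  std::vector<unsigned char>& createOutput(const std::string& name) {
    auto [it, inserted] = files_.try_emplace(name);
    if (!inserted) {
      throw std::runtime_error("FileAlreadyExists: " + name);
    }
    return it->second;
  }

  // Mirrors openInput(): read-only access to an existing file's bytes.
  const std::vector<unsigned char>& openInput(const std::string& name) const {
    auto it = files_.find(name);
    if (it == files_.end()) {
      throw std::runtime_error("NoSuchFile: " + name);
    }
    return it->second;
  }

  // Mirrors deleteFile().
  void deleteFile(const std::string& name) {
    if (files_.erase(name) == 0) {
      throw std::runtime_error("NoSuchFile: " + name);
    }
  }

 private:
  // std::map keeps references to existing values stable across inserts.
  std::map<std::string, std::vector<unsigned char>> files_;
};
```

Note that the namespace is flat: a name such as "_0.faiss" is just a key, not a path into a tree.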

3.2.2. DataInput

DataInput provides sequential read APIs and serves as the base class for IndexInput, which is covered in the next section. Each DataInput implementation is expected to internally track the current read offset. However, DataInput itself does not offer an API for repositioning this offset; that capability is added by its subclass, IndexInput.

/**
 * Reads a single byte at the current offset.
 * After the read, the internal offset is expected to increase by 1.
 */
public abstract byte readByte() throws IOException;

/**
 * Copies `length` bytes starting at the current offset into dest[offset:].
 * After the read, the internal offset is expected to increase by `length`.
 */
public abstract void readBytes(byte[] dest, int offset, int length) throws IOException;

3.2.3. IndexInput

IndexInput inherits from DataInput and, in addition to all the features DataInput provides, adds an API for repositioning the offset. Once the offset is updated, all subsequent DataInput read operations start from the new offset.

/**
 * Updates the internal offset to `newOffset`.
 * Every subsequent read must start reading bytes from the new offset.
 */
public abstract void seek(long newOffset) throws IOException;

Directory provides an IndexInput as the read stream. While loading a vector index into physical memory typically involves sequential reading and does not necessitate random access, we will use IndexInput for sequential reading, as it is the type returned by the Directory.
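The offset semantics of DataInput and IndexInput can be summarized with a small in-memory model. ByteArrayIndexInput is a hypothetical C++ stand-in (not a Lucene class) in which every read advances an internal offset and seek() repositions it.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <stdexcept>
#include <utility>
#include <vector>

// Hypothetical model of DataInput/IndexInput semantics over an in-memory array.
class ByteArrayIndexInput {
 public:
  explicit ByteArrayIndexInput(std::vector<uint8_t> bytes) : bytes_(std::move(bytes)) {}

  // Like DataInput#readByte: read one byte at the current offset, advance by 1.
  uint8_t readByte() {
    ensureAvailable(1);
    return bytes_[offset_++];
  }

  // Like DataInput#readBytes: copy `length` bytes into dest[destOffset:], advance.
  void readBytes(uint8_t* dest, size_t destOffset, size_t length) {
    ensureAvailable(length);
    std::memcpy(dest + destOffset, bytes_.data() + offset_, length);
    offset_ += length;
  }

  // Like IndexInput#seek: subsequent reads start at `newOffset`.
  void seek(size_t newOffset) {
    if (newOffset > bytes_.size()) throw std::out_of_range("seek past EOF");
    offset_ = newOffset;
  }

  // Current read offset.
  size_t getFilePointer() const { return offset_; }

 private:
  void ensureAvailable(size_t n) const {
    if (n > bytes_.size() - offset_) throw std::out_of_range("read past EOF");
  }

  std::vector<uint8_t> bytes_;
  size_t offset_ = 0;
};
```

Loading an index only needs the sequential part (readBytes in a loop); seek() is available but unused on that path.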

3.2.4. DataOutput

An abstract base class for performing write operations on bytes. This serves as the foundation for the IndexOutput class, which will be discussed in detail in the next section. Each DataOutput implementation must internally track the next byte offset for writing.

/**
 * Writes `length` bytes from the given byte array, starting at b[offset].
 */
void writeBytes(byte[] b, int offset, int length) throws IOException;

3.2.5. IndexOutput

IndexOutput inherits from DataOutput and adds methods that deal with the internal offset at which the next byte will be written. It provides two APIs: 1. a basic getter method, and 2. an aligned offset adjustment method.

Note that the aligned offset method appends zero-valued padding bytes to ensure the offset is a multiple of the given alignment.
For example, if the current offset is 121 and the required alignment is 8, the method will append 7 padding bytes to adjust the offset to 128, which is a multiple of 8.

/**
 * Returns the offset at which the next byte will be written.
 */
long getFilePointer();

/**
 * Pads the output until the offset is a multiple of `alignmentBytes`,
 * then returns the aligned offset.
 */
long alignFilePointer(int alignmentBytes) throws IOException;

4. Requirements

4.1. Functional Requirements

  • P0
    • Define a mediator component to facilitate interaction between the native engines and Lucene’s IndexInput and IndexOutput. Specifically:
      • The reader mediator component must internally maintain a reference to the IndexInput returned by the Directory.
      • The writer mediator component must internally maintain a reference to the IndexOutput returned by the Directory.
      • Once triggered by a native engine, the mediator must delegate the request to the IndexInput (to retrieve bytes for reading) or to the IndexOutput (to write bytes).
      • The reader mediator component must be responsible for properly copying data from Java bytes to C++ bytes so that native engines can process it.
      • The writer mediator component must be responsible for delegating to its internal IndexOutput to flush the bytes copied from the native engine library.
    • Define a glue component for each native engine that must:
      • Reading perspective:
        • The glue component must implement the loading interface (ex: IOReader) for each native engine.
        • The glue component must delegate to the mediator component to ensure that the desired bytes are copied to the specified destination.
      • Writing perspective:
        • The glue component must implement the writing interface (ex: IOWriter) for each native engine.
        • The glue component must delegate to the mediator component to ensure that Lucene’s output stream flushes the bytes copied from the native engine library.
  • P1
    • [For Nmslib only] Define a vector index loading and writing interface and use it to load bytes into physical memory. This is not required for FAISS, as its IOReader and IOWriter already serve these purposes.

4.2. Non-functional Requirements.

  1. This introduction must maintain backward compatibility. Given the same settings, it should produce the exact same outputs as the baseline configuration.
  2. This introduction should not affect the performance currently achieved with FSDirectory.
  3. Any exceptions must be explicitly caught and re-thrown to the Java layer. They should not be silently swallowed.
  4. If additional memory is required, the total increase in memory usage must be limited by a constant factor.

5. Solution Proposal

5.1. High Level Overview

5.1.1. Loading Vector Index (FAISS only)

[Image: Image.jpg]

5.1.2. Constructing Vector Index (FAISS only)

[Image: Image.jpg]

5.2. Low Level Details - FAISS

5.2.1. [Reading] Vector Index Loading Low Level Details

5.2.1.1. Define C++ Mediator.

The mediator component is responsible for invoking the IndexInput instance to obtain bytes and then copying them into the specified memory location in C++ (i.e., performing a Java-to-C++ byte copy).

//
// In Java
//

import java.io.IOException;
import org.apache.lucene.store.IndexInput;

class BufferedIndexInput {
  // `IndexInput` instance obtained from `Directory` for reading.
  private final IndexInput indexInput;
  // 4K buffer. The C++ mediator reads the copied bytes directly from this field.
  private final byte[] buffer = new byte[4 * 1024];

  public BufferedIndexInput(IndexInput indexInput) {
    this.indexInput = indexInput;
  }

  // Called from JNI. Reads up to min(nbytes, buffer.length) bytes into `buffer`
  // and returns the number of bytes read.
  private int copyBytes(long nbytes) throws IOException {
    final int readBytes = (int) Math.min(nbytes, buffer.length);
    indexInput.readBytes(buffer, 0, readBytes);
    return readBytes;
  }
}  // BufferedIndexInput


//
// In C++
//

#include <jni.h>

#include <cstdint>
#include <cstring>
#include <stdexcept>

class NativeEngineIndexInputMediator {
 public:
  // Expects a `BufferedIndexInput` instance as `_indexInput`.
  NativeEngineIndexInputMediator(JNIEnv* _env, jobject _indexInput)
    : env(_env),
      indexInput(_indexInput) {
  }

  void copyBytes(int64_t nbytes, uint8_t* destination) {
    //
    // NOTE : In the 'real' implementation, I will cache meta variables like `jclass`,
    //        `jmethodID` and `jfieldID` as statics, so there will be no overhead of
    //        looking up meta information at runtime.
    //

    // `BufferedIndexInput` jclass.
    jclass clazz = env->GetObjectClass(indexInput);

    // `copyBytes` method: takes a long, returns an int.
    jmethodID copyBytesMethod = env->GetMethodID(clazz, "copyBytes", "(J)I");

    // `buffer` field.
    jfieldID bufferFieldId = env->GetFieldID(clazz, "buffer", "[B");
    jbyteArray bufferArray =
      (jbyteArray) env->GetObjectField(indexInput, bufferFieldId);

    while (nbytes > 0) {
      // Call `copyBytes` to read as many bytes as possible (at most the buffer size).
      // The argument must be passed as jlong to match the `(J)I` signature.
      const jint readBytes =
        env->CallIntMethod(indexInput, copyBytesMethod, static_cast<jlong>(nbytes));

      // Stop on a pending Java exception (e.g. IOException) so it is not swallowed.
      if (env->ExceptionCheck()) {
        throw std::runtime_error("BufferedIndexInput.copyBytes threw an exception");
      }
      if (readBytes <= 0) {
        throw std::runtime_error("No bytes were read from IndexInput");
      }

      // === Critical Section Start ===

      // Get a pointer to the primitive array. OpenJDK typically does not copy here.
      jbyte* primitiveArray =
        (jbyte*) env->GetPrimitiveArrayCritical(bufferArray, nullptr);
      if (primitiveArray == nullptr) {
        throw std::runtime_error("GetPrimitiveArrayCritical failed");
      }

      // Copy Java bytes to the C++ destination address.
      std::memcpy(destination, primitiveArray, readBytes);

      // Release the acquired primitive array pointer.
      // JNI_ABORT frees any copy without writing it back to the Java byte[].
      // Since we only read from the buffer, nothing needs to be copied back.
      env->ReleasePrimitiveArrayCritical(bufferArray, primitiveArray, JNI_ABORT);

      // === Critical Section End ===

      destination += readBytes;
      nbytes -= readBytes;
    }
  }

 private:
  JNIEnv* env;
  // `BufferedIndexInput` instance obtained from `Directory` for reading.
  jobject indexInput;
};
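The loop above performs one Java round trip per buffer refill, so loading an index of N bytes takes about ceil(N / 4096) JNI calls with the 4 KB buffer (for example, 262,144 calls for a 1 GiB index). The sketch below is a JNI-free model of that loop so the chunking can be checked in isolation; FakeBufferedIndexInput and mediatorCopyBytes are hypothetical names, and the fake stands in for the Java BufferedIndexInput.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <cstring>
#include <stdexcept>
#include <vector>

// Hypothetical stand-in for the Java `BufferedIndexInput`: each call copies at
// most `buffer.size()` bytes from `source` into its fixed-size buffer and
// returns how many bytes were copied (0 once the source is exhausted).
struct FakeBufferedIndexInput {
  const std::vector<uint8_t>& source;
  size_t position = 0;
  std::vector<uint8_t> buffer = std::vector<uint8_t>(4 * 1024);
  int calls = 0;  // number of simulated JNI round trips

  int32_t copyBytes(int64_t nbytes) {
    ++calls;
    const size_t available = source.size() - position;
    const size_t n = std::min({static_cast<size_t>(nbytes), buffer.size(), available});
    std::memcpy(buffer.data(), source.data() + position, n);
    position += n;
    return static_cast<int32_t>(n);
  }
};

// The mediator's loop: refill the buffer until `nbytes` bytes have been copied
// to `destination`. Each iteration corresponds to one Java round trip.
void mediatorCopyBytes(FakeBufferedIndexInput& input, int64_t nbytes, uint8_t* destination) {
  while (nbytes > 0) {
    const int32_t readBytes = input.copyBytes(nbytes);
    if (readBytes <= 0) throw std::runtime_error("unexpected end of input");
    std::memcpy(destination, input.buffer.data(), static_cast<size_t>(readBytes));
    destination += readBytes;
    nbytes -= readBytes;
  }
}
```

Copying 10,000 bytes takes three refills (4096 + 4096 + 1808 bytes), which matches ceil(10000 / 4096) = 3.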

5.2.1.2. NativeMemoryEntryContext.IndexEntryContext

IndexEntryContext contains essential information for loading the vector index. The current implementation only includes the logical index path, which it uses to construct a physical absolute path in the file system for access.
The constructor will be updated to include an additional parameter, Directory, which will serve as the source of IndexInput.


public IndexEntryContext(
    Directory directory,
    String indexPath,
    NativeMemoryLoadStrategy.IndexLoadStrategy indexLoadStrategy,
    Map<String, Object> parameters,
    String openSearchIndexName,
    String modelId
) {
  super(indexPath);
  this.directory = directory;
  this.indexLoadStrategy = indexLoadStrategy;
  this.openSearchIndexName = openSearchIndexName;
  this.parameters = parameters;
  this.modelId = modelId;
}

5.2.1.3. NativeMemoryLoadStrategy.IndexLoadStrategy

IndexLoadStrategy now falls back to the baseline approach if the given Directory is file-based. Otherwise, it allows the native engine to fetch bytes from IndexInput. This change helps prevent potential performance degradation due to JNI call overheads and redundant byte copying.


public NativeMemoryAllocation.IndexAllocation load(
         NativeMemoryEntryContext.IndexEntryContext indexEntryContext) throws IOException {
  final Directory directory = indexEntryContext.getDirectory();
  if (directory instanceof FSDirectory) {
    // Use the optimized path for a file-based vector index instead.
    // This falls back to the current implementation, which constructs an absolute
    // file path and lets the native engine load it directly.
    // This way, we avoid redundant byte copies from Java to C++.
    return loadFileBasedVectorIndex(indexEntryContext);
  }

  // Get the vector index path (note that it is a logical path).
  final String indexPath = indexEntryContext.getKey();

  // Open a read stream from the directory.
  try (IndexInput readStream = directory.openInput(indexPath, IOContext.READONCE)) {
    final long indexAddress =
        JNIService.loadIndex(readStream,
                             indexEntryContext.getParameters(),
                             knnEngine);
    ... // Create NativeMemoryAllocation.IndexAllocation and return
  }
}

5.2.1.4 File Watcher Integration

Whenever NativeMemoryLoadStrategy delegates loading an index to a native engine, it attaches a monitor object that removes the corresponding entry from the cache map managed by NativeMemoryCacheManager when the vector file is removed from the Directory (Code). This behavior will remain unchanged even after the current implementation is extended to pass IndexInput to native engines. Cached entries in the map will continue to be removed and cleaned up as before.

5.2.1.5. JNIService

JNIService serves as the entry point for interacting with the underlying native engines. Similar to how the current implementation passes the Java string value of the index path, it will now pass the reference to the provided IndexInput.

public static long loadIndex(
         IndexInput indexInput,
         Map<String, Object> parameters,
         KNNEngine knnEngine) {
  final BufferedIndexInput bufferedIndexInput =
    new BufferedIndexInput(indexInput);

  if (KNNEngine.NMSLIB == knnEngine) {
    return NmslibService.loadIndex(bufferedIndexInput, parameters);
  }

  if (KNNEngine.FAISS == knnEngine) {
    if (IndexUtil.isBinaryIndex(knnEngine, parameters)) {
      return FaissService.loadBinaryIndex(bufferedIndexInput);
    } else {
      return FaissService.loadIndex(bufferedIndexInput);
    }
  }

  throw new IllegalArgumentException(String.format("LoadIndex not supported for provided engine : %s", knnEngine.getName()));
}

5.2.1.6. FaissService, Glue Component

The glue component is responsible for creating an adapter for [IOReader](https://github.com/facebookresearch/faiss/blob/924c24db23b00053fc1c49e67d8787f0a3460ceb/faiss/impl/io.h#L27) and passing it to the FAISS API.
It first creates a NativeEngineIndexInputMediator on the local stack, then wraps it with a FaissMediatorWrapper.

//
// In the FAISS wrapper.
//

#include <faiss/impl/io.h>
#include <faiss/index_io.h>

#include <stdexcept>

namespace knn_jni::faiss_wrapper {

jlong LoadIndex(
        knn_jni::JNIUtilInterface* jniUtil,
        JNIEnv* env,
        faiss::IOReader* ioReader) {
    if (ioReader == nullptr) [[unlikely]] {
        throw std::runtime_error("IOReader cannot be null");
    }

    faiss::Index* indexReader =
      faiss::read_index(ioReader,
                        faiss::IO_FLAG_READ_ONLY
                        | faiss::IO_FLAG_PQ_SKIP_SDC_TABLE
                        | faiss::IO_FLAG_SKIP_PRECOMPUTE_TABLE);

    return reinterpret_cast<jlong>(indexReader);
}

jlong LoadBinaryIndex(
        knn_jni::JNIUtilInterface* jniUtil,
        JNIEnv* env,
        faiss::IOReader* ioReader) {
    if (ioReader == nullptr) [[unlikely]] {
        throw std::runtime_error("IOReader cannot be null");
    }

    faiss::IndexBinary* indexReader =
      faiss::read_index_binary(ioReader,
                               faiss::IO_FLAG_READ_ONLY
                               | faiss::IO_FLAG_PQ_SKIP_SDC_TABLE
                               | faiss::IO_FLAG_SKIP_PRECOMPUTE_TABLE);

    return reinterpret_cast<jlong>(indexReader);
}

}  // namespace knn_jni::faiss_wrapper



//
// FAISS Glue Component
//

class FaissMediatorWrapper final : public faiss::IOReader {
 public:
  // faiss::IOReader has virtual functions, so it is not an aggregate;
  // `name` is assigned in the constructor body.
  explicit FaissMediatorWrapper(NativeEngineIndexInputMediator* _mediator)
    : mediator(_mediator) {
    name = "FaissMediatorWrapper";
  }

  // Follows the fread() convention: read `nitems` items of `size` bytes each
  // into `ptr` and return the number of ITEMS read, not bytes.
  size_t operator()(void* ptr, size_t size, size_t nitems) final {
    const auto readBytes = size * nitems;
    if (readBytes > 0) {
      // Mediator calls IndexInput, then copies the read bytes to `ptr`.
      mediator->copyBytes(static_cast<int64_t>(readBytes), static_cast<uint8_t*>(ptr));
    }
    return nitems;
  }

  // Memory mapping is not supported when reading through a stream.
  int filedescriptor() final {
    throw std::runtime_error("filedescriptor() is not supported by FaissMediatorWrapper");
  }

 private:
  NativeEngineIndexInputMediator* mediator;
};  // class FaissMediatorWrapper



JNIEXPORT jlong JNICALL Java_org_opensearch_knn_jni_FaissService_loadIndex(
    JNIEnv* env, jclass cls, jobject indexInput) {
  try {
    // Create a mediator locally.
    // Note that `indexInput` is of type `BufferedIndexInput`.
    NativeEngineIndexInputMediator mediator {env, indexInput};

    // Wrap the mediator with glue code implementing IOReader.
    FaissMediatorWrapper faissMediatorWrapper {&mediator};

    // Pass the IOReader to FAISS to load the vector index.
    return knn_jni::faiss_wrapper::LoadIndex(
             &jniUtil,
             env,
             &faissMediatorWrapper);
  } catch (...) {
    jniUtil.CatchCppExceptionAndThrowJava(env);
  }

  return 0;
}
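A subtle point about the IOReader contract: like fread(), FAISS's IOReader call operator is expected to return the number of items read, not bytes, and FAISS's deserialization macros (such as READANDCHECK in faiss/impl/io_macros.h) compare that value against the requested item count. Returning size * nitems would therefore fail every multi-byte read. The self-contained sketch below illustrates the convention without depending on FAISS; ItemReader, VectorItemReader, and readAndCheck are hypothetical stand-ins, not FAISS types.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <stdexcept>
#include <vector>

// Hypothetical interface shaped like faiss::IOReader's call operator.
struct ItemReader {
  virtual ~ItemReader() = default;
  virtual size_t operator()(void* ptr, size_t size, size_t nitems) = 0;
};

// Reader over an in-memory buffer, standing in for FaissMediatorWrapper.
struct VectorItemReader final : ItemReader {
  explicit VectorItemReader(const std::vector<unsigned char>& b) : bytes(b) {}

  size_t operator()(void* ptr, size_t size, size_t nitems) override {
    const size_t total = size * nitems;
    if (total > bytes.size() - pos) throw std::runtime_error("read past end");
    std::memcpy(ptr, bytes.data() + pos, total);
    pos += total;
    return nitems;  // item count, as with fread(), not the byte count
  }

  const std::vector<unsigned char>& bytes;
  size_t pos = 0;
};

// Caller-side check in the spirit of FAISS's read macros: the reader must
// report exactly the number of items requested.
template <typename T>
void readAndCheck(ItemReader& reader, T* out, size_t n) {
  if (reader(out, sizeof(T), n) != n) throw std::runtime_error("short read");
}
```

Reading two ints through readAndCheck succeeds because the reader reports 2 items; a reader that returned 2 * sizeof(int) bytes instead would trip the check.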

5.2.2. [Writing] Constructing Vector Index Low Level Details

By the time a vector index is requested to be written to underlying storage, the vector graph structure should already be trained and reside in memory. The low-level details below abstract away the IO processing logic, making the persistence of data seamless and transparent regardless of the underlying storage.
As a result, the vector transfer processing logic will remain unchanged after the proposed intermediate layer is introduced in the native engines.

5.2.2.1. Define IndexOutput wrapper

import java.io.IOException;
import org.apache.lucene.store.IndexOutput;

public class IndexOutputWithBuffer {
    // Underlying `IndexOutput` obtained from Lucene's Directory.
    private final IndexOutput indexOutput;
    // Write buffer. The native engine copies bytes into this buffer.
    private final byte[] buffer = new byte[4 * 1024];

    public IndexOutputWithBuffer(IndexOutput indexOutput) {
        this.indexOutput = indexOutput;
    }

    // Called from the JNI layer, which knows exactly how many bytes
    // of `buffer` need to be written.
    public void writeBytes(int length) {
        try {
            // Delegate to Lucene's `indexOutput` to write the bytes.
            indexOutput.writeBytes(buffer, 0, length);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

5.2.2.2. Define C++ mediator

To minimize frequent context switching between C++ and Java, the writer mediator first attempts to retain bytes in its buffer. Once the buffer is full, it copies the bytes (e.g. uint8_t[]) to a Java buffer (e.g. byte[]) and then triggers IndexOutputWithBuffer to flush the bytes via the underlying IndexOutput.

As a result, it needs twice as many memory copies as the baseline (a copy into the C++ buffer first, then a second copy into the Java byte[]).
However, I don’t believe this will significantly impact performance, as glibc’s memcpy typically achieves throughput of 10-30 GB/s. In the worst case, this would likely add only a few seconds to building a vector index.
Most of the performance degradation will come from Lucene’s IndexOutput implementation.
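As a back-of-envelope check of the "few seconds" estimate, assume the extra copy runs at the low end of the quoted memcpy range and take a hypothetical 30 GB index. The added time is roughly the index size divided by copy throughput:

```math
t_{\text{extra}} \approx \frac{S_{\text{index}}}{B_{\text{memcpy}}}
\qquad\text{e.g.}\qquad
\frac{30\ \text{GB}}{10\ \text{GB/s}} = 3\ \text{s}
```

This covers only the additional C++-to-Java copy; the cost of Lucene's IndexOutput itself is separate.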

For the performance, please refer to 7.1. Analysis

[Image: Image.jpg]

// Mediator component that indirectly invokes the Java `IndexOutputWithBuffer` instance.
// C++ class

#include <jni.h>

#include <algorithm>
#include <cstdint>
#include <cstring>
#include <memory>
#include <stdexcept>

class NativeEngineIndexOutputMediator {
 public:
  NativeEngineIndexOutputMediator(JNIEnv * _env,
                                  jobject _indexOutput,
                                  int32_t _bufferSize = 4 * 1024)
      : env(_env),
        indexOutput(_indexOutput),
        bufferSize(_bufferSize),
        buffer(new uint8_t[_bufferSize]),
        offset(0) {
  }

  ~NativeEngineIndexOutputMediator() {
      // Flush the remaining bytes unless a Java exception is already pending.
      // Throwing out of a destructor would terminate the process, so a failure here
      // is left as a pending Java exception that surfaces once control returns to Java.
      if (offset > 0 && !env->ExceptionCheck()) {
          try {
              flush();
          } catch (...) {
          }
      }
  }

  // Copies bytes into the internal buffer and flushes once it is full.
  // Flushing eventually calls Lucene's IndexOutput to process the passed bytes.
  void writeBytes(const uint8_t* data, size_t nbytes) {
      while (nbytes > 0) {
          const auto copyBytes =
            std::min(nbytes, static_cast<size_t>(bufferSize - offset));
          // Copy bytes to the C++ buffer.
          std::memcpy(buffer.get() + offset, data, copyBytes);
          offset += static_cast<int32_t>(copyBytes);
          data += copyBytes;
          nbytes -= copyBytes;

          // Buffer is full! Let's flush it.
          if (offset >= bufferSize) {
              flush();
          }
      }
  }

 private:
  void flush() {
      // Note: In production code, I will make the JNI meta lookups below
      // static for caching purposes to avoid unnecessary calls.

      // Acquire `IndexOutputWithBuffer` jclass.
      jclass clazz = env->GetObjectClass(indexOutput);

      // Acquire `writeBytes` method.
      jmethodID writeBytesMethod = env->GetMethodID(clazz, "writeBytes", "(I)V");

      // Acquire the buffer field in `IndexOutputWithBuffer`.
      jfieldID bufferFieldId = env->GetFieldID(clazz, "buffer", "[B");
      jbyteArray bufferArray =
        (jbyteArray) (env->GetObjectField(indexOutput, bufferFieldId));
      const auto length = (int32_t) env->GetArrayLength(bufferArray);

      // Copy C++ bytes into the Java byte[], one Java-buffer-sized chunk at a time.
      auto left = offset;
      int32_t copiedIndex = 0;
      while (left > 0) {
          const auto copyBytes = std::min(left, length);

          // === Critical Section Start ===
          jbyte* primitiveArray =
            (jbyte*) env->GetPrimitiveArrayCritical(bufferArray, nullptr);
          if (primitiveArray == nullptr) {
              throw std::runtime_error("GetPrimitiveArrayCritical failed");
          }

          // Copy C++ bytes to the Java buffer.
          std::memcpy(primitiveArray, buffer.get() + copiedIndex, copyBytes);

          // Release with mode 0: if the JVM handed out a copy, it is written back to
          // the Java byte[] before being freed. JNI_ABORT would discard our writes in
          // that case, because here we MODIFY the Java array.
          env->ReleasePrimitiveArrayCritical(bufferArray, primitiveArray, 0);
          // === Critical Section End ===

          // Indirectly call the internal output stream to process the copied bytes.
          // Expected to eventually trigger Lucene's IndexOutput to write them.
          // Java method : public void writeBytes(int length)
          env->CallVoidMethod(indexOutput, writeBytesMethod, copyBytes);
          if (env->ExceptionCheck()) {
              throw std::runtime_error("IndexOutputWithBuffer.writeBytes threw an exception");
          }

          left -= copyBytes;
          copiedIndex += copyBytes;
      }

      offset = 0;
  }

  JNIEnv * env;
  jobject indexOutput;
  int32_t bufferSize;
  // Write buffer
  std::unique_ptr<uint8_t[]> buffer;
  int32_t offset;
};  // NativeEngineIndexOutputMediator
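To check the buffering behavior without a JVM, here is a hypothetical JNI-free model of the mediator above. BufferedSinkWriter is an illustrative name: it stages bytes in a fixed C++ buffer and hands each full buffer to a callback that stands in for the Java IndexOutputWithBuffer#writeBytes call, flushing any remainder in its destructor.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <cstring>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical, JNI-free model of NativeEngineIndexOutputMediator.
class BufferedSinkWriter {
 public:
  // Receives (pointer, length) for each flushed chunk.
  using Sink = std::function<void(const uint8_t*, int32_t)>;

  BufferedSinkWriter(Sink sink, int32_t bufferSize = 4 * 1024)
      : sink_(std::move(sink)),
        bufferSize_(bufferSize),
        buffer_(new uint8_t[bufferSize]),
        offset_(0) {}

  // Flush whatever is left when the writer goes out of scope.
  ~BufferedSinkWriter() {
    if (offset_ > 0) flush();
  }

  // Stage bytes; flush each time the buffer becomes full.
  void writeBytes(const uint8_t* data, size_t nbytes) {
    while (nbytes > 0) {
      const size_t copyBytes =
          std::min<size_t>(nbytes, static_cast<size_t>(bufferSize_ - offset_));
      std::memcpy(buffer_.get() + offset_, data, copyBytes);
      offset_ += static_cast<int32_t>(copyBytes);
      data += copyBytes;
      nbytes -= copyBytes;
      if (offset_ >= bufferSize_) flush();
    }
  }

 private:
  void flush() {
    sink_(buffer_.get(), offset_);
    offset_ = 0;
  }

  Sink sink_;
  int32_t bufferSize_;
  std::unique_ptr<uint8_t[]> buffer_;
  int32_t offset_;
};
```

Writing 10,000 bytes in two calls of 6,000 and 4,000 bytes produces flushes of 4096, 4096, and 1808 bytes; the last one comes from the destructor, which is why the mediator must stay alive until FAISS has finished writing.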

5.2.2.3. Expanding IndexService

void IndexService::writeIndexWithStream(JNIEnv *env, jobject outputStream, jlong idMapAddress) {
    std::unique_ptr<faiss::IndexIDMap> idMap (
        reinterpret_cast<faiss::IndexIDMap *> (idMapAddress));

    // Creating a mediator.
    stream::NativeEngineIndexOutputMediator mediator {env, outputStream};
    // A glue component implementing IOWriter mediating Faiss <-> OpenSearch.
    stream::FaissOpenSearchIOWriter writer {&mediator};

    try {
        // Persist the in-memory graph through the writer. The mediator flushes any
        // remaining buffered bytes when it goes out of scope.
        faissMethods->writeIndexWithWriter(idMap.get(), &writer);
    } catch (const std::exception &e) {
        // Keep the original cause instead of discarding it.
        throw std::runtime_error(std::string("Failed to write index: ") + e.what());
    }
}
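FaissOpenSearchIOWriter is referenced above but not defined in this RFC. Assuming it mirrors the reading side, it would implement the IOWriter call operator, forward size * nitems bytes to the mediator, and return nitems, since FAISS's write macros check the returned item count the same way its read macros do. The sketch below models that shape without FAISS or JNI; CollectingMediator and MediatorIOWriter are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for the write-side mediator: just collects the bytes.
struct CollectingMediator {
  std::vector<uint8_t> bytes;
  void writeBytes(const uint8_t* data, size_t nbytes) {
    bytes.insert(bytes.end(), data, data + nbytes);
  }
};

// Shaped like faiss::IOWriter's call operator: write `nitems` items of `size`
// bytes each and report the number of items written (fwrite() convention).
class MediatorIOWriter {
 public:
  explicit MediatorIOWriter(CollectingMediator* mediator) : mediator_(mediator) {}

  size_t operator()(const void* ptr, size_t size, size_t nitems) {
    const size_t nbytes = size * nitems;
    if (nbytes > 0) {
      mediator_->writeBytes(static_cast<const uint8_t*>(ptr), nbytes);
    }
    return nitems;
  }

 private:
  CollectingMediator* mediator_;
};
```

In the real glue component the mediator would be NativeEngineIndexOutputMediator, so these writes would land in its 4 KB staging buffer rather than in a vector.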

5.3. Miscellaneous

Since we will be fully relying on Lucene’s Directory, a few DTO classes now need to include a Directory instance as a member field.

Also, a few classes should be modified to use the passed Directory instance for building and loading vector indices instead of the casted FSDirectory.

5.4. Pros and Cons

5.4.1. Pros

5.4.2. Cons

  • I could not think of any real cons. The implementation is straightforward and unlikely to violate any of our concerns.
  • As outlined in 2. Scope, our approach involves adding an additional layer to the engines while preserving their existing design. We are not altering their fundamental structure.
    • Consequently, any potential increase in p99 query time caused by downloading from network-based Directory implementations is beyond the scope of this discussion and should be addressed in a separate dedicated review.

5.5. [P1] Nmslib - Introducing a Concept Similar to FAISS's IOReader

We need to implement some changes in Nmslib to make these layers available. Since Hnsw is currently the only index type in use, its definition is the only place requiring modification.

Currently, Hnsw has methods that accept std::istream and std::ostream for reading and writing bytes. However, these methods are private, which prevents JNI from passing a stream object that would let Hnsw use Lucene’s IndexInput and IndexOutput for IO operations.


private:  // <-- these methods are private

// 1. Optimized index.
void SaveOptimizedIndex(std::ostream& output);
void LoadOptimizedIndex(std::istream& input);

// 2. Regular index binary.
void SaveRegularIndexBin(std::ostream& output);
void LoadRegularIndexBin(std::istream& input);
        
// 3. Regular index text.
void SaveRegularIndexText(std::ostream& output);
void LoadRegularIndexText(std::istream& input);

5.5.1. Required Patches [DONE]

5.5.1.1. Loading With Stream

template <typename dist_t>
void Hnsw<dist_t>::LoadIndex(const string &location) {
    LOG(LIB_INFO) << "Loading index from " << location;
    std::ifstream input(location, 
                        std::ios::binary); /* text files can be opened in binary mode as well */
    CHECK_MSG(input, "Cannot open file '" + location + "' for reading");
    input.exceptions(ios::badbit | ios::failbit);
    LoadIndexWithStream(input);
    input.close();
}

template <typename dist_t>
void Hnsw<dist_t>::LoadIndexWithStream(std::istream& input) {
#if USE_TEXT_REGULAR_INDEX
    LoadRegularIndexText(input);
#else
    unsigned int optimIndexFlag = 0;
    readBinaryPOD(input, optimIndexFlag);

    if (!optimIndexFlag) {
        LoadRegularIndexBin(input);
    } else {
        LoadOptimizedIndex(input);
    }
#endif

    LOG(LIB_INFO) << "Finished loading index";
    visitedlistpool = new VisitedListPool(1, totalElementsStored_);
}

5.5.1.2. Writing With Stream

template <typename dist_t>
void Hnsw<dist_t>::SaveIndex(const string &location) {
    std::ofstream output(location,
                            std::ios::binary /* text files can be opened in binary mode as well */);
    CHECK_MSG(output, "Cannot open file '" + location + "' for writing");
    output.exceptions(ios::badbit | ios::failbit);
    SaveIndexWithStream(output);
    output.close();
}

template <typename dist_t>
void Hnsw<dist_t>::SaveIndexWithStream(std::ostream& output) {
    unsigned int optimIndexFlag = data_level0_memory_ != nullptr;

    writeBinaryPOD(output, optimIndexFlag);

    if (!optimIndexFlag) {
#if USE_TEXT_REGULAR_INDEX
        SaveRegularIndexText(output);
#else
        SaveRegularIndexBin(output);
#endif
    } else {
        SaveOptimizedIndex(output);
    }
}

5.5.2. Read Stream Buffer

class NmslibMediatorStreamBuffer final : public std::streambuf {
 public:
  explicit NmslibMediatorStreamBuffer(NativeEngineIndexInputMediator *_mediator)
      : std::streambuf(),
        mediator(_mediator) {
  }

 protected:
  std::streamsize xsgetn(std::streambuf::char_type *destination, 
                         std::streamsize count) final {
    if (count > 0) {
      mediator->copyBytes((int32_t) count, (uint8_t *) destination);
    }
    return count;
  }

 private:
  NativeEngineIndexInputMediator *mediator;
};  // NmslibMediatorStreamBuffer
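One caveat with the buffer above: std::istream::read() goes through sgetn() and therefore xsgetn(), but character-level operations such as get(), getline(), or operator>> go through underflow()/uflow(), whose default implementations report end-of-file. A buffer that overrides only xsgetn() is therefore sufficient only if every read is a bulk read() call. The sketch below is a hypothetical variant, not part of the proposal: CallbackStreamBuf pulls bytes through a callback standing in for NativeEngineIndexInputMediator::copyBytes and overrides both hooks.

```cpp
#include <algorithm>
#include <cassert>
#include <cstring>
#include <functional>
#include <istream>
#include <streambuf>
#include <string>
#include <utility>

// Hypothetical streambuf that pulls bytes through a callback. xsgetn() serves
// bulk reads (istream::read); underflow() serves character-level reads using a
// small internal get area.
class CallbackStreamBuf final : public std::streambuf {
 public:
  // `fill(dest, max)` copies up to `max` bytes into `dest`, returning the number
  // copied (0 at end of input).
  using Fill = std::function<std::streamsize(char*, std::streamsize)>;

  explicit CallbackStreamBuf(Fill fill) : fill_(std::move(fill)) {
    setg(buf_, buf_, buf_);  // start with an empty get area
  }

 protected:
  std::streamsize xsgetn(char* dest, std::streamsize count) override {
    // Drain anything already buffered by underflow() first.
    std::streamsize copied = std::min<std::streamsize>(count, egptr() - gptr());
    std::memcpy(dest, gptr(), static_cast<size_t>(copied));
    gbump(static_cast<int>(copied));
    while (copied < count) {
      const std::streamsize n = fill_(dest + copied, count - copied);
      if (n <= 0) break;  // end of input: report a short read
      copied += n;
    }
    return copied;
  }

  int_type underflow() override {
    if (gptr() < egptr()) return traits_type::to_int_type(*gptr());
    const std::streamsize n = fill_(buf_, sizeof(buf_));
    if (n <= 0) return traits_type::eof();
    setg(buf_, buf_, buf_ + n);
    return traits_type::to_int_type(*gptr());
  }

 private:
  Fill fill_;
  char buf_[256];
};
```

With both hooks in place, formatted extraction, getline(), and bulk read() can be mixed on the same std::istream.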

5.5.3. Write Stream Buffer

class NmslibMediatorOutputStreamBuffer final : public std::streambuf {
 public:
  explicit NmslibMediatorOutputStreamBuffer(NativeEngineIndexOutputMediator *_mediator)
      : std::streambuf(),
        mediator(_mediator) {
  }

 protected:
  std::streamsize xsputn(const std::streambuf::char_type* source, 
                         std::streamsize count) final {
    if (count > 0) {
      mediator->writeBytes((uint8_t*) source, (int32_t) count);
    }
    return count;
  }

 private:
  NativeEngineIndexOutputMediator *mediator;
};  // NmslibMediatorOutputStreamBuffer
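The program below is a minimal sketch of how these stream buffers plug into existing stream-based code. It routes std::ostream::write and std::istream::read through xsputn/xsgetn into a mediator. FakeMediator is a hypothetical in-memory stand-in for NativeEngineIndexInputMediator and NativeEngineIndexOutputMediator. In the real layer, those calls cross JNI into Lucene's IndexOutput/IndexInput. Only the copyBytes/writeBytes call shapes come from the classes above, and ReadBuffer/WriteBuffer are simplified copies of them.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <istream>
#include <ostream>
#include <streambuf>
#include <vector>

// Hypothetical in-memory mediator. In the real loading/writing layer these two
// methods would call into Lucene's IndexInput / IndexOutput via JNI.
struct FakeMediator {
  std::vector<uint8_t> storage;  // Bytes "written to the Directory".
  size_t readPos = 0;
  int calls = 0;                 // Number of mediator (would-be JNI) calls.

  void writeBytes(const uint8_t *source, int32_t count) {
    ++calls;
    storage.insert(storage.end(), source, source + count);
  }
  void copyBytes(int32_t count, uint8_t *destination) {
    ++calls;
    assert(readPos + static_cast<size_t>(count) <= storage.size());
    std::memcpy(destination, storage.data() + readPos, count);
    readPos += count;
  }
};

// Simplified NmslibMediatorStreamBuffer: bulk reads go to the mediator.
class ReadBuffer final : public std::streambuf {
 public:
  explicit ReadBuffer(FakeMediator *m) : mediator(m) {}
 protected:
  std::streamsize xsgetn(char *destination, std::streamsize count) final {
    if (count > 0) mediator->copyBytes((int32_t) count, (uint8_t *) destination);
    return count;
  }
 private:
  FakeMediator *mediator;
};

// Simplified NmslibMediatorOutputStreamBuffer: bulk writes go to the mediator.
class WriteBuffer final : public std::streambuf {
 public:
  explicit WriteBuffer(FakeMediator *m) : mediator(m) {}
 protected:
  std::streamsize xsputn(const char *source, std::streamsize count) final {
    if (count > 0) mediator->writeBytes((const uint8_t *) source, (int32_t) count);
    return count;
  }
 private:
  FakeMediator *mediator;
};

// Writes a POD flag plus a payload, then reads both back through the mediator.
bool roundTrip(FakeMediator &mediator) {
  WriteBuffer wbuf(&mediator);
  std::ostream output(&wbuf);
  const uint32_t flag = 1;
  const char payload[] = "hnsw";
  output.write(reinterpret_cast<const char *>(&flag), sizeof(flag));
  output.write(payload, sizeof(payload));

  ReadBuffer rbuf(&mediator);
  std::istream input(&rbuf);
  uint32_t readFlag = 0;
  char readPayload[sizeof(payload)] = {};
  input.read(reinterpret_cast<char *>(&readFlag), sizeof(readFlag));
  input.read(readPayload, sizeof(readPayload));
  return input.good() && readFlag == flag &&
         std::memcmp(readPayload, payload, sizeof(payload)) == 0;
}
```

std::istream::read and std::ostream::write go straight to sgetn/sputn. So overriding only xsgetn and xsputn is enough for the bulk reads and writes that readBinaryPOD/writeBinaryPOD-style code performs. Character-level operations such as get() or put() would also need underflow()/overflow(), which this sketch does not provide.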

6. Backward Compatibility And Miscellaneous Performance Issues

The loading layer should fall back to the existing implementation when the given Directory is file-based, so that there are no backward compatibility issues.
Apart from the inherent overhead of IndexInput (note that we cannot prevent users from plugging inefficient IndexInput implementations into OpenSearch!), what are the costs of the proposed solution?

From a performance perspective, the primary cost comes from the transitions between Java and C++ on each JNI call. However, since each JNI transition typically takes only nanoseconds, the overall performance degradation is expected to be negligible.

From a memory consumption perspective, the additional allocation is a constant overhead of at most about 4KB for the copy buffer. Because GetPrimitiveArrayCritical typically returns a pointer to the primitive array without copying the data in OpenJDK, no allocation beyond that 4KB buffer is needed.
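The constant-memory argument can be sketched as a copy loop that serves a request of total bytes through one fixed 4KB buffer. The extra memory stays at the buffer size regardless of index size, while the number of boundary crossings is ceil(total / 4KB). In the proposed design, the buffer is a Java byte[] accessed via GetPrimitiveArrayCritical. Here, a plain array stands in for it, and fetchChunk is a hypothetical callback standing in for one JNI call into IndexInput.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <functional>
#include <vector>

constexpr size_t kCopyBufferSize = 4 * 1024;  // 4KB copy buffer, as proposed above.

// Hypothetical stand-in for one JNI call that fills `dst` with the next
// `len` bytes from Lucene's IndexInput.
using FetchChunk = std::function<void(uint8_t *dst, size_t len)>;

// Copies `total` bytes into `destination` through a single fixed-size buffer
// and returns how many FetchChunk (i.e., JNI) calls were needed.
size_t copyThroughBuffer(uint8_t *destination, size_t total, const FetchChunk &fetchChunk) {
  assert(destination != nullptr || total == 0);
  uint8_t buffer[kCopyBufferSize];  // Constant extra memory, independent of `total`.
  size_t calls = 0;
  while (total > 0) {
    const size_t chunk = std::min(total, kCopyBufferSize);
    fetchChunk(buffer, chunk);
    ++calls;
    std::memcpy(destination, buffer, chunk);
    destination += chunk;
    total -= chunk;
  }
  return calls;
}
```

For a 10,000-byte request, this makes three calls (4096 + 4096 + 1808 bytes), while the memory overhead stays at 4KB.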

6.1. File Watching Mechanism Issue

With this change, we need to modify the file-watching mechanism, as we are no longer relying on OS-level files.
The purpose of the file watcher is to allow us to evict outdated vector indexes as soon as the corresponding file is deleted. This ensures we maintain the necessary memory space efficiently.

I realized that including this section would make the document overly lengthy, so I will move this topic to a separate sub-document where all alternatives will be explored in detail.

7. Performance Benchmark

7.1. Benchmark Environment

7.1.1. Traffic Loader

  • OS : Amazon Linux 2023
  • EC2 Instance : t2.xlarge
    • 4 vcpu
    • 16 GB RAM
  • JDK : jdk-17.0.12
  • Data set :
    • XXX
    • 512MB

7.1.2. OpenSearch Engine

  • OS : Amazon Linux 2023
  • EC2 Instance : c5.4xlarge
    • 16 vcpu
    • 32 GB RAM
  • EBS
    • 100GB
  • OS Configuration
    • vm.max_map_count=262144
  • JDK : jdk-17.0.12
  • JVM Options
    • -Xms24g
    • -Xmx24g
  • Candidate Copy Buffer : 4K

7.2. Analysis

For the details, please refer to Appendix 1. Performance Details.

The best benchmark result for each case was selected after three trials. Overall search performance remained consistent, with the baseline occasionally outperforming the candidate and vice versa. This variation is expected, as the loading layer merely involves reading bytes into memory. Since the query process operates on in-memory data, introducing this layer should not significantly impact search performance.

In the worst case, we see about a 20% performance degradation in bulk ingestion, which added roughly 2 seconds of latency. This is somewhat expected, as the candidate needs twice as many memory copies as the baseline (copying bytes in C++ first, and then…

Metric Task Baseline-Value Candidate-Value Diff Ratio (B - C) / B Diff (B - C) Unit
100th percentile service time custom-vector-bulk 11928.1 9483.93 20.49086 2444.17 ms

8. Milestones

  1. Prepare the next round of design meetings for the writing part.

    1. Date : 2024-09-10
  2. POC (reading part only)

    1. Demo date : 2024-09-13
    2. Directory : S3-backed Directory (dummy Directory implementation).
      1. Comment out the fallback, and use FSDirectory.
      2. Or wrap it.
    3. Goal : Verify that the concept (reading + writing) works.
    4. Blockers:
      1. Do we have a dev account where I can create my own bucket for testing?
  3. Productionize (reading part only)

    1. Target date : 2024-09-25
    2. Backward compatibility verification.
      1. Functionality.
      2. Performance.
    3. Make sure it works with other Directory implementations (besides FSDirectory).
    4. Tests:
      1. Unit tests.
      2. Integration tests.
      3. Benchmark tests
        1. Whether to keep the file-based approach.
        2. Performance testing with variable buffer sizes.
    5. This will include both native engines.
  4. Propose introducing a loading layer in Nmslib.

    1. Target date : 2024-09-30

9. Demo : S3 Vector Index Snapshot

9.1. Demo Steps

For the detailed commands, please refer to Appendix 3. Demo Scripts.

  1. Make sure S3 bucket is empty.
  2. Create an index.
  3. Bulk ingest vector data.
  4. Run a query and make sure we are getting a valid result.
  5. Take a snapshot and make sure we have a vector index in S3.
  6. Delete the index from OpenSearch.
    1. Confirm that we deleted the index in OpenSearch.
  7. Restore the index.
    1. Confirm that now we have a vector index restored.
  8. Run a query against the restored vector index.
    1. Make sure we are getting a valid result.

Appendix 1. Performance Details

Metric Task Baseline-Value Candidate-Value Diff (B - C) Diff ((B - C) / B) Unit
Cumulative indexing time of primary shards 30.5506 31.8307 -1.2801 -0.0419 min
Min cumulative indexing time across primary shards 0.00012 0.00025 -0.00013 -1.14285 min
Median cumulative indexing time across primary shards 15.2753 15.9153 -0.64 -0.0419 min
Max cumulative indexing time across primary shards 30.5505 31.8304 -1.2799 -0.04189 min
Cumulative indexing throttle time of primary shards 0 0 0 #DIV/0! min
Min cumulative indexing throttle time across primary shards 0 0 0 #DIV/0! min
Median cumulative indexing throttle time across primary shards 0 0 0 #DIV/0! min
Max cumulative indexing throttle time across primary shards 0 0 0 #DIV/0! min
Cumulative merge time of primary shards 182.972 200.509 -17.537 -0.09585 min
Cumulative merge count of primary shards 113 107 6 0.0531
Min cumulative merge time across primary shards 0 0 0 #DIV/0! min
Median cumulative merge time across primary shards 91.4861 100.254 -8.7679 -0.09584 min
Max cumulative merge time across primary shards 182.972 200.509 -17.537 -0.09585 min
Cumulative merge throttle time of primary shards 1.33108 1.67535 -0.34427 -0.25864 min
Min cumulative merge throttle time across primary shards 0 0 0 #DIV/0! min
Median cumulative merge throttle time across primary shards 0.66554 0.83768 -0.17213 -0.25864 min
Max cumulative merge throttle time across primary shards 1.33108 1.67535 -0.34427 -0.25864 min
Cumulative refresh time of primary shards 1.25782 1.2378 0.02002 0.01592 min
Cumulative refresh count of primary shards 92 91 1 0.01087
Min cumulative refresh time across primary shards 3.17E-04 0.00042 -0.0001 -0.31579 min
Median cumulative refresh time across primary shards 0.62891 0.6189 0.01001 0.01591 min
Max cumulative refresh time across primary shards 1.26E+00 1.23738 0.02012 0.016 min
Cumulative flush time of primary shards 11.9259 11.4079 0.518 0.04343 min
Cumulative flush count of primary shards 61 58 3 0.04918
Min cumulative flush time across primary shards 0 0 0 #DIV/0! min
Median cumulative flush time across primary shards 5.96296 5.70395 0.25901 0.04344 min
Max cumulative flush time across primary shards 11.9259 11.4079 0.518 0.04343 min
Total Young Gen GC time 0.288 0.301 -0.013 -0.04514 s
Total Young Gen GC count 18 17 1 0.05556
Total Old Gen GC time 0 0 0 #DIV/0! s
Total Old Gen GC count 0 0 0 #DIV/0!
Store size 29.8163 25.8162 4.0001 0.13416 GB
Translog size 5.82E-07 5.82E-07 0 0 GB
Heap used for segments 0 0 0 #DIV/0! MB
Heap used for doc values 0 0.00E+00 0 #DIV/0! MB
Heap used for terms 0 0 0 #DIV/0! MB
Heap used for norms 0 0 0 #DIV/0! MB
Heap used for points 0 0 0 #DIV/0! MB
Heap used for stored fields 0 0 0 #DIV/0! MB
Segment count 2 2 0 0
Min Throughput custom-vector-bulk 5948.76 5462.82 485.94 0.08169 docs/s
Mean Throughput custom-vector-bulk 10542.8 10833.4 -290.6 -0.02756 docs/s
Median Throughput custom-vector-bulk 9988.27 10243.3 -255.03 -0.02553 docs/s
Max Throughput custom-vector-bulk 17833.4 17447.1 386.3 0.02166 docs/s
50th percentile latency custom-vector-bulk 72.8052 54.5385 18.2667 0.2509 ms
90th percentile latency custom-vector-bulk 161.131 153.681 7.45 0.04624 ms
99th percentile latency custom-vector-bulk 295.278 294.875 0.403 0.00136 ms
99.9th percentile latency custom-vector-bulk 1.60E+03 1623.76 -21.41 -0.01336 ms
99.99th percentile latency custom-vector-bulk 2.42E+03 3373.36 -953.6 -0.39409 ms
100th percentile latency custom-vector-bulk 2.75E+03 8909.9 -6163.93 -2.24472 ms
50th percentile service time custom-vector-bulk 7.28E+01 54.5385 18.2667 0.2509 ms
90th percentile service time custom-vector-bulk 1.61E+02 153.681 7.45 0.04624 ms
99th percentile service time custom-vector-bulk 295.278 294.875 0.403 0.00136 ms
99.9th percentile service time custom-vector-bulk 1602.35 1623.76 -21.41 -0.01336 ms
99.99th percentile service time custom-vector-bulk 2419.76 3373.36 -953.6 -0.39409 ms
100th percentile service time custom-vector-bulk 2745.97 8909.9 -6163.93 -2.24472 ms
error rate custom-vector-bulk 0 0 0 #DIV/0! %
Min Throughput force-merge-segments 0 0 0 #DIV/0! ops/s
Mean Throughput force-merge-segments 0 0 0 #DIV/0! ops/s
Median Throughput force-merge-segments 0 0 0 #DIV/0! ops/s
Max Throughput force-merge-segments 0 0.00E+00 0 #DIV/0! ops/s
100th percentile latency force-merge-segments 6.63E+06 7.22E+06 -590670 -0.08915 ms
100th percentile service time force-merge-segments 6.63E+06 7.22E+06 -590670 -0.08915 ms
error rate force-merge-segments 0 0.00E+00 0 #DIV/0! %
Min Throughput warmup-indices 0.27 2.50E-01 0.02 0.07407 ops/s
Mean Throughput warmup-indices 0.27 0.25 0.02 0.07407 ops/s
Median Throughput warmup-indices 0.27 0.25 0.02 0.07407 ops/s
Max Throughput warmup-indices 0.27 0.25 0.02 0.07407 ops/s
100th percentile latency warmup-indices 3656.95 3944.55 -287.6 -0.07864 ms
100th percentile service time warmup-indices 3656.95 3944.55 -287.6 -0.07864 ms
error rate warmup-indices 0 0 0 #DIV/0! %
Min Throughput prod-queries 0.72 0.77 -0.05 -0.06944 ops/s
Mean Throughput prod-queries 3.26 5.51 -2.25 -0.69018 ops/s
Median Throughput prod-queries 0.72 0.77 -0.05 -0.06944 ops/s
Max Throughput prod-queries 8.33 14.97 -6.64 -0.79712 ops/s
50th percentile latency prod-queries 8.07474 8.12443 -0.04969 -0.00615 ms
90th percentile latency prod-queries 8.96103 9.02022 -0.05919 -0.00661 ms
99th percentile latency prod-queries 25.3968 24.7477 0.6491 0.02556 ms
100th percentile latency prod-queries 1382.39 1290.87 91.52 0.0662 ms
50th percentile service time prod-queries 8.07474 8.12443 -0.04969 -0.00615 ms
90th percentile service time prod-queries 8.96103 9.02022 -0.05919 -0.00661 ms
99th percentile service time prod-queries 25.3968 24.7477 0.6491 0.02556 ms
100th percentile service time prod-queries 1382.39 1290.87 91.52 0.0662 ms
error rate prod-queries 0 0 0 #DIV/0! %
Mean recall@k prod-queries 0.29 0.24 0.05 0.17241
Mean recall@1 prod-queries 0.42 0.4 0.02 0.04762

Appendix 2. Loading Time Comparison

The numbers below were measured with time curl -X GET http://localhost:9200/_plugins/_knn/warmup/target_index.
I ran two experiments loading a FAISS vector index with different buffer sizes:

  1. After dropping all file caches from memory (sudo sh -c 'echo 3 > /proc/sys/vm/drop_caches').
  2. With the file cached in memory.

Conclusion:
The buffer size in InputIndexWithBuffer does not meaningfully impact loading time.
There is therefore no reason to use a buffer larger than 4KB. If anything, a larger buffer costs more memory and holds the JNI critical section longer.

When a new index file has just been created and is not in the system cache, there is no meaningful difference in loading time between the baseline and the streaming approach.
However, when the file is already in the system cache, the baseline (i.e., the one using fread) is faster than the streaming approach (3.584 vs. 4.664 seconds).
That case is rare (except when restarting an engine during a rolling restart), and most loads are expected to be of newly created vector indexes. So I do not think it will seriously degrade overall performance.
Once an index is loaded, queries are processed against in-memory data structures. As a result, there was no difference in search performance between the baseline and the streaming version. (Refer to Appendix 1 for details.)

Experiment

Index size : 6.4G

1. Baseline (Using fread)

  1. After dropping caches : 51.097 seconds
  2. With file cached : 3.584 seconds

2. Using Stream

2.1. 4KB

  1. After dropping caches : 51.354 seconds
  2. With file cached : 4.664 seconds

2.2. 64KB

  1. After dropping caches : 51.491 seconds
  2. With file cached : 4.318 seconds

2.3. 1MB

  1. After dropping caches : 51.518 seconds
  2. With file cached : 4.201 seconds

Appendix 3. Demo Scripts

0. Set up AWS credentials

1. Make sure S3 bucket is empty.

https://us-east-1.console.aws.amazon.com/s3/buckets/kdooyong-opensearch?region=us-east-1&bucketType=general&tab=objects

2. Create an index.

curl -X PUT 'http://localhost:9200/knn-index/' -H 'Content-Type: application/json' -d '
{
  "settings": {
    "index": {
      "knn": true,
      "knn.algo_param.ef_search": 100,
      "use_compound_file": false
    }
  },
  "mappings": {
    "properties": {
      "my_vector": {
        "type": "knn_vector",
        "dimension": 2,
        "method": {
          "engine": "faiss",
          "name": "hnsw"
        }
      }
    },
    "dynamic": false
  }
}
' | jq .

3. Bulk ingest vector data.

curl -X POST 'http://localhost:9200/_bulk' -H 'Content-Type: application/json' -d '{ "index": { "_index": "knn-index", "_id": "1" } }
   { "my_vector": [1.5, 2.5], "price": 12.2 }
   { "index": { "_index": "knn-index", "_id": "2" } }
   { "my_vector": [2.5, 3.5], "price": 7.1 }
   { "index": { "_index": "knn-index", "_id": "3" } }
   { "my_vector": [3.5, 4.5], "price": 12.9 }
   { "index": { "_index": "knn-index", "_id": "4" } }
   { "my_vector": [5.5, 6.5], "price": 1.2 }
   { "index": { "_index": "knn-index", "_id": "5" } }
   { "my_vector": [4.5, 5.5], "price": 3.7 }
' | jq .

4. Run a query and make sure we are getting a valid result.

curl -X GET 'http://localhost:9200/knn-index/_search' -H 'Content-Type: application/json' -d '{
  "size": 2,
  "query": {
    "knn": {
      "my_vector": {
        "vector": [2, 3],
        "k": 2
      }
    }
  }
}' | jq .


5. Create a repository.

# Create a repo

curl -X PUT 'http://localhost:9200/_snapshot/my-s3-repository' -H 'Content-Type: application/json' -d '{
  "type": "s3",
  "settings": {
    "bucket": "kdooyong-opensearch",
    "base_path": "demo-searchable-snapshot"
  }
}' | jq .


6. Look up the repository we created.

curl -X GET 'http://localhost:9200/_snapshot/my-s3-repository' | jq .

7. Take the snapshot

curl -X PUT 'http://localhost:9200/_snapshot/my-s3-repository/1' | jq .

8. Get the snapshot info.

curl -X GET 'http://localhost:9200/_snapshot/my-s3-repository/1' | jq .

9. Delete the index from OpenSearch. Confirm that we deleted the index in OpenSearch.

curl -X DELETE http://localhost:9200/knn-index | jq .

10. Confirm we don’t have any indices

curl -X GET http://localhost:9200/_cat/indices


# This query must fail
curl -X GET 'http://localhost:9200/knn-index/_search' -H 'Content-Type: application/json' -d '{
  "size": 2,
  "query": {
    "knn": {
      "my_vector": {
        "vector": [2, 3],
        "k": 2
      }
    }
  }
}' | jq .

11. Restore the searchable index. Confirm that now we have a vector index restored.

curl -X POST http://localhost:9200/_snapshot/my-s3-repository/1/_restore -H 'Content-Type: application/json' -d '{
  "storage_type": "remote_snapshot",
  "indices": "knn-index"
}' | jq .

curl -X GET http://localhost:9200/_cat/indices

12. Run a query against the restored vector index. Make sure we are getting a valid result.

curl -X GET 'http://localhost:9200/knn-index/_search' -H 'Content-Type: application/json' -d '{
  "size": 2,
  "query": {
    "knn": {
      "my_vector": {
        "vector": [2, 3],
        "k": 2
      }
    }
  }
}' | jq .

@0ctopus13prime 0ctopus13prime changed the title [FEATURE] [RFC] Introducing Loading Layer in Native KNN Engines Sep 4, 2024
@heemin32 heemin32 added v2.18.0 and removed untriaged labels Sep 6, 2024
@navneet1v navneet1v added the indexing-improvements This label should be attached to all the github issues which will help improving the indexing time. label Sep 14, 2024
@vamshin vamshin added the Roadmap:Vector Database/GenAI Project-wide roadmap label label Sep 27, 2024
@0ctopus13prime 0ctopus13prime changed the title [RFC] Introducing Loading Layer in Native KNN Engines [RFC] Introducing Loading/Writing Layer in Native KNN Engines Sep 27, 2024
@0ctopus13prime
Contributor Author

@jmazanec15
Hi Jack, please refer to section 5.5 ([P1] Nmslib - Introducing Similar Concept of IOReader in FAISS) for the upcoming changes I will make to NMSLIB.

@0ctopus13prime
Contributor Author

0ctopus13prime commented Sep 27, 2024

NMSLIB patch was merged - #2144
Will raise a new PR for stream support in NMSLIB.

Current PR : #2139

@0ctopus13prime
Contributor Author

Memory monitoring during performance benchmark.

Hi Navneet, I ran a benchmark (covering both bulk ingestion and search) and could not find any evidence of a memory peak during search.
Please let me know if it looks good to you, and we can merge this!
Thank you.

Streaming

Screenshot 2024-10-01 at 7 24 46 PM

Baseline

Screenshot 2024-10-01 at 7 28 20 PM

@0ctopus13prime
Contributor Author

0ctopus13prime commented Oct 3, 2024

Interim Progress Report

After PR #2139 is merged, two more PRs will follow shortly:

  1. Loading layer in NMSLIB.
  2. Decommissioning FileWatcher

Once these two PRs land, the loading layer will be officially available in OpenSearch.
For the writing layer, however, we still need to tune performance. The POC benchmark showed a likely 20% performance degradation that we need to fix.
Once the performance issue is fixed, we can raise a single PR to introduce the writing layer in both native engines.

@0ctopus13prime
Contributor Author

PR for introducing a loading layer in NMSLIB
#2185

@0ctopus13prime
Contributor Author

[NMSLIB] Memory monitoring results comparison

From a memory standpoint, I could not observe any major changes in the benchmark.

Baseline

nmslib_baseline

Candidate

nmslib-candidate-1

@0ctopus13prime
Contributor Author

[NMSLIB] Loading Time Comparison

The numbers below were measured with time curl -X GET http://localhost:9200/_plugins/_knn/warmup/target_index.
I ran two experiments loading an NMSLIB vector index with different buffer sizes:

  1. After dropping all file caches from memory (sudo sh -c 'echo 3 > /proc/sys/vm/drop_caches').
  2. With the file cached in memory.

Observation

Unlike FAISS, loading a file that was already in the system cache took about 81% more time.
This case should be rare, since k-NN is expected to load a vector index whenever a new segment file is created,
and a newly created segment file is likely not yet in the system cache.
Increasing the buffer size didn't help. We need to find a better way to transfer data from Java to native code via JNI.

Experiment

Index size : 30G

1. Baseline (Using fread)

  1. After dropping caches : 50.448 seconds
  2. With file cached : 4.144 seconds

2. Using Stream

  1. After dropping caches, 4KB : 52.779 seconds
  2. With file cached, 4KB : 7.503 seconds
  3. With file cached, 64KB : 6.919 seconds
  4. With file cached, 1MB : 6.99 seconds 🤔

@0ctopus13prime
Contributor Author

[NMSLIB] Performance Benchmark

Machine : c5ad.12xlarge
JVM Args : -Xms63g -Xmx63g
Data : random-s-128-10m-euclidean.hdf5

Metric Task Baseline-Value Candidate-Value Change Unit
Cumulative indexing time of primary shards 33.7147 34.2115 1.47% min
Min cumulative indexing time across primary shards 0.000133333 0.00015 12.50% min
Median cumulative indexing time across primary shards 16.8573 17.1057 1.47% min
Max cumulative indexing time across primary shards 33.7146 34.2113 1.47% min
Cumulative indexing throttle time of primary shards 0 0 0.00% min
Min cumulative indexing throttle time across primary shards 0 0 0.00% min
Median cumulative indexing throttle time across primary shards 0 0 0.00% min
Max cumulative indexing throttle time across primary shards 0 0 0.00% min
Cumulative merge time of primary shards 282.601 282.996 0.14% min
Cumulative merge count of primary shards 125 122 2.40%
Min cumulative merge time across primary shards 0 0 0.00% min
Median cumulative merge time across primary shards 141.3 141.498 0.14% min
Max cumulative merge time across primary shards 282.601 282.996 0.14% min
Cumulative merge throttle time of primary shards 1.04818 1.61307 53.89% min
Min cumulative merge throttle time across primary shards 0 0 0.00% min
Median cumulative merge throttle time across primary shards 0.524092 0.806533 53.89% min
Max cumulative merge throttle time across primary shards 1.04818 1.61307 53.89% min
Cumulative refresh time of primary shards 1.1042 1.14667 3.85% min
Cumulative refresh count of primary shards 88 85 3.41%
Min cumulative refresh time across primary shards 0.000333333 0.000383333 15.00% min
Median cumulative refresh time across primary shards 0.5521 0.573333 3.85% min
Max cumulative refresh time across primary shards 1.10387 1.14628 3.84% min
Cumulative flush time of primary shards 11.5126 10.9446 4.93% min
Cumulative flush count of primary shards 56 53 5.36%
Min cumulative flush time across primary shards 0 0 0.00% min
Median cumulative flush time across primary shards 5.75628 5.47229 4.93% min
Max cumulative flush time across primary shards 11.5126 10.9446 4.93% min
Total Young Gen GC time 0.338 0.342 1.18% s
Total Young Gen GC count 19 19 0.00%
Total Old Gen GC time 0 0 0.00% s
Total Old Gen GC count 0 0 0.00%
Store size 29.8586 29.8584 0.00% GB
Translog size 5.83E-07 5.83E-07 0.00% GB
Heap used for segments 0 0 0.00% MB
Heap used for doc values 0 0 0.00% MB
Heap used for terms 0 0 0.00% MB
Heap used for norms 0 0 0.00% MB
Heap used for points 0 0 0.00% MB
Heap used for stored fields 0 0 0.00% MB
Segment count 2 2 0.00%
Min Throughput custom-vector-bulk 5390.31 5466.23 1.41% docs/s
Mean Throughput custom-vector-bulk 11041.1 10866 1.59% docs/s
Median Throughput custom-vector-bulk 10377.9 10065.6 3.01% docs/s
Max Throughput custom-vector-bulk 20105.1 19337.8 3.82% docs/s
50th percentile latency custom-vector-bulk 78.4349 75.8132 3.34% ms
90th percentile latency custom-vector-bulk 165.667 158.129 4.55% ms
99th percentile latency custom-vector-bulk 331.043 318.269 3.86% ms
99.9th percentile latency custom-vector-bulk 1486.47 1487.08 0.04% ms
99.99th percentile latency custom-vector-bulk 2300.48 2598.04 12.93% ms
100th percentile latency custom-vector-bulk 5049.72 4535.08 10.19% ms
50th percentile service time custom-vector-bulk 78.4349 75.8132 3.34% ms
90th percentile service time custom-vector-bulk 165.667 158.129 4.55% ms
99th percentile service time custom-vector-bulk 331.043 318.269 3.86% ms
99.9th percentile service time custom-vector-bulk 1486.47 1487.08 0.04% ms
99.99th percentile service time custom-vector-bulk 2300.48 2598.04 12.93% ms
100th percentile service time custom-vector-bulk 5049.72 4535.08 10.19% ms
error rate custom-vector-bulk 0 0 0.00% %
Min Throughput force-merge-segments 0 0 0.00% ops/s
Mean Throughput force-merge-segments 0 0 0.00% ops/s
Median Throughput force-merge-segments 0 0 0.00% ops/s
Max Throughput force-merge-segments 0 0 0.00% ops/s
100th percentile latency force-merge-segments 1.16E+07 1.13E+07 2.84% ms
100th percentile service time force-merge-segments 1.16E+07 1.13E+07 2.84% ms
error rate force-merge-segments 0 0 0.00% %
Min Throughput warmup-indices 0.24 0.14 41.67% ops/s
Mean Throughput warmup-indices 0.24 0.14 41.67% ops/s
Median Throughput warmup-indices 0.24 0.14 41.67% ops/s
Max Throughput warmup-indices 0.24 0.14 41.67% ops/s
100th percentile latency warmup-indices 4162.87 7127.78 71.22% ms
100th percentile service time warmup-indices 4162.87 7127.78 71.22% ms
error rate warmup-indices 0 0 0.00% %
Min Throughput prod-queries 0.66 0.64 3.03% ops/s
Mean Throughput prod-queries 0.66 0.64 3.03% ops/s
Median Throughput prod-queries 0.66 0.64 3.03% ops/s
Max Throughput prod-queries 0.66 0.64 3.03% ops/s
50th percentile latency prod-queries 3.5832 3.83349 6.99% ms
90th percentile latency prod-queries 4.75317 4.64172 2.34% ms
99th percentile latency prod-queries 22.1628 23.8439 7.59% ms
100th percentile latency prod-queries 1508.36 1571.86 4.21% ms
50th percentile service time prod-queries 3.5832 3.83349 6.99% ms
90th percentile service time prod-queries 4.75317 4.64172 2.34% ms
99th percentile service time prod-queries 22.1628 23.8439 7.59% ms
100th percentile service time prod-queries 1508.36 1571.86 4.21% ms
error rate prod-queries 0 0 0.00% %
Mean recall@k prod-queries 0.42 0.43 2.38%
Mean recall@1 prod-queries 0.6 0.63 5.00%

@0ctopus13prime
Contributor Author

[NMSLIB] Streaming Flamegraph

Screenshot 2024-10-08 at 5 44 14 PM

@0ctopus13prime
Contributor Author

1. NMSLIB Loading Perf Issue Analysis

2. Performance Degradation In FAISS

After switching from direct file API usage to an abstract IO loading layer, additional overhead was introduced due to JNI calls and buffer copying via std::memcpy. This change resulted in a 30% increase in loading time compared to the baseline in FAISS. The baseline took 3.584 seconds to load a 6GB vector index, while the modified version increased the load time to 4.664 seconds.

In NMSLIB, we expected a similar level of performance regression as in FAISS. However, we're observing an 81% increase in load time when loading a 6GB vector index (baseline=4.144 sec, modified=7.503 sec).
Why is the performance impact in NMSLIB more than twice as severe as in FAISS?
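You can reproduce the relative slowdowns compared here from the cached-load timings reported in this issue (FAISS: 3.584 vs. 4.664 seconds; NMSLIB: 4.144 vs. 7.503 seconds). A minimal sketch of the arithmetic:

```cpp
#include <cassert>

// Relative increase in load time of the streaming candidate over the baseline.
double relativeSlowdown(double baselineSec, double candidateSec) {
  assert(baselineSec > 0.0);
  return (candidateSec - baselineSec) / baselineSec;
}

// Cached-load timings (seconds) reported in this issue:
//   FAISS:  relativeSlowdown(3.584, 4.664) is about 0.30
//   NMSLIB: relativeSlowdown(4.144, 7.503) is about 0.81
```

This gives roughly 30% for FAISS and 81% for NMSLIB. In other words, the NMSLIB slowdown is about 2.7 times the FAISS one.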

3. Why is it more than twice as severe as in FAISS?

The key performance difference in index loading between FAISS and NMSLIB stems from their file formats.
In NMSLIB, this difference results in O(N) read calls, and therefore O(N) JNI calls, where N is the number of vectors. In FAISS, by contrast, the number of read calls is O(1), since each large array is read in a single call and streamed through the copy buffer.

FAISS stores chunks of the neighbor list in a single location and loads them all at once. See the code below:

static void read_HNSW(HNSW* hnsw, IOReader* f) {
    READVECTOR(hnsw->assign_probas);
    READVECTOR(hnsw->cum_nneighbor_per_level);
    READVECTOR(hnsw->levels);
    READVECTOR(hnsw->offsets);
    READVECTOR(hnsw->neighbors);

    READ1(hnsw->entry_point);
    READ1(hnsw->max_level);
    READ1(hnsw->efConstruction);
    READ1(hnsw->efSearch);
    READ1(hnsw->upper_beam);
}

In NMSLIB, each neighbor list is stored individually, requiring O(N) reads, where N is the total number of vectors.
As shown in the code below, we need totalElementsStored_ read operations.
Note that, with the new loading layer, input.read() ultimately makes a JNI call that delegates to Lucene's IndexInput to read the bytes. As a result, the number of input.read() calls directly corresponds to the number of JNI calls.

for (size_t i = 0; i < totalElementsStored_; i++) {
   ...
    } else {
        linkLists_[i] = (char *)malloc(linkListSize);
        CHECK(linkLists_[i]);
        input.read(linkLists_[i], linkListSize);  // <--------- THIS! One read (i.e. one JNI call) per element.
    }
    data_rearranged_[i] = new Object(data_level0_memory_ + (i)*memoryPerObject_ + offsetData_);
}

4. Solution 1. Patch in NMSLIB

We can patch NMSLIB to avoid making a JNI call for each vector element. The idea is to load data in bulk and then parse the neighbor lists from that buffer, rather than reading each neighbor list individually. This approach would reduce the number of JNI calls to O(Index size / Buffer size).

For example, with a 6GB vector index containing 1 million vectors and a 64KB buffer size, the required JNI calls would be reduced to O(6GB / 64KB) = 98,304, which is a significant improvement over 1 million calls, achieving nearly a 90% reduction in operations.
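The call-count estimate above is a ceiling division of index size by buffer size. The sketch below assumes "6GB" and "64KB" mean binary units (6 GiB, 64 KiB), which is what produces 98,304. It uses the text's figure of one read per vector for the 1M-vector baseline.

```cpp
#include <cassert>
#include <cstdint>

constexpr uint64_t kKiB = 1024;
constexpr uint64_t kGiB = 1024 * kKiB * kKiB;

// Bulk reads (JNI calls) needed to stream `indexBytes` through a buffer of
// `bufferBytes`: ceil(indexBytes / bufferBytes).
constexpr uint64_t bulkReadCalls(uint64_t indexBytes, uint64_t bufferBytes) {
  return (indexBytes + bufferBytes - 1) / bufferBytes;
}

// Fraction of calls saved compared with one read per vector.
double callReduction(uint64_t bulkCalls, uint64_t perElementCalls) {
  assert(perElementCalls > 0);
  return 1.0 - static_cast<double>(bulkCalls) / static_cast<double>(perElementCalls);
}

// The example from the text: a 6GB (6 GiB) index and a 64KB (64 KiB) buffer.
constexpr uint64_t kBulkCalls = bulkReadCalls(6 * kGiB, 64 * kKiB);
static_assert(kBulkCalls == 98304, "6 GiB / 64 KiB = 98,304 bulk reads");
```

Against 1,000,000 per-vector reads, callReduction(kBulkCalls, 1000000) is about 0.90, which matches the "nearly 90%" figure.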

Result: Surprisingly, it is 8% faster than the baseline. (Note: I reindexed on a new single node, which is why the loading time differs from the one mentioned earlier in the issue.)

  1. Baseline : 4.538 sec
  2. Modified version with 64KB buffer : 4.19 sec

4.1. Pros

  1. No performance degradation. If anything, it is even faster than the baseline.
  2. We can maintain a unified set of loading APIs for both NMSLIB and FAISS.

4.2. Cons

  1. A medium-sized patch is required in NMSLIB, which may increase the code maintenance burden.

4.3. Patch in hnsw.cc

template <typename dist_t>
void Hnsw<dist_t>::LoadOptimizedIndex(NmslibIOReader& input) {
    ...

    const size_t bufferSize = 64 * 1024;  // 64KB
    std::unique_ptr<char[]> buffer (new char[bufferSize]);
    uint32_t end = 0;
    uint32_t pos = 0;
    const bool isLTE = _isLittleEndian();
    
    for (size_t i = 0, remainingBytes = input.remaining(); i < totalElementsStored_; i++) {
        // Read linkList size integer.
        if ((pos + sizeof(SIZEMASS_TYPE)) >= end) {
            // Underflow, load bytes in bulk.
            const auto firstPartLen = end - pos;
            if (firstPartLen > 0) {
                std::memmove(buffer.get(), buffer.get() + pos, firstPartLen);  // Ranges may overlap, so memcpy is unsafe here.
            }
            const auto copyBytes = std::min(remainingBytes, bufferSize - firstPartLen);
            input.read(buffer.get() + firstPartLen, copyBytes);
            remainingBytes -= copyBytes;
            end = copyBytes + firstPartLen;
            pos = 0;
        }
    
        // Read data size. SIZEMASS_TYPE -> uint32_t
        SIZEMASS_TYPE linkListSize = 0;
        if (isLTE) {
            linkListSize = _readIntLittleEndian(buffer[pos], buffer[pos + 1], buffer[pos + 2], buffer[pos + 3]);
        } else {
            linkListSize = _readIntBigEndian(buffer[pos], buffer[pos + 1], buffer[pos + 2], buffer[pos + 3]);
        }
        pos += 4;
    
        if (linkListSize == 0) {
            linkLists_[i] = nullptr;
        } else {
            // Now we load neighbor list.
            linkLists_[i] = (char *) malloc(linkListSize);
            CHECK(linkLists_[i]);
    
            SIZEMASS_TYPE leftLinkListData = linkListSize;
            auto dataPtr = linkLists_[i];
            while (leftLinkListData > 0) {
                if (pos >= end) {
                    // Underflow, load bytes in bulk.
                    const auto copyBytes = std::min(remainingBytes, bufferSize);
                    input.read(buffer.get(), copyBytes);
                    remainingBytes -= copyBytes;
                    end = copyBytes;
                    pos = 0;
                }
        
                const auto copyBytes = std::min(leftLinkListData, end - pos);
                std::memcpy(dataPtr, buffer.get() + pos, copyBytes);
                dataPtr += copyBytes;
                leftLinkListData -= copyBytes;
                pos += copyBytes;
            }  // End while
        }  // End if
    
        data_rearranged_[i] = new Object(data_level0_memory_ + (i)*memoryPerObject_ + offsetData_);
    }  // End for
                
...            
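To check the bulk-read idea in isolation, here is a self-contained sketch of the same technique as the patch. Link lists are stored as a uint32_t size prefix followed by that many bytes. They are parsed out of a fixed-size buffer that is refilled in bulk, instead of issuing one read per element. CountingReader is a hypothetical in-memory stand-in for NmslibIOReader that only counts read calls. The sketch assumes native byte order for the size prefix (the patch handles endianness explicitly), and std::vector replaces the raw malloc'd arrays.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical stand-in for NmslibIOReader: serves bytes from memory and
// counts read() calls (each would be one JNI call in the loading layer).
struct CountingReader {
  explicit CountingReader(const std::vector<char> &d) : data(d) {}
  size_t remaining() const { return data.size() - offset; }
  void read(char *dst, size_t n) {
    assert(n <= remaining());
    std::memcpy(dst, data.data() + offset, n);
    offset += n;
    ++reads;
  }
  const std::vector<char> &data;
  size_t offset = 0;
  size_t reads = 0;
};

// Parses `count` size-prefixed link lists, refilling a fixed buffer in bulk.
std::vector<std::vector<char>> loadLinkLists(CountingReader &input, size_t count,
                                             size_t bufferSize) {
  assert(bufferSize >= sizeof(uint32_t));
  std::vector<char> buffer(bufferSize);
  size_t pos = 0;  // Next unread byte in the buffer.
  size_t end = 0;  // One past the last valid byte in the buffer.

  // Moves the unread tail to the front, then tops the buffer up in one bulk read.
  auto refill = [&]() {
    const size_t tail = end - pos;
    if (tail > 0) {
      std::memmove(buffer.data(), buffer.data() + pos, tail);  // Ranges may overlap.
    }
    const size_t n = std::min(input.remaining(), bufferSize - tail);
    if (n > 0) {
      input.read(buffer.data() + tail, n);
    }
    end = tail + n;
    pos = 0;
  };

  std::vector<std::vector<char>> lists(count);
  for (size_t i = 0; i < count; ++i) {
    // Size prefix: refill if fewer than 4 unread bytes remain.
    if (end - pos < sizeof(uint32_t)) {
      refill();
      assert(end - pos >= sizeof(uint32_t) && "truncated input");
    }
    uint32_t size = 0;
    std::memcpy(&size, buffer.data() + pos, sizeof(size));  // Native byte order.
    pos += sizeof(size);

    // Payload: copy out of the buffer, refilling whenever it runs dry.
    lists[i].resize(size);
    size_t copied = 0;
    while (copied < size) {
      if (pos == end) {
        refill();
        assert(pos < end && "truncated input");
      }
      const size_t n = std::min<size_t>(size - copied, end - pos);
      std::memcpy(lists[i].data() + copied, buffer.data() + pos, n);
      copied += n;
      pos += n;
    }
  }
  return lists;
}
```

The sketch shifts the unread tail with std::memmove rather than std::memcpy. When the tail is longer than its offset, the source and destination ranges overlap, and memmove handles that case correctly.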
              

5. Solution 2. Disable Streaming for FSDirectory

Since we're deprecating NMSLIB in version 3.x, we can disable the loading layer in NMSLIB until then.
Alternatively, we can selectively enable streaming in NMSLIB depending on whether the given Directory is an FSDirectory implementation.

if (directory instanceof FSDirectory) {
  loadIndexByFilePath(...);
} else {
  loadIndexByStreaming(...);
}

5.1. Pros :

  1. Simple.

5.2. Cons :

  1. Until 3.x, we would need to maintain two similar, largely duplicated versions of the APIs in both Java and JNI.

6. Solution 3. Live with it :)

Since we're deprecating NMSLIB in version 3.x, we can tolerate this issue in the short term.
However, I personally don't favor this approach, as it impacts p99 latency. Such cases are rare, but in the worst case they could still affect overall cluster performance.

7. Micro Tuning Results

  1. CallNonvirtualIntMethod → No impact.
  2. AVX2 intrinsic copy → No impact.
  3. Native ByteBuffer plus one additional byte copy → Made it worse.
  4. Larger buffer → Increasing the buffer from 4 KB to 64 KB reduced NMSLIB warm-up time by 0.8 seconds.
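Here is a back-of-the-envelope view of item 4. It assumes each buffer refill costs one JNI round trip, which is an assumption and not a measured breakdown. The number of refills is ceil(total / buffer), so moving from 4 KB to 64 KB cuts the crossings by 16x. The 10 MiB size in the static_asserts is purely illustrative.

```cpp
#include <cstdint>

// ceil(totalBytes / bufferSize): bulk reads needed to stream a file through a
// fixed-size buffer. Assumes one JNI round trip per read; bufferSize must be > 0.
constexpr std::uint64_t RefillCount(std::uint64_t totalBytes, std::uint64_t bufferSize) {
  return (totalBytes + bufferSize - 1) / bufferSize;
}

// Illustrative 10 MiB index file.
static_assert(RefillCount(10u << 20, 4u << 10) == 2560, "4 KB buffer");
static_assert(RefillCount(10u << 20, 64u << 10) == 160, "64 KB buffer");
```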

@0ctopus13prime
Contributor Author

0ctopus13prime commented Oct 12, 2024

We decided to go with Solution 1 for NMSLIB.
PR - #2185

@dblock
Member

dblock commented Oct 14, 2024

[Catch All Triage - 1, 2, 3, 4]

@navneet1v
Collaborator

Because the issue is linked to the PR, closing the PR also closes the issue, which we don't want. Thanks @dblock for removing the untriaged label.

@navneet1v
Collaborator

@0ctopus13prime just checking, what items are left for this feature to complete?

@0ctopus13prime
Copy link
Contributor Author

@navneet1v
The writing part for the native engines is what's left!
Deprecating FileWatcher is stuck, but we can add the writing layer first.

@0ctopus13prime
Contributor Author

0ctopus13prime commented Oct 23, 2024

Writing Layer Latency Analysis

1. Goal

This analysis report explores the potential impact of replacing the std::ofstream I/O mechanism with a writing layer in the native engines (FAISS, NMSLIB). In the new approach, all I/O writing goes through an interface that internally uses Lucene's IndexOutput to manage byte flushing. Depending on the provided Directory, it can flush bytes to the host's file system or to S3. The additional byte-handling logic (such as multiple JNI calls and virtual calls) may introduce some latency.
However, the extent of this latency increase is currently unknown. This document details the methodology I used to reach my conclusion, with the aim of reaching agreement that the writing layer will not cause noticeable performance degradation.
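One way a writing layer can keep per-call overhead down is to batch bytes. Writes accumulate in a native buffer and are handed upstream only when the buffer fills or is explicitly flushed. The number of (assumed) JNI calls then scales with total bytes divided by buffer size, not with the number of FAISS write calls. This is a sketch under assumptions. `BufferedStreamWriter` is a hypothetical name, and its `flush` callback stands in for the JNI call into `IndexOutput`. The real k-NN writer may be structured differently.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical buffered writer. `flush(data, n)` stands in for the JNI call
// that passes n bytes to Lucene's IndexOutput.
class BufferedStreamWriter {
 public:
  using FlushFn = std::function<void(const char*, std::size_t)>;

  BufferedStreamWriter(FlushFn flush, std::size_t bufferSize)
      : flush_(std::move(flush)), buffer_(bufferSize) {
    assert(bufferSize > 0);
  }

  // Same shape as faiss::IOWriter::operator(): writes size * nitems bytes.
  std::size_t Write(const void* ptr, std::size_t size, std::size_t nitems) {
    const char* src = static_cast<const char*>(ptr);
    std::size_t remaining = size * nitems;
    while (remaining > 0) {
      const std::size_t n = std::min(remaining, buffer_.size() - used_);
      std::memcpy(buffer_.data() + used_, src, n);
      used_ += n;
      src += n;
      remaining -= n;
      if (used_ == buffer_.size()) Flush();  // buffer full: one upstream call
    }
    return nitems;
  }

  // Pushes any buffered bytes upstream; call once after the last Write.
  void Flush() {
    if (used_ > 0) {
      flush_(buffer_.data(), used_);
      ++flushCalls_;
      used_ = 0;
    }
  }

  std::size_t flushCalls() const { return flushCalls_; }

 private:
  FlushFn flush_;
  std::vector<char> buffer_;
  std::size_t used_ = 0;        // bytes currently buffered
  std::size_t flushCalls_ = 0;  // upstream calls made so far
};
```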

2. Conclusion

After multiple rounds of benchmarks and an in-depth impact analysis, I concluded that the writing layer's contribution to latency overhead is minimal, accounting for at most 1% of the shard indexing time. Some sections of the benchmark results suggested potential performance degradation due to the change. However, subsequent repeated benchmarks indicated that those inconsistent numbers are likely noise. Therefore, I do not expect severe performance degradation from the writing layer.

3. Writing Layer’s Contribution in Shard Indexing Latency

What proportion does the writing layer contribute to the overall shard-level indexing process?
The vector indexing process within each shard consists of three stages:

  1. Buffering vectors and ID lists into memory: Java float[] and int[] arrays are transferred to the JNI layer and loaded into memory allocated by C++.
  2. Passing the buffered memory to construct the vector index is the core step in the HNSW indexing mechanism.
    It uses the provided vectors to build a multi-layered, small-world graph.
  3. After all vectors are ingested, the constructed index is flushed to the underlying storage. This can be a local file system, NFS, or S3, depending on the given Directory.
    The baseline uses std::ofstream for this step.
    The writing layer replaces it at this stage.

3.1. Test Set-up (For FAISS only)

3.1.1. Testing Environment

  • c5ad.12xlarge
    • 48 vcpu
    • 128G
  • -Xmx63G -Xms63G
  • Storage : gp3
  • Data
    • 1M vectors
    • 128 dimensions

I created a standalone program to load prepared vector data and feed it to the JNIService, which manages the vector index as a whole. In my standalone testing, I used 1 million vectors, each with 128 dimensions (random-s-128-10m-euclidean.hdf5). In this program, I tested the following three scenarios:

  1. Use a stream that internally uses Lucene's IndexOutput to flush the vector index.
  2. Use std::ofstream to flush the vector index, which is the baseline.
  3. Combine the two approaches above: a new IOWriter implementation that uses std::ofstream internally to write bytes directly to a file without relying on IndexOutput. This implementation can only be used when an FSDirectory is provided.

The total indexing time was 5 minutes and 50 seconds (350 seconds), of which at most 1 second was spent flushing the vector index (the final step). One second is the worst case I can imagine. The measured flush times were all below 1 second (at most about 0.66 seconds, or roughly 0.2% of the total). This indicates that the vast majority of vector indexing time is spent constructing the in-memory vector index, while only about 0.2% goes to I/O. Flushing takes about 46% longer with the writing layer, but its overall impact is marginal: it adds at most about a second to the total (roughly 5 minutes and 51 seconds in the worst case).

| Test case | Flush time (µs) | Diff vs. baseline |
|---|---|---|
| Baseline | 450436 | |
| Streaming | 656856 | 0.45827 |
| Hybrid | 457356 | 0.01536 |

The resulting file size is 634 MB. The baseline took approximately 0.45 seconds to flush, while the streaming approach took 0.65 seconds. The hybrid approach was nearly identical to the baseline, taking 0.457 seconds. As observed, there was indeed an increase in flushing time from 0.45 seconds to 0.65 seconds. However, since both vector transfer and constructing the vector index account for 99.8% of the total time, I anticipate that the writing layer will have minimal impact on overall performance.
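The percentages above come from two ratios:

- the relative change in flush time, (candidate - baseline) / baseline;
- the flush time's share of the 350-second (5 min 50 s) total.

The minimal sketch below reproduces both from the numbers in the table and text. `RelativeDiff` and `Share` are illustrative helpers, not part of the benchmark code.

```cpp
// Relative change of a candidate measurement against the baseline:
// (candidate - baseline) / baseline. The baseline must be non-zero.
double RelativeDiff(double baseline, double candidate) {
  return (candidate - baseline) / baseline;
}

// Fraction of the total indexing time spent in one stage (e.g. the flush).
double Share(double stageSeconds, double totalSeconds) {
  return stageSeconds / totalSeconds;
}
```

With these inputs, the streaming row comes out at about 0.458 (+45.8%). The measured streaming flush is about 0.19% of the total, and even the pessimistic 1-second bound stays under 0.3%.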

4. Benchmark Results

  • c5ad.12xlarge
    • 48 vcpu
    • 128G
  • -Xmx63G -Xms63G
  • Storage : gp3

I excluded metrics from the benchmark results according to the conditions below.

  1. Diff <= 5%, which is likely noise.
  2. Value < 1 second. For example, 'Min cumulative indexing time across primary shards', whose values are typically 0.002 seconds.
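The two exclusion rules can be expressed as a small predicate. This is a sketch under three assumptions:

- the diff is taken relative to the baseline and compared by absolute value;
- "value < 1" means both the baseline and candidate values are below 1;
- `MetricRow` and `ShouldReport` are illustrative names only.

```cpp
#include <algorithm>
#include <cmath>
#include <string>

// Illustrative row of a baseline-vs-candidate comparison.
struct MetricRow {
  std::string name;
  double baseline;
  double candidate;
};

// Returns true if the row survives both exclusion rules.
bool ShouldReport(const MetricRow& row) {
  // Rule 2: drop metrics whose values are below 1 (e.g. 0.002-second minimums).
  if (std::max(row.baseline, row.candidate) < 1.0) return false;
  // A zero baseline with a non-trivial candidate is a real change; avoid dividing by zero.
  if (row.baseline == 0.0) return true;
  // Rule 1: drop changes within +/-5% of the baseline as likely noise.
  const double diff = (row.candidate - row.baseline) / row.baseline;
  return std::fabs(diff) > 0.05;
}
```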

4.1. Faiss

4.1.1. Conclusion

From the results, cumulative indexing time and merge time are almost identical between the baseline and the writing layer. Their diffs fall within the 5% noise threshold, so they are omitted from the table below. Only p100 latency appears to be affected by the writing layer.

4.1.2. Benchmark Results

Note that the Writing Layer values are the average of five benchmark runs.

| Metric | Task | Baseline | Writing Layer | Diff | Unit |
|---|---|---|---|---|---|
| Min Throughput | custom-vector-bulk | 5793.51 | 5225.818 | -0.10863 | docs/s |
| Max Throughput | custom-vector-bulk | 19347.1 | 17716.92 | -0.09201 | docs/s |
| 100th percentile latency | custom-vector-bulk | 3157.52 | 4727.68 | 0.33212 | ms |

4.2. NMSLIB

4.2.1. Conclusion

Surprisingly, the overall performance has improved compared to the baseline. I believe this enhancement is due to the use of 64 KB I/O buffering for writing, as opposed to the 4 KB buffer size used by std::ofstream.
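One way to check this explanation is to give the baseline std::ofstream a 64 KB buffer too and compare. This is a minimal sketch that assumes libstdc++. It uses std::basic_streambuf::pubsetbuf, whose effect is implementation-defined; libstdc++ only honors it before the file is opened. `WriteWithBuffer` is an illustrative helper, not part of the k-NN code.

```cpp
#include <cassert>
#include <cstddef>
#include <fstream>
#include <string>
#include <vector>

// Writes `data` to `path` through an std::ofstream that uses a caller-chosen
// buffer size instead of the library default. Returns false on any I/O error.
bool WriteWithBuffer(const std::string& path, const std::string& data,
                     std::size_t bufferSize) {
  assert(bufferSize > 0);
  // Declared before `out`, so it outlives the stream (locals are destroyed
  // in reverse order of declaration).
  std::vector<char> buffer(bufferSize);
  std::ofstream out;
  // Install the buffer before open(); libstdc++ ignores setbuf on an open file.
  out.rdbuf()->pubsetbuf(buffer.data(), static_cast<std::streamsize>(buffer.size()));
  out.open(path, std::ios::binary);
  if (!out) return false;
  out.write(data.data(), static_cast<std::streamsize>(data.size()));
  out.close();  // Flushes the custom buffer; a failure sets failbit.
  return !out.fail();
}
```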

4.2.2. Benchmark Results

I applied the same filtering rules as for Faiss to exclude metrics.
Note that the Writing Layer values are the average of three benchmark runs.

| Metric | Task | Baseline | Writing Layer | Diff | Unit |
|---|---|---|---|---|---|
| Cumulative merge throttle time of primary shards | | 1.73867 | 1.33853 | -0.23014 | min |
| Median cumulative merge throttle time across primary shards | | 0.86933 | 0.66926 | -0.23014 | min |
| Max cumulative merge throttle time across primary shards | | 1.73867 | 1.33853 | -0.23014 | min |
| Min Throughput | custom-vector-bulk | 3279.76 | 4316.82 | 0.3162 | docs/s |
| 99.99th percentile latency | custom-vector-bulk | 3193.2 | 2751.545 | -0.3245 | ms |
| 100th percentile latency | custom-vector-bulk | 6304.56 | 4634.195 | -0.35049 | ms |

5. Alternatives - Hybrid

Based on the results, I believe we can safely go with the writing layer. However, if we want to be more conservative, we can adopt the hybrid approach (see Case 3. Use hybrid approach). In this approach, we implement the IOWriter interface but use std::ofstream to flush bytes when an FSDirectory is provided.

From the micro-performance testing, we can see that the time spent is nearly identical to the baseline (baseline = 0.45 sec, hybrid = 0.457 sec). One of the greatest advantages of this approach is that it allows us to have a single codebase for I/O operations using the writing layer, while also being simple and easy to implement. Additionally, it guarantees users an identical indexing experience.

Overall, I don't see any downsides to this approach.

Appendix 1. Standalone measurement of vector indexing

Main

import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.MMapDirectory;

// IndexOutputWithBuffer, FaissService and NmslibService come from the k-NN plugin;
// kdyBench is the benchmark-only JNI hook used for this experiment, not a released API.
public static void main(String... args) throws IOException {
    final String dataPath = "/home/ec2-user/dump_to_print/data.json";
    final String tmpDirectory = "tmp-" + UUID.randomUUID();
    final int numData = 10000;  // Overridden natively by the number of parsed vectors (numData = ids.size()).
    final int dim = 128;
    Map<String, Object> parameters = new HashMap<>();
    parameters.put("name", "hnsw");
    parameters.put("data_type", "float");
    parameters.put("index_description", "HNSW16,Flat");
    parameters.put("spaceType", "l2");

    Map<String, Object> innerParameters = new HashMap<>();
    innerParameters.put("ef_search", 10);
    innerParameters.put("ef_construction", 100);

    Map<String, Object> encoderParameters = new HashMap<>();
    encoderParameters.put("name", "flat");
    encoderParameters.put("parameters", Collections.emptyMap());
    innerParameters.put("encoder", encoderParameters);
    parameters.put("parameters", innerParameters);

    parameters.put("indexThreadQty", 1);

    final String fullPath = tmpDirectory + "/output";

    // try-with-resources closes both the IndexOutput and the Directory.
    try (final Directory directory = new MMapDirectory(Path.of(tmpDirectory));
         final IndexOutput indexOutput = directory.createOutput("output", IOContext.DEFAULT)) {
        System.out.println("Output : " + fullPath);
        IndexOutputWithBuffer indexOutputWithBuffer = new IndexOutputWithBuffer(indexOutput);
        FaissService.kdyBench(numData, dim, dataPath, parameters, indexOutputWithBuffer, fullPath);
        // For NMSLIB testing, uncomment the line below.
        // NmslibService.kdyBench(numData, dim, dataPath, parameters, indexOutputWithBuffer);
    }
    System.out.println("Done.");
}

 

HDF5 Data Dump

import h5py
import json

# file_name = 'sift-128-euclidean.hdf5'
file_name = 'random-s-128-10m-euclidean.hdf5'

# Print up to 1M training vectors to stdout, one JSON array per line
# (e.g. redirect stdout to the data.json file read by the benchmark).
with h5py.File(file_name, 'r') as f:
    data = f['train']
    L = min(1000000, len(data))
    for i in range(L):
        print(json.dumps(data[i].tolist()))

Result (one JSON array per line):

[5.838372982574612, 2.711372391128072, 5.014160838955179, -8.336033892368915, ...]

Case 1. Use stream

std::cout << "Stream!!!!!!!!" << std::endl;
auto start = std::chrono::high_resolution_clock::now();

knn_jni::faiss_wrapper::WriteIndex(&jniUtil, env, indexOutput, indexAddress, &indexService);

auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double, std::micro> duration = end - start;
std::cout << "Stream file version -> Execution time: " << duration.count() << " microseconds" << std::endl;

Case 2. Use std::ofstream

std::cout << "FStream!!!!!!" << std::endl;
const std::string destPath = jniUtil.ConvertJavaStringToCppString(env, fullPathj);
auto start = std::chrono::high_resolution_clock::now();

knn_jni::faiss_wrapper::WriteIndexLegacy(&jniUtil, env, destPath, indexAddress, &indexService);

auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double, std::micro> duration = end - start;
std::cout << "Legacy file version -> Execution time: " << duration.count() << " microseconds" << std::endl;

Case 3. Use hybrid approach

const std::string destPath = jniUtil.ConvertJavaStringToCppString(env, fullPathj);
auto start = std::chrono::high_resolution_clock::now();

knn_jni::faiss_wrapper::WriteIndexKdy(&jniUtil, env, destPath, indexAddress, &indexService);

auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double, std::micro> duration = end - start;
std::cout << "Kdy Hybrid Stream file version -> Execution time: "
          << duration.count() << " microseconds" << std::endl;
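The three cases repeat the same start/stop timing boilerplate, which a small helper like the sketch below could factor out. `MeasureMicros` is an illustrative name. It uses std::chrono::steady_clock instead of high_resolution_clock, because steady_clock is guaranteed to be monotonic, which is what interval timing needs.

```cpp
#include <chrono>
#include <utility>

// Runs `fn` once and returns its elapsed wall-clock time in microseconds,
// measured with a monotonic clock.
template <typename Fn>
double MeasureMicros(Fn&& fn) {
  const auto start = std::chrono::steady_clock::now();
  std::forward<Fn>(fn)();
  const auto end = std::chrono::steady_clock::now();
  return std::chrono::duration<double, std::micro>(end - start).count();
}
```

For example, Case 2 would become `const double us = MeasureMicros([&] { knn_jni::faiss_wrapper::WriteIndexLegacy(&jniUtil, env, destPath, indexAddress, &indexService); });`.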




void knn_jni::faiss_wrapper::WriteIndexKdy(knn_jni::JNIUtilInterface * jniUtil, JNIEnv * env,
                                           const std::string& path, jlong index_ptr, IndexService* indexService) {

  knn_jni::stream::KdyFaissIOWriter writer {path};

  // Write the index through the std::ofstream-backed writer.
  indexService->writeIndex(&writer, index_ptr);
}




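#include <fstream>
#include <stdexcept>
#include <string>

#include <faiss/impl/io.h>

// Hybrid writer: implements faiss::IOWriter but writes straight to a local
// file with std::ofstream, bypassing IndexOutput (usable with FSDirectory only).
class KdyFaissIOWriter final : public faiss::IOWriter {
 public:
  explicit KdyFaissIOWriter(const std::string& _path)
      : faiss::IOWriter(),
        path(_path),
        stream(_path, std::ios::binary) {
    if (!stream) {
      throw std::runtime_error("Failed to open " + path + " for writing.");
    }
    name = "KdyFaissIOWriter";
  }

  size_t operator()(const void *ptr, size_t size, size_t nitems) final {
    const auto writeBytes = size * nitems;
    if (writeBytes > 0) {
      stream.write(static_cast<const char*>(ptr), writeBytes);
      if (!stream) {
        return 0;  // Report the failure so the caller's item-count check fails.
      }
    }
    return nitems;
  }

  // Return a file descriptor that can be memory-mapped (not supported here).
  int filedescriptor() final {
    throw std::runtime_error("filedescriptor() is not supported in KdyFaissIOWriter.");
  }

 private:
  std::string path;
  std::ofstream stream;
};  // class KdyFaissIOWriter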


Parsing Vectors and Indexing

std::unique_ptr<knn_jni::faiss_wrapper::FaissMethods> faissMethods(
    new knn_jni::faiss_wrapper::FaissMethods());
knn_jni::faiss_wrapper::IndexService indexService(std::move(faissMethods));

const std::string dataPath = jniUtil.ConvertJavaStringToCppString(env, inputDataPathJ);
std::ifstream in (dataPath);
std::string line;
std::vector<float> vectors;
int64_t id = 0;
std::vector<int64_t> ids;
while (std::getline(in, line)) {
    // Each line is a JSON array such as "[5.83, 2.71, ...]"; start after '['.
    size_t s = 1;
    for (size_t i = s; i < line.size(); ) {
        // Advance i to the next delimiter: ',' or the closing ']'.
        while (i < line.size() && line[i] != ',' && line[i] != ']') {
            ++i;
        }
        // Skip the space json.dumps emits after each comma.
        while (s < i && line[s] == ' ') {
            ++s;
        }
        vectors.push_back(std::stof(line.substr(s, i - s)));
        s = ++i;
    }

    ids.push_back(id++);
}

numData = ids.size();
std::cout << "dim=" << dim
            << ", numData=" << numData
            << ", |vectors|=" << vectors.size() << std::endl;

indexService.insertToIndex(dim, numData, 1,
                            (int64_t) (&vectors),
                            ids,
                            indexAddress);
std::cout << "Insert to index is done!" << std::endl;

@0ctopus13prime
Contributor Author

0ctopus13prime commented Oct 23, 2024

@navneet1v
I think we can safely go with the writing layer. From the analysis, it affects only about 1% of the total shard vector indexing time.
I ran multiple rounds of benchmarks and micro-benchmarks, and overall the differences in the benchmark numbers are likely noise. In other words, the apparent degradation is likely noise, and I don't believe the drops are caused by the writing layer.

Please feel free to share your thoughts on it.
Once we agree that it is safe to proceed, I will work on cleaning up the code and continue adding integration tests and unit tests.

Thank you!

@navneet1v
Collaborator

@0ctopus13prime thanks for sharing the results. I agree with going with the writing layer, and we shouldn't build a hybrid approach. The minimal degradation we are seeing is just noise, which is very prominent in indexing benchmarks in general. Thanks for sharing the detailed analysis. From my side, let's write the code.

In a separate PR, please also add these micro-benchmarks to the repo so that they can be used later.

@0ctopus13prime
Contributor Author

Final Benchmark Environment (Cluster)

  • OpenSearch Version : 2.18 (backporting this change to 2.xx branch)
  • Primary shards : 3
  • No replica
  • Data node : r7gd.4xlarge (16 vCPU, 128 GiB memory)
  • Storage : SSD, EBS, 5000 IOPS
  • JVM Heap : -Xmx32g -Xms32g

@0ctopus13prime
Contributor Author

0ctopus13prime commented Nov 5, 2024

Faiss benchmark results conclusion (3 shards)

From the results, total cumulative indexing time is expected to increase by up to about 2% (67.68 min -> 68.99 min), and mean bulk indexing throughput to decrease by about 1.5% (7861 -> 7745 docs/s).

Faiss benchmark details

| Metric | Task | Baseline | Candidate | Diff | Unit |
|---|---|---|---|---|---|
| Cumulative indexing time of primary shards | | 67.6766 | 68.9868 | 0.01935972 | min |
| Min cumulative indexing time across primary shards | | 0 | 0 | 0 | min |
| Median cumulative indexing time across primary shards | | 10.92455 | 11.3572 | 0.039603462 | min |
| Max cumulative indexing time across primary shards | | 22.9782 | 23.4515 | 0.020597784 | min |
| Cumulative indexing throttle time of primary shards | | 0 | 0 | 0 | min |
| Min cumulative indexing throttle time across primary shards | | 0 | 0 | 0 | min |
| Median cumulative indexing throttle time across primary shards | | 0 | 0 | 0 | min |
| Max cumulative indexing throttle time across primary shards | | 0 | 0 | 0 | min |
| Cumulative merge time of primary shards | | 455.27 | 463.033 | 0.01705142 | min |
| Cumulative merge count of primary shards | | 628 | 631 | 0.00477707 | |
| Min cumulative merge time across primary shards | | 0 | 0 | 0 | min |
| Median cumulative merge time across primary shards | | 74.92695 | 75.6642 | 0.009839584 | min |
| Max cumulative merge time across primary shards | | 154.315 | 158.499 | 0.027113372 | min |
| Cumulative merge throttle time of primary shards | | 25.78985 | 22.7165 | -0.119168975 | min |
| Min cumulative merge throttle time across primary shards | | 0 | 0 | 0 | min |
| Median cumulative merge throttle time across primary shards | | 3.979025 | 3.18752 | -0.198919333 | min |
| Max cumulative merge throttle time across primary shards | | 9.53849 | 8.29422 | -0.130447272 | min |
| Cumulative refresh time of primary shards | | 0.8357415 | 0.8457 | 0.011915766 | min |
| Cumulative refresh count of primary shards | | 475.5 | 473 | -0.005257624 | |
| Min cumulative refresh time across primary shards | | 0 | 0 | 0 | min |
| Median cumulative refresh time across primary shards | | 0.134925 | 0.132867 | -0.015252918 | min |
| Max cumulative refresh time across primary shards | | 0.2874665 | 0.302833 | 0.053454924 | min |
| Cumulative flush time of primary shards | | 15.55735 | 15.5443 | -0.000838832 | min |
| Cumulative flush count of primary shards | | 350 | 349 | -0.002857143 | |
| Min cumulative flush time across primary shards | | 0 | 0 | 0 | min |
| Median cumulative flush time across primary shards | | 2.5186 | 2.5161 | -0.000992615 | min |
| Max cumulative flush time across primary shards | | 5.39728 | 5.36828 | -0.005373077 | min |
| Total Young Gen GC time | | 1.469 | 1.706 | 0.161334241 | s |
| Total Young Gen GC count | | 160 | 174 | 0.0875 | |
| Total Old Gen GC time | | 0 | 0 | 0 | s |
| Total Old Gen GC count | | 0 | 0 | 0 | |
| Store size | | 340.746 | 340.744 | -5.86947E-06 | GB |
| Translog size | | 1.43983E-06 | 1.20E-06 | -0.168176688 | GB |
| Heap used for segments | | 0 | 0 | 0 | MB |
| Heap used for doc values | | 0 | 0 | 0 | MB |
| Heap used for terms | | 0 | 0 | 0 | MB |
| Heap used for norms | | 0 | 0 | 0 | MB |
| Heap used for points | | 0 | 0 | 0 | MB |
| Heap used for stored fields | | 0 | 0 | 0 | MB |
| Segment count | | 5.5 | 5 | -0.090909091 | |
| Min Throughput | custom-vector-bulk | 6584.245 | 5835.71 | -0.113685776 | docs/s |
| Mean Throughput | custom-vector-bulk | 7861.24 | 7744.98 | -0.014789015 | docs/s |
| Median Throughput | custom-vector-bulk | 7596.51 | 7518.42 | -0.010279721 | docs/s |
| Max Throughput | custom-vector-bulk | 10233.015 | 9612.79 | -0.060610192 | docs/s |
| 50th percentile latency | custom-vector-bulk | 72.44355 | 73.3195 | 0.012091484 | ms |
| 90th percentile latency | custom-vector-bulk | 151.5965 | 155.167 | 0.023552655 | ms |
| 99th percentile latency | custom-vector-bulk | 260.645 | 257.376 | -0.012541963 | ms |
| 99.9th percentile latency | custom-vector-bulk | 378.015 | 381.875 | 0.010211235 | ms |
| 99.99th percentile latency | custom-vector-bulk | 608.632 | 932.898 | 0.532778428 | ms |
| 100th percentile latency | custom-vector-bulk | 1080.8535 | 3556.3 | 2.290270143 | ms |
| 50th percentile service time | custom-vector-bulk | 72.44355 | 73.3195 | 0.012091484 | ms |
| 90th percentile service time | custom-vector-bulk | 151.5965 | 155.167 | 0.023552655 | ms |
| 99th percentile service time | custom-vector-bulk | 260.645 | 257.376 | -0.012541963 | ms |
| 99.9th percentile service time | custom-vector-bulk | 378.015 | 381.875 | 0.010211235 | ms |
| 99.99th percentile service time | custom-vector-bulk | 608.632 | 932.898 | 0.532778428 | ms |
| 100th percentile service time | custom-vector-bulk | 1080.8535 | 3556.3 | 2.290270143 | ms |
| error rate | custom-vector-bulk | 0 | 0 | 0 | % |
| Min Throughput | force-merge-segments | 0 | 0 | 0 | ops/s |
| Mean Throughput | force-merge-segments | 0 | 0 | 0 | ops/s |
| Median Throughput | force-merge-segments | 0 | 0 | 0 | ops/s |
| Max Throughput | force-merge-segments | 0 | 0 | 0 | ops/s |
| 100th percentile latency | force-merge-segments | 11015600 | 1.13E+07 | 0.026789281 | ms |
| 100th percentile service time | force-merge-segments | 11015600 | 1.13E+07 | 0.026789281 | ms |
| error rate | force-merge-segments | 0 | 0 | 0 | % |
| Min Throughput | warmup-indices | 0.02 | 0.02 | 0 | ops/s |
| Mean Throughput | warmup-indices | 0.02 | 0.02 | 0 | ops/s |
| Median Throughput | warmup-indices | 0.02 | 0.02 | 0 | ops/s |
| Max Throughput | warmup-indices | 0.02 | 0.02 | 0 | ops/s |
| 100th percentile latency | warmup-indices | 46760.8 | 54720 | 0.170210946 | ms |
| 100th percentile service time | warmup-indices | 46760.8 | 54720 | 0.170210946 | ms |
| error rate | warmup-indices | 0 | 0 | 0 | % |
| Min Throughput | prod-queries | 2.08 | 1.61 | -0.225961538 | ops/s |
| Mean Throughput | prod-queries | 8.765 | 10.09 | 0.151169424 | ops/s |
| Median Throughput | prod-queries | 8.765 | 4.43 | -0.494580719 | ops/s |
| Max Throughput | prod-queries | 15.445 | 24.24 | 0.569439948 | ops/s |
| 50th percentile latency | prod-queries | 9.361765 | 10.2061 | 0.090189724 | ms |
| 90th percentile latency | prod-queries | 11.2586 | 12.3034 | 0.092800171 | ms |
| 99th percentile latency | prod-queries | 482.748 | 557.695 | 0.155250773 | ms |
| 100th percentile latency | prod-queries | 503.6475 | 621.016 | 0.233036995 | ms |
| 50th percentile service time | prod-queries | 9.361765 | 10.2061 | 0.090189724 | ms |
| 90th percentile service time | prod-queries | 11.2586 | 12.3034 | 0.092800171 | ms |
| 99th percentile service time | prod-queries | 482.748 | 557.695 | 0.155250773 | ms |
| 100th percentile service time | prod-queries | 503.6475 | 621.016 | 0.233036995 | ms |
| error rate | prod-queries | 0 | 0 | 0 | % |
| Mean recall@k | prod-queries | 0.34 | 0.35 | 0.029411765 | |
| Mean recall@1 | prod-queries | 0.495 | 0.4 | -0.191919192 | |

@0ctopus13prime
Contributor Author

0ctopus13prime commented Nov 5, 2024

NMSLIB benchmark conclusion (3 shards)

Unlike Faiss, NMSLIB is expected to see a slight improvement in indexing-related metrics.
Cumulative indexing time decreased by about 5.5% (70.27 min -> 66.41 min), and mean bulk indexing throughput increased by about 1.6% (from 7797 docs/s to 7922 docs/s).

Benchmark details

| Metric | Task | Baseline | Candidate | Diff | Unit |
|---|---|---|---|---|---|
| Cumulative indexing time of primary shards | | 70.2665 | 66.4061 | -0.054939409 | min |
| Min cumulative indexing time across primary shards | | 0 | 0 | 0 | min |
| Median cumulative indexing time across primary shards | | 11.1663 | 10.4298 | -0.06595739 | min |
| Max cumulative indexing time across primary shards | | 24.5895 | 23.3949 | -0.048581712 | min |
| Cumulative indexing throttle time of primary shards | | 0 | 0 | 0 | min |
| Min cumulative indexing throttle time across primary shards | | 0 | 0 | 0 | min |
| Median cumulative indexing throttle time across primary shards | | 0 | 0 | 0 | min |
| Max cumulative indexing throttle time across primary shards | | 0 | 0 | 0 | min |
| Cumulative merge time of primary shards | | 547.03 | 546.824 | -0.000376579 | min |
| Cumulative merge count of primary shards | | 650 | 617 | -0.050769231 | |
| Min cumulative merge time across primary shards | | 0 | 0 | 0 | min |
| Median cumulative merge time across primary shards | | 88.9109 | 89.2281 | 0.003567617 | min |
| Max cumulative merge time across primary shards | | 188.095 | 186.289 | -0.009601531 | min |
| Cumulative merge throttle time of primary shards | | 18.9961 | 20.5987 | 0.084364685 | min |
| Min cumulative merge throttle time across primary shards | | 0 | 0 | 0 | min |
| Median cumulative merge throttle time across primary shards | | 2.98061 | 3.06296 | 0.027628573 | min |
| Max cumulative merge throttle time across primary shards | | 6.81933 | 7.71148 | 0.130826635 | min |
| Cumulative refresh time of primary shards | | 0.785367 | 0.735133 | -0.063962453 | min |
| Cumulative refresh count of primary shards | | 484 | 477 | -0.01446281 | |
| Min cumulative refresh time across primary shards | | 0 | 0 | 0 | min |
| Median cumulative refresh time across primary shards | | 0.127833 | 0.110533 | -0.135332817 | min |
| Max cumulative refresh time across primary shards | | 0.273 | 0.276367 | 0.012333333 | min |
| Cumulative flush time of primary shards | | 15.2446 | 14.0613 | -0.077620928 | min |
| Cumulative flush count of primary shards | | 363 | 355 | -0.022038567 | |
| Min cumulative flush time across primary shards | | 0 | 0 | 0 | min |
| Median cumulative flush time across primary shards | | 2.37743 | 2.25187 | -0.052813332 | min |
| Max cumulative flush time across primary shards | | 5.28568 | 4.94943 | -0.063615278 | min |
| Total Young Gen GC time | | 2.127 | 2.052 | -0.035260931 | s |
| Total Young Gen GC count | | 179 | 176 | -0.016759777 | |
| Total Old Gen GC time | | 0 | 0 | 0 | s |
| Total Old Gen GC count | | 0 | 0 | 0 | |
| Store size | | 340.825 | 340.825 | 0 | GB |
| Translog size | | 1.20E-06 | 1.20E-06 | 0 | GB |
| Heap used for segments | | 0 | 0 | 0 | MB |
| Heap used for doc values | | 0 | 0 | 0 | MB |
| Heap used for terms | | 0 | 0 | 0 | MB |
| Heap used for norms | | 0 | 0 | 0 | MB |
| Heap used for points | | 0 | 0 | 0 | MB |
| Heap used for stored fields | | 0 | 0 | 0 | MB |
| Segment count | | 5 | 5 | 0 | |
| Min Throughput | custom-vector-bulk | 4198.96 | 5499.58 | 0.309748128 | docs/s |
| Mean Throughput | custom-vector-bulk | 7797.42 | 7921.59 | 0.015924498 | docs/s |
| Median Throughput | custom-vector-bulk | 7560.44 | 7625.18 | 0.008562994 | docs/s |
| Max Throughput | custom-vector-bulk | 9657.7 | 9963.47 | 0.031660747 | docs/s |
| 50th percentile latency | custom-vector-bulk | 68.3138 | 68.0887 | -0.003295088 | ms |
| 90th percentile latency | custom-vector-bulk | 154.12 | 153.953 | -0.001083571 | ms |
| 99th percentile latency | custom-vector-bulk | 258.562 | 258.702 | 0.000541456 | ms |
| 99.9th percentile latency | custom-vector-bulk | 370.548 | 387.22 | 0.044992821 | ms |
| 99.99th percentile latency | custom-vector-bulk | 506.498 | 582.752 | 0.150551434 | ms |
| 100th percentile latency | custom-vector-bulk | 2082.6 | 1156.62 | -0.444626909 | ms |
| 50th percentile service time | custom-vector-bulk | 68.3138 | 68.0887 | -0.003295088 | ms |
| 90th percentile service time | custom-vector-bulk | 154.12 | 153.953 | -0.001083571 | ms |
| 99th percentile service time | custom-vector-bulk | 258.562 | 258.702 | 0.000541456 | ms |
| 99.9th percentile service time | custom-vector-bulk | 370.548 | 387.22 | 0.044992821 | ms |
| 99.99th percentile service time | custom-vector-bulk | 506.498 | 582.752 | 0.150551434 | ms |
| 100th percentile service time | custom-vector-bulk | 2082.6 | 1156.62 | -0.444626909 | ms |
| error rate | custom-vector-bulk | 0 | 0 | 0 | % |
| Min Throughput | force-merge-segments | 0 | 0 | 0 | ops/s |
| Mean Throughput | force-merge-segments | 0 | 0 | 0 | ops/s |
| Median Throughput | force-merge-segments | 0 | 0 | 0 | ops/s |
| Max Throughput | force-merge-segments | 0 | 0 | 0 | ops/s |
| 100th percentile latency | force-merge-segments | 1.50E+07 | 1.47E+07 | -0.024653036 | ms |
| 100th percentile service time | force-merge-segments | 1.50E+07 | 1.47E+07 | -0.024653036 | ms |
| error rate | force-merge-segments | 0 | 0 | 0 | % |
| Min Throughput | warmup-indices | 0.02 | 0.02 | 0 | ops/s |
| Mean Throughput | warmup-indices | 0.02 | 0.02 | 0 | ops/s |
| Median Throughput | warmup-indices | 0.02 | 0.02 | 0 | ops/s |
| Max Throughput | warmup-indices | 0.02 | 0.02 | 0 | ops/s |
| 100th percentile latency | warmup-indices | 47770.6 | 48938.4 | 0.024445998 | ms |
| 100th percentile service time | warmup-indices | 47770.6 | 48938.4 | 0.024445998 | ms |
| error rate | warmup-indices | 0 | 0 | 0 | % |
| Min Throughput | prod-queries | 1.68 | 1.75 | 0.041666667 | ops/s |
| Mean Throughput | prod-queries | 7.31 | 10.71 | 0.465116279 | ops/s |
| Median Throughput | prod-queries | 7.31 | 2.39 | -0.673050616 | ops/s |
| Max Throughput | prod-queries | 12.93 | 27.99 | 1.164733179 | ops/s |
| 50th percentile latency | prod-queries | 10.2375 | 9.81192 | -0.041570696 | ms |
| 90th percentile latency | prod-queries | 12.6484 | 12.481 | -0.013234876 | ms |
| 99th percentile latency | prod-queries | 491.371 | 515.637 | 0.049384274 | ms |
| 100th percentile latency | prod-queries | 592.629 | 569.577 | -0.03889786 | ms |
| 50th percentile service time | prod-queries | 10.2375 | 9.81192 | -0.041570696 | ms |
| 90th percentile service time | prod-queries | 12.6484 | 12.481 | -0.013234876 | ms |
| 99th percentile service time | prod-queries | 491.371 | 515.637 | 0.049384274 | ms |
| 100th percentile service time | prod-queries | 592.629 | 569.577 | -0.03889786 | ms |
| error rate | prod-queries | 0 | 0 | 0 | % |
| Mean recall@k | prod-queries | 0.51 | 0.53 | 0.039215686 | |
| Mean recall@1 | prod-queries | 0.7 | 0.79 | 0.128571429 | |

@0ctopus13prime
Contributor Author

It's merged into both the 2.x and main branches!
I will keep an eye on the nightly benchmarks.
