feat(agents-api): Optimize Search Queries NLP processing pipeline #735

Draft · wants to merge 1 commit into base: dev

Conversation

@HamadaSalhab (Contributor) commented Oct 23, 2024

Important

Optimized NLP processing in nlp.py with caching, batch processing, and enhanced keyword extraction and query building.

  • Performance Optimization:
    • Introduced KeywordMatcher singleton with batch processing in nlp.py for efficient keyword matching.
    • Added lru_cache to clean_keyword() and _create_pattern() for caching results.
    • Optimized extract_keywords() to process spans in a single pass and count frequencies efficiently.
  • Functionality Changes:
    • Modified paragraph_to_custom_queries() to include a min_keywords parameter for filtering low-value queries (see the usage sketch after this list).
    • Enhanced find_proximity_groups() with sorted positions and union-find for efficient grouping.
    • Improved build_query() with cached patterns for query construction.
  • Miscellaneous:
    • Precompiled regex patterns for whitespace and non-alphanumeric characters.
    • Disabled unused components in spaCy pipeline for performance.
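
A minimal usage sketch of the updated signature (the import path is the module touched by this PR; min_keywords=2 is just an example value, and the other defaults match the code further down in this thread):

from agents_api.common.nlp import paragraph_to_custom_queries

paragraph = (
    "OpenAI has developed several AI models. "
    "Many industries are adopting AI technologies to enhance their operations."
)

# Sentences yielding fewer than `min_keywords` keywords produce no query.
queries = paragraph_to_custom_queries(
    paragraph,
    top_n=10,
    proximity_n=10,
    min_keywords=2,
)
print(queries)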

This description was created by Ellipsis for e057db8. It will automatically update as commits are pushed.

@ellipsis-dev bot (Contributor) left a comment

👍 Looks good to me! Reviewed everything up to e057db8 in 1 minute and 6 seconds

More details
  • Looked at 412 lines of code in 1 file
  • Skipped 0 files when reviewing.
  • Skipped posting 5 drafted comments based on config settings.
1. agents-api/agents_api/common/nlp.py:36
  • Draft comment:
    Consider reducing the cache size for _create_pattern to avoid excessive memory usage, as storing many Doc objects can be memory-intensive.
  • Reason this comment was not posted:
    Confidence changes required: 50%
    The use of lru_cache on _create_pattern and clean_keyword is beneficial for performance, but the cache size for _create_pattern might be too large given the potential memory usage of storing many Doc objects. A smaller cache size might be more appropriate.
2. agents-api/agents_api/common/nlp.py:32
  • Draft comment:
    The batch_size and patterns_cache attributes in KeywordMatcher are defined but not used. Consider removing them if they are not needed, or implement their intended functionality.
  • Reason this comment was not posted:
    Confidence changes required: 50%
    The KeywordMatcher class uses a singleton pattern, which is appropriate for shared resources like the PhraseMatcher. However, the batch_size and patterns_cache attributes are not used in the current implementation, which might indicate leftover code or incomplete functionality.
3. agents-api/agents_api/common/nlp.py:93
  • Draft comment:
    The seen_texts set in extract_keywords might be redundant since normalized_keywords are already processed in lowercase. Consider removing it if deduplication is not needed at this stage.
  • Reason this comment was not posted:
    Confidence changes required: 50%
    The extract_keywords function uses a set to track seen texts, which is efficient for deduplication. However, the seen_texts set is populated with lowercase text, which might not be necessary since normalized_keywords are already processed in lowercase. This could be redundant.
4. agents-api/agents_api/common/nlp.py:153
  • Draft comment:
    Consider using a collections.deque for the window in find_proximity_groups so that removing elements from the left is O(1) instead of O(k) with list.pop(0) (see the sketch after this list).
  • Reason this comment was not posted:
    Confidence changes required: 50%
    The find_proximity_groups function uses a sliding window to check proximity, which is efficient. However, the window list could be optimized by using a deque for faster pops from the left.
5. agents-api/agents_api/common/nlp.py:174
  • Draft comment:
    Consider increasing the cache size for build_query_pattern if there are many different group sizes to reduce potential cache misses.
  • Reason this comment was not posted:
    Confidence changes required: 50%
    The build_query function uses build_query_pattern to cache patterns, which is efficient. However, the cache size of 100 might be too small if there are many different group sizes, potentially leading to cache misses.
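
For reference, a minimal sketch of the deque-based sliding window suggested in draft comment 4 (illustrative only; it mirrors find_proximity_groups from the snippet further down but is not part of the PR):

from collections import defaultdict, deque
from typing import Dict, List, Set, Tuple

def find_proximity_groups_deque(
    keywords: List[str],
    keyword_positions: Dict[str, List[int]],
    n: int = 10,
) -> List[Set[str]]:
    """Same grouping logic; the window is a deque so left-side eviction is O(1)."""
    if len(keywords) <= 1:
        return [{kw} for kw in keywords]

    positions: List[Tuple[int, str]] = sorted(
        (pos, kw) for kw in keywords for pos in keyword_positions[kw]
    )

    # Minimal union-find (path halving, no rank) to keep the sketch short
    parent = {kw: kw for kw in keywords}

    def find(u: str) -> str:
        while parent[u] != u:
            parent[u] = parent[parent[u]]
            u = parent[u]
        return u

    def union(u: str, v: str) -> None:
        ru, rv = find(u), find(v)
        if ru != rv:
            parent[rv] = ru

    window = deque()
    for pos, kw in positions:
        while window and pos - window[0][0] > n:
            window.popleft()  # O(1) eviction instead of list.pop(0)
        for _, w_kw in window:
            union(kw, w_kw)
        window.append((pos, kw))

    groups = defaultdict(set)
    for kw in keywords:
        groups[find(kw)].add(kw)
    return list(groups.values())

The grouping behavior is the same as the list-based version; only the eviction cost changes.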

Workflow ID: wflow_V81GHvlhMZGn8Tcp


You can customize Ellipsis with 👍 / 👎 feedback, review rules, user-specific overrides, quiet mode, and more.

@HamadaSalhab marked this pull request as draft on October 23, 2024, 17:14
@creatorrr (Contributor)

Try this instead:

import re
from collections import Counter, defaultdict
from functools import lru_cache
from typing import List, Set, Dict, Tuple

import spacy
from spacy.matcher import PhraseMatcher
from spacy.tokens import Doc
from spacy.util import filter_spans

# Precompile regex patterns
WHITESPACE_RE = re.compile(r"\s+")
NON_ALPHANUM_RE = re.compile(r"[^\w\s\-_]+")

# Initialize spaCy with minimal pipeline
nlp = spacy.load("en_core_web_sm", exclude=["lemmatizer", "textcat", "parser", "tok2vec"])

# Add sentencizer for faster sentence tokenization
sentencizer = nlp.add_pipe("sentencizer")

# Singleton PhraseMatcher for better performance
class KeywordMatcher:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.matcher = PhraseMatcher(nlp.vocab, attr="LOWER")
            cls._instance.batch_size = 1000  # Adjust based on memory constraints
            cls._instance.patterns_cache = {}
        return cls._instance

    @lru_cache(maxsize=10000)
    def _create_pattern(self, text: str) -> Doc:
        return nlp.make_doc(text)

    def find_matches(self, doc: Doc, keywords: List[str]) -> Dict[str, List[int]]:
        """Batch process keywords for better performance."""
        keyword_positions = defaultdict(list)

        # Process keywords in batches to avoid memory issues
        for i in range(0, len(keywords), self.batch_size):
            batch = keywords[i:i + self.batch_size]
            patterns = [self._create_pattern(kw) for kw in batch]

            # Clear previous patterns and add new batch
            if "KEYWORDS" in self.matcher.labels:
                self.matcher.remove("KEYWORDS")
            self.matcher.add("KEYWORDS", patterns)

            # Find matches for this batch
            matches = self.matcher(doc)
            for match_id, start, end in matches:
                span_text = doc[start:end].text
                normalized = WHITESPACE_RE.sub(" ", span_text).lower().strip()
                keyword_positions[normalized].append(start)

        return keyword_positions

# Initialize global matcher
keyword_matcher = KeywordMatcher()

@lru_cache(maxsize=10000)
def clean_keyword(kw: str) -> str:
    """Cache cleaned keywords for reuse."""
    return NON_ALPHANUM_RE.sub("", kw).strip()

def extract_keywords(doc: Doc, top_n: int = 10, clean: bool = True) -> List[str]:
    """Optimized keyword extraction with minimal behavior change."""
    excluded_labels = {"DATE", "TIME", "PERCENT", "MONEY", "QUANTITY", "ORDINAL", "CARDINAL"}

    # Extract and filter spans in a single pass
    ent_spans = [ent for ent in doc.ents if ent.label_ not in excluded_labels]
    # doc.noun_chunks needs the dependency parse; guard in case the parser is excluded from the pipeline
    has_parse = doc.has_annotation("DEP")
    chunk_spans = [chunk for chunk in doc.noun_chunks if not chunk.root.is_stop] if has_parse else []
    all_spans = filter_spans(ent_spans + chunk_spans)

    # Process spans efficiently
    keywords = []
    seen_texts = set()

    for span in all_spans:
        text = span.text.strip()
        lower_text = text.lower()

        # Skip empty or seen texts
        if not text or lower_text in seen_texts:
            continue

        seen_texts.add(lower_text)
        keywords.append(text)

    # Normalize keywords by replacing multiple spaces with single space and stripping
    normalized_keywords = [WHITESPACE_RE.sub(" ", kw).strip() for kw in keywords]

    # Count frequencies efficiently
    freq = Counter(normalized_keywords)
    top_keywords = [kw for kw, _ in freq.most_common(top_n)]

    if clean:
        return [clean_keyword(kw) for kw in top_keywords]
    return top_keywords

def find_proximity_groups(
    keywords: List[str],
    keyword_positions: Dict[str, List[int]],
    n: int = 10
) -> List[Set[str]]:
    """Optimized proximity grouping using sorted positions."""
    # Early return for single or no keywords
    if len(keywords) <= 1:
        return [{kw} for kw in keywords]

    # Create flat list of positions for efficient processing
    positions: List[Tuple[int, str]] = [
        (pos, kw)
        for kw in keywords
        for pos in keyword_positions[kw]
    ]

    # Sort positions once
    positions.sort()

    # Initialize Union-Find with path compression and union by rank
    parent = {kw: kw for kw in keywords}
    rank = {kw: 0 for kw in keywords}

    def find(u: str) -> str:
        if parent[u] != u:
            parent[u] = find(parent[u])
        return parent[u]

    def union(u: str, v: str) -> None:
        u_root, v_root = find(u), find(v)
        if u_root != v_root:
            if rank[u_root] < rank[v_root]:
                u_root, v_root = v_root, u_root
            parent[v_root] = u_root
            if rank[u_root] == rank[v_root]:
                rank[u_root] += 1

    # Use sliding window for proximity checking
    window = []
    for pos, kw in positions:
        # Remove positions outside window
        while window and pos - window[0][0] > n:
            window.pop(0)

        # Union with all keywords in window
        for _, w_kw in window:
            union(kw, w_kw)

        window.append((pos, kw))

    # Group keywords efficiently
    groups = defaultdict(set)
    for kw in keywords:
        root = find(kw)
        groups[root].add(kw)

    return list(groups.values())

@lru_cache(maxsize=100)
def build_query_pattern(group_size: int, n: int) -> str:
    """Cache query patterns for common group sizes."""
    if group_size == 1:
        return '"{}"'
    return f'NEAR/{n}(' + " ".join('"{}"' for _ in range(group_size)) + ")"

def build_query(groups: List[Set[str]], n: int = 10) -> str:
    """Build query with cached patterns."""
    clauses = []

    for group in groups:
        if len(group) == 1:
            clauses.append(f'"{next(iter(group))}"')
        else:
            # Sort by length descending to prioritize longer phrases
            sorted_group = sorted(group, key=len, reverse=True)
            # Get cached pattern and format with keywords
            pattern = build_query_pattern(len(group), n)
            clause = pattern.format(*sorted_group)
            clauses.append(clause)

    return " OR ".join(clauses)

@lru_cache(maxsize=100)
def paragraph_to_custom_queries(
    paragraph: str,
    top_n: int = 10,
    proximity_n: int = 10,
    min_keywords: int = 1
) -> List[str]:
    """
    Optimized paragraph processing with minimal behavior changes.
    Added min_keywords parameter to filter out low-value queries.
    
    Args:
        paragraph (str): The input paragraph to convert.
        top_n (int): Number of top keywords to extract per sentence.
        proximity_n (int): The proximity window for NEAR/n.
        min_keywords (int): Minimum number of keywords required to form a query.
    
    Returns:
        List[str]: The list of custom query strings.
    """
    if not paragraph or not paragraph.strip():
        return []

    # Process entire paragraph once
    doc = nlp(paragraph)
    queries = []

    # Process sentences
    for sent in doc.sents:
        # Convert to doc for consistent API
        sent_doc = sent.as_doc()

        # Extract and clean keywords
        keywords = extract_keywords(sent_doc, top_n)
        if len(keywords) < min_keywords:
            continue

        # Find keyword positions using matcher
        keyword_positions = keyword_matcher.find_matches(sent_doc, keywords)

        # Skip if no keywords found in positions
        if not keyword_positions:
            continue

        # Find proximity groups and build query
        groups = find_proximity_groups(keywords, keyword_positions, proximity_n)
        query = build_query(groups, proximity_n)

        if query:
            queries.append(query)

    return queries

def batch_paragraphs_to_custom_queries(
    paragraphs: List[str],
    top_n: int = 10,
    proximity_n: int = 10,
    min_keywords: int = 1,
    n_process: int = 1
) -> List[List[str]]:
    """
    Processes multiple paragraphs using nlp.pipe for better performance.
    
    Args:
        paragraphs (List[str]): List of paragraphs to process.
        top_n (int): Number of top keywords to extract per sentence.
        proximity_n (int): The proximity window for NEAR/n.
        min_keywords (int): Minimum number of keywords required to form a query.
        n_process (int): Number of processes to use for multiprocessing.
    
    Returns:
        List[List[str]]: A list where each element is a list of queries for a paragraph.
    """
    results = []
    # Unused components were already excluded when loading `nlp`, so no per-call disable list is needed
    for doc in nlp.pipe(paragraphs, n_process=n_process):
        queries = []
        for sent in doc.sents:
            sent_doc = sent.as_doc()
            keywords = extract_keywords(sent_doc, top_n)
            if len(keywords) < min_keywords:
                continue
            keyword_positions = keyword_matcher.find_matches(sent_doc, keywords)
            if not keyword_positions:
                continue
            groups = find_proximity_groups(keywords, keyword_positions, proximity_n)
            query = build_query(groups, proximity_n)
            if query:
                queries.append(query)
        results.append(queries)
    return results

# Usage Example
if __name__ == "__main__":
    sample_paragraph = """
    OpenAI has developed several AI models. The latest model, GPT-4, offers improved performance over its predecessors.
    Many industries are adopting AI technologies to enhance their operations.
    """

    queries = paragraph_to_custom_queries(
        sample_paragraph,
        top_n=5,
        proximity_n=10,
        min_keywords=1
    )
    for idx, query in enumerate(queries, 1):
        print(f"Query {idx}: {query}")

    # Example of batch processing
    sample_paragraphs = [
        "OpenAI has developed several AI models. The latest model, GPT-4, offers improved performance over its predecessors.",
        "Many industries are adopting AI technologies to enhance their operations."
    ]

    batch_queries = batch_paragraphs_to_custom_queries(
        sample_paragraphs,
        top_n=5,
        proximity_n=10,
        min_keywords=1,
        n_process=2  # Adjust based on your CPU cores
    )
    for para_idx, para_queries in enumerate(batch_queries, 1):
        print(f"\nParagraph {para_idx} Queries:")
        for q_idx, query in enumerate(para_queries, 1):
            print(f"  Query {q_idx}: {query}")

@creatorrr (Contributor)

  • test it thoroughly
  • compare end-to-end execution time (a rough timing sketch follows below)
  • experiment with n_process = 2
  • add gunicorn
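
A rough harness for the first three items might look like this. It is only a sketch: it assumes paragraph_to_custom_queries and batch_paragraphs_to_custom_queries are importable from agents_api.common.nlp (the module this PR touches), and the same script would be run against dev for the baseline numbers.

import time

from agents_api.common.nlp import (
    batch_paragraphs_to_custom_queries,
    paragraph_to_custom_queries,
)

# Use distinct paragraphs so the lru_cache on paragraph_to_custom_queries
# does not short-circuit repeated inputs and skew the comparison.
paragraphs = [
    f"Sample {i}: OpenAI has developed several AI models. "
    "Many industries are adopting AI technologies to enhance their operations."
    for i in range(200)
]

# Current call pattern: one paragraph at a time
start = time.perf_counter()
per_paragraph = [paragraph_to_custom_queries(p) for p in paragraphs]
print(f"per-paragraph: {time.perf_counter() - start:.2f}s")

# Batched via nlp.pipe, with and without an extra process
for n_process in (1, 2):
    start = time.perf_counter()
    batched = batch_paragraphs_to_custom_queries(paragraphs, n_process=n_process)
    print(f"batched (n_process={n_process}): {time.perf_counter() - start:.2f}s")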
