Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry retriable errors in Neo4j #26211

Merged
merged 10 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 39 additions & 19 deletions libs/community/langchain_community/graphs/neo4j_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,9 @@ def get_structured_schema(self) -> Dict[str, Any]:
return self.structured_schema

def query(
self, query: str, params: dict = {}, retry_on_session_expired: bool = True
self,
query: str,
params: dict = {},
) -> List[Dict[str, Any]]:
"""Query Neo4j database.

Expand All @@ -423,26 +425,44 @@ def query(
List[Dict[str, Any]]: The list of dictionaries containing the query results.
"""
from neo4j import Query
from neo4j.exceptions import CypherSyntaxError, SessionExpired
from neo4j.exceptions import Neo4jError

with self._driver.session(database=self._database) as session:
try:
data = session.run(Query(text=query, timeout=self.timeout), params)
json_data = [r.data() for r in data]
if self.sanitize:
json_data = [value_sanitize(el) for el in json_data]
return json_data
except CypherSyntaxError as e:
raise ValueError(f"Generated Cypher Statement is not valid\n{e}")
except (
SessionExpired
) as e: # Session expired is a transient error that can be retried
if retry_on_session_expired:
return self.query(
query, params=params, retry_on_session_expired=False
try:
data, _, _ = self._driver.execute_query(
Query(text=query, timeout=self.timeout),
database=self._database,
parameters_=params,
)
json_data = [r.data() for r in data]
if self.sanitize:
json_data = [value_sanitize(el) for el in json_data]
return json_data
except Neo4jError as e:
if not (
(
( # isCallInTransactionError
e.code == "Neo.DatabaseError.Statement.ExecutionFailed"
or e.code
== "Neo.DatabaseError.Transaction.TransactionStartFailed"
)
else:
raise e
and "in an implicit transaction" in e.message
)
or ( # isPeriodicCommitError
e.code == "Neo.ClientError.Statement.SemanticError"
and (
"in an open transaction is not possible" in e.message
or "tried to execute in an explicit transaction" in e.message
)
)
):
raise
# fallback to allow implicit transactions
with self._driver.session() as session:
data = session.run(Query(text=query, timeout=self.timeout), params)
json_data = [r.data() for r in data]
if self.sanitize:
json_data = [value_sanitize(el) for el in json_data]
return json_data

def refresh_schema(self) -> None:
"""
Expand Down
49 changes: 30 additions & 19 deletions libs/community/langchain_community/vectorstores/neo4j_vector.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,11 +595,8 @@ def query(
query: str,
*,
params: Optional[dict] = None,
retry_on_session_expired: bool = True,
) -> List[Dict[str, Any]]:
"""
This method sends a Cypher query to the connected Neo4j database
and returns the results as a list of dictionaries.
"""Query Neo4j database with retries and exponential backoff.

Args:
query (str): The Cypher query to execute.
Expand All @@ -608,24 +605,38 @@ def query(
Returns:
List[Dict[str, Any]]: List of dictionaries containing the query results.
"""
from neo4j.exceptions import CypherSyntaxError, SessionExpired
from neo4j import Query
from neo4j.exceptions import Neo4jError

params = params or {}
with self._driver.session(database=self._database) as session:
try:
data = session.run(query, params)
return [r.data() for r in data]
except CypherSyntaxError as e:
raise ValueError(f"Cypher Statement is not valid\n{e}")
except (
SessionExpired
) as e: # Session expired is a transient error that can be retried
if retry_on_session_expired:
return self.query(
query, params=params, retry_on_session_expired=False
try:
data, _, _ = self._driver.execute_query(
query, database=self._database, parameters_=params
)
return [r.data() for r in data]
except Neo4jError as e:
if not (
(
( # isCallInTransactionError
e.code == "Neo.DatabaseError.Statement.ExecutionFailed"
or e.code
== "Neo.DatabaseError.Transaction.TransactionStartFailed"
)
else:
raise e
and "in an implicit transaction" in e.message
)
or ( # isPeriodicCommitError
e.code == "Neo.ClientError.Statement.SemanticError"
and (
"in an open transaction is not possible" in e.message
or "tried to execute in an explicit transaction" in e.message
)
)
):
raise
# Fallback to allow implicit transactions
with self._driver.session() as session:
data = session.run(Query(text=query), params)
return [r.data() for r in data]

def verify_version(self) -> None:
"""
Expand Down
Loading