diff --git a/tests/stub/tx_run/scripts/tx_error_on_pull.script b/tests/stub/tx_run/scripts/tx_error_on_pull.script new file mode 100644 index 000000000..79091522e --- /dev/null +++ b/tests/stub/tx_run/scripts/tx_error_on_pull.script @@ -0,0 +1,15 @@ +!: BOLT 5.3 + +A: HELLO {"{}": "*"} +A: LOGON {"{}": "*"} +*: RESET +C: BEGIN {"{}": "*"} +S: SUCCESS {} +C: RUN "failing on pull" {"{}": "*"} {"{}": "*"} +S: SUCCESS {"fields": ["n"], "qid": 1} +C: PULL {"n": {"Z": "*"}, "[qid]": -1} +S: RECORD [1] + RECORD [2] + FAILURE {"code": "Neo.ClientError.MadeUp.Code", "message": "message"} +*: RESET +?: GOODBYE diff --git a/tests/stub/tx_run/scripts/tx_successful_and_failing_on_pull_streams.script b/tests/stub/tx_run/scripts/tx_successful_and_failing_on_pull_streams.script new file mode 100644 index 000000000..5fa5f9a75 --- /dev/null +++ b/tests/stub/tx_run/scripts/tx_successful_and_failing_on_pull_streams.script @@ -0,0 +1,29 @@ +!: BOLT 5.3 + +A: HELLO {"{}": "*"} +A: LOGON {"{}": "*"} +*: RESET +C: BEGIN {"{}": "*"} +S: SUCCESS {} +C: RUN "RETURN 1 AS n" {"{}": "*"} {"{}": "*"} +S: SUCCESS {"fields": ["n"], "qid": 1} +{? + {{ + C: PULL {"n": {"Z": "*"}, "[qid]": -1} + ---- + C: PULL {"n": {"Z": "*"}, "qid": 1} + }} + S: RECORD [1] + RECORD [2] + SUCCESS {"has_more": true, "type": "r"} +?} +C: RUN "failing on pull" {"{}": "*"} {"{}": "*"} +S: SUCCESS {"fields": ["n"], "qid": 2} +{{ + C: PULL {"n": {"Z": "*"}, "[qid]": -1} +---- + C: PULL {"n": {"Z": "*"}, "qid": 2} +}} +S: FAILURE {"code": "Neo.ClientError.MadeUp.Code", "message": "message"} +*: RESET +?: GOODBYE diff --git a/tests/stub/tx_run/scripts/tx_successful_and_failing_on_run_streams.script b/tests/stub/tx_run/scripts/tx_successful_and_failing_on_run_streams.script new file mode 100644 index 000000000..d9c6ead28 --- /dev/null +++ b/tests/stub/tx_run/scripts/tx_successful_and_failing_on_run_streams.script @@ -0,0 +1,27 @@ +!: BOLT 5.3 + +A: HELLO {"{}": "*"} +A: LOGON {"{}": "*"} +*: RESET +C: BEGIN {"{}": "*"} +S: SUCCESS {} +C: RUN "RETURN 1 AS n" {"{}": "*"} {"{}": "*"} +S: SUCCESS {"fields": ["n"], "qid": 1} +{? + {{ + C: PULL {"n": {"Z": "*"}, "[qid]": -1} + ---- + C: PULL {"n": {"Z": "*"}, "qid": 1} + }} + S: RECORD [1] + RECORD [2] + SUCCESS {"has_more": true, "type": "r"} +?} +C: RUN "invalid" {"{}": "*"} {"{}": "*"} +S: FAILURE {"code": "Neo.ClientError.Statement.SyntaxError", "message": "Invalid input"} +{? + C: PULL {"n": {"Z": "*"}, "[qid]": -1} + S: IGNORED +?} +*: RESET +?: GOODBYE diff --git a/tests/stub/tx_run/test_tx_run.py b/tests/stub/tx_run/test_tx_run.py index 5dafd6808..8f9976a0a 100644 --- a/tests/stub/tx_run/test_tx_run.py +++ b/tests/stub/tx_run/test_tx_run.py @@ -258,3 +258,192 @@ def test_failed_tx_run_allows_rollback(self): def test_failed_tx_run_allows_skipping_rollback(self): self._test_failed_tx_run(rollback=False) + + def test_should_prevent_pull_after_transaction_termination_on_run(self): + # TODO remove this block once all languages work + if get_driver_name() in ["go", "javascript", "dotnet", "python"]: + self.skipTest("requires investigation") + + def _test(): + self._create_direct_driver() + script = "tx_successful_and_failing_on_run_streams.script" + self._server1.start(path=self.script_path(script)) + self._session = self._driver.session("r", fetch_size=2) + tx = self._session.begin_transaction() + res = tx.run("RETURN 1 AS n") + + # begin another stream that fails on RUN + with self.assertRaises(types.DriverError) as exc: + failed_res = tx.run("invalid") + if get_driver_name() in ["javascript"]: + failed_res.next() + self.assertEqual(exc.exception.code, + "Neo.ClientError.Statement.SyntaxError") + self._assert_is_client_exception(exc) + + # while already buffered records may be accessible, there must be + # no further PULL and an exception must be raised + with self.assertRaises(types.DriverError) as exc: + if iterate == "true": + for _i in range(0, 3): + res.next() + else: + fetch_all = types.Feature.OPT_RESULT_LIST_FETCH_ALL + if self.driver_supports_features(fetch_all): + res.list() + else: + # if the fetch all is not supported, only explicit + # iteration can be tested + list(res) + # the streaming result cursors surface the termination exception + self.assertEqual(exc.exception.code, + "Neo.ClientError.Statement.SyntaxError") + self._assert_is_client_exception(exc) + + tx.close() + self._session.close() + self._session = None + self._server1.done() + for iterate in ["true", "false"]: + with self.subTest(iterate=iterate): + _test() + self._server1.reset() + + def test_should_prevent_discard_after_transaction_termination_on_run(self): + # TODO remove this block once all languages work + if get_driver_name() in ["go", "javascript", "dotnet", "python"]: + self.skipTest("requires investigation") + + self._create_direct_driver() + script = "tx_successful_and_failing_on_run_streams.script" + self._server1.start(path=self.script_path(script)) + self._session = self._driver.session("r", fetch_size=2) + tx = self._session.begin_transaction() + res = tx.run("RETURN 1 AS n") + + # begin another stream that fails on RUN + with self.assertRaises(types.DriverError) as exc: + failed_res = tx.run("invalid") + if get_driver_name() in ["javascript"]: + failed_res.next() + self.assertEqual(exc.exception.code, + "Neo.ClientError.Statement.SyntaxError") + self._assert_is_client_exception(exc) + + with self.assertRaises(types.DriverError) as exc: + res.consume() + # the streaming result cursors surface the termination exception + self.assertEqual(exc.exception.code, + "Neo.ClientError.Statement.SyntaxError") + self._assert_is_client_exception(exc) + + tx.close() + self._session.close() + self._session = None + self._server1.done() + + def test_should_prevent_run_after_transaction_termination_on_pull(self): + # TODO remove this block once all languages work + if get_driver_name() in ["go", "javascript", "dotnet", "python"]: + self.skipTest("requires investigation") + + def _test(): + self._create_direct_driver() + script = "tx_error_on_pull.script" + self._server1.start(path=self.script_path(script)) + self._session = self._driver.session("r", fetch_size=2) + tx = self._session.begin_transaction() + res = tx.run("failing on pull") + + # streaming fails on PULL + with self.assertRaises(types.DriverError) as exc: + if iterate == "true": + for _i in range(0, 3): + res.next() + else: + fetch_all = types.Feature.OPT_RESULT_LIST_FETCH_ALL + if self.driver_supports_features(fetch_all): + res.list() + else: + # if the fetch all is not supported, only explicit + # iteration can be tested + list(res) + self.assertEqual(exc.exception.code, + "Neo.ClientError.MadeUp.Code") + self._assert_is_client_exception(exc) + + with self.assertRaises(types.DriverError) as exc: + tx.run("invalid") + # initiating actions of transaction throw terminated + self._assert_is_tx_terminated_exception(exc) + + tx.close() + self._session.close() + self._session = None + self._server1.done() + for iterate in ["true", "false"]: + with self.subTest(iterate=iterate): + _test() + self._server1.reset() + + def test_should_prevent_pull_after_transaction_termination_on_pull(self): + # TODO remove this block once all languages work + if get_driver_name() in ["go", "javascript", "dotnet", "python"]: + self.skipTest("requires investigation") + + def _test(): + self._create_direct_driver() + script = "tx_successful_and_failing_on_pull_streams.script" + self._server1.start(path=self.script_path(script)) + self._session = self._driver.session("r", fetch_size=2) + tx = self._session.begin_transaction() + res = tx.run("RETURN 1 AS n") + + # fail on PULL + with self.assertRaises(types.DriverError) as exc: + failed_res = tx.run("failing on pull") + failed_res.next() + self.assertEqual(exc.exception.code, + "Neo.ClientError.MadeUp.Code") + self._assert_is_client_exception(exc) + + # fail on iteration + with self.assertRaises(types.DriverError): + if iterate == "true": + for _i in range(0, 3): + res.next() + else: + fetch_all = types.Feature.OPT_RESULT_LIST_FETCH_ALL + if self.driver_supports_features(fetch_all): + res.list() + else: + # if the fetch all is not supported, only explicit + # iteration can be tested + list(res) + # the streaming result cursors surface the termination exception + self.assertEqual(exc.exception.code, + "Neo.ClientError.MadeUp.Code") + self._assert_is_client_exception(exc) + + tx.close() + self._session.close() + self._session = None + self._server1.done() + for iterate in ["true", "false"]: + with self.subTest(iterate=iterate): + _test() + self._server1.reset() + + def _assert_is_client_exception(self, e): + if get_driver_name() in ["java"]: + self.assertEqual( + "org.neo4j.driver.exceptions.ClientException", + e.exception.errorType + ) + + def _assert_is_tx_terminated_exception(self, e): + if get_driver_name() in ["java"]: + self.assertEqual( + "org.neo4j.driver.exceptions.TransactionTerminatedException", + e.exception.errorType + )