Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add query_id of last executed statement to Adapter Response #891

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

KeltonKarboviak
Copy link

@KeltonKarboviak KeltonKarboviak commented Jul 31, 2024

resolves #892
docs dbt-labs/docs.getdbt.com/#

Problem

I want to be able to capture the query_id of the statement used to load a model.

Solution

I ripped this off of the dbt-snowflake adapter and how they are setting query_id on the adapter response:

https://github.com/dbt-labs/dbt-snowflake/blob/f95b9192f6eec9af4e30eaab87f9e3412febf7d1/dbt/adapters/snowflake/connections.py#L456-L461

I realize it is different from Snowflake in that appears that the query ID is available directly on the cursor object, and with Redshift you need to use the pg_last_query_id() function in order to get the query ID.

I'm sure this doesn't handle 100% of accurately capturing the correct query ID for a particular load (I'm sure there are cases where multiple statements are run in order to perform the model or snapshot load), but I think this is a good start.

Please let me know what I'm missing here in the potential of this breaking something else due to injecting another another query into a model load flow.

Checklist

  • I have read the contributing guide and understand what's expected of me
  • I have run this code in development and it appears to resolve the stated issue
  • This PR includes tests, or tests are not required/relevant for this PR
  • This PR has no interface changes (e.g. macros, cli, logs, json artifacts, config files, adapter interface, etc) or this PR has already received feedback and approval from Product or DX

I want to be able to capture the `query_id` of the statement used to load a model.

I ripped this off of the dbt-snowflake adapter and how they are setting `query_id` on the adapter response:

https://github.com/dbt-labs/dbt-snowflake/blob/f95b9192f6eec9af4e30eaab87f9e3412febf7d1/dbt/adapters/snowflake/connections.py#L456-L461

I'm sure this doesn't handle 100% of accurately capturing the correct query ID for a particular load (I'm sure there are cases where multiple statements are run in order to perform the model or snapshot load), but I think this is a good start.
@cla-bot cla-bot bot added the cla:yes label Jul 31, 2024
@KeltonKarboviak
Copy link
Author

Well there is definitely a side effect to running the _get_last_query_id method. After adding that and running a single functional test, it blows up:

$ .tox/py38-redshift/bin/python -m pytest -vvv -s --dist=loadscope tests/functional/adapter/test_basic.py::TestSimpleMaterializationsRedshift::test_base
============================================================================= test session starts ==============================================================================
platform darwin -- Python 3.8.14, pytest-7.4.4, pluggy-1.5.0 -- /Users/kelton.karboviak/Code/dbt-redshift/.tox/py38-redshift/bin/python
cachedir: .pytest_cache
rootdir: /Users/kelton.karboviak/Code/dbt-redshift
configfile: pytest.ini
plugins: ddtrace-2.3.0, csv-3.0.0, logbook-1.2.0, dotenv-0.5.2, xdist-3.6.1
collected 1 item

tests/functional/adapter/test_basic.py::TestSimpleMaterializationsRedshift::test_base
=== Test project_root: /private/var/folders/s8/7lghdz255_lb4khl4xpwvz3j50ydy1/T/pytest-of-kelton.karboviak/pytest-18/project0


Invoking dbt with ['seed']
19:53:50  Running with dbt=1.9.0-a1
19:53:50  Registered adapter: redshift=1.9.0-a1
19:53:50  Unable to do partial parsing because saved manifest not found. Starting full parse.
19:53:51  Found 3 models, 1 seed, 1 source, 484 macros
19:53:51
19:53:53  Concurrency: 1 threads (target='default')
19:53:53
19:53:53  1 of 1 START seed file test17224556293545641964_test_basic.base ................ [RUN]
19:53:54  1 of 1 OK loaded seed file test17224556293545641964_test_basic.base ............ [INSERT 10 in 1.02s]
19:53:55
19:53:55  Finished running 1 seed in 0 hours 0 minutes and 3.73 seconds (3.73s).
19:53:55
19:53:55  Completed successfully
19:53:55
19:53:55  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1


Invoking dbt with ['run']
19:53:55  Running with dbt=1.9.0-a1
19:53:55  Registered adapter: redshift=1.9.0-a1
19:53:55  Found 3 models, 1 seed, 1 source, 484 macros
19:53:55
19:53:57  Concurrency: 1 threads (target='default')
19:53:57
19:53:57  1 of 3 START sql table model test17224556293545641964_test_basic.swappable ..... [RUN]
19:53:59  1 of 3 OK created sql table model test17224556293545641964_test_basic.swappable  [SUCCESS in 1.36s]
19:53:59  2 of 3 START sql table model test17224556293545641964_test_basic.table_model ... [RUN]
19:54:00  2 of 3 OK created sql table model test17224556293545641964_test_basic.table_model  [SUCCESS in 1.38s]
19:54:00  3 of 3 START sql view model test17224556293545641964_test_basic.view_model ..... [RUN]
19:54:01  3 of 3 OK created sql view model test17224556293545641964_test_basic.view_model  [SUCCESS in 1.52s]
19:54:02
19:54:02  Finished running 2 table models, 1 view model in 0 hours 0 minutes and 6.98 seconds (6.98s).
19:54:02
19:54:02  Completed successfully
19:54:02
19:54:02  Done. PASS=3 WARN=0 ERROR=0 SKIP=0 TOTAL=3
BASIS "dbt_functional_tests"."test17224556293545641964_test_basic"."base"
COMPARES [<RedshiftRelation "dbt_functional_tests"."test17224556293545641964_test_basic"."view_model">, <RedshiftRelation "dbt_functional_tests"."test17224556293545641964_test_basic"."table_model">, <RedshiftRelation "dbt_functional_tests"."test17224556293545641964_test_basic"."swappable">]
FAILED

=================================================================================== FAILURES ===================================================================================
_________________________________________________________________ TestSimpleMaterializationsRedshift.test_base _________________________________________________________________

self = <redshift_connector.core.Connection object at 0x12497e640>, cursor = <redshift_connector.cursor.Cursor object at 0x12494d5b0>
operation = '/* {"app": "dbt", "dbt_version": "1.9.0a1", "profile_name": "test", "target_name": "default", "connection_name": "_te...s row_count_difference,\n    diff_count.num_missing as num_mismatched\nfrom row_count_diff\njoin diff_count using (id)'
vals = None

    def execute(self: "Connection", cursor: Cursor, operation: str, vals) -> None:
        """
        Executes a database operation. Parameters may be provided as a sequence, or as a mapping, depending upon the value of `redshift_connector.paramstyle`.

        Parameters
        ----------
        cursor : :class:`Cursor`
        operation : str The SQL statement to execute.
        vals : If `redshift_connector.paramstyle` is `qmark`, `numeric`, or `format` this argument should be an array of parameters to bind into the statement. If `redshift_connector.paramstyle` is `named` the argument should be a `dict` mapping of parameters. If `redshift_connector.paramstyle` is `pyformat`, the argument value may be either an array or mapping.

        Returns
        -------
        None:None
        """
        _logger.debug("Connection.execute()")

        # get the process ID of the calling process.
        pid: int = getpid()

        args: typing.Tuple[typing.Optional[typing.Tuple[str, typing.Any]], ...] = ()
        # transforms user provided bind parameters to server friendly bind parameters
        params: typing.Tuple[typing.Optional[typing.Tuple[int, int, typing.Callable]], ...] = ()
        has_bind_parameters: bool = False if vals is None else True
        # multi dimensional dictionary to store the data
        # cache = self._caches[cursor.paramstyle][pid]
        # cache = {'statement': {}, 'ps': {}}
        # statement stores the data of the statement, ps store the data of the prepared statement
        # statement = {operation(query): tuple from 'convert_paramstyle'(statement, make_args)}
        try:
            cache = self._caches[cursor.paramstyle][pid]
        except KeyError:
            try:
                param_cache = self._caches[cursor.paramstyle]
            except KeyError:
                param_cache = self._caches[cursor.paramstyle] = {}

            try:
                cache = param_cache[pid]
            except KeyError:
                cache = param_cache[pid] = {"statement": {}, "ps": {}}

        try:
            statement, make_args = cache["statement"][operation]
        except KeyError:
            if has_bind_parameters:
                statement, make_args = cache["statement"][operation] = convert_paramstyle(cursor.paramstyle, operation)
            else:
                # use a no-op make_args in lieu of parsing the sql statement
                statement, make_args = cache["statement"][operation] = operation, lambda p: ()
        if has_bind_parameters:
            args = make_args(vals)
            _logger.debug("User provided vals converted to %s args", len(args))
            # change the args to the format that the DB will identify
            # take reference from self.py_types
            params = self.make_params(args)
            _logger.debug("args converted to %s params", len(params))
        key = operation, params

        try:
>           ps = cache["ps"][key]
E           KeyError: ('/* {"app": "dbt", "dbt_version": "1.9.0a1", "profile_name": "test", "target_name": "default", "connection_name": "_test"} */\nwith diff_count as (\n    SELECT\n        1 as id,\n        COUNT(*) as num_missing FROM (\n            (SELECT  FROM "dbt_functional_tests"."test17224556293545641964_test_basic"."base" EXCEPT\n             SELECT  FROM "dbt_functional_tests"."test17224556293545641964_test_basic"."view_model")\n             UNION ALL\n            (SELECT  FROM "dbt_functional_tests"."test17224556293545641964_test_basic"."view_model" EXCEPT\n             SELECT  FROM "dbt_functional_tests"."test17224556293545641964_test_basic"."base")\n        ) as a\n), table_a as (\n    SELECT COUNT(*) as num_rows FROM "dbt_functional_tests"."test17224556293545641964_test_basic"."base"\n), table_b as (\n    SELECT COUNT(*) as num_rows FROM "dbt_functional_tests"."test17224556293545641964_test_basic"."view_model"\n), row_count_diff as (\n    select\n        1 as id,\n        table_a.num_rows - table_b.num_rows as difference\n    from table_a, table_b\n)\nselect\n    row_count_diff.difference as row_count_difference,\n    diff_count.num_missing as num_mismatched\nfrom row_count_diff\njoin diff_count using (id)', ())

/Users/kelton.karboviak/Code/dbt-redshift/.tox/py38-redshift/lib/python3.8/site-packages/redshift_connector/core.py:1821: KeyError

During handling of the above exception, another exception occurred:

self = <dbt.adapters.redshift.connections.RedshiftConnectionManager object at 0x12448b700>
sql = '/* {"app": "dbt", "dbt_version": "1.9.0a1", "profile_name": "test", "target_name": "default", "connection_name": "_te...s row_count_difference,\n    diff_count.num_missing as num_mismatched\nfrom row_count_diff\njoin diff_count using (id)'

    @contextmanager
    def exception_handler(self, sql):
        try:
>           yield

/Users/kelton.karboviak/Code/dbt-redshift/dbt/adapters/redshift/connections.py:345:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/Users/kelton.karboviak/Code/dbt-redshift/.tox/py38-redshift/lib/python3.8/site-packages/dbt/adapters/sql/connections.py:93: in add_query
    cursor.execute(sql, bindings)
/Users/kelton.karboviak/Code/dbt-redshift/.tox/py38-redshift/lib/python3.8/site-packages/redshift_connector/cursor.py:248: in execute
    raise e
/Users/kelton.karboviak/Code/dbt-redshift/.tox/py38-redshift/lib/python3.8/site-packages/redshift_connector/cursor.py:241: in execute
    self._c.execute(self, operation, args)
/Users/kelton.karboviak/Code/dbt-redshift/.tox/py38-redshift/lib/python3.8/site-packages/redshift_connector/core.py:1902: in execute
    self.handle_messages(cursor)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <redshift_connector.core.Connection object at 0x12497e640>, cursor = <redshift_connector.cursor.Cursor object at 0x12494d5b0>

    def handle_messages(self: "Connection", cursor: Cursor) -> None:
        """
        Reads messages formatted in ordinance with Amazon Redshift wire protocol, modifying the connection and cursor.

        Parameters
        ----------
        :param cursor: `Cursor`
            The `Cursor` object associated with the given connection object.

        Returns
        -------
        None:None
        """
        code = self.error = None

        while code != READY_FOR_QUERY:
            buffer = self._read(5)

            if len(buffer) == 0:
                if self._usock.timeout is not None:
                    raise InterfaceError(
                        "BrokenPipe: server socket closed. We noticed a timeout is set for this connection. Consider "
                        "raising the timeout or defaulting timeout to none."
                    )
                else:
                    raise InterfaceError(
                        "BrokenPipe: server socket closed. Please check that client side networking configurations such "
                        "as Proxies, firewalls, VPN, etc. are not affecting your network connection."
                    )

            code, data_len = ci_unpack(buffer)
            _logger.debug("Message received from BE with code %s length %s ", code, data_len)
            self.message_types[code](self._read(data_len - 4), cursor)

        if self.error is not None:
>           raise self.error
E           redshift_connector.error.ProgrammingError: {'S': 'ERROR', 'C': '42601', 'M': 'syntax error at or near "FROM"', 'P': '235', 'F': '/home/ec2-user/padb/src/pg/src/backend/parser/parser_scan.l', 'L': '828', 'R': 'yyerror'}

/Users/kelton.karboviak/Code/dbt-redshift/.tox/py38-redshift/lib/python3.8/site-packages/redshift_connector/core.py:2194: ProgrammingError

The above exception was the direct cause of the following exception:

self = <test_basic.TestSimpleMaterializationsRedshift object at 0x1201258e0>, project = <dbt.tests.fixtures.project.TestProjInfo object at 0x120ecd940>

    @pytest.mark.flaky
    def test_base(self, project):
>       super().test_base(project)

/Users/kelton.karboviak/Code/dbt-redshift/tests/functional/adapter/test_basic.py:43:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/Users/kelton.karboviak/Code/dbt-redshift/.tox/py38-redshift/lib/python3.8/site-packages/dbt/tests/adapter/basic/test_base.py:75: in test_base
    check_relations_equal(project.adapter, ["base", "view_model", "table_model", "swappable"])
/Users/kelton.karboviak/Code/dbt-redshift/.tox/py38-redshift/lib/python3.8/site-packages/dbt/tests/util.py:402: in check_relations_equal
    return check_relations_equal_with_relations(
/Users/kelton.karboviak/Code/dbt-redshift/.tox/py38-redshift/lib/python3.8/site-packages/dbt/tests/util.py:442: in check_relations_equal_with_relations
    _, tbl = adapter.execute(sql, fetch=True)
/Users/kelton.karboviak/Code/dbt-redshift/.tox/py38-redshift/lib/python3.8/site-packages/dbt/adapters/base/impl.py:356: in execute
    return self.connections.execute(sql=sql, auto_begin=auto_begin, fetch=fetch, limit=limit)
/Users/kelton.karboviak/Code/dbt-redshift/dbt/adapters/redshift/connections.py:423: in execute
    _, cursor = self.add_query(sql, auto_begin)
/Users/kelton.karboviak/Code/dbt-redshift/dbt/adapters/redshift/connections.py:451: in add_query
    connection, cursor = super().add_query(
/Users/kelton.karboviak/Code/dbt-redshift/.tox/py38-redshift/lib/python3.8/site-packages/dbt/adapters/sql/connections.py:103: in add_query
    return connection, cursor
/Users/kelton.karboviak/.asdf/installs/python/3.8.14/lib/python3.8/contextlib.py:131: in __exit__
    self.gen.throw(type, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <dbt.adapters.redshift.connections.RedshiftConnectionManager object at 0x12448b700>
sql = '/* {"app": "dbt", "dbt_version": "1.9.0a1", "profile_name": "test", "target_name": "default", "connection_name": "_te...s row_count_difference,\n    diff_count.num_missing as num_mismatched\nfrom row_count_diff\njoin diff_count using (id)'

    @contextmanager
    def exception_handler(self, sql):
        try:
            yield
        except redshift_connector.DatabaseError as e:
            try:
                err_msg = e.args[0]["M"]  # this is a type redshift sets, so we must use these keys
            except Exception:
                err_msg = str(e).strip()
            logger.debug(f"Redshift error: {err_msg}")
            self.rollback_if_open()
>           raise DbtDatabaseError(err_msg) from e
E           dbt_common.exceptions.base.DbtDatabaseError: Database Error
E             syntax error at or near "FROM"

/Users/kelton.karboviak/Code/dbt-redshift/dbt/adapters/redshift/connections.py:353: DbtDatabaseError
=========================================================================== short test summary info ============================================================================
FAILED tests/functional/adapter/test_basic.py::TestSimpleMaterializationsRedshift::test_base - dbt_common.exceptions.base.DbtDatabaseError: Database Error
============================================================================== 1 failed in 17.82s ==============================================================================

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature] Provide query_id in the Adapter Response
1 participant