Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

closing connection chunks in DbApiHook.get_pandas_df #22947

Closed
1 of 2 tasks
bauerfranz opened this issue Apr 12, 2022 · 5 comments · Fixed by #23452
Closed
1 of 2 tasks

closing connection chunks in DbApiHook.get_pandas_df #22947

bauerfranz opened this issue Apr 12, 2022 · 5 comments · Fixed by #23452
Assignees
Labels

Comments

@bauerfranz
Copy link

bauerfranz commented Apr 12, 2022

Apache Airflow version

2.2.5 (latest released)

What happened

Hi all,
Please be patient with me, it's my first Bugreport in git at all :)

Affected function: DbApiHook.get_pandas_df

Short description: If I use DbApiHook.get_pandas_df with parameter "chunksize" the connection is lost

Error description
I tried using the DbApiHook.get_pandas_df function instead of pandas.read_sql. Without the parameter "chunksize" both functions work the same. But as soon as I add the parameter chunksize to get_pandas_df, I lose the connection in the first iteration. This happens both when querying Oracle and Mysql (Mariadb) databases.

During my research I found a comment on a closed issue that describes the same -> #8468

My Airflow version: 2.2.5

I think it's something to do with the "with closing" argument, because when I remove that argument, the chunksize argument was working.

def get_pandas_df(self, sql, parameters=None, **kwargs):
        """
        Executes the sql and returns a pandas dataframe
        :param sql: the sql statement to be executed (str) or a list of
            sql statements to execute
        :param parameters: The parameters to render the SQL query with.
        :param kwargs: (optional) passed into pandas.io.sql.read_sql method
        """
        try:
            from pandas.io import sql as psql
        except ImportError:
            raise Exception("pandas library not installed, run: pip install 'apache-airflow[pandas]'.")
       # Not working
        with closing(self.get_conn()) as conn:
                return psql.read_sql(sql, con=conn, params=parameters, **kwargs)
       # would working
       # return psql.read_sql(sql, con=conn, params=parameters, **kwargs)_

What you think should happen instead

It should give me a chunk of DataFrame

How to reproduce

not working

src_hook = OracleHook(oracle_conn_id='oracle_source_conn_id')
query = "select * from example_table" 
for chunk in src_hook.get_pandas_df(query,chunksize=2):
    print(chunk.head())

works

for chunk in src_hook.get_pandas_df(query):
    print(chunk.head())

works

for chunk in pandas.read_sql(query,src_hook.get_conn(),chunksize=2):
    print(chunk.head())

Operating System

MacOS Monetäre

Versions of Apache Airflow Providers

apache-airflow 2.2.5
apache-airflow-providers-ftp 2.1.2
apache-airflow-providers-http 2.1.2
apache-airflow-providers-imap 2.2.3
apache-airflow-providers-microsoft-mssql 2.1.3
apache-airflow-providers-mongo 2.3.3
apache-airflow-providers-mysql 2.2.3
apache-airflow-providers-oracle 2.2.3
apache-airflow-providers-salesforce 3.4.3
apache-airflow-providers-sftp 2.5.2
apache-airflow-providers-sqlite 2.1.3
apache-airflow-providers-ssh 2.4.3

Deployment

Virtualenv installation

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@bauerfranz bauerfranz added area:core kind:bug This is a clearly a bug labels Apr 12, 2022
@boring-cyborg
Copy link

boring-cyborg bot commented Apr 12, 2022

Thanks for opening your first issue here! Be sure to follow the issue template!

@tirkarthi
Copy link
Contributor

Below is a test case explaining the problem. The get_pandas_df method in case of chunksize being passed returns a generator instead of a dataframe that references to the connection closed. When we try to iterate through then the generator uses a closed connection giving error. Also the behavior of closing the connection depends on the datatype as below.

https://pandas.pydata.org/docs/reference/api/pandas.read_sql.html#pandas-read-sql

con: SQLAlchemy connectable, str, or sqlite3 connection
Using SQLAlchemy makes it possible to use any DB supported by that library. If a DBAPI2 object, only sqlite3 is supported. The user is responsible for engine disposal and connection closure for the SQLAlchemy connectable; str connections are closed automatically. See here.

https://github.com/pandas-dev/pandas/blob/c90294db40cd48fca7fbec5fa419b46a4b4768a1/pandas/io/sql.py#L991

def test_get_pandas_df_chunksize(self):
    import sqlite3

    class UnitTestSqliteHook(SqliteHook):
        conn_name_attr = 'test_conn_id'
        log = mock.MagicMock()

        def setup_table(self):
            self.conn = sqlite3.connect(":memory:")
            cursor = self.conn.cursor()
            cursor.execute("create table users(id int, name text)")
            cursor.execute("insert into users(id, name) values(1, 'a')")
            cursor.close()

        def get_conn(self):
            self.setup_table()
            return self.conn

    self.db_hook = UnitTestSqliteHook()

    statement = 'select * from users'
    df = list(self.db_hook.get_pandas_df(statement, chunksize=1))

    assert df[0].columns[0] == 'id'
    assert df[0].values.tolist()[0][0] == 1
    assert df[0].values.tolist()[0][1] == 'a'
pytest tests/providers/sqlite/hooks/test_sqlite.py -k test_get_pandas_df_chunksize
============================================================================ test session starts ============================================================================
platform linux -- Python 3.10.4, pytest-6.2.5, py-1.11.0, pluggy-1.0.0 -- /usr/local/bin/python
cachedir: .pytest_cache
rootdir: /opt/airflow, configfile: pytest.ini
plugins: flaky-3.7.0, forked-1.4.0, cov-3.0.0, anyio-3.5.0, requests-mock-1.9.3, instafail-0.4.2, timeouts-1.2.1, rerunfailures-9.1.1, xdist-2.5.0, httpx-0.20.0, asyncio-0.18.3
asyncio: mode=strict
setup timeout: 0.0s, execution timeout: 0.0s, teardown timeout: 0.0s
collected 9 items / 8 deselected / 1 selected                                                                                                                               

tests/providers/sqlite/hooks/test_sqlite.py::TestSqliteHook::test_get_pandas_df_chunksize FAILED                                                                      [100%]

================================================================================= FAILURES ==================================================================================
________________________________________________________________ TestSqliteHook.test_get_pandas_df_chunksize ________________________________________________________________

self = <tests.providers.sqlite.hooks.test_sqlite.TestSqliteHook testMethod=test_get_pandas_df_chunksize>

    def test_get_pandas_df_chunksize(self):
        import sqlite3
    
        class UnitTestSqliteHook(SqliteHook):
            conn_name_attr = 'test_conn_id'
            log = mock.MagicMock()
    
            def setup_table(self):
                self.conn = sqlite3.connect(":memory:")
                cursor = self.conn.cursor()
                cursor.execute("create table users(id int, name text)")
                cursor.execute("insert into users(id, name) values(1, 'a')")
                cursor.close()
    
            def get_conn(self):
                self.setup_table()
                return self.conn
    
        self.db_hook = UnitTestSqliteHook()
    
        statement = 'select * from users'
>       df = list(self.db_hook.get_pandas_df(statement, chunksize=1))

tests/providers/sqlite/hooks/test_sqlite.py:126: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

cursor = <sqlite3.Cursor object at 0x7fede21680c0>, chunksize = 1, columns = ['id', 'name'], index_col = None, coerce_float = True, parse_dates = None, dtype = None

    @staticmethod
    def _query_iterator(
        cursor,
        chunksize: int,
        columns,
        index_col=None,
        coerce_float: bool = True,
        parse_dates=None,
        dtype: DtypeArg | None = None,
    ):
        """Return generator through chunked result set"""
        has_read_data = False
        while True:
>           data = cursor.fetchmany(chunksize)
E           sqlite3.ProgrammingError: Cannot operate on a closed database.

/usr/local/lib/python3.10/site-packages/pandas/io/sql.py:2047: ProgrammingError
--------------------------------------------------------------------------- Captured stdout setup ---------------------------------------------------------------------------
========================= AIRFLOW ==========================
Home of the user: /root
Airflow home /root/airflow
Skipping initializing of the DB as it was initialized already.
You can re-initialize the database by adding --with-db-init flag when running tests.
============================================================================= warnings summary ==============================================================================
airflow/configuration.py:407
  /opt/airflow/airflow/configuration.py:407: FutureWarning: The 'dag_default_view' setting in [webserver] has the old default value of 'tree'. This value has been changed to 'grid' in the running config, but please update your config before Apache Airflow 3.0.
    warnings.warn(

airflow/configuration.py:407
  /opt/airflow/airflow/configuration.py:407: FutureWarning: The 'log_filename_template' setting in [logging] has the old default value of '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'. This value has been changed to 'dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/{%% if ti.map_index >= 0 %%}map_index={{ ti.map_index }}/{%% endif %%}attempt={{ try_number }}.log' in the running config, but please update your config before Apache Airflow 3.0.
    warnings.warn(

-- Docs: https://docs.pytest.org/en/stable/warnings.html
========================================================================== short test summary info ==========================================================================
FAILED tests/providers/sqlite/hooks/test_sqlite.py::TestSqliteHook::test_get_pandas_df_chunksize - sqlite3.ProgrammingError: Cannot operate on a closed database.
================================================================ 1 failed, 8 deselected, 2 warnings in 0.90s ================================================================

@potiuk
Copy link
Member

potiuk commented Apr 13, 2022

Why don't you attempt to fix it ?

@hubert-pietron
Copy link
Contributor

If @bauerfranz is not interested in fixing this bug i would want to work on this

@potiuk
Copy link
Member

potiuk commented Apr 20, 2022

assigned you

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging a pull request may close this issue.

4 participants