ClickHouseBranchSQLOperator: AirflowException, Invalid arguments #87

Closed
cra opened this issue Jul 23, 2024 · 10 comments

@cra

cra commented Jul 23, 2024

Hello!
I've been using the regular ClickHouseOperator for my DAGs for a while now. Since I noticed that you support DB API 2.0, I tried using ClickHouseBranchSQLOperator and ran into an issue:

from airflow_clickhouse_plugin.operators.clickhouse_dbapi import ClickHouseBranchSQLOperator

check_tbl_exists = ClickHouseBranchSQLOperator(
    task_id='check_if_table_exists',
    sql='check_if_table_exists.sql',
    conn_id='ch_default',
    follow_task_ids_if_true='check_if_table_empty',
    follow_task_ids_if_false='create_agg_table',
)

It seems to match the way BranchSQLOperator is used, but I get an AirflowException when trying to create this task:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 484, in apply_defaults
    result = func(self, **kwargs, default_args=default_args)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 881, in __init__
    raise AirflowException(
airflow.exceptions.AirflowException: Invalid arguments were passed to ClickHouseBranchSQLOperator (task_id: check_if_table_exists). Invalid arguments were:
**kwargs: {'sql': 'check_if_table_exists.sql', 'follow_task_ids_if_true': 'check_if_table_empty', 'follow_task_ids_if_false': 'create_agg_table'}

Could you please provide an example of how this operator is supposed to be used?
I'm probably missing something silly, but I cannot figure out what.

@cra
Author

cra commented Jul 24, 2024

Changing the order of the base classes solved my issue:

from airflow.providers.common.sql.operators import sql
from airflow_clickhouse_plugin.operators.clickhouse_dbapi import ClickHouseBaseDbApiOperator


class ClickHouseBranchSQLOperator(
    sql.BranchSQLOperator,
    ClickHouseBaseDbApiOperator,
):
    pass

@bryzgaloff
Owner

bryzgaloff commented Jul 28, 2024

Hi @cra and thank you for reporting this!

TL;DR The behaviour looks strange. Before proceeding with the change, I suggest understanding the issue first.


Though I was able to reproduce it, this behaviour looks strange to me. This is what ClickHouseBranchSQLOperator.__mro__ shows:

  • ClickHouseBranchSQLOperator
  • ClickHouseBaseDbApiOperator
  • ClickHouseDbApiHookMixin
  • BranchSQLOperator
  • BaseSQLOperator
  • BaseOperator
  • Operator
  • SkipMixin
  • LoggingMixin
  • TaskMixin
  • object

Since the first 3 classes do not define an __init__ method at all, the ClickHouseBranchSQLOperator(…) call should resolve to BranchSQLOperator.__init__ first. That should properly handle all the declared kwargs, leaving none unprocessed for Airflow's BaseOperator, which is what raises the AirflowException.
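To make this expectation concrete: without any metaclass involved, the __init__ that ClickHouseBranchSQLOperator(…) runs is simply the first one found while walking __mro__. The minimal sketch below checks that on plain-Python stand-ins. The class bodies are simplified and hypothetical, and init_provider is a hypothetical helper, not part of Airflow or the plugin.

```python
class BaseOperator:
    def __init__(self, **kwargs):
        # Simplified stand-in for Airflow's "Invalid arguments" check.
        if kwargs:
            raise TypeError(f"unprocessed {kwargs=}")


class BaseSQLOperator(BaseOperator):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)


class BranchSQLOperator(BaseSQLOperator):
    def __init__(self, follow_task_ids_if_true=None, **kwargs):
        super().__init__(**kwargs)


class ClickHouseDbApiHookMixin:
    pass


class ClickHouseBaseDbApiOperator(ClickHouseDbApiHookMixin, BaseSQLOperator):
    pass


class ClickHouseBranchSQLOperator(ClickHouseBaseDbApiOperator, BranchSQLOperator):
    pass


def init_provider(cls):
    """Return the first class in cls.__mro__ whose own namespace defines __init__."""
    for klass in cls.__mro__:
        if "__init__" in vars(klass):
            return klass
    raise LookupError("no __init__ found")  # unreachable: object defines one


print(init_provider(ClickHouseBranchSQLOperator).__name__)  # BranchSQLOperator
ClickHouseBranchSQLOperator(follow_task_ids_if_true=["task_1"])  # no error here
```

With plain classes the lookup lands on BranchSQLOperator, which is exactly why the traceback above looks surprising.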

I have also created a small code snippet to reproduce this class hierarchy:

# Airflow base operator
class BaseOperator(object):
    def __init__(self, **kwargs):
        print(f'BaseOperator.__init__ called: {kwargs=}')
        if kwargs:
            raise AssertionError(f'unprocessed {kwargs=}')

# Common SQL operators
class BaseSQLOperator(BaseOperator):
    def __init__(self, **kwargs):
        print(f'BaseSQLOperator.__init__ called: {kwargs=}')
        super().__init__(**kwargs)

    def get_db_hook(self):
        print('BaseSQLOperator.get_db_hook called')

class BranchSQLOperator(BaseSQLOperator):
    def __init__(self, follow_task_ids_if_true, **kwargs):
        print(f'BranchSQLOperator.__init__ called: {kwargs=}')
        super().__init__(**kwargs)

# ClickHouse base operators
class ClickHouseDbApiHookMixin(object):
    def _get_clickhouse_db_api_hook(self):
        print('ClickHouseDbApiHookMixin._get_clickhouse_db_api_hook called')

class ClickHouseBaseDbApiOperator(ClickHouseDbApiHookMixin, BaseSQLOperator):
    def get_db_hook(self):
        print('ClickHouseBaseDbApiOperator.get_db_hook called')
        return self._get_clickhouse_db_api_hook()

# The target class
class ClickHouseBranchSQLOperator(ClickHouseBaseDbApiOperator, BranchSQLOperator):
    pass

print(ClickHouseBranchSQLOperator.__mro__)
print('\ncalling ClickHouseBranchSQLOperator()')
operator = ClickHouseBranchSQLOperator(follow_task_ids_if_true=['task_1'])
print('\ncalling get_db_hook()')
operator.get_db_hook()

And it works perfectly fine; here is the output:

(<class '__main__.ClickHouseBranchSQLOperator'>, <class '__main__.ClickHouseBaseDbApiOperator'>, <class '__main__.ClickHouseDbApiHookMixin'>, <class '__main__.BranchSQLOperator'>, <class '__main__.BaseSQLOperator'>, <class '__main__.BaseOperator'>, <class 'object'>)

calling ClickHouseBranchSQLOperator()
BranchSQLOperator.__init__ called: kwargs={} 
BaseSQLOperator.__init__ called: kwargs={}
BaseOperator.__init__ called: kwargs={}

calling get_db_hook()
ClickHouseBaseDbApiOperator.get_db_hook called
ClickHouseDbApiHookMixin._get_clickhouse_db_api_hook called

BranchSQLOperator.__init__ is called first, and no unknown kwargs are left over.

Thank you for sharing your solution. But before we proceed with the code change, I would like to understand the behaviour, because it might not be the plugin's issue.

Do you have any idea what happens in Airflow and why its behaviour deviates from the regular Python MRO shown in the code snippet above? Maybe the snippet misses something significant that differentiates it from Airflow's implementation (metaclasses, perhaps, though I have checked them and spotted no crucial difference).

When we change the class definition to class ClickHouseBranchSQLOperator(sql.BranchSQLOperator, ClickHouseBaseDbApiOperator), the MRO is changed to:

  • ClickHouseBranchSQLOperator
  • BranchSQLOperator — placed before ClickHouseBaseDbApiOperator now
  • ClickHouseBaseDbApiOperator — placed after BranchSQLOperator now
  • ClickHouseDbApiHookMixin
  • BaseSQLOperator
  • BaseOperator
  • … the rest is the same as above

This suggests that ClickHouseBaseDbApiOperator somehow breaks the MRO and calls the __init__ of its own base class, BaseSQLOperator, skipping BranchSQLOperator's. But the simplified code sample does not reproduce this, even though the same MRO change happens there when the order of ClickHouseBranchSQLOperator's base classes is switched.
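The MRO change itself is plain C3 linearization and needs nothing Airflow-specific. A minimal sketch with empty stand-in classes reproduces both orderings listed above; OriginalOrder, SwappedOrder and mro_names are hypothetical names used only for this illustration.

```python
class BaseOperator:
    pass


class BaseSQLOperator(BaseOperator):
    pass


class BranchSQLOperator(BaseSQLOperator):
    pass


class ClickHouseDbApiHookMixin:
    pass


class ClickHouseBaseDbApiOperator(ClickHouseDbApiHookMixin, BaseSQLOperator):
    pass


# The two base-class orders discussed in this thread.
class OriginalOrder(ClickHouseBaseDbApiOperator, BranchSQLOperator):
    pass


class SwappedOrder(BranchSQLOperator, ClickHouseBaseDbApiOperator):
    pass


def mro_names(cls):
    """MRO as class names, skipping the class itself and the trailing object."""
    return [klass.__name__ for klass in cls.__mro__[1:-1]]


print(mro_names(OriginalOrder))
print(mro_names(SwappedOrder))
```

Only the relative position of BranchSQLOperator and ClickHouseBaseDbApiOperator changes; on its own this cannot explain why one order skips BranchSQLOperator.__init__.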

@bryzgaloff
Owner

bryzgaloff commented Jul 28, 2024

As a quicker option, you may also proceed with a PR. Ideally, please start the PR with tests only. They should fail for the reported case. Once we confirm the tests fail, you may proceed with the code change that fixes it by switching the order of the base classes (please see the new, proper way to fix it below). After that, the tests should pass.

@bryzgaloff changed the title from "Unable to use ClickHouseBranchSQLOperator: invalid kwargs" to "ClickHouseBranchSQLOperator: AirflowException, Invalid arguments" on Jul 28, 2024
@grihabor

Here is another solution:

 class ClickHouseDbApiHookMixin(object):
     # these attributes are defined in both BaseSQLOperator and SqlSensor
     conn_id: str
     hook_params: t.Optional[dict]

+    def __init__(self, **kwargs) -> None:
+        super().__init__(**kwargs)

Apparently just adding __init__ fixes the problem.
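Why a pass-through __init__ on the mixin helps can be sketched without Airflow. The sketch assumes a simplified, hypothetical stand-in metaclass, PinInitMeta, that mimics what this comment goes on to describe for BaseOperatorMeta: it wraps whatever __init__ the new class resolves to and stores the wrapper on that class. The class bodies and the TypeError (standing in for AirflowException) are simplified stand-ins too.

```python
import functools


class PinInitMeta(type):
    """Hypothetical stand-in for BaseOperatorMeta: wrap and pin __init__."""

    def __new__(mcs, name, bases, namespace, **kwargs):
        new_cls = super().__new__(mcs, name, bases, namespace, **kwargs)
        original = new_cls.__init__  # resolved through new_cls's own MRO, right now

        @functools.wraps(original)
        def wrapper(self, *args, **kw):
            return original(self, *args, **kw)

        new_cls.__init__ = wrapper
        return new_cls


class BaseOperator(metaclass=PinInitMeta):
    def __init__(self, **kwargs):
        if kwargs:  # stand-in for AirflowException("Invalid arguments ...")
            raise TypeError(f"Invalid arguments were passed: {sorted(kwargs)}")


class BaseSQLOperator(BaseOperator):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)


class BranchSQLOperator(BaseSQLOperator):
    def __init__(self, follow_task_ids_if_true=None, **kwargs):
        self.follow_task_ids_if_true = follow_task_ids_if_true
        super().__init__(**kwargs)


class ClickHouseDbApiHookMixin:
    # The fix: an explicit pass-through __init__. Its zero-argument super()
    # is bound to this mixin, so it continues along the *instance's* MRO,
    # where the next __init__ after the mixin is BranchSQLOperator's.
    def __init__(self, **kwargs):
        super().__init__(**kwargs)


class ClickHouseBaseDbApiOperator(ClickHouseDbApiHookMixin, BaseSQLOperator):
    pass


class ClickHouseBranchSQLOperator(ClickHouseBaseDbApiOperator, BranchSQLOperator):
    pass


op = ClickHouseBranchSQLOperator(follow_task_ids_if_true=["task_1"])
print(op.follow_task_ids_if_true)  # ['task_1']
```

Because the mixin now owns an __init__, that is the function the metaclass pins onto ClickHouseBaseDbApiOperator (instead of BaseSQLOperator's), and plain ClickHouseBaseDbApiOperator() still reaches BaseSQLOperator.__init__ through the same super() call.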

I think I know what's going on.

You're right, something is funky, so the first thing I checked is the metaclass
of the base class. And indeed BaseOperator uses a custom metaclass:
https://github.com/apache/airflow/blob/81845de9d95a733b4eb7826aaabe23ba9813eba3/airflow/models/baseoperator.py#L520

And here is the line that breaks everything:
https://github.com/apache/airflow/blob/81845de9d95a733b4eb7826aaabe23ba9813eba3/airflow/models/baseoperator.py#L515

If your class doesn't define __init__, the metaclass creates one anyway, but it copies the __init__ inherited from the parent classes at the moment of class declaration. This is not the same as MRO resolution on the final class:

setting new_cls=<class 'airflow.models.baseoperator.BaseOperator'>.__init__ to new_cls.__init__=<function BaseOperator.__init__ at 0x765141ae2d40>
setting new_cls=<class 'airflow.providers.common.sql.operators.sql.BaseSQLOperator'>.__init__ to new_cls.__init__=<function BaseSQLOperator.__init__ at 0x7651419a9bc0>
setting new_cls=<class 'airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator'>.__init__ to new_cls.__init__=<function SQLExecuteQueryOperator.__init__ at 0x7651419a9f80>
setting new_cls=<class 'airflow.providers.common.sql.operators.sql.SQLColumnCheckOperator'>.__init__ to new_cls.__init__=<function SQLColumnCheckOperator.__init__ at 0x7651419aa5c0>
setting new_cls=<class 'airflow.providers.common.sql.operators.sql.SQLTableCheckOperator'>.__init__ to new_cls.__init__=<function SQLTableCheckOperator.__init__ at 0x7651419aaa20>
setting new_cls=<class 'airflow.providers.common.sql.operators.sql.SQLCheckOperator'>.__init__ to new_cls.__init__=<function SQLCheckOperator.__init__ at 0x7651419aad40>
setting new_cls=<class 'airflow.providers.common.sql.operators.sql.SQLValueCheckOperator'>.__init__ to new_cls.__init__=<function SQLValueCheckOperator.__init__ at 0x7651419aafc0>
setting new_cls=<class 'airflow.providers.common.sql.operators.sql.SQLIntervalCheckOperator'>.__init__ to new_cls.__init__=<function SQLIntervalCheckOperator.__init__ at 0x7651419ab600>
setting new_cls=<class 'airflow.providers.common.sql.operators.sql.SQLThresholdCheckOperator'>.__init__ to new_cls.__init__=<function SQLThresholdCheckOperator.__init__ at 0x7651419ab7e0>
setting new_cls=<class 'airflow.providers.common.sql.operators.sql.BranchSQLOperator'>.__init__ to new_cls.__init__=<function BranchSQLOperator.__init__ at 0x7651419abb00>
setting new_cls=<class 'airflow_clickhouse_plugin.operators.clickhouse_dbapi.ClickHouseBaseDbApiOperator'>.__init__ to new_cls.__init__=<function BaseSQLOperator.__init__ at 0x7651419aa020>
setting new_cls=<class 'airflow_clickhouse_plugin.operators.clickhouse_dbapi.ClickHouseSQLExecuteQueryOperator'>.__init__ to new_cls.__init__=<function BaseSQLOperator.__init__ at 0x7651409d4860>
setting new_cls=<class 'airflow_clickhouse_plugin.operators.clickhouse_dbapi.ClickHouseSQLColumnCheckOperator'>.__init__ to new_cls.__init__=<function BaseSQLOperator.__init__ at 0x7651409d4860>
setting new_cls=<class 'airflow_clickhouse_plugin.operators.clickhouse_dbapi.ClickHouseSQLTableCheckOperator'>.__init__ to new_cls.__init__=<function BaseSQLOperator.__init__ at 0x7651409d4860>
setting new_cls=<class 'airflow_clickhouse_plugin.operators.clickhouse_dbapi.ClickHouseSQLCheckOperator'>.__init__ to new_cls.__init__=<function BaseSQLOperator.__init__ at 0x7651409d4860>
setting new_cls=<class 'airflow_clickhouse_plugin.operators.clickhouse_dbapi.ClickHouseSQLValueCheckOperator'>.__init__ to new_cls.__init__=<function BaseSQLOperator.__init__ at 0x7651409d4860>
setting new_cls=<class 'airflow_clickhouse_plugin.operators.clickhouse_dbapi.ClickHouseSQLIntervalCheckOperator'>.__init__ to new_cls.__init__=<function BaseSQLOperator.__init__ at 0x7651409d4860>
setting new_cls=<class 'airflow_clickhouse_plugin.operators.clickhouse_dbapi.ClickHouseSQLThresholdCheckOperator'>.__init__ to new_cls.__init__=<function BaseSQLOperator.__init__ at 0x7651409d4860>
setting new_cls=<class 'airflow_clickhouse_plugin.operators.clickhouse_dbapi.ClickHouseBranchSQLOperator'>.__init__ to new_cls.__init__=<function BaseSQLOperator.__init__ at 0x7651409d4860>

So when you instantiate your class, the method that actually gets called is BaseSQLOperator.__init__, not BranchSQLOperator.__init__.
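Adding such a metaclass to the owner's simplified snippet is enough to reproduce the error. In the sketch below, PinInitMeta is a hypothetical, simplified stand-in for BaseOperatorMeta (the real one wraps __init__ with _apply_defaults), TypeError stands in for AirflowException, and BranchOriginalOrder and BranchSwappedOrder are hypothetical names for the two base-class orders discussed in this thread.

```python
import functools


class PinInitMeta(type):
    """Hypothetical stand-in for BaseOperatorMeta: wrap and pin __init__."""

    def __new__(mcs, name, bases, namespace, **kwargs):
        new_cls = super().__new__(mcs, name, bases, namespace, **kwargs)
        original = new_cls.__init__  # looked up once, at class creation

        @functools.wraps(original)
        def wrapper(self, *args, **kw):
            return original(self, *args, **kw)

        new_cls.__init__ = wrapper  # pinned: later subclasses inherit this
        return new_cls


class BaseOperator(metaclass=PinInitMeta):
    def __init__(self, **kwargs):
        if kwargs:  # stand-in for AirflowException("Invalid arguments ...")
            raise TypeError(f"Invalid arguments were passed: {sorted(kwargs)}")


class BaseSQLOperator(BaseOperator):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)


class BranchSQLOperator(BaseSQLOperator):
    def __init__(self, follow_task_ids_if_true=None, **kwargs):
        self.follow_task_ids_if_true = follow_task_ids_if_true
        super().__init__(**kwargs)


class ClickHouseDbApiHookMixin:
    pass


class ClickHouseBaseDbApiOperator(ClickHouseDbApiHookMixin, BaseSQLOperator):
    pass  # no own __init__, so the metaclass pins BaseSQLOperator's here


class BranchOriginalOrder(ClickHouseBaseDbApiOperator, BranchSQLOperator):
    pass


class BranchSwappedOrder(BranchSQLOperator, ClickHouseBaseDbApiOperator):
    pass


def try_create(cls):
    """Return None on success, or the error message on failure."""
    try:
        cls(follow_task_ids_if_true=["task_1"])
    except TypeError as exc:
        return str(exc)
    return None


print(ClickHouseBaseDbApiOperator.__init__.__qualname__)  # BaseSQLOperator.__init__
print(try_create(BranchOriginalOrder))  # Invalid arguments were passed: ['follow_task_ids_if_true']
print(try_create(BranchSwappedOrder))  # None
```

The swapped order works because BranchSQLOperator.__init__ is then the first __init__ found when the metaclass builds the subclass, which matches @cra's workaround.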

I opened an issue in airflow: apache/airflow#41085
And the fix: apache/airflow#41086

@bryzgaloff
Owner

Hi @grihabor thank you for your proactive participation! And the PR to Airflow in particular 🔥

To make sure I understand correctly, please confirm:

  • When ClickHouseBaseDbApiOperator is created, its BaseOperatorMeta metaclass (inherited from BaseSQLOperator) sets its __init__ in BaseOperatorMeta.__new__ to new_cls.__init__, which, based on the MRO, resolves to BaseSQLOperator.__init__ at the moment of creation.
  • When BranchSQLOperator.__init__ is executed, the first (in the MRO) class with an actual implementation is ClickHouseBaseDbApiOperator. It means that it executes BaseSQLOperator.__init__ as if it was ClickHouseBaseDbApiOperator.__init__.

But doesn't that mean that, because BaseSQLOperator.__init__ contains a super().__init__(…) call, it should also call the next class in the MRO, which is BranchSQLOperator?

Or does the new_cls.__init__ = cls._apply_defaults(new_cls.__init__) assignment keep the __init__ method class-bound (bound to BaseSQLOperator), so that its super() call resolves to BaseSQLOperator's parent, which is BaseOperator?

Just trying to gather some keywords for me to know which concepts I have to refresh in memory 😅

@bryzgaloff
Owner

I agree that a quick fix would be adding an __init__ method that simply calls super().__init__(**kwargs) to the ClickHouseBaseDbApiOperator class.

@grihabor

When ClickHouseBaseDbApiOperator is created, its BaseOperatorMeta metaclass (inherited from BaseSQLOperator) sets its __init__ in BaseOperatorMeta.__new__ to new_cls.__init__, which, based on the MRO, resolves to BaseSQLOperator.__init__ at the moment of creation.

Correct.

When BranchSQLOperator.__init__ is executed, the first (in the MRO) class with an actual implementation is ClickHouseBaseDbApiOperator. It means that it executes BaseSQLOperator.__init__ as if it was ClickHouseBaseDbApiOperator.__init__.

The thing is, when you create an instance of ClickHouseBranchSQLOperator, its __init__ gets called, and that is BaseSQLOperator.__init__, so BranchSQLOperator.__init__ is never called. As a result, follow_task_ids_if_true and follow_task_ids_if_false are not consumed by BranchSQLOperator.__init__, which eventually leads to the AirflowException.

But doesn't that mean that, because BaseSQLOperator.__init__ contains a super().__init__(…) call, it should also call the next class in the MRO, which is BranchSQLOperator?

Nope. It calls the next class in the MRO after BaseSQLOperator, which is BaseOperator. Here is a small example:

class InitMeta(type):
    def __new__(cls, name, bases, namespace, **kwargs):
        new_cls = super().__new__(cls, name, bases, namespace, **kwargs)
        new_cls.__init__ = new_cls.__init__
        return new_cls


class A(metaclass=InitMeta):
    def __init__(self):
        print("A", super())
        super().__init__()


class B(A):
    pass


class C(A):
    def __init__(self):
        print("C", super())
        super().__init__()


class D(B, C):
    pass


D()

The output is

A <super: <class 'A'>, <D object>>

So in class A, the super() call is equivalent to super(A, self).__init__().

Or does the new_cls.__init__ = cls._apply_defaults(new_cls.__init__) assignment keep the __init__ method class-bound (bound to BaseSQLOperator), so that its super() call resolves to BaseSQLOperator's parent, which is BaseOperator?

Yep.
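The class binding confirmed here comes from how zero-argument super() is compiled: a method that uses it gets an implicit __class__ closure cell pointing at the class whose body defined it, and copying the function onto another class does not change that cell. A minimal sketch, reusing the class names from the example above; the manual B.__init__ = A.__init__ assignment is a hypothetical simplification of what the metaclass does.

```python
calls = []


class A:
    def __init__(self):
        calls.append("A")
        super().__init__()  # behaves like super(__class__, self), and __class__ is A


class B(A):
    pass


class C(A):
    def __init__(self):
        calls.append("C")
        super().__init__()


# Zero-argument super() gives A.__init__ an implicit closure cell named
# __class__, filled in with A when the class statement finishes.
print(A.__init__.__code__.co_freevars)  # ('__class__',)
print(A.__init__.__closure__[0].cell_contents is A)  # True

# Mimic the metaclass by hand: store A's function directly on B.
B.__init__ = A.__init__
print(B.__init__.__closure__[0].cell_contents is A)  # True: copying does not rebind


class D(B, C):  # MRO: D, B, C, A, object
    pass


D()
print(calls)  # ['A']: super() continued after A, so C.__init__ never ran
```

Without the manual assignment, D() would find C.__init__ first and record ['C', 'A'].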

@bryzgaloff
Owner

Thank you @grihabor for the explanation and your contribution to Airflow main repo!

@cra I believe you can expect the fix in Airflow 2.9.4.

@cra
Author

cra commented Aug 6, 2024

Could you then add a warning, or mark the supported versions in the table in the README?
https://github.com/bryzgaloff/airflow-clickhouse-plugin?tab=readme-ov-file#python-and-airflow-versions-support

bryzgaloff added a commit that referenced this issue Aug 8, 2024
@bryzgaloff
Owner

Mentioned in README ✅
Also, you are added to the contributors list, thanks! 🤝
