Ability to better support odd scheduling time #10194

Closed
Shadowsong27 opened this issue Aug 6, 2020 · 11 comments
Labels
AIP-39 Timetables area:Scheduler including HA (high availability) scheduler kind:feature Feature Requests

Comments

@Shadowsong27

Description

I have recently been getting requests for odd schedules: say, some updates need to be done on the second and third Friday of each month at 4pm.

I searched for a solution but could not find a perfect one. I can think of the following options:

  1. use a cron schedule and a separate DAG
  2. embed the trigger in the actual task logic, that is, check whether the code should be executed on that date, and otherwise return success (equivalent to skip and mark success)

Drawbacks (at least the ones I can think of):

  1. it creates an extra DAG, and when you have a bunch of these requests, it's not really clean. Besides, you need to add an equal number of ExternalTaskSensor tasks to preserve inter-DAG dependencies. Each extra DAG will also need to provision other dependent resources (such as Kafka consumers)

  2. not very generic or reusable, and if you are going for a fully containerized solution, where each task is executed in a container, you also need to start a container first just to reach that simple if-else check, which seems a bit wasteful
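For reference, the date check behind option 2 can be sketched in a few lines of plain Python. This is only an illustration: the helper names `is_nth_weekday` and `is_second_or_third_friday` are made up for this example and are not part of Airflow.

```python
import datetime as dt
from typing import Tuple


def is_nth_weekday(day: dt.date, weekday: int, occurrences: Tuple[int, ...]) -> bool:
    """Return True if `day` is the n-th `weekday` of its month for some n in `occurrences`.

    `weekday` follows datetime conventions: Monday is 0, Sunday is 6.
    """
    if day.weekday() != weekday:
        return False
    # Days 1-7 hold the first occurrence of every weekday, days 8-14 the second, and so on.
    nth = (day.day - 1) // 7 + 1
    return nth in occurrences


def is_second_or_third_friday(day: dt.date) -> bool:
    """Hypothetical helper for the schedule described in this issue."""
    return is_nth_weekday(day, weekday=4, occurrences=(2, 3))
```

The 4pm part would still come from the DAG's own schedule (for example a daily run at 16:00); the check only decides whether a given day qualifies.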

Use case / motivation

Basically I have a bunch of extraction scripts that need to be executed at very different and very weird times. I would like this to be straightforward, without the need to replicate provisioning tasks or add too many inter-DAG dependencies.

Potential solutions I have in mind:

  • not touching Airflow core (workaround)

inherit and extend the Operators I am using - troublesome, and it needs to be repeated for multiple operators

  • touching Airflow core

an extra tuple parameter allowed_dates passed into the operators (BaseOperator), with the date check injected right before operator execution, essentially giving every operator the ability to be programmatically skipped in any DAG - might be overkill

@Shadowsong27 Shadowsong27 added the kind:feature Feature Requests label Aug 6, 2020
@JeffryMAC

This has many related requests (not all the same, but the general idea is more scheduling abilities):
#10123
#9321
#8649

and I'm adding my +1000 to having more scheduling ability.

I think in the end Airflow is a workflow management system, and as such the key functionality it needs is flexible scheduling. In my opinion this should be one of the pillars of the tool.

@Shadowsong27
Author

Shadowsong27 commented Aug 14, 2020

I have just hacked together a workaround since I really need this feature. It may be a bit raw but it's usable. It uses the first approach I mentioned above (extending the operators), with some abstraction. Basically I created a function that takes any Airflow Operator and returns a subclass of it that accepts an additional argument for skipping dates.

Code here:

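import datetime as dt
from typing import Any, Callable, Tuple

from airflow.exceptions import AirflowException
from airflow.utils.decorators import apply_defaults


# Wrap any operator class in a subclass that can skip itself based on the run date.
def flexify_operator(operator: Any) -> Any: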

    class FlexibleOperator(operator):

        @apply_defaults
        def __init__(
                self,
                allowed_dates: Tuple = (),
                skip_func: Callable = None,
                *args,
                **kwargs,
        ):
            super().__init__(*args, **kwargs)

            if allowed_dates and skip_func:
                raise AirflowException("You can only provide either allowed dates or a custom skip function.")

            self.allowed_dates = allowed_dates
            self.skip_func = skip_func

        def execute(self, context):
            execution_date = context.get('ds')
            is_skipping = False

            if self.skip_func:
                if self.skip_func(execution_date):
                    is_skipping = True

            if self.allowed_dates:
                if self.is_skipping_task(execution_date):
                    is_skipping = True

            if is_skipping:
                self.log.warning('Skipping the task based on defined logic ')
                return

            self.log.warning("Executing actual tasks ... ")
            # Return the wrapped operator's result so its return value is not lost.
            return super().execute(context)

        def is_skipping_task(self, execution_date: str) -> bool:

            date_obj = dt.datetime.strptime(execution_date, '%Y-%m-%d')

            if date_obj.day not in self.allowed_dates:
                return True

            return False

    return FlexibleOperator


def exec_tues_fri(execution_date: str) -> bool:
    """ skip if return True, this func execute on Tuesday and Friday """
    date_obj = dt.datetime.strptime(execution_date, '%Y-%m-%d')

    if date_obj.weekday() not in [1, 4]:
        return True

    return False

Usage in DAG file:

from airflow.operators.postgres_operator import PostgresOperator
PostgresOperator = utils.flexify_operator(PostgresOperator)

PostgresOperator(
    task_id="test",
    postgres_conn_id="conn_id",
    sql="SELECT * FROM public.test_demo",
    dag=dag,
    allowed_dates=(12, 13, 14)
)

PostgresOperator(
    task_id="test2",
    postgres_conn_id="conn_id",
    sql="SELECT * FROM public.test_demo",
    dag=dag,
    skip_func=exec_tues_fri
)

OK, I have just updated this to be more generic: it accepts either a tuple of days of the month on which to execute, or a custom callable.

Note: if the callable returns True, the task is skipped. So when you want to execute on Friday, return False when weekday == 4. If you find this awkward you can change it.
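If the inverted "True means skip" convention feels awkward, one option is a small adapter that turns a "should this run?" predicate into a `skip_func` with the signature used above (it receives the `ds` string, `YYYY-MM-DD`). A minimal sketch, where `skip_unless` and `is_friday` are hypothetical names introduced only for this example:

```python
import datetime as dt
from typing import Callable


def skip_unless(should_run: Callable[[dt.date], bool]) -> Callable[[str], bool]:
    """Adapt a 'should run' predicate into a skip_func (True means skip)."""

    def skip_func(execution_date: str) -> bool:
        # The operator above passes context['ds'], a 'YYYY-MM-DD' string.
        day = dt.datetime.strptime(execution_date, "%Y-%m-%d").date()
        return not should_run(day)

    return skip_func


def is_friday(day: dt.date) -> bool:
    """Example predicate: run only on Fridays (weekday 4)."""
    return day.weekday() == 4


skip_unless_friday = skip_unless(is_friday)
```

`skip_unless_friday` could then be passed as `skip_func=...`, so the scheduling rule itself reads positively while the operator keeps its existing contract.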

@JeffryMAC

I think you can turn this into a FlexibleDateSensor, just like we have DayOfWeekSensor.
@kaxil maybe you can give some pointers on whether that idea of a Sensor could be accepted as a core sensor?

@Shadowsong27
Author

@JeffryMAC After reading the threads above, I feel my use case is closer to an operator based on BaseBranchOperator than to a sensor approach: something like a FlexibleTimeBranchOperator, which lets you branch DAG logic purely on the execution datetime, using a custom function. We could have some pre-builts like @monday, similar to @daily for the DAG interval, etc.

I do agree that we should not introduce too much complexity to Airflow core; that's why I feel a plugin / operator-like approach is probably better.

I can provide a PR for this if you guys think this branch logic is useful.
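As a rough illustration of the branching idea, the decision itself can be written as a plain callable that returns the task_id to follow, which is the contract Airflow's existing BranchPythonOperator expects from its `python_callable`. The factory name `make_branch_callable` and the task ids `extract` and `skip_extract` are assumptions made up for this sketch, not an existing or proposed API.

```python
import datetime as dt
from typing import Callable


def make_branch_callable(
    should_run: Callable[[dt.date], bool],
    run_task_id: str,
    skip_task_id: str,
) -> Callable[..., str]:
    """Build a callable that picks a downstream task_id from the execution date."""

    def choose_branch(ds: str, **_ignored) -> str:
        # `ds` is the 'YYYY-MM-DD' execution date from the task context;
        # any other context entries are accepted and ignored.
        day = dt.datetime.strptime(ds, "%Y-%m-%d").date()
        return run_task_id if should_run(day) else skip_task_id

    return choose_branch


def is_tuesday_or_friday(day: dt.date) -> bool:
    """Example rule: run on Tuesday (1) and Friday (4)."""
    return day.weekday() in (1, 4)


# Hypothetical task ids; in a DAG this would be passed as python_callable.
choose = make_branch_callable(is_tuesday_or_friday, "extract", "skip_extract")
```

Compared with the skip-inside-the-operator workaround, branching makes the decision visible in the DAG graph and leaves the wrapped operators untouched.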

@JeffryMAC

@Shadowsong27 I don't know if it's the same case. Having @monday isn't "dynamic"; it's something that can be obtained with a single cron expression, 0 0 * * MON. The idea of better scheduling times is to allow scheduling jobs with intervals that can't be specified by a single cron expression.

@frosk1

frosk1 commented Sep 11, 2020

@Shadowsong27 it is nice to see more interest and feedback on this topic. I have gone the whole way, from an outside end-user view, to push Airflow toward a more flexible scheduling mechanism: opened feature request #10449, sent a draft via mail to [email protected], etc.

Within my feature request I point out the huge demand coming from end users, as seen in Stack Overflow discussions, where users do not even see that this is under development.

Having said this, Airflow should integrate full scheduling behavior to fill the gap. It is not a huge feature request to realize, and with it Airflow could step up to cover all former cron-based systems and make them obsolete: not only a really great workflow handling tool but also state-of-the-art scheduling software.

I think I did everything I could to point the Airflow community to this topic, which is backed by huge end-user demand. At my own firm we are using Airflow and we would also love to see this step up.

@Shadowsong27
Author

> @Shadowsong27 I don't know if it's the same case. Having @monday isn't "dynamic"; it's something that can be obtained with a single cron expression, 0 0 * * MON. The idea of better scheduling times is to allow scheduling jobs with intervals that can't be specified by a single cron expression.

OK, @monday is a bad example. I guess pre-builts are not meaningful: if a schedule is common enough to be a pre-built, it can most likely be expressed in cron.

I totally agree with the rest; that's actually why I felt a custom function defined by users is the way to go here.

@Shadowsong27
Author

@frosk1 Yep, I have been using Airflow since 2018 and watched it grow to become the top choice in the market. This issue is not super blocking TBH: you can still kind of create one DAG per odd schedule, but it creates a lot of pain, especially since in most companies DAG creation is still handled by data engineers (IMO, it should be done by the data consumers, via some sort of web UI).

That's why sometimes we will have to write our own plugins I guess, but I do see this as a potential Airflow core feature.

@ashb
Member

ashb commented Jan 20, 2021

I've started a discussion thread on this on the dev mailing list to scope out what a solution to this will look like https://lists.apache.org/thread.html/rb4e004e68574e5fb77ee5b51f4fd5bfb4b3392d884c178bc767681bf%40%3Cdev.airflow.apache.org%3E

Use cases there would be ace (and feedback once we come up with a design)

@vikramkoka vikramkoka added the area:Scheduler including HA (high availability) scheduler label Mar 2, 2021
@ashb ashb added the AIP-39 Timetables label Apr 19, 2021
@eladkal
Contributor

eladkal commented Jul 7, 2022

Please check Timetables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/timetable.html
If a specific feature is missing you can open a new issue and explain
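At its core, a custom schedule of this kind needs a function that computes the next run time after a given moment. The sketch below shows that computation for the "second and third Friday at 4pm" case from the original description, in plain Python only. It does not use the Airflow Timetable API, it ignores time zones (naive datetimes), and the names `runs_in_month` and `next_run_after` are hypothetical.

```python
import datetime as dt
from typing import List

RUN_TIME = dt.time(16, 0)  # 4pm, as in the original request
FRIDAY = 4                 # datetime convention: Monday is 0


def runs_in_month(year: int, month: int) -> List[dt.datetime]:
    """Return the second and third Friday of the month at RUN_TIME."""
    first_day = dt.date(year, month, 1)
    # Days to add to reach the first Friday (0 if the 1st is a Friday).
    offset = (FRIDAY - first_day.weekday()) % 7
    first_friday = first_day + dt.timedelta(days=offset)
    return [
        dt.datetime.combine(first_friday + dt.timedelta(weeks=n), RUN_TIME)
        for n in (1, 2)
    ]


def next_run_after(moment: dt.datetime) -> dt.datetime:
    """Return the first scheduled run strictly after `moment`."""
    year, month = moment.year, moment.month
    while True:
        for run in runs_in_month(year, month):
            if run > moment:
                return run
        # No run left this month: move on to the next one.
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)
```

A real timetable would wrap logic like this in the interface described in the linked documentation and would also have to handle time zones and data intervals.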

@eladkal eladkal closed this as completed Jul 7, 2022
7 participants