Damian's notes – Using Sensors to manage running of time sensitive tasks in Apache Airflow

Damian Kula

Using Sensors to manage running of time sensitive tasks in Apache Airflow

Posted on 2020.09.25

Apache Airflow is a magnificent tool. It makes executing repeatable workflows a pleasure: no more shell scripts run by cron, and no more crawling through logs to check whether everything ran properly. If you don't know Airflow yet, check out its website. [1] At first it might seem overwhelming, but it is worth learning more about this piece of software.

Anyway, to the point. Some tasks have to be executed only within certain time limits. Let's say you have a device that is connected to the internet for only 3 hours a day, and within that time you have to download data from it. In the standard situation, it is as easy as preparing a workflow that is scheduled to run within those 3 hours. The problem with this approach becomes apparent if you have a backlog of DagRuns to catch up on, or if the runtime of the tasks upstream of the time-sensitive one varies heavily: in both cases the time-sensitive task can end up starting after the window has already closed.
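To make the problem concrete, here is a small pure-Python sketch with no Airflow involved. The 12:00 to 15:00 window, the scheduled time and the upstream runtimes are hypothetical values chosen for illustration; the point is only that the moment the time-sensitive task starts depends on everything that runs before it.

```python
from datetime import datetime, time, timedelta

# Hypothetical window during which the device is online (UTC).
WINDOW_START = time(12, 0)
WINDOW_END = time(15, 0)


def in_window(moment: datetime) -> bool:
    """True if the wall-clock time of `moment` falls inside the window."""
    return WINDOW_START <= moment.time() < WINDOW_END


# The DagRun is scheduled right when the window opens.
scheduled = datetime(2020, 9, 25, 12, 0)

# Typical day: upstream tasks take 30 minutes, so the download starts at 12:30.
typical_start = scheduled + timedelta(minutes=30)
# Slow day: upstream tasks take 4 hours, so the download starts at 16:00.
slow_start = scheduled + timedelta(hours=4)
# A run from the backlog that the scheduler only picks up at 21:00.
backlog_start = datetime(2020, 9, 25, 21, 0)

for label, moment in [("typical", typical_start), ("slow", slow_start), ("backlog", backlog_start)]:
    print(label, moment.time(), in_window(moment))
# typical 12:30:00 True
# slow 16:00:00 False
# backlog 21:00:00 False
```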

Looking at the built-in Sensors and Operators, there doesn't seem to be anything that solves our problem. The closest thing is TimeSensor. [2] This sensor only checks the lower bound of the time window we are forced to work within. That is not exactly what we need: we need something that also checks that the task is executed before the window closes. Still, it's a good base, and writing a Sensor that fits our needs seems feasible.
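The difference between the two checks can be shown with plain datetime.time comparisons. lower_bound_only is a simplified, hypothetical stand-in that roughly mirrors what TimeSensor's poke does (compare the current time with a single target time), while within_window is the two-sided check we are after; neither function is part of Airflow.

```python
from datetime import time


def lower_bound_only(now: time, target: time) -> bool:
    # Simplified stand-in for TimeSensor: succeed once the target time has passed.
    return now > target


def within_window(now: time, start: time, end: time) -> bool:
    # Two-sided check: the window has opened and has not closed yet.
    return start < now < end


start, end = time(14, 30), time(19, 45)

for now in (time(12, 0), time(16, 0), time(22, 0)):
    print(now, lower_bound_only(now, start), within_window(now, start, end))
# 12:00:00 False False
# 16:00:00 True True
# 22:00:00 True False
```

At 22:00 the lower-bound check would happily let the download run even though the device is already offline.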

Using the code of TimeSensor as a base and modifying it to accept two time objects, we get something like this:

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils import timezone
from airflow.utils.decorators import apply_defaults


class TimeWindowSensor(BaseSensorOperator):
    """
    Succeeds only if the current time (UTC) lies within the given time window.

    :param start_time: time (UTC) after which the sensor succeeds
    :type start_time: datetime.time
    :param end_time: time (UTC) before which the sensor succeeds
    :type end_time: datetime.time
    """

    @apply_defaults
    def __init__(self, start_time, end_time, *args, **kwargs):
        super(TimeWindowSensor, self).__init__(*args, **kwargs)
        self.start_time = start_time
        self.end_time = end_time

    def poke(self, context):
        self.log.info(f"Checking if it's currently within time limits {self.start_time} - {self.end_time}")
        # utcnow() is in UTC, so start_time and end_time must be given in UTC too.
        current_time = timezone.utcnow().time()
        # Assumes start_time < end_time, i.e. the window does not cross midnight.
        return self.start_time < current_time < self.end_time
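The poke condition above only works when start_time is earlier than end_time on the same day; for a window that wraps around midnight, such as 22:00 to 02:00, it can never be true. If you need that case, a minimal sketch of a wrap-aware comparison could look like the following. is_within_window is a hypothetical helper, not part of Airflow, written in plain Python so it can be tested without a scheduler.

```python
from datetime import time


def is_within_window(current: time, start: time, end: time) -> bool:
    """True if `current` lies strictly between `start` and `end`.

    Also handles windows that wrap around midnight (start > end), e.g. 22:00-02:00.
    """
    if start <= end:
        # Same-day window, equivalent to the check in poke() (empty if start == end).
        return start < current < end
    # Window crosses midnight: valid late in the evening or early in the morning.
    return current > start or current < end


print(is_within_window(time(15, 0), time(14, 30), time(19, 45)))  # True
print(is_within_window(time(23, 30), time(22, 0), time(2, 0)))  # True
print(is_within_window(time(1, 0), time(22, 0), time(2, 0)))  # True
print(is_within_window(time(12, 0), time(22, 0), time(2, 0)))  # False
```

Inside poke you would then return is_within_window(timezone.utcnow().time(), self.start_time, self.end_time) instead of the plain comparison.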

Using the sensor is just as simple:

from datetime import datetime, time, timedelta

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

# TimeWindowSensor is the class defined above (same file or imported from your own module).


def download_data():
    # Placeholder for the actual download logic.
    pass


args = {
    "owner": "damian",
    "start_date": datetime(2020, 1, 1),
    "end_date": datetime(2020, 1, 10),
    "retries": 0,
    "retry_delay": timedelta(seconds=30),
}

dag = DAG(
    dag_id="time_window_sensor_example",  # example DAG id
    default_args=args,
    schedule_interval="30 11 * * *",
    catchup=True,  # run every interval between start_date and end_date
)

time_window_checker = TimeWindowSensor(
    task_id="check_if_within_time_window_for_data_fetching",
    start_time=time(hour=14, minute=30),  # UTC
    end_time=time(hour=19, minute=45),  # UTC
    mode="reschedule",  # release the worker slot between pokes
    dag=dag,
)

downloader = PythonOperator(
    task_id="time_sensitive_data_download",
    python_callable=download_data,
    dag=dag,
    retries=5,
    trigger_rule=TriggerRule.ALL_SUCCESS,
)

time_window_checker >> downloader

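In this example, although the DAG is scheduled daily at 11:30, the time_window_checker sensor waits until 14:30 and only then lets the downloader task run. Because the sensor compares against timezone.utcnow(), both bounds are interpreted as UTC. If a run (for example, one from a backlog) only reaches the sensor after 19:45, the sensor keeps rescheduling itself until the window opens again the next day, or until its timeout (seven days by default) expires.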
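The timing described above can be modelled without Airflow. The sketch below assumes the sensor is polled continuously and ignores poke_interval, the sensor timeout and any scheduler delays; next_allowed_start is a hypothetical helper that returns the moment at which a same-day check like TimeWindowSensor.poke would start succeeding.

```python
from datetime import datetime, time, timedelta


def next_allowed_start(now: datetime, start: time, end: time) -> datetime:
    """Return when a same-day window check, polled continuously from `now`,
    would start succeeding (ignores poke_interval, timeouts and scheduler delays)."""
    if start < now.time() < end:
        # Already inside the window: the very first poke succeeds.
        return now
    window_open_today = datetime.combine(now.date(), start)
    if now.time() <= start:
        # Too early: wait for the window to open later today.
        return window_open_today
    # Too late (at or after `end`): the window only opens again tomorrow.
    return window_open_today + timedelta(days=1)


start, end = time(14, 30), time(19, 45)

# Scheduled run at 11:30: the sensor holds the download until 14:30.
print(next_allowed_start(datetime(2020, 9, 25, 11, 30), start, end))  # 2020-09-25 14:30:00
# Sensor reached at 16:00: the download can start right away.
print(next_allowed_start(datetime(2020, 9, 25, 16, 0), start, end))  # 2020-09-25 16:00:00
# Backlog run reaching the sensor at 20:00: it waits for the next day's window.
print(next_allowed_start(datetime(2020, 9, 25, 20, 0), start, end))  # 2020-09-26 14:30:00
```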

It's also worth noting that the sensor by itself does not guarantee that your task will run within the required time limits: there is always a gap between the sensor succeeding and the downloader starting. If many tasks are running at the same time, you have to adjust the priorities of both the sensor and the time-sensitive task so that the task is executed as soon as possible after the sensor succeeds.
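In Airflow, priorities are set per operator with priority_weight, combined according to weight_rule. Under the default "downstream" rule, as described in the Airflow documentation, a task's effective weight is its own priority_weight plus the weights of all its descendants. The pure-Python sketch below models only that rule; the upstream tasks "extract" and "transform" are hypothetical. It shows that boosting the download task also lifts the sensor (and everything before it) above unrelated tasks left at the default weight of 1.

```python
from typing import Dict, List, Set

# Hypothetical DAG: two upstream tasks feed the sensor, which gates the download.
downstream: Dict[str, List[str]] = {
    "extract": ["transform"],
    "transform": ["check_if_within_time_window_for_data_fetching"],
    "check_if_within_time_window_for_data_fetching": ["time_sensitive_data_download"],
    "time_sensitive_data_download": [],
}

# Airflow's default priority_weight is 1; here only the download task is boosted.
priority_weight = {task: 1 for task in downstream}
priority_weight["time_sensitive_data_download"] = 10


def descendants(task: str) -> Set[str]:
    """Collect every task reachable downstream of `task` (depth-first)."""
    found: Set[str] = set()
    stack = list(downstream[task])
    while stack:
        current = stack.pop()
        if current not in found:
            found.add(current)
            stack.extend(downstream[current])
    return found


def effective_weight(task: str) -> int:
    """'downstream' weight rule: own weight plus the weights of all descendants."""
    return priority_weight[task] + sum(priority_weight[t] for t in descendants(task))


for task in downstream:
    print(task, effective_weight(task))
# extract 13
# transform 12
# check_if_within_time_window_for_data_fetching 11
# time_sensitive_data_download 10
```

In the DAG itself, the boost would simply be priority_weight=10 on the downloader; setting weight_rule="absolute" instead makes Airflow use each task's own priority_weight unchanged.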

[1] https://airflow.apache.org/
[2] https://github.com/apache/airflow/blob/cdec3012542b45d23a05f62d69110944ba542e2a/airflow/sensors/time_sensor.py#L24