Handling POSIX Signals With the Airflow Python Operator

Recently, I have been using Airflow’s PythonOperator to interact with various AWS services.

It has been great for the most part, but gracefully handling these interactions is something we have to manage ourselves. I am specifically talking about the scenario where a running task receives a termination signal and is put into an error state, while the external service never gets notified about the change.

For example, in the case of SageMaker, we could trigger a prediction job via Airflow. If that job exceeds the time limit allocated to the task, the task fails but the SageMaker job keeps running. In the worst case, the job lives on as a ‘ghost’ task while the next Airflow task gets triggered. How bad this gets depends on how you use your PythonOperator, but it can lead to multiple running services conflicting with each other and causing each other to fail. At the end of the day, you don’t want to run things in the dark.

A simple workaround is to leverage POSIX signals. Airflow sends a SIGTERM signal whenever it wants to prematurely end a task, so we have to catch this signal and gracefully wrap up our interaction with the external service before the Airflow task ends.
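Before wiring this into Airflow, here is a minimal, self-contained sketch of the mechanism: we register a handler for SIGTERM and, purely for demonstration, send the signal to our own process with os.kill (this stands in for Airflow terminating the task):

import os
import sys
from signal import signal, SIGTERM

def handle_sigterm(signum, frame):
    # Runs whenever the process receives SIGTERM,
    # e.g. when Airflow terminates the task prematurely.
    print(f"received signal {signum} - cleaning up")
    sys.exit(1)

# Register the handler for SIGTERM.
signal(SIGTERM, handle_sigterm)

# Simulate Airflow killing the task by signalling ourselves.
os.kill(os.getpid(), SIGTERM)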

In this example we’ll create a mock service to demonstrate the usage of signals:

from time import sleep

class MyService:
    def __init__(self):
        self.running = True

    def run(self):
        # Keep working until someone asks us to stop.
        while self.running:
            print("running as fast as I can")
            sleep(1)

    def stop(self):
        # Flip the flag so run() exits its loop; in a real service
        # this is where you would notify the external system.
        self.running = False
        print("gracefully shutting down")

Then we define an Airflow DAG with a single task that interacts with this ‘service’:

import sys
from datetime import datetime
from signal import signal, SIGTERM

from airflow import DAG
from airflow.operators.python import PythonOperator

from src.service import MyService

def trigger_service(**kwargs):
    class ServiceHandler:
        service: MyService

        def __init__(self):
            # Register the handler so it runs whenever the task
            # process receives a SIGTERM from Airflow.
            signal(SIGTERM, self.sigterm_handler)

        def run_service(self):
            self.service = MyService()
            self.service.run()

        def sigterm_handler(self, signum, frame):
            print(f"received signal {signum} - exiting")
            self.service.stop()
            sys.exit(1)

    handler = ServiceHandler()
    handler.run_service()


dag = DAG(
    dag_id="external_service_trigger",
    schedule_interval="0 * * * *",
    start_date=datetime(2021, 1, 1),  # adjust to your own schedule
    max_active_runs=1,
    catchup=False,
)

step_01_run_service = PythonOperator(
    dag=dag,
    task_id="step_01_run_service",
    python_callable=trigger_service,
)

What happens here is that we define a very simple DAG with a single task that calls the trigger_service function. In this function we define a ServiceHandler class, which is responsible for running the service. In its constructor we register the handler’s sigterm_handler method via the signal module, so that it gets executed whenever the process receives a SIGTERM. The handler then calls the stop() method of our service, making sure that the service is notified to end its operation before the task exits.
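In a real pipeline, stop() is where you would call the external service’s API. As a rough illustration, a SageMaker-backed version of the service might stop the in-flight training job through boto3 (the job name here is hypothetical, and the class is a sketch rather than a drop-in replacement):

import boto3

class SageMakerService:
    def __init__(self, job_name):
        # job_name is assumed to identify an already-running
        # SageMaker training job started by this task.
        self.job_name = job_name
        self.client = boto3.client("sagemaker")

    def stop(self):
        # Ask SageMaker to stop the job so it doesn't keep
        # running as a 'ghost' after the Airflow task dies.
        print(f"stopping training job '{self.job_name}'")
        self.client.stop_training_job(TrainingJobName=self.job_name)

The same pattern applies to stop_transform_job or stop_processing_job, depending on what kind of SageMaker job the task kicked off.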

And there you have it! This way we can ensure that we don’t have any ‘ghost’ tasks left running when Airflow decides to exit early.