Handling POSIX Signals With the Airflow Python Operator
Recently, I have been using Airflow’s PythonOperator to interact with various AWS services.
It has been great for the most part, but gracefully handling these interactions is something we have to manage ourselves. I am specifically talking about the scenario where the Airflow task receives a termination signal and is put into an error state, while the external service never gets notified about the state change.
For example, in the case of SageMaker, we could trigger a prediction job via Airflow. If that job exceeds the time limit allocated to the task, the task fails but the SageMaker job keeps running.
In the worst cases the service keeps running as a ‘ghost’ task while another Airflow task gets triggered. Of course, this is highly dependent on how you use your PythonOperator! But it can create unwanted scenarios where multiple running services conflict with each other and cause each other to fail. At the end of the day, you don’t want to run things in the dark.
A simple workaround is to leverage POSIX signals. Airflow sends a SIGTERM signal whenever it wants to prematurely end a task, so we have to catch this signal and wind down our interaction with the external service gracefully before the Airflow task ends.
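As a quick standalone refresher before we bring Airflow into the picture, this is the bare mechanics of catching SIGTERM with Python’s standard library signal module:

import sys
from signal import signal, SIGTERM

def handle_sigterm(signum, frame):
    # Runs instead of the default action (immediate termination),
    # giving us a chance to clean up before exiting
    print("caught signal {} - cleaning up".format(signum))
    sys.exit(1)

# From this point on, a SIGTERM sent to the process invokes our handler
signal(SIGTERM, handle_sigterm)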
In this example we’ll create a mock service to demonstrate the usage of signals:
from time import sleep


class MyService:
    def __init__(self):
        self.running = True

    def run(self):
        # Keep working until someone flips the flag
        while self.running:
            print("running as fast as I can")
            sleep(1)

    def stop(self):
        # Flip the flag so run() exits on its next iteration; exiting
        # the process itself is left to the caller
        self.running = False
        print("gracefully shutting down")
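As a quick sanity check outside of Airflow, you can run the mock service in a background thread and stop it from the main thread (a standalone sketch; it assumes the class above lives in src/service.py, as imported below):

from threading import Thread
from time import sleep

from src.service import MyService

service = MyService()
worker = Thread(target=service.run)
worker.start()

sleep(3)         # let it print a few times
service.stop()   # flips the flag; run() exits on its next loop check
worker.join()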
Then we define an Airflow job that interacts with this ‘service’:
import sys
from datetime import datetime
from signal import signal, SIGTERM

from airflow import DAG
from airflow.operators.python import PythonOperator

from src.service import MyService


def trigger_service(**kwargs):
    class ServiceHandler:
        service: MyService

        def __init__(self):
            # Register the SIGTERM handler as soon as the handler exists
            signal(SIGTERM, self.sigterm_handler)

        def run_service(self):
            self.service = MyService()
            self.service.run()

        def sigterm_handler(self, signum, frame):
            print("received '{}' signal - exiting".format(signum))
            self.service.stop()
            sys.exit(1)

    handler = ServiceHandler()
    handler.run_service()


dag = DAG(
    dag_id="external_service_trigger",
    schedule_interval="0 * * * *",
    start_date=datetime(2021, 1, 1),  # required for scheduled DAGs
    max_active_runs=1,
)

step_01_run_service = PythonOperator(
    dag=dag,
    task_id="step_01_run_service",
    python_callable=trigger_service,
)
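If you want to verify the handler locally before wiring it into Airflow, one option is to schedule a SIGTERM to your own process and then call the task function directly (a standalone sketch; the dags.external_service_trigger module path is hypothetical, and Airflow itself is not involved here):

import os
from signal import SIGTERM
from threading import Timer

from dags.external_service_trigger import trigger_service  # hypothetical module path

# Send ourselves a SIGTERM after 3 seconds, then start the 'task';
# the handler should fire mid-loop and shut the service down
Timer(3, os.kill, args=(os.getpid(), SIGTERM)).start()
trigger_service()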
What happens here is that we define a very simple DAG, which has a single step that calls the trigger_service function. In this function we define a new ServiceHandler class, which is responsible for running the service.
We also register the handler’s sigterm_handler method via the signal package, so that it gets executed whenever we receive a SIGTERM signal. This method then calls the stop() method of our service, making sure that the service gets notified to end its operation.
And there you have it! This way we can ensure that we don’t have any ‘ghost’ tasks running when Airflow decides to exit early.
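Applying the same pattern to the SageMaker example from the beginning might look something like this. It is a hedged sketch rather than a drop-in implementation: the job name is hypothetical and the create_transform_job configuration is omitted, but stop_transform_job and describe_transform_job are real boto3 SageMaker calls.

import sys
from time import sleep
from signal import signal, SIGTERM

import boto3


def run_prediction_job(**kwargs):
    client = boto3.client("sagemaker")
    job_name = "my-prediction-job"  # hypothetical job name

    def sigterm_handler(signum, frame):
        # Notify SageMaker before the Airflow task dies, so the job
        # doesn't linger as a 'ghost' on the AWS side
        print("received '{}' signal - stopping {}".format(signum, job_name))
        client.stop_transform_job(TransformJobName=job_name)
        sys.exit(1)

    signal(SIGTERM, sigterm_handler)

    # create_transform_job(TransformJobName=job_name, ...) with the full
    # job configuration would go here; details omitted for brevity

    # Block until the job reaches a terminal state; a SIGTERM during this
    # loop triggers the handler above instead of orphaning the job
    while True:
        status = client.describe_transform_job(
            TransformJobName=job_name
        )["TransformJobStatus"]
        if status in ("Completed", "Failed", "Stopped"):
            break
        sleep(30)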