Overview

Apache Airflow is an open-source tool for programmatically authoring, scheduling, and monitoring data pipelines. Airflow’s extensible Python framework enables you to build workflows that connect with virtually any technology. To keep your data pipelines running smoothly, you need to monitor the health of your DAGs and tasks. The Airflow provider connects Airflow to Keep so that you can forward alerts, such as task failures, directly to Keep through a webhook.

Connecting Airflow to Keep

Alert Integration via Webhook

To connect Airflow to Keep, configure Airflow to send alerts using Keep’s webhook. You must provide:

  • Keep Webhook URL: The webhook URL provided by Keep (for example, https://api.keephq.dev/alerts/event/airflow).
  • Keep API Key: The API key generated on Keep’s platform, which is used for authentication.

A common way to integrate Airflow with Keep is through Airflow callbacks. For instance, when an Airflow task fails, a failure callback can send an alert to Keep via the webhook.

There are several steps to implement this:

Step 1: Define Keep’s Alert Information

Structure your alert payload with the following information:

data = {
    "name": "Airflow Task Failure",
    "description": "Task keep_task failed in DAG keep_dag",
    "status": "firing",
    "service": "pipeline",
    "severity": "critical",
}
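
As a minimal sketch, the payload above can be built from the failing task's details instead of being hard-coded. The helper name build_keep_payload and its parameters are hypothetical and not part of Keep or Airflow; the field names and values are the ones shown above.

```python
import json


def build_keep_payload(dag_id, task_id, service="pipeline", severity="critical"):
    """Build an alert payload with the fields shown in Step 1.

    Hypothetical helper for illustration; not part of Keep or Airflow.
    """
    return {
        "name": "Airflow Task Failure",
        "description": f"Task {task_id} failed in DAG {dag_id}",
        "status": "firing",
        "service": service,
        "severity": severity,
    }


payload = build_keep_payload(dag_id="keep_dag", task_id="keep_task")
# Serialize to JSON, the format used for the webhook request body.
print(json.dumps(payload, indent=2))
```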

Step 2: Configure Keep’s Webhook Credentials

To send alerts to Keep, pass the webhook URL and API key with each request. The Python example below reads the key from the KEEP_API_KEY environment variable.

Note: Set KEEP_API_KEY to your Keep API key in the environment where Airflow runs your tasks.

import os
import requests

def send_alert_to_keep(dag_id, task_id, execution_date, error_message):
    # Replace with your specific Keep webhook URL if different.
    keep_webhook_url = "https://api.keephq.dev/alerts/event/airflow"
    api_key = os.getenv("KEEP_API_KEY")
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "X-API-KEY": api_key,
    }

    data = {
        "name": f"Airflow Task Failure: {task_id}",
        "message": f"Task {task_id} failed in DAG {dag_id} at {execution_date}",
        "status": "firing",
        "service": "pipeline",
        "severity": "critical",
        "description": str(error_message),
    }

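    # requests has no default timeout, so set one to avoid blocking on a hung connection.
    response = requests.post(keep_webhook_url, headers=headers, json=data, timeout=10)
    # Raise an exception on 4xx/5xx responses so failed deliveries are visible.
    response.raise_for_status()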
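
Because os.getenv returns None when a variable is unset, a missing KEEP_API_KEY would otherwise go unnoticed until the request is attempted. The following is a minimal sketch of failing earlier with a clearer message; the helper names get_keep_api_key and build_keep_headers are hypothetical and not part of Keep or Airflow.

```python
import os


def get_keep_api_key(env_var="KEEP_API_KEY"):
    """Return the Keep API key from the environment, or raise if it is missing.

    Hypothetical helper for illustration; not part of Keep or Airflow.
    """
    api_key = os.getenv(env_var)
    if not api_key:
        raise RuntimeError(
            f"{env_var} is not set; export it before sending alerts to Keep."
        )
    return api_key


def build_keep_headers(api_key):
    """Build the same request headers used in send_alert_to_keep."""
    return {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "X-API-KEY": api_key,
    }
```

Inside send_alert_to_keep, calling get_keep_api_key() in place of os.getenv("KEEP_API_KEY") would raise before any request is sent.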

Step 3: Configure the Airflow Callback Function

Now, configure the callback so that an alert is sent to Keep when a task fails. You can attach this callback to one or more tasks in your DAG as shown below:

import os
import requests
from datetime import datetime, timedelta

from airflow import DAG
# Airflow 2.x import path; airflow.operators.bash_operator is the deprecated 1.x module.
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

def send_alert_to_keep(dag_id, task_id, execution_date, error_message):
    # Replace with your specific Keep webhook URL if different.
    keep_webhook_url = "https://api.keephq.dev/alerts/event/airflow"
    api_key = os.getenv("KEEP_API_KEY")
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "X-API-KEY": api_key,
    }

    data = {
        "name": f"Airflow Task Failure: {task_id}",
        "message": f"Task {task_id} failed in DAG {dag_id} at {execution_date}",
        "status": "firing",
        "service": "pipeline",
        "severity": "critical",
        "description": str(error_message),
    }

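    # requests has no default timeout, so set one to avoid blocking on a hung connection.
    response = requests.post(keep_webhook_url, headers=headers, json=data, timeout=10)
    # Raise an exception on 4xx/5xx responses so failed deliveries are visible.
    response.raise_for_status()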

def task_failure_callback(context):
    send_alert_to_keep(
        dag_id=context["dag"].dag_id,
        task_id=context["task_instance"].task_id,
        execution_date=context["execution_date"],
        error_message=context.get("exception", "Unknown error"),
    )

dag = DAG(
    dag_id="keep_dag",
    default_args=default_args,
    description="A simple DAG with Keep integration",
    schedule_interval=None,
    start_date=datetime(2025, 1, 1),
    catchup=False,
)

task = BashOperator(
    task_id="keep_task",
    bash_command="exit 1",
    dag=dag,
    on_failure_callback=task_failure_callback,
)
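
To check the callback's context handling without running Airflow or calling Keep, the lookup logic can be exercised with a stand-in context. This is only a sketch: extract_alert_fields and fake_context are hypothetical, and a real Airflow context contains many more keys than the ones assumed here.

```python
from datetime import datetime
from types import SimpleNamespace


def extract_alert_fields(context):
    """Pull the values task_failure_callback passes to send_alert_to_keep.

    Hypothetical helper for illustration; it mirrors the lookups in the
    callback above so they can be checked without Airflow installed.
    """
    return {
        "dag_id": context["dag"].dag_id,
        "task_id": context["task_instance"].task_id,
        "execution_date": context["execution_date"],
        "error_message": context.get("exception", "Unknown error"),
    }


# Stand-in for the context dict Airflow passes to callbacks (assumed shape,
# limited to the keys the callback reads).
fake_context = {
    "dag": SimpleNamespace(dag_id="keep_dag"),
    "task_instance": SimpleNamespace(task_id="keep_task"),
    "execution_date": datetime(2025, 1, 1),
    "exception": RuntimeError("Bash command failed"),
}

fields = extract_alert_fields(fake_context)
print(fields["dag_id"], fields["task_id"], str(fields["error_message"]))
```

The resulting dictionary has the same keys as the keyword arguments of send_alert_to_keep, so it could be passed along as send_alert_to_keep(**fields).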

Step 4: Observe Alerts in Keep

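After setting up the above configuration, a failure in any task that has the callback attached triggers an alert that is sent to Keep via the configured webhook. When retries are configured, as in the example's default_args, Airflow calls on_failure_callback only after the task has exhausted its retries and is marked as failed. You can then view, manage, and respond to these alerts in the Keep dashboard.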