import os
import requests
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator  # airflow.operators.bash_operator is deprecated since Airflow 2.0

# Default arguments applied to every task in this DAG.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

def send_alert_to_keep(dag_id, task_id, execution_date, error_message):
    """Post a task-failure alert to Keep's Airflow webhook endpoint."""
    # Replace with your specific Keep webhook URL if different.
    keep_webhook_url = "https://api.keephq.dev/alerts/event/airflow"
    api_key = os.getenv("KEEP_API_KEY")
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "X-API-KEY": api_key,
    }
    data = {
        "name": f"Airflow Task Failure: {task_id}",
        "message": f"Task {task_id} failed in DAG {dag_id} at {execution_date}",
        "status": "firing",
        "service": "pipeline",
        "severity": "critical",
        "description": str(error_message),
    }
    # The timeout keeps a slow or unreachable webhook from hanging the worker.
    response = requests.post(keep_webhook_url, headers=headers, json=data, timeout=10)
    response.raise_for_status()

def task_failure_callback(context):
    """on_failure_callback hook; Airflow passes the failed task's context dict."""
    send_alert_to_keep(
        dag_id=context["dag"].dag_id,
        task_id=context["task_instance"].task_id,
        # "logical_date" supersedes the deprecated "execution_date" context key
        # in Airflow 2.2+; fall back to the old key on earlier versions.
        execution_date=context.get("logical_date", context.get("execution_date")),
        error_message=context.get("exception", "Unknown error"),
    )
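
# Note: instead of attaching the callback per task as below, it can also be set
# in default_args as "on_failure_callback" so it applies to every task in the DAG.
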
# schedule_interval=None means the DAG never runs on a schedule; it must be
# triggered manually (or via the API).
dag = DAG(
    dag_id="keep_dag",
    default_args=default_args,
    description="A simple DAG with Keep integration",
    schedule_interval=None,
    start_date=datetime(2025, 1, 1),
    catchup=False,
)

# "exit 1" makes the task fail on purpose so the Keep alert can be verified.
task = BashOperator(
    task_id="keep_task",
    bash_command="exit 1",
    dag=dag,
    on_failure_callback=task_failure_callback,
)
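
# A minimal local smoke test, assuming Airflow 2.5+ (which added DAG.test()) and
# a valid KEEP_API_KEY in the environment: running this file directly executes
# the task, which fails ("exit 1") and should fire the Keep alert via the callback.
if __name__ == "__main__":
    dag.test()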