This guide explains how to create a new provider for Keep. Providers are integrations that allow Keep to interact with external services for alerting, querying data, managing incidents, or building topology maps.

Provider structure

Each provider in Keep follows a specific structure:
keep/providers/
├── yourservice_provider/
│   ├── __init__.py
│   └── yourservice_provider.py
Important Notes:
  • Keep’s ProvidersFactory automatically discovers providers based on the directory naming convention (*_provider).
  • You don’t need to register them explicitly - just follow the naming pattern.
  • The provider type is automatically extracted from the class name (for example, ServiceNowProvider → service.now).

Step-by-step implementation

1. Create provider directory

Create a new directory under keep/providers/ with the pattern {service}_provider:
mkdir keep/providers/yourservice_provider

2. Create the provider module

Create yourservice_provider.py with the following structure:
"""
YourService Provider is a class that allows integration with YourService.
"""

import dataclasses
import json
import os
from typing import Optional, List, Dict, Any

import pydantic
import requests

from keep.api.models.alert import AlertDto, AlertSeverity, AlertStatus
from keep.contextmanager.contextmanager import ContextManager
from keep.providers.base.base_provider import BaseProvider
from keep.providers.models.provider_config import ProviderConfig, ProviderScope
from keep.providers.models.provider_method import ProviderMethod


@pydantic.dataclasses.dataclass
class YourserviceProviderAuthConfig:
    """YourService authentication configuration."""
    
    api_endpoint: str = dataclasses.field(
        metadata={
            "required": True,
            "description": "YourService API endpoint URL",
            "validation": "https_url",  # Optional: validates HTTPS URLs
        }
    )
    
    api_key: str = dataclasses.field(
        metadata={
            "required": True,
            "description": "API key for YourService",
            "sensitive": True,  # Marks field as sensitive in UI
        }
    )
    
    region: str = dataclasses.field(
        default="us-east-1",
        metadata={
            "required": False,
            "description": "YourService region",
            "type": "select",
            "options": ["us-east-1", "eu-west-1", "ap-south-1"],
        }
    )


class YourserviceProvider(BaseProvider):
    """Send alerts and fetch data from YourService."""
    
    # Required: Display name shown in UI
    PROVIDER_DISPLAY_NAME = "YourService"
    
    # Required: Categories for provider classification
    PROVIDER_CATEGORY = ["Monitoring"]
    
    # Optional: Tags for searchability
    PROVIDER_TAGS = ["alert", "data"]
    
    # Optional: Define required scopes/permissions
    PROVIDER_SCOPES = [
        ProviderScope(
            name="read:alerts",
            description="Read alerts from YourService",
            mandatory=True,
            documentation_url="https://docs.yourservice.com/permissions",
            alias="Read Alerts",
        ),
        ProviderScope(
            name="write:alerts",
            description="Create and update alerts",
            mandatory=False,
            mandatory_for_webhook=True,  # Required only for webhook setup
        ),
    ]
    
    # Optional: OAuth2 URL (MUST be set as class attribute, not in __init__)
    OAUTH2_URL = None  # Or os.environ.get("YOURSERVICE_OAUTH2_URL")
    
    def __init__(
        self, context_manager: ContextManager, provider_id: str, config: ProviderConfig
    ):
        super().__init__(context_manager, provider_id, config)
        # Initialize any client libraries or state here
        # Note: Logger is automatically available as self.logger
        
        # Context manager provides access to:
        # - self.context_manager.tenant_id: Current tenant ID
        # - self.context_manager.workflow_id: Current workflow ID
        # - self.context_manager.workflow_execution_id: Current execution ID
        # - self.context_manager.get_full_context(): Full workflow context
        
    def validate_config(self):
        """
        Validates required configuration for YourService provider.
        
        This is an abstract method that MUST be implemented.
        """
        self.authentication_config = YourserviceProviderAuthConfig(
            **self.config.authentication
        )
        
    def dispose(self):
        """
        Cleanup any resources when provider is disposed.
        
        This is an abstract method that MUST be implemented, even if it just passes.
        """
        pass

3. Create the __init__.py file

Create keep/providers/yourservice_provider/__init__.py:
from keep.providers.yourservice_provider.yourservice_provider import (
    YourserviceProvider,
    YourserviceProviderAuthConfig
)

__all__ = ["YourserviceProvider", "YourserviceProviderAuthConfig"]

4. Add provider documentation

Create docs/providers/documentation/yourservice-provider.mdx following the documentation template.
Provider configuration fields are automatically documented through auto-generated snippets. Keep generates the snippet files in docs/snippets/providers/ from the provider’s AuthConfig metadata and includes them in the documentation automatically.

Provider architecture

Abstract methods

Every provider must implement these two abstract methods from BaseProvider:
  1. validate_config(self) - Validates and processes the provider configuration
  2. dispose(self) - Cleans up resources when the provider is disposed of

Provider capabilities

Providers expose capabilities through standard methods:
  • _notify(**kwargs) - Send notifications or alerts
  • _query(**kwargs) - Query data from the provider
  • _get_alerts() - Fetch alerts for monitoring
  • setup_webhook(...) - Configure webhook endpoints
  • validate_scopes() - Check provider permissions
  • expose() - Return parameters calculated during execution for use in workflows
The public methods notify() and query() wrap the private implementations (_notify() and _query()) with additional capabilities like enrichment and error handling. Always implement the private methods.
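The public/private split can be sketched with a simplified stand-in class. StandInProvider and EchoProvider are hypothetical names used only for illustration; the real BaseProvider.notify() also handles enrichment, which is omitted here.

```python
class StandInProvider:
    """Simplified stand-in for BaseProvider; not Keep's actual class."""

    def __init__(self):
        self.results = []

    def notify(self, **kwargs):
        # Public entry point: delegate to the private method, then record the result.
        result = self._notify(**kwargs)
        self.results.append(result)
        return result

    def _notify(self, **kwargs):
        # Subclasses are expected to override this method.
        raise NotImplementedError("_notify() method not implemented")


class EchoProvider(StandInProvider):
    """Hypothetical provider that only overrides the private method."""

    def _notify(self, title: str, description: str = "", **kwargs) -> dict:
        return {"title": title, "description": description}


provider = EchoProvider()
response = provider.notify(title="Disk full", description="/var at 95%")
```

Callers only ever use notify(); the subclass contributes _notify() and inherits the bookkeeping around it.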

Provider discovery

Keep automatically discovers providers based on naming conventions:
  • Location: keep/providers/ directory
  • Directory naming: Must end with _provider (for example, slack_provider)
  • Main file: Must match directory name with .py extension (for example, slack_provider.py)
  • No explicit registration needed - just follow the naming convention
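The naming rules above can be illustrated with a small directory scan. This is only a sketch of the convention, not the ProvidersFactory implementation; find_provider_modules is a hypothetical helper, and the temporary directory stands in for keep/providers/.

```python
import tempfile
from pathlib import Path


def find_provider_modules(providers_root: Path) -> dict[str, Path]:
    """Return {directory name: main file} for directories that follow the convention."""
    found = {}
    for entry in sorted(providers_root.iterdir()):
        # Rule 1: the directory name must end with "_provider".
        if not entry.is_dir() or not entry.name.endswith("_provider"):
            continue
        # Rule 2: the main file must match the directory name plus ".py".
        main_file = entry / f"{entry.name}.py"
        if main_file.is_file():
            found[entry.name] = main_file
    return found


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "slack_provider").mkdir()
    (root / "slack_provider" / "slack_provider.py").write_text("")
    (root / "base").mkdir()             # skipped: name does not end with _provider
    (root / "broken_provider").mkdir()  # skipped: no broken_provider.py inside
    names = list(find_provider_modules(root))
```

In this sketch only slack_provider satisfies both rules, so it is the only entry returned.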

Implementation examples

validate_config()

def validate_config(self):
    """Validate and process provider configuration."""
    self.authentication_config = YourserviceProviderAuthConfig(
        **self.config.authentication
    )

dispose()

def dispose(self):
    """Cleanup any resources."""
    # Close connections, cleanup clients, etc.
    # Can just pass if no cleanup needed
    pass

Provider type extraction

The provider type is automatically extracted from your class name:
  • YourserviceProvider → yourservice
  • ServiceNowProvider → service.now
  • DatadogProvider → datadog
This happens via the _extract_type() method in BaseProvider.
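As a quick check of the mapping, the sketch below copies the regular-expression logic of _extract_type() into a standalone function. The name extract_type is hypothetical; only the three transformation steps come from BaseProvider.

```python
import re


def extract_type(class_name: str) -> str:
    """Standalone copy of the _extract_type() logic (hypothetical helper)."""
    # 1. Drop the "Provider" suffix from the class name.
    name_without_provider = class_name.replace("Provider", "")
    # 2. Put a space before every capital letter, then lowercase and trim.
    name_with_spaces = re.sub("([A-Z])", r" \1", name_without_provider).lower().strip()
    # 3. Join the words with dots.
    return name_with_spaces.replace(" ", ".")


examples = {
    name: extract_type(name)
    for name in ("YourserviceProvider", "ServiceNowProvider", "DatadogProvider")
}
```

Because every capital letter starts a new dot-separated segment, the capitalization of the class name decides the provider type: ServiceNowProvider yields service.now, whereas a class named ServicenowProvider would yield servicenow.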

Provider attributes

Providers should define the following class attributes:
  • PROVIDER_DISPLAY_NAME: String used for UI display (for example, “Slack”)
  • PROVIDER_CATEGORY: List of categories from the allowed values (see Provider Categories section)
  • PROVIDER_COMING_SOON: Boolean flag to mark providers as not ready (default: False)
  • WEBHOOK_INSTALLATION_REQUIRED: Boolean to make webhook setup mandatory in UI (default: False)
  • PROVIDER_TAGS: List of tags describing provider capabilities (for example, ["alert", "messaging"])
  • PROVIDER_SCOPES: List of ProviderScope objects defining required permissions
  • PROVIDER_METHODS: List of ProviderMethod objects for additional capabilities (see Provider Methods)
  • FINGERPRINT_FIELDS: List of field names used to calculate alert fingerprints
  • OAUTH2_URL: OAuth 2.0 authorization URL if provider supports OAuth 2.0 authentication
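To show what FINGERPRINT_FIELDS influences, here is a minimal sketch of the fingerprint calculation applied to a plain dict. It follows the logic of BaseProvider.get_alert_fingerprint() but does not use AlertDto; the function name fingerprint and the sample alert are assumptions made for illustration.

```python
import hashlib
import json


def fingerprint(alert: dict, fingerprint_fields: list[str]) -> str:
    """Hash the selected fields; fall back to the alert name if none are given."""
    if not fingerprint_fields:
        return alert["name"]
    digest = hashlib.sha256()
    for field in fingerprint_fields:
        value = alert.get(field)
        # Lists and dicts are serialized to JSON before hashing.
        if isinstance(value, (list, dict)):
            value = json.dumps(value)
        # Empty or missing values do not contribute to the hash.
        if value:
            digest.update(str(value).encode())
    return digest.hexdigest()


alert = {"name": "cpu-high", "service": "api", "labels": {"env": "prod"}}
by_name = fingerprint(alert, [])
by_fields = fingerprint(alert, ["name", "service"])
```

Two alerts that agree on every listed field get the same fingerprint, so the choice of fields controls which events are treated as the same alert.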

Provider categories

Providers must specify one or more categories from the following list:
PROVIDER_CATEGORY: list[Literal[
    "AI", "Monitoring", "Incident Management", "Cloud Infrastructure",
    "Ticketing", "Identity", "Developer Tools", "Database",
    "Identity and Access Management", "Security", "Collaboration",
    "Organizational Tools", "CRM", "Queues", "Orchestration", "Others"
]]

Provider tags

Valid options for PROVIDER_TAGS:
  • "alert" - Provider handles alerts
  • "ticketing" - Provider manages tickets
  • "messaging" - Provider sends messages
  • "data" - Provider queries data
  • "queue" - Provider manages queues
  • "topology" - Provider provides topology data
  • "incident" - Provider manages incidents
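A provider's tag list can be checked against these values with a few lines of Python. This is a minimal sketch; ALLOWED_TAGS and invalid_tags are hypothetical names and not part of Keep.

```python
# The seven tag values listed above.
ALLOWED_TAGS = {"alert", "ticketing", "messaging", "data", "queue", "topology", "incident"}


def invalid_tags(tags: list[str]) -> list[str]:
    """Return the tags that are not in the allowed set, preserving order."""
    return [tag for tag in tags if tag not in ALLOWED_TAGS]


ok = invalid_tags(["alert", "data"])
bad = invalid_tags(["alert", "metrics"])
```

An empty result means every tag is valid; anything else names the offending entries.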

Provider scope

@dataclass
class ProviderScope:
    """
    Provider scope model.

    Args:
        name (str): The name of the scope.
        description (Optional[str]): The description of the scope.
        mandatory (bool): Whether the scope is mandatory.
        mandatory_for_webhook (bool): Whether the scope is mandatory for webhook auto installation.
        documentation_url (Optional[str]): The documentation url of the scope.
        alias (Optional[str]): Another alias of the scope.
    """

    name: str
    description: Optional[str] = None
    mandatory: bool = False
    mandatory_for_webhook: bool = False
    documentation_url: Optional[str] = None
    alias: Optional[str] = None
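Scopes are typically paired with a validate_scopes() implementation that reports, per scope name, either True or an error message. The sketch below shows one possible shape as a standalone function; the probes mapping and the two probe functions are hypothetical, and the ProviderScope class is a local copy of the fields shown above so the example runs without Keep installed.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Union


@dataclass
class ProviderScope:
    """Local copy of the scope model, for illustration only."""
    name: str
    description: Optional[str] = None
    mandatory: bool = False
    mandatory_for_webhook: bool = False
    documentation_url: Optional[str] = None
    alias: Optional[str] = None


def validate_scopes(
    scopes: List[ProviderScope], probes: Dict[str, Callable[[], None]]
) -> Dict[str, Union[bool, str]]:
    """Run one probe per scope; True on success, the error text on failure."""
    results: Dict[str, Union[bool, str]] = {}
    for scope in scopes:
        probe = probes.get(scope.name)
        if probe is None:
            results[scope.name] = "No check defined for this scope"
            continue
        try:
            probe()  # hypothetical call that exercises the permission
            results[scope.name] = True
        except Exception as exc:
            results[scope.name] = str(exc)
    return results


def can_read():
    pass  # pretend the read request succeeded


def can_write():
    raise PermissionError("403 Forbidden")


scopes = [ProviderScope("read:alerts", mandatory=True), ProviderScope("write:alerts")]
report = validate_scopes(scopes, {"read:alerts": can_read, "write:alerts": can_write})
```

In a real provider the probes would be lightweight API calls made with the configured credentials, and the method would live on the provider class.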

Provider config

@dataclass
class ProviderConfig:
    """
    Provider configuration model.

    Args:
        description (Optional[str]): The description of the provider.
        authentication (dict): The configuration for the provider.
    """

    authentication: Optional[dict]
    name: Optional[str] = None
    description: Optional[str] = None

    def __post_init__(self):
        if not self.authentication:
            return
        for key, value in self.authentication.items():
            if (
                isinstance(value, str)
                and value.startswith("{{")
                and value.endswith("}}")
            ):
                self.authentication[key] = chevron.render(value, {"env": os.environ})

Base provider

"""
Base class for all providers.
"""
class BaseProvider(metaclass=abc.ABCMeta):
    OAUTH2_URL = None
    PROVIDER_SCOPES: list[ProviderScope] = []
    PROVIDER_METHODS: list[ProviderMethod] = []
    FINGERPRINT_FIELDS: list[str] = []
    PROVIDER_TAGS: list[
        Literal["alert", "ticketing", "messaging", "data", "queue", "topology", "incident"]
    ] = []
    PROVIDER_DISPLAY_NAME: str = None
    PROVIDER_CATEGORY: list[str] = []
    PROVIDER_COMING_SOON: bool = False
    WEBHOOK_INSTALLATION_REQUIRED: bool = False

    def __init__(
        self,
        context_manager: ContextManager,
        provider_id: str,
        config: ProviderConfig,
        webhook_template: Optional[str] = None,
        webhook_description: Optional[str] = None,
        webhook_markdown: Optional[str] = None,
        provider_description: Optional[str] = None,
    ):
        """
        Initialize a provider.

        Args:
            provider_id (str): The provider id.
            **kwargs: Provider configuration loaded from the provider yaml file.
        """
        self.provider_id = provider_id

        self.config = config
        self.webhook_template = webhook_template
        self.webhook_description = webhook_description
        self.provider_description = provider_description
        self.context_manager = context_manager
        self.logger = context_manager.get_logger()
        self.validate_config()
        self.logger.debug(
            "Base provider initialized", extra={"provider": self.__class__.__name__}
        )
        self.provider_type = self._extract_type()
        self.results = []
        # tb: we can have this overridden by customer configuration, when initializing the provider
        self.fingerprint_fields = self.FINGERPRINT_FIELDS

    def _extract_type(self):
        """
        Extract the provider type from the provider class name.

        Returns:
            str: The provider type.
        """
        name = self.__class__.__name__
        name_without_provider = name.replace("Provider", "")
        name_with_spaces = (
            re.sub("([A-Z])", r" \1", name_without_provider).lower().strip()
        )
        return name_with_spaces.replace(" ", ".")

    @abc.abstractmethod
    def dispose(self):
        """
        Dispose of the provider.
        """
        raise NotImplementedError("dispose() method not implemented")

    @abc.abstractmethod
    def validate_config(self):
        """
        Validate provider configuration.
        """
        raise NotImplementedError("validate_config() method not implemented")

    def validate_scopes(self) -> dict[str, bool | str]:
        """
        Validate provider scopes.

        Returns:
            dict: where key is the scope name and value is whether the scope is valid (True boolean) or string with error message.
        """
        return {}

    def notify(self, **kwargs):
        """
        Output alert message.

        Args:
            **kwargs (dict): The provider context (with statement)
        """
        # trigger the provider
        results = self._notify(**kwargs)
        self.results.append(results)
        # if the alert should be enriched, enrich it
        enrich_alert = kwargs.get("enrich_alert", [])
        if not enrich_alert or not results:
            return results if results else None

        self._enrich(enrich_alert, results)
        return results

    def _enrich(self, enrichments, results, audit_enabled=True):
        """
        Enrich alert or incident with provider specific data.
        
        This method replaces the deprecated _enrich_alert method and supports both
        alert and incident enrichment.
        
        Args:
            enrichments: List of enrichment configurations
            results: Results from the provider action
            audit_enabled: Whether to audit the enrichment operation (default: True)
        """
        self.logger.debug("Extracting the fingerprint from the alert")
        if "fingerprint" in results:
            fingerprint = results["fingerprint"]
        elif self.context_manager.foreach_context.get("value", {}):
            # TODO: if it's zipped, we need to extract the fingerprint from the zip (i.e. multiple foreach)
            fingerprint = self.context_manager.foreach_context.get("value", {}).get(
                "fingerprint"
            )
        # else, if we are in an event context, use the event fingerprint
        elif self.context_manager.event_context:
            # TODO: map all cases event_context is dict and update them to the DTO
            #       and remove this if statement
            if isinstance(self.context_manager.event_context, dict):
                fingerprint = self.context_manager.event_context.get("fingerprint")
            # Alert DTO
            else:
                fingerprint = self.context_manager.event_context.fingerprint
        else:
            fingerprint = None

        if not fingerprint:
            self.logger.error(
                "No fingerprint found for alert enrichment",
                extra={"provider": self.provider_id},
            )
            raise Exception("No fingerprint found for alert enrichment")
        self.logger.debug("Fingerprint extracted", extra={"fingerprint": fingerprint})

        _enrichments = {}
        # enrich only the requested fields
        for enrichment in enrichments:
            try:
                if enrichment["value"].startswith("results."):
                    val = enrichment["value"].replace("results.", "")
                    parts = val.split(".")
                    r = copy.copy(results)
                    for part in parts:
                        r = r[part]
                    _enrichments[enrichment["key"]] = r
                else:
                    _enrichments[enrichment["key"]] = enrichment["value"]
            except Exception:
                self.logger.error(
                    f"Failed to enrich alert - enrichment: {enrichment}",
                    extra={"fingerprint": fingerprint, "provider": self.provider_id},
                )
                continue
        self.logger.info("Enriching alert", extra={"fingerprint": fingerprint})
        try:
            enrich_alert(self.context_manager.tenant_id, fingerprint, _enrichments)
        except Exception as e:
            self.logger.error(
                "Failed to enrich alert in db",
                extra={"fingerprint": fingerprint, "provider": self.provider_id},
            )
            raise e
        self.logger.info("Alert enriched", extra={"fingerprint": fingerprint})

    def _notify(self, **kwargs):
        """
        Output alert message.

        Args:
            **kwargs (dict): The provider context (with statement)
        """
        raise NotImplementedError("notify() method not implemented")

    def _query(self, **kwargs: dict):
        """
        Query the provider using the given query

        Args:
            kwargs (dict): The provider context (with statement)

        Raises:
            NotImplementedError: _description_
        """
        raise NotImplementedError("query() method not implemented")

    def query(self, **kwargs: dict):
        # just run the query
        results = self._query(**kwargs)
        # now add the type of the results to the global context
        if results and isinstance(results, list):
            self.context_manager.dependencies.add(results[0].__class__)
        elif results:
            self.context_manager.dependencies.add(results.__class__)

        enrich_alert = kwargs.get("enrich_alert", [])
        if enrich_alert:
            self._enrich(enrich_alert, results)
        # and return the results
        return results

    @staticmethod
    def _format_alert(
        event: dict | list[dict], provider_instance: "BaseProvider" = None
    ) -> AlertDto | list[AlertDto]:
        """
        Format incoming event(s) into AlertDto object(s).
        
        Args:
            event: Single event dict or list of event dicts
            provider_instance: Optional provider instance for context
            
        Returns:
            AlertDto or list of AlertDto objects
        """
        raise NotImplementedError("format_alert() method not implemented")

    @classmethod
    def format_alert(cls, event: dict) -> AlertDto | list[AlertDto]:
        logger = logging.getLogger(__name__)
        logger.debug("Formatting alert")
        formatted_alert = cls._format_alert(event)
        logger.debug("Alert formatted")
        return formatted_alert

    @staticmethod
    def get_alert_fingerprint(alert: AlertDto, fingerprint_fields: list = []) -> str:
        """
        Get the fingerprint of an alert.

        Args:
            event (AlertDto): The alert to get the fingerprint of.
            fingerprint_fields (list, optional): The fields we calculate the fingerprint upon. Defaults to [].

        Returns:
            str: hexdigest of the fingerprint or the event.name if no fingerprint_fields were given.
        """
        if not fingerprint_fields:
            return alert.name
        fingerprint = hashlib.sha256()
        event_dict = alert.dict()
        for fingerprint_field in fingerprint_fields:
            fingerprint_field_value = event_dict.get(fingerprint_field, None)
            if isinstance(fingerprint_field_value, (list, dict)):
                fingerprint_field_value = json.dumps(fingerprint_field_value)
            if fingerprint_field_value:
                fingerprint.update(str(fingerprint_field_value).encode())
        return fingerprint.hexdigest()

    def get_alerts_configuration(self, alert_id: Optional[str] = None):
        """
        Get configuration of alerts from the provider.

        Args:
            alert_id (Optional[str], optional): If given, gets a specific alert by id. Defaults to None.
        """
        # todo: we'd want to have a common alert model for all providers (also for consistent output from GPT)
        raise NotImplementedError("get_alerts() method not implemented")

    def deploy_alert(self, alert: dict, alert_id: Optional[str] = None):
        """
        Deploy an alert to the provider.

        Args:
            alert (dict): The alert to deploy.
            alert_id (Optional[str], optional): If given, deploys a specific alert by id. Defaults to None.
        """
        raise NotImplementedError("deploy_alert() method not implemented")

    def _get_alerts(self) -> list[AlertDto]:
        """
        Get alerts from the provider.
        """
        raise NotImplementedError("get_alerts() method not implemented")

    def get_alerts(self) -> list[AlertDto]:
        """
        Get alerts from the provider.
        """
        with tracer.start_as_current_span(f"{self.__class__.__name__}-get_alerts"):
            alerts = self._get_alerts()
            # enrich alerts with provider id
            for alert in alerts:
                alert.providerId = self.provider_id
            return alerts

    def get_alerts_by_fingerprint(self, tenant_id: str) -> dict[str, list[AlertDto]]:
        """
        Get alerts from the provider grouped by fingerprint, sorted by lastReceived.

        Returns:
            dict[str, list[AlertDto]]: A dict of alerts grouped by fingerprint, sorted by lastReceived.
        """
        alerts = self.get_alerts()

        if not alerts:
            return {}

        # get alerts, group by fingerprint and sort them by lastReceived
        with tracer.start_as_current_span(f"{self.__class__.__name__}-get_last_alerts"):
            get_attr = operator.attrgetter("fingerprint")
            grouped_alerts = {
                fingerprint: list(alerts)
                for fingerprint, alerts in itertools.groupby(
                    sorted(
                        alerts,
                        key=get_attr,
                    ),
                    get_attr,
                )
            }

        # enrich alerts
        with tracer.start_as_current_span(f"{self.__class__.__name__}-enrich_alerts"):
            pulled_alerts_enrichments = get_enrichments(
                tenant_id=tenant_id,
                fingerprints=grouped_alerts.keys(),
            )
            for alert_enrichment in pulled_alerts_enrichments:
                if alert_enrichment:
                    alerts_to_enrich = grouped_alerts.get(
                        alert_enrichment.alert_fingerprint
                    )
                    for alert_to_enrich in alerts_to_enrich:
                        parse_and_enrich_deleted_and_assignees(
                            alert_to_enrich, alert_enrichment.enrichments
                        )
                        for enrichment in alert_enrichment.enrichments:
                            # set the enrichment
                            setattr(
                                alert_to_enrich,
                                enrichment,
                                alert_enrichment.enrichments[enrichment],
                            )

        return grouped_alerts

    def setup_webhook(
        self, tenant_id: str, keep_api_url: str, api_key: str, setup_alerts: bool = True
    ) -> dict | None:
        """
        Setup a webhook for the provider.

        Args:
            tenant_id (str): The tenant ID
            keep_api_url (str): The Keep API URL for webhook callbacks
            api_key (str): The API key for authentication
            setup_alerts (bool, optional): Whether to setup alerts. Defaults to True.

        Returns:
            dict | None: Dictionary of secrets to be saved if any, None otherwise
            
        Raises:
            NotImplementedError: If not implemented by the provider
        """
        raise NotImplementedError("setup_webhook() method not implemented")

    @staticmethod
    def get_alert_schema() -> dict:
        """
        Get the alert schema description for the provider.
            e.g. How to define an alert for the provider that can be pushed via the API.

        Returns:
            str: The alert format description.
        """
        raise NotImplementedError(
            "get_alert_format_description() method not implemented"
        )

    @staticmethod
    def oauth2_logic(**payload) -> dict:
        """
        Logic for oauth2 authentication.

        For example, in Slack oauth2, we need to get the code from the payload and exchange it for a token.

        return: dict: The secrets to be saved as the provider configuration. (e.g. the Slack access token)
        """
        raise NotImplementedError("oauth2_logic() method not implemented")

    @staticmethod
    def parse_event_raw_body(raw_body: bytes | dict) -> dict:
        """
        Parse the raw body of an event and create an ingestible dict from it.

        For instance, in parseable, the "event" is just a string
        > b'Alert: Server side error triggered on teststream1\nMessage: server reporting status as 500\nFailing Condition: status column equal to abcd, 2 times'
        and we want to return an object
        > {'alert': 'Server side error triggered on teststream1', 'message': 'server reporting status as 500', 'failing_condition': 'status column equal to abcd, 2 times'}

        If this method is not implemented for a provider, it should convert the raw body to a dict.

        Args:
            raw_body (bytes | dict): The raw body of the incoming event (can be bytes or dict)

        Returns:
            dict: Ingestible event dictionary
        """
        if isinstance(raw_body, dict):
            return raw_body
        return raw_body

    def get_logs(self, limit: int = 5) -> list:
        """
        Get logs from the provider.

        Args:
            limit (int): The number of logs to get.
        """
        raise NotImplementedError("get_logs() method not implemented")

    def expose(self):
        """Expose parameters that were calculated during query time.

        Each provider can expose parameters that were calculated during query time.
        E.g. parameters that were supplied by the user and were rendered by the provider.

        A concrete example is the "_from" and "to" of the Datadog Provider which are calculated during execution.
        """
        # TODO - implement dynamically using decorators and
        return {}

    def start_consume(self):
        """Get the consumer for the provider.

        should be implemented by the provider if it has a consumer.

        for an example, see Kafka Provider

        Returns:
            Consumer: The consumer for the provider.
        """
        return

    def status(self) -> bool:
        """Return the status of the provider.

        Returns:
            bool: The status of the provider.
        """
        return {
            "status": "should be implemented by the provider if it has a consumer",
            "error": "",
        }

    @property
    def is_consumer(self) -> bool:
        """Return consumer if the inherited class has a start_consume method.

        Returns:
            bool: _description_
        """
        return self.start_consume.__qualname__ != "BaseProvider.start_consume"

    def _push_alert(self, alert: dict):
        """
        Push an alert to the provider.

        Args:
            alert (dict): The alert to push.
        """
        # if this is not a dict, try to convert it to a dict
        if not isinstance(alert, dict):
            try:
                alert_data = json.loads(alert)
            except Exception:
                alert_data = alert  # keep the original value; the dict check below rejects it
        else:
            alert_data = alert

        # if this is still not a dict, we can't push it
        if not isinstance(alert_data, dict):
            self.logger.warning(
                "We currently support only alert represented as a dict, dismissing alert",
                extra={"alert": alert},
            )
            return
        # now try to build the alert model
        # we will have a lot of default values here to support all providers and all cases, the
        # way to fine tune those would be to use the provider specific model or enforce that the event from the queue will be casted into the fields
        alert_model = AlertDto(
            id=alert_data.get("id", str(uuid.uuid4())),
            name=alert_data.get("name", "alert-from-event-queue"),
            status=alert_data.get("status", AlertStatus.FIRING),
            lastReceived=alert_data.get("lastReceived", datetime.datetime.now()),
            environment=alert_data.get("environment", "alert-from-event-queue"),
            isDuplicate=alert_data.get("isDuplicate", False),
            duplicateReason=alert_data.get("duplicateReason", None),
            service=alert_data.get("service", "alert-from-event-queue"),
            source=alert_data.get("source", [self.provider_type]),
            message=alert_data.get("message", "alert-from-event-queue"),
            description=alert_data.get("description", "alert-from-event-queue"),
            severity=alert_data.get("severity", AlertSeverity.INFO),
            pushed=alert_data.get("pushed", False),
            event_id=alert_data.get("event_id", str(uuid.uuid4())),
            url=alert_data.get("url", None),
            fingerprint=alert_data.get("fingerprint", None),
        )
        # push the alert to the provider
        url = f'{os.environ["KEEP_API_URL"]}/alerts/event'
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "X-API-KEY": self.context_manager.api_key,
        }
        response = requests.post(url, json=alert_model.dict(), headers=headers)
        try:
            response.raise_for_status()
            self.logger.info("Alert pushed successfully")
        except Exception:
            self.logger.error(
                f"Failed to push alert to {self.provider_id}: {response.content}"
            )

Provider types and capabilities

Base provider types

Keep supports several base provider types, each with specific capabilities:
  1. BaseProvider (keep/providers/base/base_provider.py)
    • Basic provider capabilities
    • Methods: _notify(), _query(), _get_alerts()
    • Use for: General integrations
  2. BaseTopologyProvider (keep/providers/base/base_provider.py)
    • Extends BaseProvider
    • Methods: pull_topology()
    • Use for: Services that provide infrastructure topology data
    • Example: Datadog Provider (keep/providers/datadog_provider/datadog_provider.py)
  3. BaseIncidentProvider (keep/providers/base/base_provider.py)
    • Extends BaseProvider
    • Methods: _get_incidents(), _format_incident() (static), format_incident() (classmethod), setup_incident_webhook()
    • Use for: Incident management systems
    • Example: PagerDuty Provider (keep/providers/pagerduty_provider/pagerduty_provider.py)

Common capabilities

1. Notification (_notify)

Send alerts or messages to external services:
def _notify(self, title: str, description: str = "", **kwargs) -> dict:
    # Implementation goes here
    ...

2. Query (_query)

Fetch data from external services:
def _query(self, query: str, **kwargs) -> list:
    # Implementation goes here
    ...

3. Alert Fetching (_get_alerts)

Pull alerts for monitoring:
def _get_alerts(self) -> List[AlertDto]:
    # Implementation goes here
    ...

4. Webhook support

Handle incoming webhooks:
@staticmethod
def parse_event_raw_body(raw_body: bytes | dict) -> dict:
    # Parse the raw webhook payload into a dict
    ...

@staticmethod
def _format_alert(event: dict, provider_instance: "BaseProvider" = None) -> AlertDto | list[AlertDto]:
    # Format webhook events into alerts
    ...
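As an illustration of the parsing step, the sketch below handles the "Key: value" text payload quoted in the parse_event_raw_body() docstring. It is written as a plain function rather than a static method, and the key normalization (lowercase, spaces to underscores) is inferred from that docstring example, not taken from a specific provider.

```python
from typing import Union


def parse_event_raw_body(raw_body: Union[bytes, dict]) -> dict:
    """Turn a 'Key: value' text payload into a dict; pass dicts through unchanged."""
    if isinstance(raw_body, dict):
        return raw_body
    event = {}
    for line in raw_body.decode("utf-8").splitlines():
        # Split on the first colon only, so values may contain colons.
        key, separator, value = line.partition(":")
        if not separator:
            continue  # skip lines without a key
        event[key.strip().lower().replace(" ", "_")] = value.strip()
    return event


raw = (
    b"Alert: Server side error triggered on teststream1\n"
    b"Message: server reporting status as 500\n"
    b"Failing Condition: status column equal to abcd, 2 times"
)
event = parse_event_raw_body(raw)
```

The resulting dict is what _format_alert() would then receive and map onto AlertDto fields.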

5. OAuth 2.0 support

Handle OAuth 2.0 authentication:
import os

# IMPORTANT: Define OAUTH2_URL as a class attribute at the class level, NOT in __init__
class YourserviceProvider(BaseProvider):
    OAUTH2_URL = os.environ.get("YOURSERVICE_OAUTH2_URL")  # Must be at class level

    @staticmethod
    def oauth2_logic(**payload) -> dict:
        # OAuth 2.0 implementation
        ...

6. Consumer providers

For providers that consume messages from queues or streams:
def start_consume(self):
    """
    Start consuming messages from the provider.
    
    This method is called when Keep starts the provider as a consumer.
    Implement long-running consumption logic here.
    """
    # Example: Kafka consumer
    while True:
        message = self.consumer.poll()
        if message:
            self._push_alert(message)
            
@property
def is_consumer(self) -> bool:
    """Provider is automatically detected as consumer if start_consume is implemented."""
    return True  # Automatically set if start_consume is overridden
    
def status(self) -> dict:
    """Return the status of the consumer."""
    return {
        "status": "running" if self.consumer_active else "stopped",
        "error": self.last_error if hasattr(self, 'last_error') else ""
    }
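
The `while True` loop above never exits on its own, so a consumer usually needs a way to stop. The following is a minimal sketch of one option, assuming a `threading.Event` as the stop flag and a `queue.Queue` standing in for the broker. The class `DemoConsumer` and the method `stop_consume` are hypothetical names used for illustration, not part of Keep's documented API.

```python
import queue
import threading
import time


class DemoConsumer:
    """Stand-in consumer; a queue.Queue plays the role of the message broker."""

    def __init__(self, source: queue.Queue):
        self.source = source
        self.received = []
        self.last_error = ""
        self._running = False
        self._stop = threading.Event()

    def start_consume(self) -> None:
        self._running = True
        try:
            # Poll with a timeout so the stop flag is checked regularly.
            while not self._stop.is_set():
                try:
                    message = self.source.get(timeout=0.05)
                except queue.Empty:
                    continue
                # A real provider would call self._push_alert(message) here.
                self.received.append(message)
        except Exception as exc:
            self.last_error = str(exc)
        finally:
            self._running = False

    def stop_consume(self) -> None:
        self._stop.set()

    def status(self) -> dict:
        return {
            "status": "running" if self._running else "stopped",
            "error": self.last_error,
        }


broker = queue.Queue()
consumer = DemoConsumer(broker)
worker = threading.Thread(target=consumer.start_consume, daemon=True)
worker.start()
broker.put({"name": "disk full"})
broker.put({"name": "cpu high"})

# Wait (up to two seconds) until both messages are processed, then stop the loop.
deadline = time.monotonic() + 2
while len(consumer.received) < 2 and time.monotonic() < deadline:
    time.sleep(0.01)
consumer.stop_consume()
worker.join(timeout=2)
print(consumer.status())
```

Polling with a short timeout keeps shutdown responsive without busy-waiting on the broker.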

Specialized base classes

Keep provides specialized base classes for specific provider types:

Base topology provider

For providers that manage infrastructure topology and service dependencies:
from keep.providers.base.base_provider import BaseTopologyProvider

class MyTopologyProvider(BaseTopologyProvider):
    def pull_topology(self) -> tuple[list[TopologyServiceInDto], dict]:
        """
        Pull topology data from the provider.
        
        Returns:
            tuple: A tuple of (services list, edges dict)
        """
        # Implement topology fetching logic
        pass

Base incident provider

For providers that manage incidents and incident response:
from keep.providers.base.base_provider import BaseIncidentProvider

class MyIncidentProvider(BaseIncidentProvider):
    def _get_incidents(self) -> list[IncidentDto]:
        """
        Fetch incidents from the provider (abstract method).
        
        Returns:
            list[IncidentDto]: List of incidents
        """
        # Implement incident fetching logic
        pass
    
    @staticmethod
    def _format_incident(
        event: dict, provider_instance: "BaseProvider" = None
    ) -> IncidentDto | list[IncidentDto]:
        """
        Format raw incident data into IncidentDto objects.
        
        Args:
            event: Raw incident data from webhook or API
            provider_instance: Optional provider instance for context
            
        Returns:
            IncidentDto or list of IncidentDto objects
        """
        # Implement incident formatting logic
        pass
    
    def setup_incident_webhook(
        self,
        tenant_id: str,
        keep_api_url: str,
        api_key: str,
        setup_alerts: bool = True,
    ) -> dict | None:
        """
        Setup webhook for incident updates.
        
        Args:
            tenant_id: Tenant identifier
            keep_api_url: Keep API URL for callbacks
            api_key: API key for authentication
            setup_alerts: Whether to also setup alert webhooks
            
        Returns:
            dict | None: Secrets to save if any
        """
        # Implement webhook setup logic
        pass
Note: The get_incidents() method is automatically provided by the base class and wraps _get_incidents(). The format_incident() class method handles provider loading and calls _format_incident().

Authentication configuration

Providers should define an authentication configuration class as a dataclass with proper field types and validation:
import dataclasses
import pydantic
from keep.validation.fields import HttpsUrl, NoSchemeUrl, UrlPort

@pydantic.dataclasses.dataclass
class MyProviderAuthConfig:
    """Configuration for MyProvider authentication."""
    
    # Required fields (no default) come first: a dataclass rejects a field
    # without a default that follows a field with one.
    api_key: str = dataclasses.field(
        metadata={
            "required": True,
            "description": "API Key for authentication",
            "sensitive": True,  # Masks the field value in UI
        }
    )

    host: NoSchemeUrl = dataclasses.field(
        metadata={
            "required": True,
            "description": "Service hostname",
            "hint": "example.com or 192.168.1.1",
            "validation": "no_scheme_url",  # Maps to NoSchemeUrl validator
        }
    )

    workspace_id: str = dataclasses.field(
        metadata={
            "required": True,
            "description": "Workspace identifier",
            "hint": "Can be found in Settings > Workspace",
        }
    )

    # Optional fields with defaults follow
    api_url: HttpsUrl = dataclasses.field(
        default="https://api.example.com",
        metadata={
            "required": False,
            "description": "API endpoint URL (HTTPS only)",
            "documentation_url": "https://docs.example.com/api",
            "validation": "https_url",  # Maps to HttpsUrl validator
        }
    )

    port: UrlPort = dataclasses.field(
        default=443,
        metadata={
            "required": False,
            "description": "Service port",
            "validation": "port",  # Validates port range 1-65535
        }
    )

    region: str = dataclasses.field(
        default="us-east-1",
        metadata={
            "required": False,
            "description": "Service region",
            "type": "select",  # Renders as dropdown in UI
            "options": ["us-east-1", "eu-west-1", "ap-south-1"],
        }
    )

Field validation

Keep provides built-in field validation through custom Pydantic field types:
| Validation Type | Field Type | Description | Example |
|---|---|---|---|
| "https_url" | HttpsUrl | Validates HTTPS URLs only | https://api.example.com |
| "any_http_url" | pydantic.AnyHttpUrl | Validates any HTTP/HTTPS URL | http://example.com |
| "no_scheme_url" | NoSchemeUrl | Validates URLs without scheme | example.com:8080 |
| "port" | UrlPort | Validates port numbers (1-65535) | 443 |
| "multihost_url" | MultiHostUrl | Validates multi-host URLs | mongodb://host1:27017,host2:27017 |
| "no_scheme_multihost_url" | NoSchemeMultiHostUrl | Multi-host URLs without scheme | host1:9092,host2:9092 |
To use validation:
  1. Import the appropriate field type from keep.validation.fields
  2. Use it as the field type annotation
  3. Add the corresponding validation string in metadata
Example implementations:
# HTTPS-only webhook URL
webhook_url: HttpsUrl = dataclasses.field(
    metadata={
        "required": True,
        "description": "Webhook endpoint (HTTPS required)",
        "sensitive": True,
        "validation": "https_url",
    }
)

# Database connection with multiple hosts
connection_string: MultiHostUrl = dataclasses.field(
    metadata={
        "required": True,
        "description": "Database connection string",
        "hint": "mongodb://host1:27017,host2:27017/dbname",
        "validation": "multihost_url",
    }
)

# SSH connection
ssh_host: NoSchemeUrl = dataclasses.field(
    metadata={
        "required": True,
        "description": "SSH hostname or IP",
        "validation": "no_scheme_url",
    }
)

ssh_port: UrlPort = dataclasses.field(
    default=22,
    metadata={
        "required": False,
        "description": "SSH port",
        "validation": "port",
    }
)

Metadata fields reference

  • required: Whether the field is mandatory
  • description: Field description shown in UI
  • sensitive: Whether to mask the field value (for secrets)
  • hidden: Whether to hide the field in UI
  • documentation_url: Link to relevant documentation
  • hint: Help text for users
  • validation: Validation type string (see preceding table)
  • type: UI input type (for example, “select” for dropdown)
  • options: List of valid options for select fields
  • config_main_group: Group name for organizing fields in UI
  • config_sub_group: Sub-group name for nested organization
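
These keys are ordinary dataclass field metadata, so they can be read back with the standard library's `dataclasses.fields()`. The sketch below shows one way a consumer of the config could use them. It uses a stdlib dataclass rather than `pydantic.dataclasses`, and the helpers `masked_view` and `required_fields` are hypothetical, not Keep's own UI code.

```python
import dataclasses


@dataclasses.dataclass
class DemoAuthConfig:
    api_key: str = dataclasses.field(
        metadata={"required": True, "description": "API key", "sensitive": True}
    )
    region: str = dataclasses.field(
        default="us-east-1",
        metadata={"required": False, "description": "Service region"},
    )


def masked_view(config) -> dict:
    """Return field values, replacing those marked sensitive with asterisks."""
    view = {}
    for field in dataclasses.fields(config):
        value = getattr(config, field.name)
        view[field.name] = "****" if field.metadata.get("sensitive") else value
    return view


def required_fields(config_cls) -> list:
    """List the names of fields whose metadata marks them as required."""
    return [f.name for f in dataclasses.fields(config_cls) if f.metadata.get("required")]


config = DemoAuthConfig(api_key="secret-value")
print(masked_view(config))
print(required_fields(DemoAuthConfig))
```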
The validation system ensures that configuration values are valid before Keep instantiates the provider. Invalid values are rejected with clear error messages, improving the user experience and preventing runtime errors.
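
To make the fail-fast behavior concrete, here is a stdlib-only sketch that rejects bad values at construction time. It does not reproduce Keep's `HttpsUrl` or `UrlPort` types; the class `DemoEndpointConfig` and the helper `try_config` are stand-ins invented for this illustration.

```python
import dataclasses
from urllib.parse import urlparse


@dataclasses.dataclass
class DemoEndpointConfig:
    api_url: str
    port: int = 443

    def __post_init__(self):
        # Reject anything that is not an HTTPS URL with a host part.
        parsed = urlparse(self.api_url)
        if parsed.scheme != "https" or not parsed.netloc:
            raise ValueError(f"api_url must be an HTTPS URL, got {self.api_url!r}")
        # Reject ports outside the valid TCP range.
        if not 1 <= self.port <= 65535:
            raise ValueError(f"port must be between 1 and 65535, got {self.port}")


def try_config(**kwargs) -> str:
    """Return "ok" or the validation error message for the given values."""
    try:
        DemoEndpointConfig(**kwargs)
        return "ok"
    except ValueError as exc:
        return str(exc)


print(try_config(api_url="https://api.example.com"))
print(try_config(api_url="http://api.example.com"))
print(try_config(api_url="https://api.example.com", port=70000))
```

Because the check runs in the constructor, an invalid configuration never reaches the code that talks to the external service.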

Testing your provider

1. Unit test

Create tests/test_yourservice_provider.py:
import pytest
from keep.providers.yourservice_provider.yourservice_provider import YourserviceProvider
from keep.providers.models.provider_config import ProviderConfig
from keep.contextmanager.contextmanager import ContextManager


def test_yourservice_provider_init():
    """Test provider initialization."""
    config = ProviderConfig(
        authentication={
            "api_endpoint": "https://api.yourservice.com",
            "api_key": "test-key",
        }
    )
    
    context_manager = ContextManager(tenant_id="test", workflow_id="test")
    provider = YourserviceProvider(
        context_manager=context_manager,
        provider_id="test",
        config=config
    )
    
    assert provider.authentication_config.api_endpoint == "https://api.yourservice.com"
    assert provider.authentication_config.api_key == "test-key"


@pytest.fixture
def mock_requests(monkeypatch):
    """Mock requests module."""
    import requests
    class MockResponse:
        def __init__(self, json_data, status_code=200):
            self.json_data = json_data
            self.status_code = status_code
        
        def json(self):
            return self.json_data
        
        def raise_for_status(self):
            pass
    
    def mock_post(*args, **kwargs):
        return MockResponse({"success": True})
    
    def mock_get(*args, **kwargs):
        return MockResponse({"alerts": []})
    
    monkeypatch.setattr(requests, "post", mock_post)
    monkeypatch.setattr(requests, "get", mock_get)


def test_yourservice_notify(mock_requests):
    """Test notification sending."""
    config = ProviderConfig(
        authentication={
            "api_endpoint": "https://api.yourservice.com",
            "api_key": "test-key",
        }
    )
    
    context_manager = ContextManager(tenant_id="test", workflow_id="test")
    provider = YourserviceProvider(
        context_manager=context_manager,
        provider_id="test",
        config=config
    )
    
    result = provider.notify(message="Test message")
    assert result["success"] is True

2. Integration test

Test with the provider factory:
def test_provider_factory_loading():
    """Test that provider loads correctly through factory."""
    from keep.providers.providers_factory import ProvidersFactory
    
    # Get provider class
    provider_class = ProvidersFactory.get_provider_class("yourservice")
    assert provider_class.__name__ == "YourserviceProvider"
    
    # Get all providers
    all_providers = ProvidersFactory.get_all_providers()
    yourservice = next((p for p in all_providers if p.type == "yourservice"), None)
    assert yourservice is not None
    assert yourservice.display_name == "YourService"

3. Manual testing

You can test your provider by running it directly:
cd keep
python -m keep.providers.yourservice_provider.yourservice_provider
This works when the provider module ends with an if __name__ == "__main__": block, which lets you test provider initialization and basic capabilities. Add one like this:
if __name__ == "__main__":
    # Test the provider directly
    import logging
    
    logging.basicConfig(level=logging.DEBUG, handlers=[logging.StreamHandler()])
    context_manager = ContextManager(
        tenant_id="singletenant",
        workflow_id="test",
    )
    
    # Initialize the provider with test config
    config = ProviderConfig(
        authentication={
            "api_endpoint": "https://api.yourservice.com",
            "api_key": "test-key",
        }
    )
    
    provider = YourserviceProvider(
        context_manager=context_manager,
        provider_id="test",
        config=config
    )
    
    # Test provider methods
    print("Provider initialized successfully!")
    
    # Test specific functionality
    try:
        result = provider._query("test query")
        print(f"Query result: {result}")
    except Exception as e:
        print(f"Query failed: {e}")

Best practices

1. Error handling

Always handle API errors gracefully:
import requests

from keep.exceptions.provider_exception import ProviderException

try:
    response = requests.get(url, timeout=10)
    response.raise_for_status()
except requests.exceptions.RequestException as e:
    # Chain the original exception so the traceback keeps the root cause
    raise ProviderException(f"Failed to fetch data: {e}") from e

2. Logging

Use the provider’s logger:
self.logger.info("Fetching alerts from YourService")
self.logger.error(f"Failed to connect: {str(e)}")

3. Configuration validation

Validate configuration in validate_config():
def validate_config(self):
    self.authentication_config = YourserviceProviderAuthConfig(
        **self.config.authentication
    )
    
    # Additional validation
    if not self.authentication_config.api_endpoint.startswith("https://"):
        raise ValueError("API endpoint must use HTTPS")

4. Alert formatting

When returning alerts, use Keep’s standard format:
from datetime import datetime

from keep.api.models.alert import AlertDto, AlertSeverity, AlertStatus

alert = AlertDto(
    id="unique-alert-id",
    name="Alert Title",
    description="Detailed description",
    severity=AlertSeverity.HIGH,
    status=AlertStatus.FIRING,
    lastReceived=datetime.now().isoformat(),
    source=["yourservice"],
    fingerprint="unique-fingerprint",
    labels={"key": "value"},
    annotations={"runbook": "https://docs.example.com"},
)

5. Secrets management

Never hardcode secrets. Use environment variables or configuration:
import os

from keep.exceptions.provider_exception import ProviderException

client_id = os.environ.get("YOURSERVICE_CLIENT_ID")
if not client_id:
    raise ProviderException("YOURSERVICE_CLIENT_ID environment variable not set")

Common patterns

1. Provider health checks

Implement health monitoring using the ProviderHealthMixin:
from keep.providers.base.base_provider import BaseProvider, ProviderHealthMixin

class YourserviceProvider(BaseProvider, ProviderHealthMixin):
    HAS_HEALTH_CHECK = True
    
    # The mixin provides automatic health checking for:
    # - Topology coverage validation
    # - Spammy alerts detection
    # - Alerting rule usage monitoring
The health check mixin is particularly useful for monitoring providers that collect topology data or handle high volumes of alerts.

2. Pagination

Handle paginated API responses:
def _get_all_items(self):
    items = []
    page = 1
    
    while True:
        response = self._query_page(page)
        items.extend(response["items"])
        
        if not response.get("has_next"):
            break
        page += 1
    
    return items
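
For a runnable variant of the same loop, the sketch below swaps the provider method for a stubbed page fetcher and adds a page cap, so an API that always reports another page cannot loop forever. The names `fetch_page`, `iter_items`, and `max_pages` are hypothetical, and the response shape (`items`, `has_next`) simply follows the snippet above.

```python
from typing import Callable, Iterator

PAGES = {
    1: {"items": ["a", "b"], "has_next": True},
    2: {"items": ["c"], "has_next": True},
    3: {"items": ["d"], "has_next": False},
}


def fetch_page(page: int) -> dict:
    """Stub standing in for a real paginated API call."""
    return PAGES[page]


def iter_items(fetch: Callable[[int], dict], max_pages: int = 100) -> Iterator[str]:
    """Yield items page by page, stopping at the last page or at max_pages."""
    for page in range(1, max_pages + 1):
        response = fetch(page)
        yield from response.get("items", [])
        if not response.get("has_next"):
            return
    raise RuntimeError(f"Stopped after {max_pages} pages; the API kept reporting more")


all_items = list(iter_items(fetch_page))
print(all_items)
```

A generator lets callers stop early without fetching the remaining pages.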

3. Rate limiting

Respect API rate limits:
import time
from typing import Any

import requests

from keep.exceptions.provider_exception import ProviderException

def _rate_limited_request(self, url: str, **kwargs) -> Any:
    max_retries = 3

    for attempt in range(max_retries):
        try:
            response = requests.get(url, **kwargs)
            if response.status_code == 429:  # Rate limited
                retry_after = int(response.headers.get("Retry-After", 60))
                self.logger.warning(f"Rate limited, waiting {retry_after}s")
                time.sleep(retry_after)
                continue
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException:
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)  # Exponential backoff

    # Every attempt was rate limited: fail loudly instead of returning None
    raise ProviderException(f"Still rate limited after {max_retries} attempts")
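
One caveat with the `int(...)` conversion above: HTTP allows `Retry-After` to carry either a number of seconds or an HTTP date, and the date form makes `int()` raise. The helper below is a sketch of how both forms could be handled. The name `parse_retry_after` and the `now` parameter (added to keep the example deterministic) are assumptions, and the 60-second fallback mirrors the default used above.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(
    value: Optional[str], default: float = 60.0, now: Optional[datetime] = None
) -> float:
    """Return the number of seconds to wait for a Retry-After header value."""
    if not value:
        return default
    value = value.strip()
    if value.isdigit():  # delay-seconds form, e.g. "120"
        return float(value)
    try:
        target = parsedate_to_datetime(value)  # HTTP-date form
    except (TypeError, ValueError):
        return default
    if target.tzinfo is None:
        target = target.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    # Never return a negative wait if the date is already in the past.
    return max(0.0, (target - now).total_seconds())


fixed_now = datetime(2015, 10, 21, 7, 27, 0, tzinfo=timezone.utc)
print(parse_retry_after("120"))
print(parse_retry_after("Wed, 21 Oct 2015 07:28:00 GMT", now=fixed_now))
print(parse_retry_after("not-a-date"))
```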

4. Caching

Cache frequently accessed data:
from datetime import datetime, timedelta
from typing import Any

class YourserviceProvider(BaseProvider):
    def __init__(self, context_manager, provider_id, config):
        super().__init__(context_manager, provider_id, config)
        self._cache = {}
        self._cache_ttl = timedelta(minutes=5)
    
    def _get_cached_data(self, key: str) -> Any:
        if key in self._cache:
            data, timestamp = self._cache[key]
            if datetime.now() - timestamp < self._cache_ttl:
                return data
        return None
    
    def _set_cached_data(self, key: str, data: Any):
        self._cache[key] = (data, datetime.now())
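
A typical way to use such a cache is the cache-aside pattern: look up the key, and only call the API on a miss or after expiry. The standalone sketch below illustrates this outside a provider class. `TTLCache` and `get_or_fetch` are hypothetical names, and the sketch uses `time.monotonic` with an injectable clock instead of `datetime.now()` so that expiry can be demonstrated without waiting.

```python
import time
from typing import Any, Callable, Dict, Tuple


class TTLCache:
    """Tiny in-memory cache whose entries expire after ttl_seconds."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: Dict[str, Tuple[Any, float]] = {}

    def get_or_fetch(self, key: str, fetch: Callable[[], Any]) -> Any:
        """Return the cached value for key, calling fetch() on a miss or after expiry."""
        entry = self._entries.get(key)
        if entry is not None:
            value, stored_at = entry
            if self._clock() - stored_at < self._ttl:
                return value
        value = fetch()
        self._entries[key] = (value, self._clock())
        return value


fake_now = [0.0]  # mutable cell so the lambda below sees updates
calls = []


def fetch_services():
    calls.append(1)  # count how often the "API" is actually hit
    return ["api", "db"]


cache = TTLCache(ttl_seconds=300, clock=lambda: fake_now[0])
cache.get_or_fetch("services", fetch_services)  # miss: calls fetch
cache.get_or_fetch("services", fetch_services)  # hit: served from cache
fake_now[0] = 301.0
cache.get_or_fetch("services", fetch_services)  # expired: calls fetch again
print(len(calls))
```

Storing the entry as a tuple also lets a cached `None` be told apart from a miss, which a getter that returns `None` on a miss cannot do.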

5. Webhook signature verification

Verify webhook authenticity:
import hmac
import hashlib

@staticmethod
def verify_webhook_signature(raw_body: bytes, signature: str, secret: str) -> bool:
    expected = hmac.new(
        secret.encode(),
        raw_body,
        hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(expected, signature)
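
The usage sketch below shows the check end to end. It assumes the sender puts a hex HMAC-SHA256 digest in a header, optionally prefixed with `sha256=`; that prefix convention and the helper names `sign` and `verify` are assumptions, so check your service's webhook documentation for the actual scheme.

```python
import hashlib
import hmac


def sign(raw_body: bytes, secret: str) -> str:
    """Compute the hex HMAC-SHA256 digest of the body."""
    return hmac.new(secret.encode(), raw_body, hashlib.sha256).hexdigest()


def verify(raw_body: bytes, header_value: str, secret: str) -> bool:
    """Verify a signature header, tolerating an optional "sha256=" prefix."""
    prefix = "sha256="
    signature = header_value[len(prefix):] if header_value.startswith(prefix) else header_value
    expected = sign(raw_body, secret)
    # Compare as bytes in constant time; this also avoids a TypeError on non-ASCII input.
    return hmac.compare_digest(expected.encode(), signature.encode())


secret = "shared-secret"
body = b'{"id": 1, "state": "alert"}'
header = "sha256=" + sign(body, secret)

print(verify(body, header, secret))
# The same JSON with different whitespace is a different byte string, so it fails.
print(verify(b'{"id":1,"state":"alert"}', header, secret))
```

The second call is the reason to verify against the raw request bytes rather than a re-serialized copy of the parsed payload.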

6. Exposing runtime parameters

Use the expose() method to make runtime-calculated values available to workflows:
class YourserviceProvider(BaseProvider):
    def __init__(self, context_manager, provider_id, config):
        super().__init__(context_manager, provider_id, config)
        self._from_timestamp = None
        self._to_timestamp = None
    
    def _query(self, metric: str, from_time: str = "1h", **kwargs):
        # Calculate actual timestamps (parse_duration is a helper you provide
        # that converts strings such as "1h" into a timedelta)
        self._to_timestamp = datetime.now()
        self._from_timestamp = self._to_timestamp - parse_duration(from_time)
        
        # Query with calculated timestamps
        return self._fetch_metrics(metric, self._from_timestamp, self._to_timestamp)
    
    def expose(self):
        """Expose calculated parameters for workflow use."""
        exposed = {}
        if self._from_timestamp:
            exposed["from"] = self._from_timestamp.isoformat()
        if self._to_timestamp:
            exposed["to"] = self._to_timestamp.isoformat()
        return exposed
This allows workflows to access the actual timestamps used in queries, not just the relative time strings.
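
The snippet above calls `parse_duration` without defining it. One possible implementation is sketched below. It is a hypothetical helper, not a Keep function, and it only supports a whole number followed by `s`, `m`, `h`, or `d`.

```python
import re
from datetime import timedelta

# Map each supported suffix to the matching timedelta keyword argument.
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}


def parse_duration(value: str) -> timedelta:
    """Convert strings such as "30m", "1h" or "7d" into a timedelta."""
    match = re.fullmatch(r"(\d+)([smhd])", value.strip().lower())
    if match is None:
        raise ValueError(f"Unsupported duration: {value!r}")
    amount, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(amount)})


print(parse_duration("1h"))
print(parse_duration("30m"))
```

Raising on unknown input keeps a typo such as `"1hr"` from silently turning into a wrong time window.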

Complete provider example

Here’s a minimal example of a complete provider implementation:
from typing import Optional

from keep.providers.base.base_provider import BaseProvider
from keep.providers.models.provider_config import ProviderConfig
from keep.contextmanager.contextmanager import ContextManager

class MyProvider(BaseProvider):
    PROVIDER_DISPLAY_NAME = "My Service"
    PROVIDER_CATEGORY = ["Monitoring", "Incident Management"]
    PROVIDER_TAGS = ["alert", "messaging"]
    
    def __init__(
        self,
        context_manager: ContextManager,
        provider_id: str,
        config: ProviderConfig,
        webhook_template: Optional[str] = None,
        webhook_description: Optional[str] = None,
        webhook_markdown: Optional[str] = None,
        provider_description: Optional[str] = None,
    ):
        super().__init__(
            context_manager, provider_id, config, 
            webhook_template, webhook_description,
            webhook_markdown, provider_description
        )
        
    def validate_config(self):
        # Validate the provider configuration
        pass
        
    def dispose(self):
        # Clean up resources
        pass
        
    def _query(self, **kwargs):
        # Implement query logic
        pass
        
    def _notify(self, **kwargs):
        # Implement notification logic
        pass

File references

  • Base Provider Classes: keep/providers/base/base_provider.py
  • Provider Models: keep/providers/models/
  • Provider Factory: keep/providers/providers_factory.py
  • Provider Exceptions: keep/exceptions/provider_exception.py
  • Example Providers:
    • Simple: keep/providers/slack_provider/slack_provider.py
    • Complex: keep/providers/datadog_provider/datadog_provider.py
    • Database: keep/providers/clickhouse_provider/clickhouse_provider.py
    • Incident: keep/providers/pagerduty_provider/pagerduty_provider.py
    • Topology: keep/providers/datadog_provider/datadog_provider.py
  • Tests: tests/test_*_provider.py
  • Documentation: docs/providers/documentation/
  • Additional Docs:
    • docs/providers/adding-a-new-provider.mdx
    • docs/providers/provider-methods.mdx
    • docs/providers/linked-providers.mdx

Checklist

  • Create provider directory and files
  • Implement AuthConfig class with proper metadata
  • Implement provider class with required methods
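  • Add __init__.py to the provider directory (no explicit registration is needed; discovery follows the *_provider naming pattern)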
  • Set appropriate PROVIDER_DISPLAY_NAME, PROVIDER_CATEGORY, and PROVIDER_TAGS
  • Implement validate_config() and dispose()
  • Add at least one capability (_notify, _query, or _get_alerts)
  • Create documentation in docs/providers/documentation/
  • Write unit tests
  • Test with provider factory
  • Handle errors gracefully
  • Add logging statements
  • Validate in Keep UI
  • If supporting webhooks, implement _format_alert() static method
  • If supporting OAuth 2.0, set OAUTH2_URL as class attribute
  • Consider implementing validate_scopes() for scope validation
  • Consider implementing get_provider_metadata() for provider versioning

Getting help