Providers
Adding a new Provider
Under contstruction
Basics
- BaseProvider is the base class every provider needs to inherit from
- BaseProvider exposes 4 important functions:
query(self, **kwargs: dict)
which is used to query the provider in stepsnotify(self, **kwargs: dict)
which is used to notify via the provider in actionsdispose(self)
which is used to dispose the provider after usage (e.g. close the connection to the DB)validate_config(self)
which is used to validate the configuration passed to the Provider
- And 4 functions that are not required:
get_alerts(self)
which is used to fetch configured alerts (not the currently active alerts)deploy_alert(self, alert: dict, alert_id: Optional[str]
which is used to deploy an alert to the providerget_alert_schema(self)
which is used to describe the provider’s API schema of how to deploy alertget_logs(self, limit)
which is used to fetch logs from the provider (currently used by the AI layer to generate more accurate results)
- Providers must be located in the providers directory
- Provider directory must start with the provider’s unique identifier followed by underscore+provider (e.g.
slack_provider
) - Provider file name must start with the provider’s unique identifier followed by underscore+provider+.py (e.g.
slack_provider.py
)
ProviderScope
@dataclass
class ProviderScope:
"""
Provider scope model.
Args:
name (str): The name of the scope.
description (Optional[str]): The description of the scope.
mandatory (bool): Whether the scope is mandatory.
mandatory_for_webhook (bool): Whether the scope is mandatory for webhook auto installation.
documentation_url (Optional[str]): The documentation url of the scope.
alias (Optional[str]): Another alias of the scope.
"""
name: str
description: Optional[str] = None
mandatory: bool = False
mandatory_for_webhook: bool = False
documentation_url: Optional[str] = None
alias: Optional[str] = None
ProviderConfig
@dataclass
class ProviderConfig:
"""
Provider configuration model.
Args:
description (Optional[str]): The description of the provider.
authentication (dict): The configuration for the provider.
"""
authentication: Optional[dict]
name: Optional[str] = None
description: Optional[str] = None
def __post_init__(self):
if not self.authentication:
return
for key, value in self.authentication.items():
if (
isinstance(value, str)
and value.startswith("{{")
and value.endswith("}}")
):
self.authentication[key] = chevron.render(value, {"env": os.environ})
BaseProvider
"""
Base class for all providers.
"""
class BaseProvider(metaclass=abc.ABCMeta):
OAUTH2_URL = None
PROVIDER_SCOPES: list[ProviderScope] = []
PROVIDER_METHODS: list[ProviderMethod] = []
FINGERPRINT_FIELDS: list[str] = []
PROVIDER_TAGS: list[
Literal["alert", "ticketing", "messaging", "data", "queue"]
] = []
def __init__(
self,
context_manager: ContextManager,
provider_id: str,
config: ProviderConfig,
webhooke_template: Optional[str] = None,
webhook_description: Optional[str] = None,
provider_description: Optional[str] = None,
):
"""
Initialize a provider.
Args:
provider_id (str): The provider id.
**kwargs: Provider configuration loaded from the provider yaml file.
"""
self.provider_id = provider_id
self.config = config
self.webhooke_template = webhooke_template
self.webhook_description = webhook_description
self.provider_description = provider_description
self.context_manager = context_manager
self.logger = context_manager.get_logger()
self.validate_config()
self.logger.debug(
"Base provider initalized", extra={"provider": self.__class__.__name__}
)
self.provider_type = self._extract_type()
self.results = []
# tb: we can have this overriden by customer configuration, when initializing the provider
self.fingerprint_fields = self.FINGERPRINT_FIELDS
def _extract_type(self):
"""
Extract the provider type from the provider class name.
Returns:
str: The provider type.
"""
name = self.__class__.__name__
name_without_provider = name.replace("Provider", "")
name_with_spaces = (
re.sub("([A-Z])", r" \1", name_without_provider).lower().strip()
)
return name_with_spaces.replace(" ", ".")
@abc.abstractmethod
def dispose(self):
"""
Dispose of the provider.
"""
raise NotImplementedError("dispose() method not implemented")
@abc.abstractmethod
def validate_config():
"""
Validate provider configuration.
"""
raise NotImplementedError("validate_config() method not implemented")
def validate_scopes(self) -> dict[str, bool | str]:
"""
Validate provider scopes.
Returns:
dict: where key is the scope name and value is whether the scope is valid (True boolean) or string with error message.
"""
return {}
def notify(self, **kwargs):
"""
Output alert message.
Args:
**kwargs (dict): The provider context (with statement)
"""
# trigger the provider
results = self._notify(**kwargs)
self.results.append(results)
# if the alert should be enriched, enrich it
enrich_alert = kwargs.get("enrich_alert", [])
if not enrich_alert or not results:
return results if results else None
self._enrich_alert(enrich_alert, results)
return results
def _enrich_alert(self, enrichments, results):
"""
Enrich alert with provider specific data.
"""
self.logger.debug("Extracting the fingerprint from the alert")
if "fingerprint" in results:
fingerprint = results["fingerprint"]
elif self.context_manager.foreach_context.get("value", {}):
# TODO: if it's zipped, we need to extract the fingerprint from the zip (i.e. multiple foreach)
fingerprint = self.context_manager.foreach_context.get("value", {}).get(
"fingerprint"
)
# else, if we are in an event context, use the event fingerprint
elif self.context_manager.event_context:
# TODO: map all casses event_context is dict and update them to the DTO
# and remove this if statement
if isinstance(self.context_manager.event_context, dict):
fingerprint = self.context_manager.event_context.get("fingerprint")
# Alert DTO
else:
fingerprint = self.context_manager.event_context.fingerprint
else:
fingerprint = None
if not fingerprint:
self.logger.error(
"No fingerprint found for alert enrichment",
extra={"provider": self.provider_id},
)
raise Exception("No fingerprint found for alert enrichment")
self.logger.debug("Fingerprint extracted", extra={"fingerprint": fingerprint})
_enrichments = {}
# enrich only the requested fields
for enrichment in enrichments:
try:
if enrichment["value"].startswith("results."):
val = enrichment["value"].replace("results.", "")
parts = val.split(".")
r = copy.copy(results)
for part in parts:
r = r[part]
_enrichments[enrichment["key"]] = r
else:
_enrichments[enrichment["key"]] = enrichment["value"]
except Exception:
self.logger.error(
f"Failed to enrich alert - enrichment: {enrichment}",
extra={"fingerprint": fingerprint, "provider": self.provider_id},
)
continue
self.logger.info("Enriching alert", extra={"fingerprint": fingerprint})
try:
enrich_alert(self.context_manager.tenant_id, fingerprint, _enrichments)
except Exception as e:
self.logger.error(
"Failed to enrich alert in db",
extra={"fingerprint": fingerprint, "provider": self.provider_id},
)
raise e
self.logger.info("Alert enriched", extra={"fingerprint": fingerprint})
def _notify(self, **kwargs):
"""
Output alert message.
Args:
**kwargs (dict): The provider context (with statement)
"""
raise NotImplementedError("notify() method not implemented")
def _query(self, **kwargs: dict):
"""
Query the provider using the given query
Args:
kwargs (dict): The provider context (with statement)
Raises:
NotImplementedError: _description_
"""
raise NotImplementedError("query() method not implemented")
def query(self, **kwargs: dict):
# just run the query
results = self._query(**kwargs)
# now add the type of the results to the global context
if results and isinstance(results, list):
self.context_manager.dependencies.add(results[0].__class__)
elif results:
self.context_manager.dependencies.add(results.__class__)
enrich_alert = kwargs.get("enrich_alert", [])
if enrich_alert:
self._enrich_alert(enrich_alert, results)
# and return the results
return results
@staticmethod
def _format_alert(event: dict) -> AlertDto | list[AlertDto]:
raise NotImplementedError("format_alert() method not implemented")
@classmethod
def format_alert(cls, event: dict) -> AlertDto | list[AlertDto]:
logger = logging.getLogger(__name__)
logger.debug("Formatting alert")
formatted_alert = cls._format_alert(event)
logger.debug("Alert formatted")
return formatted_alert
@staticmethod
def get_alert_fingerprint(alert: AlertDto, fingerprint_fields: list = []) -> str:
"""
Get the fingerprint of an alert.
Args:
event (AlertDto): The alert to get the fingerprint of.
fingerprint_fields (list, optional): The fields we calculate the fingerprint upon. Defaults to [].
Returns:
str: hexdigest of the fingerprint or the event.name if no fingerprint_fields were given.
"""
if not fingerprint_fields:
return alert.name
fingerprint = hashlib.sha256()
event_dict = alert.dict()
for fingerprint_field in fingerprint_fields:
fingerprint_field_value = event_dict.get(fingerprint_field, None)
if isinstance(fingerprint_field_value, (list, dict)):
fingerprint_field_value = json.dumps(fingerprint_field_value)
if fingerprint_field_value:
fingerprint.update(str(fingerprint_field_value).encode())
return fingerprint.hexdigest()
def get_alerts_configuration(self, alert_id: Optional[str] = None):
"""
Get configuration of alerts from the provider.
Args:
alert_id (Optional[str], optional): If given, gets a specific alert by id. Defaults to None.
"""
# todo: we'd want to have a common alert model for all providers (also for consistent output from GPT)
raise NotImplementedError("get_alerts() method not implemented")
def deploy_alert(self, alert: dict, alert_id: Optional[str] = None):
"""
Deploy an alert to the provider.
Args:
alert (dict): The alert to deploy.
alert_id (Optional[str], optional): If given, deploys a specific alert by id. Defaults to None.
"""
raise NotImplementedError("deploy_alert() method not implemented")
def _get_alerts(self) -> list[AlertDto]:
"""
Get alerts from the provider.
"""
raise NotImplementedError("get_alerts() method not implemented")
def get_alerts(self) -> list[AlertDto]:
"""
Get alerts from the provider.
"""
with tracer.start_as_current_span(f"{self.__class__.__name__}-get_alerts"):
alerts = self._get_alerts()
# enrich alerts with provider id
for alert in alerts:
alert.providerId = self.provider_id
return alerts
def get_alerts_by_fingerprint(self, tenant_id: str) -> dict[str, list[AlertDto]]:
"""
Get alerts from the provider grouped by fingerprint, sorted by lastReceived.
Returns:
dict[str, list[AlertDto]]: A dict of alerts grouped by fingerprint, sorted by lastReceived.
"""
alerts = self.get_alerts()
if not alerts:
return {}
# get alerts, group by fingerprint and sort them by lastReceived
with tracer.start_as_current_span(f"{self.__class__.__name__}-get_last_alerts"):
get_attr = operator.attrgetter("fingerprint")
grouped_alerts = {
fingerprint: list(alerts)
for fingerprint, alerts in itertools.groupby(
sorted(
alerts,
key=get_attr,
),
get_attr,
)
}
# enrich alerts
with tracer.start_as_current_span(f"{self.__class__.__name__}-enrich_alerts"):
pulled_alerts_enrichments = get_enrichments(
tenant_id=tenant_id,
fingerprints=grouped_alerts.keys(),
)
for alert_enrichment in pulled_alerts_enrichments:
if alert_enrichment:
alerts_to_enrich = grouped_alerts.get(
alert_enrichment.alert_fingerprint
)
for alert_to_enrich in alerts_to_enrich:
parse_and_enrich_deleted_and_assignees(
alert_to_enrich, alert_enrichment.enrichments
)
for enrichment in alert_enrichment.enrichments:
# set the enrichment
setattr(
alert_to_enrich,
enrichment,
alert_enrichment.enrichments[enrichment],
)
return grouped_alerts
def setup_webhook(
self, tenant_id: str, keep_api_url: str, api_key: str, setup_alerts: bool = True
):
"""
Setup a webhook for the provider.
Args:
tenant_id (str): _description_
keep_api_url (str): _description_
api_key (str): _description_
setup_alerts (bool, optional): _description_. Defaults to True.
Raises:
NotImplementedError: _description_
"""
raise NotImplementedError("setup_webhook() method not implemented")
@staticmethod
def get_alert_schema() -> dict:
"""
Get the alert schema description for the provider.
e.g. How to define an alert for the provider that can be pushed via the API.
Returns:
str: The alert format description.
"""
raise NotImplementedError(
"get_alert_format_description() method not implemented"
)
@staticmethod
def oauth2_logic(**payload) -> dict:
"""
Logic for oauth2 authentication.
For example, in Slack oauth2, we need to get the code from the payload and exchange it for a token.
return: dict: The secrets to be saved as the provider configuration. (e.g. the Slack access token)
"""
raise NotImplementedError("oauth2_logic() method not implemented")
@staticmethod
def parse_event_raw_body(raw_body: bytes) -> bytes:
"""
Parse the raw body of an event and create an ingestable dict from it.
For instance, in parseable, the "event" is just a string
> b'Alert: Server side error triggered on teststream1\nMessage: server reporting status as 500\nFailing Condition: status column equal to abcd, 2 times'
and we want to return an object
> b"{'alert': 'Server side error triggered on teststream1', 'message': 'server reporting status as 500', 'failing_condition': 'status column equal to abcd, 2 times'}"
If this method is not implemented for a provider, just return the raw body.
Args:
raw_body (bytes): The raw body of the incoming event (/event endpoint in alerts.py)
Returns:
dict: Ingestable event
"""
return raw_body
def get_logs(self, limit: int = 5) -> list:
"""
Get logs from the provider.
Args:
limit (int): The number of logs to get.
"""
raise NotImplementedError("get_logs() method not implemented")
def expose(self):
"""Expose parameters that were calculated during query time.
Each provider can expose parameters that were calculated during query time.
E.g. parameters that were supplied by the user and were rendered by the provider.
A concrete example is the "_from" and "to" of the Datadog Provider which are calculated during execution.
"""
# TODO - implement dynamically using decorators and
return {}
def start_consume(self):
"""Get the consumer for the provider.
should be implemented by the provider if it has a consumer.
for an example, see Kafka Provider
Returns:
Consumer: The consumer for the provider.
"""
return
def status(self) -> bool:
"""Return the status of the provider.
Returns:
bool: The status of the provider.
"""
return {
"status": "should be implemented by the provider if it has a consumer",
"error": "",
}
@property
def is_consumer(self) -> bool:
"""Return consumer if the inherited class has a start_consume method.
Returns:
bool: _description_
"""
return self.start_consume.__qualname__ != "BaseProvider.start_consume"
def _push_alert(self, alert: dict):
"""
Push an alert to the provider.
Args:
alert (dict): The alert to push.
"""
# if this is not a dict, try to convert it to a dict
if not isinstance(alert, dict):
try:
alert_data = json.loads(alert)
except Exception:
alert_data = alert_data
else:
alert_data = alert
# if this is still not a dict, we can't push it
if not isinstance(alert_data, dict):
self.logger.warning(
"We currently support only alert represented as a dict, dismissing alert",
extra={"alert": alert},
)
return
# now try to build the alert model
# we will have a lot of default values here to support all providers and all cases, the
# way to fine tune those would be to use the provider specific model or enforce that the event from the queue will be casted into the fields
alert_model = AlertDto(
id=alert_data.get("id", str(uuid.uuid4())),
name=alert_data.get("name", "alert-from-event-queue"),
status=alert_data.get("status", AlertStatus.FIRING),
lastReceived=alert_data.get("lastReceived", datetime.datetime.now()),
environment=alert_data.get("environment", "alert-from-event-queue"),
isDuplicate=alert_data.get("isDuplicate", False),
duplicateReason=alert_data.get("duplicateReason", None),
service=alert_data.get("service", "alert-from-event-queue"),
source=alert_data.get("source", [self.provider_type]),
message=alert_data.get("message", "alert-from-event-queue"),
description=alert_data.get("description", "alert-from-event-queue"),
severity=alert_data.get("severity", AlertSeverity.INFO),
pushed=alert_data.get("pushed", False),
event_id=alert_data.get("event_id", str(uuid.uuid4())),
url=alert_data.get("url", None),
fingerprint=alert_data.get("fingerprint", None),
)
# push the alert to the provider
url = f'{os.environ["KEEP_API_URL"]}/alerts/event'
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"X-API-KEY": self.context_manager.api_key,
}
response = requests.post(url, json=alert_model.dict(), headers=headers)
try:
response.raise_for_status()
self.logger.info("Alert pushed successfully")
except Exception:
self.logger.error(
f"Failed to push alert to {self.provider_id}: {response.content}"
)