
Incidents¤

The incidents module is the core of FireFighter, managing incident lifecycle, priorities, and integrations.

Priority System¤

FireFighter uses a 5-level priority system (P1-P5) to categorize incident severity:

Priority Levels¤

| Priority | Emoji | Level | Description | Example |
| --- | --- | --- | --- | --- |
| P1 | 🔥 | Critical | Complete service outage, system-wide failure | Payment system down, entire website offline |
| P2 | 🚨 | High | Major functionality impaired, significant user impact | Core feature broken, major performance degradation |
| P3 | ⚠️ | Medium | Minor functionality affected, moderate impact | Non-critical feature issue, isolated component problem |
| P4 | 📢 | Low | Small issues, minimal impact | UI glitches, minor bugs in secondary features |
| P5 | 💡 | Lowest | Cosmetic issues, enhancement requests | Typos, improvement suggestions, non-urgent requests |

Priority Usage¤

Incident Creation: When creating an incident, the priority determines:
  • Notification urgency and channels
  • Automatic escalations (PagerDuty for P1/P2)
  • Jira ticket priority mapping
  • SLA expectations

Integration Mapping:
  • Jira: P1-P5 map directly to Jira priorities 1-5
  • PagerDuty: P1-P2 typically trigger immediate escalation
  • Slack: Higher priorities get broader notification reach
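
The integration mapping above can be sketched as a small lookup. This is only an illustration: `IntegrationPlan` and `plan_for_priority` are hypothetical names that do not exist in FireFighter, the input is assumed to be an already validated priority between 1 and 5, and the PagerDuty rule turns the "typically" from the text into a fixed threshold.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class IntegrationPlan:
    """Hypothetical container for this sketch; not a FireFighter class."""

    jira_priority: int
    pagerduty_escalation: bool


def plan_for_priority(priority: int) -> IntegrationPlan:
    """Derive per-integration settings from an already validated priority (1-5)."""
    return IntegrationPlan(
        jira_priority=priority,  # P1-P5 map directly to Jira priorities 1-5
        pagerduty_escalation=priority <= 2,  # assumption: P1/P2 always escalate
    )
```

For example, `plan_for_priority(2)` requests escalation while `plan_for_priority(3)` does not.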

Priority Assignment Guidelines¤

P1 (Critical) - Use when:
  • Complete service unavailability
  • Data loss or corruption
  • Security breaches
  • Payment processing failures

P2 (High) - Use when:
  • Major feature completely broken
  • Significant performance degradation
  • Multiple users affected by the same issue

P3 (Medium) - Use when:
  • Single feature partially broken
  • Workaround exists
  • Limited user impact

P4 (Low) - Use when:
  • Minor UI issues
  • Edge case bugs
  • Enhancement requests with business value

P5 (Lowest) - Use when:
  • Cosmetic improvements
  • Documentation updates
  • Nice-to-have features

Priority Validation

Invalid priority values automatically fall back to P1, so an incident with an unrecognized priority is still handled as critical.
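
A minimal sketch of this fallback rule, assuming priorities are handled as integers 1 to 5. `normalize_priority` and the two constants are hypothetical and only illustrate the behavior described here; the actual validation code is not shown on this page.

```python
VALID_PRIORITIES = range(1, 6)  # P1 through P5
FALLBACK_PRIORITY = 1  # invalid input is treated as P1


def normalize_priority(raw: object) -> int:
    """Return raw as an integer priority, or P1 when it is not a valid priority."""
    try:
        value = int(raw)
    except (TypeError, ValueError):
        # Not convertible to an integer at all (e.g. None or "abc").
        return FALLBACK_PRIORITY
    return value if value in VALID_PRIORITIES else FALLBACK_PRIORITY
```

Falling back to the most severe level means a malformed value can over-alert but never hide a critical incident.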

API Reference¤

incidents ¤

Modules:

admin ¤

Classes:

EnvironmentAdmin ¤

Bases: ModelAdmin[Environment]

Methods:

get_readonly_fields ¤

get_readonly_fields(request: HttpRequest, obj: Environment | None = None) -> _ListOrTuple[str]

Deny changing the value of an existing object.

Source code in src/firefighter/incidents/admin.py
def get_readonly_fields(
    self,
    request: HttpRequest,  # noqa: ARG002
    obj: Environment | None = None,
) -> _ListOrTuple[str]:
    """Deny changing the value of an existing object."""
    if obj:  # editing an existing object
        return *self.readonly_fields, "value"
    return self.readonly_fields
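
The `return *self.readonly_fields, "value"` line builds a new tuple rather than modifying the configured one. The sketch below shows the same branch in plain Python, without Django; `readonly_for` is a hypothetical helper written for this illustration.

```python
from __future__ import annotations


def readonly_for(base_fields: tuple[str, ...], *, editing: bool) -> tuple[str, ...]:
    """Mirror the branch in get_readonly_fields using plain tuples."""
    if editing:
        # Unpacking into a bare tuple appends "value" without mutating base_fields.
        return *base_fields, "value"
    return base_fields
```

When creating an object the field stays editable; once the object exists, "value" joins the read-only fields.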

IncidentAdmin ¤

Bases: ModelAdmin[Incident]

Methods:

  • compute_and_purge_metrics

    Will compute metrics for selected incidents and delete metrics that can no longer be computed.

  • send_message

    Action to send a message in selected channels.

compute_and_purge_metrics ¤

compute_and_purge_metrics(request: HttpRequest, queryset: QuerySet[Incident]) -> None

Will compute metrics for selected incidents and delete metrics that can no longer be computed.

Source code in src/firefighter/incidents/admin.py
@action(description=_("Compute and purge metrics from milestones"))
def compute_and_purge_metrics(
    self, request: HttpRequest, queryset: QuerySet[Incident]
) -> None:
    """Will compute metrics for selected incidents and delete metrics that can no longer be computed."""
    for incident in queryset:
        incident.compute_metrics(purge=True)
    self.message_user(
        request,
        ngettext(
            "Computed/purged metrics for %d incident.",
            "Computed/purged metrics for %d incidents.",
            len(queryset),
        )
        % len(queryset),
        constants.SUCCESS,
    )
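
The message passed to `message_user` relies on `ngettext` to choose between the singular and plural form. As a rough illustration, the sketch below uses the standard library's `gettext.ngettext` as a stand-in for the translation function imported in `admin.py` (presumably Django's); `metrics_message` is a hypothetical helper.

```python
from gettext import ngettext


def metrics_message(count: int) -> str:
    """Build the confirmation message for the given number of incidents."""
    singular = "Computed/purged metrics for %d incident."
    plural = "Computed/purged metrics for %d incidents."
    # ngettext picks the form matching count; the % operator then fills in %d.
    return ngettext(singular, plural, count) % count
```

Without a translation catalog installed, the singular form is used only when `count` is exactly 1.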

send_message ¤

send_message(request: HttpRequest, queryset: QuerySet[Incident]) -> TemplateResponse | None

Action to send a message in selected channels. This action first displays a confirmation page where the message is entered. Once the form is posted, the message is sent to the conversation of every selected incident (via process_action_send_message) and the change list is displayed again.

Source code in src/firefighter/incidents/admin.py
@action(
    description=_("Send message on conversation"),
)
def send_message(
    self, request: HttpRequest, queryset: QuerySet[Incident]
) -> TemplateResponse | None:
    """Action to send a message in selected channels.
    This action first displays a confirmation page to enter the message.
    Next, it sends the message on all selected objects and redirects back to the change list (other fn).
    """
    opts = self.model._meta  # noqa: SLF001

    # Get all the targeted conversations
    target_conversations: list[IncidentChannel] = [
        inc.conversation for inc in queryset.all() if inc.conversation
    ]

    deletable_objects = target_conversations

    # The user has already entered the messages.
    # Send message(s) and return None to display the change list view again.
    if request.POST.get("post"):
        self.process_action_send_message(
            request=request, target_conversations=target_conversations
        )
        return None

    objects_name = model_ngettext(queryset)

    title = _("Type your message")

    context = {
        **self.admin_site.each_context(request),
        "title": title,
        "objects_name": str(objects_name),
        "deletable_objects": [deletable_objects],
        "queryset": queryset,
        "opts": opts,
        "action_checkbox_name": helpers.ACTION_CHECKBOX_NAME,
        "media": self.media,
    }

    # Display the message edit page
    return TemplateResponse(
        request, "admin/send_message_conversation.html", context
    )

IncidentRoleTypeAdmin ¤

Bases: ModelAdmin[IncidentRoleType]

IncidentRoleTypes are fixed at the moment (only commander and communication lead).

factories ¤

Classes:

PriorityFactory ¤

Bases: DjangoModelFactory[Priority]

Factory for creating Priority instances in tests.

Creates Priority objects with random values. Use specific values in tests when testing priority-specific behavior (e.g., P1-P5 JIRA mapping).

forms ¤

Modules:

select_impact ¤

Classes:

SelectImpactForm ¤

SelectImpactForm(*args: Any, **kwargs: Any)

Bases: Form

Methods:

  • save

    Save the impact choices to the incident.

  • suggest_priority_from_impact

    Suggest a priority from 1 (highest) to 5 (lowest) based on the impact choices.

Attributes:

  • business_impact_new (str | None) –

    Get business impact. Will return N/A, Lowest, Low, Medium, High or Highest.

Source code in src/firefighter/incidents/forms/select_impact.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    if "data" in kwargs:
        for key, value in kwargs["data"].items():
            if (
                isinstance(key, str)
                and key.startswith("set_impact_type_")
                and isinstance(value, str)
            ):
                impact_type_name = key[len("set_impact_type_") :]
                with contextlib.suppress(ImpactLevel.DoesNotExist):
                    kwargs["data"][key] = ImpactLevel.objects.get(
                        value=value, impact_type__value=impact_type_name
                    )

    super().__init__(*args, **kwargs)

    for impact_type in ImpactType.objects.all().order_by("order"):
        field_name = f"set_impact_type_{impact_type.value}"
        self.fields[field_name] = forms.ModelChoiceField(
            label=impact_type.emoji + " " + impact_type.name,
            queryset=impact_type.levels.all().order_by("-order"),
            help_text=impact_type.help_text,
            initial=impact_type.levels.get(value=LevelChoices.NONE.value),
        )
        self.fields[field_name].label_from_instance = (  # type: ignore[attr-defined]
            lambda obj: obj.emoji + " " + obj.name
        )

business_impact_new property ¤

business_impact_new: str | None

Get business impact. Will return N/A, Lowest, Low, Medium, High or Highest.

save ¤
save(incident: HasImpactProtocol) -> None

Save the impact choices to the incident.

Source code in src/firefighter/incidents/forms/select_impact.py
def save(self, incident: HasImpactProtocol) -> None:
    """Save the impact choices to the incident."""
    if self.is_valid():
        impacts: dict[str, ImpactLevel] = self.cleaned_data

        # Prepare a list of impact type slugs by removing the 'set_impact_type_' prefix
        impact_type_slugs: list[str] = [
            slug.replace("set_impact_type_", "") for slug in impacts
        ]

        # Fetch all relevant ImpactType instances in a single query
        impact_types: dict[str, ImpactType] = ImpactType.objects.in_bulk(
            impact_type_slugs, field_name="value"
        )

        impact_objects: list[Impact] = []

        for impact_type_slug, impact_level in impacts.items():
            impact_type_slug = impact_type_slug.replace(  # noqa: PLW2901
                "set_impact_type_", ""
            )
            impact_type = impact_types.get(impact_type_slug)
            if impact_type is None:
                logger.warning(
                    f"Could not find impact type {impact_type_slug} in incident {incident.id}"
                )
                continue

            impact = Impact(impact_type=impact_type, impact_level=impact_level)
            impact_objects.append(impact)

        # Create and save all Impact instances in one database call
        Impact.objects.bulk_create(impact_objects)
        impacts_related_manager: ManyRelatedManager[Impact] = cast(
            "ManyRelatedManager[Impact]",
            incident.impacts,
        )
        impacts_related_manager.add(*impact_objects)

    else:
        raise forms.ValidationError("Form is not valid")

suggest_priority_from_impact ¤

suggest_priority_from_impact() -> int

Suggest a priority from 1 (highest) to 5 (lowest) based on the impact choices.

Source code in src/firefighter/incidents/forms/select_impact.py
def suggest_priority_from_impact(self) -> int:
    """Suggest a priority from 1 (highest) to 5 (lowest) based on the impact choices."""
    if self.is_valid():
        impact: dict[str, ImpactLevel] = self.cleaned_data

        impact_values = [impact_type.value for impact_type in impact.values()]
        priorities = [level.priority for level in LevelChoices if level in impact_values]
        return min(priorities) if priorities else LevelChoices.NONE.priority
    return LevelChoices.NONE.priority
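
The selection logic above reduces to "take the most severe priority among the selected impact levels, or the priority of the NONE level when nothing is selected". The sketch below reproduces it without Django. `Level` is a hypothetical stand-in for `LevelChoices`: its labels and priority numbers are assumptions chosen for illustration, including the choice to give NONE priority 5.

```python
from __future__ import annotations

from enum import Enum


class Level(Enum):
    """Hypothetical stand-in for LevelChoices; labels and priorities are assumed."""

    HIGHEST = ("highest", 1)
    HIGH = ("high", 2)
    MEDIUM = ("medium", 3)
    LOW = ("low", 4)
    LOWEST = ("lowest", 5)
    NONE = ("none", 5)  # assumption: keeps the result within 1 to 5

    def __init__(self, label: str, priority: int) -> None:
        self.label = label
        self.priority = priority


def suggest_priority(selected: list[Level]) -> int:
    """Return the smallest (most severe) priority among the selected levels."""
    priorities = [level.priority for level in Level if level in selected]
    return min(priorities) if priorities else Level.NONE.priority
```

Because lower numbers mean higher severity, `min` picks the most severe selected level.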

update_key_events ¤

Classes:

IncidentUpdateKeyEventsForm ¤

IncidentUpdateKeyEventsForm(*args: Any, **kwargs: Any)

Bases: Form

Methods:

Source code in src/firefighter/incidents/forms/update_key_events.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    self.incident = kwargs.pop("incident")
    self.user = kwargs.pop("user", None)
    super().__init__(*args, **kwargs)
    self.generate_fields_dynamically()

get_milestones_with_data ¤

get_milestones_with_data(milestones_definitions: Iterable[MilestoneType]) -> list[MilestoneTypeData]

Get each firefighter.incidents.models.milestone_type.MilestoneType with its event_ts from the IncidentUpdate.

Source code in src/firefighter/incidents/forms/update_key_events.py
def get_milestones_with_data(
    self,
    milestones_definitions: Iterable[MilestoneType],
) -> list[MilestoneTypeData]:
    """Get each [firefighter.incidents.models.milestone_type.MilestoneType][] with its `event_ts` from the IncidentUpdate."""
    milestones_objects = self.incident.latest_updates_by_type
    ass = []
    for milestone_def in milestones_definitions:
        event = milestones_objects.get(milestone_def.event_type)
        event_ts = event.event_ts if event and event.event_ts else None
        association = MilestoneTypeData(
            milestone_type=milestone_def, data=MilestoneData(event_ts)
        )
        ass.append(association)
    return ass

save ¤

save() -> None

Custom save method to save each changed key event.

Source code in src/firefighter/incidents/forms/update_key_events.py
def save(self) -> None:
    """Custom save method to save each changed key event."""
    self.fields: dict[str, Field]
    for field_name in self.fields:
        if field_name not in self.changed_data or self.fields[field_name].disabled:
            logger.debug("skipping %s", field_name)
            continue
        if not field_name.startswith("key_event_"):
            continue

        key_event_type = field_name.replace("key_event_", "")

        logger.debug(f"save {field_name} {self.cleaned_data[field_name]}")
        self._save_key_event(key_event_type, self.cleaned_data[field_name])

update_roles ¤

Classes:

IncidentUpdateRolesForm ¤

IncidentUpdateRolesForm(*args: Any, **kwargs: Any)

Bases: Form

Methods:

  • save

    Custom save method to save the updated roles.

Source code in src/firefighter/incidents/forms/update_roles.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    self.incident: Incident = kwargs.pop("incident")
    self.user = kwargs.pop("user", None)
    super().__init__(*args, **kwargs)
    self.generate_fields_dynamically()

save ¤

save() -> None

Custom save method to save the updated roles.

Source code in src/firefighter/incidents/forms/update_roles.py
def save(self) -> None:
    """Custom save method to save the updated roles."""
    self.fields: dict[str, Field]
    roles_to_update: dict[str, User | None] = {}
    for field_name in self.fields:
        if field_name not in self.changed_data or self.fields[field_name].disabled:
            logger.debug("skipping %s", field_name)
            continue
        if not field_name.startswith("role_"):
            continue

        role_type_slug = field_name.replace("role_", "")

        roles_to_update[role_type_slug] = self.cleaned_data[field_name]
    if self.user is None:
        raise RuntimeError("User is required, was the form validated?")
    self.incident.update_roles(self.user, roles_to_update)
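
Both this method and `IncidentUpdateKeyEventsForm.save` follow the same pattern: keep only the changed fields whose name carries a known prefix, then strip the prefix to obtain a slug. The sketch below isolates that pattern in plain Python. `collect_prefixed_changes` is a hypothetical helper, it omits the `disabled` check, and it uses `str.removeprefix` (Python 3.9+) instead of `str.replace`.

```python
from __future__ import annotations

from typing import Any


def collect_prefixed_changes(
    field_names: list[str],
    changed: set[str],
    cleaned: dict[str, Any],
    prefix: str,
) -> dict[str, Any]:
    """Return {slug: cleaned value} for changed fields whose name starts with prefix."""
    updates: dict[str, Any] = {}
    for name in field_names:
        if name not in changed or not name.startswith(prefix):
            continue
        # removeprefix strips only the leading occurrence; str.replace would
        # also rewrite a later occurrence of the prefix inside the slug.
        updates[name.removeprefix(prefix)] = cleaned[name]
    return updates
```

With the prefix `"role_"`, a changed field named `role_commander` yields the key `commander`.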

utils ¤

Classes:

EnumChoiceField ¤

EnumChoiceField(*args: Any, enum_class: type[T], **kwargs: Any)

Bases: TypedChoiceField

Methods:

  • to_python

    Return a value from the enum class.

Source code in src/firefighter/incidents/forms/utils.py
def __init__(self, *args: Any, enum_class: type[T], **kwargs: Any) -> None:
    # Explicit for type checking
    self.coerce_func: Callable[[Any], T] = lambda val: enum_class(  # noqa: PLW0108
        val
    )
    self.enum_class = enum_class
    if "choices" not in kwargs:
        kwargs["choices"] = enum_class.choices
    super().__init__(*args, coerce=self.coerce_func, **kwargs)

to_python ¤

to_python(value: Any) -> int | str | Any

Return a value from the enum class.

Source code in src/firefighter/incidents/forms/utils.py
def to_python(self, value: Any) -> int | str | Any:
    """Return a value from the enum class."""
    if value in self.empty_values:
        return ""
    # Cast to int if it's a string representation of an int
    if isinstance(value, str) and value.isdigit():
        value = int(value)

    # If value is already a valid enum member, return it
    if isinstance(value, self.enum_class):
        return value
    try:
        return self.coerce_func(value)
    except ValueError as exc:
        err_msg = f"{value} is not a valid {self.enum_class.__name__}"
        raise ValidationError(err_msg, code="invalid_choice") from exc
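
The coercion steps in `to_python` can be tried outside Django with a plain enum. In this sketch, `Severity` and `coerce_to_enum` are hypothetical, the empty-value check is simplified to `None` and the empty string, and a `ValueError` stands in for Django's `ValidationError`.

```python
from enum import IntEnum


class Severity(IntEnum):
    """Hypothetical enum used only for this sketch."""

    P1 = 1
    P2 = 2


def coerce_to_enum(value, enum_class):
    """Apply the same steps as to_python: empty check, int cast, enum lookup."""
    if value in (None, ""):
        return ""
    # Form data arrives as strings, so "2" must become 2 before the lookup.
    if isinstance(value, str) and value.isdigit():
        value = int(value)
    if isinstance(value, enum_class):
        return value
    try:
        return enum_class(value)
    except ValueError as exc:
        raise ValueError(f"{value} is not a valid {enum_class.__name__}") from exc
```

The integer cast matters for integer-backed enums, where looking up the string `"2"` directly would fail.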

menus ¤

Functions:

user_details_url ¤

user_details_url(request: AuthenticatedHttpRequest) -> str | None

Return the URL of the authenticated user's profile page, used by the profile menu item. Returns None if the user is not authenticated.

Source code in src/firefighter/incidents/menus.py
def user_details_url(request: AuthenticatedHttpRequest) -> str | None:
    """Return the URL of the authenticated user's profile page for the profile menu item."""
    if request.user.is_authenticated:
        return request.user.get_absolute_url()
    return None  # type: ignore[unreachable]

migrations ¤

Modules:

0006_update_group_names ¤

Functions:

get_group_mappings ¤

get_group_mappings() -> dict

Returns the mapping table for updating existing groups.

Source code in src/firefighter/incidents/migrations/0006_update_group_names.py
def get_group_mappings() -> dict:
    """Returns the mapping table for updating existing groups."""
    return {
        "Platform": ("Platform", 8),
        "Visitors": ("Marketplace", 1),
        "Specialist Offer (Catalog/Seller)": ("Catalog", 4),
        "Operations": ("Operations", 6),
        "Money": ("Payment Operations", 5),
        "Security": ("Security", 10),
        "Corporate IT": ("Corporate IT", 11),
        "Other": ("Other", 12),
        "Data": ("Data", 9),
    }

get_new_groups ¤

get_new_groups() -> dict

Returns a dictionary of new groups to be created.

Source code in src/firefighter/incidents/migrations/0006_update_group_names.py
def get_new_groups() -> dict:
    """Returns a dictionary of new groups to be created."""
    return {
        "Marketing & Communication": 2,
        "Seller": 3,
        "Finance": 7,
    }
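
This page shows only the two helper functions, not the migration code that consumes them. The sketch below is one plausible way the dictionaries could be applied, using plain dictionaries instead of Django models. It assumes that the integer in each entry is the group's display order, and `apply_group_changes` is a hypothetical name.

```python
from __future__ import annotations


def apply_group_changes(
    existing: dict[str, int],
    mappings: dict[str, tuple[str, int]],
    new_groups: dict[str, int],
) -> dict[str, int]:
    """Return a new {name: order} dict after renaming and adding groups."""
    result: dict[str, int] = {}
    for name, order in existing.items():
        # Groups absent from the mapping keep their name and order.
        new_name, new_order = mappings.get(name, (name, order))
        result[new_name] = new_order
    for name, order in new_groups.items():
        # setdefault avoids overwriting a group that already exists.
        result.setdefault(name, order)
    return result
```

For instance, an existing "Money" group would become "Payment Operations" with order 5, and "Seller" would be added with order 3.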

0007_update_component_name ¤

Functions:

get_component_mappings ¤

get_component_mappings() -> list

Returns a list of tuples for updating existing component names and their attributes.

Each tuple contains:
  • old_name (str): The current name of the component.
  • new_name (str): The new name to assign to the component.
  • slack_channel (str): The associated Slack channel for the component.
  • group_name (str): The name of the group to which the component belongs.

Returns:

  • list ( list ) –

    A list of tuples, each representing the details for a component update.

Source code in src/firefighter/incidents/migrations/0007_update_component_name.py
def get_component_mappings() -> list:
    """
    Returns a list of tuples for updating existing component names and their attributes.

    Each tuple contains:
        - old_name (str): The current name of the component.
        - new_name (str): The new name to assign to the component.
        - slack_channel (str): The associated Slack channel for the component.
        - group_name (str): The name of the group to which the component belongs.

    Returns:
        list: A list of tuples, each representing the details for a component update.
    """
    return [
        ("Commercial Animation (Mabaya cat. integration & ad request, ...)", "Commercial Animation", "impact-commercial-animation", "Marketplace"),
        ("Mobile Apps", "Mobile Apps", "impact-mobile-apps", "Marketplace"),
        ("Spartacux Foundations", "Spartacux Foundations", "impact-spartacux-foundations", "Marketplace"),
        ("Tracking", "Tracking", "impact-tracking", "Marketplace"),
        ("HUB Integrators", "HUB Integrators", "impact-hub-integrators", "Seller"),
        ("Seller Account and Feeds", "Seller Catalog and Offer Management", "impact-seller-catalog-offer-management", "Seller"),
        ("Toolbox", "Seller Admin and Experience", "impact-seller-admin-experience", "Seller"),
        ("Seller Services (Mabaya BO, Subscriptions, MF)", "Seller Services", "impact-seller-services", "Seller"),
        ("Catalog Performance", "Catalog Performance", "impact-catalog-performance", "Catalog"),
        ("Offer (Price, Stock)", "Offer (Price & Stock)", "impact-offer-price-stock", "Catalog"),
        ("Back Office", "BO Catalog - Master Experience", "impact-bo-catalog-master-experience", "Catalog"),
        ("Order Lifecycle", "Order management", "impact-order-management", "Operations"),
        ("Delivery Experience", "Delivery experience", "impact-delivery-experience", "Operations"),
        ("Cloud Infrastructure", "Cloud Infrastructure", "impact-cloud-infrastructure", "Platform"),
        ("Spinak", "Spinak", "impact-spinak", "Platform"),
        ("CDN", "CDN", "impact-cdn", "Platform"),
        ("Gitlab", "Gitlab", "impact-gitlab", "Platform"),
    ]

get_new_components ¤

get_new_components() -> dict

Returns a dictionary of new components to be created.

Each entry in the dictionary maps a component name to a tuple containing:
  • group_name: The name of the group the component belongs to.
  • slack_channel: The associated Slack channel for the component.

Returns:

  • dict ( dict ) –

    A mapping of component names to (group name, slack channel) tuples.

Source code in src/firefighter/incidents/migrations/0007_update_component_name.py
def get_new_components() -> dict:
    """
    Returns a dictionary of new components to be created.

    Each entry in the dictionary maps a component name to a tuple containing:
    - group_name: The name of the group the component belongs to.
    - slack_channel: The associated Slack channel for the component.

    Returns:
        dict: A mapping of component names to (group name, slack channel) tuples.
    """
    return {
        "Traffic acquisition": ("Marketing & Communication", "impact-traffic-acquisition"),
        "Company reputation": ("Marketing & Communication", "impact-company-reputation"),
        "Loyalty and coupons": ("Payment Operations", "impact-loyalty-coupons"),
        "Payouts to seller": ("Payment Operations", "impact-payouts-to-seller"),
        "Refunds": ("Payment Operations", "impact-refunds"),
        "Returns": ("Operations", "impact-returns"),
        "Customer service": ("Operations", "impact-customer-service"),
        "Inventory": ("Operations", "impact-inventory"),
        "VAT": ("Finance", "impact-vat"),
        "Seller's invoices": ("Finance", "impact-sellers-invoices"),
        "Customer's invoices": ("Finance", "impact-customers-invoices"),
        "Accounting": ("Finance", "impact-accounting"),
        "Revenue": ("Finance", "impact-revenue"),
        "Compromised laptop / server": ("Security", "impact-compromised-laptop-server"),
    }

0010_update_components ¤

Functions:

  • get_component_mappings

    Returns a list of tuples for updating existing component names and their attributes.

get_component_mappings ¤

get_component_mappings() -> list

Returns a list of tuples for updating existing component names and their attributes.

Each tuple contains:
  • old_name (str): The current name of the component.
  • new_name (str): The new name to assign to the component.
  • slack_channel (str): The associated Slack channel for the component.
  • group_name (str): The name of the group to which the component belongs.

Returns:

  • list ( list ) –

    A list of tuples, each representing the details for a component update.

Source code in src/firefighter/incidents/migrations/0010_update_components.py
def get_component_mappings() -> list:
    """
    Returns a list of tuples for updating existing component names and their attributes.

    Each tuple contains:
        - old_name (str): The current name of the component.
        - new_name (str): The new name to assign to the component.
        - slack_channel (str): The associated Slack channel for the component.
        - group_name (str): The name of the group to which the component belongs.

    Returns:
        list: A list of tuples, each representing the details for a component update.
    """
    return [
        # Marketplace
        ("Product Discovery", "Navigation & Product discovery", "impact-nav-product-discovery", "Marketplace"),
        ("User & Purchase", "Cart & funnel", "impact-cart-funnel", "Marketplace"),
        ("Customer Management", "Customer login & signup", "impact-customer-login-signup", "Marketplace"),
        ("Performance", "Web performance", "impact-web-performance", "Marketplace"),
        # Catalog
        ("Product Information", "Product Management", "impact-product-management", "Catalog"),
        ("Taxonomy", "Product Structure", "impact-product-structure", "Catalog"),
        ("Publication on website", "Catalog Exposition", "impact-catalog-exposition", "Catalog"),
        # Payment Operations
        ("Payment", "Payment", "impact-payment", "Payment Operations"),
        # Operations
        ("ManoFulfillment OPS", "MM Fulfillment", "impact-mm-fulfillment", "Operations"),
        ("Helpcenter", "Helpcenter after sales", "impact-helpcenter-after-sales", "Operations"),
        # Finance
        ("Finance Operations", "Controlling", "impact-controlling", "Finance"),
        # Platform
        ("Spinak", "Spinak", "impact-spinak", "Platform"),
        ("CDN", "CDN", "impact-cdn", "Platform"),
        ("Gitlab", "Gitlab", "impact-gitlab", "Platform"),
        # Data
        ("data-platform", "Data Ingestion", "impact-data-ingestion", "Data"),
        ("data-specialist-offer", "Data Warehouse", "impact-data-warehouse", "Data"),
        ("data-wbr", "Data Analytics", "impact-data-analytics", "Data"),
        # Security
        ("Security Misc", "Bot management & rate limiting & WAF", "impact-bot-management-rate-limiting-waf", "Security"),
        ("Attack", "Data leak", "impact-data-leak", "Security"),
        ("System Compromise", "Exploited vulnerability", "impact-exploited-vulnerability", "Security"),
        ("Personal Data Breach", "Stolen account(s) or IT materials", "impact-stolen-accounts-it-materials", "Security"),
    ]

0013_add_missing_component ¤

Functions:

get_new_components ¤

get_new_components() -> dict

Returns a dictionary of new components to be created.

Each entry in the dictionary maps a component name to a tuple containing:
  • group_name: The name of the group the component belongs to.
  • slack_channel: The associated Slack channel for the component.

Returns:

  • dict ( dict ) –

    A mapping of component names to (group name, slack channel) tuples.

Source code in src/firefighter/incidents/migrations/0013_add_missing_component.py
def get_new_components() -> dict:
    """
    Returns a dictionary of new components to be created.

    Each entry in the dictionary maps a component name to a tuple containing:
    - group_name: The name of the group the component belongs to.
    - slack_channel: The associated Slack channel for the component.

    Returns:
        dict: A mapping of component names to (group name, slack channel) tuples.
    """
    return {
        "Data Tools": ("Data", "impact-data-tools"),
        "Catalog Access": ("Catalog", "impact-catalog-access"),
    }

0019_set_security_components_private ¤

Functions:

revert_security_components_to_public ¤

revert_security_components_to_public(apps, schema_editor)

Revert all components belonging to the Security group to public, except 'Bot management & rate limiting & WAF'.

Source code in src/firefighter/incidents/migrations/0019_set_security_components_private.py
def revert_security_components_to_public(apps, schema_editor):
    """Revert all components belonging to the Security group to public, except 'Bot management & rate limiting & WAF'."""
    Component = apps.get_model("incidents", "Component")
    Group = apps.get_model("incidents", "Group")

    try:
        security_group = Group.objects.get(name="Security")
        components = Component.objects.filter(group=security_group).exclude(
            name="Bot management & rate limiting & WAF"
        )

        updated_count = 0
        for component in components:
            if component.private:
                logger.info(f"Setting component '{component.name}' to public")
                component.private = False
                component.save()
                updated_count += 1

        logger.info(f"Reverted {updated_count} Security components to public")
    except Group.DoesNotExist:
        logger.warning("Security group not found, skipping reverse migration")

set_security_components_to_private ¤

set_security_components_to_private(apps, schema_editor)

Set all components belonging to the Security group as private, except 'Bot management & rate limiting & WAF'.

Source code in src/firefighter/incidents/migrations/0019_set_security_components_private.py
def set_security_components_to_private(apps, schema_editor):
    """Set all components belonging to the Security group as private, except 'Bot management & rate limiting & WAF'."""
    Component = apps.get_model("incidents", "Component")
    Group = apps.get_model("incidents", "Group")

    try:
        security_group = Group.objects.get(name="Security")
        components = Component.objects.filter(group=security_group).exclude(
            name="Bot management & rate limiting & WAF"
        )

        updated_count = 0
        for component in components:
            if not component.private:
                logger.info(f"Setting component '{component.name}' to private")
                component.private = True
                component.save()
                updated_count += 1

        logger.info(f"Updated {updated_count} Security components to private")
    except Group.DoesNotExist:
        logger.warning("Security group not found, skipping migration")
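
The forward and reverse functions above are mirror images: each one skips the excluded component, changes only rows whose flag differs from the target, and counts the updates. The sketch below captures that symmetry with plain dataclasses instead of Django models; `Component` here is a simplified stand-in and `set_security_privacy` is a hypothetical helper.

```python
from __future__ import annotations

from dataclasses import dataclass

EXCLUDED_NAME = "Bot management & rate limiting & WAF"


@dataclass
class Component:
    """Simplified stand-in for the Component model."""

    name: str
    group: str
    private: bool = False


def set_security_privacy(components: list[Component], *, private: bool) -> int:
    """Set the private flag on Security components and return how many changed."""
    updated = 0
    for component in components:
        if component.group != "Security" or component.name == EXCLUDED_NAME:
            continue
        if component.private != private:  # skip rows already in the target state
            component.private = private
            updated += 1
    return updated
```

Calling it twice with the same target changes nothing the second time, which is what makes the operation safe to re-run.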

0021_copy_component_data_to_incident_category ¤

Functions:

copy_component_data_to_incident_category ¤

copy_component_data_to_incident_category(apps, schema_editor)

Copy all data from Component to IncidentCategory

Source code in src/firefighter/incidents/migrations/0021_copy_component_data_to_incident_category.py
def copy_component_data_to_incident_category(apps, schema_editor):
    """Copy all data from Component to IncidentCategory"""
    Component = apps.get_model("incidents", "Component")
    IncidentCategory = apps.get_model("incidents", "IncidentCategory")

    # Copy all components to incident categories with the same fields
    for component in Component.objects.all():
        IncidentCategory.objects.create(
            id=component.id,  # Keep same UUID
            name=component.name,
            description=component.description,
            order=component.order,
            private=component.private,
            deploy_warning=component.deploy_warning,
            created_at=component.created_at,
            updated_at=component.updated_at,
            group=component.group,
        )

reverse_copy_component_data_to_incident_category ¤

reverse_copy_component_data_to_incident_category(apps, schema_editor)

Reverse operation - copy IncidentCategory back to Component if needed

Source code in src/firefighter/incidents/migrations/0021_copy_component_data_to_incident_category.py
def reverse_copy_component_data_to_incident_category(apps, schema_editor):
    """Reverse operation - copy IncidentCategory back to Component if needed"""
    Component = apps.get_model("incidents", "Component")
    IncidentCategory = apps.get_model("incidents", "IncidentCategory")

    # This would only work if Component table still exists during rollback
    for incident_category in IncidentCategory.objects.all():
        Component.objects.create(
            id=incident_category.id,
            name=incident_category.name,
            description=incident_category.description,
            order=incident_category.order,
            private=incident_category.private,
            deploy_warning=incident_category.deploy_warning,
            created_at=incident_category.created_at,
            updated_at=incident_category.updated_at,
            group=incident_category.group,
        )

0023_populate_incident_category_references ¤

Functions:

populate_incident_category_references ¤

populate_incident_category_references(apps, schema_editor)

Copy component references to incident_category references

Source code in src/firefighter/incidents/migrations/0023_populate_incident_category_references.py
def populate_incident_category_references(apps, schema_editor):
    """Copy component references to incident_category references"""
    Incident = apps.get_model("incidents", "Incident")
    IncidentUpdate = apps.get_model("incidents", "IncidentUpdate")

    # Update all incidents to point to the corresponding incident category
    incidents_updated = 0
    for incident in Incident.objects.select_related("component").all():
        if incident.component:
            # Find the corresponding incident category (same UUID)
            incident.incident_category_id = incident.component_id
            incident.save(update_fields=["incident_category"])
            incidents_updated += 1

    # Update all incident updates to point to the corresponding incident category
    updates_updated = 0
    for incident_update in IncidentUpdate.objects.select_related("component").all():
        if incident_update.component:
            incident_update.incident_category_id = incident_update.component_id
            incident_update.save(update_fields=["incident_category"])
            updates_updated += 1

reverse_populate_incident_category_references ¤

reverse_populate_incident_category_references(apps, schema_editor)

Reverse: copy incident_category references back to component references

Source code in src/firefighter/incidents/migrations/0023_populate_incident_category_references.py
def reverse_populate_incident_category_references(apps, schema_editor):
    """Reverse: copy incident_category references back to component references"""
    Incident = apps.get_model("incidents", "Incident")
    IncidentUpdate = apps.get_model("incidents", "IncidentUpdate")

    # Restore component references from incident_category references
    for incident in Incident.objects.select_related("incident_category").all():
        if incident.incident_category:
            incident.component_id = incident.incident_category_id
            incident.save(update_fields=["component"])

    for incident_update in IncidentUpdate.objects.select_related("incident_category").all():
        if incident_update.incident_category:
            incident_update.component_id = incident_update.incident_category_id
            incident_update.save(update_fields=["component"])

models ¤

Modules:

group ¤

Classes:

  • Group

    Group of [firefighter.incidents.models.incident_category.IncidentCategory]. Not a group of users.

Group ¤

Bases: Model

Group of [firefighter.incidents.models.incident_category.IncidentCategory]. Not a group of users.

impact ¤

Classes:

Impact ¤

Bases: Model

Methods:

  • clean

    Ensure impact_type matches impact_level.impact_type.

clean ¤
clean() -> None

Ensure impact_type matches impact_level.impact_type.

Check constraints can't span relationships, so we have to do this manually.

Source code in src/firefighter/incidents/models/impact.py
def clean(self) -> None:
    """Ensure impact_type matches impact_level.impact_type.

    Check constraints can't span relationships, so we have to do this manually.
    """
    if self.impact_type != self.impact_level.impact_type:
        raise ValidationError("impact_type must match impact_level.impact_type")
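
The check above can be tried outside Django with a minimal sketch. The dataclasses and the local `ValidationError` below are stand-ins, and `impact_type` is simplified to a plain string (an assumption; the real models use database relations).

```python
from dataclasses import dataclass


class ValidationError(Exception):
    """Stand-in for django.core.exceptions.ValidationError (assumption)."""


@dataclass
class ImpactLevel:
    impact_type: str  # simplified: a plain string instead of a relation
    name: str


@dataclass
class Impact:
    impact_type: str
    impact_level: ImpactLevel

    def clean(self) -> None:
        # Same cross-relationship comparison as the model method above.
        if self.impact_type != self.impact_level.impact_type:
            raise ValidationError("impact_type must match impact_level.impact_type")


def is_consistent(impact: Impact) -> bool:
    """Return True when clean() passes, False when it raises."""
    try:
        impact.clean()
    except ValidationError:
        return False
    return True
```

In Django, `clean()` is called by `full_clean()` (for example during `ModelForm` validation), not by `save()`, so code that saves an `Impact` directly would need to call `full_clean()` itself to get this check.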

LevelChoices ¤

Bases: TextChoices

Attributes:

  • emoji (str) –

    Emoji corresponding to the priority.

  • priority (int) –

    Priority of the level choice.

emoji property ¤
emoji: str

Emoji corresponding to the priority.

priority property ¤
priority: int

Priority of the level choice.

incident ¤

Classes:

Functions:

Incident ¤

Bases: Model

Methods:

  • build_invite_list

    Send a Django Signal to get the list of users to invite from different integrations (Slack, Confluence, PagerDuty...).

  • compute_metrics

    Compute all metrics (time to fix, ...) from events.

  • missing_milestones

    Returns all required Milestones still needed to compute the metrics.

  • update_roles

    Update the roles related to an incident, and create an IncidentUpdate.

Attributes:

  • status_page_url (str) –

    Similar to get_absolute_url, but with the full domain, for use outside the website.

status_page_url property ¤
status_page_url: str

Similar to get_absolute_url, but with the full domain, for use outside the website.

build_invite_list ¤
build_invite_list() -> list[User]

Send a Django Signal to get the list of users to invite from different integrations (Slack, Confluence, PagerDuty...).

Returns:

  • list[User]

    list[User]: Potentially non-unique list of Users to invite

Source code in src/firefighter/incidents/models/incident.py
def build_invite_list(self) -> list[User]:
    """Send a Django Signal to get the list of users to invite from different integrations (Slack, Confluence, PagerDuty...).

    Returns:
        list[User]: Potentially non-unique list of Users to invite
    """
    users_list: list[User] = []

    # Send signal to modules (Confluence, PagerDuty...)
    result_users: list[tuple[Any, Exception | Iterable[User]]] = (
        signals.get_invites.send_robust(sender=None, incident=self)
    )

    # Aggregate the results
    for provider in result_users:
        if isinstance(provider[1], BaseException):
            logger.warning(
                f"Provider {provider[0]} returned an error getting Users to invite: {provider[1]}."
            )
            continue

        users_list.extend(provider[1])

    logger.debug(f"Get invites users list: {users_list}")

    return users_list
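
Because the returned list is "potentially non-unique", callers may want to deduplicate it. The sketch below mimics the shape of the `send_robust` result (a list of `(receiver, response)` pairs, where a failing receiver yields its exception as the response). Plain strings stand in for `User` objects, and `aggregate_invites` and `unique_in_order` are hypothetical helper names, not part of FireFighter.

```python
from __future__ import annotations

from collections.abc import Hashable, Iterable
from typing import Any


def aggregate_invites(
    results: list[tuple[Any, Exception | Iterable[Hashable]]],
) -> list[Hashable]:
    """Flatten (receiver, response) pairs, skipping receivers that failed."""
    users: list[Hashable] = []
    for _receiver, response in results:
        if isinstance(response, BaseException):
            # send_robust returns the exception instead of raising it.
            continue
        users.extend(response)
    return users


def unique_in_order(users: Iterable[Hashable]) -> list[Hashable]:
    """Drop duplicates while keeping the first occurrence of each user."""
    # dict preserves insertion order, so the first occurrence wins.
    return list(dict.fromkeys(users))


# Hypothetical provider results; strings stand in for User instances.
results = [
    ("slack", ["alice", "bob"]),
    ("pagerduty", RuntimeError("API timeout")),
    ("confluence", ["bob", "carol"]),
]
invited = unique_in_order(aggregate_invites(results))
```

Deduplicating with `dict.fromkeys` requires hashable items; Django model instances with a primary key are hashable, so the same idea should carry over to real `User` objects.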

compute_metrics ¤
compute_metrics(*, purge: bool = False) -> None

Compute all metrics (time to fix, ...) from events.

Source code in src/firefighter/incidents/models/incident.py
def compute_metrics(self, *, purge: bool = False) -> None:
    """Compute all metrics (time to fix, ...) from events."""
    latest_updates_by_type = self.latest_updates_by_type

    for metric_type in MetricType.objects.all():
        lhs_type = metric_type.milestone_lhs.event_type
        rhs_type = metric_type.milestone_rhs.event_type

        lhs = latest_updates_by_type.get(lhs_type, None)
        rhs = latest_updates_by_type.get(rhs_type, None)

        if not lhs or not lhs.event_ts:
            logger.info(
                f"Missing operand '{lhs_type}' on metric {metric_type.type} for #{self.id}"
            )
            if purge:
                IncidentMetric.objects.filter(
                    incident=self, metric_type=metric_type
                ).delete()
            continue
        if not rhs or not rhs.event_ts:
            logger.info(
                f"Missing operand '{rhs_type}' on metric {metric_type.type} for #{self.id}"
            )
            if purge:
                IncidentMetric.objects.filter(
                    incident=self, metric_type=metric_type
                ).delete()
            continue
        duration = lhs.event_ts - rhs.event_ts
        if duration < TD0:
            logger.warning(
                f"Tried to compute a negative metric! Metric {metric_type.type} for #{self.id} has a duration of {duration} ({lhs.event_type}:{lhs.event_ts} - {rhs.event_type}:{rhs.event_ts})"
            )
            if purge:
                IncidentMetric.objects.filter(
                    incident=self, metric_type=metric_type
                ).delete()
            continue
        IncidentMetric.objects.update_or_create(
            incident=self,
            metric_type=metric_type,
            defaults={"duration": duration},
        )
    self.save()
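
The core of the computation above is a guarded subtraction of two milestone timestamps. A minimal, framework-free sketch follows. `metric_duration` is a hypothetical helper, and the `mitigated` event type is an assumed example (only `declared` appears in the source).

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone

TD0 = timedelta(0)


def metric_duration(
    milestones: dict[str, datetime], lhs_type: str, rhs_type: str
) -> timedelta | None:
    """Return lhs - rhs, or None when an operand is missing or the result is negative."""
    lhs = milestones.get(lhs_type)
    rhs = milestones.get(rhs_type)
    if lhs is None or rhs is None:
        return None  # missing operand: the metric cannot be computed
    duration = lhs - rhs
    if duration < TD0:
        return None  # negative metrics are rejected, as in the method above
    return duration


milestones = {
    "declared": datetime(2024, 1, 1, 10, 0, tzinfo=timezone.utc),
    "mitigated": datetime(2024, 1, 1, 11, 30, tzinfo=timezone.utc),
}
```

Here `metric_duration(milestones, "mitigated", "declared")` gives one hour and thirty minutes, while swapping the operands gives `None` because the difference would be negative.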

missing_milestones ¤
missing_milestones() -> list[str]

Returns all required Milestones still needed to compute the metrics.

Source code in src/firefighter/incidents/models/incident.py
def missing_milestones(self) -> list[str]:
    """Returns all required Milestones still needed to compute the metrics."""
    if self.ignore:
        return []

    # We get a list of all event_type of IncidentUpdates...
    milestones_list = (
        IncidentUpdate.objects.filter(
            incident_id=self.id,
            event_ts__isnull=False,
            event_type__isnull=False,
        )
        .aggregate(milestone_list=ArrayAgg("event_type", distinct=True, default=[]))
        .get("milestone_list", [])
    )

    incident_milestones: set[str] = set(cast('list["str"]', milestones_list))
    required_milestone_types = set(
        MilestoneType.objects.filter(required=True).values_list(
            "event_type", flat=True
        )
    )
    # And we make the differences of the sets
    return list(required_milestone_types - incident_milestones)
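
The final step is a plain set difference. A small sketch with made-up milestone names is shown below. Unlike the method above, it sorts the result so the output is deterministic, which is a choice made for the example only.

```python
def missing_required(required: set[str], recorded: set[str]) -> list[str]:
    """Required milestone types that have not been recorded yet."""
    # Sorted only to make the example output stable; sets are unordered.
    return sorted(required - recorded)


# Hypothetical milestone types for illustration.
required = {"declared", "mitigated", "recovered"}
recorded = {"declared", "commented"}
still_missing = missing_required(required, recorded)
```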

update_roles ¤
update_roles(updater: User, roles_mapping: dict[str, User | None] | dict[str, User]) -> IncidentUpdate

Update the roles related to an incident, and create an IncidentUpdate. For each role, provide a User or None.

This function will update the incident, create an IncidentUpdate, and trigger the incident_updated signal, with update_roles sender.

Parameters:

  • updater (User) –

    The user who is updating the roles.

  • roles_mapping (dict[str, User | None]) –

    A dict of roles to update, with the new User or None.

Returns:

  • IncidentUpdate ( IncidentUpdate ) –

    The created IncidentUpdate with the updated roles.

Source code in src/firefighter/incidents/models/incident.py
def update_roles(
    self,
    updater: User,
    roles_mapping: dict[str, User | None] | dict[str, User],
) -> IncidentUpdate:
    """Update the roles related to an incident, and create an IncidentUpdate.
    For each role, provide a User or None.

    This function will update the incident, create an [IncidentUpdate][firefighter.incidents.models.incident_update.IncidentUpdate], and trigger the [incident_updated][firefighter.incidents.signals.incident_updated] signal, with `update_roles` sender.

    Args:
        updater (User): The user who is updating the roles.
        roles_mapping (dict[str, User | None]): A dict of roles to update, with the new User or None.

    Returns:
        IncidentUpdate: The created IncidentUpdate with the updated roles.
    """
    updated_fields: list[str] = []

    # Handle roles
    for role_slug, assigned_user in roles_mapping.items():
        try:
            role_type = IncidentRoleType.objects.get(slug=role_slug)
        except IncidentRoleType.DoesNotExist:
            logger.warning(f"Unknown role type: {role_slug}")
            continue
        if assigned_user is not None:
            IncidentRole.objects.update_or_create(
                incident=self,
                role_type=role_type,
                defaults={"user": assigned_user},
            )
            logger.debug(
                f"Updated role {role_slug} to user ID={assigned_user.id} for #{self.id}"
            )
            updated_fields.append(f"{role_slug}_id")
        elif role_type.required:
            logger.warning(f"Cannot remove required role: {role_slug}")
        else:
            IncidentRole.objects.filter(incident=self, role_type=role_type).delete()
            updated_fields.append(f"{role_slug}_id")

    # Handle legacy roles with IncidentUpdate
    # XXX(dugab): Custom roles are not saved in IncidentUpdate at the moment

    incident_update = IncidentUpdate(
        incident=self,
        created_by=updater,
        commander=roles_mapping.get("commander"),
        communication_lead=roles_mapping.get("communication_lead"),
    )
    incident_update.save()

    incident_updated.send_robust(
        "update_roles",
        incident=self,
        incident_update=incident_update,
        updated_fields=updated_fields,
    )

    return incident_update
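
The branching on each `roles_mapping` entry can be summarised with a dictionary-based sketch. It leaves out the database, the `IncidentUpdate` creation, and the signal. `apply_roles` is a hypothetical helper, user names are plain strings, and the `required` flags in `role_types` are assumptions made for the example.

```python
from __future__ import annotations


def apply_roles(
    assignments: dict[str, str],
    roles_mapping: dict[str, str | None],
    role_types: dict[str, bool],
) -> list[str]:
    """Apply roles_mapping to assignments in place and return the updated field names.

    role_types maps each known role slug to whether the role is required.
    """
    updated_fields: list[str] = []
    for slug, user in roles_mapping.items():
        if slug not in role_types:
            continue  # unknown role type: skipped (logged in the real method)
        if user is not None:
            assignments[slug] = user
            updated_fields.append(f"{slug}_id")
        elif role_types[slug]:
            continue  # a required role cannot be removed
        else:
            assignments.pop(slug, None)
            updated_fields.append(f"{slug}_id")
    return updated_fields


assignments = {"commander": "alice", "communication_lead": "bob"}
role_types = {"commander": True, "communication_lead": False}
fields = apply_roles(
    assignments,
    {"commander": None, "communication_lead": None, "scribe": "carol"},
    role_types,
)
```

In this run only `communication_lead` is removed: the `commander` removal is refused because the role is marked required, and `scribe` is ignored because it is not a known role type.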

IncidentFilterSet ¤

Bases: FilterSet

Set of filters for incidents, shared by Web UI and API.

Methods:

incident_search staticmethod ¤
incident_search(queryset: QuerySet[Incident], _name: str, value: str) -> QuerySet[Incident]

Search incidents by title, description, and ID.

Parameters:

  • queryset (QuerySet[Incident]) –

    Queryset to search in.

  • _name (str) –
  • value (str) –

    Value to search for.

Returns:

  • QuerySet[Incident]

    QuerySet[Incident]: Search results.

Source code in src/firefighter/incidents/models/incident.py
@staticmethod
def incident_search(
    queryset: QuerySet[Incident], _name: str, value: str
) -> QuerySet[Incident]:
    """Search incidents by title, description, and ID.

    Args:
        queryset (QuerySet[Incident]): Queryset to search in.
        _name:
        value (str): Value to search for.

    Returns:
        QuerySet[Incident]: Search results.
    """
    return Incident.objects.search(queryset=queryset, search_term=value)[0]

IncidentManager ¤

Bases: Manager['Incident']

Methods:

  • declare

    Create an Incident and its first IncidentUpdate.

  • search

    Search for incidents using a search term, on the title and description fields.

declare ¤
declare(**kwargs: Any) -> Incident

Create an Incident and its first IncidentUpdate. Send the incident_created signal. Returns the saved incident, no need to .save().

Source code in src/firefighter/incidents/models/incident.py
def declare(self, **kwargs: Any) -> Incident:
    """Create an Incident and its first IncidentUpdate.
    Send the incident_created signal.
    Returns the saved incident, no need to .save().
    """
    with transaction.atomic():
        if "private" not in kwargs:
            kwargs["private"] = kwargs["incident_category"].private
        if "severity" not in kwargs and "priority" in kwargs:
            kwargs["severity"] = Severity.objects.get(
                value=kwargs["priority"].value
            )
        incident: Incident = super().create(**kwargs)
        for required_default_role in IncidentRoleType.objects.filter(required=True):
            incident.roles_set.add(
                IncidentRole.objects.create(
                    incident=incident,
                    role_type=required_default_role,
                    user=incident.created_by,
                )
            )

        first_incident_update = IncidentUpdate(
            title=incident.title,
            description=incident.description,
            status=incident.status,  # type: ignore[misc]
            priority=incident.priority,
            incident_category=incident.incident_category,
            created_by=incident.created_by,
            commander=incident.created_by,
            incident=incident,
            event_ts=incident.created_at,
        )

        milestone_declared = IncidentUpdate(
            created_by=incident.created_by,
            incident=incident,
            event_type="declared",
            event_ts=incident.created_at,
        )
        first_incident_update.save()
        milestone_declared.save()

        incident.members.add(incident.created_by)

    # Either we have an incident or an error was thrown
    incident_created.send_robust(sender=__name__, incident=incident)
    return incident

search staticmethod ¤
search(queryset: QuerySet[Incident] | None, search_term: str) -> tuple[QuerySet[Incident], bool]

Search for incidents using a search term, on the title and description fields.

Parameters:

  • queryset (QuerySet[Incident] | None) –

    Queryset to search in. If None, search in all incidents. Passing a queryset restricts the search to a subset of incidents (already filtered).

  • search_term (str) –

    Search term.

Returns:

  • tuple[QuerySet[Incident], bool]

    tuple[QuerySet[Incident], bool]: Queryset of incidents matching the search term, and a boolean indicating whether the results may contain duplicate objects.

Source code in src/firefighter/incidents/models/incident.py
@staticmethod
def search(
    queryset: QuerySet[Incident] | None, search_term: str
) -> tuple[QuerySet[Incident], bool]:
    """Search for incidents using a search term, on the title and description fields.

    Args:
        queryset (QuerySet[Incident] | None): Queryset to search in. If None, search in all incidents. Passing a queryset restricts the search to a subset of incidents (already filtered).
        search_term (str): Search term.

    Returns:
        tuple[QuerySet[Incident], bool]: Queryset of incidents matching the search term, and a boolean indicating whether the results may contain duplicate objects.
    """
    if queryset is None:
        queryset = Incident.objects.all()

    # If there is no search term, return the original queryset
    if search_term is None or search_term.strip() == "":
        return queryset, False

    queryset_search_id = None

    # If the search is just an int, search for this ID + regular search
    try:
        search_term_as_int = int(search_term)
    except ValueError:
        pass
    else:
        queryset_search_id = deepcopy(queryset)
        queryset_search_id = queryset_search_id.filter(id=search_term_as_int)

    # Postgres search on title + description
    # XXX Improve search performance and relevance
    vector = SearchVector("title", config="english", weight="A") + SearchVector(
        "description", config="english", weight="B"
    )
    query = SearchQuery(search_term, config="english", search_type="websearch")
    queryset = (
        queryset.annotate(rank=SearchRank(vector, query))
        .filter(rank__gte=0.1)
        .order_by("-rank")
    )

    # Add the search by id to the search by text if needed
    if queryset_search_id:
        queryset |= queryset_search_id  # type: ignore[operator]

    return queryset, False
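
The first half of the method decides whether the term should also be tried as an incident ID. That decision can be isolated in a small sketch. `split_search_term` is a hypothetical helper, not part of FireFighter.

```python
from __future__ import annotations


def split_search_term(term: str | None) -> tuple[int | None, str | None]:
    """Return (id_candidate, text_term) for a raw search term.

    Both values are None when there is nothing to search for.
    """
    if term is None or term.strip() == "":
        return None, None
    try:
        id_candidate: int | None = int(term)
    except ValueError:
        id_candidate = None
    # The text search always runs; the ID lookup is added when the term is an int.
    return id_candidate, term
```

Note that Python's `int()` tolerates surrounding whitespace, so a term such as `" 42 "` is still treated as an ID candidate.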

incident_category_filter_choices_queryset ¤

incident_category_filter_choices_queryset(_: Any) -> QuerySet[IncidentCategory]

Queryset for choices of IncidentCategories in IncidentFilterSet. Defined as a function because models are not yet loaded when the filters are created.

Source code in src/firefighter/incidents/models/incident.py
def incident_category_filter_choices_queryset(_: Any) -> QuerySet[IncidentCategory]:
    """Queryset for choices of IncidentCategories in IncidentFilterSet.
    Moved it as a function because models are not loaded when creating filters.
    """
    return (
        IncidentCategory.objects.all()
        .select_related("group")
        .order_by(
            "group__order",
            "name",
        )
    )

incident_category ¤

Classes:

IncidentCategoryFilterSet ¤

Bases: FilterSet

Set of filters for IncidentCategory, shared by Web UI and API.

Methods:

incident_category_search staticmethod ¤
incident_category_search(queryset: QuerySet[IncidentCategory], _name: str, value: str) -> QuerySet[IncidentCategory]

Search incident categories by title, description, and ID.

Parameters:

  • queryset (QuerySet[IncidentCategory]) –

    Queryset to search in.

  • _name (str) –
  • value (str) –

    Value to search for.

Returns:

  • QuerySet[IncidentCategory]

    QuerySet[IncidentCategory]: Search results.

Source code in src/firefighter/incidents/models/incident_category.py
@staticmethod
def incident_category_search(
    queryset: QuerySet[IncidentCategory], _name: str, value: str
) -> QuerySet[IncidentCategory]:
    """Search incident categories by title, description, and ID.

    Args:
        queryset (QuerySet[IncidentCategory]): Queryset to search in.
        _name:
        value (str): Value to search for.

    Returns:
        QuerySet[IncidentCategory]: Search results.
    """
    return IncidentCategory.objects.search(queryset=queryset, search_term=value)[0]

IncidentCategoryManager ¤

Bases: Manager['IncidentCategory']

Methods:

  • queryset_with_mtbf

    Returns a queryset of incident categories with an additional mtbf field.

queryset_with_mtbf ¤
queryset_with_mtbf(
    date_from: datetime,
    date_to: datetime,
    queryset: QuerySet[IncidentCategory] | None = None,
    metric_type: str = "time_to_fix",
    field_name: str = "mtbf",
) -> QuerySet[IncidentCategory]

Returns a queryset of incident categories with an additional mtbf field.

Source code in src/firefighter/incidents/models/incident_category.py
def queryset_with_mtbf(
    self,
    date_from: datetime,
    date_to: datetime,
    queryset: QuerySet[IncidentCategory] | None = None,
    metric_type: str = "time_to_fix",
    field_name: str = "mtbf",
) -> QuerySet[IncidentCategory]:
    """Returns a queryset of incident categories with an additional `mtbf` field."""
    date_to = min(date_to, datetime.now(tz=TZ))

    date_interval = date_to - date_from
    queryset = queryset or self.get_queryset()

    return (
        queryset.order_by("group__order", "order")
        .annotate(
            metric_subquery=Subquery(
                IncidentMetric.objects.filter(
                    incident__incident_category=OuterRef("pk"),
                    metric_type__type=metric_type,
                    incident__created_at__gte=date_from,
                    incident__created_at__lte=date_to,
                )
                .values("incident__incident_category")
                .annotate(sum_downtime=Sum("duration"))
                .values("sum_downtime")
            )
        )
        .annotate(
            incident_count=Count(
                "incident",
                filter=Q(
                    incident__created_at__gte=date_from,
                )
                & Q(
                    incident__created_at__lte=date_to,
                ),
            ),
            incidents_downtime=F("metric_subquery"),
            incident_uptime=Value(date_interval) - F("incidents_downtime"),
        )
        .annotate(**{
            field_name: Cast(
                F("incident_uptime") / F("incident_count"),
                output_field=DurationField(),
            )
        })
    )
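
The annotation chain above amounts to the arithmetic `mtbf = (date_interval - sum_downtime) / incident_count`. A worked sketch in plain Python follows, assuming every incident in the period contributes exactly one downtime metric. `mtbf` is a hypothetical helper, and returning `None` when there are no incidents is a choice made for this example, not a statement about the query's behaviour.

```python
from __future__ import annotations

from datetime import datetime, timedelta


def mtbf(
    date_from: datetime, date_to: datetime, downtimes: list[timedelta]
) -> timedelta | None:
    """Mean time between failures: uptime in the period divided by incident count."""
    if not downtimes:
        return None  # no incidents: avoid dividing by zero
    uptime = (date_to - date_from) - sum(downtimes, timedelta(0))
    return uptime / len(downtimes)


# 30-day window with two incidents lasting 2 h and 4 h:
# uptime = 720 h - 6 h = 714 h, so MTBF = 714 h / 2 = 357 h.
example = mtbf(
    datetime(2024, 1, 1),
    datetime(2024, 1, 31),
    [timedelta(hours=2), timedelta(hours=4)],
)
```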

incident_cost ¤

Classes:

IncidentCost ¤

Bases: Model

Incident Cost is inspired by dispatch.

incident_cost_type ¤

Classes:

IncidentCostType ¤

Bases: Model

Incident Cost Type is inspired by dispatch.

incident_update ¤

Classes:

  • IncidentUpdate

    IncidentUpdate represents a single update to an incident.

Functions:

IncidentUpdate ¤

Bases: Model

IncidentUpdate represents a single update to an incident. One incident can have many incident updates. Only updated fields are stored in the incident update.

set_event_ts ¤

set_event_ts(sender: Any, instance: IncidentUpdate, **kwargs: Any) -> None

Add a timestamp on every IncidentUpdate. Not implemented with Django's auto_now_add, as that would make the field non-editable by users.

Source code in src/firefighter/incidents/models/incident_update.py
@receiver(pre_save, sender=IncidentUpdate)
# pylint: disable=unused-argument
def set_event_ts(sender: Any, instance: IncidentUpdate, **kwargs: Any) -> None:
    """Add a timestamp on every IncidentUpdate.
    Not implemented with Django's auto_now_add, as that would make the field non-editable by users.
    """
    if instance.event_ts is None:
        instance.event_ts = timezone.now()  # type: ignore

milestone_type ¤

MilestoneType model.

Classes:

  • MilestoneType

    Represents a milestone type, also known as a key event time or event time.

MilestoneType ¤

Bases: Model

Represents a milestone type, also known as a key event time or event time.

priority ¤

Classes:

  • Priority

    A priority for an incident.

Priority ¤

Bases: Model

A priority for an incident.

user ¤

Classes:

User ¤

Bases: AbstractUser

Attributes:

  • full_name (str) –

    User full name (first + last name). Looks for the first_name and last_name fields, then name, then username. Will return an empty string if none of these are set.

full_name property ¤
full_name: str

User full name (first + last name). Looks for the first_name and last_name fields, then name, then username. Will return an empty string if none of these are set.

signals ¤

Attributes:

create_incident_conversation module-attribute ¤

create_incident_conversation = Signal()

Signal sent to create a conversation for an incident.

Parameters:

  • incident (Incident) –

    The incident for which to create a conversation.

get_invites module-attribute ¤

get_invites = Signal()

Signal sent to retrieve the list of users to invite for an incident.

Parameters:

  • incident (Incident) –

    The incident for which to retrieve the list of users to invite.

Returns:

  • users ( list[User] ) –

    The list of users to invite.

incident_closed module-attribute ¤

incident_closed = Signal()

Signal sent when an incident is closed.

Parameters:

  • sender (Any) –

    The name of the signal's sender

  • incident (Incident) –

    The incident that was closed

incident_created module-attribute ¤

incident_created = Signal()

Signal sent when an incident is created.

Parameters:

  • incident (Incident) –

    The incident that was created

incident_key_events_updated module-attribute ¤

incident_key_events_updated = Signal()

Signal sent when an incident's key events are updated.

Parameters:

  • incident (Incident) –

    The incident that was updated

incident_updated module-attribute ¤

incident_updated = Signal()

Signal sent when an incident is updated.

Parameters:

  • sender (str | Any) –

    The name of the signal's sender

  • incident (Incident) –

    The incident that was updated

  • incident_update (IncidentUpdate) –

    The incident update that was created

  • update_fields (list[str]) –

    The fields that were updated

  • old_priority (Priority) –

    The old priority of the incident (optional kwarg)

postmortem_created module-attribute ¤

postmortem_created = Signal()

Signal sent when a postmortem is created.

Parameters:

  • sender (str | Any) –

    The name of the signal's sender

  • incident (Incident) –

    The incident for which the postmortem was created

tasks ¤

Modules:

updateoncall ¤

Functions:

update_oncall ¤

update_oncall() -> None

Fetch current on-calls and update the on-call Slack topic and Confluence page.

Source code in src/firefighter/incidents/tasks/updateoncall.py
@shared_task(name="incidents.update_oncall")
def update_oncall() -> None:
    """Fetch current on-calls and update the on-call Slack topic and Confluence page."""
    chain: Signature[bool] = (
        fetch_oncalls.s()  # pyright: ignore[reportUnboundVariable]
        | update_oncall_views.s()
    )
    chain()

update_oncall_slack_topic ¤

update_oncall_slack_topic(users: dict[str, User]) -> bool

TODO(gab) Move in the Slack app.

Source code in src/firefighter/incidents/tasks/updateoncall.py
def update_oncall_slack_topic(
    users: dict[str, User],
) -> bool:
    """TODO(gab) Move in the Slack app."""
    if not settings.ENABLE_SLACK:
        return False

    oncall_topic = create_oncall_topic(users)
    tech_incidents_conversation = Conversation.objects.get_or_none(tag="tech_incidents")
    if tech_incidents_conversation:
        res = tech_incidents_conversation.update_topic(oncall_topic)
        if res.get("ok"):
            logger.info(
                f"Updated on-call Slack topic on {tech_incidents_conversation}."
            )
            return True
    else:
        logger.warning(
            "Could not find tech_incidents conversation! Is there a channel with tag tech_incidents?"
        )
        return False
    logger.error("Failed to update on-call Slack topic.")
    return False

update_oncall_views ¤

update_oncall_views(*_args: Any, **_kwargs: Any) -> bool

Updates the on-call Slack topic and Confluence page containing the info for the on-call personnel.

Source code in src/firefighter/incidents/tasks/updateoncall.py
@shared_task(name="incidents.update_oncall_views")
def update_oncall_views(*_args: Any, **_kwargs: Any) -> bool:
    """Updates the on-call Slack topic and Confluence page containing the info for the on-call personnel."""
    if not settings.ENABLE_PAGERDUTY:
        logger.error("Can't update on-call users without PagerDuty enabled.")
        return False

    oncall_users_grouped_per_ep = PagerDutyOncall.objects.get_current_oncalls_per_escalation_policy_name_first_responder()

    # Check that we have Slack ID and handle for these users, if needed
    if settings.ENABLE_SLACK:
        for user in oncall_users_grouped_per_ep.values():
            if not hasattr(user, "slack_user"):
                SlackUser.objects.add_slack_id_to_user(user)
                logger.debug("Added Slack ID to user: %s", user)
        update_oncall_slack_topic(oncall_users_grouped_per_ep)
    else:
        logger.warning("Not updating on-call Slack topic, Slack integration disabled.")

    update_oncall_confluence(oncall_users_grouped_per_ep)
    logger.info("Updated on-call users on Confluence and Slack topic.")
    return True

views ¤

Modules:

components ¤

Modules:

list ¤

Classes:

IncidentCategoriesViewList ¤

Bases: SingleTableMixin, FilterView

Methods:

get_context_data ¤
get_context_data(**kwargs: Any) -> dict[str, Any]

No *args to pass.

Source code in src/firefighter/incidents/views/components/list.py
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
    """No *args to pass."""
    # Call the base implementation first to get a context
    context = super().get_context_data(**kwargs)
    context["filter_order"] = [
        "search",
        "metrics_period",
        "group",
    ]
    context["page_title"] = "Issue categories list"
    return context

date_filter ¤

Functions:

get_date_range_from_special_date ¤

get_date_range_from_special_date(unparsed_date: str) -> tuple[datetime | None, datetime | None, str | None, str | None]

TODO Specify the year parameter (ISO or calendar).

Source code in src/firefighter/incidents/views/date_filter.py
def get_date_range_from_special_date(
    unparsed_date: str,
) -> tuple[datetime | None, datetime | None, str | None, str | None]:
    """TODO Specify the year parameter (ISO or calendar)."""
    if "-" in unparsed_date:
        logger.debug("Parsing range, splitting with -")

        if unparsed_date.count("-") == 1:
            split = unparsed_date.split("-")
        elif unparsed_date.count(" - ") == 1:
            split = unparsed_date.split(" - ")
        else:
            err_msg = f"Invalid range: {unparsed_date}. Only one separator is allowed"
            raise ValidationError(err_msg)
        if len(split) == 2:
            for i in range(2):
                split[i] = split[i].strip()

            beg = parse_moment(split[0])
            end = parse_moment(split[1])
            logger.debug(end)
            return (
                beg.start if beg else None,
                end.stop if end else None,
                None,
                split[0] + "-" + split[1],
            )
        # TODO Message warning + logger.warning
        raise ValidationError(
            "Invalid range: %(unparsed_date)s. Only one separator is allowed",
            params={"unparsed_date": unparsed_date},
            code="invalid",
        )
    try:
        date_range = parse_moment(unparsed_date)
    except ValueError as exc:
        raise ValidationError(
            "Invalid range: %(unparsed_date)s. %(error)s",
            params={"unparsed_date": unparsed_date, "error": str(exc)},
            code="invalid",
        ) from exc
    if date_range:
        return date_range.start, date_range.stop, None, "TODO"

    # TODO Message warning + logger.warning
    return None, None, None, None
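
The separator handling at the top of the function can be isolated as follows. This is a sketch only: `split_range` is a hypothetical helper, and it raises `ValueError` where the view code raises Django's `ValidationError`.

```python
from __future__ import annotations


def split_range(unparsed_date: str) -> tuple[str, str] | None:
    """Split 'start - end' into its two parts, or return None if it is not a range."""
    if "-" not in unparsed_date:
        return None  # single moment, handled by the non-range branch
    if unparsed_date.count("-") == 1:
        parts = unparsed_date.split("-")
    elif unparsed_date.count(" - ") == 1:
        # Hyphens inside the dates are allowed when the separator is ' - '.
        parts = unparsed_date.split(" - ")
    else:
        raise ValueError(
            f"Invalid range: {unparsed_date}. Only one separator is allowed"
        )
    return parts[0].strip(), parts[1].strip()
```

With this logic, `"2024-01-01 - 2024-01-31"` is split on the spaced separator, whereas a lone hyphenated date such as `"2024-01-01"` contains two hyphens and no `" - "`, so it is rejected as an invalid range.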

parse_moment ¤

parse_moment(value: str) -> slice | None

Returns a slice with the slice.start and slice.stop of the given moment. If the moment is not a valid moment, returns None. If the moment is a point in time, returns a slice with the same start and stop.

Source code in src/firefighter/incidents/views/date_filter.py
def parse_moment(value: str) -> slice | None:
    """Returns a :class:`slice` with the :func:`slice.start` and :func:`slice.stop` of the given moment.
    If the moment is not a valid moment, returns None.
    If the moment is a point in time, returns a slice with the same start and stop.
    """
    value = value.strip()

    # Quarter/Week relative (current/previous quarter, current/previous week)
    if value in SPECIAL_RANGES:
        now = timezone.localtime(timezone.now())
        returned_range = get_date_range_from_relative_calendar_value(value, now, value)
        if None not in returned_range:
            return slice(returned_range[0], returned_range[1])

    # Match according to the regex (W20, Q4, YYYY-W20, YYYY-Q4...)
    week_or_quarter_time_frame = (*get_date_range_from_calendar_value(value), value)
    if None not in week_or_quarter_time_frame:
        return slice(week_or_quarter_time_frame[0], week_or_quarter_time_frame[1])

    # Match using dateparser (lots of formats)
    return parse_date_natural(value)

date_utils ¤

Functions:

get_bounds_from_calendar_year ¤

get_bounds_from_calendar_year(year_value: int) -> tuple[datetime, datetime, str]

Calendar Year.

Source code in src/firefighter/incidents/views/date_utils.py
def get_bounds_from_calendar_year(year_value: int) -> tuple[datetime, datetime, str]:
    """Calendar Year."""
    lower_bound = datetime(year_value, 1, 1, tzinfo=TZ)
    upper_bound = datetime(year_value, 12, 31, 23, 59, 59, 999999, tzinfo=TZ)

    return (
        lower_bound,
        upper_bound,
        f"{year_value}",
    )

get_bounds_from_year ¤

get_bounds_from_year(year_value: int) -> tuple[datetime, datetime, str]

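Returns the bounds of the ISO year year_value: Monday of ISO week 1 at 00:00:00 through Sunday of the last ISO week at 23:59:59.999999, made timezone-aware, plus a label of the form ISO followed by the year.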

Source code in src/firefighter/incidents/views/date_utils.py
def get_bounds_from_year(year_value: int) -> tuple[datetime, datetime, str]:
    """ISO Year."""
    lower_bound = datetime.fromisocalendar(year_value, 1, 1)
    upper_week_nb = date(year_value, 12, 28).isocalendar()[1]
    upper_bound = datetime.combine(
        date.fromisocalendar(year=year_value, week=upper_week_nb, day=7),
        time(23, 59, 59, 999999),
    )

    return *make_bounds_timezone_aware(lower_bound, upper_bound), f"ISO{year_value}"
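
An ISO year rarely lines up with the calendar year, which is why the two functions above exist side by side. The sketch below repeats the ISO computation with naive datetimes to make the difference visible. The name iso_year_bounds is hypothetical, and the timezone step (make_bounds_timezone_aware) is deliberately omitted.

```python
from __future__ import annotations

from datetime import date, datetime, time


def iso_year_bounds(year: int) -> tuple[datetime, datetime]:
    """Naive-datetime bounds of an ISO year (timezone handling omitted)."""
    # Monday of ISO week 1.
    start = datetime.fromisocalendar(year, 1, 1)
    # December 28 always falls in the last ISO week of its year.
    last_week = date(year, 12, 28).isocalendar()[1]
    # Sunday of that last week, at the final microsecond of the day.
    end = datetime.combine(
        date.fromisocalendar(year, last_week, 7),
        time(23, 59, 59, 999999),
    )
    return start, end


start, end = iso_year_bounds(2021)
print(start.date(), end.date())  # 2021-01-04 2022-01-02
```

ISO year 2021 therefore starts three days after January 1, 2021 and ends two days into January 2022, whereas the calendar-year bounds are always January 1 through December 31.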

errors ¤

Adapted from Django's default error views: https://github.com/django/django/blob/0dd29209091280ccf34e07c9468746c396b7778e/django/views/defaults.py.

Functions:

bad_request ¤

bad_request(request: HttpRequest, exception: Exception, template_name: str = ERROR_400_TEMPLATE_NAME) -> HttpResponse

400 error handler.

Templates: 400.html

Context: None

Source code in src/firefighter/incidents/views/errors.py
@requires_csrf_token
def bad_request(
    request: HttpRequest,
    exception: Exception,
    template_name: str = ERROR_400_TEMPLATE_NAME,
) -> HttpResponse:
    """400 error handler.

    Templates: :template:`400.html`
    Context: None
    """
    if (
        request.headers.get("Accept", "") == JSON_CONTENT_TYPE
        or request.headers.get("Content-Type", "") == JSON_CONTENT_TYPE
    ):
        return JsonHttpResponseBadRequest({
            "error": {"app_code": "BAD_REQUEST", "message": "Bad request"}
        })
    context = {
        "request_path": quote(request.path),
        "page_title": "Bad request",
        "APP_DISPLAY_NAME": settings.APP_DISPLAY_NAME,
        "title": "Bad request",
        "code": "400",
        "message": "Please check the URL in the address bar and try again.",
    }
    try:
        template = loader.get_template(template_name)
    except TemplateDoesNotExist:
        if template_name != ERROR_400_TEMPLATE_NAME:
            # Reraise if it's a missing custom template.
            raise
        return HttpResponseBadRequest(
            BACKUP_ERROR_PAGE_TEMPLATE % {"title": "Bad Request (400)", "details": ""},
        )
    # No exception content is passed to the template, to not disclose any
    # sensitive information.
    return HttpResponseBadRequest(template.render(context))
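
Each handler in this module begins with the same header check to decide between a JSON and an HTML response. The sketch below isolates that check on a plain mapping. The helper name wants_json is hypothetical, and the value of JSON_CONTENT_TYPE is an assumption, since the constant's definition is not shown in this section.

```python
from collections.abc import Mapping

# Assumed value; the real constant is defined elsewhere in FireFighter.
JSON_CONTENT_TYPE = "application/json"


def wants_json(headers: Mapping[str, str]) -> bool:
    """Mirror the handlers' check: exact match on Accept or Content-Type."""
    return (
        headers.get("Accept", "") == JSON_CONTENT_TYPE
        or headers.get("Content-Type", "") == JSON_CONTENT_TYPE
    )


print(wants_json({"Accept": "application/json"}))              # True
print(wants_json({"Accept": "application/json, text/plain"}))  # False
```

Because the comparison is an exact equality, a client that sends a list of media types or a charset parameter receives the HTML page rather than the JSON body.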

page_not_found ¤

page_not_found(request: HttpRequest, exception: Exception, template_name: str = ERROR_404_TEMPLATE_NAME) -> HttpResponse

Default 404 handler.

Templates: 404.html

Context:

  • request_path: The path of the requested URL (e.g., '/app/pages/bad_page/'). It's quoted to prevent a content injection attack.

  • exception: The message from the exception which triggered the 404 (if one was supplied), or the exception class name.

Source code in src/firefighter/incidents/views/errors.py
@requires_csrf_token
def page_not_found(
    request: HttpRequest,
    exception: Exception,
    template_name: str = ERROR_404_TEMPLATE_NAME,
) -> HttpResponse:
    """Default 404 handler.

    Templates: :template:`404.html`
    Context:
        request_path
            The path of the requested URL (e.g., '/app/pages/bad_page/'). It's
            quoted to prevent a content injection attack.
        exception
            The message from the exception which triggered the 404 (if one was
            supplied), or the exception class name
    """
    if (
        request.headers.get("Accept", "") == JSON_CONTENT_TYPE
        or request.headers.get("Content-Type", "") == JSON_CONTENT_TYPE
    ):
        return JsonHttpResponseNotFound({
            "error": {"app_code": "NOT_FOUND", "message": "Page not found"}
        })
    exception_repr = exception.__class__.__name__
    # Try to get an "interesting" exception message, if any (and not the ugly
    # Resolver404 dictionary)
    try:
        message = exception.args[0] if exception.args else ""
    except (AttributeError, IndexError):
        logger.exception("Error in page_not_found")
    else:
        if isinstance(message, str):
            exception_repr = message
    context = {
        "request_path": quote(request.path),
        "exception": exception_repr,
        "page_title": "Page not found",
        "title": "Page not found",
        "code": "404",
        "message": "Please check the URL in the address bar and try again.",
    }

    try:
        template = loader.get_template(template_name)
        body = template.render(context, request)
    except TemplateDoesNotExist:
        if template_name != ERROR_404_TEMPLATE_NAME:
            # Reraise if it's a missing custom template.
            raise
        # Render template (even though there are no substitutions) to allow
        # inspecting the context in tests.
        template_backup = Engine().from_string(
            BACKUP_ERROR_PAGE_TEMPLATE
            % {
                "title": "Not Found",
                "details": "The requested resource was not found on this server.",
            },
        )

        body = template_backup.render(Context(context))
    return HttpResponseNotFound(body)
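
The way page_not_found picks the text for the exception context entry can be followed in isolation. This is a minimal sketch of that selection logic as it appears in the listing above. The function name describe_exception is hypothetical, and the logging branch is left out.

```python
def describe_exception(exception: Exception) -> str:
    """Pick the text shown for an exception, as page_not_found does."""
    # Start from the class name as the fallback.
    label = exception.__class__.__name__
    # Use the first argument if there is one, otherwise an empty string.
    message = exception.args[0] if exception.args else ""
    # Only string messages replace the fallback (this skips, for example,
    # the dictionary carried by a resolver error).
    if isinstance(message, str):
        label = message
    return label


print(describe_exception(ValueError("no such page")))  # no such page
print(describe_exception(KeyError(42)))                # KeyError
```

Under this logic an exception raised without arguments yields an empty string, because the empty default is itself a string. The class name is only kept when the first argument is not a string.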

permission_denied ¤

permission_denied(request: HttpRequest, exception: Exception, template_name: str = ERROR_403_TEMPLATE_NAME) -> HttpResponse

Permission denied (403) handler.

Templates: 403.html

Context:

  • exception: The message from the exception which triggered the 403 (if one was supplied).

If the template does not exist, an HTTP 403 response containing the text "403 Forbidden" (as per RFC 7231) will be returned.

Source code in src/firefighter/incidents/views/errors.py
@requires_csrf_token
def permission_denied(
    request: HttpRequest,
    exception: Exception,
    template_name: str = ERROR_403_TEMPLATE_NAME,
) -> HttpResponse:
    """Permission denied (403) handler.

    Templates: :template:`403.html`
    Context:
        exception
            The message from the exception which triggered the 403 (if one was
            supplied).

    If the template does not exist, an HTTP 403 response containing the text
    "403 Forbidden" (as per RFC 7231) will be returned.
    """
    if (
        request.headers.get("Accept", "") == JSON_CONTENT_TYPE
        or request.headers.get("Content-Type", "") == JSON_CONTENT_TYPE
    ):
        return JsonHttpResponseForbidden({
            "error": {"app_code": "FORBIDDEN", "message": "Forbidden"}
        })
    context = {
        "request_path": quote(request.path),
        "exception": str(exception),
        "page_title": "Forbidden",
        "title": "Forbidden",
        "code": "403",
        "message": "You do not have permission to access this page.",
    }
    try:
        template = loader.get_template(template_name)
    except TemplateDoesNotExist:
        if template_name != ERROR_403_TEMPLATE_NAME:
            # Reraise if it's a missing custom template.
            raise
        return HttpResponseForbidden(
            BACKUP_ERROR_PAGE_TEMPLATE % {"title": "403 Forbidden", "details": ""},
        )
    return HttpResponseForbidden(template.render(request=request, context=context))
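
The handlers above place quote(request.path) in the context rather than the raw path. Assuming quote is urllib.parse.quote, as in the Django views this module is adapted from, the effect on a path containing markup looks like this. The helper name safe_request_path is hypothetical.

```python
from urllib.parse import quote


def safe_request_path(path: str) -> str:
    """Percent-encode a request path before it is shown on an error page."""
    # By default quote() leaves "/" untouched and encodes characters
    # such as "<", ">" and spaces.
    return quote(path)


print(safe_request_path("/app/pages/<b>bad</b>/"))
# /app/pages/%3Cb%3Ebad%3C/b%3E/
```

The angle brackets are percent-encoded, so the path can be displayed without being interpreted as HTML.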

server_error ¤

server_error(request: HttpRequest, template_name: str = ERROR_500_TEMPLATE_NAME) -> HttpResponse

500 error handler.

Templates: 500.html

Context: None

Source code in src/firefighter/incidents/views/errors.py
@requires_csrf_token
def server_error(
    request: HttpRequest, template_name: str = ERROR_500_TEMPLATE_NAME
) -> HttpResponse:
    """500 error handler.

    Templates: :template:`500.html`
    Context: None
    """
    context = {
        "page_title": "Server error",
        "title": "Server error",
        "code": "500",
        "message": "Please try again later. This issue has been logged, but feel free to raise the issue.",
        "APP_DISPLAY_NAME": settings.APP_DISPLAY_NAME,
    }
    if (
        request.headers.get("Accept", "") == JSON_CONTENT_TYPE
        or request.headers.get("Content-Type", "") == JSON_CONTENT_TYPE
    ):
        return JsonHttpResponseServerError({
            "error": {"app_code": "SERVER_ERROR", "message": "Server error"}
        })
    try:
        template = loader.get_template(template_name)
    except TemplateDoesNotExist:
        if template_name != ERROR_500_TEMPLATE_NAME:
            # Reraise if it's a missing custom template.
            raise
        return HttpResponseServerError(
            BACKUP_ERROR_PAGE_TEMPLATE % {"title": "Server Error (500)", "details": ""},
        )
    return HttpResponseServerError(template.render(context))
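
Django looks up custom error views through the handler400, handler403, handler404 and handler500 variables of the root URLconf. A minimal sketch of wiring the four views above is shown here. The dotted module path is an assumption inferred from the source location src/firefighter/incidents/views/errors.py, and FireFighter's own URLconf may already do this differently.

```python
# Root URLconf (for example urls.py). Django accepts either a callable
# or a dotted-path string for each handler variable.

# Assumed module path, inferred from the file location in this reference.
ERRORS_MODULE = "firefighter.incidents.views.errors"

handler400 = f"{ERRORS_MODULE}.bad_request"
handler403 = f"{ERRORS_MODULE}.permission_denied"
handler404 = f"{ERRORS_MODULE}.page_not_found"
handler500 = f"{ERRORS_MODULE}.server_error"
```

Note that server_error takes no exception argument, which matches the signature Django expects for handler500.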

users ¤

Modules:

details ¤

Classes:

UserDetailView ¤

Bases: CustomDetailView[User]

Be extra careful in this view: user is the logged-in user, while target_user is the user whose profile is being viewed.

views ¤

Classes:

DashboardView ¤

Bases: ListView[Incident]

Methods:

get_context_data ¤
get_context_data(**kwargs: Any) -> dict[str, Any]

No *args to pass.

Source code in src/firefighter/incidents/views/views.py
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
    """No *args to pass."""
    # Call the base implementation first to get a context
    context = super().get_context_data(**kwargs)
    context["page_title"] = "Incidents Dashboard"
    return context

IncidentListView ¤

Bases: SingleTableMixin, FilterView

Methods:

get_context_data ¤
get_context_data(**kwargs: Any) -> dict[str, Any]

No *args to pass.

Source code in src/firefighter/incidents/views/views.py
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
    """No *args to pass."""
    # Call the base implementation first to get a context
    context = super().get_context_data(**kwargs)
    context["page_range"] = context["table"].paginator.get_elided_page_range(
        context["table"].page.number, on_each_side=2, on_ends=1
    )
    context["filter_order"] = [
        "search",
        "created_at",
        "status",
        "environment",
        "priority",
        "incident_category",
    ]

    context["page_title"] = "Incidents List"
    context["api_url_export"] = reverse("api:incidents-list")
    return context
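
The page_range entry comes from Django's Paginator.get_elided_page_range, which shortens long page lists with ellipsis markers. To show what on_each_side=2 and on_ends=1 produce, here is a standalone sketch of the same elision rule. It is a stand-in written for illustration, not Django's code: the name elided_page_range and the "..." marker are assumptions, and Django uses its own Paginator.ELLIPSIS value.

```python
from __future__ import annotations

from collections.abc import Iterator

ELLIPSIS = "..."  # Stand-in marker; Django provides Paginator.ELLIPSIS.


def elided_page_range(
    num_pages: int, number: int, *, on_each_side: int = 3, on_ends: int = 2
) -> Iterator[int | str]:
    """Yield page numbers around `number`, eliding long gaps."""
    # Short ranges are returned in full.
    if num_pages <= (on_each_side + on_ends) * 2:
        yield from range(1, num_pages + 1)
        return
    # Left side: leading pages, a gap marker, then the neighbours.
    if number > (1 + on_each_side + on_ends) + 1:
        yield from range(1, on_ends + 1)
        yield ELLIPSIS
        yield from range(number - on_each_side, number + 1)
    else:
        yield from range(1, number + 1)
    # Right side: neighbours, a gap marker, then the trailing pages.
    if number < (num_pages - on_each_side - on_ends) - 1:
        yield from range(number + 1, number + on_each_side + 1)
        yield ELLIPSIS
        yield from range(num_pages - on_ends + 1, num_pages + 1)
    else:
        yield from range(number + 1, num_pages + 1)


print(list(elided_page_range(20, 10, on_each_side=2, on_ends=1)))
# [1, '...', 8, 9, 10, 11, 12, '...', 20]
```

With the settings used in IncidentListView, the current page is shown with two neighbours on each side and a single page at each end.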

IncidentStatisticsView ¤

Bases: FilterView

Methods:

get_context_data ¤
get_context_data(**kwargs: Any) -> dict[str, Any]

No *args to pass.

Source code in src/firefighter/incidents/views/views.py
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
    """No *args to pass."""
    context = super().get_context_data(**kwargs)
    context["page_title"] = "Statistics"
    context["filter_order"] = [
        "search",
        "created_at",
        "status",
        "environment",
        "priority",
        "incident_category",
    ]
    context_data = weekly_dashboard_context(
        self.request, context.get("incidents_filtered", [])
    )
    return {**context, **context_data}

IncidentUpdateKeyEventsView ¤

Bases: SingleObjectMixin[Incident], LoginRequiredMixin, FormView[IncidentUpdateKeyEventsForm]

Methods:

  • get_form

    Replace Markdown bold syntax with HTML bold syntax in form labels.

get_form ¤
get_form(form_class: type[IncidentUpdateKeyEventsForm] | None = None) -> IncidentUpdateKeyEventsForm

Replace Markdown bold syntax with HTML bold syntax in form labels.

Source code in src/firefighter/incidents/views/views.py
def get_form(
    self, form_class: type[IncidentUpdateKeyEventsForm] | None = None
) -> IncidentUpdateKeyEventsForm:
    """Replace Markdown bold syntax with HTML bold syntax in form labels."""
    form = super().get_form(form_class)
    for field in form.fields:
        # Replace *ABC* with <b>ABC</b>
        form.fields[field].label = re.sub(
            r"\*(.*?)\*", r"<b>\1</b>", str(form.fields[field].label)
        )
    return form
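
The substitution in get_form can be tried on plain strings. This sketch applies the same pattern and replacement outside of any form. The function name bold_label and the sample labels are made up for illustration.

```python
import re


def bold_label(label: str) -> str:
    """Turn *text* into <b>text</b>, using the same regex as get_form."""
    # The lazy group (.*?) stops at the next asterisk, so several
    # marked spans in one label are converted independently.
    return re.sub(r"\*(.*?)\*", r"<b>\1</b>", label)


print(bold_label("*Detected* at"))        # <b>Detected</b> at
print(bold_label("*Start* and *end*"))    # <b>Start</b> and <b>end</b>
```

The pattern targets single asterisks only. A label written with doubled asterisks, such as **x**, is matched as two empty spans around the text and comes out as <b></b>x<b></b>.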

ProcessAfterResponse ¤

ProcessAfterResponse(redirect_to: str, data: dict[str, Any], *args: Any, **kwargs: Any)

Bases: HttpResponseRedirect

Custom Response, to trigger the Slack workflow after creating the incident and returning HTTP 201.

TODO This does not work, the workflow is triggered before the response is sent. We need to do a celery task!

TODO We need to redirect to the incident page or Slack conversation.

Source code in src/firefighter/incidents/views/views.py
def __init__(
    self, redirect_to: str, data: dict[str, Any], *args: Any, **kwargs: Any
) -> None:
    super().__init__(redirect_to, *args, **kwargs)
    self.data = data