Incidents¤

The incidents module is the core of FireFighter, managing incident lifecycle, priorities, and integrations.

Priority System¤

FireFighter uses a 5-level priority system (P1-P5) to categorize incident severity:

Priority Levels¤

| Priority | Emoji | Level | Description | Example |
|----------|-------|-------|-------------|---------|
| P1 | 🔥 | Critical | Complete service outage, system-wide failure | Payment system down, entire website offline |
| P2 | 🚨 | High | Major functionality impaired, significant user impact | Core feature broken, major performance degradation |
| P3 | ⚠️ | Medium | Minor functionality affected, moderate impact | Non-critical feature issue, isolated component problem |
| P4 | 📢 | Low | Small issues, minimal impact | UI glitches, minor bugs in secondary features |
| P5 | 💡 | Lowest | Cosmetic issues, enhancement requests | Typos, improvement suggestions, non-urgent requests |

Priority Usage¤

Incident Creation: When creating an incident, the priority determines:

  • Notification urgency and channels
  • Automatic escalations (PagerDuty for P1/P2)
  • Jira ticket priority mapping
  • SLA expectations

Integration Mapping:

  • Jira: P1-P5 map directly to Jira priorities 1-5
  • PagerDuty: P1-P2 typically trigger immediate escalation
  • Slack: Higher priorities get broader notification reach
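The mapping rules above can be sketched in a few lines. This is an illustration only: `RoutingPlan` and `plan_for_priority` are hypothetical names, not part of FireFighter, and the sketch covers only the Jira and PagerDuty rules stated in this section.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RoutingPlan:
    """Hypothetical summary of what a given priority triggers."""

    jira_priority: int
    page_on_call: bool


def plan_for_priority(priority: int) -> RoutingPlan:
    """Map a priority number (1-5) to an illustrative routing plan."""
    if priority not in range(1, 6):
        raise ValueError(f"priority must be between 1 and 5, got {priority}")
    return RoutingPlan(
        jira_priority=priority,      # P1-P5 map directly to Jira priorities 1-5
        page_on_call=priority <= 2,  # PagerDuty escalation for P1/P2 only
    )
```

For instance, `plan_for_priority(2)` requests paging, while `plan_for_priority(4)` does not.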

Priority Assignment Guidelines¤

P1 (Critical) - Use when:

  • Complete service unavailability
  • Data loss or corruption
  • Security breaches
  • Payment processing failures

P2 (High) - Use when:

  • Major feature completely broken
  • Significant performance degradation
  • Multiple users affected by the same issue

P3 (Medium) - Use when:

  • Single feature partially broken
  • Workaround exists
  • Limited user impact

P4 (Low) - Use when:

  • Minor UI issues
  • Edge case bugs
  • Enhancement requests with business value

P5 (Lowest) - Use when:

  • Cosmetic improvements
  • Documentation updates
  • Nice-to-have features

Priority Validation

Invalid priority values automatically fall back to P1, so that an unrecognized priority is still handled as critical.
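A minimal sketch of such a fallback, assuming priorities are handled as integers 1 to 5. The `normalize_priority` helper and its constants are hypothetical and only illustrate the rule; they are not FireFighter's actual validation code.

```python
VALID_PRIORITIES = range(1, 6)  # P1..P5
FALLBACK_PRIORITY = 1           # invalid input is treated as P1


def normalize_priority(raw: object) -> int:
    """Return a priority in 1..5, falling back to P1 for anything invalid."""
    try:
        value = int(raw)  # accepts ints and numeric strings such as "2"
    except (TypeError, ValueError):
        return FALLBACK_PRIORITY
    return value if value in VALID_PRIORITIES else FALLBACK_PRIORITY
```

Falling back to the most severe level favors over-alerting over silently dropping a malformed incident.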

API Reference¤

incidents ¤

Modules:

admin ¤

Classes:

EnvironmentAdmin ¤

Bases: ModelAdmin[Environment]

Methods:

get_readonly_fields ¤

get_readonly_fields(request: HttpRequest, obj: Environment | None = None) -> _ListOrTuple[str]

Deny changing the value of an existing object.

Source code in src/firefighter/incidents/admin.py
def get_readonly_fields(
    self,
    request: HttpRequest,  # noqa: ARG002
    obj: Environment | None = None,
) -> _ListOrTuple[str]:
    """Deny changing the value of an existing object."""
    if obj:  # editing an existing object
        return *self.readonly_fields, "value"
    return self.readonly_fields
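The `return *self.readonly_fields, "value"` line builds a new tuple by unpacking the existing fields and appending one more. A standalone sketch of the same pattern without Django follows; the `ReadonlyOnEdit` class and its `created_at` field are hypothetical.

```python
from typing import Optional, Tuple


class ReadonlyOnEdit:
    """Hypothetical stand-in for an admin class with base read-only fields."""

    readonly_fields: Tuple[str, ...] = ("created_at",)

    def get_readonly_fields(self, obj: Optional[object] = None) -> Tuple[str, ...]:
        if obj:  # editing an existing object: also lock "value"
            return *self.readonly_fields, "value"
        return self.readonly_fields  # creating: "value" stays editable
```

The class attribute itself is never mutated, so the extra field applies only to the edit view.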

IncidentAdmin ¤

Bases: ModelAdmin[Incident]

Methods:

  • compute_and_purge_metrics

    Will compute metrics for selected incidents and delete metrics that can no longer be computed.

  • send_message

    Action to send a message in selected channels.

compute_and_purge_metrics ¤

compute_and_purge_metrics(request: HttpRequest, queryset: QuerySet[Incident]) -> None

Will compute metrics for selected incidents and delete metrics that can no longer be computed.

Source code in src/firefighter/incidents/admin.py
@action(description=_("Compute and purge metrics from milestones"))
def compute_and_purge_metrics(
    self, request: HttpRequest, queryset: QuerySet[Incident]
) -> None:
    """Will compute metrics for selected incidents and delete metrics that can no longer be computed."""
    for incident in queryset:
        incident.compute_metrics(purge=True)
    self.message_user(
        request,
        ngettext(
            "Computed/purged metrics for %d incident.",
            "Computed/purged metrics for %d incidents.",
            len(queryset),
        )
        % len(queryset),
        constants.SUCCESS,
    )
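The pluralization pattern used in the confirmation message can be tried outside Django. The sketch below uses the standard library's `gettext.ngettext` as a stand-in for Django's translation helper, and `metrics_message` is a hypothetical wrapper.

```python
from gettext import ngettext


def metrics_message(count: int) -> str:
    """Pick the singular or plural template, then interpolate the count."""
    template = ngettext(
        "Computed/purged metrics for %d incident.",
        "Computed/purged metrics for %d incidents.",
        count,
    )
    return template % count
```

With no translation catalog installed, `ngettext` returns the first string when `count == 1` and the second otherwise.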

send_message ¤

send_message(request: HttpRequest, queryset: QuerySet[Incident]) -> TemplateResponse | None

Action to send a message in selected channels. This action first displays a confirmation page to enter the message. Once the message is submitted, it is sent to every selected conversation (via process_action_send_message) and the user is returned to the change list.

Source code in src/firefighter/incidents/admin.py
@action(
    description=_("Send message on conversation"),
)
def send_message(
    self, request: HttpRequest, queryset: QuerySet[Incident]
) -> TemplateResponse | None:
    """Action to send a message in selected channels.
    This action first displays a confirmation page to enter the message.
    Next, it sends the message on all selected objects and redirects back to the change list (other fn).
    """
    opts = self.model._meta  # noqa: SLF001

    # Get all the targeted conversations
    target_conversations: list[IncidentChannel] = [
        inc.conversation for inc in queryset.all() if inc.conversation
    ]

    deletable_objects = target_conversations

    # The user has already entered the messages.
    # Send message(s) and return None to display the change list view again.
    if request.POST.get("post"):
        self.process_action_send_message(
            request=request, target_conversations=target_conversations
        )
        return None

    objects_name = model_ngettext(queryset)

    title = _("Type your message")

    context = {
        **self.admin_site.each_context(request),
        "title": title,
        "objects_name": str(objects_name),
        "deletable_objects": [deletable_objects],
        "queryset": queryset,
        "opts": opts,
        "action_checkbox_name": helpers.ACTION_CHECKBOX_NAME,
        "media": self.media,
    }

    # Display the message edit page
    return TemplateResponse(
        request, "admin/send_message_conversation.html", context
    )

IncidentRoleTypeAdmin ¤

Bases: ModelAdmin[IncidentRoleType]

IncidentRoleTypes are fixed at the moment (only commander and communication lead).

enums ¤

Classes:

ClosureReason ¤

Bases: TextChoices

Reasons for direct incident closure bypassing normal workflow.

IncidentStatus ¤

Bases: IntegerChoices

Methods:

choices_lte_skip_postmortem staticmethod ¤

choices_lte_skip_postmortem(val: int) -> list[tuple[int, str]]

Return choices up to val, excluding POST_MORTEM (for P3+ incidents, i.e. P3 and lower priorities).

Source code in src/firefighter/incidents/enums.py
@staticmethod
def choices_lte_skip_postmortem(val: int) -> list[tuple[int, str]]:
    """Return choices up to val but excluding POST_MORTEM (for P3+ incidents)."""
    return [i for i in IncidentStatus.choices if i[0] <= val and i[0] != IncidentStatus.POST_MORTEM]
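To see the filter in isolation, here is a sketch with a plain `IntEnum`. The `Status` members and their numeric values are hypothetical placeholders, since the real `IncidentStatus` values are not listed on this page.

```python
from enum import IntEnum


class Status(IntEnum):
    """Hypothetical statuses; the numeric values are illustrative only."""

    OPEN = 10
    INVESTIGATING = 20
    MITIGATED = 40
    POST_MORTEM = 50
    CLOSED = 60


# (value, label) pairs, mimicking the shape of Django's `choices`
CHOICES = [(s.value, s.name.replace("_", " ").title()) for s in Status]


def choices_lte_skip_postmortem(val: int) -> list:
    """Keep choices up to `val`, dropping POST_MORTEM."""
    return [c for c in CHOICES if c[0] <= val and c[0] != Status.POST_MORTEM]
```

Calling it with `Status.CLOSED` yields every status except the post-mortem one.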

factories ¤

Classes:

PriorityFactory ¤

Bases: DjangoModelFactory[Priority]

Factory for creating Priority instances in tests.

Creates Priority objects with random values. Use specific values in tests when testing priority-specific behavior (e.g., P1-P5 Jira mapping).

forms ¤

Modules:

closure_reason ¤

Form for incident closure with reason when closing from early statuses.

Classes:

IncidentClosureReasonForm ¤

IncidentClosureReasonForm(*args: Any, incident: Incident | None = None, **kwargs: Any)

Bases: Form

Form for closing an incident with a mandatory reason from early statuses.

Source code in src/firefighter/incidents/forms/closure_reason.py
def __init__(self, *args: Any, incident: Incident | None = None, **kwargs: Any) -> None:  # noqa: ARG002
    super().__init__(*args, **kwargs)

    # Exclude RESOLVED from choices as it's for normal workflow closure
    closure_field = self.fields["closure_reason"]
    if hasattr(closure_field, "choices"):
        closure_field.choices = [
            choice for choice in ClosureReason.choices
            if choice[0] != ClosureReason.RESOLVED
        ]

edit ¤

Functions:

initial_environments ¤

initial_environments() -> list[Environment]

Get default environments for the form.

Source code in src/firefighter/incidents/forms/edit.py
def initial_environments() -> list[Environment]:
    """Get default environments for the form."""
    return list(Environment.objects.filter(default=True))

select_impact ¤

Classes:

SelectImpactForm ¤

SelectImpactForm(*args: Any, **kwargs: Any)

Bases: Form

Methods:

  • save

    Save the impact choices to the incident.

  • suggest_priority_from_impact

    Suggest a priority from 1 (highest) to 5 (lowest) based on the impact choices.

Attributes:

  • business_impact_new (str | None) –

    Get business impact. Will return N/A, Lowest, Low, Medium, High or Highest.

Source code in src/firefighter/incidents/forms/select_impact.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    if "data" in kwargs:
        for key, value in kwargs["data"].items():
            if (
                isinstance(key, str)
                and key.startswith("set_impact_type_")
                and isinstance(value, str)
            ):
                impact_type_name = key[len("set_impact_type_") :]
                with contextlib.suppress(ImpactLevel.DoesNotExist):
                    kwargs["data"][key] = ImpactLevel.objects.get(
                        value=value, impact_type__value=impact_type_name
                    )

    super().__init__(*args, **kwargs)

    for impact_type in ImpactType.objects.all().order_by("order"):
        field_name = f"set_impact_type_{impact_type.value}"
        self.fields[field_name] = forms.ModelChoiceField(
            label=impact_type.emoji + " " + impact_type.name,
            queryset=impact_type.levels.all().order_by("-order"),
            help_text=impact_type.help_text,
            initial=impact_type.levels.get(value=LevelChoices.NONE.value),
        )
        self.fields[field_name].label_from_instance = (  # type: ignore[attr-defined]
            lambda obj: obj.emoji + " " + obj.name
        )

business_impact_new property ¤

business_impact_new: str | None

Get business impact. Will return N/A, Lowest, Low, Medium, High or Highest.

save ¤

save(incident: HasImpactProtocol) -> None

Save the impact choices to the incident.

Source code in src/firefighter/incidents/forms/select_impact.py
def save(self, incident: HasImpactProtocol) -> None:
    """Save the impact choices to the incident."""
    if self.is_valid():
        impacts: dict[str, ImpactLevel] = self.cleaned_data

        # Prepare a list of impact type slugs by removing the 'set_impact_type_' prefix
        impact_type_slugs: list[str] = [
            slug.replace("set_impact_type_", "") for slug in impacts
        ]

        # Fetch all relevant ImpactType instances in a single query
        impact_types: dict[str, ImpactType] = ImpactType.objects.in_bulk(
            impact_type_slugs, field_name="value"
        )

        impact_objects: list[Impact] = []

        for impact_type_slug, impact_level in impacts.items():
            impact_type_slug = impact_type_slug.replace(  # noqa: PLW2901
                "set_impact_type_", ""
            )
            impact_type = impact_types.get(impact_type_slug)
            if impact_type is None:
                logger.warning(
                    f"Could not find impact type {impact_type_slug} in incident {incident.id}"
                )
                continue

            impact = Impact(impact_type=impact_type, impact_level=impact_level)
            impact_objects.append(impact)

        # Create and save all Impact instances in one database call
        Impact.objects.bulk_create(impact_objects)
        impacts_related_manager: ManyRelatedManager[Impact] = cast(
            "ManyRelatedManager[Impact]",
            incident.impacts,
        )
        impacts_related_manager.add(*impact_objects)

    else:
        raise forms.ValidationError("Form is not valid")

suggest_priority_from_impact ¤

suggest_priority_from_impact() -> int

Suggest a priority from 1 (highest) to 5 (lowest) based on the impact choices.

Source code in src/firefighter/incidents/forms/select_impact.py
def suggest_priority_from_impact(self) -> int:
    """Suggest a priority from 1 (highest) to 5 (lowest) based on the impact choices."""
    if self.is_valid():
        impact: dict[str, ImpactLevel] = self.cleaned_data

        impact_values = [impact_type.value for impact_type in impact.values()]
        priorities = [level.priority for level in LevelChoices if level in impact_values]
        return min(priorities) if priorities else LevelChoices.NONE.priority
    return LevelChoices.NONE.priority
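The selection rule is "take the most severe priority among the chosen impact levels, or the default when nothing applies". A self-contained sketch follows. The `PRIORITY_BY_LEVEL` table is an assumption for illustration; the real values come from `LevelChoices`, which is not shown on this page.

```python
# Hypothetical level -> priority table; the real mapping lives in LevelChoices.
PRIORITY_BY_LEVEL = {"high": 1, "medium": 3, "low": 4, "none": 5}


def suggest_priority(selected_levels: list) -> int:
    """Return the lowest priority number (most severe) among selected levels."""
    priorities = [
        PRIORITY_BY_LEVEL[level]
        for level in selected_levels
        if level in PRIORITY_BY_LEVEL
    ]
    # With no recognized level, fall back to the "none" priority
    return min(priorities) if priorities else PRIORITY_BY_LEVEL["none"]
```

Because a lower number means a more severe incident, `min` picks the worst case across all impact types.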

unified_incident ¤

Classes:

Functions:

PlatformChoices ¤

Bases: TextChoices

Platform choices for incidents.

UnifiedIncidentForm ¤

UnifiedIncidentForm(*args: Any, **kwargs: Any)

Bases: CreateIncidentFormBase

Unified form for all incident types and priorities (P1-P5).

This form dynamically shows/hides fields based on:

  • Priority/response_type (critical vs normal)
  • Selected impacts (customer, seller, employee)

Common fields (always shown):

  • title, description, incident_category
  • environment (multiple choice)
  • platform (multiple choice, default ALL)
  • priority (hidden)

Conditional fields:

  • suggested_team_routing (P4-P5 only)
  • zendesk_ticket_id (if customer impact selected)
  • seller_contract_id, is_key_account, etc. (if seller impact selected)

Methods:

Source code in src/firefighter/incidents/forms/unified_incident.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initialize form with dynamic queryset for suggested_team_routing."""
    super().__init__(*args, **kwargs)

    # Set queryset for suggested_team_routing
    try:
        from firefighter.raid.models import FeatureTeam  # noqa: PLC0415

        field = typing_cast("forms.ModelChoiceField[Any]", self.fields["suggested_team_routing"])
        field.queryset = FeatureTeam.objects.only("name").order_by("name")
    except ImportError:
        # RAID module not available
        logger.warning("RAID module not available, suggested_team_routing will not work")

clean ¤

clean() -> dict[str, Any]

Custom validation based on response type and impacts.

Source code in src/firefighter/incidents/forms/unified_incident.py
def clean(self) -> dict[str, Any]:
    """Custom validation based on response type and impacts."""
    cleaned_data = super().clean()
    if cleaned_data is None:
        cleaned_data = {}

    # Get response_type from initial data if available
    initial = self.initial or {}
    response_type = initial.get("response_type", "critical")

    # Validate suggested_team_routing is required for normal incidents
    if response_type == "normal" and not cleaned_data.get("suggested_team_routing"):
        self.add_error(
            "suggested_team_routing",
            "Feature Team is required for P4/P5 incidents",
        )

    return cleaned_data

get_visible_fields_for_impacts ¤

get_visible_fields_for_impacts(impacts_data: dict[str, ImpactLevel | str], response_type: str) -> list[str]

Determine which fields should be visible based on impacts and response type.

Parameters:

  • impacts_data (dict[str, ImpactLevel | str]) –

    Dictionary of impact type → impact level (ImpactLevel object or UUID string)

  • response_type (str) –

    "critical" or "normal"

Returns:

  • list[str]

    List of field names that should be visible

Source code in src/firefighter/incidents/forms/unified_incident.py
def get_visible_fields_for_impacts(
    self, impacts_data: dict[str, ImpactLevel | str], response_type: str
) -> list[str]:
    """Determine which fields should be visible based on impacts and response type.

    Args:
        impacts_data: Dictionary of impact type → impact level (ImpactLevel object or UUID string)
        response_type: "critical" or "normal"

    Returns:
        List of field names that should be visible
    """
    visible_fields = [
        "title",
        "description",
        "incident_category",
        "environment",
        "platform",
        "priority",
    ]

    # Add suggested_team_routing for normal incidents (P4-P5)
    if response_type == "normal":
        visible_fields.append("suggested_team_routing")

    # Check impact selections
    customer_impact = None
    seller_impact = None

    for field_name, impact_level in impacts_data.items():
        if "customers_impact" in field_name:
            customer_impact = impact_level
        elif "sellers_impact" in field_name:
            seller_impact = impact_level

    # Helper to check if impact is not NONE
    def has_impact(impact: ImpactLevel | str | None) -> bool:
        if impact is None:
            return False

        # If it's a UUID string, fetch the ImpactLevel from database
        if isinstance(impact, str):
            from firefighter.incidents.models.impact import (  # noqa: PLC0415
                ImpactLevel as ImpactLevelModel,
            )

            try:
                impact_obj = ImpactLevelModel.objects.get(id=impact)
            except ImpactLevelModel.DoesNotExist:
                return False
            else:
                return impact_obj.value != LevelChoices.NONE.value

        # Otherwise it's an ImpactLevel object
        return impact.value != LevelChoices.NONE.value

    # Add customer-specific fields
    if has_impact(customer_impact):
        visible_fields.append("zendesk_ticket_id")

    # Add seller-specific fields
    if has_impact(seller_impact):
        visible_fields.extend([
            "seller_contract_id",
            "is_key_account",
            "is_seller_in_golden_list",
            "zoho_desk_ticket_id",
        ])

    return visible_fields
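The visibility rules can be exercised without a database using a simplified sketch. Here impact levels are plain strings and `NO_IMPACT` is a hypothetical placeholder for `LevelChoices.NONE.value`; the real method also resolves UUID strings to `ImpactLevel` objects.

```python
BASE_FIELDS = [
    "title", "description", "incident_category",
    "environment", "platform", "priority",
]
SELLER_FIELDS = [
    "seller_contract_id", "is_key_account",
    "is_seller_in_golden_list", "zoho_desk_ticket_id",
]
NO_IMPACT = "NONE"  # hypothetical stand-in for LevelChoices.NONE.value


def visible_fields(impacts: dict, response_type: str) -> list:
    """Simplified version of the visibility rules, using string levels."""
    fields = list(BASE_FIELDS)
    if response_type == "normal":  # P4-P5
        fields.append("suggested_team_routing")
    customer = seller = None
    for name, level in impacts.items():
        if "customers_impact" in name:
            customer = level
        elif "sellers_impact" in name:
            seller = level
    if customer not in (None, NO_IMPACT):
        fields.append("zendesk_ticket_id")
    if seller not in (None, NO_IMPACT):
        fields.extend(SELLER_FIELDS)
    return fields
```

A "normal" incident with customer impact only would therefore show the base fields plus `suggested_team_routing` and `zendesk_ticket_id`.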

trigger_incident_workflow ¤

trigger_incident_workflow(
    creator: User, impacts_data: dict[str, ImpactLevel], response_type: str = "critical", *args: Any, **kwargs: Any
) -> Incident

Trigger unified incident workflow for all priorities.

This unified workflow:

  1. Always creates an Incident in the database (P1-P5)
  2. Conditionally creates a Slack channel (P1-P3 only)
  3. Always creates a Jira ticket linked to the Incident (P1-P5)

Parameters:

  • creator (User) –

    User creating the incident

  • impacts_data (dict[str, ImpactLevel]) –

    Dictionary of impact data

  • response_type (str, default: 'critical' ) –

    "critical" (P1-P3) or "normal" (P4-P5)

  • *args (Any, default: () ) –

    Additional positional arguments (unused)

  • **kwargs (Any, default: {} ) –

    Additional keyword arguments (unused)

Returns:

  • Incident

    The created Incident object

Source code in src/firefighter/incidents/forms/unified_incident.py
def trigger_incident_workflow(
    self,
    creator: User,
    impacts_data: dict[str, ImpactLevel],
    response_type: str = "critical",
    *args: Any,
    **kwargs: Any,
) -> Incident:
    """Trigger unified incident workflow for all priorities.

    This unified workflow:
    1. Always creates an Incident in the database (P1-P5)
    2. Conditionally creates a Slack channel (P1-P3 only)
    3. Always creates a Jira ticket linked to the Incident (P1-P5)

    Args:
        creator: User creating the incident
        impacts_data: Dictionary of impact data
        response_type: "critical" (P1-P3) or "normal" (P4-P5)
        *args: Additional positional arguments (unused)
        **kwargs: Additional keyword arguments (unused)

    Returns:
        The created Incident object
    """
    # Step 1: Always create Incident in database (ALL priorities)
    incident = self._create_incident(creator)

    # Save impacts
    impacts_form = SelectImpactForm(impacts_data)
    impacts_form.save(incident=incident)

    # Step 2: Conditionally create Slack channel (P1-P3 only)
    if self._should_create_slack_channel(response_type):
        self._create_slack_channel(incident, impacts_data)

    # Step 3: Always create Jira ticket (ALL priorities) - UNIFIED
    self._create_jira_ticket(incident, creator, impacts_data)

    return incident
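The control flow of the workflow can be summarized with a stub that records which steps would run. This sketch assumes that `_should_create_slack_channel` returns true exactly when `response_type` is "critical"; that helper's body is not shown on this page, and `workflow_steps` is a hypothetical name.

```python
def workflow_steps(response_type: str = "critical") -> list:
    """Return the ordered steps the unified workflow would perform."""
    steps = ["create_incident", "save_impacts"]  # always, P1-P5
    # Assumption: Slack channels are created for "critical" (P1-P3) only
    if response_type == "critical":
        steps.append("create_slack_channel")
    steps.append("create_jira_ticket")  # always, P1-P5
    return steps
```

Only the Slack step is conditional; the incident record and the Jira ticket are created for every priority.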

initial_environments ¤

initial_environments() -> list[Environment]

Get default environments.

Source code in src/firefighter/incidents/forms/unified_incident.py
def initial_environments() -> list[Environment]:
    """Get default environments."""
    return list(Environment.objects.filter(default=True))

initial_platform ¤

initial_platform() -> str

Get default platform.

Source code in src/firefighter/incidents/forms/unified_incident.py
def initial_platform() -> str:
    """Get default platform."""
    return PlatformChoices.ALL.value

initial_priority ¤

initial_priority() -> Priority

Get default priority.

Source code in src/firefighter/incidents/forms/unified_incident.py
def initial_priority() -> Priority:
    """Get default priority."""
    return Priority.objects.get(default=True)

update_key_events ¤

Classes:

IncidentUpdateKeyEventsForm ¤

IncidentUpdateKeyEventsForm(*args: Any, **kwargs: Any)

Bases: Form

Methods:

Source code in src/firefighter/incidents/forms/update_key_events.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    self.incident = kwargs.pop("incident")
    self.user = kwargs.pop("user", None)
    super().__init__(*args, **kwargs)
    self.generate_fields_dynamically()

get_milestones_with_data ¤

get_milestones_with_data(milestones_definitions: Iterable[MilestoneType]) -> list[MilestoneTypeData]

Get each firefighter.incidents.models.milestone_type.MilestoneType with its event_ts from the IncidentUpdate.

Source code in src/firefighter/incidents/forms/update_key_events.py
def get_milestones_with_data(
    self,
    milestones_definitions: Iterable[MilestoneType],
) -> list[MilestoneTypeData]:
    """Get each [firefighter.incidents.models.milestone_type.MilestoneType][] with its `event_ts` from the IncidentUpdate."""
    milestones_objects = self.incident.latest_updates_by_type
    ass = []
    for milestone_def in milestones_definitions:
        event = milestones_objects.get(milestone_def.event_type)
        event_ts = event.event_ts if event and event.event_ts else None
        association = MilestoneTypeData(
            milestone_type=milestone_def, data=MilestoneData(event_ts)
        )
        ass.append(association)
    return ass

save ¤

save() -> None

Custom save method to save each changed key event.

Source code in src/firefighter/incidents/forms/update_key_events.py
def save(self) -> None:
    """Custom save method to save each changed key event."""
    self.fields: dict[str, Field]
    for field_name in self.fields:
        if field_name not in self.changed_data or self.fields[field_name].disabled:
            logger.debug("skipping %s", field_name)
            continue
        if not field_name.startswith("key_event_"):
            continue

        key_event_type = field_name.replace("key_event_", "")

        logger.debug(f"save {field_name} {self.cleaned_data[field_name]}")
        self._save_key_event(key_event_type, self.cleaned_data[field_name])

update_roles ¤

Classes:

IncidentUpdateRolesForm ¤

IncidentUpdateRolesForm(*args: Any, **kwargs: Any)

Bases: Form

Methods:

  • save

    Custom save method to save the updated roles.

Source code in src/firefighter/incidents/forms/update_roles.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    self.incident: Incident = kwargs.pop("incident")
    self.user = kwargs.pop("user", None)
    super().__init__(*args, **kwargs)
    self.generate_fields_dynamically()

save ¤

save() -> None

Custom save method to save the updated roles.

Source code in src/firefighter/incidents/forms/update_roles.py
def save(self) -> None:
    """Custom save method to save the updated roles."""
    self.fields: dict[str, Field]
    roles_to_update: dict[str, User | None] = {}
    for field_name in self.fields:
        if field_name not in self.changed_data or self.fields[field_name].disabled:
            logger.debug("skipping %s", field_name)
            continue
        if not field_name.startswith("role_"):
            continue

        role_type_slug = field_name.replace("role_", "")

        roles_to_update[role_type_slug] = self.cleaned_data[field_name]
    if self.user is None:
        raise RuntimeError("User is required, was the form validated?")
    self.incident.update_roles(self.user, roles_to_update)
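The field-filtering part of `save` can be sketched on plain dictionaries. `collect_role_updates` is a hypothetical helper, with `cleaned_data`, `changed_data` and `disabled` standing in for the corresponding Django form state.

```python
from typing import Dict, Iterable, Optional

ROLE_PREFIX = "role_"


def collect_role_updates(
    cleaned_data: Dict[str, Optional[str]],
    changed_data: Iterable[str],
    disabled: Iterable[str] = (),
) -> Dict[str, Optional[str]]:
    """Keep only changed, enabled `role_*` fields, keyed by role slug."""
    changed, locked = set(changed_data), set(disabled)
    updates: Dict[str, Optional[str]] = {}
    for name, value in cleaned_data.items():
        if name not in changed or name in locked:
            continue
        if not name.startswith(ROLE_PREFIX):
            continue
        # Slice off the prefix only, leaving any later "role_" in the slug intact
        updates[name[len(ROLE_PREFIX):]] = value
    return updates
```

Slicing the prefix differs slightly from `str.replace`, which would also remove a "role_" substring appearing later in the field name.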

update_status ¤

Classes:

UpdateStatusForm ¤

UpdateStatusForm(*args: Any, incident: Incident | None = None, **kwargs: Any)

Bases: Form

Methods:

Source code in src/firefighter/incidents/forms/update_status.py
def __init__(self, *args: Any, incident: Incident | None = None, **kwargs: Any) -> None:
    super().__init__(*args, **kwargs)

    # Dynamically adjust status choices based on incident requirements
    if incident:
        self._set_status_choices(incident)

requires_closure_reason staticmethod ¤

requires_closure_reason(incident: Incident, target_status: IncidentStatus) -> bool

Check if closing this incident to the target status requires a closure reason.

Based on the workflow diagram, incidents of every priority (P1/P2 and P3/P4/P5) require a reason when they are closed from Opened or Investigating.

Source code in src/firefighter/incidents/forms/update_status.py
@staticmethod
def requires_closure_reason(incident: Incident, target_status: IncidentStatus) -> bool:
    """Check if closing this incident to the target status requires a closure reason.

    Based on the workflow diagram:
    - P1/P2 and P3/P4/P5: require reason when closing from Opened or Investigating
    """
    if target_status != IncidentStatus.CLOSED:
        return False

    current_status = incident.status

    # Require reason if closing from Opened or Investigating (for any priority)
    return current_status.value in {IncidentStatus.OPEN, IncidentStatus.INVESTIGATING}
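The rule reduces to two checks: the target must be CLOSED, and the current status must be one of the two early statuses. A standalone sketch follows, using a hypothetical `Status` enum whose numeric values are placeholders.

```python
from enum import IntEnum


class Status(IntEnum):
    """Hypothetical statuses; numeric values are illustrative only."""

    OPEN = 10
    INVESTIGATING = 20
    MITIGATED = 40
    CLOSED = 60


def requires_closure_reason(current: Status, target: Status) -> bool:
    """A reason is needed only when closing directly from an early status."""
    if target != Status.CLOSED:
        return False
    return current in {Status.OPEN, Status.INVESTIGATING}
```

Closing from a later status, such as `MITIGATED` here, follows the normal workflow and needs no reason.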

utils ¤

Classes:

EnumChoiceField ¤

EnumChoiceField(*args: Any, enum_class: type[T], **kwargs: Any)

Bases: TypedChoiceField

Methods:

  • to_python

    Return a value from the enum class.

  • validate

    Log validation for debugging.

Source code in src/firefighter/incidents/forms/utils.py
def __init__(self, *args: Any, enum_class: type[T], **kwargs: Any) -> None:
    # Explicit for type checking
    self.coerce_func: Callable[[Any], T] = lambda val: enum_class(  # noqa: PLW0108
        val
    )
    self.enum_class = enum_class
    if "choices" not in kwargs:
        kwargs["choices"] = enum_class.choices

    # Customize error messages for better UX
    if "error_messages" not in kwargs:
        kwargs["error_messages"] = {}
    if "invalid_choice" not in kwargs["error_messages"]:
        kwargs["error_messages"]["invalid_choice"] = (
            "The selected value is not valid. Please select a value from the dropdown list."
        )

    super().__init__(*args, coerce=self.coerce_func, **kwargs)

to_python ¤

to_python(value: Any) -> int | str | Any

Return a value from the enum class.

Source code in src/firefighter/incidents/forms/utils.py
def to_python(self, value: Any) -> int | str | Any:
    """Return a value from the enum class."""
    if value in self.empty_values:
        return ""
    # Cast to int if it's a string representation of an int
    if isinstance(value, str) and value.isdigit():
        value = int(value)

    # If value is already a valid enum member, return it
    if isinstance(value, self.enum_class):
        return value
    try:
        return self.coerce_func(value)
    except ValueError as exc:
        err_msg = f"{value} is not a valid {self.enum_class.__name__}"
        raise ValidationError(err_msg, code="invalid_choice") from exc
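The coercion steps in `to_python` can be reproduced with a plain enum. In this sketch `Urgency` and `to_enum` are hypothetical, and `ValueError` stands in for Django's `ValidationError`.

```python
from enum import IntEnum
from typing import Any, Type


class Urgency(IntEnum):
    """Hypothetical enum used only for this illustration."""

    HIGH = 1
    LOW = 2


def to_enum(value: Any, enum_class: Type[IntEnum]) -> Any:
    """Coerce raw form input to an enum member, mirroring the steps above."""
    if value in (None, ""):
        return ""  # empty input stays empty
    if isinstance(value, str) and value.isdigit():
        value = int(value)  # "1" -> 1 before the enum lookup
    if isinstance(value, enum_class):
        return value  # already a member
    try:
        return enum_class(value)
    except ValueError as exc:
        raise ValueError(f"{value} is not a valid {enum_class.__name__}") from exc
```

Form submissions arrive as strings, which is why the digit check runs before the enum lookup.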

validate ¤

validate(value: Any) -> None

Log validation for debugging.

Source code in src/firefighter/incidents/forms/utils.py
def validate(self, value: Any) -> None:
    """Log validation for debugging."""
    logger.debug(f"EnumChoiceField.validate: value={value!r} (type={type(value).__name__}), choices={self.choices}")
    return super().validate(value)

menus ¤

Functions:

user_details_url ¤

user_details_url(request: AuthenticatedHttpRequest) -> str | None

Return the URL of the authenticated user's profile page for the profile menu item, or None if the user is not authenticated.

Source code in src/firefighter/incidents/menus.py
def user_details_url(request: AuthenticatedHttpRequest) -> str | None:
    """Return the authenticated user's profile URL for the profile menu item, or None."""
    if request.user.is_authenticated:
        return request.user.get_absolute_url()
    return None  # type: ignore[unreachable]

migrations ¤

Modules:

0006_update_group_names ¤

Functions:

get_group_mappings ¤

get_group_mappings() -> dict

Returns the mapping table for updating existing groups.

Source code in src/firefighter/incidents/migrations/0006_update_group_names.py
def get_group_mappings() -> dict:
    """Returns the mapping table for updating existing groups."""
    return {
        "Platform": ("Platform", 8),
        "Visitors": ("Marketplace", 1),
        "Specialist Offer (Catalog/Seller)": ("Catalog", 4),
        "Operations": ("Operations", 6),
        "Money": ("Payment Operations", 5),
        "Security": ("Security", 10),
        "Corporate IT": ("Corporate IT", 11),
        "Other": ("Other", 12),
        "Data": ("Data", 9),
    }
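To illustrate how such a mapping table might be consumed, the sketch below applies it to in-memory dictionaries rather than Django models. It assumes the second tuple element is a display order, which this page does not confirm; `apply_group_mappings` and the sample data are hypothetical.

```python
from typing import Dict, List, Tuple

# Two entries copied from the mapping above: old name -> (new name, number)
MAPPINGS: Dict[str, Tuple[str, int]] = {
    "Visitors": ("Marketplace", 1),
    "Money": ("Payment Operations", 5),
}


def apply_group_mappings(
    groups: List[dict], mappings: Dict[str, Tuple[str, int]]
) -> List[dict]:
    """Return renamed copies of `groups`; unmapped groups pass through unchanged."""
    updated = []
    for group in groups:
        if group["name"] in mappings:
            new_name, order = mappings[group["name"]]
            # Assumption: the number is stored as the group's order
            group = {**group, "name": new_name, "order": order}
        updated.append(group)
    return updated
```

Groups without an entry in the table are left as they are, so the operation is safe to run on a partially migrated list.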

get_new_groups ¤

get_new_groups() -> dict

Returns a dictionary of new groups to be created.

Source code in src/firefighter/incidents/migrations/0006_update_group_names.py
def get_new_groups() -> dict:
    """Returns a dictionary of new groups to be created."""
    return {
        "Marketing & Communication": 2,
        "Seller": 3,
        "Finance": 7,
    }

0007_update_component_name ¤

Functions:

get_component_mappings ¤

get_component_mappings() -> list

Returns a list of tuples for updating existing component names and their attributes.

Each tuple contains:
  • old_name (str): The current name of the component.
  • new_name (str): The new name to assign to the component.
  • slack_channel (str): The associated Slack channel for the component.
  • group_name (str): The name of the group to which the component belongs.

Returns:

  • list ( list ) –

    A list of tuples, each representing the details for a component update.

Source code in src/firefighter/incidents/migrations/0007_update_component_name.py
def get_component_mappings() -> list:
    """
    Returns a list of tuples for updating existing component names and their attributes.

    Each tuple contains:
        - old_name (str): The current name of the component.
        - new_name (str): The new name to assign to the component.
        - slack_channel (str): The associated Slack channel for the component.
        - group_name (str): The name of the group to which the component belongs.

    Returns:
        list: A list of tuples, each representing the details for a component update.
    """
    return [
        ("Commercial Animation (Mabaya cat. integration & ad request, ...)", "Commercial Animation", "impact-commercial-animation", "Marketplace"),
        ("Mobile Apps", "Mobile Apps", "impact-mobile-apps", "Marketplace"),
        ("Spartacux Foundations", "Spartacux Foundations", "impact-spartacux-foundations", "Marketplace"),
        ("Tracking", "Tracking", "impact-tracking", "Marketplace"),
        ("HUB Integrators", "HUB Integrators", "impact-hub-integrators", "Seller"),
        ("Seller Account and Feeds", "Seller Catalog and Offer Management", "impact-seller-catalog-offer-management", "Seller"),
        ("Toolbox", "Seller Admin and Experience", "impact-seller-admin-experience", "Seller"),
        ("Seller Services (Mabaya BO, Subscriptions, MF)", "Seller Services", "impact-seller-services", "Seller"),
        ("Catalog Performance", "Catalog Performance", "impact-catalog-performance", "Catalog"),
        ("Offer (Price, Stock)", "Offer (Price & Stock)", "impact-offer-price-stock", "Catalog"),
        ("Back Office", "BO Catalog - Master Experience", "impact-bo-catalog-master-experience", "Catalog"),
        ("Order Lifecycle", "Order management", "impact-order-management", "Operations"),
        ("Delivery Experience", "Delivery experience", "impact-delivery-experience", "Operations"),
        ("Cloud Infrastructure", "Cloud Infrastructure", "impact-cloud-infrastructure", "Platform"),
        ("Spinak", "Spinak", "impact-spinak", "Platform"),
        ("CDN", "CDN", "impact-cdn", "Platform"),
        ("Gitlab", "Gitlab", "impact-gitlab", "Platform"),
    ]

get_new_components ¤

get_new_components() -> dict

Returns a dictionary of new components to be created.

Each entry in the dictionary maps a component name to a tuple containing:
  • group_name: The name of the group the component belongs to.
  • slack_channel: The associated Slack channel for the component.

Returns:

  • dict ( dict ) –

    A mapping of component names to (group name, slack channel) tuples.

Source code in src/firefighter/incidents/migrations/0007_update_component_name.py
def get_new_components() -> dict:
    """
    Returns a dictionary of new components to be created.

    Each entry in the dictionary maps a component name to a tuple containing:
    - group_name: The name of the group the component belongs to.
    - slack_channel: The associated Slack channel for the component.

    Returns:
        dict: A mapping of component names to (group name, slack channel) tuples.
    """
    return {
        "Traffic acquisition": ("Marketing & Communication", "impact-traffic-acquisition"),
        "Company reputation": ("Marketing & Communication", "impact-company-reputation"),
        "Loyalty and coupons": ("Payment Operations", "impact-loyalty-coupons"),
        "Payouts to seller": ("Payment Operations", "impact-payouts-to-seller"),
        "Refunds": ("Payment Operations", "impact-refunds"),
        "Returns": ("Operations", "impact-returns"),
        "Customer service": ("Operations", "impact-customer-service"),
        "Inventory": ("Operations", "impact-inventory"),
        "VAT": ("Finance", "impact-vat"),
        "Seller's invoices": ("Finance", "impact-sellers-invoices"),
        "Customer's invoices": ("Finance", "impact-customers-invoices"),
        "Accounting": ("Finance", "impact-accounting"),
        "Revenue": ("Finance", "impact-revenue"),
        "Compromised laptop / server": ("Security", "impact-compromised-laptop-server"),
    }

0010_update_components ¤

Functions:

  • get_component_mappings

    Returns a list of tuples for updating existing component names and their attributes.

get_component_mappings ¤

get_component_mappings() -> list

Returns a list of tuples for updating existing component names and their attributes.

Each tuple contains:
  • old_name (str): The current name of the component.
  • new_name (str): The new name to assign to the component.
  • slack_channel (str): The associated Slack channel for the component.
  • group_name (str): The name of the group to which the component belongs.

Returns:

  • list ( list ) –

    A list of tuples, each representing the details for a component update.

Source code in src/firefighter/incidents/migrations/0010_update_components.py
def get_component_mappings() -> list:
    """
    Returns a list of tuples for updating existing component names and their attributes.

    Each tuple contains:
        - old_name (str): The current name of the component.
        - new_name (str): The new name to assign to the component.
        - slack_channel (str): The associated Slack channel for the component.
        - group_name (str): The name of the group to which the component belongs.

    Returns:
        list: A list of tuples, each representing the details for a component update.
    """
    return [
        # Marketplace
        ("Product Discovery", "Navigation & Product discovery", "impact-nav-product-discovery", "Marketplace"),
        ("User & Purchase", "Cart & funnel", "impact-cart-funnel", "Marketplace"),
        ("Customer Management", "Customer login & signup", "impact-customer-login-signup", "Marketplace"),
        ("Performance", "Web performance", "impact-web-performance", "Marketplace"),
        # Catalog
        ("Product Information", "Product Management", "impact-product-management", "Catalog"),
        ("Taxonomy", "Product Structure", "impact-product-structure", "Catalog"),
        ("Publication on website", "Catalog Exposition", "impact-catalog-exposition", "Catalog"),
        # Payment Operations
        ("Payment", "Payment", "impact-payment", "Payment Operations"),
        # Operations
        ("ManoFulfillment OPS", "MM Fulfillment", "impact-mm-fulfillment", "Operations"),
        ("Helpcenter", "Helpcenter after sales", "impact-helpcenter-after-sales", "Operations"),
        # Finance
        ("Finance Operations", "Controlling", "impact-controlling", "Finance"),
        # Platform
        ("Spinak", "Spinak", "impact-spinak", "Platform"),
        ("CDN", "CDN", "impact-cdn", "Platform"),
        ("Gitlab", "Gitlab", "impact-gitlab", "Platform"),
        # Data
        ("data-platform", "Data Ingestion", "impact-data-ingestion", "Data"),
        ("data-specialist-offer", "Data Warehouse", "impact-data-warehouse", "Data"),
        ("data-wbr", "Data Analytics", "impact-data-analytics", "Data"),
        # Security
        ("Security Misc", "Bot management & rate limiting & WAF", "impact-bot-management-rate-limiting-waf", "Security"),
        ("Attack", "Data leak", "impact-data-leak", "Security"),
        ("System Compromise", "Exploited vulnerability", "impact-exploited-vulnerability", "Security"),
        ("Personal Data Breach", "Stolen account(s) or IT materials", "impact-stolen-accounts-it-materials", "Security"),
    ]
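
How these tuples are consumed is not shown on this page. As a minimal sketch, assuming the migration looks up each component by its old name and overwrites its attributes, the unpacking could look like this. The `apply_mappings` helper and the plain-dict stand-in for the component table are hypothetical and not part of FireFighter.

```python
# Hypothetical in-memory stand-in for the component table: name -> attributes.
components = {
    "Taxonomy": {"slack_channel": "old-taxonomy", "group": "Catalog"},
    "Payment": {"slack_channel": "old-payment", "group": "Misc"},
}

# Two entries copied from get_component_mappings() above.
mappings = [
    ("Taxonomy", "Product Structure", "impact-product-structure", "Catalog"),
    ("Payment", "Payment", "impact-payment", "Payment Operations"),
]


def apply_mappings(table, rows):
    """Rename each known component, refresh its channel and group, return the count."""
    updated = 0
    for old_name, new_name, slack_channel, group_name in rows:
        attrs = table.pop(old_name, None)
        if attrs is None:
            continue  # Unknown component: nothing to update.
        attrs.update(slack_channel=slack_channel, group=group_name)
        table[new_name] = attrs
        updated += 1
    return updated


updated_count = apply_mappings(components, mappings)
```

Entries whose old and new names are identical (such as "Payment") still get their channel and group refreshed.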

0013_add_missing_component ¤

Functions:

get_new_components ¤

get_new_components() -> dict

Returns a dictionary of new components to be created.

Each entry in the dictionary maps a component name to a tuple containing:
  • group_name: The name of the group the component belongs to.
  • slack_channel: The associated Slack channel for the component.

Returns:

  • dict ( dict ) –

    A mapping of component names to (group name, slack channel) tuples.

Source code in src/firefighter/incidents/migrations/0013_add_missing_component.py
def get_new_components() -> dict:
    """
    Returns a dictionary of new components to be created.

    Each entry in the dictionary maps a component name to a tuple containing:
    - group_name: The name of the group the component belongs to.
    - slack_channel: The associated Slack channel for the component.

    Returns:
        dict: A mapping of component names to (group name, slack channel) tuples.
    """
    return {
        "Data Tools": ("Data", "impact-data-tools"),
        "Catalog Access": ("Catalog", "impact-catalog-access"),
    }
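
The value order here is (group_name, slack_channel), whereas the tuples returned by get_component_mappings place the Slack channel before the group. A short sketch of unpacking the dictionary, using only the two entries above; the `by_group` structure is illustrative and not something the migration is known to build.

```python
from collections import defaultdict

# Entries copied from get_new_components() above.
new_components = {
    "Data Tools": ("Data", "impact-data-tools"),
    "Catalog Access": ("Catalog", "impact-catalog-access"),
}

# Collect component names and channels under their group name.
by_group = defaultdict(list)
for name, (group_name, slack_channel) in new_components.items():
    by_group[group_name].append((name, slack_channel))
```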

0019_set_security_components_private ¤

Functions:

revert_security_components_to_public ¤

revert_security_components_to_public(apps, schema_editor)

Revert all components belonging to the Security group to public, except 'Bot management & rate limiting & WAF'.

Source code in src/firefighter/incidents/migrations/0019_set_security_components_private.py
def revert_security_components_to_public(apps, schema_editor):
    """Revert all components belonging to the Security group to public, except 'Bot management & rate limiting & WAF'."""
    Component = apps.get_model("incidents", "Component")
    Group = apps.get_model("incidents", "Group")

    try:
        security_group = Group.objects.get(name="Security")
        components = Component.objects.filter(group=security_group).exclude(
            name="Bot management & rate limiting & WAF"
        )

        updated_count = 0
        for component in components:
            if component.private:
                logger.info(f"Setting component '{component.name}' to public")
                component.private = False
                component.save()
                updated_count += 1

        logger.info(f"Reverted {updated_count} Security components to public")
    except Group.DoesNotExist:
        logger.warning("Security group not found, skipping reverse migration")

set_security_components_to_private ¤

set_security_components_to_private(apps, schema_editor)

Set all components belonging to the Security group as private, except 'Bot management & rate limiting & WAF'.

Source code in src/firefighter/incidents/migrations/0019_set_security_components_private.py
def set_security_components_to_private(apps, schema_editor):
    """Set all components belonging to the Security group as private, except 'Bot management & rate limiting & WAF'."""
    Component = apps.get_model("incidents", "Component")
    Group = apps.get_model("incidents", "Group")

    try:
        security_group = Group.objects.get(name="Security")
        components = Component.objects.filter(group=security_group).exclude(
            name="Bot management & rate limiting & WAF"
        )

        updated_count = 0
        for component in components:
            if not component.private:
                logger.info(f"Setting component '{component.name}' to private")
                component.private = True
                component.save()
                updated_count += 1

        logger.info(f"Updated {updated_count} Security components to private")
    except Group.DoesNotExist:
        logger.warning("Security group not found, skipping migration")
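
Both functions only touch rows whose flag actually needs to change, so re-running the migration is harmless. The sketch below reproduces that logic without Django; `FakeComponent` and `set_private` are hypothetical stand-ins for the historical models and the two migration functions.

```python
from dataclasses import dataclass

EXCLUDED_NAME = "Bot management & rate limiting & WAF"


@dataclass
class FakeComponent:
    """Hypothetical stand-in for the Component model."""

    name: str
    group: str
    private: bool = False


def set_private(components, *, private):
    """Set `private` on Security components, skipping the excluded one."""
    updated = 0
    for component in components:
        if component.group != "Security" or component.name == EXCLUDED_NAME:
            continue
        if component.private != private:
            component.private = private
            updated += 1
    return updated


rows = [
    FakeComponent("Data leak", "Security"),
    FakeComponent(EXCLUDED_NAME, "Security"),
    FakeComponent("CDN", "Platform"),
]
forward_count = set_private(rows, private=True)
second_run_count = set_private(rows, private=True)  # Nothing left to change.
reverse_count = set_private(rows, private=False)
```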

0021_copy_component_data_to_incident_category ¤

Functions:

copy_component_data_to_incident_category ¤

copy_component_data_to_incident_category(apps, schema_editor)

Copy all data from Component to IncidentCategory

Source code in src/firefighter/incidents/migrations/0021_copy_component_data_to_incident_category.py
def copy_component_data_to_incident_category(apps, schema_editor):
    """Copy all data from Component to IncidentCategory"""
    Component = apps.get_model("incidents", "Component")
    IncidentCategory = apps.get_model("incidents", "IncidentCategory")

    # Copy all components to incident categories with the same fields
    for component in Component.objects.all():
        IncidentCategory.objects.create(
            id=component.id,  # Keep same UUID
            name=component.name,
            description=component.description,
            order=component.order,
            private=component.private,
            deploy_warning=component.deploy_warning,
            created_at=component.created_at,
            updated_at=component.updated_at,
            group=component.group,
        )

reverse_copy_component_data_to_incident_category ¤

reverse_copy_component_data_to_incident_category(apps, schema_editor)

Reverse operation - copy IncidentCategory back to Component if needed

Source code in src/firefighter/incidents/migrations/0021_copy_component_data_to_incident_category.py
def reverse_copy_component_data_to_incident_category(apps, schema_editor):
    """Reverse operation - copy IncidentCategory back to Component if needed"""
    Component = apps.get_model("incidents", "Component")
    IncidentCategory = apps.get_model("incidents", "IncidentCategory")

    # This would only work if Component table still exists during rollback
    for incident_category in IncidentCategory.objects.all():
        Component.objects.create(
            id=incident_category.id,
            name=incident_category.name,
            description=incident_category.description,
            order=incident_category.order,
            private=incident_category.private,
            deploy_warning=incident_category.deploy_warning,
            created_at=incident_category.created_at,
            updated_at=incident_category.updated_at,
            group=incident_category.group,
        )

0023_populate_incident_category_references ¤

Functions:

populate_incident_category_references ¤

populate_incident_category_references(apps, schema_editor)

Copy component references to incident_category references

Source code in src/firefighter/incidents/migrations/0023_populate_incident_category_references.py
def populate_incident_category_references(apps, schema_editor):
    """Copy component references to incident_category references"""
    Incident = apps.get_model("incidents", "Incident")
    IncidentUpdate = apps.get_model("incidents", "IncidentUpdate")

    # Update all incidents to point to the corresponding incident category
    incidents_updated = 0
    for incident in Incident.objects.select_related("component").all():
        if incident.component:
            # Find the corresponding incident category (same UUID)
            incident.incident_category_id = incident.component_id
            incident.save(update_fields=["incident_category"])
            incidents_updated += 1

    # Update all incident updates to point to the corresponding incident category
    updates_updated = 0
    for incident_update in IncidentUpdate.objects.select_related("component").all():
        if incident_update.component:
            incident_update.incident_category_id = incident_update.component_id
            incident_update.save(update_fields=["incident_category"])
            updates_updated += 1

reverse_populate_incident_category_references ¤

reverse_populate_incident_category_references(apps, schema_editor)

Reverse: copy incident_category references back to component references

Source code in src/firefighter/incidents/migrations/0023_populate_incident_category_references.py
def reverse_populate_incident_category_references(apps, schema_editor):
    """Reverse: copy incident_category references back to component references"""
    Incident = apps.get_model("incidents", "Incident")
    IncidentUpdate = apps.get_model("incidents", "IncidentUpdate")

    # Restore component references from incident_category references
    for incident in Incident.objects.select_related("incident_category").all():
        if incident.incident_category:
            incident.component_id = incident.incident_category_id
            incident.save(update_fields=["component"])

    for incident_update in IncidentUpdate.objects.select_related("incident_category").all():
        if incident_update.incident_category:
            incident_update.component_id = incident_update.incident_category_id
            incident_update.save(update_fields=["component"])

models ¤

Modules:

group ¤

Classes:

  • Group

    Group of IncidentCategory objects (firefighter.incidents.models.incident_category.IncidentCategory). Not a group of users.

Group ¤

Bases: Model

Group of IncidentCategory objects (firefighter.incidents.models.incident_category.IncidentCategory). Not a group of users.

impact ¤

Classes:

Impact ¤

Bases: Model

Methods:

  • clean

    Ensure impact_type matches impact_level.impact_type.

clean ¤
clean() -> None

Ensure impact_type matches impact_level.impact_type.

Check constraints can't span relationships, so we have to do this manually.

Source code in src/firefighter/incidents/models/impact.py
def clean(self) -> None:
    """Ensure impact_type matches impact_level.impact_type.

    Check constraints can't span relationships, so we have to do this manually.
    """
    if self.impact_type != self.impact_level.impact_type:
        raise ValidationError("impact_type must match impact_level.impact_type")
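
In Django, clean() runs as part of full_clean() and is not called by save() on its own, so callers that bypass forms need to validate explicitly. The following sketch shows the same cross-relation check with plain dataclasses; the `Fake*` classes, the `is_valid` helper, and the impact type values are hypothetical, and ValueError stands in for Django's ValidationError.

```python
from dataclasses import dataclass


@dataclass
class FakeImpactLevel:
    impact_type: str


@dataclass
class FakeImpact:
    impact_type: str
    impact_level: FakeImpactLevel

    def clean(self) -> None:
        # Same comparison as Impact.clean, with ValueError instead of ValidationError.
        if self.impact_type != self.impact_level.impact_type:
            raise ValueError("impact_type must match impact_level.impact_type")


def is_valid(impact: FakeImpact) -> bool:
    """Return True when clean() accepts the object."""
    try:
        impact.clean()
    except ValueError:
        return False
    return True


matching = FakeImpact("customers", FakeImpactLevel("customers"))
mismatched = FakeImpact("customers", FakeImpactLevel("business"))
```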

LevelChoices ¤

Bases: TextChoices

Attributes:

  • emoji (str) –

    Return the emoji corresponding to the priority.

  • priority (int) –

    Return the priority of the level choice.

emoji property ¤
emoji: str

Return the emoji corresponding to the priority.

priority property ¤
priority: int

Return the priority of the level choice.

incident ¤

Classes:

Functions:

Incident ¤

Bases: Model

Methods:

  • build_invite_list

    Send a Django Signal to get the list of users to invite from different integrations (Slack, Confluence, PagerDuty...).

  • compute_metrics

    Compute all metrics (time to fix, ...) from events.

  • missing_milestones

    Returns all required Milestones still needed to compute the metrics.

  • update_roles

    Update the roles related to an incident, and create an IncidentUpdate.

Attributes:

  • status_page_url (str) –

    Similar to get_absolute_url, but with the full domain, for use outside the website.

status_page_url property ¤
status_page_url: str

Similar to get_absolute_url, but with the full domain, for use outside the website.

build_invite_list ¤
build_invite_list() -> list[User]

Send a Django Signal to get the list of users to invite from different integrations (Slack, Confluence, PagerDuty...).

Returns:

  • list[User]

    list[User]: Potentially non-unique list of Users to invite

Source code in src/firefighter/incidents/models/incident.py
def build_invite_list(self) -> list[User]:
    """Send a Django Signal to get the list of users to invite from different integrations (Slack, Confluence, PagerDuty...).

    Returns:
        list[User]: Potentially non-unique list of Users to invite
    """
    users_list: list[User] = []

    # Send signal to modules (Confluence, PagerDuty...)
    result_users: list[tuple[Any, Exception | Iterable[User]]] = (
        signals.get_invites.send_robust(sender=None, incident=self)
    )

    # Aggregate the results
    for provider in result_users:
        if isinstance(provider[1], BaseException):
            logger.warning(
                f"Provider {provider[0]} returned an error getting Users to invite: {provider[1]}."
            )
            continue

        users_list.extend(provider[1])

    logger.debug(f"Get invites users list: {users_list}")

    return users_list
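
Django's send_robust returns a list of (receiver, response) pairs in which a failing receiver contributes the exception it raised instead of a result. The aggregation can be sketched without Django as follows; the provider names and user names are invented for illustration, and the final de-duplication step is an optional extra that the method itself does not perform.

```python
def aggregate_invites(responses):
    """Flatten successful responses and skip providers that raised."""
    users = []
    for _receiver, response in responses:
        if isinstance(response, BaseException):
            continue  # The real method logs a warning here.
        users.extend(response)
    return users


# Shape of a send_robust() result: (receiver, response or exception).
responses = [
    ("slack", ["alice", "bob"]),
    ("pagerduty", RuntimeError("API down")),
    ("confluence", ["bob", "carol"]),
]

invites = aggregate_invites(responses)
# Optional: drop duplicates while keeping first-seen order.
unique_invites = list(dict.fromkeys(invites))
```
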
compute_metrics ¤
compute_metrics(*, purge: bool = False) -> None

Compute all metrics (time to fix, ...) from events.

Source code in src/firefighter/incidents/models/incident.py
def compute_metrics(self, *, purge: bool = False) -> None:
    """Compute all metrics (time to fix, ...) from events."""
    latest_updates_by_type = self.latest_updates_by_type

    for metric_type in MetricType.objects.all():
        lhs_type = metric_type.milestone_lhs.event_type
        rhs_type = metric_type.milestone_rhs.event_type

        lhs = latest_updates_by_type.get(lhs_type, None)
        rhs = latest_updates_by_type.get(rhs_type, None)

        if not lhs or not lhs.event_ts:
            logger.info(
                f"Missing operand '{lhs_type}' on metric {metric_type.type} for #{self.id}"
            )
            if purge:
                IncidentMetric.objects.filter(
                    incident=self, metric_type=metric_type
                ).delete()
            continue
        if not rhs or not rhs.event_ts:
            logger.info(
                f"Missing operand '{rhs_type}' on metric {metric_type.type} for #{self.id}"
            )
            if purge:
                IncidentMetric.objects.filter(
                    incident=self, metric_type=metric_type
                ).delete()
            continue
        duration = lhs.event_ts - rhs.event_ts
        if duration < TD0:
            logger.warning(
                f"Tried to compute a negative metric! Metric {metric_type.type} for #{self.id} has a duration of {duration} ({lhs.event_type}:{lhs.event_ts} - {rhs.event_type}:{rhs.event_ts})"
            )
            if purge:
                IncidentMetric.objects.filter(
                    incident=self, metric_type=metric_type
                ).delete()
            continue
        IncidentMetric.objects.update_or_create(
            incident=self,
            metric_type=metric_type,
            defaults={"duration": duration},
        )
    self.save()
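
Each metric is the difference between two milestone timestamps (left-hand side minus right-hand side), and it is skipped when either operand is missing or the result is negative. A minimal sketch of that arithmetic, assuming a hypothetical "recovered" milestone alongside the "declared" one; `metric_duration` is an illustrative helper, not a FireFighter function.

```python
from datetime import datetime, timedelta, timezone

TD0 = timedelta(0)


def metric_duration(latest, lhs_type, rhs_type):
    """Return lhs - rhs, or None if an operand is missing or the result is negative."""
    lhs = latest.get(lhs_type)
    rhs = latest.get(rhs_type)
    if lhs is None or rhs is None:
        return None
    duration = lhs - rhs
    return None if duration < TD0 else duration


declared = datetime(2024, 1, 1, 10, 0, tzinfo=timezone.utc)
latest = {"declared": declared, "recovered": declared + timedelta(minutes=45)}

time_to_fix = metric_duration(latest, "recovered", "declared")
reversed_operands = metric_duration(latest, "declared", "recovered")
missing_operand = metric_duration(latest, "postmortem", "declared")
```
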
missing_milestones ¤
missing_milestones() -> list[str]

Returns all required Milestones still needed to compute the metrics.

Source code in src/firefighter/incidents/models/incident.py
def missing_milestones(self) -> list[str]:
    """Returns all required Milestones still needed to compute the metrics."""
    if self.ignore:
        return []

    # We get a list of all event_type of IncidentUpdates...
    milestones_list = (
        IncidentUpdate.objects.filter(
            incident_id=self.id,
            event_ts__isnull=False,
            event_type__isnull=False,
        )
        .aggregate(milestone_list=ArrayAgg("event_type", distinct=True, default=[]))
        .get("milestone_list", [])
    )

    incident_milestones: set[str] = set(cast('list["str"]', milestones_list))
    required_milestone_types = set(
        MilestoneType.objects.filter(required=True).values_list(
            "event_type", flat=True
        )
    )
    # And we make the differences of the sets
    return list(required_milestone_types - incident_milestones)
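
The result is a plain set difference, so the order of the returned list is not guaranteed. A small sketch with invented milestone names (only "declared" appears elsewhere on this page):

```python
# Hypothetical required milestone types and the event types recorded so far.
required = {"declared", "recovered", "postmortem"}
recorded = ["declared", "declared", "recovered"]

# Duplicates in `recorded` are irrelevant once it becomes a set.
missing = sorted(required - set(recorded))  # Sorted only for a stable order.
```
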
update_roles ¤
update_roles(updater: User, roles_mapping: dict[str, User | None] | dict[str, User]) -> IncidentUpdate

Update the roles related to an incident, and create an IncidentUpdate. For each role, provide a User or None.

This function will update the incident, create an IncidentUpdate, and trigger the incident_updated signal, with update_roles sender.

Parameters:

  • updater (User) –

    The user who is updating the roles.

  • roles_mapping (dict[str, User | None]) –

    A dict of roles to update, with the new User or None.

Returns:

  • IncidentUpdate ( IncidentUpdate ) –

    The created IncidentUpdate with the updated roles.

Source code in src/firefighter/incidents/models/incident.py
def update_roles(
    self,
    updater: User,
    roles_mapping: dict[str, User | None] | dict[str, User],
) -> IncidentUpdate:
    """Update the roles related to an incident, and create an IncidentUpdate.
    For each role, provide a User or None.

    This function will update the incident, create an [IncidentUpdate][firefighter.incidents.models.incident_update.IncidentUpdate], and trigger the [incident_updated][firefighter.incidents.signals.incident_updated] signal, with `update_roles` sender.

    Args:
        updater (User): The user who is updating the roles.
        roles_mapping (dict[str, User | None]): A dict of roles to update, with the new User or None.

    Returns:
        IncidentUpdate: The created IncidentUpdate with the updated roles.
    """
    updated_fields: list[str] = []

    # Handle roles
    for role_slug, assigned_user in roles_mapping.items():
        try:
            role_type = IncidentRoleType.objects.get(slug=role_slug)
        except IncidentRoleType.DoesNotExist:
            logger.warning(f"Unknown role type: {role_slug}")
            continue
        if assigned_user is not None:
            IncidentRole.objects.update_or_create(
                incident=self,
                role_type=role_type,
                defaults={"user": assigned_user},
            )
            logger.debug(
                f"Updated role {role_slug} to user ID={assigned_user.id} for #{self.id}"
            )
            updated_fields.append(f"{role_slug}_id")
        elif role_type.required:
            logger.warning(f"Cannot remove required role: {role_slug}")
        else:
            IncidentRole.objects.filter(incident=self, role_type=role_type).delete()
            updated_fields.append(f"{role_slug}_id")

    # Handle legacy roles with IncidentUpdate
    # XXX(dugab): Custom roles are not saved in IncidentUpdate at the moment

    incident_update = IncidentUpdate(
        incident=self,
        created_by=updater,
        commander=roles_mapping.get("commander"),
        communication_lead=roles_mapping.get("communication_lead"),
    )
    incident_update.save()

    incident_updated.send_robust(
        "update_roles",
        incident=self,
        incident_update=incident_update,
        updated_fields=updated_fields,
    )

    return incident_update
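
The loop above has four outcomes per role: unknown slugs are skipped, a non-None user is assigned, a required role cannot be cleared, and an optional role is removed. The sketch below mirrors that branch order without a database. `plan_role_changes` is a hypothetical helper, and which roles are required is an assumption made for the example.

```python
def plan_role_changes(roles_mapping, known_roles):
    """Mirror the branch order of update_roles.

    known_roles maps a role slug to whether the role is required.
    Returns (actions, updated_fields).
    """
    actions = {}
    updated_fields = []
    for slug, user in roles_mapping.items():
        if slug not in known_roles:
            actions[slug] = "skipped (unknown role)"
        elif user is not None:
            actions[slug] = f"assigned to {user}"
            updated_fields.append(f"{slug}_id")
        elif known_roles[slug]:
            actions[slug] = "kept (required role cannot be removed)"
        else:
            actions[slug] = "removed"
            updated_fields.append(f"{slug}_id")
    return actions, updated_fields


# Assumption for illustration: commander is required, communication_lead is not.
known_roles = {"commander": True, "communication_lead": False}
actions, updated_fields = plan_role_changes(
    {"commander": None, "communication_lead": None, "scribe": "alice"},
    known_roles,
)
```

Only roles that were actually assigned or removed end up in `updated_fields`.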

IncidentFilterSet ¤

Bases: FilterSet

Set of filters for incidents, shared by Web UI and API.

Methods:

incident_search(queryset: QuerySet[Incident], _name: str, value: str) -> QuerySet[Incident]

Search incidents by title, description, and ID.

Parameters:

  • queryset (QuerySet[Incident]) –

    Queryset to search in.

  • _name (str) –
  • value (str) –

    Value to search for.

Returns:

  • QuerySet[Incident]

    QuerySet[Incident]: Search results.

Source code in src/firefighter/incidents/models/incident.py
@staticmethod
def incident_search(
    queryset: QuerySet[Incident], _name: str, value: str
) -> QuerySet[Incident]:
    """Search incidents by title, description, and ID.

    Args:
        queryset (QuerySet[Incident]): Queryset to search in.
        _name:
        value (str): Value to search for.

    Returns:
        QuerySet[Incident]: Search results.
    """
    return Incident.objects.search(queryset=queryset, search_term=value)[0]

IncidentManager ¤

Bases: Manager['Incident']

Methods:

  • declare

    Create an Incident and its first IncidentUpdate.

  • search

    Search for incidents using a search term, on the title and description fields.

declare ¤
declare(**kwargs: Any) -> Incident

Create an Incident and its first IncidentUpdate. Send the incident_created signal. Returns the saved incident; there is no need to call .save().

Source code in src/firefighter/incidents/models/incident.py
def declare(self, **kwargs: Any) -> Incident:
    """Create an Incident and its first IncidentUpdate.
    Send the incident_created signal.
    Returns the saved incident, no need to .save().
    """
    with transaction.atomic():
        if "private" not in kwargs:
            kwargs["private"] = kwargs["incident_category"].private
        if "severity" not in kwargs and "priority" in kwargs:
            kwargs["severity"] = Severity.objects.get(
                value=kwargs["priority"].value
            )
        incident: Incident = super().create(**kwargs)
        for required_default_role in IncidentRoleType.objects.filter(required=True):
            incident.roles_set.add(
                IncidentRole.objects.create(
                    incident=incident,
                    role_type=required_default_role,
                    user=incident.created_by,
                )
            )

        first_incident_update = IncidentUpdate(
            title=incident.title,
            description=incident.description,
            status=incident.status,  # type: ignore[misc]
            priority=incident.priority,
            incident_category=incident.incident_category,
            created_by=incident.created_by,
            commander=incident.created_by,
            incident=incident,
            event_ts=incident.created_at,
        )

        milestone_declared = IncidentUpdate(
            created_by=incident.created_by,
            incident=incident,
            event_type="declared",
            event_ts=incident.created_at,
        )
        first_incident_update.save()
        milestone_declared.save()

        incident.members.add(incident.created_by)

    # Either we have an incident or an error was thrown
    incident_created.send_robust(sender=__name__, incident=incident)
    return incident
search staticmethod ¤
search(queryset: QuerySet[Incident] | None, search_term: str) -> tuple[QuerySet[Incident], bool]

Search for incidents using a search term, on the title and description fields.

Parameters:

  • queryset (QuerySet[Incident] | None) –

    Queryset to search in. If None, search in all incidents. Passing a queryset restricts the search to a subset of incidents (already filtered).

  • search_term (str) –

    Search term.

Returns:

  • tuple[QuerySet[Incident], bool]

    tuple[QuerySet[Incident], bool]: Queryset of incidents matching the search term, and a boolean indicating whether the results may contain duplicate objects.

Source code in src/firefighter/incidents/models/incident.py
@staticmethod
def search(
    queryset: QuerySet[Incident] | None, search_term: str
) -> tuple[QuerySet[Incident], bool]:
    """Search for incidents using a search term, on the title and description fields.

    Args:
        queryset (QuerySet[Incident] | None): Queryset to search in. If None, search in all incidents. Passing a queryset restricts the search to a subset of incidents (already filtered).
        search_term (str): Search term.

    Returns:
        tuple[QuerySet[Incident], bool]: Queryset of incidents matching the search term, and a boolean indicating whether the results may contain duplicate objects.
    """
    if queryset is None:
        queryset = Incident.objects.all()

    # If not search, return the original queryset
    if search_term is None or search_term.strip() == "":
        return queryset, False

    queryset_search_id = None

    # If the search is just an int, search for this ID + regular search
    try:
        search_term_as_int = int(search_term)
    except ValueError:
        pass
    else:
        queryset_search_id = deepcopy(queryset)
        queryset_search_id = queryset_search_id.filter(id=search_term_as_int)

    # Postgres search on title + description
    # XXX Improve search performance and relevance
    vector = SearchVector("title", config="english", weight="A") + SearchVector(
        "description", config="english", weight="B"
    )
    query = SearchQuery(search_term, config="english", search_type="websearch")
    queryset = (
        queryset.annotate(rank=SearchRank(vector, query))
        .filter(rank__gte=0.1)
        .order_by("-rank")
    )

    # Add the search by id to the search by text if needed
    if queryset_search_id:
        queryset |= queryset_search_id  # type: ignore[operator]

    return queryset, False
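
The ID lookup depends entirely on whether int() accepts the raw search term. The sketch below isolates that step to show which inputs take the ID branch; `parse_incident_id` is a hypothetical helper written for this illustration.

```python
from typing import Optional


def parse_incident_id(search_term: str) -> Optional[int]:
    """Return the term as an int when int() accepts it, else None."""
    try:
        return int(search_term)
    except ValueError:
        return None


plain_id = parse_incident_id("42")
padded_id = parse_incident_id(" 42 ")  # int() ignores surrounding whitespace.
hash_prefixed = parse_incident_id("#42")  # Not an int: text search only.
free_text = parse_incident_id("db outage")
```

A term such as "#42" therefore falls through to the full-text search alone.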

incident_category_filter_choices_queryset ¤

incident_category_filter_choices_queryset(_: Any) -> QuerySet[IncidentCategory]

Queryset for choices of IncidentCategories in IncidentFilterSet. Defined as a function because models are not yet loaded when the filters are created.

Source code in src/firefighter/incidents/models/incident.py
def incident_category_filter_choices_queryset(_: Any) -> QuerySet[IncidentCategory]:
    """Queryset for choices of IncidentCategories in IncidentFilterSet.
    Defined as a function because models are not yet loaded when the filters are created.
    """
    return (
        IncidentCategory.objects.all()
        .select_related("group")
        .order_by(
            "group__order",
            "name",
        )
    )

incident_category ¤

Classes:

IncidentCategoryFilterSet ¤

Bases: FilterSet

Set of filters for IncidentCategory, shared by Web UI and API.

Methods:

incident_category_search(queryset: QuerySet[IncidentCategory], _name: str, value: str) -> QuerySet[IncidentCategory]

Search incident categories by title, description, and ID.

Parameters:

  • queryset (QuerySet[IncidentCategory]) –

    Queryset to search in.

  • _name (str) –
  • value (str) –

    Value to search for.

Returns:

  • QuerySet[IncidentCategory]

    QuerySet[IncidentCategory]: Search results.

Source code in src/firefighter/incidents/models/incident_category.py
@staticmethod
def incident_category_search(
    queryset: QuerySet[IncidentCategory], _name: str, value: str
) -> QuerySet[IncidentCategory]:
    """Search incident categories by title, description, and ID.

    Args:
        queryset (QuerySet[IncidentCategory]): Queryset to search in.
        _name:
        value (str): Value to search for.

    Returns:
        QuerySet[IncidentCategory]: Search results.
    """
    return IncidentCategory.objects.search(queryset=queryset, search_term=value)[0]

IncidentCategoryManager ¤

Bases: Manager['IncidentCategory']

Methods:

  • queryset_with_mtbf

    Returns a queryset of incident categories with an additional mtbf field.

queryset_with_mtbf ¤
queryset_with_mtbf(
    date_from: datetime,
    date_to: datetime,
    queryset: QuerySet[IncidentCategory] | None = None,
    metric_type: str = "time_to_fix",
    field_name: str = "mtbf",
) -> QuerySet[IncidentCategory]

Returns a queryset of incident categories with an additional mtbf field.

Source code in src/firefighter/incidents/models/incident_category.py
def queryset_with_mtbf(
    self,
    date_from: datetime,
    date_to: datetime,
    queryset: QuerySet[IncidentCategory] | None = None,
    metric_type: str = "time_to_fix",
    field_name: str = "mtbf",
) -> QuerySet[IncidentCategory]:
    """Returns a queryset of incident categories with an additional `mtbf` field."""
    date_to = min(date_to, datetime.now(tz=TZ))

    date_interval = date_to - date_from
    queryset = queryset or self.get_queryset()

    return (
        queryset.order_by("group__order", "order")
        .annotate(
            metric_subquery=Subquery(
                IncidentMetric.objects.filter(
                    incident__incident_category=OuterRef("pk"),
                    metric_type__type=metric_type,
                    incident__created_at__gte=date_from,
                    incident__created_at__lte=date_to,
                )
                .values("incident__incident_category")
                .annotate(sum_downtime=Sum("duration"))
                .values("sum_downtime")
            )
        )
        .annotate(
            incident_count=Count(
                "incident",
                filter=Q(
                    incident__created_at__gte=date_from,
                )
                & Q(
                    incident__created_at__lte=date_to,
                ),
            ),
            incidents_downtime=F("metric_subquery"),
            incident_uptime=Value(date_interval) - F("incidents_downtime"),
        )
        .annotate(**{
            field_name: Cast(
                F("incident_uptime") / F("incident_count"),
                output_field=DurationField(),
            )
        })
    )
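
The annotations above amount to simple arithmetic per category: uptime is the length of the reporting window minus the summed metric durations, and the resulting field is that uptime divided by the incident count. The following is a minimal plain-Python sketch with made-up numbers. The helper name `mtbf` is hypothetical, and returning `None` for a category without incidents is an assumption of this sketch, not something the ORM expression is shown to do.

```python
from datetime import datetime, timedelta, timezone


def mtbf(date_from, date_to, downtimes):
    """Mean time between failures over the window [date_from, date_to].

    `downtimes` holds one duration per incident in the window.
    """
    if not downtimes:
        # Assumption of this sketch: no incidents means no defined MTBF.
        return None
    interval = date_to - date_from
    uptime = interval - sum(downtimes, timedelta())
    return uptime / len(downtimes)


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
end = datetime(2024, 1, 31, tzinfo=timezone.utc)
result = mtbf(start, end, [timedelta(hours=4), timedelta(hours=8)])
print(result)  # 14 days, 18:00:00
```

Here the window is 30 days, the two incidents account for 12 hours of downtime, and the remaining 29.5 days are split across two incidents.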

incident_cost ¤

Classes:

IncidentCost ¤

Bases: Model

Incident Cost is inspired by Dispatch.

incident_cost_type ¤

Classes:

IncidentCostType ¤

Bases: Model

Incident Cost Type is inspired by Dispatch.

incident_update ¤

Classes:

  • IncidentUpdate

    IncidentUpdate represents a single update to an incident.

Functions:

IncidentUpdate ¤

Bases: Model

IncidentUpdate represents a single update to an incident. One incident can have many incident updates. Only updated fields are stored in the incident update.

set_event_ts ¤

set_event_ts(sender: Any, instance: IncidentUpdate, **kwargs: Any) -> None

Add a timestamp on every IncidentUpdate. Not implemented with Django's auto_now_add, because that would make the field non-editable by users.

Source code in src/firefighter/incidents/models/incident_update.py
@receiver(pre_save, sender=IncidentUpdate)
# pylint: disable=unused-argument
def set_event_ts(sender: Any, instance: IncidentUpdate, **kwargs: Any) -> None:
    """Add a timestamp on every IncidentUpdate.
    Not implemented with Django's auto_now_add, because that would make the field non-editable by users.
    """
    if instance.event_ts is None:
        instance.event_ts = timezone.now()  # type: ignore
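
The handler only fills in `event_ts` when it is missing, so a timestamp supplied by a user survives the save. Below is a framework-free sketch of that behaviour. The `Update` dataclass and the `fill_event_ts` helper are stand-ins for illustration, not FireFighter code.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class Update:
    """Stand-in for IncidentUpdate, reduced to the one field of interest."""

    event_ts: Optional[datetime] = None


def fill_event_ts(instance: Update) -> None:
    """Set a default timestamp, leaving user-provided values untouched."""
    if instance.event_ts is None:
        instance.event_ts = datetime.now(tz=timezone.utc)


backdated = Update(event_ts=datetime(2024, 5, 1, 12, 0, tzinfo=timezone.utc))
fresh = Update()
for update in (backdated, fresh):
    fill_event_ts(update)

print(backdated.event_ts)  # still the value chosen by the user
print(fresh.event_ts is not None)  # True
```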

milestone_type ¤

MilestoneType model.

Classes:

  • MilestoneType

    Represents a milestone type, also known as a key event time or event time.

MilestoneType ¤

Bases: Model

Represents a milestone type, also known as a key event time or event time.

priority ¤

Classes:

  • Priority

    A priority for an incident.

Priority ¤

Bases: Model

A priority for an incident.

user ¤

Classes:

User ¤

Bases: AbstractUser

Attributes:

  • full_name (str) –

    User full name (first + last name). Looks for the first_name and last_name fields, then name, then username. Will return an empty string if none of these are set.

full_name property ¤
full_name: str

User full name (first + last name). Looks for the first_name and last_name fields, then name, then username. Will return an empty string if none of these are set.
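
The fallback order described above can be sketched as a plain function. The property's source is not shown here, so the details are assumptions, in particular that a lone first or last name is used on its own.

```python
def full_name(
    first_name: str = "",
    last_name: str = "",
    name: str = "",
    username: str = "",
) -> str:
    """Return the best available display name, or an empty string."""
    # Assumption: first and last name are joined, skipping whichever is empty.
    joined = " ".join(part for part in (first_name, last_name) if part)
    return joined or name or username or ""


print(full_name("Ada", "Lovelace", "al", "ada"))  # Ada Lovelace
print(full_name(name="Ada L.", username="ada"))   # Ada L.
print(full_name(username="ada"))                  # ada
```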

signals ¤

Attributes:

create_incident_conversation module-attribute ¤

create_incident_conversation = Signal()

Signal sent to create a conversation for an incident.

Parameters:

  • incident (Incident) –

    The incident for which to create a conversation.

get_invites module-attribute ¤

get_invites = Signal()

Signal sent to retrieve the list of users to invite for an incident.

Parameters:

  • incident (Incident) –

    The incident for which to retrieve the list of users to invite.

Returns:

  • users ( list[User] ) –

    The list of users to invite.

incident_closed module-attribute ¤

incident_closed = Signal()

Signal sent when an incident is closed.

Parameters:

  • sender (Any) –

    The sender of the signal

  • incident (Incident) –

    The incident that was closed

incident_created module-attribute ¤

incident_created = Signal()

Signal sent when an incident is created.

Parameters:

  • incident (Incident) –

    The incident that was created

incident_key_events_updated module-attribute ¤

incident_key_events_updated = Signal()

Signal sent when an incident's key events are updated.

Parameters:

  • incident (Incident) –

    The incident that was updated

incident_updated module-attribute ¤

incident_updated = Signal()

Signal sent when an incident is updated.

Parameters:

  • sender (str | Any) –

    The sender of the signal

  • incident (Incident) –

    The incident that was updated

  • incident_update (IncidentUpdate) –

    The incident update that was created

  • update_fields (list[str]) –

    The fields that were updated

  • old_priority (Priority) –

    The old priority of the incident (optional kwarg)

postmortem_created module-attribute ¤

postmortem_created = Signal()

Signal sent when a postmortem is created.

Parameters:

  • sender (str | Any) –

    The sender of the signal

  • incident (Incident) –

    The incident for which the postmortem was created

tasks ¤

Modules:

updateoncall ¤

Functions:

update_oncall ¤

update_oncall() -> None

Fetch current on-calls and update the on-call Slack topic and Confluence page.

Source code in src/firefighter/incidents/tasks/updateoncall.py
@shared_task(name="incidents.update_oncall")
def update_oncall() -> None:
    """Fetch current on-calls and update the on-call Slack topic and Confluence page."""
    chain: Signature[bool] = (
        fetch_oncalls.s()  # pyright: ignore[reportUnboundVariable]
        | update_oncall_views.s()
    )
    chain()

update_oncall_slack_topic ¤

update_oncall_slack_topic(users: dict[str, User]) -> bool

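Updates the topic of the tech_incidents Slack conversation with the current on-call users. Returns True only if Slack reports success. TODO(gab): Move to the Slack app.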

Source code in src/firefighter/incidents/tasks/updateoncall.py
def update_oncall_slack_topic(
    users: dict[str, User],
) -> bool:
    """Update the topic of the tech_incidents Slack conversation with the current on-call users.

    TODO(gab): Move to the Slack app.
    """
    if not settings.ENABLE_SLACK:
        return False

    oncall_topic = create_oncall_topic(users)
    tech_incidents_conversation = Conversation.objects.get_or_none(tag="tech_incidents")
    if tech_incidents_conversation:
        res = tech_incidents_conversation.update_topic(oncall_topic)
        if res.get("ok"):
            logger.info(
                f"Updated on-call Slack topic on {tech_incidents_conversation}."
            )
            return True
    else:
        logger.warning(
            "Could not find tech_incidents conversation! Is there a channel with tag tech_incidents?"
        )
        return False
    logger.error("Failed to update on-call Slack topic.")
    return False

update_oncall_views ¤

update_oncall_views(*_args: Any, **_kwargs: Any) -> bool

Updates the on-call Slack topic and Confluence page containing the info for the on-call personnel.

Source code in src/firefighter/incidents/tasks/updateoncall.py
@shared_task(name="incidents.update_oncall_views")
def update_oncall_views(*_args: Any, **_kwargs: Any) -> bool:
    """Updates the on-call Slack topic and Confluence page containing the info for the on-call personnel."""
    if not settings.ENABLE_PAGERDUTY:
        logger.error("Can't update on-call users without PagerDuty enabled.")
        return False

    oncall_users_grouped_per_ep = PagerDutyOncall.objects.get_current_oncalls_per_escalation_policy_name_first_responder()

    # Check that we have Slack ID and handle for these users, if needed
    if settings.ENABLE_SLACK:
        for user in oncall_users_grouped_per_ep.values():
            if not hasattr(user, "slack_user"):
                SlackUser.objects.add_slack_id_to_user(user)
                logger.debug("Added Slack ID to user: %s", user)
        update_oncall_slack_topic(oncall_users_grouped_per_ep)
    else:
        logger.warning("Not updating on-call Slack topic, Slack integration disabled.")

    update_oncall_confluence(oncall_users_grouped_per_ep)
    logger.info("Updated on-call users on Confluence and Slack topic.")
    return True

views ¤

Modules:

components ¤

Modules:

list ¤

Classes:

IncidentCategoriesViewList ¤

Bases: SingleTableMixin, FilterView

Methods:

get_context_data ¤
get_context_data(**kwargs: Any) -> dict[str, Any]

No *args to pass.

Source code in src/firefighter/incidents/views/components/list.py
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
    """No *args to pass."""
    # Call the base implementation first to get a context
    context = super().get_context_data(**kwargs)
    context["filter_order"] = [
        "search",
        "metrics_period",
        "group",
    ]
    context["page_title"] = "Issue categories list"
    return context

date_filter ¤

Functions:

get_date_range_from_special_date ¤

get_date_range_from_special_date(unparsed_date: str) -> tuple[datetime | None, datetime | None, str | None, str | None]

TODO Specify the year parameter (ISO or calendar).

Source code in src/firefighter/incidents/views/date_filter.py
def get_date_range_from_special_date(
    unparsed_date: str,
) -> tuple[datetime | None, datetime | None, str | None, str | None]:
    """TODO Specify the year parameter (ISO or calendar)."""
    if "-" in unparsed_date:
        logger.debug("Parsing range, splitting with -")

        if unparsed_date.count("-") == 1:
            split = unparsed_date.split("-")
        elif unparsed_date.count(" - ") == 1:
            split = unparsed_date.split(" - ")
        else:
            err_msg = f"Invalid range: {unparsed_date}. Only one separator is allowed"
            raise ValidationError(err_msg)
        if len(split) == 2:
            for i in range(2):
                split[i] = split[i].strip()

            beg = parse_moment(split[0])
            end = parse_moment(split[1])
            logger.debug(end)
            return (
                beg.start if beg else None,
                end.stop if end else None,
                None,
                split[0] + "-" + split[1],
            )
        # TODO Message warning + logger.warning
        raise ValidationError(
            "Invalid range: %(unparsed_date)s. Only one separator is allowed",
            params={"unparsed_date": unparsed_date},
            code="invalid",
        )
    try:
        date_range = parse_moment(unparsed_date)
    except ValueError as exc:
        raise ValidationError(
            "Invalid range: %(unparsed_date)s. %(error)s",
            params={"unparsed_date": unparsed_date, "error": str(exc)},
            code="invalid",
        ) from exc
    if date_range:
        return date_range.start, date_range.stop, None, "TODO"

    # TODO Message warning + logger.warning
    return None, None, None, None
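
The separator handling at the top of the function can be tried in isolation. The sketch below mirrors only the branching on `-` and ` - `. The helper name `split_range` is hypothetical, and `ValueError` stands in for Django's `ValidationError` so the snippet runs without Django.

```python
def split_range(unparsed_date: str) -> tuple[str, str]:
    """Split 'start-end' or 'start - end' into two stripped halves."""
    if unparsed_date.count("-") == 1:
        parts = unparsed_date.split("-")
    elif unparsed_date.count(" - ") == 1:
        parts = unparsed_date.split(" - ")
    else:
        raise ValueError(f"Invalid range: {unparsed_date}")
    return parts[0].strip(), parts[1].strip()


print(split_range("W10-W12"))                  # ('W10', 'W12')
print(split_range("2024-01-01 - 2024-01-31"))  # ('2024-01-01', '2024-01-31')

try:
    split_range("2024-01-15")
except ValueError as exc:
    print(exc)  # Invalid range: 2024-01-15
```

Under this logic, any value containing a hyphen is treated as a range, so a hyphenated single date without a spaced ` - ` separator is rejected rather than parsed as one moment.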

parse_moment ¤

parse_moment(value: str) -> slice | None

Returns a slice with the start and stop of the given moment. If the moment is not a valid moment, returns None. If the moment is a point in time, returns a slice with the same start and stop.

Source code in src/firefighter/incidents/views/date_filter.py
def parse_moment(value: str) -> slice | None:
    """Returns a :class:`slice` with the :func:`slice.start` and :func:`slice.stop` of the given moment.
    If the moment is not a valid moment, returns None.
    If the moment is a point in time, returns a slice with the same start and stop.
    """
    value = value.strip()

    # Quarter/Week relative (current/previous quarter, current/previous week)
    if value in SPECIAL_RANGES:
        now = timezone.localtime(timezone.now())
        returned_range = get_date_range_from_relative_calendar_value(value, now, value)
        if None not in returned_range:
            return slice(returned_range[0], returned_range[1])

    # Match according to the regex (W20, Q4, YYYY-W20, YYYY-Q4...)
    week_or_quarter_time_frame = (*get_date_range_from_calendar_value(value), value)
    if None not in week_or_quarter_time_frame:
        return slice(week_or_quarter_time_frame[0], week_or_quarter_time_frame[1])

    # Match using dateparser (lots of formats)
    return parse_date_natural(value)
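
The function tries progressively more general parsers and returns the first result that is not `None`. A minimal sketch of that fallback pattern follows, using two toy parsers that are not part of FireFighter (`parse_year` and `parse_iso_datetime` are hypothetical stand-ins for the relative, calendar, and natural-language parsers).

```python
import re
from datetime import datetime
from typing import Callable, Optional

Parser = Callable[[str], Optional[slice]]


def parse_year(value: str) -> Optional[slice]:
    """Toy parser: a four-digit year becomes a whole-year range."""
    if not re.fullmatch(r"\d{4}", value):
        return None
    year = int(value)
    return slice(datetime(year, 1, 1), datetime(year, 12, 31, 23, 59, 59))


def parse_iso_datetime(value: str) -> Optional[slice]:
    """Toy parser: an ISO 8601 timestamp becomes a zero-length range."""
    try:
        moment = datetime.fromisoformat(value)
    except ValueError:
        return None
    return slice(moment, moment)


def parse_with_fallbacks(value: str, parsers: list[Parser]) -> Optional[slice]:
    """Return the result of the first parser that recognises the value."""
    value = value.strip()
    for parser in parsers:
        result = parser(value)
        if result is not None:
            return result
    return None


parsers = [parse_year, parse_iso_datetime]
year = parse_with_fallbacks(" 2024 ", parsers)
point = parse_with_fallbacks("2024-05-01T12:00:00", parsers)
print(year.start, year.stop)
print(point.start == point.stop)  # True
```

Using `slice` as the return type keeps both cases uniform: a range has distinct `start` and `stop`, while a point in time has the same value in both.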

date_utils ¤

Functions:

get_bounds_from_calendar_year ¤

get_bounds_from_calendar_year(year_value: int) -> tuple[datetime, datetime, str]

Calendar Year.

Source code in src/firefighter/incidents/views/date_utils.py
def get_bounds_from_calendar_year(year_value: int) -> tuple[datetime, datetime, str]:
    """Calendar Year."""
    lower_bound = datetime(year_value, 1, 1, tzinfo=TZ)
    upper_bound = datetime(year_value, 12, 31, 23, 59, 59, 999999, tzinfo=TZ)

    return (
        lower_bound,
        upper_bound,
        f"{year_value}",
    )

get_bounds_from_year ¤

get_bounds_from_year(year_value: int) -> tuple[datetime, datetime, str]

ISO Year.

Source code in src/firefighter/incidents/views/date_utils.py
def get_bounds_from_year(year_value: int) -> tuple[datetime, datetime, str]:
    """ISO Year."""
    lower_bound = datetime.fromisocalendar(year_value, 1, 1)
    upper_week_nb = date(year_value, 12, 28).isocalendar()[1]
    upper_bound = datetime.combine(
        date.fromisocalendar(year=year_value, week=upper_week_nb, day=7),
        time(23, 59, 59, 999999),
    )

    return *make_bounds_timezone_aware(lower_bound, upper_bound), f"ISO{year_value}"
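
ISO years and calendar years do not share boundaries, which is why the two helpers exist. The sketch below reproduces the ISO computation with plain dates so the difference is easy to see. The helper name `iso_year_bounds` is hypothetical, and time-of-day and timezone handling are left out.

```python
from datetime import date


def iso_year_bounds(year: int) -> tuple[date, date]:
    """First Monday and last Sunday of an ISO 8601 year."""
    first_day = date.fromisocalendar(year, 1, 1)
    # 28 December always falls in the last ISO week of its year.
    last_week = date(year, 12, 28).isocalendar()[1]
    last_day = date.fromisocalendar(year, last_week, 7)
    return first_day, last_day


print(iso_year_bounds(2020))
# (datetime.date(2019, 12, 30), datetime.date(2021, 1, 3))
print(iso_year_bounds(2021))
# (datetime.date(2021, 1, 4), datetime.date(2022, 1, 2))
```

ISO year 2020 has 53 weeks and spans three calendar years, whereas calendar year 2020 runs from 1 January to 31 December.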

errors ¤

Adapted from Django's default error views: https://github.com/django/django/blob/0dd29209091280ccf34e07c9468746c396b7778e/django/views/defaults.py.

Functions:

bad_request ¤

bad_request(request: HttpRequest, exception: Exception, template_name: str = ERROR_400_TEMPLATE_NAME) -> HttpResponse

400 error handler.

Templates: 400.html

Context: None

Source code in src/firefighter/incidents/views/errors.py
@requires_csrf_token
def bad_request(
    request: HttpRequest,
    exception: Exception,
    template_name: str = ERROR_400_TEMPLATE_NAME,
) -> HttpResponse:
    """400 error handler.

    Templates: :template:`400.html`
    Context: None
    """
    if (
        request.headers.get("Accept", "") == JSON_CONTENT_TYPE
        or request.headers.get("Content-Type", "") == JSON_CONTENT_TYPE
    ):
        return JsonHttpResponseBadRequest({
            "error": {"app_code": "BAD_REQUEST", "message": "Bad request"}
        })
    context = {
        "request_path": quote(request.path),
        "page_title": "Bad request",
        "APP_DISPLAY_NAME": settings.APP_DISPLAY_NAME,
        "title": "Bad request",
        "code": "400",
        "message": "Please check the URL in the address bar and try again.",
    }
    try:
        template = loader.get_template(template_name)
    except TemplateDoesNotExist:
        if template_name != ERROR_400_TEMPLATE_NAME:
            # Reraise if it's a missing custom template.
            raise
        return HttpResponseBadRequest(
            BACKUP_ERROR_PAGE_TEMPLATE % {"title": "Bad Request (400)", "details": ""},
        )
    # No exception content is passed to the template, to not disclose any
    # sensitive information.
    return HttpResponseBadRequest(template.render(context))
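
Each handler in this module returns a JSON body when either the `Accept` or the `Content-Type` header is exactly the JSON content type. The check can be sketched on a plain dictionary as follows. The value of `JSON_CONTENT_TYPE` is assumed to be `application/json` (its definition is not shown here), and the helper name `wants_json` is hypothetical.

```python
# Assumed value; the constant's definition is not part of this page.
JSON_CONTENT_TYPE = "application/json"


def wants_json(headers: dict[str, str]) -> bool:
    """Mirror the exact-match header check used by the error handlers."""
    return (
        headers.get("Accept", "") == JSON_CONTENT_TYPE
        or headers.get("Content-Type", "") == JSON_CONTENT_TYPE
    )


print(wants_json({"Accept": "application/json"}))                        # True
print(wants_json({"Accept": "text/html,application/json"}))              # False
print(wants_json({"Content-Type": "application/json; charset=utf-8"}))   # False
```

Because the comparison is an equality test, header values that list several media types or carry parameters fall through to the HTML response. Note that Django's `request.headers` is case-insensitive, unlike the plain `dict` used in this sketch.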

page_not_found ¤

page_not_found(request: HttpRequest, exception: Exception, template_name: str = ERROR_404_TEMPLATE_NAME) -> HttpResponse

Default 404 handler.

Templates: 404.html

Context:

  • request_path –

    The path of the requested URL (e.g., '/app/pages/bad_page/'). It's quoted to prevent a content injection attack.

  • exception –

    The message from the exception which triggered the 404 (if one was supplied), or the exception class name.

Source code in src/firefighter/incidents/views/errors.py
@requires_csrf_token
def page_not_found(
    request: HttpRequest,
    exception: Exception,
    template_name: str = ERROR_404_TEMPLATE_NAME,
) -> HttpResponse:
    """Default 404 handler.

    Templates: :template:`404.html`
    Context:
        request_path
            The path of the requested URL (e.g., '/app/pages/bad_page/'). It's
            quoted to prevent a content injection attack.
        exception
            The message from the exception which triggered the 404 (if one was
            supplied), or the exception class name
    """
    if (
        request.headers.get("Accept", "") == JSON_CONTENT_TYPE
        or request.headers.get("Content-Type", "") == JSON_CONTENT_TYPE
    ):
        return JsonHttpResponseNotFound({
            "error": {"app_code": "NOT_FOUND", "message": "Page not found"}
        })
    exception_repr = exception.__class__.__name__
    # Try to get an "interesting" exception message, if any (and not the ugly
    # Resolver404 dictionary)
    try:
        message = exception.args[0] if exception.args else ""
    except (AttributeError, IndexError):
        logger.exception("Error in page_not_found")
    else:
        if isinstance(message, str):
            exception_repr = message
    context = {
        "request_path": quote(request.path),
        "exception": exception_repr,
        "page_title": "Page not found",
        "title": "Page not found",
        "code": "404",
        "message": "Please check the URL in the address bar and try again.",
    }

    try:
        template = loader.get_template(template_name)
        body = template.render(context, request)
    except TemplateDoesNotExist:
        if template_name != ERROR_404_TEMPLATE_NAME:
            # Reraise if it's a missing custom template.
            raise
        # Render template (even though there are no substitutions) to allow
        # inspecting the context in tests.
        template_backup = Engine().from_string(
            BACKUP_ERROR_PAGE_TEMPLATE
            % {
                "title": "Not Found",
                "details": "The requested resource was not found on this server.",
            },
        )

        body = template_backup.render(Context(context))
    return HttpResponseNotFound(body)
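
The way the handler derives the `exception` context value can be isolated into a small function. This is a sketch that mirrors the logic shown above. The helper name `describe_exception` is hypothetical.

```python
def describe_exception(exception: Exception) -> str:
    """Prefer a string message from the exception, else its class name."""
    message = exception.args[0] if exception.args else ""
    if isinstance(message, str):
        return message
    return exception.__class__.__name__


print(describe_exception(KeyError("missing page")))    # missing page
print(describe_exception(ValueError({"tried": []})))   # ValueError
print(repr(describe_exception(LookupError())))         # ''
```

The class name is used only when the first argument is not a string, such as the dictionary carried by a URL resolver error. With this logic, an exception raised without arguments yields an empty string.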

permission_denied ¤

permission_denied(request: HttpRequest, exception: Exception, template_name: str = ERROR_403_TEMPLATE_NAME) -> HttpResponse

Permission denied (403) handler.

Templates: 403.html

Context:

  • exception –

    The message from the exception which triggered the 403 (if one was supplied).

If the template does not exist, an HTTP 403 response containing the text "403 Forbidden" (as per RFC 7231) will be returned.

Source code in src/firefighter/incidents/views/errors.py
@requires_csrf_token
def permission_denied(
    request: HttpRequest,
    exception: Exception,
    template_name: str = ERROR_403_TEMPLATE_NAME,
) -> HttpResponse:
    """Permission denied (403) handler.

    Templates: :template:`403.html`
    Context:
        exception
            The message from the exception which triggered the 403 (if one was
            supplied).

    If the template does not exist, an HTTP 403 response containing the text
    "403 Forbidden" (as per RFC 7231) will be returned.
    """
    if (
        request.headers.get("Accept", "") == JSON_CONTENT_TYPE
        or request.headers.get("Content-Type", "") == JSON_CONTENT_TYPE
    ):
        return JsonHttpResponseForbidden({
            "error": {"app_code": "FORBIDDEN", "message": "Forbidden"}
        })
    context = {
        "request_path": quote(request.path),
        "exception": str(exception),
        "page_title": "Forbidden",
        "title": "Forbidden",
        "code": "403",
        "message": "You do not have permission to access this page.",
    }
    try:
        template = loader.get_template(template_name)
    except TemplateDoesNotExist:
        if template_name != ERROR_403_TEMPLATE_NAME:
            # Reraise if it's a missing custom template.
            raise
        return HttpResponseForbidden(
            BACKUP_ERROR_PAGE_TEMPLATE % {"title": "403 Forbidden", "details": ""},
        )
    return HttpResponseForbidden(template.render(request=request, context=context))

server_error ¤

server_error(request: HttpRequest, template_name: str = ERROR_500_TEMPLATE_NAME) -> HttpResponse

500 error handler.

Templates: 500.html

Context: None

Source code in src/firefighter/incidents/views/errors.py
@requires_csrf_token
def server_error(
    request: HttpRequest, template_name: str = ERROR_500_TEMPLATE_NAME
) -> HttpResponse:
    """500 error handler.

    Templates: :template:`500.html`
    Context: None
    """
    context = {
        "page_title": "Server error",
        "title": "Server error",
        "code": "500",
        "message": "Please try again later. This issue has been logged, but feel free to raise the issue.",
        "APP_DISPLAY_NAME": settings.APP_DISPLAY_NAME,
    }
    if (
        request.headers.get("Accept", "") == JSON_CONTENT_TYPE
        or request.headers.get("Content-Type", "") == JSON_CONTENT_TYPE
    ):
        return JsonHttpResponseServerError({
            "error": {"app_code": "SERVER_ERROR", "message": "Server error"}
        })
    try:
        template = loader.get_template(template_name)
    except TemplateDoesNotExist:
        if template_name != ERROR_500_TEMPLATE_NAME:
            # Reraise if it's a missing custom template.
            raise
        return HttpResponseServerError(
            BACKUP_ERROR_PAGE_TEMPLATE % {"title": "Server Error (500)", "details": ""},
        )
    return HttpResponseServerError(template.render(context))

users ¤

Modules:

details ¤

Classes:

UserDetailView ¤

Bases: CustomDetailView[User]

In this view, be extra careful.

In this context, user is the logged_in user and target_user is the user of the profile being viewed.

views ¤

Classes:

DashboardView ¤

Bases: ListView[Incident]

Methods:

get_context_data ¤
get_context_data(**kwargs: Any) -> dict[str, Any]

No *args to pass.

Source code in src/firefighter/incidents/views/views.py
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
    """No *args to pass."""
    # Call the base implementation first to get a context
    context = super().get_context_data(**kwargs)
    context["page_title"] = "Incidents Dashboard"
    return context

IncidentListView ¤

Bases: SingleTableMixin, FilterView

Methods:

get_context_data ¤
get_context_data(**kwargs: Any) -> dict[str, Any]

No *args to pass.

Source code in src/firefighter/incidents/views/views.py
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
    """No *args to pass."""
    # Call the base implementation first to get a context
    context = super().get_context_data(**kwargs)
    context["page_range"] = context["table"].paginator.get_elided_page_range(
        context["table"].page.number, on_each_side=2, on_ends=1
    )
    context["filter_order"] = [
        "search",
        "created_at",
        "status",
        "environment",
        "priority",
        "incident_category",
    ]

    context["page_title"] = "Incidents List"
    context["api_url_export"] = reverse("api:incidents-list")
    return context

IncidentStatisticsView ¤

Bases: FilterView

Methods:

get_context_data ¤
get_context_data(**kwargs: Any) -> dict[str, Any]

No *args to pass.

Source code in src/firefighter/incidents/views/views.py
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
    """No *args to pass."""
    context = super().get_context_data(**kwargs)
    context["page_title"] = "Statistics"
    context["filter_order"] = [
        "search",
        "created_at",
        "status",
        "environment",
        "priority",
        "incident_category",
    ]
    context_data = weekly_dashboard_context(
        self.request, context.get("incidents_filtered", [])
    )
    return {**context, **context_data}

IncidentUpdateKeyEventsView ¤

Bases: SingleObjectMixin[Incident], LoginRequiredMixin, FormView[IncidentUpdateKeyEventsForm]

Methods:

  • get_form

    Replace Markdown bold syntax with HTML bold syntax in form labels.

get_form ¤
get_form(form_class: type[IncidentUpdateKeyEventsForm] | None = None) -> IncidentUpdateKeyEventsForm

Replace Markdown bold syntax with HTML bold syntax in form labels.

Source code in src/firefighter/incidents/views/views.py
def get_form(
    self, form_class: type[IncidentUpdateKeyEventsForm] | None = None
) -> IncidentUpdateKeyEventsForm:
    """Replace Markdown bold syntax with HTML bold syntax in form labels."""
    form = super().get_form(form_class)
    for field in form.fields:
        # Replace *ABC* with <b>ABC</b>
        form.fields[field].label = re.sub(
            r"\*(.*?)\*", r"<b>\1</b>", str(form.fields[field].label)
        )
    return form
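
The substitution applied to each label can be tried on its own. The sketch below uses the same regular expression as the method. The wrapper name `bold_markup_to_html` and the sample labels are made up for illustration.

```python
import re


def bold_markup_to_html(label: str) -> str:
    """Turn *text* into <b>text</b>, leaving other text untouched."""
    return re.sub(r"\*(.*?)\*", r"<b>\1</b>", label)


print(bold_markup_to_html("*Detected* at"))
# <b>Detected</b> at
print(bold_markup_to_html("*Detected* and *Recovered*"))
# <b>Detected</b> and <b>Recovered</b>
print(bold_markup_to_html("Started"))
# Started
```

The non-greedy `.*?` keeps each pair of asterisks separate, so a label with several emphasised words gets one `<b>` element per word.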

ProcessAfterResponse ¤

ProcessAfterResponse(redirect_to: str, data: dict[str, Any], *args: Any, **kwargs: Any)

Bases: HttpResponseRedirect

Custom Response, to trigger the Slack workflow after creating the incident and returning HTTP 201.

TODO This does not work: the workflow is triggered before the response is sent. We need to do a Celery task!

TODO We need to redirect to the incident page or Slack conversation.

Source code in src/firefighter/incidents/views/views.py
def __init__(
    self, redirect_to: str, data: dict[str, Any], *args: Any, **kwargs: Any
) -> None:
    super().__init__(redirect_to, *args, **kwargs)
    self.data = data