Skip to content

incidents ¤

admin ¤

EnvironmentAdmin ¤

Bases: ModelAdmin[Environment]

get_readonly_fields ¤

get_readonly_fields(request: HttpRequest, obj: Environment | None = None) -> _ListOrTuple[str]

Deny changing the value of an existing object.

Source code in src/firefighter/incidents/admin.py
def get_readonly_fields(
    self,
    request: HttpRequest,  # noqa: ARG002
    obj: Environment | None = None,
) -> _ListOrTuple[str]:
    """Deny changing the value of an existing object."""
    if obj:  # editing an existing object
        return *self.readonly_fields, "value"
    return self.readonly_fields

IncidentAdmin ¤

Bases: ModelAdmin[Incident]

compute_and_purge_metrics ¤

compute_and_purge_metrics(request: HttpRequest, queryset: QuerySet[Incident]) -> None

Will compute metrics for selected incidents and delete metrics that can no longer be computed.

Source code in src/firefighter/incidents/admin.py
@action(description=_("Compute and purge metrics from milestones"))
def compute_and_purge_metrics(
    self, request: HttpRequest, queryset: QuerySet[Incident]
) -> None:
    """Will compute metrics for selected incidents and delete metrics that can no longer be computed."""
    for incident in queryset:
        incident.compute_metrics(purge=True)
    self.message_user(
        request,
        ngettext(
            "Computed/purged metrics for %d incident.",
            "Computed/purged metrics for %d incidents.",
            len(queryset),
        )
        % len(queryset),
        constants.SUCCESS,
    )

send_message ¤

send_message(request: HttpRequest, queryset: QuerySet[Incident]) -> None | TemplateResponse

Action to send a message in selected channels. This action first displays a confirmation page to enter the message. Next, it sends the message on all selected objects and redirects back to the change list (other fn).

Source code in src/firefighter/incidents/admin.py
@action(
    description=_("Send message on conversation"),
)
def send_message(
    self, request: HttpRequest, queryset: QuerySet[Incident]
) -> None | TemplateResponse:
    """Action to send a message in selected channels.
    This action first displays a confirmation page to enter the message.
    Next, it sends the message on all selected objects and redirects back to the change list (other fn).
    """
    opts = self.model._meta  # noqa: SLF001

    # Get all the targeted conversations
    target_conversations: list[IncidentChannel] = [
        inc.conversation for inc in queryset.all() if inc.conversation
    ]

    deletable_objects = target_conversations

    # The user has already entered the messages.
    # Send message(s) and return None to display the change list view again.
    if request.POST.get("post"):
        self.process_action_send_message(
            request=request, target_conversations=target_conversations
        )
        return None

    objects_name = model_ngettext(queryset)

    title = _("Type your message")

    context = {
        **self.admin_site.each_context(request),
        "title": title,
        "objects_name": str(objects_name),
        "deletable_objects": [deletable_objects],
        "queryset": queryset,
        "opts": opts,
        "action_checkbox_name": helpers.ACTION_CHECKBOX_NAME,
        "media": self.media,
    }

    # Display the message edit page
    return TemplateResponse(
        request, "admin/send_message_conversation.html", context
    )

IncidentRoleTypeAdmin ¤

Bases: ModelAdmin[IncidentRoleType]

IncidentRoleTypes are fixed at the moment (only commander and communication lead).

factories ¤

forms ¤

close_incident ¤

create_incident ¤

select_impact ¤

SelectImpactForm ¤

SelectImpactForm(*args: Any, **kwargs: Any)

Bases: Form

Source code in src/firefighter/incidents/forms/select_impact.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    if "data" in kwargs:
        for key, value in kwargs["data"].items():
            if (
                isinstance(key, str)
                and key.startswith("set_impact_type_")
                and isinstance(value, str)
            ):
                impact_type_name = key[len("set_impact_type_") :]
                with contextlib.suppress(ImpactLevel.DoesNotExist):
                    kwargs["data"][key] = ImpactLevel.objects.get(
                        value=value, impact_type__value=impact_type_name
                    )

    super().__init__(*args, **kwargs)

    for impact_type in ImpactType.objects.all().order_by("-order"):
        field_name = f"set_impact_type_{impact_type.value}"
        self.fields[field_name] = forms.ModelChoiceField(
            label=impact_type.emoji + " " + impact_type.name,
            queryset=impact_type.levels.all().order_by("-order"),
            help_text=impact_type.help_text,
            initial=impact_type.levels.get(value=LevelChoices.NONE.value),
        )
        self.fields[field_name].label_from_instance = (  # type: ignore[attr-defined]
            lambda obj: obj.emoji + " " + obj.name
        )
business_impact_new property ¤
business_impact_new: str | None

Get business impact. Will return N/A, Low, Medium or High.

save ¤
save(incident: HasImpactProtocol) -> None

Save the impact choices to the incident.

Source code in src/firefighter/incidents/forms/select_impact.py
def save(self, incident: HasImpactProtocol) -> None:
    """Save the impact choices to the incident."""
    if self.is_valid():
        impacts: dict[str, ImpactLevel] = self.cleaned_data

        # Prepare a list of impact type slugs by removing the 'set_impact_type_' prefix
        impact_type_slugs: list[str] = [
            slug.replace("set_impact_type_", "") for slug in impacts
        ]

        # Fetch all relevant ImpactType instances in a single query
        impact_types: dict[str, ImpactType] = ImpactType.objects.in_bulk(
            impact_type_slugs, field_name="value"
        )

        impact_objects: list[Impact] = []

        for impact_type_slug, impact_level in impacts.items():
            impact_type_slug = impact_type_slug.replace(  # noqa: PLW2901
                "set_impact_type_", ""
            )
            impact_type = impact_types.get(impact_type_slug)
            if impact_type is None:
                logger.warning(
                    f"Could not find impact type {impact_type_slug} in incident {incident.id}"
                )
                continue

            impact = Impact(impact_type=impact_type, impact_level=impact_level)
            impact_objects.append(impact)

        # Create and save all Impact instances in one database call
        Impact.objects.bulk_create(impact_objects)
        impacts_related_manager: ManyRelatedManager[Impact] = cast(
            "ManyRelatedManager[Impact]",
            incident.impacts,
        )
        impacts_related_manager.add(*impact_objects)

    else:
        raise forms.ValidationError("Form is not valid")
suggest_priority_from_impact ¤
suggest_priority_from_impact() -> int

Suggest a priority from 1 (highest) to 4 (lowest) based on the impact choices.

Source code in src/firefighter/incidents/forms/select_impact.py
def suggest_priority_from_impact(self) -> int:
    """Suggest a priority from 1 (highest) to 4 (lowest) based on the impact choices."""
    if self.is_valid():
        impact: dict[str, ImpactLevel] = self.cleaned_data

        impact_values = [impact_type.value for impact_type in impact.values()]
        if "HI" in impact_values:
            return 1
        if "MD" in impact_values:
            return 2
        if impact_values.count("LO") > 2:
            return 3
        if "LO" in impact_values or "NO" in impact_values:
            return 4
        return 4
    return 4

update_key_events ¤

IncidentUpdateKeyEventsForm ¤

IncidentUpdateKeyEventsForm(*args: Any, **kwargs: Any)

Bases: Form

Source code in src/firefighter/incidents/forms/update_key_events.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    self.incident = kwargs.pop("incident")
    self.user = kwargs.pop("user", None)
    super().__init__(*args, **kwargs)
    self.generate_fields_dynamically()
get_milestones_with_data ¤
get_milestones_with_data(milestones_definitions: Iterable[MilestoneType]) -> list[MilestoneTypeData]

Get each firefighter.incidents.models.milestone_type.MilestoneType with its event_ts from the IncidentUpdate.

Source code in src/firefighter/incidents/forms/update_key_events.py
def get_milestones_with_data(
    self,
    milestones_definitions: Iterable[MilestoneType],
) -> list[MilestoneTypeData]:
    """Get each [firefighter.incidents.models.milestone_type.MilestoneType][] with its `event_ts` from the IncidentUpdate."""
    milestones_objects = self.incident.latest_updates_by_type
    ass = []
    for milestone_def in milestones_definitions:
        event = milestones_objects.get(milestone_def.event_type)
        event_ts = event.event_ts if event and event.event_ts else None
        association = MilestoneTypeData(
            milestone_type=milestone_def, data=MilestoneData(event_ts)
        )
        ass.append(association)
    return ass
save ¤
save() -> None

Custom save method to save each changed key event.

Source code in src/firefighter/incidents/forms/update_key_events.py
def save(self) -> None:
    """Custom save method to save each changed key event."""
    self.fields: dict[str, Field]
    for field_name in self.fields:
        if field_name not in self.changed_data or self.fields[field_name].disabled:
            logger.debug("skipping %s", field_name)
            continue
        if not field_name.startswith("key_event_"):
            continue

        key_event_type = field_name.replace("key_event_", "")

        logger.debug(f"save {field_name} {self.cleaned_data[field_name]}")
        self._save_key_event(key_event_type, self.cleaned_data[field_name])

update_roles ¤

IncidentUpdateRolesForm ¤

IncidentUpdateRolesForm(*args: Any, **kwargs: Any)

Bases: Form

Source code in src/firefighter/incidents/forms/update_roles.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    self.incident: Incident = kwargs.pop("incident")
    self.user = kwargs.pop("user", None)
    super().__init__(*args, **kwargs)
    self.generate_fields_dynamically()
save ¤
save() -> None

Custom save method to save the updated roles.

Source code in src/firefighter/incidents/forms/update_roles.py
def save(self) -> None:
    """Custom save method to save the updated roles."""
    self.fields: dict[str, Field]
    roles_to_update: dict[str, User | None] = {}
    for field_name in self.fields:
        if field_name not in self.changed_data or self.fields[field_name].disabled:
            logger.debug("skipping %s", field_name)
            continue
        if not field_name.startswith("role_"):
            continue

        role_type_slug = field_name.replace("role_", "")

        roles_to_update[role_type_slug] = self.cleaned_data[field_name]
    if self.user is None:
        raise RuntimeError("User is required, was the form validated?")
    self.incident.update_roles(self.user, roles_to_update)

update_status ¤

utils ¤

EnumChoiceField ¤

EnumChoiceField(*args: Any, enum_class: type[T], **kwargs: Any)

Bases: TypedChoiceField

Source code in src/firefighter/incidents/forms/utils.py
def __init__(self, *args: Any, enum_class: type[T], **kwargs: Any) -> None:
    # Explicit for type checking
    self.coerce_func: Callable[[Any], T] = lambda val: enum_class(  # noqa: PLW0108
        val
    )
    self.enum_class = enum_class
    if "choices" not in kwargs:
        kwargs["choices"] = enum_class.choices
    super().__init__(*args, coerce=self.coerce_func, **kwargs)
to_python ¤
to_python(value: Any) -> int | str | Any

Return a value from the enum class.

Source code in src/firefighter/incidents/forms/utils.py
def to_python(self, value: Any) -> int | str | Any:
    """Return a value from the enum class."""
    if value in self.empty_values:
        return ""
    # Cast to int if it's a string representation of an int
    if isinstance(value, str) and value.isdigit():
        value = int(value)

    # If value is already a valid enum member, return it
    if isinstance(value, self.enum_class):
        return value
    try:
        return self.coerce_func(value)
    except ValueError as exc:
        err_msg = f"{value} is not a valid {self.enum_class.__name__}"
        raise ValidationError(err_msg, code="invalid_choice") from exc

menus ¤

user_details_url ¤

user_details_url(request: AuthenticatedHttpRequest) -> str | None

Return a personalized title for our profile menu item.

Source code in src/firefighter/incidents/menus.py
def user_details_url(request: AuthenticatedHttpRequest) -> str | None:
    """Return a personalized title for our profile menu item."""
    if request.user.is_authenticated:
        return request.user.get_absolute_url()
    return None  # type: ignore[unreachable]

migrations ¤

0001_initial_oss ¤

models ¤

component ¤

Component ¤

Bases: Model

ComponentFilterSet ¤

Bases: FilterSet

Set of filters for Component, share by Web UI and API.

component_search(queryset: QuerySet[Component], _name: str, value: str) -> QuerySet[Component]

Search incidents by title, description, and ID.

Parameters:

  • queryset (QuerySet[Component]) –

    Queryset to search in.

  • _name (str) –
  • value (str) –

    Value to search for.

Returns:

  • QuerySet[Component]

    QuerySet[Component]: Search results.

Source code in src/firefighter/incidents/models/component.py
@staticmethod
def component_search(
    queryset: QuerySet[Component], _name: str, value: str
) -> QuerySet[Component]:
    """Search incidents by title, description, and ID.

    Args:
        queryset (QuerySet[Component]): Queryset to search in.
        _name:
        value (str): Value to search for.

    Returns:
        QuerySet[Component]: Search results.
    """
    return Component.objects.search(queryset=queryset, search_term=value)[0]

ComponentManager ¤

Bases: Manager['Component']

queryset_with_mtbf ¤
queryset_with_mtbf(
    date_from: datetime,
    date_to: datetime,
    queryset: QuerySet[Component] | None = None,
    metric_type: str = "time_to_fix",
    field_name: str = "mtbf",
) -> QuerySet[Component]

Returns a queryset of components with an additional mtbf field.

Source code in src/firefighter/incidents/models/component.py
def queryset_with_mtbf(
    self,
    date_from: datetime,
    date_to: datetime,
    queryset: QuerySet[Component] | None = None,
    metric_type: str = "time_to_fix",
    field_name: str = "mtbf",
) -> QuerySet[Component]:
    """Returns a queryset of components with an additional `mtbf` field."""
    if date_to > datetime.now(tz=TZ):
        date_to = datetime.now(tz=TZ)

    date_interval = date_to - date_from
    queryset = queryset or self.get_queryset()

    return (
        queryset.order_by("group__order", "order")
        .annotate(
            metric_subquery=Subquery(
                IncidentMetric.objects.filter(
                    incident__component=OuterRef("pk"),
                    metric_type__type=metric_type,
                    incident__created_at__gte=date_from,
                    incident__created_at__lte=date_to,
                )
                .values("incident__component")
                .annotate(sum_downtime=Sum("duration"))
                .values("sum_downtime")
            )
        )
        .annotate(
            incident_count=Count(
                "incident",
                filter=Q(
                    incident__created_at__gte=date_from,
                )
                & Q(
                    incident__created_at__lte=date_to,
                ),
            ),
            incidents_downtime=F("metric_subquery"),
            incident_uptime=Value(date_interval) - F("incidents_downtime"),
        )
        .annotate(
            **{
                field_name: Cast(
                    F("incident_uptime") / F("incident_count"),
                    output_field=DurationField(),
                )
            }
        )
    )

impact ¤

Impact ¤

Bases: Model

clean ¤
clean() -> None

Ensure impact_type matches impact_level.impact_type.

Check constraints can't span relationships, so we have to do this manually.

Source code in src/firefighter/incidents/models/impact.py
def clean(self) -> None:
    """Ensure impact_type matches impact_level.impact_type.

    Check constraints can't span relationships, so we have to do this manually.
    """
    if self.impact_type != self.impact_level.impact_type:
        raise ValidationError("impact_type must match impact_level.impact_type")

incident ¤

Incident ¤

Bases: Model

status_page_url property ¤
status_page_url: str

Similar with get_absolute_url but with full domain, to be used out of the website.

build_invite_list ¤
build_invite_list() -> list[User]

Send a Django Signal to get the list of users to invite from different integrations (Slack, Confluence, PagerDuty...).

Returns:

  • list[User]

    list[User]: Potentially non-unique list of Users to invite

Source code in src/firefighter/incidents/models/incident.py
def build_invite_list(self) -> list[User]:
    """Send a Django Signal to get the list of users to invite from different integrations (Slack, Confluence, PagerDuty...).

    Returns:
        list[User]: Potentially non-unique list of Users to invite
    """
    users_list: list[User] = []

    # Send signal to modules (Confluence, PagerDuty...)
    result_users: list[
        tuple[Any, Exception | list[User]]
    ] = signals.get_invites.send_robust(sender=None, incident=self)

    # Aggregate the results
    for provider in result_users:
        if isinstance(provider[1], BaseException):
            logger.warning(
                f"Provider {provider[0]} returned an error getting Users to invite: {provider[1]}."
            )
            continue

        users_list.extend(provider[1])

    logger.debug(f"Get invites users list: {users_list}")

    return users_list
compute_metrics ¤
compute_metrics(*, purge: bool = False) -> None

Compute all metrics (time to fix, ...) from events.

Source code in src/firefighter/incidents/models/incident.py
def compute_metrics(self, *, purge: bool = False) -> None:
    """Compute all metrics (time to fix, ...) from events."""
    latest_updates_by_type = self.latest_updates_by_type

    for metric_type in MetricType.objects.all():
        lhs_type = metric_type.milestone_lhs.event_type
        rhs_type = metric_type.milestone_rhs.event_type

        lhs = latest_updates_by_type.get(lhs_type, None)
        rhs = latest_updates_by_type.get(rhs_type, None)

        if not lhs or not lhs.event_ts:
            logger.info(
                f"Missing operand '{lhs_type}' on metric {metric_type.type} for #{self.id}"
            )
            if purge:
                IncidentMetric.objects.filter(
                    incident=self, metric_type=metric_type
                ).delete()
            continue
        if not rhs or not rhs.event_ts:
            logger.info(
                f"Missing operand '{rhs_type}' on metric {metric_type.type} for #{self.id}"
            )
            if purge:
                IncidentMetric.objects.filter(
                    incident=self, metric_type=metric_type
                ).delete()
            continue
        duration = lhs.event_ts - rhs.event_ts
        if duration < TD0:
            logger.warning(
                f"Tried to compute a negative metric! Metric {metric_type.type} for #{self.id} has a duration of {duration} ({lhs.event_type}:{lhs.event_ts} - {rhs.event_type}:{rhs.event_ts})"
            )
            if purge:
                IncidentMetric.objects.filter(
                    incident=self, metric_type=metric_type
                ).delete()
            continue
        IncidentMetric.objects.update_or_create(
            incident=self,
            metric_type=metric_type,
            defaults={"duration": duration},
        )
    self.save()
missing_milestones ¤
missing_milestones() -> list[str]

Returns all required Milestones still needed to compute the metrics.

Source code in src/firefighter/incidents/models/incident.py
def missing_milestones(self) -> list[str]:
    """Returns all required Milestones still needed to compute the metrics."""
    if self.ignore:
        return []

    # We get a list of all event_type of IncidentUpdates...
    milestones_list = (
        IncidentUpdate.objects.filter(
            incident_id=self.id,
            event_ts__isnull=False,
            event_type__isnull=False,
        )
        .aggregate(milestone_list=ArrayAgg("event_type", distinct=True, default=[]))
        .get("milestone_list", [])
    )

    incident_milestones: set[str] = set(cast('list["str"]', milestones_list))
    required_milestone_types = set(
        MilestoneType.objects.filter(required=True).values_list(
            "event_type", flat=True
        )
    )
    # And we make the differences of the sets
    return list(required_milestone_types - incident_milestones)
update_roles ¤
update_roles(updater: User, roles_mapping: dict[str, User | None] | dict[str, User]) -> IncidentUpdate

Update the roles related to an incident, and create an IncidentUpdate. For each role, provide a User or None.

This function will update the incident, create an IncidentUpdate, and trigger the incident_updated signal, with update_roles sender.

Parameters:

  • updater (User) –

    The user who is updating the roles.

  • roles_mapping (dict[str, User | None]) –

    A dict of roles to update, with the new User or None. Defaults to None.

Returns:

  • IncidentUpdate ( IncidentUpdate ) –

    The created IncidentUpdate with the updated roles.

Source code in src/firefighter/incidents/models/incident.py
def update_roles(
    self,
    updater: User,
    roles_mapping: dict[str, User | None] | dict[str, User],
) -> IncidentUpdate:
    """Update the roles related to an incident, and create an IncidentUpdate.
    For each role, provide a User or None.

    This function will update the incident, create an [IncidentUpdate][firefighter.incidents.models.incident_update.IncidentUpdate], and trigger the [incident_updated][firefighter.incidents.signals.incident_updated] signal, with `update_roles` sender.

    Args:
        updater (User): The user who is updating the roles.
        roles_mapping (dict[str, User | None], optional): A dict of roles to update, with the new User or None. Defaults to None.

    Returns:
        IncidentUpdate: The created IncidentUpdate with the updated roles.
    """
    updated_fields: list[str] = []

    # Handle roles
    for role_slug, assigned_user in roles_mapping.items():
        try:
            role_type = IncidentRoleType.objects.get(slug=role_slug)
        except IncidentRoleType.DoesNotExist:
            logger.warning(f"Unknown role type: {role_slug}")
            continue
        if assigned_user is not None:
            IncidentRole.objects.update_or_create(
                incident=self,
                role_type=role_type,
                defaults={"user": assigned_user},
            )
            logger.debug(
                f"Updated role {role_slug} to user ID={assigned_user.id} for #{self.id}"
            )
            updated_fields.append(f"{role_slug}_id")
        elif role_type.required:
            logger.warning(f"Cannot remove required role: {role_slug}")
        else:
            IncidentRole.objects.filter(incident=self, role_type=role_type).delete()
            updated_fields.append(f"{role_slug}_id")

    # Handle legacy roles with IncidentUpdate
    # XXX(dugab): Custom roles are not saved in IncidentUpdate at the moment

    incident_update = IncidentUpdate(
        incident=self,
        created_by=updater,
        commander=roles_mapping.get("commander"),
        communication_lead=roles_mapping.get("communication_lead"),
    )
    incident_update.save()

    incident_updated.send_robust(
        "update_roles",
        incident=self,
        incident_update=incident_update,
        updated_fields=updated_fields,
    )

    return incident_update

IncidentFilterSet ¤

Bases: FilterSet

Set of filters for incidents, shared by Web UI and API.

incident_search(queryset: QuerySet[Incident], _name: str, value: str) -> QuerySet[Incident]

Search incidents by title, description, and ID.

Parameters:

  • queryset (QuerySet[Incident]) –

    Queryset to search in.

  • _name (str) –
  • value (str) –

    Value to search for.

Returns:

Source code in src/firefighter/incidents/models/incident.py
@staticmethod
def incident_search(
    queryset: QuerySet[Incident], _name: str, value: str
) -> QuerySet[Incident]:
    """Search incidents by title, description, and ID.

    Args:
        queryset (QuerySet[Incident]): Queryset to search in.
        _name:
        value (str): Value to search for.

    Returns:
        QuerySet[Incident]: Search results.
    """
    return Incident.objects.search(queryset=queryset, search_term=value)[0]

IncidentManager ¤

Bases: Manager['Incident']

declare ¤
declare(**kwargs: Any) -> Incident

Create an Incident and its first IncidentUpdate. Send the incident_created signal. Returns the saved incident, no need to .save().

Source code in src/firefighter/incidents/models/incident.py
def declare(self, **kwargs: Any) -> Incident:
    """Create an Incident and its first IncidentUpdate.
    Send the incident_created signal.
    Returns the saved incident, no need to .save().
    """
    with transaction.atomic():
        if "private" not in kwargs:
            kwargs["private"] = kwargs["component"].private
        if "severity" not in kwargs and "priority" in kwargs:
            kwargs["severity"] = Severity.objects.get(
                value=kwargs["priority"].value
            )
        incident: Incident = super().create(**kwargs)
        for required_default_role in IncidentRoleType.objects.filter(required=True):
            incident.roles_set.add(
                IncidentRole.objects.create(
                    incident=incident,
                    role_type=required_default_role,
                    user=incident.created_by,
                )
            )

        first_incident_update = IncidentUpdate(
            title=incident.title,
            description=incident.description,
            status=incident.status,  # type: ignore[misc]
            priority=incident.priority,
            component=incident.component,
            created_by=incident.created_by,
            commander=incident.created_by,
            incident=incident,
            event_ts=incident.created_at,
        )

        milestone_declared = IncidentUpdate(
            created_by=incident.created_by,
            incident=incident,
            event_type="declared",
            event_ts=incident.created_at,
        )
        first_incident_update.save()
        milestone_declared.save()

        incident.members.add(incident.created_by)

    # Either we have an incident or an error was thrown
    incident_created.send_robust(sender=__name__, incident=incident)
    return incident
search staticmethod ¤
search(queryset: QuerySet[Incident] | None, search_term: str) -> tuple[QuerySet[Incident], bool]

Search for incidents using a search term, on the title and description fields.

Parameters:

  • queryset (QuerySet[Incident] | None) –

    Queryset to search in. If None, search in all incidents. The Queryset allows to search on a subset of incidents (already filtered).

  • search_term (str) –

    Search term.

Returns:

  • tuple[QuerySet[Incident], bool]

    tuple[QuerySet[Incident], bool]: Queryset of incidents matching the search term, and a boolean indicating if the search may contain duplicates objects.

Source code in src/firefighter/incidents/models/incident.py
@staticmethod
def search(
    queryset: QuerySet[Incident] | None, search_term: str
) -> tuple[QuerySet[Incident], bool]:
    """Search for incidents using a search term, on the title and description fields.

    Args:
        queryset (QuerySet[Incident] | None): Queryset to search in. If None, search in all incidents. The Queryset allows to search on a subset of incidents (already filtered).
        search_term (str): Search term.

    Returns:
        tuple[QuerySet[Incident], bool]: Queryset of incidents matching the search term, and a boolean indicating if the search may contain duplicates objects.
    """
    if queryset is None:
        queryset = Incident.objects.all()

    # If not search, return the original queryset
    if search_term is None or search_term.strip() == "":
        return queryset, False

    queryset_search_id = None

    # If the search is just an int, search for this ID + regular search
    try:
        search_term_as_int = int(search_term)
    except ValueError:
        pass
    else:
        queryset_search_id = deepcopy(queryset)
        queryset_search_id = queryset_search_id.filter(id=search_term_as_int)

    # Postgres search on title + description
    # XXX Improve search performance and relevance
    vector = SearchVector("title", config="english", weight="A") + SearchVector(
        "description", config="english", weight="B"
    )
    query = SearchQuery(search_term, config="english", search_type="websearch")
    queryset = (
        queryset.annotate(rank=SearchRank(vector, query))
        .filter(rank__gte=0.1)
        .order_by("-rank")
    )

    # Add the search by id to the search by text if needed
    if queryset_search_id:
        queryset |= queryset_search_id  # type: ignore[operator]

    return queryset, False

component_filter_choices_queryset ¤

component_filter_choices_queryset(_: Any) -> QuerySet[Component]

Queryset for choices of Components in IncidentFilterSet. Moved it as a function because models are not loaded when creating filters.

Source code in src/firefighter/incidents/models/incident.py
def component_filter_choices_queryset(_: Any) -> QuerySet[Component]:
    """Queryset for choices of Components in IncidentFilterSet.
    Moved it as a function because models are not loaded when creating filters.
    """
    return (
        Component.objects.all()
        .select_related("group")
        .order_by(
            "group__order",
            "name",
        )
    )

incident_cost ¤

IncidentCost ¤

Bases: Model

Incident Cost is inspired from dispatch.

incident_cost_type ¤

IncidentCostType ¤

Bases: Model

Incident Cost Type is inspired from dispatch.

incident_membership ¤

incident_update ¤

IncidentUpdate ¤

Bases: Model

IncidentUpdate represents a single update to an incident. One incident can have many incident updates. Only updated fields are stored in the incident update.

set_event_ts ¤

set_event_ts(sender: Any, instance: IncidentUpdate, **kwargs: Any) -> None

Add a timestamp on every IncidentUpdate. Not implemented using Django ORM auto_add as it would not make them user editable.

Source code in src/firefighter/incidents/models/incident_update.py
@receiver(pre_save, sender=IncidentUpdate)
# pylint: disable=unused-argument
def set_event_ts(sender: Any, instance: IncidentUpdate, **kwargs: Any) -> None:
    """Add a timestamp on every IncidentUpdate.
    Not implemented using Django ORM auto_add as it would not make them user editable.
    """
    if instance.event_ts is None:
        instance.event_ts = timezone.now()  # type: ignore

metric_type ¤

milestone_type ¤

MilestoneType model.

MilestoneType ¤

Bases: Model

Represents a milestone type, also known as a key event time or event time.

priority ¤

Priority ¤

Bases: Model

A priority for an incident.

user ¤

User ¤

Bases: AbstractUser

full_name property ¤
full_name: str

User full name (first + last name). Looks for the first_name and last_name fields, then name, then username. Will return an empty string if none of these are set.

signals ¤

create_incident_conversation module-attribute ¤

create_incident_conversation = django.dispatch.Signal()

Signal sent to create a conversation for an incident.

Parameters:

  • incident (Incident) –

    The incident for which to create a conversation.

get_invites module-attribute ¤

get_invites = django.dispatch.Signal()

Signal sent to retrieve the list of users to invite for an incident.

Parameters:

  • incident (Incident) –

    The incident for which to retrieve the list of users to invite.

Returns:

  • users ( list[User] ) –

    The list of users to invite.

incident_closed module-attribute ¤

incident_closed = django.dispatch.Signal()

Signal sent when an incident is closed

Parameters:

  • sender (Any) –

    The sender of the signal name

  • incident (Incident) –

    The incident that was closed

incident_created module-attribute ¤

incident_created = django.dispatch.Signal()

Signal sent when an incident is created

Parameters:

  • incident (Incident) –

    The incident that was created

incident_key_events_updated module-attribute ¤

incident_key_events_updated = django.dispatch.Signal()

Signal sent when an incident's key events are updated.

Parameters:

  • incident (Incident) –

    The incident that was updated

incident_updated module-attribute ¤

incident_updated = django.dispatch.Signal()

Signal sent when an incident is updated.

Parameters:

  • sender (str | Any) –

    The sender of the signal name

  • incident (Incident) –

    The incident that was updated

  • incident_update (IncidentUpdate) –

    The incident update that was created

  • update_fields (list[str]) –

    The fields that were updated

  • old_priority (Priority) –

    The old priority of the incident (optional kwarg)

new_qualifier module-attribute ¤

new_qualifier = django.dispatch.Signal()

Signal sent when a new qualifier.

postmortem_created module-attribute ¤

postmortem_created = django.dispatch.Signal()

Signal sent when a postmortem is created.

Parameters:

  • sender (str | Any) –

    The sender of the signal name

  • incident (Incident) –

    The incident for which the postmortem was created

tables ¤

tasks ¤

updateoncall ¤

update_oncall ¤

update_oncall() -> None

Fetch current on-calls and update the on-call Slack topic and Confluence page.

Source code in src/firefighter/incidents/tasks/updateoncall.py
@shared_task(name="incidents.update_oncall")
def update_oncall() -> None:
    """Fetch current on-calls and update the on-call Slack topic and Confluence page."""
    chain: Signature[bool] = (
        fetch_oncalls.s()  # pyright: ignore[reportUnboundVariable]
        | update_oncall_views.s()
    )
    chain()

update_oncall_slack_topic ¤

update_oncall_slack_topic(users: dict[str, User]) -> bool

TODO(gab) Move in the Slack app.

Source code in src/firefighter/incidents/tasks/updateoncall.py
def update_oncall_slack_topic(
    users: dict[str, User],
) -> bool:
    """TODO(gab) Move in the Slack app."""
    if not settings.ENABLE_SLACK:
        return False

    oncall_topic = create_oncall_topic(users)
    tech_incidents_conversation = Conversation.objects.get_or_none(tag="tech_incidents")
    if tech_incidents_conversation:
        res = tech_incidents_conversation.update_topic(oncall_topic)
        if res.get("ok"):
            logger.info(
                f"Updated on-call Slack topic on {tech_incidents_conversation}."
            )
            return True
    else:
        logger.warning(
            "Could not find tech_incidents conversation! Is there a channel with tag tech_incidents?"
        )
        return False
    logger.error("Failed to update on-call Slack topic.")
    return False

update_oncall_views ¤

update_oncall_views(*_args: Any, **_kwargs: Any) -> bool

Updates the on-call Slack topic and Confluence page containing the info for the on-call personnel.

Source code in src/firefighter/incidents/tasks/updateoncall.py
@shared_task(name="incidents.update_oncall_views")
def update_oncall_views(*_args: Any, **_kwargs: Any) -> bool:
    """Updates the on-call Slack topic and Confluence page containing the info for the on-call personnel."""
    if not settings.ENABLE_PAGERDUTY:
        logger.error("Can't update on-call users without PagerDuty enabled.")
        return False

    oncall_users_grouped_per_ep = (
        PagerDutyOncall.objects.get_current_oncalls_per_escalation_policy_name_first_responder()
    )

    # Check that we have Slack ID and handle for these users, if needed
    # if settings.ENABLE_SLACK:
    #     for user in oncall_users_grouped_per_ep.values():
    #         if not hasattr(user, "slack_user"):
    #             SlackUser.objects.add_slack_id_to_user(user)
    #             logger.debug("Added Slack ID to user: %s", user)
    #     update_oncall_slack_topic(oncall_users_grouped_per_ep)
    # else:
    #     logger.warning("Not updating on-call Slack topic, Slack integration disabled.")

    update_oncall_confluence(oncall_users_grouped_per_ep)
    logger.info("Updated on-call users on Confluence and Slack topic.")
    return True

urls ¤

views ¤

components ¤

details ¤

list ¤

ComponentsViewList ¤

Bases: SingleTableMixin, FilterView

get_context_data ¤
get_context_data(**kwargs: Any) -> dict[str, Any]

No *args to pass.

Source code in src/firefighter/incidents/views/components/list.py
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
    """No *args to pass."""
    # Call the base implementation first to get a context
    context = super().get_context_data(**kwargs)
    context["filter_order"] = [
        "search",
        "metrics_period",
        "group",
    ]
    context["page_title"] = "Components List"
    return context

date_filter ¤

get_date_range_from_special_date ¤

get_date_range_from_special_date(unparsed_date: str) -> tuple[datetime | None, datetime | None, str | None, str | None]

TODO Specify the year parameter (ISO or calendar).

Source code in src/firefighter/incidents/views/date_filter.py
def get_date_range_from_special_date(
    unparsed_date: str,
) -> tuple[datetime | None, datetime | None, str | None, str | None]:
    """TODO Specify the year parameter (ISO or calendar)."""
    if "-" in unparsed_date:
        logger.debug("Parsing range, splitting with /")

        if unparsed_date.count("-") == 1:
            split = unparsed_date.split("-")
        elif unparsed_date.count(" - ") == 1:
            split = unparsed_date.split(" - ")
        else:
            err_msg = f"Invalid range: {unparsed_date}. Only one separator is allowed"
            raise ValidationError(err_msg)
        if len(split) == 2:
            for i in range(2):
                split[i] = split[i].strip()

            beg = parse_moment(split[0])
            end = parse_moment(split[1])
            logger.debug(end)
            return (
                beg.start if beg else None,
                end.stop if end else None,
                None,
                split[0] + "-" + split[1],
            )
        # TODO Message warning + logger.warning
        raise ValidationError(
            "Invalid range: %(unparsed_date)s. Only one separator is allowed",
            params={"unparsed_date": unparsed_date},
            code="invalid",
        )
    try:
        date_range = parse_moment(unparsed_date)
    except ValueError as exc:
        raise ValidationError(
            "Invalid range: %(unparsed_date)s. %(error)s",
            params={"unparsed_date": unparsed_date, "error": str(exc)},
            code="invalid",
        ) from exc
    if date_range:
        return date_range.start, date_range.stop, None, "TODO"

    # TODO Message warning + logger.warning
    return None, None, None, None

parse_moment ¤

parse_moment(value: str) -> slice | None

Returns a :class:slice with the :func:slice.start and :func:slice.stop of the given moment. If the moment is not a valid moment, returns None. If the moment is a point in time, returns a slice with the same start and stop.

Source code in src/firefighter/incidents/views/date_filter.py
def parse_moment(value: str) -> slice | None:
    """Returns a :class:`slice` with the :func:`slice.start` and :func:`slice.stop` of the given moment.
    If the moment is not a valid moment, returns None.
    If the moment is a point in time, returns a slice with the same start and stop.
    """
    value = value.strip()

    # Quarter/Week relative (current/previous quarter, current/previous week)
    if value in SPECIAL_RANGES:
        now = timezone.localtime(timezone.now())
        returned_range = get_date_range_from_relative_calendar_value(value, now, value)
        if None not in returned_range:
            return slice(returned_range[0], returned_range[1])

    # Match according to the regex (W20, Q4, YYYY-W20, YYYY-Q4...)
    week_or_quarter_time_frame = (*get_date_range_from_calendar_value(value), value)
    if None not in week_or_quarter_time_frame:
        return slice(week_or_quarter_time_frame[0], week_or_quarter_time_frame[1])

    # Match using dateparser (lots of formats)
    return parse_date_natural(value)

date_utils ¤

get_bounds_from_calendar_year ¤

get_bounds_from_calendar_year(year_value: int) -> tuple[datetime, datetime, str]

Calendar Year.

Source code in src/firefighter/incidents/views/date_utils.py
def get_bounds_from_calendar_year(year_value: int) -> tuple[datetime, datetime, str]:
    """Calendar Year."""
    lower_bound = datetime(year_value, 1, 1, tzinfo=TZ)
    upper_bound = datetime(year_value, 12, 31, 23, 59, 59, 999999, tzinfo=TZ)

    return (
        lower_bound,
        upper_bound,
        f"{year_value}",
    )

get_bounds_from_year ¤

get_bounds_from_year(year_value: int) -> tuple[datetime, datetime, str]

ISO Year.

Source code in src/firefighter/incidents/views/date_utils.py
def get_bounds_from_year(year_value: int) -> tuple[datetime, datetime, str]:
    """ISO Year."""
    lower_bound = datetime.fromisocalendar(year_value, 1, 1)
    upper_week_nb = date(year_value, 12, 28).isocalendar()[1]
    upper_bound = datetime.combine(
        date.fromisocalendar(year=year_value, week=upper_week_nb, day=7),
        time(23, 59, 59, 999999),
    )

    return *make_bounds_timezone_aware(lower_bound, upper_bound), f"ISO{year_value}"

docs ¤

metrics ¤

role_types ¤

errors ¤

From Django default errors views. https://github.com/django/django/blob/0dd29209091280ccf34e07c9468746c396b7778e/django/views/defaults.py.

bad_request ¤

bad_request(request: HttpRequest, exception: Exception, template_name: str = ERROR_400_TEMPLATE_NAME) -> HttpResponse

400 error handler.

Templates: :template:400.html Context: None

Source code in src/firefighter/incidents/views/errors.py
@requires_csrf_token
def bad_request(
    request: HttpRequest,
    exception: Exception,
    template_name: str = ERROR_400_TEMPLATE_NAME,
) -> HttpResponse:
    """400 error handler.

    Templates: :template:`400.html`
    Context: None
    """
    if (
        request.headers.get("Accept", "") == JSON_CONTENT_TYPE
        or request.headers.get("Content-Type", "") == JSON_CONTENT_TYPE
    ):
        return JsonHttpResponseBadRequest(
            {"error": {"app_code": "BAD_REQUEST", "message": "Bad request"}}
        )
    context = {
        "request_path": quote(request.path),
        "page_title": "Bad request",
        "APP_DISPLAY_NAME": settings.APP_DISPLAY_NAME,
        "title": "Bad request",
        "code": "400",
        "message": "Please check the URL in the address bar and try again.",
    }
    try:
        template = loader.get_template(template_name)
    except TemplateDoesNotExist:
        if template_name != ERROR_400_TEMPLATE_NAME:
            # Reraise if it's a missing custom template.
            raise
        return HttpResponseBadRequest(
            BACKUP_ERROR_PAGE_TEMPLATE % {"title": "Bad Request (400)", "details": ""},
        )
    # No exception content is passed to the template, to not disclose any
    # sensitive information.
    return HttpResponseBadRequest(template.render(context))

page_not_found ¤

page_not_found(request: HttpRequest, exception: Exception, template_name: str = ERROR_404_TEMPLATE_NAME) -> HttpResponse

Default 404 handler.

Templates: :template:404.html Context: request_path The path of the requested URL (e.g., '/app/pages/bad_page/'). It's quoted to prevent a content injection attack. exception The message from the exception which triggered the 404 (if one was supplied), or the exception class name

Source code in src/firefighter/incidents/views/errors.py
@requires_csrf_token
def page_not_found(
    request: HttpRequest,
    exception: Exception,
    template_name: str = ERROR_404_TEMPLATE_NAME,
) -> HttpResponse:
    """Default 404 handler.

    Templates: :template:`404.html`
    Context:
        request_path
            The path of the requested URL (e.g., '/app/pages/bad_page/'). It's
            quoted to prevent a content injection attack.
        exception
            The message from the exception which triggered the 404 (if one was
            supplied), or the exception class name
    """
    if (
        request.headers.get("Accept", "") == JSON_CONTENT_TYPE
        or request.headers.get("Content-Type", "") == JSON_CONTENT_TYPE
    ):
        return JsonHttpResponseNotFound(
            {"error": {"app_code": "NOT_FOUND", "message": "Page not found"}}
        )
    exception_repr = exception.__class__.__name__
    # Try to get an "interesting" exception message, if any (and not the ugly
    # Resolver404 dictionary)
    try:
        message = exception.args[0] if exception.args else ""
    except (AttributeError, IndexError):
        logger.exception("Error in page_not_found")
    else:
        if isinstance(message, str):
            exception_repr = message
    context = {
        "request_path": quote(request.path),
        "exception": exception_repr,
        "page_title": "Page not found",
        "title": "Page not found",
        "code": "404",
        "message": "Please check the URL in the address bar and try again.",
    }

    try:
        template = loader.get_template(template_name)
        body = template.render(context, request)
    except TemplateDoesNotExist:
        if template_name != ERROR_404_TEMPLATE_NAME:
            # Reraise if it's a missing custom template.
            raise
        # Render template (even though there are no substitutions) to allow
        # inspecting the context in tests.
        template_backup = Engine().from_string(
            BACKUP_ERROR_PAGE_TEMPLATE
            % {
                "title": "Not Found",
                "details": "The requested resource was not found on this server.",
            },
        )

        body = template_backup.render(Context(context))
    return HttpResponseNotFound(body)

permission_denied ¤

permission_denied(request: HttpRequest, exception: Exception, template_name: str = ERROR_403_TEMPLATE_NAME) -> HttpResponse

Permission denied (403) handler.

Templates: :template:403.html Context: exception The message from the exception which triggered the 403 (if one was supplied).

If the template does not exist, an HTTP 403 response containing the text "403 Forbidden" (as per RFC 7231) will be returned.

Source code in src/firefighter/incidents/views/errors.py
@requires_csrf_token
def permission_denied(
    request: HttpRequest,
    exception: Exception,
    template_name: str = ERROR_403_TEMPLATE_NAME,
) -> HttpResponse:
    """Permission denied (403) handler.

    Templates: :template:`403.html`
    Context:
        exception
            The message from the exception which triggered the 403 (if one was
            supplied).

    If the template does not exist, an HTTP 403 response containing the text
    "403 Forbidden" (as per RFC 7231) will be returned.
    """
    if (
        request.headers.get("Accept", "") == JSON_CONTENT_TYPE
        or request.headers.get("Content-Type", "") == JSON_CONTENT_TYPE
    ):
        return JsonHttpResponseForbidden(
            {"error": {"app_code": "FORBIDDEN", "message": "Forbidden"}}
        )
    context = {
        "request_path": quote(request.path),
        "exception": str(exception),
        "page_title": "Forbidden",
        "title": "Forbidden",
        "code": "403",
        "message": "You do not have permission to access this page.",
    }
    try:
        template = loader.get_template(template_name)
    except TemplateDoesNotExist:
        if template_name != ERROR_403_TEMPLATE_NAME:
            # Reraise if it's a missing custom template.
            raise
        return HttpResponseForbidden(
            BACKUP_ERROR_PAGE_TEMPLATE % {"title": "403 Forbidden", "details": ""},
        )
    return HttpResponseForbidden(template.render(request=request, context=context))

server_error ¤

server_error(request: HttpRequest, template_name: str = ERROR_500_TEMPLATE_NAME) -> HttpResponse

500 error handler.

Templates: :template:500.html Context: None

Source code in src/firefighter/incidents/views/errors.py
@requires_csrf_token
def server_error(
    request: HttpRequest, template_name: str = ERROR_500_TEMPLATE_NAME
) -> HttpResponse:
    """500 error handler.

    Templates: :template:`500.html`
    Context: None
    """
    context = {
        "page_title": "Server error",
        "title": "Server error",
        "code": "500",
        "message": "Please try again later. This issue has been logged, but feel free to raise the issue.",
        "APP_DISPLAY_NAME": settings.APP_DISPLAY_NAME,
    }
    if (
        request.headers.get("Accept", "") == JSON_CONTENT_TYPE
        or request.headers.get("Content-Type", "") == JSON_CONTENT_TYPE
    ):
        return JsonHttpResponseServerError(
            {"error": {"app_code": "SERVER_ERROR", "message": "Server error"}}
        )
    try:
        template = loader.get_template(template_name)
    except TemplateDoesNotExist:
        if template_name != ERROR_500_TEMPLATE_NAME:
            # Reraise if it's a missing custom template.
            raise
        return HttpResponseServerError(
            BACKUP_ERROR_PAGE_TEMPLATE % {"title": "Server Error (500)", "details": ""},
        )
    return HttpResponseServerError(template.render(context))

reports ¤

users ¤

details ¤

UserDetailView ¤

Bases: CustomDetailView[User]

In this view, be extra careful.

In this context, user is the logged_in user and target_user is the user of the profile being viewed.

views ¤

DashboardView ¤

Bases: ListView[Incident]

get_context_data ¤
get_context_data(**kwargs: Any) -> dict[str, Any]

No *args to pass.

Source code in src/firefighter/incidents/views/views.py
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
    """No *args to pass."""
    # Call the base implementation first to get a context
    context = super().get_context_data(**kwargs)
    context["page_title"] = "Incidents Dashboard"
    return context

IncidentListView ¤

Bases: SingleTableMixin, FilterView

get_context_data ¤
get_context_data(**kwargs: Any) -> dict[str, Any]

No *args to pass.

Source code in src/firefighter/incidents/views/views.py
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
    """No *args to pass."""
    # Call the base implementation first to get a context
    context = super().get_context_data(**kwargs)
    context["page_range"] = context["table"].paginator.get_elided_page_range(
        context["table"].page.number, on_each_side=2, on_ends=1
    )
    context["filter_order"] = [
        "search",
        "created_at",
        "status",
        "environment",
        "priority",
        "component",
    ]

    context["page_title"] = "Incidents List"
    context["api_url_export"] = reverse("api:incidents-list")
    return context

IncidentStatisticsView ¤

Bases: FilterView

get_context_data ¤
get_context_data(**kwargs: Any) -> dict[str, Any]

No *args to pass.

Source code in src/firefighter/incidents/views/views.py
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
    """No *args to pass."""
    context = super().get_context_data(**kwargs)
    context["page_title"] = "Statistics"
    context["filter_order"] = [
        "search",
        "created_at",
        "status",
        "environment",
        "priority",
        "component",
    ]
    context_data = weekly_dashboard_context(
        self.request, context.get("incidents_filtered", [])
    )
    return {**context, **context_data}

IncidentUpdateKeyEventsView ¤

Bases: SingleObjectMixin[Incident], LoginRequiredMixin, FormView[IncidentUpdateKeyEventsForm]

get_form ¤
get_form(form_class: type[IncidentUpdateKeyEventsForm] | None = None) -> IncidentUpdateKeyEventsForm

Replace Markdown bold syntax with HTML bold syntax in form labels.

Source code in src/firefighter/incidents/views/views.py
def get_form(
    self, form_class: type[IncidentUpdateKeyEventsForm] | None = None
) -> IncidentUpdateKeyEventsForm:
    """Replace Markdown bold syntax with HTML bold syntax in form labels."""
    form = super().get_form(form_class)
    for field in form.fields:
        # Replace *ABC* with <bold>ABC</bold>
        form.fields[field].label = re.sub(
            r"\*(.*?)\*", r"<b>\1</b>", str(form.fields[field].label)
        )
    return form

ProcessAfterResponse ¤

ProcessAfterResponse(redirect_to: str, data: dict[str, Any], *args: Any, **kwargs: Any)

Bases: HttpResponseRedirect

Custom Response, to trigger the Slack workflow after creating the incident and returning HTTP 201.

TODO This does not work, the workflow is triggered before the response is sent. We need to do a celery task! TODO We need to redirect to the incident page or Slack conversation.

Source code in src/firefighter/incidents/views/views.py
def __init__(
    self, redirect_to: str, data: dict[str, Any], *args: Any, **kwargs: Any
) -> None:
    super().__init__(redirect_to, *args, **kwargs)
    self.data = data