Skip to content

jira_app ¤

Modules:

client ¤

Classes:

JiraClient ¤

JiraClient()

Methods:

Source code in src/firefighter/jira_app/client.py
def __init__(self) -> None:
    self.url = RAID_JIRA_API_URL

assign_issue ¤

assign_issue(issue_key: str, account_id: str) -> bool

Assign a Jira issue to a user.

Parameters:

  • issue_key (str) –

    Jira issue key (e.g., "INCIDENT-123")

  • account_id (str) –

    Jira account ID of the user

Returns:

  • bool

    True if assignment succeeded, False otherwise

Note

This method does not raise exceptions. Assignment failures are logged as warnings since assignment is typically an optional operation.

Source code in src/firefighter/jira_app/client.py
def assign_issue(self, issue_key: str, account_id: str) -> bool:
    """Assign a Jira issue to a user.

    Args:
        issue_key: Jira issue key (e.g., "INCIDENT-123")
        account_id: Jira account ID of the user

    Returns:
        True if assignment succeeded, False otherwise

    Note:
        This method does not raise exceptions. Assignment failures are logged
        as warnings since assignment is typically an optional operation.
    """
    try:
        self.jira.assign_issue(issue_key, account_id)
    except exceptions.JIRAError as e:
        logger.warning(
            "Failed to assign issue %s to user %s: %s",
            issue_key,
            account_id,
            e.text if hasattr(e, "text") else str(e),
        )
        return False
    else:
        logger.info("Assigned issue %s to user %s", issue_key, account_id)
        return True

create_postmortem_issue ¤

create_postmortem_issue(project_key: str, issue_type: str, fields: dict[str, Any], parent_issue_key: str | None = None) -> dict[str, Any]

Create a Jira post-mortem issue with custom fields.

Parameters:

  • project_key (str) –

    Jira project key (e.g., "INCIDENT")

  • issue_type (str) –

    Issue type name (e.g., "Post-mortem")

  • fields (dict[str, Any]) –

    Dictionary of field IDs to values

  • parent_issue_key (str | None, default: None ) –

    Optional parent issue key to link this post-mortem to

Returns:

  • dict[str, Any]

    Dictionary with 'key' and 'id' of created issue

Raises:

  • JiraAPIError

    If issue creation fails

Source code in src/firefighter/jira_app/client.py
def create_postmortem_issue(
    self,
    project_key: str,
    issue_type: str,
    fields: dict[str, Any],
    parent_issue_key: str | None = None,
) -> dict[str, Any]:
    """Create a Jira post-mortem issue with custom fields.

    Args:
        project_key: Jira project key (e.g., "INCIDENT")
        issue_type: Issue type name (e.g., "Post-mortem")
        fields: Dictionary of field IDs to values
        parent_issue_key: Optional parent issue key to link this post-mortem to

    Returns:
        Dictionary with 'key' and 'id' of created issue

    Raises:
        JiraAPIError: If issue creation fails
    """
    try:
        issue_dict: dict[str, Any] = {
            "project": {"key": project_key},
            "issuetype": {"name": issue_type},
            **fields,
        }

        # Create the issue first without parent link
        issue = self.jira.create_issue(fields=issue_dict)
        logger.info(
            "Created post-mortem issue %s in project %s", issue.key, project_key
        )

        # Create issue link to parent if provided
        # Using link instead of parent to avoid hierarchy restrictions
        if parent_issue_key:
            self._create_issue_link_safe(
                parent_issue_key=parent_issue_key,
                postmortem_issue_key=issue.key,
            )

    except exceptions.JIRAError as e:
        logger.exception("Failed to create Jira issue in project %s", project_key)
        error_msg = f"Failed to create Jira issue: {e.status_code} {e.text}"
        raise JiraAPIError(error_msg) from e
    else:
        return {
            "key": issue.key,
            "id": issue.id,
        }

get_jira_user_from_jira_id ¤

get_jira_user_from_jira_id(jira_account_id: str) -> JiraUser

Look for a Jira User in DB, if not found, fetch it from Jira API.

Parameters:

  • jira_account_id (str) –

    Jira account id

Raises:

  • JiraUserNotFoundError

    User not found in Jira nor in DB

  • JiraUserDatabaseError

    Unable to create user in DB

  • ValueError

    Empty jira_account_id

Returns:

  • JiraUser ( JiraUser ) –

    Jira user object

Source code in src/firefighter/jira_app/client.py
def get_jira_user_from_jira_id(self, jira_account_id: str) -> JiraUser:
    """Look for a Jira User in DB, if not found, fetch it from Jira API.

    Args:
        jira_account_id (str): Jira account id

    Raises:
        JiraUserNotFoundError: User not found in Jira nor in DB
        JiraUserDatabaseError: Unable to create user in DB
        ValueError: Empty jira_account_id

    Returns:
        JiraUser: Jira user object
    """
    if jira_account_id is None or jira_account_id == "":
        err_msg = f"Jira account id is empty ('{jira_account_id}')"
        raise ValueError(err_msg)

    # Look in the DB
    try:
        return JiraUser.objects.get(id=jira_account_id)
    except JiraUser.DoesNotExist:
        logger.debug(
            f"Jira user {jira_account_id} not found in DB, fetching from Jira API"
        )
    logger.info("User %s not found in DB. Check sync user task.", jira_account_id)

    # Look on JIRA API
    jira_api_user, email = self._get_user_from_api(jira_account_id)

    username: str = email.split("@")[0]
    # Check if we have user with same email
    try:
        user: User = User.objects.select_related("jira_user").get(email=email)
        if (
            hasattr(user, "jira_user")
            and user.jira_user
            and isinstance(user.jira_user, JiraUser)
        ):
            return user.jira_user
        try:
            return JiraUser.objects.create(
                id=jira_account_id,
                user=user,
            )
        except db.IntegrityError as e:
            logger.exception("Error creating user %s", jira_account_id)
            raise JiraUserDatabaseError("Unable to create user") from e

    except User.DoesNotExist:
        logger.warning("User %s not found in DB. Creating it...", jira_account_id)
        user = self._create_user_from_jira_info(
            jira_account_id, jira_api_user, email, username
        )

    try:
        return JiraUser.objects.create(
            id=jira_account_id,
            user=user,
        )
    except db.IntegrityError as e:
        logger.exception("Error creating user %s", jira_account_id)
        raise JiraUserDatabaseError("Unable to create user") from e

get_jira_user_from_user ¤

get_jira_user_from_user(user: User) -> JiraUser

Fetches a Jira user from the Jira API.

Parameters:

  • user (User) –

    User object

Raises:

  • JiraUserNotFoundError

    User not found in Jira

Returns:

  • JiraAPIUser ( JiraUser ) –

    Jira API user object

Source code in src/firefighter/jira_app/client.py
def get_jira_user_from_user(self, user: User) -> JiraUser:
    """Fetches a Jira user from the Jira API.

    Args:
        user (User): User object

    Raises:
        JiraUserNotFoundError: User not found in Jira

    Returns:
        JiraAPIUser: Jira API user object
    """
    # Check if user has a Jira user
    if hasattr(user, "jira_user") and user.jira_user:
        return user.jira_user

    username = user.email.split("@")[0]
    jira_user = self._fetch_jira_user(username)

    return JiraUser.objects.update_or_create(
        id=jira_user.raw.get("accountId"), defaults={"user": user}
    )[0]

get_watchers_from_jira_ticket ¤

get_watchers_from_jira_ticket(jira_issue_id: int | str) -> list[User]

Fetch watchers for a specific Jira ticket from Jira API.

Parameters:

  • jira_issue_id (str | int) –

    Jira issue id

Raises:

Returns:

  • list ( User ) –

    List of Jira users object, or empty list if ticket doesn't exist

Source code in src/firefighter/jira_app/client.py
def get_watchers_from_jira_ticket(
    self, jira_issue_id: int | str
) -> list[JiraAPIUser]:
    """Fetch watchers for a specific Jira ticket from Jira API.

    Args:
        jira_issue_id (str | int): Jira issue id

    Raises:
        ValueError: Empty issue id

    Returns:
        list(JiraAPIUser): List of Jira users object, or empty list if ticket doesn't exist
    """
    try:
        watchers = self.jira.watchers(jira_issue_id).raw.get("watchers")
    except exceptions.JIRAError as e:
        if e.status_code == 404:
            logger.warning(
                "Jira ticket %s not found or no permission to access it. Cannot fetch watchers.",
                jira_issue_id,
            )
            return []
        raise
    else:
        if len(watchers) == 0:
            logger.debug("No watchers found for jira_issue_id '%s'.", jira_issue_id)
        return watchers

transition_issue_auto ¤

transition_issue_auto(issue_id: str | int, target_status_name: str, workflow_name: str) -> None

Attempts to close an issue by applying transitions to it.

Parameters:

  • issue_id (str | int) –

    Jira issue id

  • target_status_name (str) –

    target status name

  • workflow_name (str) –

    workflow name

Source code in src/firefighter/jira_app/client.py
def transition_issue_auto(
    self, issue_id: str | int, target_status_name: str, workflow_name: str
) -> None:
    """Attempts to close an issue by applying transitions to it.

    Args:
        issue_id (str | int): Jira issue id
        target_status_name (str): target status name
        workflow_name (str): workflow name
    """
    issue_id = str(issue_id)
    transitions_info = self._get_transitions(
        self._get_project_config_workflow_from_builder_base(workflow_name)
    )
    if len(transitions_info) == 0:
        logger.error(
            f"Could not find transitions for issue id={issue_id}! Not closing issue."
        )

    # Get closed state id
    # XXX Use a list of closed states to support multiple workflows, or better
    closed_state_id = get_status_id_from_name(transitions_info, target_status_name)
    if closed_state_id is None:
        logger.warning(
            f"Could not find target status '{target_status_name}' id for issue {issue_id}! Not closing issue."
        )
        return

    # Get current issue status
    issue = self.jira.issue(issue_id)
    current_status_id = int(issue.fields.status.id)

    # Get transitions to apply
    transitions_to_apply = get_transitions_to_apply(
        current_status_id, transitions_info, closed_state_id
    )

    if len(transitions_to_apply) == 0:
        logger.info(f"Issue {issue_id} is already closed. Not closing again.")

    # Apply transitions
    # XXX Better error handling
    for transition in transitions_to_apply:
        logger.debug(f"Running transition: {transition}")
        self.jira.transition_issue(
            issue=issue_id,
            transition=transition,
            fields={},
        )

management ¤

Management commands for jira_app.

Modules:

  • commands

    Management commands for jira_app.

commands ¤

Management commands for jira_app.

models ¤

Classes:

JiraIssue ¤

Bases: Model

Jira issue model.

JiraPostMortem ¤

Bases: Model

Jira Post-mortem linked to an Incident.

Attributes:

issue_url property ¤

issue_url: str

Return Jira issue URL.

JiraUser ¤

Bases: Model

Jira user model. It maps an User with the Jira account id.

service_postmortem ¤

Service for creating and managing Jira post-mortems.

Classes:

JiraPostMortemService ¤

JiraPostMortemService()

Service for creating and managing Jira post-mortems.

Methods:

Source code in src/firefighter/jira_app/service_postmortem.py
def __init__(self) -> None:
    self.client = JiraClient()
    self.project_key = getattr(settings, "JIRA_POSTMORTEM_PROJECT_KEY", "INCIDENT")
    self.issue_type = getattr(settings, "JIRA_POSTMORTEM_ISSUE_TYPE", "Post-mortem")
    self.field_ids = getattr(
        settings,
        "JIRA_POSTMORTEM_FIELDS",
        {
            "incident_summary": "customfield_12699",
            "timeline": "customfield_12700",
            "root_causes": "customfield_12701",
            "impact": "customfield_12702",
            "mitigation_actions": "customfield_12703",
            "incident_category": "customfield_12369",
        },
    )

create_postmortem_for_incident ¤

create_postmortem_for_incident(incident: Incident, created_by: User | None = None) -> JiraPostMortem

Create a Jira post-mortem for an incident.

Parameters:

  • incident (Incident) –

    Incident to create post-mortem for

  • created_by (User | None, default: None ) –

    User creating the post-mortem

Returns:

Raises:

  • ValueError

    If incident already has a Jira post-mortem

  • JiraAPIError

    If Jira API call fails

Source code in src/firefighter/jira_app/service_postmortem.py
def create_postmortem_for_incident(
    self,
    incident: Incident,
    created_by: User | None = None,
) -> JiraPostMortem:
    """Create a Jira post-mortem for an incident.

    Args:
        incident: Incident to create post-mortem for
        created_by: User creating the post-mortem

    Returns:
        JiraPostMortem instance

    Raises:
        ValueError: If incident already has a Jira post-mortem
        JiraAPIError: If Jira API call fails
    """
    if hasattr(incident, "jira_postmortem_for"):
        error_msg = f"Incident #{incident.id} already has a Jira post-mortem"
        raise ValueError(error_msg)

    logger.info(f"Creating Jira post-mortem for incident #{incident.id}")

    # Prefetch incident updates and jira_ticket for timeline and parent link
    from firefighter.incidents.models.incident import Incident  # noqa: PLC0415

    incident = (
        Incident.objects.select_related("priority", "environment", "jira_ticket")
        .prefetch_related("incidentupdate_set")
        .get(pk=incident.pk)
    )

    # Generate content from templates
    fields = self._generate_issue_fields(incident)

    # Get parent issue key from RAID Jira ticket if available
    parent_issue_key = None
    if hasattr(incident, "jira_ticket") and incident.jira_ticket:
        parent_issue_key = incident.jira_ticket.key

    # Create Jira issue with optional parent link
    jira_issue = self.client.create_postmortem_issue(
        project_key=self.project_key,
        issue_type=self.issue_type,
        fields=fields,
        parent_issue_key=parent_issue_key,
    )

    # Assign to incident commander if available
    commander = (
        incident.roles_set.select_related("user__jira_user", "role_type")
        .filter(role_type__slug="commander")
        .first()
    )
    if commander:
        jira_user = getattr(commander.user, "jira_user", None)
        if jira_user is None:
            try:
                jira_user = self.client.get_jira_user_from_user(commander.user)
            except (JiraUserNotFoundError, JiraUserDatabaseError) as exc:
                logger.warning(
                    "Unable to fetch Jira user for commander %s: %s",
                    commander.user_id,
                    exc,
                )
        if jira_user is not None:
            assigned = self.client.assign_issue(
                issue_key=jira_issue["key"],
                account_id=jira_user.id,
            )
            if assigned:
                logger.info(
                    "Assigned post-mortem %s to commander %s",
                    jira_issue["key"],
                    commander.user.username,
                )

    # Create JiraPostMortem record
    jira_postmortem = JiraPostMortem.objects.create(
        incident=incident,
        jira_issue_key=jira_issue["key"],
        jira_issue_id=jira_issue["id"],
        created_by=created_by,
    )

    logger.info(
        f"Created Jira post-mortem {jira_postmortem.jira_issue_key} "
        f"for incident #{incident.id}"
    )

    return jira_postmortem

signals ¤

Modules:

Functions:

postmortem_created_handler ¤

postmortem_created_handler(
    sender: Any, incident: Incident, incident_update: IncidentUpdate, updated_fields: list[str], **kwargs: Never
) -> None

Handle post-mortem creation when incident reaches MITIGATED status.

This handler is registered in jira_app to ensure it works independently of Confluence being enabled. It creates post-mortems for both Confluence and Jira based on their respective feature flags.

Source code in src/firefighter/jira_app/signals/postmortem_created.py
@receiver(signal=incident_updated)
def postmortem_created_handler(
    sender: Any,
    incident: Incident,
    incident_update: IncidentUpdate,
    updated_fields: list[str],
    **kwargs: Never,
) -> None:
    """Handle post-mortem creation when incident reaches MITIGATED status.

    This handler is registered in jira_app to ensure it works independently
    of Confluence being enabled. It creates post-mortems for both Confluence
    and Jira based on their respective feature flags.
    """
    logger.debug(
        f"postmortem_created_handler called with sender={sender}, "
        f"incident_id={incident.id}, status={incident_update.status}, "
        f"updated_fields={updated_fields}"
    )

    if not apps.is_installed("firefighter.slack"):
        logger.error("Slack app is not installed. Skipping.")
        return

    # Import Slack tasks after apps are loaded
    from firefighter.slack.tasks.reminder_postmortem import (  # noqa: PLC0415
        publish_fixed_next_actions,
        publish_postmortem_reminder,
    )

    logger.debug(f"Checking sender: sender={sender}, type={type(sender)}")
    if sender != "update_status":
        logger.debug(f"Ignoring signal from sender={sender}")
        return

    logger.debug("Sender is update_status, checking postmortem conditions")

    # Check if we should create post-mortem(s)
    if (
        "_status" not in updated_fields
        or incident_update.status
        not in {IncidentStatus.MITIGATED, IncidentStatus.POST_MORTEM}
        or not incident.needs_postmortem
    ):
        logger.debug(
            f"Not creating post-mortem: _status in fields={('_status' in updated_fields)}, "
            f"status={incident_update.status}, needs_postmortem={incident.needs_postmortem}"
        )
        # For P3+ incidents, publish next actions reminder
        if (
            "_status" in updated_fields
            and incident_update.status
            in {IncidentStatus.MITIGATED, IncidentStatus.POST_MORTEM}
            and not incident.needs_postmortem
        ):
            publish_fixed_next_actions(incident)
        return

    logger.info(
        f"Creating post-mortem(s) for incident #{incident.id} "
        f"(status={incident_update.status}, needs_postmortem={incident.needs_postmortem})"
    )

    enable_confluence = getattr(settings, "ENABLE_CONFLUENCE", False)
    enable_jira_postmortem = getattr(settings, "ENABLE_JIRA_POSTMORTEM", False)

    confluence_pm = None
    jira_pm = None

    # Check and create Confluence post-mortem
    if enable_confluence:
        has_confluence = hasattr(incident, "postmortem_for")
        logger.debug(f"Confluence enabled, has_confluence={has_confluence}")
        if not has_confluence:
            confluence_manager = _get_confluence_postmortem_manager()
            if confluence_manager:
                logger.info(f"Creating Confluence post-mortem for incident #{incident.id}")
                try:
                    confluence_pm = confluence_manager._create_confluence_postmortem(  # noqa: SLF001
                        incident
                    )
                except Exception:
                    logger.exception(
                        f"Failed to create Confluence post-mortem for incident #{incident.id}"
                    )
        else:
            logger.debug(f"Confluence post-mortem already exists for incident #{incident.id}")

    # Check and create Jira post-mortem
    if enable_jira_postmortem:
        has_jira = hasattr(incident, "jira_postmortem_for")
        logger.debug(f"Jira post-mortem enabled, has_jira={has_jira}")
        if not has_jira:
            logger.info(f"Creating Jira post-mortem for incident #{incident.id}")
            try:
                jira_service = _get_jira_postmortem_service()
                jira_pm = jira_service.create_postmortem_for_incident(incident)
            except Exception:
                logger.exception(
                    f"Failed to create Jira post-mortem for incident #{incident.id}"
                )
        else:
            logger.debug(f"Jira post-mortem already exists for incident #{incident.id}")

    # Send signal if at least one post-mortem was created
    if confluence_pm or jira_pm:
        from firefighter.incidents.signals import postmortem_created  # noqa: PLC0415

        logger.info(
            f"Post-mortem(s) created for incident #{incident.id}: "
            f"confluence={confluence_pm is not None}, jira={jira_pm is not None}"
        )
        postmortem_created.send_robust(sender=__name__, incident=incident)

    # Publish reminder
    publish_postmortem_reminder(incident)

sync_key_events_to_jira_postmortem ¤

sync_key_events_to_jira_postmortem(sender: Any, incident: Incident, **kwargs: dict[str, Any]) -> None

Update Jira post-mortem timeline when key events are updated.

This handler is triggered when incident key events are updated via the web UI or Slack, and syncs the timeline to the associated Jira post-mortem ticket.

Parameters:

  • sender (Any) –

    The sender of the signal

  • incident (Incident) –

    The incident whose key events were updated

  • **kwargs (dict[str, Any], default: {} ) –

    Additional keyword arguments

Source code in src/firefighter/jira_app/signals/incident_key_events_updated.py
@receiver(signal=incident_key_events_updated)
def sync_key_events_to_jira_postmortem(
    sender: Any, incident: Incident, **kwargs: dict[str, Any]
) -> None:
    """Update Jira post-mortem timeline when key events are updated.

    This handler is triggered when incident key events are updated via the web UI
    or Slack, and syncs the timeline to the associated Jira post-mortem ticket.

    Args:
        sender: The sender of the signal
        incident: The incident whose key events were updated
        **kwargs: Additional keyword arguments
    """
    # Check if Jira post-mortem is enabled
    if not getattr(settings, "ENABLE_JIRA_POSTMORTEM", False):
        logger.debug("Jira post-mortem disabled, skipping timeline sync")
        return

    # Check if incident has a Jira post-mortem
    if not hasattr(incident, "jira_postmortem_for") or not incident.jira_postmortem_for:
        logger.debug(f"Incident #{incident.id} has no Jira post-mortem, skipping timeline sync")
        return

    jira_postmortem = incident.jira_postmortem_for
    logger.info(
        f"Syncing key events timeline to Jira post-mortem {jira_postmortem.jira_issue_key} "
        f"for incident #{incident.id}"
    )

    try:
        # Prefetch incident updates for timeline generation
        incident_refreshed = (
            IncidentModel.objects.select_related("priority", "environment")
            .prefetch_related("incidentupdate_set")
            .get(pk=incident.pk)
        )

        # Generate updated timeline from template
        timeline_content = render_to_string(
            "jira/postmortem/timeline.txt",
            {"incident": incident_refreshed},
        )

        # Get the field ID for timeline from service
        service = JiraPostMortemService()
        timeline_field_id = service.field_ids.get("timeline")

        if not timeline_field_id:
            logger.error("Timeline field ID not found in Jira post-mortem service configuration")
            return

        # Update the Jira ticket
        client = JiraClient()
        issue = client.jira.issue(jira_postmortem.jira_issue_key)
        issue.update(fields={timeline_field_id: timeline_content})

        logger.info(
            f"Successfully updated timeline in Jira post-mortem {jira_postmortem.jira_issue_key}"
        )

    except Exception:
        logger.exception(
            f"Failed to update timeline in Jira post-mortem {jira_postmortem.jira_issue_key} "
            f"for incident #{incident.id}"
        )

incident_key_events_updated ¤

Signal handlers for incident key events updates to sync with Jira post-mortem.

Functions:

sync_key_events_to_jira_postmortem ¤

sync_key_events_to_jira_postmortem(sender: Any, incident: Incident, **kwargs: dict[str, Any]) -> None

Update Jira post-mortem timeline when key events are updated.

This handler is triggered when incident key events are updated via the web UI or Slack, and syncs the timeline to the associated Jira post-mortem ticket.

Parameters:

  • sender (Any) –

    The sender of the signal

  • incident (Incident) –

    The incident whose key events were updated

  • **kwargs (dict[str, Any], default: {} ) –

    Additional keyword arguments

Source code in src/firefighter/jira_app/signals/incident_key_events_updated.py
@receiver(signal=incident_key_events_updated)
def sync_key_events_to_jira_postmortem(
    sender: Any, incident: Incident, **kwargs: dict[str, Any]
) -> None:
    """Update Jira post-mortem timeline when key events are updated.

    This handler is triggered when incident key events are updated via the web UI
    or Slack, and syncs the timeline to the associated Jira post-mortem ticket.

    Args:
        sender: The sender of the signal
        incident: The incident whose key events were updated
        **kwargs: Additional keyword arguments
    """
    # Check if Jira post-mortem is enabled
    if not getattr(settings, "ENABLE_JIRA_POSTMORTEM", False):
        logger.debug("Jira post-mortem disabled, skipping timeline sync")
        return

    # Check if incident has a Jira post-mortem
    if not hasattr(incident, "jira_postmortem_for") or not incident.jira_postmortem_for:
        logger.debug(f"Incident #{incident.id} has no Jira post-mortem, skipping timeline sync")
        return

    jira_postmortem = incident.jira_postmortem_for
    logger.info(
        f"Syncing key events timeline to Jira post-mortem {jira_postmortem.jira_issue_key} "
        f"for incident #{incident.id}"
    )

    try:
        # Prefetch incident updates for timeline generation
        incident_refreshed = (
            IncidentModel.objects.select_related("priority", "environment")
            .prefetch_related("incidentupdate_set")
            .get(pk=incident.pk)
        )

        # Generate updated timeline from template
        timeline_content = render_to_string(
            "jira/postmortem/timeline.txt",
            {"incident": incident_refreshed},
        )

        # Get the field ID for timeline from service
        service = JiraPostMortemService()
        timeline_field_id = service.field_ids.get("timeline")

        if not timeline_field_id:
            logger.error("Timeline field ID not found in Jira post-mortem service configuration")
            return

        # Update the Jira ticket
        client = JiraClient()
        issue = client.jira.issue(jira_postmortem.jira_issue_key)
        issue.update(fields={timeline_field_id: timeline_content})

        logger.info(
            f"Successfully updated timeline in Jira post-mortem {jira_postmortem.jira_issue_key}"
        )

    except Exception:
        logger.exception(
            f"Failed to update timeline in Jira post-mortem {jira_postmortem.jira_issue_key} "
            f"for incident #{incident.id}"
        )

postmortem_created ¤

Functions:

postmortem_created_handler ¤

postmortem_created_handler(
    sender: Any, incident: Incident, incident_update: IncidentUpdate, updated_fields: list[str], **kwargs: Never
) -> None

Handle post-mortem creation when incident reaches MITIGATED status.

This handler is registered in jira_app to ensure it works independently of Confluence being enabled. It creates post-mortems for both Confluence and Jira based on their respective feature flags.

Source code in src/firefighter/jira_app/signals/postmortem_created.py
@receiver(signal=incident_updated)
def postmortem_created_handler(
    sender: Any,
    incident: Incident,
    incident_update: IncidentUpdate,
    updated_fields: list[str],
    **kwargs: Never,
) -> None:
    """Handle post-mortem creation when incident reaches MITIGATED status.

    This handler is registered in jira_app to ensure it works independently
    of Confluence being enabled. It creates post-mortems for both Confluence
    and Jira based on their respective feature flags.
    """
    logger.debug(
        f"postmortem_created_handler called with sender={sender}, "
        f"incident_id={incident.id}, status={incident_update.status}, "
        f"updated_fields={updated_fields}"
    )

    if not apps.is_installed("firefighter.slack"):
        logger.error("Slack app is not installed. Skipping.")
        return

    # Import Slack tasks after apps are loaded
    from firefighter.slack.tasks.reminder_postmortem import (  # noqa: PLC0415
        publish_fixed_next_actions,
        publish_postmortem_reminder,
    )

    logger.debug(f"Checking sender: sender={sender}, type={type(sender)}")
    if sender != "update_status":
        logger.debug(f"Ignoring signal from sender={sender}")
        return

    logger.debug("Sender is update_status, checking postmortem conditions")

    # Check if we should create post-mortem(s)
    if (
        "_status" not in updated_fields
        or incident_update.status
        not in {IncidentStatus.MITIGATED, IncidentStatus.POST_MORTEM}
        or not incident.needs_postmortem
    ):
        logger.debug(
            f"Not creating post-mortem: _status in fields={('_status' in updated_fields)}, "
            f"status={incident_update.status}, needs_postmortem={incident.needs_postmortem}"
        )
        # For P3+ incidents, publish next actions reminder
        if (
            "_status" in updated_fields
            and incident_update.status
            in {IncidentStatus.MITIGATED, IncidentStatus.POST_MORTEM}
            and not incident.needs_postmortem
        ):
            publish_fixed_next_actions(incident)
        return

    logger.info(
        f"Creating post-mortem(s) for incident #{incident.id} "
        f"(status={incident_update.status}, needs_postmortem={incident.needs_postmortem})"
    )

    enable_confluence = getattr(settings, "ENABLE_CONFLUENCE", False)
    enable_jira_postmortem = getattr(settings, "ENABLE_JIRA_POSTMORTEM", False)

    confluence_pm = None
    jira_pm = None

    # Check and create Confluence post-mortem
    if enable_confluence:
        has_confluence = hasattr(incident, "postmortem_for")
        logger.debug(f"Confluence enabled, has_confluence={has_confluence}")
        if not has_confluence:
            confluence_manager = _get_confluence_postmortem_manager()
            if confluence_manager:
                logger.info(f"Creating Confluence post-mortem for incident #{incident.id}")
                try:
                    confluence_pm = confluence_manager._create_confluence_postmortem(  # noqa: SLF001
                        incident
                    )
                except Exception:
                    logger.exception(
                        f"Failed to create Confluence post-mortem for incident #{incident.id}"
                    )
        else:
            logger.debug(f"Confluence post-mortem already exists for incident #{incident.id}")

    # Check and create Jira post-mortem
    if enable_jira_postmortem:
        has_jira = hasattr(incident, "jira_postmortem_for")
        logger.debug(f"Jira post-mortem enabled, has_jira={has_jira}")
        if not has_jira:
            logger.info(f"Creating Jira post-mortem for incident #{incident.id}")
            try:
                jira_service = _get_jira_postmortem_service()
                jira_pm = jira_service.create_postmortem_for_incident(incident)
            except Exception:
                logger.exception(
                    f"Failed to create Jira post-mortem for incident #{incident.id}"
                )
        else:
            logger.debug(f"Jira post-mortem already exists for incident #{incident.id}")

    # Send signal if at least one post-mortem was created
    if confluence_pm or jira_pm:
        from firefighter.incidents.signals import postmortem_created  # noqa: PLC0415

        logger.info(
            f"Post-mortem(s) created for incident #{incident.id}: "
            f"confluence={confluence_pm is not None}, jira={jira_pm is not None}"
        )
        postmortem_created.send_robust(sender=__name__, incident=incident)

    # Publish reminder
    publish_postmortem_reminder(incident)

utils ¤

Functions:

  • pythonic_keys

    Converts camelCase keys in a dict or list of dict to snake_case. Works recursively.

pythonic_keys ¤

pythonic_keys(d: T) -> T

Converts camelCase keys in a dict or list of dict to snake_case. Works recursively.

Source code in src/firefighter/jira_app/utils.py
def pythonic_keys(d: T) -> T:
    """Converts camelCase keys in a dict or list of dict to snake_case. Works recursively."""
    if isinstance(d, dict):
        new_d = {}
        for key, value in d.items():
            new_key = _snake_case_key(key)
            new_d[new_key] = pythonic_keys(value)
        return new_d  # type: ignore[return-value]
    if isinstance(d, list):
        return [pythonic_keys(v) for v in d]  # type: ignore[return-value]
    return d