
Monitoring

Functions for monitoring task health and handling failures.

API Reference

monitor

Task monitoring and orphan detection utilities.

This module provides functions for detecting and handling tasks whose worker processes have died unexpectedly, as well as handling task timeouts and subprocess failures.

detect_orphaned_tasks() -> None

Detect and mark orphaned tasks as failed.

Scans all tasks with status PROGRESS that have a worker_pid set and checks whether that worker process is still running. If the process no longer exists, marks the task as FAILED, clears worker_pid, and fires the on_failure signal.

This function is called periodically by the task_worker command to clean up tasks whose workers crashed unexpectedly.

Note

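Uses select_for_update(skip_locked=True), so rows already locked by another worker are skipped rather than waited on. The liveness check is os.kill(worker_pid, 0), which only sees processes on the machine where it runs. If the PID exists but belongs to a different user (PermissionError), the worker is assumed to still be alive.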

Source code in django_simple_queue/monitor.py
def detect_orphaned_tasks() -> None:
    """
    Detect and mark orphaned tasks as failed.

    Scans all tasks with status PROGRESS and checks if their worker process
    (identified by worker_pid) is still running. If the process is dead,
    marks the task as FAILED and fires the on_failure signal.

    This function is called periodically by the task_worker command to clean
    up tasks whose workers crashed unexpectedly.

    Note:
        Uses ``select_for_update(skip_locked=True)`` to avoid blocking other workers.
        If the PID exists but belongs to a different user (PermissionError),
        the worker is assumed to still be alive.
    """
    with transaction.atomic():
        in_progress = Task.objects.select_for_update(skip_locked=True).filter(
            status=Task.PROGRESS, worker_pid__isnull=False
        )
        for task in in_progress:
            try:
                os.kill(task.worker_pid, 0)
            except ProcessLookupError:
                task.error = (task.error or "") + (
                    f"\nTask failed: worker process (PID {task.worker_pid}) no longer running"
                )
                task.status = Task.FAILED
                task.worker_pid = None
                task.save(
                    update_fields=["status", "error", "worker_pid", "modified"]
                )
                signals.on_failure.send(sender=Task, task=task, error=None)
            except PermissionError:
                pass  # PID exists, different user — worker is alive
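
The liveness probe above relies on POSIX kill(pid, 0) semantics: signal 0 is never delivered, so the call only reports whether the PID exists and whether the caller may signal it. The standalone sketch below isolates that check; worker_is_alive is a hypothetical helper, not part of django_simple_queue. It is POSIX-only (on Windows, os.kill with any signal other than the two console-control events terminates the target process), it only sees local processes, and, like the library code, it cannot tell a recycled PID from the original worker.

```python
import os
import subprocess
import sys


def worker_is_alive(pid: int) -> bool:
    """Hypothetical helper: True if a process with this PID exists (POSIX only)."""
    if pid <= 0:
        # PID 0 and negative values address process groups, not one process.
        return False
    try:
        # Signal 0 only runs the existence/permission checks; nothing is sent.
        os.kill(pid, 0)
    except ProcessLookupError:
        return False  # no such process: the worker has exited
    except PermissionError:
        return True  # exists but owned by another user, as in detect_orphaned_tasks()
    return True


if __name__ == "__main__":
    print(worker_is_alive(os.getpid()))  # True: the current process

    child = subprocess.Popen([sys.executable, "-c", "pass"])
    # Reap the child first: on Linux an exited but un-reaped (zombie) child
    # still passes the os.kill(pid, 0) check.
    child.wait()
    print(worker_is_alive(child.pid))
```

The PermissionError branch matters when workers run under a different user than the process doing the check; treating it as "alive" errs on the side of not failing a task that may still be running.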

handle_subprocess_exit(task_id: uuid.UUID, exit_code: int | None) -> None

Handle a task subprocess that exited with a non-zero code.

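Called by the task_worker after the subprocess finishes. If the exit code indicates failure (anything other than None or 0) and the task is still in PROGRESS, marks the task as FAILED, clears worker_pid, and fires the on_failure signal with error=None.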

Parameters:

  task_id (UUID, required): UUID of the task that was being executed.
  exit_code (int | None, required): The subprocess exit code (None or 0 means success).

Source code in django_simple_queue/monitor.py
def handle_subprocess_exit(task_id: uuid.UUID, exit_code: int | None) -> None:
    """
    Handle a task subprocess that exited with a non-zero code.

    Called by the task_worker after the subprocess finishes. If the exit code
    indicates failure, marks the task as FAILED and fires the on_failure signal.

    Args:
        task_id: UUID of the task that was being executed.
        exit_code: The subprocess exit code (None or 0 means success).
    """
    if exit_code is None or exit_code == 0:
        return
    task = Task.objects.get(id=task_id)
    if task.status == Task.PROGRESS:
        task.error = (task.error or "") + f"\nWorker subprocess exited with code {exit_code}"
        task.status = Task.FAILED
        task.worker_pid = None
        task.save(update_fields=["status", "error", "worker_pid", "modified"])
        signals.on_failure.send(sender=Task, task=task, error=None)
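
handle_subprocess_exit treats None and 0 as success and records any other value verbatim. If the exit code comes from Python's subprocess or multiprocessing modules (an assumption about how the worker runs tasks), a negative value -N means the child was terminated by signal N on POSIX, which is the "killed by signal" case. A hypothetical helper for turning such codes into readable text might look like this:

```python
import signal
import subprocess
import sys
from typing import Optional


def describe_exit_code(exit_code: Optional[int]) -> Optional[str]:
    """Hypothetical helper: None for success, else a readable failure reason."""
    # Same success test as handle_subprocess_exit(): None or 0.
    if exit_code is None or exit_code == 0:
        return None
    if exit_code < 0:
        # subprocess/multiprocessing report "terminated by signal N" as -N (POSIX).
        try:
            name = signal.Signals(-exit_code).name
        except ValueError:
            name = f"signal {-exit_code}"
        return f"killed by {name}"
    return f"exited with code {exit_code}"


if __name__ == "__main__":
    result = subprocess.run([sys.executable, "-c", "import sys; sys.exit(3)"])
    print(result.returncode)                      # 3
    print(describe_exit_code(result.returncode))  # exited with code 3
    print(describe_exit_code(-9))                 # killed by SIGKILL (POSIX)
```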

handle_task_timeout(task_id: uuid.UUID, timeout_seconds: int) -> None

Mark a task as failed due to exceeding the timeout.

Called by the task_worker when a subprocess doesn't complete within the configured timeout. If the task is still in PROGRESS, marks it as FAILED, clears worker_pid, and fires the on_failure signal with a TimeoutError.

Parameters:

  task_id (UUID, required): UUID of the task that timed out.
  timeout_seconds (int, required): The timeout value, in seconds, that was exceeded.

Source code in django_simple_queue/monitor.py
def handle_task_timeout(task_id: uuid.UUID, timeout_seconds: int) -> None:
    """
    Mark a task as failed due to exceeding the timeout.

    Called by the task_worker when a subprocess doesn't complete within the
    configured timeout. Marks the task as FAILED and fires the on_failure
    signal with a TimeoutError.

    Args:
        task_id: UUID of the task that timed out.
        timeout_seconds: The timeout value that was exceeded.
    """
    task = Task.objects.get(id=task_id)
    if task.status == Task.PROGRESS:
        task.error = (task.error or "") + (
            f"\nTask timed out after {timeout_seconds} seconds"
        )
        task.status = Task.FAILED
        task.worker_pid = None
        task.save(update_fields=["status", "error", "worker_pid", "modified"])
        signals.on_failure.send(sender=Task, task=task, error=TimeoutError(
            f"Task exceeded {timeout_seconds}s timeout"
        ))
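
The worker code that calls these two handlers is not shown on this page. As a hedged sketch, assuming the worker runs each task in a child process started with the standard subprocess module, a supervisor can wait with a timeout and route the outcome to one of two callbacks. supervise, on_exit and on_timeout are hypothetical names; in a real worker the callbacks would roughly correspond to handle_subprocess_exit(task_id, exit_code) and handle_task_timeout(task_id, timeout_seconds).

```python
import subprocess
import sys
from typing import Callable, Optional, Sequence


def supervise(
    cmd: Sequence[str],
    timeout_seconds: float,
    on_exit: Callable[[Optional[int]], None],
    on_timeout: Callable[[float], None],
) -> None:
    """Hypothetical supervisor: run cmd and report how the child ended."""
    proc = subprocess.Popen(cmd)
    try:
        exit_code = proc.wait(timeout=timeout_seconds)
    except subprocess.TimeoutExpired:
        # Stop the runaway child and reap it before reporting the timeout.
        proc.kill()
        proc.wait()
        on_timeout(timeout_seconds)
        return
    # on_exit sees every code, including 0; handle_subprocess_exit() ignores 0.
    on_exit(exit_code)


if __name__ == "__main__":
    events = []
    record_exit = lambda code: events.append(("exit", code))
    record_timeout = lambda t: events.append(("timeout", t))
    supervise([sys.executable, "-c", "import sys; sys.exit(2)"], 30,
              record_exit, record_timeout)
    supervise([sys.executable, "-c", "import time; time.sleep(30)"], 0.5,
              record_exit, record_timeout)
    print(events)  # [('exit', 2), ('timeout', 0.5)]
```

Killing and reaping the child before reporting keeps the timeout path from leaving a zombie or a still-running task behind.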

Functions

detect_orphaned_tasks()

Scans for tasks whose worker processes have died and marks them as failed.

This function is called automatically by the task worker on each polling cycle. It:

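  1. Finds all tasks with status PROGRESS that have a worker_pid set, skipping rows locked by another worker
  2. Checks whether each task's worker_pid is still alive using os.kill(worker_pid, 0)
  3. If the process is gone, marks the task as FAILED and clears worker_pid
  4. Fires the on_failure signal
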
from django_simple_queue.monitor import detect_orphaned_tasks

# Usually called by the worker, but can be called manually
detect_orphaned_tasks()
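
The four steps can be modelled without a database. The sketch below is a simplified, hypothetical stand-in: FakeTask replaces the Django model, the is_alive callable replaces the os.kill(pid, 0) probe, and on_failure replaces the signal. It leaves out the row locking that the real function gets from select_for_update(skip_locked=True).

```python
from dataclasses import dataclass
from typing import Callable, List, Optional

# Hypothetical stand-ins for Task.PROGRESS, Task.FAILED and a queued status.
PROGRESS, FAILED, QUEUED = "progress", "failed", "queued"


@dataclass
class FakeTask:
    """Hypothetical in-memory substitute for the Task model."""
    id: int
    status: str
    worker_pid: Optional[int]
    error: Optional[str] = None


def sweep_orphans(
    tasks: List[FakeTask],
    is_alive: Callable[[int], bool],
    on_failure: Callable[[FakeTask], None],
) -> List[FakeTask]:
    """Fail in-progress tasks whose worker is gone; return the failed tasks."""
    failed = []
    for task in tasks:
        # Step 1: only in-progress tasks that recorded a worker PID.
        if task.status != PROGRESS or task.worker_pid is None:
            continue
        # Step 2: leave tasks alone while their worker is still running.
        if is_alive(task.worker_pid):
            continue
        # Step 3: append the reason, mark FAILED, clear the PID.
        task.error = (task.error or "") + (
            f"\nTask failed: worker process (PID {task.worker_pid}) no longer running"
        )
        task.status = FAILED
        task.worker_pid = None
        # Step 4: notify listeners (the real code sends signals.on_failure).
        on_failure(task)
        failed.append(task)
    return failed


if __name__ == "__main__":
    tasks = [
        FakeTask(1, PROGRESS, 101),
        FakeTask(2, PROGRESS, 202),
        FakeTask(3, QUEUED, None),
    ]
    sweep_orphans(tasks, is_alive=lambda pid: pid == 202, on_failure=print)
    print([(t.id, t.status) for t in tasks])
    # prints: [(1, 'failed'), (2, 'progress'), (3, 'queued')]
```

Injecting the liveness probe keeps the state transitions testable on their own; a real probe only works for PIDs on the local machine.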

handle_subprocess_exit(task_id, exit_code)

Handles non-zero subprocess exit codes.

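Called by the worker when a task subprocess exits with a non-zero code without the task having raised an exception (for example, when the subprocess is killed by a signal). Exit codes of 0 or None are ignored, as are tasks that are no longer in PROGRESS.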

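from django_simple_queue.monitor import handle_subprocess_exit

# Called internally by the worker once a task's subprocess has finished;
# task_id is the UUID of the task that subprocess was running.
handle_subprocess_exit(task_id, exit_code=1)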

handle_task_timeout(task_id, timeout_seconds)

Marks a task as failed due to timeout.

Called by the worker when a task exceeds DJANGO_SIMPLE_QUEUE_TASK_TIMEOUT.

from django_simple_queue.monitor import handle_task_timeout

# Called internally by the worker; task_id is the UUID of the task
# that ran longer than the configured timeout.
handle_task_timeout(task_id, timeout_seconds=300)

How Orphan Detection Works

Worker Process A                  Worker Process B
     │                                  │
     │ Claims Task T1                   │
     │ Sets status=PROGRESS             │
     │ Sets worker_pid=A                │
     │                                  │
     │ Starts executing...              │
     │                                  │
     X Worker A crashes!                │
                                        │ Polls for tasks...
                                        │ Calls detect_orphaned_tasks()
                                        │ Finds T1 with status=PROGRESS
                                        │ Checks: is PID A alive?
                                        │ os.kill(A, 0) → ProcessLookupError
                                        │ Marks T1 as FAILED
                                        │ Fires on_failure signal

Failure Messages

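| Scenario | Message appended to Task.error | error argument of on_failure |
|---|---|---|
| Worker crash | "Task failed: worker process (PID X) no longer running" | None |
| Timeout | "Task timed out after X seconds" | TimeoutError("Task exceeded Xs timeout") |
| Non-zero exit | "Worker subprocess exited with code X" | None |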
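
All three handlers append their message to Task.error as "\n" plus the message instead of overwriting the field, so it keeps whatever was recorded earlier, and it begins with a newline when it was previously empty. A small hypothetical helper for pulling out the most recent message, for example when displaying or alerting on failed tasks:

```python
from typing import Optional


def latest_failure_reason(error: Optional[str]) -> Optional[str]:
    """Hypothetical helper: the newest non-blank line of a Task.error value."""
    if not error:
        return None
    # The monitor appends "\n<message>", so the newest message is the last line.
    lines = [line for line in error.splitlines() if line.strip()]
    return lines[-1] if lines else None


if __name__ == "__main__":
    previous: Optional[str] = None  # error field before the failure
    # Same append pattern as the monitor functions.
    error = (previous or "") + "\nWorker subprocess exited with code 1"
    print(repr(error))                   # '\nWorker subprocess exited with code 1'
    print(latest_failure_reason(error))  # Worker subprocess exited with code 1
```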

Monitoring in Production

Check for Orphaned Tasks

from datetime import timedelta

from django.utils import timezone

from django_simple_queue.models import Task

# Tasks still in PROGRESS whose row has not been modified for over an hour.
# These may be orphaned, or may simply be long-running.
stale = Task.objects.filter(
    status=Task.PROGRESS,
    modified__lt=timezone.now() - timedelta(hours=1),
)
for task in stale:
    print(f"Possibly orphaned: {task.id} (PID: {task.worker_pid})")
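
A stale task is not necessarily orphaned. One way to triage the results of the query above is to pair each one with a liveness probe, and to flag PROGRESS tasks without a worker_pid: detect_orphaned_tasks() filters on worker_pid__isnull=False, so it never touches those. The sketch works on plain records so it runs without Django; StaleCandidate and triage are hypothetical names, and is_alive is whatever local PID probe you use (for example, the os.kill(pid, 0) check the monitor relies on).

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Callable, Dict, List, Optional


@dataclass
class StaleCandidate:
    """Hypothetical plain record holding the fields the query above reads."""
    id: str
    worker_pid: Optional[int]
    modified: datetime


def triage(
    tasks: List[StaleCandidate],
    now: datetime,
    max_age: timedelta,
    is_alive: Callable[[int], bool],
) -> Dict[str, List[str]]:
    """Hypothetical helper: group stale in-progress tasks by worker state."""
    report: Dict[str, List[str]] = {"worker_dead": [], "worker_alive": [], "no_pid": []}
    cutoff = now - max_age
    for task in tasks:
        if task.modified >= cutoff:
            continue  # same boundary as modified__lt=cutoff: not stale yet
        if task.worker_pid is None:
            # Never selected by detect_orphaned_tasks(); needs manual attention.
            report["no_pid"].append(task.id)
        elif is_alive(task.worker_pid):
            report["worker_alive"].append(task.id)  # possibly just slow
        else:
            report["worker_dead"].append(task.id)  # a candidate for the sweep
    return report


if __name__ == "__main__":
    now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
    old = now - timedelta(hours=2)
    tasks = [
        StaleCandidate("a", 101, old),
        StaleCandidate("b", 202, old),
        StaleCandidate("c", None, old),
        StaleCandidate("d", 303, now),
    ]
    print(triage(tasks, now, timedelta(hours=1), is_alive=lambda pid: pid == 202))
    # prints: {'worker_dead': ['a'], 'worker_alive': ['b'], 'no_pid': ['c']}
```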

Cleanup Script

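import django

# Standalone scripts (e.g. run from cron) must configure Django first;
# this assumes DJANGO_SETTINGS_MODULE is set in the environment.
django.setup()

from django_simple_queue.monitor import detect_orphaned_tasks

detect_orphaned_tasks()
print("Orphan detection complete")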

See the Task Lifecycle guide for more on failure handling.