Monitoring
Functions for monitoring task health and handling failures.
API Reference
monitor
Task monitoring and orphan detection utilities.
This module provides functions for detecting and handling tasks whose worker processes have died unexpectedly, as well as handling task timeouts and subprocess failures.
detect_orphaned_tasks() -> None
Detect and mark orphaned tasks as failed.
Scans all tasks with status PROGRESS and checks whether their worker process (identified by worker_pid) is still running. If the process is dead, marks the task as FAILED and fires the on_failure signal.
This function is called periodically by the task_worker command to clean up tasks whose workers crashed unexpectedly.
Note
Uses select_for_update(skip_locked=True) to avoid blocking other workers. If the PID exists but belongs to a different user (PermissionError), the worker is assumed to still be alive.
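The liveness rule in this note can be illustrated with a standalone helper. This is a minimal sketch assuming a POSIX host; `is_process_alive` is a hypothetical name, not part of django_simple_queue. Sending signal 0 with `os.kill` performs the permission and existence checks without delivering a signal: `ProcessLookupError` (ESRCH) means no such process, while `PermissionError` (EPERM) means the process exists but belongs to another user, which is why it counts as alive.

```python
import os
import subprocess
import sys


def is_process_alive(pid: int) -> bool:
    """Hypothetical helper: return True if a process with this PID exists (POSIX)."""
    # PID 0 and negative PIDs address process groups rather than a single
    # process, so treat them as "no such worker".
    if pid <= 0:
        return False
    try:
        # Signal 0 is never delivered; the kernel only validates the target.
        os.kill(pid, 0)
    except ProcessLookupError:
        # ESRCH: no process with this PID.
        return False
    except PermissionError:
        # EPERM: the process exists but is owned by another user.
        return True
    return True


if __name__ == "__main__":
    print(is_process_alive(os.getpid()))  # → True (the current process)
    child = subprocess.Popen([sys.executable, "-c", "pass"])
    child.wait()  # reap the child so its PID is released
    print(is_process_alive(child.pid))
```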
Source code in django_simple_queue/monitor.py
handle_subprocess_exit(task_id: uuid.UUID, exit_code: int | None) -> None
Handle a task subprocess that exited with a non-zero code.
Called by the task_worker after the subprocess finishes. If the exit code indicates failure, marks the task as FAILED and fires the on_failure signal.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| task_id | UUID | UUID of the task that was being executed. | required |
| exit_code | int \| None | The subprocess exit code (None or 0 means success). | required |
handle_task_timeout(task_id: uuid.UUID, timeout_seconds: int) -> None
Mark a task as failed due to exceeding the timeout.
Called by the task_worker when a subprocess doesn't complete within the configured timeout. Marks the task as FAILED and fires the on_failure signal with a TimeoutError.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| task_id | UUID | UUID of the task that timed out. | required |
| timeout_seconds | int | The timeout value that was exceeded. | required |
Functions
detect_orphaned_tasks()
Scans for tasks whose worker processes have died and marks them as failed.
This function is called automatically by the task worker on each polling cycle. It:

- Finds all tasks with status PROGRESS
- Checks whether each task's worker_pid is still alive
- If the process is dead, marks the task as FAILED
- Fires the on_failure signal
```python
from django_simple_queue.monitor import detect_orphaned_tasks

# Usually called by the worker, but can be called manually
detect_orphaned_tasks()
```
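The four steps listed above can be modelled without a database. The following is a simplified, in-memory illustration rather than the library's implementation: `FakeTask`, `scan_for_orphans`, and the `on_failure` callback are hypothetical stand-ins for the Django model, `detect_orphaned_tasks()`, and the `on_failure` signal. The row locking done with `select_for_update(skip_locked=True)` is omitted, and skipping tasks with no recorded PID is an assumption of this sketch.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional

PROGRESS = "PROGRESS"
FAILED = "FAILED"


@dataclass
class FakeTask:
    """Hypothetical in-memory stand-in for the Task model."""
    id: str
    status: str
    worker_pid: Optional[int]
    error: Optional[str] = None


def scan_for_orphans(
    tasks: List[FakeTask],
    is_alive: Callable[[int], bool],
    on_failure: Callable[[FakeTask, str], None],
) -> List[FakeTask]:
    """Mark PROGRESS tasks whose worker PID is dead as FAILED."""
    orphaned = []
    for task in tasks:
        # Step 1: only tasks currently marked as in progress.
        # (Assumption: tasks without a recorded PID are skipped.)
        if task.status != PROGRESS or task.worker_pid is None:
            continue
        # Step 2: is the recorded worker process still running?
        if is_alive(task.worker_pid):
            continue
        # Step 3: the worker is gone, so record the failure.
        task.status = FAILED
        task.error = (
            f"Task failed: worker process (PID {task.worker_pid}) no longer running"
        )
        # Step 4: notify listeners (stands in for the on_failure signal).
        on_failure(task, task.error)
        orphaned.append(task)
    return orphaned


if __name__ == "__main__":
    tasks = [
        FakeTask("t1", PROGRESS, 1001),
        FakeTask("t2", PROGRESS, 1002),
        FakeTask("t3", "COMPLETED", 1003),
    ]
    # Pretend only PID 1001 is still running.
    failed = scan_for_orphans(tasks, lambda pid: pid == 1001, lambda t, msg: print(msg))
    print([t.id for t in failed])  # → ['t2']
```

Passing `is_alive` in as a function keeps the scan testable; in practice it would be a signal-0 probe like the one the note describes.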
handle_subprocess_exit(task_id, exit_code)
Handles non-zero subprocess exit codes.
Called by the worker when a task subprocess exits with a non-zero code without having raised an exception (e.g., because it was killed by a signal).
```python
from django_simple_queue.monitor import handle_subprocess_exit

# Called internally by the worker; task_id is the UUID of the task
# whose subprocess exited.
handle_subprocess_exit(task_id, exit_code=1)
```
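The success rule from the parameter table (None or 0 means success) can be sketched as a small pure function. `exit_failure_message` is a hypothetical helper, not part of the library, and the signal-name suffix is an illustrative extra beyond the documented message format. It relies on the POSIX convention, used by Python's `subprocess` and `multiprocessing` modules, that a child terminated by signal N reports exit code -N.

```python
import signal
import subprocess
import sys
from typing import Optional


def exit_failure_message(exit_code: Optional[int]) -> Optional[str]:
    """Hypothetical helper: None on success, otherwise a failure message."""
    # None or 0 means success.
    if exit_code is None or exit_code == 0:
        return None
    message = f"Worker subprocess exited with code {exit_code}"
    # A negative code means "terminated by signal -exit_code" (POSIX).
    if exit_code < 0:
        try:
            message += f" ({signal.Signals(-exit_code).name})"
        except ValueError:
            pass  # not a signal number known on this platform
    return message


if __name__ == "__main__":
    done = subprocess.run([sys.executable, "-c", "import sys; sys.exit(3)"])
    print(exit_failure_message(done.returncode))
    # → Worker subprocess exited with code 3
```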
handle_task_timeout(task_id, timeout_seconds)
Marks a task as failed due to timeout.
Called by the worker when a task runs longer than the timeout configured by DJANGO_SIMPLE_QUEUE_TASK_TIMEOUT.
```python
from django_simple_queue.monitor import handle_task_timeout

# Called internally by the worker
handle_task_timeout(task_id, timeout_seconds=300)
```
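A worker-side timeout check can be sketched with the standard library's `subprocess.run(..., timeout=...)`, which kills the child and raises `subprocess.TimeoutExpired` once the limit is exceeded. This is an assumption about how such a check might look, not the worker's actual code; `run_task_command` is hypothetical. It returns a `TimeoutError` carrying the documented message, mirroring the exception that handle_task_timeout passes to on_failure. Exit-status handling is left out of this sketch.

```python
import subprocess
import sys
from typing import List, Optional


def run_task_command(cmd: List[str], timeout_seconds: int) -> Optional[TimeoutError]:
    """Hypothetical runner: return a TimeoutError if cmd exceeds the limit."""
    try:
        # On timeout, subprocess.run kills the child, waits for it,
        # and then re-raises TimeoutExpired.
        subprocess.run(cmd, timeout=timeout_seconds)
    except subprocess.TimeoutExpired:
        return TimeoutError(f"Task timed out after {timeout_seconds} seconds")
    return None


if __name__ == "__main__":
    slow = [sys.executable, "-c", "import time; time.sleep(10)"]
    err = run_task_command(slow, timeout_seconds=1)
    print(err)  # → Task timed out after 1 seconds
```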
How Orphan Detection Works
```text
Worker Process A              Worker Process B
│                             │
│ Claims Task T1              │
│ Sets status=PROGRESS        │
│ Sets worker_pid=A           │
│                             │
│ Starts executing...         │
│                             │
X Worker A crashes!           │
                              │
                              │ Polls for tasks...
                              │ Calls detect_orphaned_tasks()
                              │ Finds T1 with status=PROGRESS
                              │ Checks: is PID A alive?
                              │ os.kill(A, 0) → ProcessLookupError
                              │ Marks T1 as FAILED
                              │ Fires on_failure signal
```
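The timeline can be reproduced on a single POSIX machine with the standard library. In this sketch a child process plays worker A and the calling process plays worker B; nothing here uses django_simple_queue, and `pid_is_running` and `simulate_worker_crash` are hypothetical names. One detail the diagram glosses over: a killed process stays in the process table as a zombie until its parent reaps it, and `os.kill(pid, 0)` still succeeds during that window, so the check reports the worker as dead only after reaping.

```python
import os
import signal
import subprocess
import sys
import time


def pid_is_running(pid: int) -> bool:
    """Signal-0 probe, as in the diagram (PermissionError counts as alive)."""
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        return True
    return True


def simulate_worker_crash() -> dict:
    # "Worker A": a child process that would otherwise run for a minute.
    worker_a = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
    pid = worker_a.pid
    before = pid_is_running(pid)

    # "Worker A crashes!": kill it abruptly.
    os.kill(pid, signal.SIGKILL)
    time.sleep(0.2)  # let the kernel finish terminating it, leaving a zombie
    # Not yet reaped, so the PID still answers signal 0.
    zombie = pid_is_running(pid)

    worker_a.wait()  # the parent reaps the child and the PID is released
    after = pid_is_running(pid)  # now ProcessLookupError → False
    return {"before": before, "zombie": zombie, "after": after}


if __name__ == "__main__":
    print(simulate_worker_crash())
    # → {'before': True, 'zombie': True, 'after': False}
```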
Failure Messages
| Scenario | Error Message |
|---|---|
| Worker crash | "Task failed: worker process (PID X) no longer running" |
| Timeout | "Task timed out after X seconds" |
| Non-zero exit | "Worker subprocess exited with code X" |
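For dashboards or alerting, failure messages can be bucketed by scenario. This sketch is hypothetical: it assumes the stored error text follows the formats in the table, with X being an integer, and `classify_failure` and `FAILURE_PATTERNS` are illustrative names. Matching uses `re.search`, so extra text before or after the documented phrase is tolerated.

```python
import re
from typing import Optional

# Patterns built from the documented message formats (X is an integer).
FAILURE_PATTERNS = {
    "worker_crash": re.compile(r"Task failed: worker process \(PID (\d+)\) no longer running"),
    "timeout": re.compile(r"Task timed out after (\d+) seconds"),
    "nonzero_exit": re.compile(r"Worker subprocess exited with code (-?\d+)"),
}


def classify_failure(message: str) -> Optional[str]:
    """Hypothetical helper: map a failure message to its scenario name."""
    for scenario, pattern in FAILURE_PATTERNS.items():
        if pattern.search(message):
            return scenario
    return None  # some other failure, e.g. an exception raised by the task


if __name__ == "__main__":
    print(classify_failure("Task timed out after 300 seconds"))  # → timeout
```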
Monitoring in Production
Check for Orphaned Tasks
```python
from datetime import timedelta

from django.utils import timezone

from django_simple_queue.models import Task

# Tasks that might be orphaned (in PROGRESS, not modified for over an hour)
stale = Task.objects.filter(
    status=Task.PROGRESS,
    modified__lt=timezone.now() - timedelta(hours=1),
)
for task in stale:
    print(f"Possibly orphaned: {task.id} (PID: {task.worker_pid})")
```
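A staleness query alone cannot tell a genuinely long-running task from one whose worker has died. One way to triage the results is to combine the age check with the PID probe described earlier. This is a sketch over plain values rather than the ORM; `triage` and its parameters are illustrative, and the one-hour threshold and strict "older than" comparison mirror the query above. Because PIDs are only meaningful on the machine that issued them, such a probe is only valid when run on the worker's host.

```python
from datetime import datetime, timedelta, timezone
from typing import Callable, Optional


def triage(
    modified: datetime,
    worker_pid: Optional[int],
    is_alive: Callable[[int], bool],
    now: datetime,
    threshold: timedelta = timedelta(hours=1),
) -> str:
    """Hypothetical classification of a task that is in PROGRESS."""
    # Same rule as modified__lt=now - threshold: only strictly older rows are stale.
    if modified >= now - threshold:
        return "recent"
    if worker_pid is None:
        return "unknown"  # nothing to probe
    if is_alive(worker_pid):
        return "long-running"  # stale, but the worker still exists
    return "orphaned"  # stale and the worker is gone


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    two_hours_ago = now - timedelta(hours=2)
    print(triage(two_hours_ago, 1234, lambda pid: False, now))  # → orphaned
```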
Cleanup Script
```python
from django_simple_queue.monitor import detect_orphaned_tasks

# Run periodically, e.g. from a cron job or a management command.
# Django must already be configured (e.g. inside a management command).
detect_orphaned_tasks()
print("Orphan detection complete")
```
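If a long-lived process is preferable to cron, the same call can be repeated on an interval. This is a minimal standard-library sketch; `run_every` is a hypothetical helper, and in a real deployment `job` would be detect_orphaned_tasks running with Django already configured. Using `threading.Event.wait` as the sleep lets another thread stop the loop without waiting for the full interval.

```python
import threading
from typing import Callable


def run_every(interval_seconds: float, job: Callable[[], None], stop: threading.Event) -> int:
    """Call job() every interval_seconds until stop is set; return the call count."""
    calls = 0
    while not stop.is_set():
        try:
            job()
        except Exception as exc:  # keep the loop alive if a single run fails
            print(f"orphan check failed: {exc!r}")
        calls += 1
        # Event.wait returns early as soon as stop is set.
        stop.wait(interval_seconds)
    return calls


if __name__ == "__main__":
    stop = threading.Event()
    # Stop the loop from a timer thread after roughly one second.
    threading.Timer(1.0, stop.set).start()
    run_every(0.25, lambda: print("checking for orphans"), stop)
```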
See the Task Lifecycle guide for more on failure handling.