Files
FusionAGI/tests/test_task_queue.py

69 lines
1.5 KiB
Python
Raw Permalink Normal View History

Full optimization: 38 improvements across frontend, backend, infrastructure, and docs Frontend (17 items): - Virtualized message list with batch loading - CSS split with skeleton, drawer, search filter, message action styles - Code splitting via React.lazy + Suspense for Admin/Ethics/Settings pages - Skeleton loading components (Skeleton, SkeletonCard, SkeletonGrid) - Debounced search/filter component (SearchFilter) - Error boundary with fallback UI - Keyboard shortcuts (Ctrl+K search, Ctrl+Enter send, Escape dismiss) - Page transition animations (fade-in) - PWA support (manifest.json + service worker) - WebSocket auto-reconnect with exponential backoff (10 retries) - Chat history persistence to localStorage (500 msg limit) - Message edit/delete on hover - Copy-to-clipboard on code blocks - Mobile drawer (bottom-sheet for consensus panel) - File upload support - User preferences sync to backend Testing (8 items): - Component tests: Toast, Markdown, ChatMessage, Avatar, ErrorBoundary, Skeleton - Hook tests: useChatHistory - E2E smoke tests (5 tests) - Accessibility audit utility Backend (12 items): - Vector memory with cosine similarity search - TTS/STT adapter factory wiring - Geometry kernel with orphan detection - Tenant registry with CRUD operations - Response cache with TTL - Connection pool (async) - Background task queue - Health check endpoints (/health, /ready) - Request tracing middleware (X-Request-ID) - API key rotation mechanism - Environment-based config (settings.py) - API route documentation improvements Infrastructure (4 items): - Grafana dashboard template - Database migration system - Storybook configuration Documentation (3 items): - ADR-001: Advisory Governance Model - ADR-002: Twelve-Head Architecture - ADR-003: Consequence Engine 552 Python tests + 45 frontend tests passing, 0 ruff errors. Co-Authored-By: Nakamoto, S <defi@defi-oracle.io>
2026-05-02 03:08:08 +00:00
"""Tests for background task queue."""
import asyncio
import pytest
from fusionagi.api.task_queue import BackgroundTaskQueue, TaskStatus
@pytest.fixture
def queue():
return BackgroundTaskQueue(max_concurrent=3)
@pytest.mark.asyncio
async def test_submit_and_complete(queue):
async def work():
await asyncio.sleep(0.01)
return 42
tid = queue.submit(work)
await asyncio.sleep(0.05)
result = queue.get_status(tid)
assert result is not None
assert result.status == TaskStatus.COMPLETED
assert result.result == 42
@pytest.mark.asyncio
async def test_failed_task(queue):
async def fail():
raise ValueError("boom")
tid = queue.submit(fail)
await asyncio.sleep(0.05)
result = queue.get_status(tid)
assert result is not None
assert result.status == TaskStatus.FAILED
assert "boom" in (result.error or "")
@pytest.mark.asyncio
async def test_list_tasks(queue):
async def noop():
pass
queue.submit(noop)
queue.submit(noop)
await asyncio.sleep(0.05)
tasks = queue.list_tasks()
assert len(tasks) == 2
@pytest.mark.asyncio
async def test_list_tasks_filtered(queue):
async def noop():
pass
queue.submit(noop)
await asyncio.sleep(0.05)
completed = queue.list_tasks(status=TaskStatus.COMPLETED)
assert len(completed) == 1
pending = queue.list_tasks(status=TaskStatus.PENDING)
assert len(pending) == 0
def test_nonexistent_task(queue):
assert queue.get_status("nonexistent") is None