Files
defiQUG c052b07662
Some checks failed
Tests / test (3.10) (push) Has been cancelled
Tests / test (3.11) (push) Has been cancelled
Tests / test (3.12) (push) Has been cancelled
Tests / lint (push) Has been cancelled
Tests / docker (push) Has been cancelled
Initial commit: add .gitignore and README
2026-02-09 21:51:42 -08:00

71 lines
2.3 KiB
Python

"""Goal and budget schemas for AGI executive layer."""
from datetime import datetime, timezone
from enum import Enum
from typing import Any
from pydantic import BaseModel, Field, field_validator
def _utc_now() -> datetime:
return datetime.now(timezone.utc)
class GoalStatus(str, Enum):
"""Goal lifecycle status."""
ACTIVE = "active"
ACHIEVED = "achieved"
ABANDONED = "abandoned"
BLOCKED = "blocked"
SUSPENDED = "suspended"
class GoalBudget(BaseModel):
"""Time and compute budget for a goal/task."""
time_seconds: float | None = Field(default=None, ge=0, description="Max wall-clock seconds")
compute_budget: float | None = Field(default=None, ge=0, description="Max compute units (e.g. token budget)")
started_at: datetime | None = Field(default=None, description="When execution started")
class Blocker(BaseModel):
"""Reason a task or goal is stuck."""
blocker_id: str = Field(..., min_length=1)
task_id: str = Field(..., min_length=1)
reason: str = Field(default="")
dependency_step_id: str | None = Field(default=None)
reported_at: datetime = Field(default_factory=_utc_now)
class Checkpoint(BaseModel):
"""Resumable point in task execution."""
checkpoint_id: str = Field(..., min_length=1)
task_id: str = Field(..., min_length=1)
step_ids_completed: list[str] = Field(default_factory=list)
state_snapshot: dict[str, Any] = Field(default_factory=dict)
created_at: datetime = Field(default_factory=_utc_now)
class Goal(BaseModel):
"""Explicit goal with objectives, priorities, constraints, and budget."""
goal_id: str = Field(..., min_length=1)
objective: str = Field(..., min_length=1)
constraints: list[str] = Field(default_factory=list)
priority: int = Field(default=0, ge=0, le=10)
status: GoalStatus = Field(default=GoalStatus.ACTIVE)
budget: GoalBudget | None = Field(default=None)
metadata: dict[str, Any] = Field(default_factory=dict)
created_at: datetime = Field(default_factory=_utc_now)
updated_at: datetime | None = Field(default=None)
@field_validator("goal_id", "objective")
@classmethod
def validate_non_whitespace(cls, v: str) -> str:
if not v.strip():
raise ValueError("Field cannot be empty or whitespace")
return v