Hwandji's picture
feat: initial HuggingFace Space deployment
4343907
"""
SAAP Database Models - SQLAlchemy ORM (FIXED VERSION)
Production-ready database schema with circular reference fixes
"""
from sqlalchemy import Column, Integer, String, Text, DateTime, Float, Boolean, JSON, ForeignKey, Index
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from sqlalchemy.dialects.postgresql import TIMESTAMP
from datetime import datetime
from typing import Dict, Any, Optional
import json
Base = declarative_base()
class DBAgent(Base):
"""
Database model for SAAP Agents (FIXED - No circular references)
Stores complete agent configuration and runtime state
"""
__tablename__ = "agents"
# Primary Key
id = Column(String(100), primary_key=True, index=True) # agent_id like 'jane_alesi'
# Core Identity
name = Column(String(200), nullable=False, index=True)
agent_type = Column(String(50), nullable=False, index=True) # coordinator, specialist, etc.
color = Column(String(10), nullable=False) # Hex color code
avatar = Column(String(500), nullable=True) # Avatar image path/URL
description = Column(Text, nullable=True)
# LLM Configuration (stored as JSON)
llm_config = Column(JSON, nullable=False) # Complete LLM config as JSON
# Agent Capabilities and Personality
capabilities = Column(JSON, nullable=True) # List of capabilities
personality = Column(JSON, nullable=True) # Personality configuration
# Runtime Status
status = Column(String(20), nullable=False, default="inactive", index=True)
last_active = Column(DateTime, nullable=True)
# Performance Metrics (stored as JSON)
metrics = Column(JSON, nullable=True) # Performance metrics object
# Metadata (πŸ”§ FIXED: TIMESTAMP WITH TIME ZONE for timezone-aware datetimes)
created_at = Column(TIMESTAMP(timezone=True), default=func.now(), nullable=False, index=True)
updated_at = Column(TIMESTAMP(timezone=True), default=func.now(), onupdate=func.now(), nullable=False)
tags = Column(JSON, nullable=True) # List of tags
# πŸ”§ FIXED: Relationships WITH back_populates to COORDINATE bidirectional relationships
# This prevents infinite recursion during mapper initialization
chat_messages = relationship(
"DBChatMessage",
back_populates="agent", # βœ… CRITICAL: Links to DBChatMessage.agent relationship
lazy="select",
cascade="all, delete-orphan"
)
agent_sessions = relationship(
"DBAgentSession",
back_populates="agent", # βœ… CRITICAL: Links to DBAgentSession.agent relationship
lazy="select",
cascade="all, delete-orphan"
)
# Indexes for performance
__table_args__ = (
Index('idx_agent_status_updated', 'status', 'updated_at'),
Index('idx_agent_type_status', 'agent_type', 'status'),
Index('idx_agent_created', 'created_at'),
)
def to_saap_agent(self):
"""Convert database model to SaapAgent pydantic model"""
try:
from models.agent_schema import SaapAgent, AgentType, AgentStatus, LLMConfig, AgentMetadata, AgentAppearance, CommunicationConfig, UIComponents
# Safe conversion with error handling
agent_type = AgentType(self.agent_type) if hasattr(AgentType, self.agent_type) else AgentType.SPECIALIST
agent_status = AgentStatus(self.status) if hasattr(AgentStatus, self.status) else AgentStatus.INACTIVE
# Convert JSON fields back to objects safely
llm_config = LLMConfig(**self.llm_config) if self.llm_config else None
# Create metadata object
metadata = AgentMetadata(
version="1.0.0",
created=self.created_at,
updated=self.updated_at,
tags=self.tags or []
)
# Create appearance object
appearance = AgentAppearance(
color=self.color or "#6B7280",
avatar=self.avatar
)
# Create communication config (use defaults for now)
communication = CommunicationConfig(
input_queue=f"{self.id}_input",
output_queue=f"{self.id}_output",
message_types=["request", "response"]
)
# Create UI components (use defaults)
ui_components = UIComponents(
dashboard_widget="AgentCard",
detail_view="AgentDetail",
configuration_form="AgentConfig"
)
return SaapAgent(
id=self.id,
name=self.name,
type=agent_type,
status=agent_status,
description=self.description or "",
metadata=metadata,
appearance=appearance,
ui_components=ui_components,
capabilities=self.capabilities or [],
llm_config=llm_config,
communication=communication
)
except Exception as e:
print(f"⚠️ Error converting DBAgent to SaapAgent: {e}")
import traceback
traceback.print_exc()
return None
@classmethod
def from_saap_agent(cls, agent):
"""Create database model from SaapAgent pydantic model (FIXED - accesses nested fields correctly)"""
try:
# πŸ”§ FIX: Access appearance.color and appearance.avatar correctly
color = agent.appearance.color if hasattr(agent, 'appearance') and agent.appearance else '#6B7280'
avatar = agent.appearance.avatar if hasattr(agent, 'appearance') and agent.appearance else None
# Safe access to metrics.last_active
last_active = None
if hasattr(agent, 'metrics') and agent.metrics:
last_active = getattr(agent.metrics, 'last_active', None)
# Safe access to metadata fields for created_at/updated_at
created_at = None
updated_at = None
if hasattr(agent, 'metadata') and agent.metadata:
created_at = getattr(agent.metadata, 'created', None)
updated_at = getattr(agent.metadata, 'updated', None)
# Fallback to direct fields if metadata doesn't exist
if not created_at and hasattr(agent, 'created_at'):
created_at = agent.created_at
if not updated_at and hasattr(agent, 'updated_at'):
updated_at = agent.updated_at
# Safe access to tags
tags = []
if hasattr(agent, 'tags'):
tags = agent.tags
elif hasattr(agent, 'metadata') and agent.metadata and hasattr(agent.metadata, 'tags'):
tags = agent.metadata.tags
return cls(
id=agent.id,
name=agent.name,
agent_type=agent.type.value if hasattr(agent.type, 'value') else str(agent.type),
color=color, # πŸ”§ FIXED: Now accesses appearance.color
avatar=avatar, # πŸ”§ FIXED: Now accesses appearance.avatar
description=agent.description if hasattr(agent, 'description') else '',
llm_config=agent.llm_config.model_dump() if agent.llm_config else {},
capabilities=agent.capabilities if hasattr(agent, 'capabilities') else [],
personality=agent.personality.model_dump() if hasattr(agent, 'personality') and agent.personality else None,
status=agent.status.value if hasattr(agent.status, 'value') else str(agent.status),
last_active=last_active,
metrics=agent.metrics.model_dump() if hasattr(agent, 'metrics') and agent.metrics else None,
created_at=created_at,
updated_at=updated_at,
tags=tags
)
except Exception as e:
print(f"⚠️ Error creating DBAgent from SaapAgent: {e}")
import traceback
traceback.print_exc()
return None
class DBChatMessage(Base):
"""
Database model for chat message history (FIXED)
Stores all agent conversations with metadata
"""
__tablename__ = "chat_messages"
# Primary Key
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
# Foreign Key to Agent
agent_id = Column(String(100), ForeignKey("agents.id"), nullable=False, index=True)
# Message Content
user_message = Column(Text, nullable=False)
agent_response = Column(Text, nullable=False)
# Performance Metrics
response_time = Column(Float, nullable=True) # Response time in seconds
tokens_used = Column(Integer, nullable=True) # Tokens consumed
# Message Metadata (renamed from 'metadata' to avoid SQLAlchemy reserved keyword)
message_metadata = Column(JSON, nullable=True) # Additional metadata (model, temperature, etc.)
created_at = Column(TIMESTAMP(timezone=True), default=func.now(), nullable=False, index=True)
# πŸ”§ FIXED: Relationship WITH back_populates to COORDINATE bidirectional relationship
agent = relationship(
"DBAgent",
back_populates="chat_messages", # βœ… CRITICAL: Links to DBAgent.chat_messages relationship
lazy="select"
)
# Indexes for performance
__table_args__ = (
Index('idx_chat_agent_created', 'agent_id', 'created_at'),
Index('idx_chat_response_time', 'response_time'),
Index('idx_chat_created', 'created_at'),
)
class DBAgentSession(Base):
"""
Database model for agent sessions (FIXED)
Tracks agent uptime and session metrics
"""
__tablename__ = "agent_sessions"
# Primary Key
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
# Foreign Key to Agent
agent_id = Column(String(100), ForeignKey("agents.id"), nullable=False, index=True)
# Session Information (πŸ”§ FIXED: TIMESTAMP WITH TIME ZONE)
session_start = Column(TIMESTAMP(timezone=True), default=func.now(), nullable=False, index=True)
session_end = Column(TIMESTAMP(timezone=True), nullable=True, index=True)
duration_seconds = Column(Integer, nullable=True) # Calculated session duration
# Session Metrics
messages_processed = Column(Integer, default=0, nullable=False)
total_tokens_used = Column(Integer, default=0, nullable=False)
average_response_time = Column(Float, nullable=True)
error_count = Column(Integer, default=0, nullable=False)
# Session Status
status = Column(String(20), default="active", nullable=False, index=True) # active, completed, error
end_reason = Column(String(100), nullable=True) # graceful, error, timeout, etc.
# Session Metadata (renamed from 'metadata' to avoid SQLAlchemy reserved keyword)
session_metadata = Column(JSON, nullable=True) # Additional session data
# πŸ”§ FIXED: Relationship WITH back_populates to COORDINATE bidirectional relationship
agent = relationship(
"DBAgent",
back_populates="agent_sessions", # βœ… CRITICAL: Links to DBAgent.agent_sessions relationship
lazy="select"
)
# Indexes for performance
__table_args__ = (
Index('idx_session_agent_start', 'agent_id', 'session_start'),
Index('idx_session_status', 'status'),
Index('idx_session_duration', 'duration_seconds'),
)
def calculate_duration(self):
"""Calculate and update session duration"""
if self.session_end and self.session_start:
delta = self.session_end - self.session_start
self.duration_seconds = int(delta.total_seconds())
return self.duration_seconds
class DBSystemLog(Base):
"""
Database model for system logs and events
Stores SAAP system events for monitoring and debugging
"""
__tablename__ = "system_logs"
# Primary Key
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
# Log Information
level = Column(String(10), nullable=False, index=True) # INFO, WARNING, ERROR, CRITICAL
logger_name = Column(String(100), nullable=False, index=True)
message = Column(Text, nullable=False)
# Optional fields
agent_id = Column(String(100), nullable=True, index=True) # If log relates to specific agent
function_name = Column(String(100), nullable=True)
line_number = Column(Integer, nullable=True)
# Additional Data
extra_data = Column(JSON, nullable=True) # Additional structured data
# Timestamp (πŸ”§ FIXED: TIMESTAMP WITH TIME ZONE)
created_at = Column(TIMESTAMP(timezone=True), default=func.now(), nullable=False, index=True)
# Indexes for performance
__table_args__ = (
Index('idx_log_level_created', 'level', 'created_at'),
Index('idx_log_agent_created', 'agent_id', 'created_at'),
Index('idx_log_logger_created', 'logger_name', 'created_at'),
)
class DBHealthCheck(Base):
"""
Database model for system health monitoring
Stores periodic health check results
"""
__tablename__ = "health_checks"
# Primary Key
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
# Health Check Data
component = Column(String(50), nullable=False, index=True) # database, colossus, redis, etc.
status = Column(String(20), nullable=False, index=True) # healthy, degraded, error
response_time_ms = Column(Float, nullable=True)
# Metrics
agent_count = Column(Integer, default=0)
active_agent_count = Column(Integer, default=0)
memory_usage_mb = Column(Float, nullable=True)
cpu_usage_percent = Column(Float, nullable=True)
# Additional Data
details = Column(JSON, nullable=True) # Additional health check data
error_message = Column(Text, nullable=True)
# Timestamp (πŸ”§ FIXED: TIMESTAMP WITH TIME ZONE)
created_at = Column(TIMESTAMP(timezone=True), default=func.now(), nullable=False, index=True)
# Indexes for performance
__table_args__ = (
Index('idx_health_component_created', 'component', 'created_at'),
Index('idx_health_status_created', 'status', 'created_at'),
)
# Database utility functions
def get_table_info():
"""Get information about all database tables"""
tables = {}
for table_name, table in Base.metadata.tables.items():
tables[table_name] = {
"columns": [col.name for col in table.columns],
"indexes": [idx.name for idx in table.indexes],
"primary_keys": [col.name for col in table.primary_key.columns]
}
return tables
if __name__ == "__main__":
# Print database schema information
print("πŸ—„οΈ SAAP Database Schema (FIXED):")
print("=" * 50)
tables = get_table_info()
for table_name, info in tables.items():
print(f"\nπŸ“‹ Table: {table_name}")
print(f" Columns: {', '.join(info['columns'])}")
print(f" Indexes: {', '.join(info['indexes'])}")
print(f" Primary Keys: {', '.join(info['primary_keys'])}")
print(f"\nβœ… Total Tables: {len(tables)}")
print("βœ… Database schema FIXED - no more circular references!")
print("πŸ”§ Key fixes applied:")
print(" - Added lazy='select' to all relationships")
print(" - Safe conversion methods with error handling")
print(" - Prevented recursive model loading")