# Source: Hugging Face Space "huggingface_ai_final" — code_agent.py (commit 037bacf)
"""
Real `smolagents` CodeAgent integration for the final project.
This module initializes a `CodeAgent` with standard tools and exposes a
`run_agent(prompt)` function returning the final answer string.
Notes:
- Requires `smolagents` installed in the environment.
- For serverless inference via Hugging Face, you must be logged in
(`huggingface-cli login`) or have `HF_TOKEN` environment variable set,
and have sufficient provider credits.
"""
from typing import List, Any
import logging
import os
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
from smolagents import (
CodeAgent,
InferenceClientModel,
DuckDuckGoSearchTool,
FinalAnswerTool,
VisitWebpageTool,
)
# Get HF token from environment (set by huggingface-cli login or HF_TOKEN env var)
def _get_hf_token() -> str | None:
"""Get HF token from environment or huggingface_hub cache."""
token = os.environ.get("HF_TOKEN")
if token:
return token
# Fallback: try huggingface_hub.get_token() if available
try:
from huggingface_hub import get_token
token = get_token()
if token:
logger.info("βœ“ Using HF token from huggingface_hub cache")
return token
except ImportError:
pass
logger.warning(
"⚠️ HF_TOKEN not found in environment. Run 'huggingface-cli login' or set HF_TOKEN env var. "
"CodeAgent initialization will fail without valid credentials."
)
return None
HF_TOKEN = _get_hf_token()
def _init_inference_model(model_name: str | None):
    """Initialize an InferenceClientModel, falling back through candidates.

    Tries, in order: the requested model (or 'allenai/Olmo-3-7B-Think' when
    none is given), then 'allenai/Olmo-3-7B-Instruct', then the smolagents
    built-in default model (no model_id). Returns the first model that
    initializes successfully.

    Raises:
        Exception: the last initialization error, if every candidate fails
            (typically missing HF login or insufficient credits).
    """
    candidates: list[str | None] = [
        model_name if model_name else 'allenai/Olmo-3-7B-Think',
        'allenai/Olmo-3-7B-Instruct',
        None,  # None -> let smolagents pick its default model
    ]
    last_error: Exception | None = None
    for candidate in candidates:
        try:
            if candidate is None:
                model = InferenceClientModel(token=HF_TOKEN)
                logger.info("InferenceClientModel initialized with default model")
            else:
                model = InferenceClientModel(model_id=candidate, token=HF_TOKEN)
                logger.info("InferenceClientModel initialized with model: %s", candidate)
            return model
        except Exception as e:
            last_error = e
            logger.warning("Failed to initialize model %s: %s. Trying next candidate", candidate, e)
    logger.error(
        "InferenceClientModel initialization failed (ensure HF_TOKEN is set and has credits): %s",
        last_error,
    )
    raise last_error


def make_code_agent(
    max_steps: int = 8,
    verbosity: int = 1,
    model_name: str | None = None,
):
    """Create and return a smolagents CodeAgent configured with standard tools.

    Args:
        max_steps: max reasoning steps for the agent.
        verbosity: logging/output verbosity level.
        model_name: HF model ID for serverless inference (e.g., 'allenai/Olmo-3-7B-Instruct').
            If None, tries 'allenai/Olmo-3-7B-Think' first, then falls back
            to 'allenai/Olmo-3-7B-Instruct' and the library default.

    Returns:
        CodeAgent instance.

    Raises:
        Exception: if every InferenceClientModel candidate fails to
            initialize (missing HF login/credits).
    """
    tools: List[Any] = []
    # Tools are best-effort: a failed constructor (missing extras, no
    # network) just means the agent runs without that tool.
    for tool_cls in (DuckDuckGoSearchTool, VisitWebpageTool, FinalAnswerTool):
        try:
            tools.append(tool_cls())
            if tool_cls is VisitWebpageTool:
                logger.info("Using smolagents.VisitWebpageTool for webpage visits")
        except Exception as e:
            logger.debug("%s unavailable: %s", tool_cls.__name__, e)
    model = _init_inference_model(model_name)
    agent = CodeAgent(tools=tools, model=model, max_steps=max_steps, verbosity_level=verbosity)
    return agent
def _extract_clean_answer(text: str) -> str:
"""Clean agent output by removing markdown, code blocks, and formatting.
Handles:
- Code fences (```python ... ```)
- Code keywords (import, def, class, from)
- Markdown formatting (*#-`)
- Excess whitespace
Args:
text: raw agent output (may contain code, markdown)
Returns:
Clean plain-text answer string.
"""
lines = text.strip().split('\n')
cleaned_lines = []
in_code_block = False
for line in lines:
stripped = line.strip()
# Track code blocks
if '```' in stripped:
in_code_block = not in_code_block
continue
# Skip code blocks
if in_code_block:
continue
# Skip lines that look like code
if stripped.startswith(('import ', 'from ', 'def ', 'class ', '>>>')):
continue
# Skip empty lines at boundaries
if stripped:
cleaned_lines.append(stripped)
# Join and clean markdown formatting
answer = ' '.join(cleaned_lines)
# Remove markdown bold/italic/strikethrough
for char in ('*', '**', '_', '~~'):
answer = answer.replace(char, '')
return answer.strip()
_AGENT_SINGLETON = None
def get_agent():
    """Return the process-wide CodeAgent, creating it on first call."""
    global _AGENT_SINGLETON
    if _AGENT_SINGLETON is not None:
        return _AGENT_SINGLETON
    logger.info("⏳ Initializing CodeAgent singleton with model: allenai/Olmo-3-7B-Think")
    try:
        _AGENT_SINGLETON = make_code_agent(model_name="allenai/Olmo-3-7B-Think")
    except Exception as e:
        logger.error("❌ Failed to initialize CodeAgent: %s", e)
        raise
    logger.info("βœ“ CodeAgent singleton initialized successfully")
    return _AGENT_SINGLETON
def _answer_from_dict(res: dict) -> str:
    """Pull the first string answer out of a dict-like agent response.

    Checks the known answer keys in priority order; falls back to
    stringifying the whole dict when none holds a string.
    """
    for key in ("answer", "final_answer", "final", "output"):
        value = res.get(key)
        if isinstance(value, str):
            answer = _extract_clean_answer(value)
            logger.debug("Extracted answer from key '%s': %s", key, answer[:50])
            return answer
    # Fallback: stringify
    return _extract_clean_answer(str(res))


def _looks_like_code(text: str) -> bool:
    """Heuristic: does the text contain code fences or start with code keywords?"""
    if "```" in text:
        return True
    return text.lstrip().startswith(("import ", "def ", "class ", ">>>"))


def run_agent(prompt: str) -> str:
    """Run the CodeAgent and return the final answer string.

    Args:
        prompt: the reasoning task/question for the agent.

    Returns:
        The agent's final answer as a string (plain-text, no code/markdown).

    Raises:
        Exception: if CodeAgent.run fails (e.g., no HF credentials or credits).
    """
    agent = get_agent()
    # System-level instruction to force plain-text output only
    system_instruction = (
        "IMPORTANT: Your final answer MUST be plain text only. "
        "Do NOT produce code blocks (```), do NOT return Python code, and do NOT include explanations. "
        "Return ONLY the direct answer as a single line or paragraph of plain text."
    )
    full_prompt = f"{system_instruction}\n\n{prompt}"
    try:
        res = agent.run(full_prompt)
        logger.debug("Agent raw response type: %s", type(res).__name__)
    except Exception as e:
        # Surface the error (likely HF auth/credits)
        logger.error("❌ CodeAgent.run failed: %s", e)
        raise
    # Handle dict-like responses
    if isinstance(res, dict):
        return _answer_from_dict(res)
    # Handle string responses
    if isinstance(res, str):
        text = res.strip()
        if _looks_like_code(text):
            # The model ignored the plain-text instruction: retry once with a
            # stricter instruction before falling back to cleaning the output.
            logger.warning("⚠️ Agent returned code-like output; retrying with stricter instruction")
            stricter = (
                "FINAL INSTRUCTION: Return ONLY the plain-text answer. No code blocks, no code, no explanations. "
                "If the answer is a number, return only the number. If it's a date, return only the date. "
                "Do not explain your answer."
            )
            retry_prompt = f"{stricter}\n\n{prompt}"
            try:
                retry_res = agent.run(retry_prompt)
                if isinstance(retry_res, str):
                    answer = _extract_clean_answer(retry_res)
                    logger.info("βœ“ Retry successful, extracted clean answer")
                    return answer
                if isinstance(retry_res, dict):
                    return _answer_from_dict(retry_res)
                return _extract_clean_answer(str(retry_res))
            except Exception as e:
                logger.error("❌ Retry failed: %s", e)
                # Fall through and clean the original code-like response.
        answer = _extract_clean_answer(text)
        logger.debug("βœ“ Final answer (cleaned): %s", answer[:80])
        return answer
    # Fallback for other types
    return _extract_clean_answer(str(res))
if __name__ == "__main__":
    # Manual smoke test: requires HF credentials (HF_TOKEN) and network access.
    print(run_agent("Give me a short list of 3 fruits."))