Spaces:
Running
Running
| """ | |
| Real `smolagents` CodeAgent integration for the final project. | |
| This module initializes a `CodeAgent` with standard tools and exposes a | |
| `run_agent(prompt)` function returning the final answer string. | |
| Notes: | |
| - Requires `smolagents` installed in the environment. | |
| - For serverless inference via Hugging Face, you must be logged in | |
| (`huggingface-cli login`) or have `HF_TOKEN` environment variable set, | |
| and have sufficient provider credits. | |
| """ | |
| from typing import List, Any | |
| import logging | |
| import os | |
| logger = logging.getLogger(__name__) | |
| logging.basicConfig(level=logging.INFO) | |
| from smolagents import ( | |
| CodeAgent, | |
| InferenceClientModel, | |
| DuckDuckGoSearchTool, | |
| FinalAnswerTool, | |
| VisitWebpageTool, | |
| ) | |
# Resolve the Hugging Face token once at import time: environment variable
# first, then the huggingface_hub login cache (written by `huggingface-cli login`).
def _get_hf_token() -> str | None:
    """Return an HF API token from the environment or the hub cache, else None."""
    env_token = os.environ.get("HF_TOKEN")
    if env_token:
        return env_token
    # Fall back to the cached login; huggingface_hub may not be installed,
    # in which case we treat it the same as "no token found".
    try:
        from huggingface_hub import get_token
        cached = get_token()
        if cached:
            logger.info("β Using HF token from huggingface_hub cache")
            return cached
    except ImportError:
        pass
    logger.warning(
        "β οΈ HF_TOKEN not found in environment. Run 'huggingface-cli login' or set HF_TOKEN env var. "
        "CodeAgent initialization will fail without valid credentials."
    )
    return None


HF_TOKEN = _get_hf_token()
def _standard_tools() -> List[Any]:
    """Instantiate the standard smolagents tools, skipping any that fail to load."""
    tools: List[Any] = []
    for tool_cls in (DuckDuckGoSearchTool, VisitWebpageTool, FinalAnswerTool):
        try:
            tools.append(tool_cls())
            if tool_cls is VisitWebpageTool:
                logger.info("Using smolagents.VisitWebpageTool for webpage visits")
        except Exception as e:
            # Best-effort: a missing optional tool should not block agent creation.
            logger.debug("%s unavailable: %s", tool_cls.__name__, e)
    return tools


def _init_model(preferred: str | None):
    """Create an InferenceClientModel, falling back through candidate model IDs.

    Order tried: `preferred` (or 'allenai/Olmo-3-7B-Think' when None), then
    'allenai/Olmo-3-7B-Instruct', then the library's default model.

    Raises:
        Exception: the last initialization error, if every candidate fails.
    """
    primary = preferred or 'allenai/Olmo-3-7B-Think'
    candidates: List[str | None] = [primary, 'allenai/Olmo-3-7B-Instruct', None]
    last_error: Exception | None = None
    for model_id in candidates:
        try:
            if model_id is None:
                model = InferenceClientModel(token=HF_TOKEN)
                logger.info("InferenceClientModel initialized with default model")
            else:
                model = InferenceClientModel(model_id=model_id, token=HF_TOKEN)
                logger.info("InferenceClientModel initialized with model: %s", model_id)
            return model
        except Exception as e:
            logger.warning("Failed to initialize model %s: %s. Trying next candidate", model_id, e)
            last_error = e
    # Every candidate failed; surface the most recent error to the caller.
    raise last_error


def make_code_agent(
    max_steps: int = 8,
    verbosity: int = 1,
    model_name: str | None = None,
):
    """Create and return a smolagents CodeAgent configured with standard tools.

    Args:
        max_steps: max reasoning steps for the agent.
        verbosity: logging/output verbosity level.
        model_name: HF model ID for serverless inference (e.g., 'allenai/Olmo-3-7B-Instruct').
            If None, defaults to 'allenai/Olmo-3-7B-Think' with the usual fallbacks.

    Returns:
        CodeAgent instance.

    Raises:
        Exception: if InferenceClientModel initialization fails for every
            fallback candidate (missing HF login/credits).
    """
    tools = _standard_tools()
    try:
        model = _init_model(model_name)
    except Exception as e:
        logger.error(
            "InferenceClientModel initialization failed (ensure HF_TOKEN is set and has credits): %s", e
        )
        raise
    return CodeAgent(tools=tools, model=model, max_steps=max_steps, verbosity_level=verbosity)
| def _extract_clean_answer(text: str) -> str: | |
| """Clean agent output by removing markdown, code blocks, and formatting. | |
| Handles: | |
| - Code fences (```python ... ```) | |
| - Code keywords (import, def, class, from) | |
| - Markdown formatting (*#-`) | |
| - Excess whitespace | |
| Args: | |
| text: raw agent output (may contain code, markdown) | |
| Returns: | |
| Clean plain-text answer string. | |
| """ | |
| lines = text.strip().split('\n') | |
| cleaned_lines = [] | |
| in_code_block = False | |
| for line in lines: | |
| stripped = line.strip() | |
| # Track code blocks | |
| if '```' in stripped: | |
| in_code_block = not in_code_block | |
| continue | |
| # Skip code blocks | |
| if in_code_block: | |
| continue | |
| # Skip lines that look like code | |
| if stripped.startswith(('import ', 'from ', 'def ', 'class ', '>>>')): | |
| continue | |
| # Skip empty lines at boundaries | |
| if stripped: | |
| cleaned_lines.append(stripped) | |
| # Join and clean markdown formatting | |
| answer = ' '.join(cleaned_lines) | |
| # Remove markdown bold/italic/strikethrough | |
| for char in ('*', '**', '_', '~~'): | |
| answer = answer.replace(char, '') | |
| return answer.strip() | |
# Lazily-created module-level CodeAgent shared by every run_agent() call.
_AGENT_SINGLETON = None


def get_agent():
    """Return the shared CodeAgent, creating it on first use.

    Raises:
        Exception: propagated from make_code_agent() when initialization fails.
    """
    global _AGENT_SINGLETON
    if _AGENT_SINGLETON is not None:
        return _AGENT_SINGLETON
    logger.info("β³ Initializing CodeAgent singleton with model: allenai/Olmo-3-7B-Think")
    try:
        _AGENT_SINGLETON = make_code_agent(model_name="allenai/Olmo-3-7B-Think")
    except Exception as e:
        logger.error("β Failed to initialize CodeAgent: %s", e)
        raise
    logger.info("β CodeAgent singleton initialized successfully")
    return _AGENT_SINGLETON
# Dict keys checked, in order, when the agent returns a mapping.
_ANSWER_KEYS = ("answer", "final_answer", "final", "output")


def _coerce_answer(res: Any) -> str:
    """Normalize an agent result of any type into a clean plain-text answer."""
    if isinstance(res, dict):
        # Prefer well-known answer keys before falling back to stringification.
        for key in _ANSWER_KEYS:
            if key in res and isinstance(res[key], str):
                answer = _extract_clean_answer(res[key])
                logger.debug("Extracted answer from key '%s': %s", key, answer[:50])
                return answer
        return _extract_clean_answer(str(res))
    return _extract_clean_answer(res if isinstance(res, str) else str(res))


def _looks_like_code(text: str) -> bool:
    """Heuristic: output contains a code fence or starts with a code keyword."""
    return "```" in text or text.lstrip().startswith(("import ", "def ", "class ", ">>>"))


def run_agent(prompt: str) -> str:
    """Run the CodeAgent and return the final answer string.

    Args:
        prompt: the reasoning task/question for the agent.

    Returns:
        The agent's final answer as a string (plain-text, no code/markdown).

    Raises:
        Exception: if CodeAgent.run fails (e.g., no HF credentials or credits).
    """
    agent = get_agent()
    # System-level instruction to force plain-text output only
    system_instruction = (
        "IMPORTANT: Your final answer MUST be plain text only. "
        "Do NOT produce code blocks (```), do NOT return Python code, and do NOT include explanations. "
        "Return ONLY the direct answer as a single line or paragraph of plain text."
    )
    full_prompt = f"{system_instruction}\n\n{prompt}"
    try:
        res = agent.run(full_prompt)
        logger.debug("Agent raw response type: %s", type(res).__name__)
    except Exception as e:
        # Surface the error (likely HF auth/credits)
        logger.error("β CodeAgent.run failed: %s", e)
        raise
    if isinstance(res, str):
        text = res.strip()
        if _looks_like_code(text):
            # One retry with a stricter instruction before cleaning locally.
            logger.warning("β οΈ Agent returned code-like output; retrying with stricter instruction")
            stricter = (
                "FINAL INSTRUCTION: Return ONLY the plain-text answer. No code blocks, no code, no explanations. "
                "If the answer is a number, return only the number. If it's a date, return only the date. "
                "Do not explain your answer."
            )
            try:
                retry_res = agent.run(f"{stricter}\n\n{prompt}")
            except Exception as e:
                # Fall through and clean the original response instead.
                logger.error("β Retry failed: %s", e)
            else:
                if isinstance(retry_res, str):
                    logger.info("β Retry successful, extracted clean answer")
                return _coerce_answer(retry_res)
        answer = _extract_clean_answer(text)
        logger.debug("β Final answer (cleaned): %s", answer[:80])
        return answer
    # Dicts and any other result type are normalized the same way.
    return _coerce_answer(res)
if __name__ == "__main__":
    # Manual smoke test: run a trivial prompt and print the cleaned answer.
    demo_answer = run_agent("Give me a short list of 3 fruits.")
    print(demo_answer)