import requests from crewai.tools import BaseTool from typing import Type, List from pydantic import BaseModel, Field import streamlit as st # --- 1. IPInfo Geo Lookup Tool --- class IPInfoToolInput(BaseModel): """Input schema for IPInfo Geo Lookup Tool.""" target: List[str] = Field( ..., description="A list of IP addresses (e.g., ['8.8.8.8', '1.1.1.1']) to query." ) class IPInfoGeoLookup(BaseTool): # See https://ipinfo.io/developers/lite-api name: str = "IPInfo Geo Lookup" description: str = "Looks up geolocation, Internet Service Provider (ISP), and network details for an IP address. Useful for determining the geographic location of a digital asset." args_schema: Type[BaseModel] = IPInfoToolInput def _run(self, target: List[str]) -> str: api_key = st.session_state['ipinfo_api_key'] if not api_key: return "Error: La variable de entorno IPINFO_APIKEY no está configurada." results = [] for ip_address in target: try: url = f"https://api.ipinfo.io/lite/{ip_address}?token={api_key}" response = requests.get(url) response.raise_for_status() data = response.json() # Formateamos la salida para que sea legible y útil para el agente result_str = ( f"IP: {data.get('ip')}, Country: {data.get('country')}, ASN: {data.get('as_name')}" ) results.append(result_str) except requests.exceptions.RequestException as e: results.append(f"Error al consultar la IP {ip_address}: {e}") return "\n".join(results) # --- 2. VirusTotal Scanner Tool --- class VirusTotalToolInput(BaseModel): """Input schema for VirusTotal Scanner Tool.""" resources: List[str] = Field( ..., description="A list of resources to scan. Valid resources include file hash (MD5, SHA256), a URL, a domain, or an IP address." ) class VirusTotalScanner(BaseTool): # See https://blog.virustotal.com/2024/08/VT-S1-EffectiveResearch.html # See https://docs.virustotal.com/reference/ip-info name: str = "VirusTotal Scanner" description: str = "Analyzes files, URLs, domains, or IP addresses for malware and other security threats using multiple antivirus engines and reputation services. Provides a detailed security analysis report." args_schema: Type[BaseModel] = VirusTotalToolInput def _run(self, resources: List[str]) -> str: # Placeholder logic for the actual VirusTotal API call print(f"DEBUG: Running VirusTotal Scan for {resources}") return f"Simulation: Resource {resources} analyzed. 0/90 engines flagged it as malicious. Reputation: Clean."