vvillarreal-cfee commited on
Commit
b8d9e47
·
verified ·
1 Parent(s): 9ac0306

feat: Split CrewAI and logic in modules.

Browse files
Files changed (3) hide show
  1. src/app.py +33 -79
  2. src/custom_tools.py +60 -0
  3. src/incident_crew.py +84 -0
src/app.py CHANGED
@@ -1,5 +1,6 @@
1
  import streamlit as st
2
- from crewai import Agent, Task, Crew, Process, LLM
 
3
 
4
  # --- Interfaz de Usuario con Streamlit ---
5
  st.set_page_config(
@@ -11,38 +12,27 @@ st.set_page_config(
11
  if 'gemini_api_key' not in st.session_state:
12
  st.session_state['gemini_api_key'] = ''
13
 
14
- llm = LLM(
15
- model="gemini/gemini-2.0-flash",
16
- temperature=0.3
17
- )
18
-
19
- threat_hunter_agent = Agent(
20
- role="Security Threat Hunter",
21
- goal="Rapidly identify, extract, and analyze key Indicators of Compromise (IOCs) from initial security alerts to provide structured and actionable intelligence.",
22
- backstory="Highly specialized in threat intelligence, capable of parsing raw incident data, identifying patterns, and performing virtual lookups on security intelligence databases (like VirusTotal, IPInfo, etc.). Your output must be a concise, structured summary.",
23
- verbose=True,
24
- allow_delegation=False,
25
- llm=llm,
26
- )
27
-
28
- reporter_agent = Agent(
29
- role="Incident Response Document Specialist",
30
- goal="Generate a formal, professional, and comprehensive Incident Response Report based on the initial alert and the technical intelligence provided by the Threat Hunter.",
31
- backstory="An expert in cybersecurity documentation, focusing on clarity, structure, and adherence to industry-standard reporting formats. You transform raw data into polished, readable documents suitable for management and technical teams.",
32
- verbose=True,
33
- llm=llm,
34
- )
35
-
36
 
37
  st.title("📄 Generador IA de Informes Post-Mortem")
38
 
39
-
40
  # --- Sidebar para la configuración de la API Key ---
41
  with st.sidebar:
42
  st.header("⚙️ Instrucciones")
43
 
44
- st.write("1. 📄 Completa el formulario con los detalles de la incidencia (o pega el texto RAW y deja que la IA extraiga los campos)")
45
- st.write("2. 🚀 Presiona el botón y la IA generará un informe técnico estructurado post-mortem de tu incidente.")
 
 
 
 
 
 
 
 
 
 
 
 
46
 
47
 
48
  # --- Formulario de entrada de datos principal ---
@@ -81,67 +71,31 @@ with st.form("post_mortem_form"):
81
 
82
  # --- Lógica de procesamiento ---
83
  if submitted:
 
 
 
84
 
85
- if not all([tipo_alerta, sistema_afectado, fecha_hora, impacto_detalle, acciones_tomadas]):
86
  st.error("❌ Por favor, completa todos los campos del formulario para generar el informe.")
87
 
88
  else:
 
 
 
 
 
 
 
 
89
  with st.spinner("⏳ Generando informe técnico..."):
90
- task_ioc_extraction = Task(
91
- description=f"""
92
- Input: Initial raw incident alert details
93
- {impacto_detalle}
94
- Process:
95
- 1. Extract 10 observable Indicators of Compromise (IOCs) including IPs, domains, file hashes (SHA256, MD5), and URLs from the input.
96
- 2. For each extracted IOC, simulate querying external intelligence services (like VirusTotal, IPInfo, etc.).
97
- 3. Synthesize the findings into a clear, structured intelligence summary.
98
- Output Requirements: The output MUST be a JSON-like or clearly delimited text block, detailing each IOC, its type, and a summary of the associated risk/reputation found (e.g., "Malicious/Known C2," "Clean," "High Reputation," "Related to Phishing Campaign X"). This summary is the ONLY content that should be passed to the next agent.
99
- """,
100
- expected_output="A structured summary of technical findings including IPs, hashes, and domains, and their corresponding threat intelligence status, ready for the final report.",
101
- agent=threat_hunter_agent,
102
- )
103
-
104
- task_report_drafting = Task(
105
- description=f"""
106
- Input:
107
- 1. The original user input:
108
- - **Tipo de Alerta**: {tipo_alerta}
109
- - **Sistema Afectado**: {sistema_afectado}
110
- - **Fecha y Hora**: {fecha_hora}
111
- - **Detalles del Incidente**: {impacto_detalle}
112
- - **Acciones Tomadas (Mitigación Inmediata)**: {acciones_tomadas}
113
- 2. The structured technical intelligence summary provided by the Threat Hunter.
114
-
115
- Process: Draft a complete, professional Incident Response Report.
116
-
117
- Output Requirements: The report MUST be formatted in Markdown and include the following mandatory sections:
118
- 1. Executive Summary (A brief, high-level overview).
119
- 2. Initial Incident Details (Source, Time, Initial Observation).
120
- 3. Technical Analysis & Indicators of Compromise (IOCs) (A detailed section incorporating ALL findings from the Threat Hunter agent information context).
121
- 4. Impact Assessment (Potential or confirmed impact on systems/data).
122
- 5. Mitigation or remediation steps taken (if any).
123
-
124
- The entire final output must be this Markdown report.
125
- """,
126
- expected_output="A complete Incident Response Report in Spanish language. Formatted as markdown without '```'",
127
- agent=reporter_agent,
128
- markdown=True,
129
- context=[task_ioc_extraction] # Esta tarea espera el resultado de la anterior
130
- )
131
-
132
- incident_crew = Crew(
133
- agents=[threat_hunter_agent, reporter_agent],
134
- tasks=[task_ioc_extraction, task_report_drafting],
135
- process=Process.sequential, # Ejecución secuencial: Threat Hunter -> Reporter
136
- verbose=True # Nivel de detalle de la ejecución
137
- )
138
-
139
- final_report = incident_crew.kickoff()
140
 
141
  # Mostrar el resultado
142
  st.divider()
143
- st.success("✅ Informe Post-Mortem Generado")
144
- st.code(incident_crew.usage_metrics)
145
  st.markdown(final_report)
146
  st.download_button(
147
  label="Descargar Informe (.md)",
 
1
  import streamlit as st
2
+ from incident_crew import IncidentReporterCrew
3
+
4
 
5
  # --- Interfaz de Usuario con Streamlit ---
6
  st.set_page_config(
 
12
  if 'gemini_api_key' not in st.session_state:
13
  st.session_state['gemini_api_key'] = ''
14
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
15
 
16
  st.title("📄 Generador IA de Informes Post-Mortem")
17
 
 
18
  # --- Sidebar para la configuración de la API Key ---
19
  with st.sidebar:
20
  st.header("⚙️ Instrucciones")
21
 
22
+ # Campo de entrada para la API Key
23
+ st.session_state['gemini_api_key'] = st.text_input(
24
+ "1. 🔑 Ingresa tu API Key de Gemini",
25
+ type="password",
26
+ value=st.session_state['gemini_api_key']
27
+ )
28
+
29
+ if st.session_state['gemini_api_key']:
30
+ st.success("API Key cargada. ¡Puedes generar el informe!")
31
+ else:
32
+ st.warning("🚨 Por favor, ingresa tu API Key de Gemini.")
33
+
34
+ st.write("2. 📄 Completa el formulario con los detalles de la incidencia")
35
+ st.write("3. 🚀 Presiona el botón y la IA generará un informe técnico estructurado post-mortem de tu incidente.")
36
 
37
 
38
  # --- Formulario de entrada de datos principal ---
 
71
 
72
  # --- Lógica de procesamiento ---
73
  if submitted:
74
+
75
+ if not st.session_state.get('gemini_api_key'):
76
+ st.error("❌ Por favor, ingresa tu API Key de Gemini en el panel lateral (sidebar) antes de continuar.")
77
 
78
+ elif not all([tipo_alerta, sistema_afectado, fecha_hora, impacto_detalle, acciones_tomadas]):
79
  st.error("❌ Por favor, completa todos los campos del formulario para generar el informe.")
80
 
81
  else:
82
+ inputs = {
83
+ "tipo_alerta": tipo_alerta,
84
+ "sistema_afectado": sistema_afectado,
85
+ "fecha_hora": fecha_hora,
86
+ "impacto_detalle": impacto_detalle,
87
+ "acciones_tomadas": acciones_tomadas,
88
+ }
89
+
90
  with st.spinner("⏳ Generando informe técnico..."):
91
+ final_report = IncidentReporterCrew(
92
+ api_key=st.session_state['gemini_api_key'],
93
+ ).crew().kickoff(inputs=inputs)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
94
 
95
  # Mostrar el resultado
96
  st.divider()
97
+ st.success("✅ Informe post-mortem generado correctamente.")
98
+ #st.code(incident_crew.usage_metrics)
99
  st.markdown(final_report)
100
  st.download_button(
101
  label="Descargar Informe (.md)",
src/custom_tools.py ADDED
@@ -0,0 +1,60 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import requests
3
+ from crewai.tools import BaseTool
4
+ from typing import Type, List
5
+ from pydantic import BaseModel, Field
6
+
7
+ # --- 1. IPInfo Geo Lookup Tool ---
8
+ class IPInfoToolInput(BaseModel):
9
+ """Input schema for IPInfo Geo Lookup Tool."""
10
+ target: List[str] = Field(
11
+ ...,
12
+ description="A list of IP addresses (e.g., ['8.8.8.8', '1.1.1.1']) to query."
13
+ )
14
+
15
+ class IPInfoGeoLookup(BaseTool):
16
+ # See https://ipinfo.io/developers/lite-api
17
+ name: str = "IPInfo Geo Lookup"
18
+ description: str = "Looks up geolocation, Internet Service Provider (ISP), and network details for an IP address. Useful for determining the geographic location of a digital asset."
19
+ args_schema: Type[BaseModel] = IPInfoToolInput
20
+
21
+ def _run(self, target: List[str]) -> str:
22
+ api_key = os.getenv("IPINFO_APIKEY")
23
+ if not api_key:
24
+ return "Error: La variable de entorno IPINFO_APIKEY no está configurada."
25
+
26
+ results = []
27
+ for ip_address in target:
28
+ try:
29
+ url = f"https://api.ipinfo.io/lite/{ip_address}?token={api_key}"
30
+ response = requests.get(url)
31
+ response.raise_for_status() # Lanza una excepción para respuestas de error (4xx o 5xx)
32
+ data = response.json()
33
+ # Formateamos la salida para que sea legible y útil para el agente
34
+ result_str = (
35
+ f"IP: {data.get('ip')}, Country: {data.get('country')}, ASN: {data.get('as_name')}"
36
+ )
37
+ results.append(result_str)
38
+ except requests.exceptions.RequestException as e:
39
+ results.append(f"Error al consultar la IP {ip_address}: {e}")
40
+ return "\n".join(results)
41
+
42
+ # --- 2. VirusTotal Scanner Tool ---
43
+ class VirusTotalToolInput(BaseModel):
44
+ """Input schema for VirusTotal Scanner Tool."""
45
+ resources: List[str] = Field(
46
+ ...,
47
+ description="A list of resources to scan. Valid resources include file hash (MD5, SHA256), a URL, a domain, or an IP address."
48
+ )
49
+
50
+ class VirusTotalScanner(BaseTool):
51
+ # See https://blog.virustotal.com/2024/08/VT-S1-EffectiveResearch.html
52
+ # See https://docs.virustotal.com/reference/ip-info
53
+ name: str = "VirusTotal Scanner"
54
+ description: str = "Analyzes files, URLs, domains, or IP addresses for malware and other security threats using multiple antivirus engines and reputation services. Provides a detailed security analysis report."
55
+ args_schema: Type[BaseModel] = VirusTotalToolInput
56
+
57
+ def _run(self, resources: List[str]) -> str:
58
+ # Placeholder logic for the actual VirusTotal API call
59
+ print(f"DEBUG: Running VirusTotal Scan for {resources}")
60
+ return f"Simulation: Resource {resources} analyzed. 0/90 engines flagged it as malicious. Reputation: Clean."
src/incident_crew.py ADDED
@@ -0,0 +1,84 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from crewai import Agent, Task, Crew, Process, LLM
2
+ from custom_tools import IPInfoGeoLookup
3
+
4
+ class IncidentReporterCrew:
5
+ def __init__(self, api_key):
6
+ self.api_key = api_key
7
+
8
+ self.llm = LLM(
9
+ model="gemini/gemini-2.0-flash",
10
+ temperature=0.3,
11
+ api_key=self.api_key
12
+ )
13
+
14
+ def threat_hunter_agent(self) -> Agent:
15
+ return Agent(
16
+ role="Security Threat Hunter",
17
+ goal="Rapidly identify, extract, and analyze key Indicators of Compromise (IOCs) from initial security alerts to provide structured and actionable intelligence.",
18
+ backstory="Highly specialized in threat intelligence, capable of parsing raw incident data, identifying patterns, and performing virtual lookups on security intelligence databases (like VirusTotal, IPInfo, etc.). Your output must be a concise, structured summary.",
19
+ verbose=True,
20
+ tools=[IPInfoGeoLookup()],
21
+ allow_delegation=False,
22
+ llm=self.llm,
23
+ )
24
+
25
+ def reporter_agent(self) -> Agent:
26
+ return Agent(
27
+ role="Incident Response Document Specialist",
28
+ goal="Generate a formal, professional, and comprehensive Incident Response Report based on the initial alert and the technical intelligence provided by the Threat Hunter.",
29
+ backstory="An expert in cybersecurity documentation, focusing on clarity, structure, and adherence to industry-standard reporting formats. You transform raw data into polished, readable documents suitable for management and technical teams.",
30
+ verbose=True,
31
+ llm=self.llm,
32
+ )
33
+
34
+ def task_ioc_extraction(self) -> Task:
35
+ return Task(
36
+ description="""
37
+ Input: Initial raw incident alert details
38
+ {impacto_detalle}
39
+ Process:
40
+ 1. Extract 10 observable Indicators of Compromise (IOCs) including IPs, domains, file hashes (SHA256, MD5), and URLs from the input.
41
+ 2. For each extracted IOC, try querying external intelligence services (like VirusTotal, IPInfo, etc.).
42
+ 3. Synthesize the findings into a clear, structured intelligence summary.
43
+ Output Requirements: The output MUST be a JSON-like or clearly delimited text block, detailing each IOC, its type, and a summary of the associated risk/reputation found (e.g., "Malicious/Known C2," "Clean," "High Reputation," "Related to Phishing Campaign X"). This summary is the ONLY content that should be passed to the next agent.
44
+ """,
45
+ expected_output="A structured summary of technical findings including IPs, hashes, and domains, and their corresponding threat intelligence status, ready for the final report.",
46
+ agent=self.threat_hunter_agent(),
47
+ )
48
+
49
+ def task_report_drafting(self) -> Task:
50
+ return Task(
51
+ description="""
52
+ Input:
53
+ 1. The original user input:
54
+ - **Tipo de Alerta**: {tipo_alerta}
55
+ - **Sistema Afectado**: {sistema_afectado}
56
+ - **Fecha y Hora**: {fecha_hora}
57
+ - **Detalles del Incidente**: {impacto_detalle}
58
+ - **Acciones Tomadas (Mitigación Inmediata)**: {acciones_tomadas}
59
+ 2. The structured technical intelligence summary provided by the Threat Hunter in context.
60
+
61
+ Process: Draft a complete, professional Incident Response Report.
62
+
63
+ Output Requirements: The report MUST be formatted in Markdown and include the following mandatory sections:
64
+ 1. Executive Summary (A brief, high-level overview).
65
+ 2. Initial Incident Details (Source, Time, Initial Observation).
66
+ 3. Technical Analysis & Indicators of Compromise (IOCs) (A detailed section incorporating ALL findings from the Threat Hunter agent information context).
67
+ 4. Impact Assessment (Potential or confirmed impact on systems/data).
68
+ 5. Mitigation or remediation steps taken (if any).
69
+
70
+ The entire final output must be this Markdown report.
71
+ """,
72
+ expected_output="A complete Incident Response Report in Spanish language. Formatted as markdown without '```'",
73
+ agent=self.reporter_agent(),
74
+ markdown=True,
75
+ #context=[self.task_ioc_extraction()] # Esta tarea espera el resultado de la anterior
76
+ )
77
+
78
+ def crew(self) -> Crew:
79
+ return Crew(
80
+ agents=[self.threat_hunter_agent(), self.reporter_agent()],
81
+ tasks=[self.task_ioc_extraction(), self.task_report_drafting()],
82
+ process=Process.sequential, # Ejecución secuencial: Threat Hunter -> Reporter
83
+ verbose=True
84
+ )