File size: 12,031 Bytes
4343907
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
"""
Privacy Detector for SAAP Multi-Agent System
Detects sensitive data to route to local colossus instead of external OpenRouter
"""

import logging
import re
from enum import Enum
from typing import Dict, List, Optional, Tuple

logger = logging.getLogger(__name__)

class PrivacyLevel(Enum):
    """Classification of message sensitivity, ordered least to most restricted."""

    # Safe for external processing
    PUBLIC = "public"
    # Prefer local but not critical
    INTERNAL = "internal"
    # Should be local
    CONFIDENTIAL = "confidential"
    # MUST be local (medical, financial, personal)
    PRIVATE = "private"

class PrivacyDetector:
    """
    Detects sensitive data in user messages to ensure privacy-compliant routing
    
    Detection Methods:
    1. Keyword-based (medical, financial, personal data keywords)
    2. Pattern-based (credit cards, SSN, IBAN, etc.)
    3. Agent-based rules (medical agent = always private)
    """
    
    def __init__(self):
        # Sensitive keyword categories
        self.sensitive_keywords = {
            "medical": [
                "patient", "patienten", "diagnosis", "diagnose", "treatment", "behandlung",
                "medication", "medikament", "symptom", "krankheit", "disease", "arzt",
                "doctor", "hospital", "krankenhaus", "gesundheit", "health", "medizin",
                "medicine", "therapie", "therapy", "blut", "blood", "operation"
            ],
            "financial": [
                # Only TRULY sensitive financial data (account numbers, cards, passwords)
                # General financial advice keywords removed (investment, portfolio, sparen, etc.)
                "account number", "kontonummer", "password", "passwort", 
                "credit card", "kreditkarte", "iban", "bic", 
                "pin", "cvv", "security code", "sicherheitscode",
                "salary", "gehalt",  # Personal income data
                "tax id", "steuernummer"  # Personal tax data
            ],
            "personal": [
                "social security", "sozialversicherung", "passport", "reisepass",
                "driver license", "führerschein", "birthday", "geburtstag", "geburtsdatum",
                "address", "adresse", "phone", "telefon", "email", "personalausweis",
                "id card", "tax", "steuer", "insurance", "versicherung"
            ],
            "legal": [
                "contract", "vertrag", "confidential", "vertraulich", "proprietary",
                "nda", "geheimhaltung", "lawsuit", "klage", "court", "gericht",
                "lawyer", "anwalt", "legal", "rechtlich"
            ],
            "security": [
                "secret", "geheim", "private key", "token", "api key", "credentials",
                "zugangsdaten", "authentication", "authentifizierung"
            ]
        }
        
        # Sensitive data patterns (regex)
        self.sensitive_patterns = [
            # Credit card numbers
            (r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b", "credit_card"),
            # SSN (US)
            (r"\b\d{3}-\d{2}-\d{4}\b", "ssn"),
            # IBAN
            (r"\b[A-Z]{2}\d{2}[A-Z0-9]{13,29}\b", "iban"),
            # Dates (potential birthdays)
            (r"\b\d{2}[./]\d{2}[./]\d{4}\b", "date"),
            # Email addresses
            (r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", "email"),
            # Phone numbers (German format)
            (r"\b(?:\+49|0)\s?\d{3,4}\s?\d{6,8}\b", "phone"),
            # API keys/tokens (long alphanumeric strings)
            (r"\b[A-Za-z0-9]{32,}\b", "api_key")
        ]
        
        # Agent-specific privacy rules
        # Only MANDATORY privacy rules here (content-based detection is preferred)
        self.agent_privacy_rules = {
            "lara_alesi": PrivacyLevel.PRIVATE,  # Medical - ALWAYS private (health data)
            "theo_alesi": PrivacyLevel.INTERNAL,  # Financial - depends on content
            "justus_alesi": PrivacyLevel.INTERNAL,  # Legal - depends on content
            "jane_alesi": PrivacyLevel.INTERNAL,  # Coordinator - depends on content
            "john_alesi": PrivacyLevel.INTERNAL,  # Development - usually safe
            "leon_alesi": PrivacyLevel.INTERNAL,  # System - usually safe
            "luna_alesi": PrivacyLevel.INTERNAL  # Coaching - usually safe
        }
        
    def detect_privacy_level(
        self, 
        message: str, 
        agent_id: str = None,
        user_privacy_flag: str = None
    ) -> Tuple[PrivacyLevel, Dict]:
        """
        Detect privacy level of message
        
        Args:
            message: User message to analyze
            agent_id: Agent that will process this (optional)
            user_privacy_flag: User-specified privacy level (optional)
            
        Returns:
            Tuple of (PrivacyLevel, detection_details)
        """
        
        # User override takes precedence
        if user_privacy_flag:
            level = self._parse_user_privacy_flag(user_privacy_flag)
            return level, {"reason": "user_override", "user_flag": user_privacy_flag}
        
        # Check agent-specific rules first
        if agent_id and agent_id in self.agent_privacy_rules:
            agent_level = self.agent_privacy_rules[agent_id]
            if agent_level == PrivacyLevel.PRIVATE:
                return agent_level, {
                    "reason": "agent_rule",
                    "agent": agent_id,
                    "rule": "always_private"
                }
        
        # Keyword detection
        keyword_matches = self._detect_keywords(message)
        
        # Pattern detection
        pattern_matches = self._detect_patterns(message)
        
        # Combine detections
        total_detections = len(keyword_matches) + len(pattern_matches)
        
        # Determine privacy level based on detections
        details = {
            "keyword_matches": keyword_matches,
            "pattern_matches": pattern_matches,
            "total_detections": total_detections
        }
        
        # Classification logic
        if pattern_matches:  # Any pattern match = high sensitivity
            if any(cat in ["credit_card", "ssn", "iban"] for pat, cat in pattern_matches):
                return PrivacyLevel.PRIVATE, {**details, "reason": "sensitive_pattern"}
        
        if keyword_matches:
            categories = set(cat for cat, _ in keyword_matches)
            
            # Medical or financial keywords = PRIVATE
            if "medical" in categories or "financial" in categories:
                return PrivacyLevel.PRIVATE, {**details, "reason": "sensitive_keywords"}
            
            # Personal or legal = CONFIDENTIAL
            if "personal" in categories or "legal" in categories:
                return PrivacyLevel.CONFIDENTIAL, {**details, "reason": "confidential_keywords"}
            
            # Security keywords = CONFIDENTIAL
            if "security" in categories:
                return PrivacyLevel.CONFIDENTIAL, {**details, "reason": "security_keywords"}
        
        # Apply agent rule if no strong detection
        if agent_id and agent_id in self.agent_privacy_rules:
            return self.agent_privacy_rules[agent_id], {
                **details,
                "reason": "agent_default",
                "agent": agent_id
            }
        
        # Default: PUBLIC (safe for external processing)
        return PrivacyLevel.PUBLIC, {**details, "reason": "no_sensitive_data"}
    
    def _detect_keywords(self, message: str) -> List[Tuple[str, str]]:
        """Detect sensitive keywords in message using word boundaries"""
        message_lower = message.lower()
        matches = []
        
        for category, keywords in self.sensitive_keywords.items():
            for keyword in keywords:
                # Use word boundaries to avoid false positives
                # "health" in "wealth" won't match
                pattern = r'\b' + re.escape(keyword) + r'\b'
                if re.search(pattern, message_lower):
                    matches.append((category, keyword))
        
        return matches
    
    def _detect_patterns(self, message: str) -> List[Tuple[str, str]]:
        """Detect sensitive patterns in message"""
        matches = []
        
        for pattern, category in self.sensitive_patterns:
            if re.search(pattern, message):
                matches.append((pattern, category))
        
        return matches
    
    def _parse_user_privacy_flag(self, flag: str) -> PrivacyLevel:
        """Parse user-specified privacy flag"""
        flag_lower = flag.lower()
        
        if flag_lower in ["private", "high", "strict"]:
            return PrivacyLevel.PRIVATE
        elif flag_lower in ["confidential", "medium"]:
            return PrivacyLevel.CONFIDENTIAL
        elif flag_lower in ["internal", "low"]:
            return PrivacyLevel.INTERNAL
        else:
            return PrivacyLevel.PUBLIC
    
    def should_use_local_provider(self, privacy_level: PrivacyLevel, mode: str = "balanced") -> bool:
        """
        Determine if local provider should be used based on privacy level and mode
        
        Args:
            privacy_level: Detected privacy level
            mode: Privacy mode (strict, balanced, performance)
            
        Returns:
            True if should use local provider (colossus), False if external OK (OpenRouter)
        """
        
        if mode == "strict":
            # Strict: Everything goes local
            return True
        
        elif mode == "balanced":
            # Balanced: Private and Confidential go local
            return privacy_level in [PrivacyLevel.PRIVATE, PrivacyLevel.CONFIDENTIAL]
        
        elif mode == "performance":
            # Performance: Only explicitly PRIVATE goes local
            return privacy_level == PrivacyLevel.PRIVATE
        
        else:
            # Default to balanced
            return privacy_level in [PrivacyLevel.PRIVATE, PrivacyLevel.CONFIDENTIAL]


# Singleton instance
# Module-level shared detector; one instance is enough because the class
# holds only static keyword/pattern configuration (no per-request state).
privacy_detector = PrivacyDetector()


# Convenience functions
def detect_privacy_level(message: str, agent_id: str = None, user_flag: str = None) -> Tuple[PrivacyLevel, Dict]:
    """Module-level shortcut: delegate to the shared PrivacyDetector singleton."""
    return privacy_detector.detect_privacy_level(
        message,
        agent_id,
        user_flag,
    )


def should_use_local(message: str, agent_id: str = None, mode: str = "balanced") -> bool:
    """Convenience function to check if local provider should be used

    Args:
        message: User message to analyze
        agent_id: Agent that will process this (optional)
        mode: Privacy mode (strict, balanced, performance)

    Returns:
        True if the local provider (colossus) should be used
    """
    level, details = privacy_detector.detect_privacy_level(message, agent_id)
    use_local = privacy_detector.should_use_local_provider(level, mode)

    # Bug fix: the original f-string ran the level and routing together
    # (e.g. "privateLOCAL"); add a separator. Lazy %-style args also avoid
    # formatting work when INFO logging is disabled.
    logger.info(
        "Privacy check: %s → %s (%s)",
        level.value,
        "LOCAL" if use_local else "EXTERNAL",
        details.get("reason", "unknown"),
    )

    return use_local


if __name__ == "__main__":
    # Demo: run the detector over a handful of representative messages
    test_cases = [
        ("What is Python?", "john_alesi"),
        ("My patient has symptoms of diabetes", "lara_alesi"),
        ("Analyze my bank account: DE89370400440532013000", "theo_alesi"),
        ("Review this confidential contract", "justus_alesi"),
        ("How to optimize this code?", "john_alesi"),
        ("My credit card number is 4532-1234-5678-9010", None)
    ]

    print("🔒 Privacy Detection Demo\n")

    for message, agent in test_cases:
        level, details = detect_privacy_level(message, agent)
        use_local = privacy_detector.should_use_local_provider(level, mode="balanced")

        # Collect the report lines first, then emit them in one pass
        report = [
            f"Message: '{message[:50]}...'",
            f"Agent: {agent or 'None'}",
            f"Privacy Level: {level.value}",
            f"Routing: {'LOCAL (colossus)' if use_local else 'EXTERNAL (OpenRouter)'}",
            f"Reason: {details.get('reason')}",
        ]
        if details.get('keyword_matches'):
            report.append(f"Keywords: {details['keyword_matches']}")
        if details.get('pattern_matches'):
            report.append(f"Patterns: {details['pattern_matches']}")

        for line in report:
            print(line)
        print()