|
|
|
|
|
""" |
|
|
Ghostprint Semantic Encryption Engine with Information Geometry AI Poisoning |
|
|
========================================================================== |
|
|
|
|
|
Complete implementation integrating proven AI poisoning techniques through |
|
|
information geometry masks and hybrid perturbation methods. |
|
|
""" |
|
|
|
|
|
import os |
|
|
import sys |
|
|
import time |
|
|
import json |
|
|
import hashlib |
|
|
import lzma |
|
|
import base64 |
|
|
import math |
|
|
import cmath |
|
|
import itertools |
|
|
import struct |
|
|
from datetime import datetime |
|
|
from typing import Dict, List, Tuple, Any, Optional |
|
|
from dataclasses import dataclass |
|
|
from enum import Enum |
|
|
import numpy as np |
|
|
from scipy.special import gamma, airy, jv |
|
|
from ghost_engine_configs import get_config |
|
|
from cryptography.hazmat.primitives.ciphers.aead import AESGCM |
|
|
from PIL import Image, PngImagePlugin |
|
|
from io import BytesIO |
|
|
from armor import ArmorGenerator, ArmorConfig, Ring, apply_delta_autosize, analyze_array_bands |
|
|
|
|
|
|
|
|
# OpenCV is optional: every image operation in this module has a Pillow path.
try:
    import cv2
    CV2_AVAILABLE = True
except ImportError:
    cv2 = None
    CV2_AVAILABLE = False
    # Only mention the missing dependency when debugging was explicitly
    # requested; `os` is already imported at the top of the module.
    if os.environ.get('GHOSTPRINT_DEBUG', '').lower() == 'true':
        print("[INFO] OpenCV not installed. Using Pillow for all image operations (fully functional).")
|
|
|
|
|
|
|
|
# Complex coefficients of the CMT transform: C1 scales the lens-function
# output, C2 scales the magnitude of the encoded sample.
C1 = complex(real=0.587, imag=1.223)
C2 = complex(real=-0.994, imag=0.0)

# Harmonic phase modulation: frequency i is paired with amplitude i.
HARMONIC_FREQUENCIES = [271, 341, 491]
HARMONIC_AMPLITUDES = [0.033, 0.050, 0.100]
|
|
|
|
|
|
|
|
# Latin -> Cyrillic look-alike characters used for text poisoning.
# The values are written as explicit escapes so the table survives any
# re-encoding of this source file; each value MUST be exactly one code point,
# because the reverse lookup below is applied character by character.
HOMOGLYPHS = {
    "a": "\u0430",  # CYRILLIC SMALL LETTER A
    "e": "\u0435",  # CYRILLIC SMALL LETTER IE
    "i": "\u0456",  # CYRILLIC SMALL LETTER BYELORUSSIAN-UKRAINIAN I
    "o": "\u043e",  # CYRILLIC SMALL LETTER O
    "p": "\u0440",  # CYRILLIC SMALL LETTER ER
    "c": "\u0441",  # CYRILLIC SMALL LETTER ES
    "x": "\u0445",  # CYRILLIC SMALL LETTER HA
}

# Inverse table (Cyrillic look-alike -> Latin) used to undo the substitution.
REVERSE_HOMOGLYPHS = {v: k for k, v in HOMOGLYPHS.items()}
|
|
|
|
|
class LensFunction(Enum):
    """Special functions selectable as the "lens" of the CMT transformation."""

    GAMMA = "gamma"    # Gamma function
    AIRY = "airy"      # Airy function
    BESSEL = "bessel"  # Bessel function
|
|
|
|
|
@dataclass
class GeometricFingerprint:
    """Bundle of every artefact that makes up one geometric fingerprint."""

    # CMT transform of the complex-encoded data.
    cmt_signature: List[complex]
    # LEHI harmonic pattern derived from the raw bytes.
    lehi_pattern: List[float]
    # Logic-Geometry Resolution Map matrix.
    lgrm_map: np.ndarray
    # Interference field averaged over several CMT views.
    holographic_field: List[complex]
    # Scalar SRL stability score.
    srl_stability: float
    # Scalar SEFA emergence score.
    sefa_emergence: float
    # Free-form descriptive metadata (creator, hashes, versions, ...).
    metadata: Dict[str, Any]
|
|
|
|
|
class InformationGeometryEngine:
    """Core engine for information geometry transformations with AI poisoning.

    The class groups four families of operations:

    * mask generators that pick positions inside a sequence,
    * text / image / byte perturbation ("poisoning") and watermarking,
    * the matching text clean-up routines,
    * the fingerprint mathematics (complex encoding, CMT, LEHI, SRL, SEFA,
      LGRM and holographic field).
    """

    def __init__(self):
        # Complex coefficients consumed by cmt_transform().
        self.c1 = C1
        self.c2 = C2

    # ------------------------------------------------------------------
    # Mask generators
    # ------------------------------------------------------------------

    def golden_mask_positions(self, n: int, phi: float = (1 + 5 ** 0.5) / 2) -> List[int]:
        """Return sorted, de-duplicated indices in ``[0, n)`` from a golden-ratio walk.

        The walk advances a fractional accumulator by ``phi`` (mod 1) ``n``
        times and scales each value to an index. An ``n`` of 0 yields ``[]``.
        """
        positions = set()
        x = 0.0
        for _ in range(n):
            x = (x + phi) % 1.0
            positions.add(int(x * n))
        return sorted(positions)

    def fibonacci_mask_positions(self, n: int) -> List[int]:
        """Return the terms of the sequence 1, 2, 3, 5, 8, ... that are below ``n``."""
        fib = [1, 2]
        while fib[-1] < n:
            fib.append(fib[-1] + fib[-2])
        # Every kept term is positive and < n, so reducing it modulo n
        # would be a no-op.
        return [f for f in fib if f < n]

    def harmonic_mask_positions(self, n: int, freq: int = 5) -> List[int]:
        """Return indices sampled from a sine wave of angular step ``freq``.

        Roughly 20 samples are taken (stride ``n // 20``, at least 1); the
        result is neither sorted nor de-duplicated.
        """
        stride = max(1, n // 20)
        return [int((np.sin(i * freq) + 1) / 2 * n) for i in range(1, n, stride)]

    def generate_content_keyed_harmonics(self, data: bytes, content_hash: str) -> List[float]:
        """Return one harmonic value per byte of ``data``, keyed by ``content_hash``.

        Only the length of ``data`` is used. The first six bytes of the hex
        digest shift the three base frequencies (by 0-49) and scale the three
        base amplitudes (by up to roughly 1.5x).

        Raises:
            ValueError: if the first 12 characters of ``content_hash`` are not hex.
            IndexError: if ``content_hash`` holds fewer than 12 hex characters.
        """
        hash_bytes = bytes.fromhex(content_hash[:12])

        freq1 = 271 + (hash_bytes[0] % 50)
        freq2 = 341 + (hash_bytes[1] % 50)
        freq3 = 491 + (hash_bytes[2] % 50)

        amp1 = 0.033 * (1 + hash_bytes[3] / 512)
        amp2 = 0.050 * (1 + hash_bytes[4] / 512)
        amp3 = 0.100 * (1 + hash_bytes[5] / 512)

        N = len(data)
        return [
            amp1 * math.sin(2 * math.pi * freq1 * k / N) +
            amp2 * math.sin(2 * math.pi * freq2 * k / N) +
            amp3 * math.sin(2 * math.pi * freq3 * k / N)
            for k in range(N)
        ]

    # ------------------------------------------------------------------
    # Poisoning / watermarking
    # ------------------------------------------------------------------

    def apply_homoglyph_substitution(self, text: str, ratio: float = 0.25, content_hash: str = None) -> str:
        """Replace a random subset of characters with look-alike homoglyphs.

        Args:
            text: Text to perturb.
            ratio: Fraction of character positions that are candidates for a
                swap; clamped so the number of candidates stays in
                ``[0, len(text)]``.
            content_hash: Optional hex digest. When given, its first 8 hex
                digits seed a private generator so the result is
                deterministic; otherwise NumPy's global generator is used.

        Returns:
            The perturbed text (the input itself when nothing is selected).
        """
        if content_hash:
            # A private RandomState yields the same stream as seeding the
            # global generator, without disturbing other users of np.random.
            rng = np.random.RandomState(int(content_hash[:8], 16) % (2 ** 32))
        else:
            rng = np.random

        chars = list(text)
        n = len(chars)
        # Clamp: sampling without replacement cannot draw more than n items.
        swap_count = max(0, min(int(ratio * n), n))
        if swap_count == 0:
            return text

        for pos in rng.choice(n, swap_count, replace=False):
            original = chars[pos]
            glyph = HOMOGLYPHS.get(original.lower())
            if glyph is None:
                continue
            # Keep the letter's case so the substitution can be undone exactly.
            chars[pos] = glyph.upper() if original.isupper() else glyph

        return "".join(chars)

    def apply_multi_mask_injection(self, text: str, content_hash: str = None) -> str:
        """Insert a zero-width space after every character selected by the masks.

        The selected indices are the union of the golden, Fibonacci and
        harmonic masks for ``len(text)``. ``content_hash`` is accepted for
        signature symmetry but is not used.
        """
        n = len(text)
        injection_positions = set(self.golden_mask_positions(n))
        injection_positions.update(self.fibonacci_mask_positions(n))
        injection_positions.update(self.harmonic_mask_positions(n))

        zwsp = "\u200b"
        return "".join(
            ch + zwsp if i in injection_positions else ch
            for i, ch in enumerate(text)
        )

    def apply_hybrid_text_poisoning(self, text: str, strength: float = 1.0, content_hash: str = None) -> str:
        """Apply homoglyph substitution followed by zero-width space injection.

        When ``content_hash`` is falsy, the SHA-256 of ``text`` is used as key.
        The homoglyph ratio is ``0.15 + 0.15 * strength``.
        """
        if not content_hash:
            content_hash = hashlib.sha256(text.encode()).hexdigest()

        homoglyph_ratio = 0.15 + (strength * 0.15)
        substituted = self.apply_homoglyph_substitution(text, homoglyph_ratio, content_hash)
        return self.apply_multi_mask_injection(substituted, content_hash)

    def apply_invisible_armor_image_poisoning(self, image_data: bytes, strength: float = 1.0,
                                              focus_parameter: float = 3.5,
                                              frequency_strategy: str = 'auto') -> bytes:
        """Run the image through ``ArmorGenerator.apply_to_image`` via temp files.

        The bytes are written to a temporary ``.png`` file, armored into a
        sibling ``.armored.png`` file, and read back. Both temporary files are
        removed whether or not the operation succeeds.

        Returns:
            The armored image bytes, or ``image_data`` unchanged when any step
            fails (the error is printed).
        """
        import tempfile

        temp_input_path = None
        temp_output_path = None
        try:
            with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as temp_input:
                # Record the path first so cleanup happens even if the write fails.
                temp_input_path = temp_input.name
                temp_input.write(image_data)

            temp_output_path = temp_input_path + ".armored.png"

            armor_gen = ArmorGenerator()
            config = ArmorConfig()
            armor_gen.apply_to_image(
                image_path=temp_input_path,
                out_path=temp_output_path,
                cfg=config,
                return_metrics=False,
                strength=strength,
                focus_parameter=focus_parameter,
                frequency_strategy=frequency_strategy
            )

            with open(temp_output_path, "rb") as f:
                return f.read()

        except Exception as e:
            print(f"[ERROR] Invisible Armor application failed: {e}.")
            return image_data

        finally:
            for path in (temp_input_path, temp_output_path):
                if path and os.path.exists(path):
                    try:
                        os.remove(path)
                    except OSError:
                        # Best-effort cleanup; never mask the real result.
                        pass

    def apply_ultra_subtle_armor(self, image_data: bytes) -> bytes:
        """Nudge about 0.01% of the pixels (at least one) by ±1.

        Pixel selection and the sign of each change come from a generator
        seeded with the SHA-256 of ``image_data``, so the result is
        deterministic. For multi-channel images a single random channel of
        each selected pixel is changed. The image is re-encoded in its
        original format (PNG when the format is unknown).

        Returns:
            The re-encoded image, or ``image_data`` unchanged on any error or
            when the pixel array is neither 2-D nor 3-D.
        """
        try:
            img = Image.open(BytesIO(image_data))
            original_mode = img.mode
            original_format = img.format if img.format else "PNG"
            img_array = np.array(img).astype(np.float32)

            if img_array.ndim == 2:
                h, w = img_array.shape
                channels = 1
            elif img_array.ndim == 3:
                h, w, channels = img_array.shape
            else:
                return image_data

            seed = int(hashlib.sha256(image_data).hexdigest()[:16], 16)
            rng = np.random.default_rng(seed)

            total_pixels = h * w
            pixels_to_modify = max(1, int(total_pixels * 0.0001))
            flat_indices = rng.choice(total_pixels, size=pixels_to_modify, replace=False)
            y_coords = flat_indices // w
            x_coords = flat_indices % w

            modified = img_array.copy()
            # NOTE: the order of the rng calls below defines the output;
            # do not reorder them.
            for y, x in zip(y_coords, x_coords):
                change = rng.choice([-1, 1])
                if channels == 1:
                    modified[y, x] = np.clip(modified[y, x] + change, 0, 255)
                else:
                    channel = rng.integers(0, channels)
                    modified[y, x, channel] = np.clip(
                        modified[y, x, channel] + change, 0, 255
                    )

            result_img = Image.fromarray(modified.astype(np.uint8), mode=original_mode)

            output = BytesIO()
            if original_format == 'JPEG':
                result_img.save(output, format='JPEG', quality=100, optimize=True)
            else:
                result_img.save(output, format=original_format)
            return output.getvalue()

        except Exception as e:
            print(f"[ERROR] Ultra-subtle armor failed: {e}")
            return image_data

    def apply_subtle_geometric_watermark(self, data: bytes, fingerprint: "GeometricFingerprint", watermark_text: str) -> bytes:
        """Shift a handful of bytes by -1, 0 or +1 at fingerprint-derived offsets.

        This is NOT AI poisoning; it is a minimal modification for
        watermarking purposes. One byte per 1000 input bytes is targeted
        (at least 1, at most 100). Offsets are derived from the SHA-256 of
        ``watermark_text`` and the magnitudes of ``fingerprint.cmt_signature``.

        Returns:
            ``data`` unchanged when ``watermark_text`` or ``data`` is empty,
            otherwise a modified copy of the same length.
        """
        if not watermark_text or len(data) == 0:
            return data

        import random

        modified_data = bytearray(data)
        watermark_hash = hashlib.sha256(watermark_text.encode('utf-8')).hexdigest()
        hash_int = int(watermark_hash[:8], 16)
        random_gen = random.Random(hash_int)

        cmt_magnitudes = [abs(c) for c in fingerprint.cmt_signature] or [1.0]

        # data is non-empty here, so at least one byte is always targeted.
        num_modifications = max(1, min(len(data) // 1000, 100))

        # Over-generate candidates (5x) because several may collide.
        potential_positions = [
            (hash_int + i + int(cmt_magnitudes[i % len(cmt_magnitudes)] * 1000)) % len(data)
            for i in range(num_modifications * 5)
        ]
        chosen_positions = sorted(set(potential_positions))[:num_modifications]

        for pos in chosen_positions:
            change_value = random_gen.randint(-1, 1)
            modified_data[pos] = (modified_data[pos] + change_value) % 256

        return bytes(modified_data)

    def apply_geometric_byte_poisoning(self, data: bytes, strength: float = 1.0) -> bytes:
        """Apply aggressive, deterministic byte perturbations.

        Two passes are made over a copy of ``data``:

        1. Three layers of sinusoidal offsets plus a random boost in
           ``[-4, 4]`` at positions sampled from the golden mask.
        2. A sinusoidal offset on every 100th byte.

        All arithmetic wraps modulo 256. The random generator is seeded with
        the SHA-256 of ``data``; ``strength`` scales both the number of
        touched positions and the size of the offsets.
        """
        poisoned_data = bytearray(data)
        if len(data) == 0:
            return bytes(poisoned_data)

        import random

        golden_positions = self.golden_mask_positions(len(data))
        data_hash_int = int(hashlib.sha256(data).hexdigest()[:8], 16)
        rng = random.Random(data_hash_int)

        target_mod_count = int(len(golden_positions) * (strength * 0.375))
        target_mod_count = min(target_mod_count, len(data) // 2)
        if target_mod_count == 0:
            target_mod_count = len(data) // 10

        # Guard against a negative strength producing a negative sample size.
        sample_size = max(0, min(target_mod_count, len(golden_positions)))
        positions_to_modify = rng.sample(golden_positions, sample_size)

        for layer in range(3):
            layer_strength = strength * (1.0 + layer * 0.4)
            for i, pos in enumerate(positions_to_modify):
                pattern_val = math.sin(i / 10.0 + layer * math.pi/3) * math.cos(i / 15.0 + layer * math.pi/4)
                change_value = int(pattern_val * 15 * layer_strength)
                new_val = (poisoned_data[pos] + change_value) % 256
                # One randint per position per layer: the call order is part
                # of the deterministic output.
                random_boost = rng.randint(-4, 4)
                poisoned_data[pos] = (new_val + random_boost) % 256

        for pos in range(0, len(poisoned_data), 100):
            pattern_val = math.sin(pos / 25.0) * math.cos(pos / 30.0)
            change_value = int(pattern_val * 11.25 * strength)
            poisoned_data[pos] = (poisoned_data[pos] + change_value) % 256

        return bytes(poisoned_data)

    # ------------------------------------------------------------------
    # Text clean-up
    # ------------------------------------------------------------------

    def remove_homoglyph_substitution(self, text: str) -> str:
        """Map homoglyph characters back to their Latin originals.

        Both the lowercase homoglyphs and their uppercase forms are reverted,
        each to the Latin letter of matching case. Any character that equals
        a homoglyph is converted, including ones already present before
        poisoning.
        """
        restored = []
        for ch in text:
            if ch in REVERSE_HOMOGLYPHS:
                restored.append(REVERSE_HOMOGLYPHS[ch])
            elif ch.lower() in REVERSE_HOMOGLYPHS:
                # Uppercase homoglyph produced for an uppercase Latin letter.
                restored.append(REVERSE_HOMOGLYPHS[ch.lower()].upper())
            else:
                restored.append(ch)
        return "".join(restored)

    def remove_multi_mask_injection(self, text: str) -> str:
        """Strip every zero-width space (U+200B) from ``text``."""
        return text.replace("\u200b", "")

    def remove_hybrid_text_poisoning(self, text: str) -> str:
        """Undo hybrid poisoning: strip zero-width spaces, then revert homoglyphs."""
        without_zwsp = self.remove_multi_mask_injection(text)
        return self.remove_homoglyph_substitution(without_zwsp)

    # ------------------------------------------------------------------
    # Fingerprint mathematics
    # ------------------------------------------------------------------

    def analyze_entropy_landscape(self, data: bytes, block_size: int = 64) -> List[Tuple[int, float]]:
        """Return ``(offset, entropy / 8)`` for each ``block_size`` slice of ``data``.

        The entropy is the Shannon entropy (base 2) of the byte histogram of
        the block, so the normalised value lies in ``[0, 1]``.
        """
        regions = []

        for offset in range(0, len(data), block_size):
            block = data[offset:offset + block_size]
            if len(block) == 0:
                continue

            freq_count = {}
            for byte in block:
                freq_count[byte] = freq_count.get(byte, 0) + 1

            total = len(block)
            entropy = 0.0
            for count in freq_count.values():
                p = count / total
                if p > 0:
                    entropy -= p * math.log2(p)

            normalized_entropy = entropy / 8.0 if entropy > 0 else 0.0
            regions.append((offset, normalized_entropy))

        return regions

    def complex_encode(self, data: bytes, enhanced_phase: bool = True) -> List[complex]:
        """Encode each byte as a complex number on a spiral.

        Byte ``k`` of ``N`` gets magnitude ``value / 255`` and phase
        ``2*pi*k/N``; with ``enhanced_phase`` the phase is additionally
        modulated by the module-level harmonic frequencies and amplitudes.
        Empty input yields ``[0j]``.
        """
        if len(data) == 0:
            return [complex(0, 0)]

        N = len(data)
        encoded = []

        for k, value in enumerate(data):
            theta_k = 2 * math.pi * k / N

            phi_k = 0.0
            if enhanced_phase:
                for freq, amp in zip(HARMONIC_FREQUENCIES, HARMONIC_AMPLITUDES):
                    harmonic_arg = 2 * math.pi * freq * k / N
                    phi_k += amp * math.sin(harmonic_arg)

            total_phase = theta_k + phi_k
            normalized_value = value / 255.0
            encoded.append(normalized_value * cmath.exp(1j * total_phase))

        return encoded

    def apply_lens_function(self, z: complex, lens_type: "LensFunction") -> complex:
        """Evaluate the selected lens function at ``z``.

        GAMMA uses ``scipy.special.gamma`` (skipped for ``|z| > 50``), AIRY
        the first output of ``scipy.special.airy`` and BESSEL
        ``scipy.special.jv(0, z)``. A tiny placeholder ``1e-12 + 1e-12j``
        replaces non-finite results and results that could not be computed.

        Raises:
            ValueError: if ``lens_type`` is not one of the three supported
                lenses (the original code silently returned ``None``).
        """
        fallback = complex(1e-12, 1e-12)

        if lens_type not in (LensFunction.GAMMA, LensFunction.AIRY, LensFunction.BESSEL):
            raise ValueError(f"Unsupported lens function: {lens_type!r}")

        try:
            if lens_type == LensFunction.GAMMA:
                if abs(z) > 50:
                    return fallback
                result = gamma(z)
            elif lens_type == LensFunction.AIRY:
                result = airy(z)[0]
            else:
                result = jv(0, z)

            if not np.isfinite(result):
                return fallback
            return complex(result)

        # RuntimeWarning only reaches here when warnings are escalated to errors.
        except (ValueError, OverflowError, RuntimeWarning):
            return fallback

    def cmt_transform(self, z_encoded: List[complex], lens_type: "LensFunction") -> List[complex]:
        """Return ``c1 * F(z) + c2 * |z|`` for each ``z``, ``F`` being the lens function."""
        return [
            self.c1 * self.apply_lens_function(z, lens_type) + self.c2 * abs(z)
            for z in z_encoded
        ]

    def generate_lehi_harmonics(self, data: bytes) -> List[float]:
        """Return the LEHI harmonic pattern, one value per byte of ``data``.

        Bytes are rescaled to ``[-1, 1]`` (all zeros when the data is
        constant) and modulated by the three module-level harmonics, each
        phase-shifted by a further ``pi / 3``. Empty input yields ``[0.0]``.
        """
        if len(data) == 0:
            return [0.0]

        values = np.array(list(data), dtype=float)
        lo = np.min(values)
        hi = np.max(values)

        if hi > lo:
            normalized = 2 * (values - lo) / (hi - lo) - 1
        else:
            normalized = np.zeros_like(values)

        N = len(normalized)
        harmonics = np.zeros(N)

        for i, (freq, amp) in enumerate(zip(HARMONIC_FREQUENCIES, HARMONIC_AMPLITUDES)):
            for k in range(N):
                phase = 2 * math.pi * freq * k / N
                harmonics[k] += amp * normalized[k] * math.sin(phase + i * math.pi / 3)

        return harmonics.tolist()

    def calculate_srl_stability(self, data: bytes) -> float:
        """Return ``1 / (1 + max_step / mean_step)`` over consecutive byte differences.

        Inputs shorter than two bytes score ``1.0``; when every step is zero
        the ratio is taken as ``1.0`` (score ``0.5``).
        """
        if len(data) < 2:
            return 1.0

        values = list(data)
        diffs = [abs(b - a) for a, b in zip(values, values[1:])]

        mean_diff = sum(diffs) / len(diffs)
        srl = max(diffs) / mean_diff if mean_diff > 0 else 1.0

        return 1.0 / (1.0 + srl)

    def calculate_sefa_emergence(self, cmt_field: List[complex]) -> float:
        """Return ``1 - |corr(real, imag)|`` of the field.

        Fields with fewer than two samples score ``0.5``; an undefined
        correlation (NaN or ``ValueError``) is treated as ``0``.
        """
        if len(cmt_field) < 2:
            return 0.5

        real_parts = [z.real for z in cmt_field]
        imag_parts = [z.imag for z in cmt_field]

        try:
            correlation = abs(np.corrcoef(real_parts, imag_parts)[0, 1])
            if np.isnan(correlation):
                correlation = 0.0
        except ValueError:
            correlation = 0.0

        return 1.0 - correlation

    def generate_lgrm_map(self, cmt_field: List[complex]) -> np.ndarray:
        """Return the 4x4 complex Logic-Geometry Resolution Map.

        Cell ``(i, j)`` is built from field sample ``(4*i + j) mod len`` with
        its phase rotated by ``i*pi/4`` for the real part and ``j*pi/4`` for
        the imaginary part. An empty field yields an all-zero matrix.
        """
        lgrm = np.zeros((4, 4), dtype=complex)
        if len(cmt_field) == 0:
            return lgrm

        for i in range(4):
            for j in range(4):
                sample = cmt_field[(i * 4 + j) % len(cmt_field)]
                magnitude = abs(sample)
                phase = cmath.phase(sample)

                real_part = magnitude * math.cos(phase + i * math.pi / 4)
                imag_part = magnitude * math.sin(phase + j * math.pi / 4)
                lgrm[i, j] = complex(real_part, imag_part)

        return lgrm

    def reconstruct_holographic_field(self, cmt_views: List[List[complex]]) -> List[complex]:
        """Average several CMT views sample by sample.

        The output has the length of the first view. Views shorter than that
        contribute nothing past their end, but the divisor is always the
        total number of views. No views, or an empty first view, yields
        ``[0j]``.
        """
        if not cmt_views or len(cmt_views[0]) == 0:
            return [complex(0, 0)]

        view_count = len(cmt_views)
        holographic_field = []

        for i in range(len(cmt_views[0])):
            interference = complex(0, 0)
            for view in cmt_views:
                if i < len(view):
                    interference += view[i]
            holographic_field.append(interference / view_count)

        return holographic_field
|
|
|
|
|
def apply_integrated_ai_poisoning(data: bytes, protection_type: str, strength: float = 1.0,
                                  file_extension: str = "bin", fingerprint: Optional["GeometricFingerprint"] = None,
                                  watermark_text: Optional[str] = None) -> bytes:
    """Poison ``data`` with the technique that matches its file type.

    Dispatch (the extension is matched case-insensitively, with any leading
    dot ignored):

    * text formats, unless ``protection_type == "invisible_armor"``: hybrid
      text poisoning keyed by the SHA-256 of ``data``; bytes that are not
      valid UTF-8 are left untouched;
    * ``png`` / ``jpg`` / ``jpeg`` with ``protection_type == "invisible_armor"``:
      invisible-armor image poisoning;
    * everything else: geometric byte poisoning.

    Afterwards an optional watermark is embedded: the geometric watermark
    when a ``fingerprint`` is supplied, otherwise ``embed_watermark``
    (defined outside the part of the file reviewed here).

    Returns:
        The poisoned (and possibly watermarked) bytes.
    """
    engine = InformationGeometryEngine()
    content_hash = hashlib.sha256(data).hexdigest()

    # Normalise so "PNG" or ".txt" take the same branch as "png" / "txt"
    # instead of falling through to byte-level poisoning.
    extension = file_extension.lower().lstrip(".")

    text_formats = ('txt', 'md', 'json', 'csv', 'html', 'xml', 'htm', 'ipynb', 'eml', 'msg', 'py')
    image_formats = ("png", "jpg", "jpeg")

    if extension in text_formats and protection_type != "invisible_armor":
        try:
            text_data = data.decode('utf-8')
        except UnicodeDecodeError:
            # Not valid UTF-8: leave the payload as it is.
            final_data = data
        else:
            poisoned_text = engine.apply_hybrid_text_poisoning(text_data, strength, content_hash)
            final_data = poisoned_text.encode('utf-8')
    elif extension in image_formats and protection_type == "invisible_armor":
        final_data = engine.apply_invisible_armor_image_poisoning(data, strength)
    else:
        final_data = engine.apply_geometric_byte_poisoning(data, strength)

    if watermark_text and fingerprint:
        return engine.apply_subtle_geometric_watermark(bytes(final_data), fingerprint, watermark_text)
    if watermark_text:
        return embed_watermark(bytes(final_data), watermark_text)

    return bytes(final_data)
|
|
|
|
|
def create_ghostprint_fingerprint(data: bytes, creator: str, protection_level: str, disclaimer: Optional[str] = None) -> "GeometricFingerprint":
    """Compute the complete Ghostprint fingerprint of ``data``.

    Args:
        data: Raw bytes to fingerprint.
        creator: Stored under ``metadata["creator"]``.
        protection_level: Stored under ``metadata["protection_level"]``.
        disclaimer: Optional text stored under ``metadata["ai_disclaimer"]``.

    Returns:
        A ``GeometricFingerprint`` whose CMT signature uses the Bessel lens
        and whose holographic field averages the Bessel, Gamma and Airy views.
    """
    engine = InformationGeometryEngine()

    entropy_regions = engine.analyze_entropy_landscape(data)
    z_encoded = engine.complex_encode(data, enhanced_phase=True)
    cmt_signature = engine.cmt_transform(z_encoded, LensFunction.BESSEL)
    lehi_pattern = engine.generate_lehi_harmonics(data)
    srl_stability = engine.calculate_srl_stability(data)
    sefa_emergence = engine.calculate_sefa_emergence(cmt_signature)
    lgrm_map = engine.generate_lgrm_map(cmt_signature)

    # The Bessel view is the signature itself; reuse it rather than running
    # the transform over the whole input a second time.
    cmt_views = [
        cmt_signature,
        engine.cmt_transform(z_encoded, LensFunction.GAMMA),
        engine.cmt_transform(z_encoded, LensFunction.AIRY),
    ]
    holographic_field = engine.reconstruct_holographic_field(cmt_views)

    metadata = {
        "creator": creator,
        "timestamp": datetime.now().isoformat(),
        "protection_level": protection_level,
        "data_hash": hashlib.sha256(data).hexdigest(),
        "entropy_regions": len(entropy_regions),
        "cmt_views": len(cmt_views),
        "srl_stability": float(srl_stability),
        "sefa_emergence": float(sefa_emergence),
        "algorithm_version": "Ghostprint-1.0-2025",
        "ai_poisoning_enabled": True,
        "geometry_mask_version": "G-F-H-v1.0",
    }
    if disclaimer:
        metadata["ai_disclaimer"] = disclaimer

    return GeometricFingerprint(
        cmt_signature=cmt_signature,
        lehi_pattern=lehi_pattern,
        lgrm_map=lgrm_map,
        holographic_field=holographic_field,
        srl_stability=srl_stability,
        sefa_emergence=sefa_emergence,
        metadata=metadata,
    )
|
|
|
|
|
def embed_metadata_stamp(data: bytes, metadata: Dict, file_extension: str) -> bytes:
    """Embed ``metadata`` into a file in a format-aware way.

    * ``png``: stored as a ``GhostprintFingerprint`` text chunk; if that
      fails, appended to the raw bytes as an HTML-style comment.
    * ``jpg`` / ``jpeg``: stored in EXIF tag ``0x9286``; on failure the data
      is returned unchanged.
    * ``json``: stored under the ``_ghostprint_fingerprint`` key when the
      document is a JSON object; otherwise handled like any other format.
    * anything else: appended as a delimited plain-text block.

    Returns:
        The stamped bytes.
    """
    metadata_str = json.dumps(metadata)

    if file_extension == 'png':
        try:
            img = Image.open(BytesIO(data))

            if img.mode not in ['RGB', 'RGBA', 'L', 'P']:
                img = img.convert('RGBA')

            info = PngImagePlugin.PngInfo()
            info.add_text("GhostprintFingerprint", metadata_str)
            output = BytesIO()
            img.save(output, format='PNG', pnginfo=info)
            return output.getvalue()
        except Exception as e:
            print(f"Error embedding PNG metadata: {e}")
            # Fallback: keep the bytes and append the metadata as a trailer.
            return data + b'\n<!-- GHOSTPRINT_METADATA:' + metadata_str.encode('utf-8') + b' -->\n'

    if file_extension in ['jpg', 'jpeg']:
        try:
            img = Image.open(BytesIO(data))
            exif_data = img.getexif()
            exif_data[0x9286] = metadata_str.encode('utf-8')
            output = BytesIO()
            img.save(output, format='JPEG', exif=exif_data)
            return output.getvalue()
        except Exception as e:
            print(f"Error embedding JPEG metadata: {e}")
            return data

    if file_extension == 'json':
        try:
            json_data = json.loads(data.decode('utf-8'))
        except (json.JSONDecodeError, UnicodeDecodeError):
            json_data = None
        # Only a JSON object can take an extra key; arrays and scalars fall
        # through to the generic stamp instead of raising TypeError.
        if isinstance(json_data, dict):
            json_data['_ghostprint_fingerprint'] = metadata
            return json.dumps(json_data, indent=2).encode('utf-8')

    stamp = f"\n\n--- GHOSTPRINT FINGERPRINT ---\n{json.dumps(metadata, indent=2)}\n--- END GHOSTPRINT ---\n"
    return data + stamp.encode('utf-8')
|
|
|
|
|
def create_lite_fingerprinted_file(data: bytes, disclaimer: str, watermark_text: str, creator: str, filename: str) -> bytes:
    """Create a 'lite' fingerprinted file: optional watermark plus metadata stamp.

    Steps:

    1. Fingerprint ``data`` at protection level ``"Lite"`` (the disclaimer
       goes into the metadata).
    2. If ``watermark_text`` is non-empty, apply the subtle geometric watermark.
    3. Record the watermark method and the SHA-256 of the (possibly
       watermarked) content in the metadata.
    4. Embed the metadata using the handler that matches the file extension.

    Returns:
        The stamped file bytes.
    """
    # Lower-case so "photo.PNG" reaches the PNG handler, not the text stamp.
    file_ext = filename.rsplit('.', 1)[-1].lower() if '.' in filename else 'bin'
    fingerprint = create_ghostprint_fingerprint(data, creator, "Lite", disclaimer=disclaimer)

    if watermark_text:
        engine = InformationGeometryEngine()
        modified_data_bytes = engine.apply_subtle_geometric_watermark(data, fingerprint, watermark_text)
        watermark_method = "cmt_geometric"
    else:
        modified_data_bytes = data
        watermark_method = "none"

    fingerprint.metadata["watermark_method"] = watermark_method
    # Mirrors the branch above: an empty string means no watermark was applied.
    fingerprint.metadata["watermark_applied"] = bool(watermark_text)
    # The hash covers the content before the stamp is added.
    fingerprint.metadata["content_hash"] = hashlib.sha256(modified_data_bytes).hexdigest()

    # Stamp once, after the metadata is complete.
    return embed_metadata_stamp(modified_data_bytes, fingerprint.metadata, file_ext)
|
|
|
|
|
|
|
|
|
|
|
def secure_encrypt_with_passphrase(data: bytes, user_passphrase: str, random_salt: bytes) -> bytes:
    """Encrypt ``data`` with AES-256-GCM under a passphrase-derived key.

    The 32-byte key comes from PBKDF2-HMAC-SHA256 over the passphrase and
    ``random_salt``, with the iteration count read from the
    ``pbkdf2_iterations`` configuration entry. A fresh 12-byte nonce is
    generated for every call.

    Returns:
        ``nonce || ciphertext`` (the nonce occupies the first 12 bytes).
    """
    derived_key = hashlib.pbkdf2_hmac(
        'sha256',
        user_passphrase.encode('utf-8'),
        random_salt,
        get_config("pbkdf2_iterations"),
        32,
    )

    nonce = os.urandom(12)
    ciphertext = AESGCM(derived_key).encrypt(nonce, data, None)
    return nonce + ciphertext
|
|
|
|
|
def create_ghost_file_with_ai_poisoning(data: bytes, fingerprint: "GeometricFingerprint", filename: str,
                                        user_passphrase: str, watermark_text: Optional[str] = None,
                                        create_zip: bool = False) -> bytes:
    """Build a ``.ghost`` container around ``data``.

    Layout of the result::

        b"GHOSTPRT0200" | 4-byte big-endian metadata length | metadata JSON | nonce + ciphertext

    The payload is optionally wrapped in a ZIP archive, LZMA-compressed and
    then encrypted with AES-256-GCM under a key derived from
    ``user_passphrase`` and a fresh 32-byte salt. The salt is stored in the
    metadata; the passphrase is not.

    Side effects:
        ``fingerprint.metadata`` gains ``has_watermark`` and, when a
        watermark is given, ``watermark_text``.

    NOTE(review): no poisoning is performed here; the ``container_poisoning``
    entry is only a descriptive note in the metadata.
    """
    original_filename = filename

    if create_zip:
        import zipfile

        archive = BytesIO()
        with zipfile.ZipFile(archive, 'w', zipfile.ZIP_DEFLATED) as zf:
            zf.writestr(filename, data)
        data_to_encrypt = archive.getvalue()
        filename = filename + ".zip"
    else:
        data_to_encrypt = data

    fingerprint.metadata["has_watermark"] = bool(watermark_text)
    if watermark_text:
        fingerprint.metadata["watermark_text"] = watermark_text

    random_salt = os.urandom(32)
    encrypted_data = secure_encrypt_with_passphrase(
        lzma.compress(data_to_encrypt), user_passphrase, random_salt
    )

    if '.' in original_filename:
        file_ext = original_filename.split('.')[-1]
    else:
        file_ext = 'bin'

    lgrm_digest = hashlib.sha256(str(fingerprint.lgrm_map.tolist()).encode()).hexdigest()
    field_digest = hashlib.sha256(str(fingerprint.holographic_field).encode()).hexdigest()

    metadata = {
        "fingerprint_metadata": fingerprint.metadata,
        "lgrm_map_hash": lgrm_digest,
        "holographic_field_hash": field_digest,
        "original_filename": original_filename,
        "stored_filename": filename,
        "original_extension": file_ext,
        "was_zipped": create_zip,
        "encryption_salt": base64.b64encode(random_salt).decode('utf-8'),
        "kdf_iterations": get_config("pbkdf2_iterations"),
        "container_poisoning": {
            "enabled": True,
            "type": "encrypted_container",
            "note": "Original data is NOT poisoned, only the encrypted container"
        }
    }

    metadata_json = json.dumps(metadata).encode('utf-8')
    return b"".join((
        b"GHOSTPRT0200",
        len(metadata_json).to_bytes(4, 'big'),
        metadata_json,
        encrypted_data,
    ))
|
|
|
|
|
def secure_decrypt_with_passphrase(encrypted_data: bytes, user_passphrase: str, random_salt: bytes) -> bytes:
    """Decrypt ``nonce || ciphertext`` produced with AES-256-GCM.

    The key is re-derived with PBKDF2-HMAC-SHA256 from the passphrase and
    ``random_salt``, using the iteration count from the ``pbkdf2_iterations``
    configuration entry, so both must match the values used for encryption.

    Raises:
        ValueError: if the input is too short to hold a nonce and an
            authentication tag, or if decryption/authentication fails. The
            underlying error is attached as ``__cause__``.
    """
    nonce_size = 12
    tag_size = 16  # AES-GCM appends a 16-byte authentication tag.
    if len(encrypted_data) < nonce_size + tag_size:
        raise ValueError("Decryption failed - invalid passphrase or corrupted data")

    iterations = get_config("pbkdf2_iterations")
    key_length = 32
    aes_key = hashlib.pbkdf2_hmac('sha256', user_passphrase.encode('utf-8'), random_salt, iterations, key_length)

    nonce = encrypted_data[:nonce_size]
    ciphertext = encrypted_data[nonce_size:]

    try:
        return AESGCM(aes_key).decrypt(nonce, ciphertext, None)
    except Exception as exc:
        # Same message as before, but keep the original error for debugging.
        raise ValueError("Decryption failed - invalid passphrase or corrupted data") from exc
|
|
|
|
|
def read_ghost_file_with_poisoning(ghost_data: bytes, user_passphrase: str, auto_unzip: bool = True) -> Tuple[bytes, Dict]:
    """Read and decrypt a ``.ghost`` container.

    Args:
        ghost_data: Full container bytes (header ``GHOSTPRT0200`` or
            ``GHOSTPRT0100``).
        user_passphrase: Passphrase used when the container was created.
        auto_unzip: When true and the metadata says the payload was zipped,
            return the archived file instead of the ZIP archive. The entry
            named by ``original_filename`` is preferred; otherwise the first
            entry is used. If unzipping fails, the archive is returned as is.

    Returns:
        ``(payload_bytes, metadata_dict)``.

    Raises:
        ValueError: for an unknown header, a truncated container, malformed
            metadata JSON, a failed decryption or corrupted compressed data.
        KeyError: if the metadata has no ``encryption_salt`` entry.
    """
    # Both known format versions use a 12-byte magic header.
    if not ghost_data.startswith((b"GHOSTPRT0200", b"GHOSTPRT0100")):
        raise ValueError("Invalid ghost file format")
    header_length = 12

    metadata_start = header_length + 4
    if len(ghost_data) < metadata_start:
        raise ValueError("Invalid ghost file format: truncated header")

    metadata_length = int.from_bytes(ghost_data[header_length:metadata_start], 'big')
    metadata_end = metadata_start + metadata_length
    if len(ghost_data) < metadata_end:
        raise ValueError("Invalid ghost file format: truncated metadata")

    metadata = json.loads(ghost_data[metadata_start:metadata_end].decode('utf-8'))
    encrypted_data = ghost_data[metadata_end:]

    encryption_salt = base64.b64decode(metadata["encryption_salt"])
    compressed_data = secure_decrypt_with_passphrase(encrypted_data, user_passphrase, encryption_salt)

    try:
        decrypted_data = lzma.decompress(compressed_data)
    except lzma.LZMAError as exc:
        raise ValueError("Decryption failed - data is corrupted") from exc

    if auto_unzip and metadata.get("was_zipped", False):
        import zipfile

        try:
            with zipfile.ZipFile(BytesIO(decrypted_data), 'r') as zf:
                file_list = zf.namelist()
                if file_list:
                    original_filename = metadata.get("original_filename", file_list[0])
                    member = original_filename if original_filename in file_list else file_list[0]
                    decrypted_data = zf.read(member)
        except Exception as e:
            # Deliberate best effort: hand back the ZIP archive itself.
            print(f"[DEBUG] Auto-unzip failed: {e}. Returning ZIP file.")

    return decrypted_data, metadata
|
|
|
|
|
def remove_integrated_ai_poisoning(data: bytes, poison_type: str, strength: float,
                                   file_extension: str, fingerprint_meta: Dict) -> bytes:
    """
    Undo the reversible poisoning steps and return the restored bytes.

    The watermark named by ``fingerprint_meta["watermark_text"]`` (if any) is
    removed by running ``embed_watermark`` a second time; for the text formats
    listed below the result is additionally passed through the engine's
    ``remove_hybrid_text_poisoning``. ``poison_type`` and ``strength`` are
    accepted for interface compatibility but are not read here.
    """
    geometry_engine = InformationGeometryEngine()

    # embed_watermark XORs bytes in place, so applying it again with the same
    # text cancels the first application.
    watermark = fingerprint_meta.get("watermark_text")
    stripped = embed_watermark(data, watermark) if watermark else data

    text_extensions = ('txt', 'md', 'json', 'csv', 'html', 'xml', 'htm', 'ipynb', 'eml', 'msg')
    if file_extension not in text_extensions:
        return stripped

    try:
        decoded = stripped.decode('utf-8')
        restored = geometry_engine.remove_hybrid_text_poisoning(decoded)
        return restored.encode('utf-8')
    except UnicodeDecodeError:
        # Not valid UTF-8 after all: hand back the watermark-free bytes as-is.
        return stripped
|
|
|
|
|
|
|
|
|
|
|
def analyze_ai_poisoning_effectiveness(original_data: bytes, poisoned_data: bytes) -> Dict[str, float]:
    """Compare original and poisoned bytes and report byte-level statistics.

    Args:
        original_data: Bytes before poisoning.
        poisoned_data: Bytes after poisoning.

    Returns:
        A dict with these keys:
        ``jensen_shannon_divergence`` (base-2, between the two byte
        histograms, clamped to [0, 1]), ``entropy_original`` /
        ``entropy_poisoned`` (Shannon entropy in bits per byte),
        ``entropy_delta``, ``compression_original`` / ``compression_poisoned``
        (LZMA output size divided by input size, 1.0 for empty input),
        ``compression_delta``, ``byte_changes`` (differing positions within the
        common prefix length), ``change_percentage`` and
        ``ai_confusion_score`` (``jsd * entropy_delta * 10``).
    """
    def as_byte_array(data: bytes) -> np.ndarray:
        # Guard the empty case explicitly rather than relying on frombuffer.
        if not data:
            return np.zeros(0, dtype=np.uint8)
        return np.frombuffer(data, dtype=np.uint8)

    def entropy_from_histogram(histogram: np.ndarray, total: int) -> float:
        if total == 0:
            return 0.0
        probs = histogram / total
        probs = probs[probs > 0]
        return -np.sum(probs * np.log2(probs))

    def compressibility(data: bytes) -> float:
        if not data:
            return 1.0
        return len(lzma.compress(data)) / len(data)

    def jensen_shannon_divergence(p, q) -> float:
        p = np.array(p, dtype=float)
        q = np.array(q, dtype=float)
        p_sum = p.sum()
        q_sum = q.sum()
        if p_sum == 0 or q_sum == 0:
            return 0.0

        p /= p_sum
        q /= q_sum
        m = (p + q) / 2

        def kl_to_mixture(dist: np.ndarray) -> float:
            # Wherever dist > 0 the mixture m is > 0 as well, so the ratio is
            # well defined without an epsilon. The previous epsilon terms
            # biased the result (identical inputs could yield a small
            # negative divergence).
            support = dist > 0
            return np.sum(dist[support] * np.log2(dist[support] / m[support]))

        jsd = (kl_to_mixture(p) + kl_to_mixture(q)) / 2
        if not np.isfinite(jsd):
            return 0.0
        # Base-2 JSD lies in [0, 1]; clamp away floating-point overshoot.
        return float(min(max(jsd, 0.0), 1.0))

    orig_bytes = as_byte_array(original_data)
    pois_bytes = as_byte_array(poisoned_data)

    # Each histogram is computed once and shared by entropy and divergence.
    orig_histogram = np.bincount(orig_bytes, minlength=256)
    pois_histogram = np.bincount(pois_bytes, minlength=256)

    orig_entropy = entropy_from_histogram(orig_histogram, len(original_data))
    pois_entropy = entropy_from_histogram(pois_histogram, len(poisoned_data))
    orig_compression = compressibility(original_data)
    pois_compression = compressibility(poisoned_data)

    jsd = jensen_shannon_divergence(orig_histogram, pois_histogram)

    # Vectorised positional diff over the common prefix.
    min_length = min(orig_bytes.size, pois_bytes.size)
    byte_changes = int(np.count_nonzero(orig_bytes[:min_length] != pois_bytes[:min_length]))
    change_percentage = (byte_changes / min_length) * 100 if min_length > 0 else 0

    entropy_delta = abs(orig_entropy - pois_entropy)

    return {
        "jensen_shannon_divergence": float(jsd),
        "entropy_original": float(orig_entropy),
        "entropy_poisoned": float(pois_entropy),
        "entropy_delta": float(entropy_delta),
        "compression_original": float(orig_compression),
        "compression_poisoned": float(pois_compression),
        "compression_delta": float(abs(orig_compression - pois_compression)),
        "byte_changes": int(byte_changes),
        "change_percentage": float(change_percentage),
        "ai_confusion_score": float(jsd * entropy_delta * 10)
    }
|
|
|
|
|
|
|
|
|
|
|
def embed_watermark(data: bytes, watermark_text: str) -> bytes:
    """
    Embeds a custom watermark text into the data using a reversible XOR process.

    The bytes of ``b"GPW::" + watermark_text`` are XOR-ed into ``data`` at the
    positions returned by ``InformationGeometryEngine.golden_mask_positions``,
    wrapping around the position list when the watermark is longer than it.
    Because XOR is its own inverse, calling this function again with the same
    text on the result restores the input.

    Args:
        data: Bytes to watermark.
        watermark_text: Text to embed; falsy values mean "no watermark".

    Returns:
        The watermarked bytes, or ``data`` unchanged when there is no
        watermark text, no data, or no candidate position.
    """
    # Nothing to embed, or nowhere to embed it: previously the empty-data case
    # crashed while indexing / taking a modulo by zero.
    if not watermark_text or not data:
        return data

    sentinel = b"GPW::"
    watermark_bytes = sentinel + watermark_text.encode('utf-8')
    data_array = bytearray(data)

    engine = InformationGeometryEngine()
    positions = engine.golden_mask_positions(len(data_array))
    position_count = len(positions)
    if position_count == 0:
        return data

    for i, byte_to_embed in enumerate(watermark_bytes):
        data_array[positions[i % position_count]] ^= byte_to_embed

    return bytes(data_array)
|
|
|
|
|
def verify_watermark(data: bytes, expected_watermark: str, file_extension: str) -> bool:
    """
    Check the watermark stamp attached to ``data`` and detect content tampering.

    The stamp is obtained with ``extract_metadata_stamp`` and dispatched on its
    ``watermark_method``:

    * ``"cmt_geometric"``: the SHA-256 of everything before the last
      fingerprint marker must equal the stamp's ``content_hash``.
    * ``"geometric"``: delegated to ``verify_content_integrity``.
    * anything else: the stamp's ``watermark_applied`` flag is returned.

    ``expected_watermark`` only gates the check (a falsy value returns
    ``False``); its text is not compared against the stamp here.
    """
    if not expected_watermark:
        return False

    stamp = extract_metadata_stamp(data, file_extension)
    if not stamp:
        return False

    method = stamp.get("watermark_method", "none")

    if method == "geometric":
        return verify_content_integrity(data, stamp, file_extension)

    if method != "cmt_geometric":
        return stamp.get("watermark_applied", False)

    try:
        marker = "\n\n--- GHOSTPRINT FINGERPRINT ---".encode('utf-8')
        marker_at = data.rfind(marker)
        if marker_at == -1:
            # No appended stamp block means there is nothing to hash against.
            return False

        digest = hashlib.sha256(data[:marker_at]).hexdigest()
        return digest == stamp.get("content_hash")
    except Exception as e:
        print(f"[DEBUG] CMT Geometric watermark verification error: {e}")
        return False
|
|
|
|
|
|
|
|
def verify_content_integrity(data: bytes, metadata: Dict, file_extension: str) -> bool:
    """
    Report whether ``data`` still matches the ``content_hash`` in ``metadata``.

    For text-like files the SHA-256 of the content (everything before the last
    fingerprint marker, or all of ``data`` if no marker is present) is compared
    with the stored hash. For ``png``/``jpg``/``jpeg`` the hash is not
    recomputed; the check passes when a metadata stamp can be extracted.
    Returns ``False`` when ``metadata`` carries no ``content_hash``.
    """
    stored_hash = metadata.get("content_hash")
    if not stored_hash:
        return False

    marker_at = data.rfind("\n\n--- GHOSTPRINT FINGERPRINT ---".encode('utf-8'))
    hashed_region = data if marker_at == -1 else data[:marker_at]

    if file_extension.lower() in ('png', 'jpg', 'jpeg'):
        try:
            return extract_metadata_stamp(data, file_extension) is not None
        except Exception:
            return False

    current_hash = hashlib.sha256(hashed_region).hexdigest()
    if current_hash == stored_hash:
        return True

    print("[DEBUG] Content integrity check failed!")
    print(f"[DEBUG] Expected hash: {stored_hash}")
    print(f"[DEBUG] Current hash: {current_hash}")
    return False
|
|
|
|
|
|
|
|
def verify_ghost_file(ghost_data: bytes) -> Optional[Dict]:
    """
    Parse the header of a .ghost file and return its metadata.

    Supports both legacy (0100) and secure (0200) formats. Only the container
    structure is checked; the encrypted payload is not decrypted.

    Args:
        ghost_data: Raw bytes of the .ghost container.

    Returns:
        The metadata dict with an added ``_verification_info`` entry (version
        label, total size, metadata size, encrypted payload size, timestamp),
        or ``None`` when the magic is unknown, the file is truncated, or the
        metadata is not a UTF-8 encoded JSON object.
    """
    known_versions = (
        (b"GHOSTPRT0200", "2.0-SECURE"),
        (b"GHOSTPRT0100", "1.0-LEGACY"),
    )
    header_length = 12
    length_field_size = 4

    try:
        version = next(
            (label for magic, label in known_versions if ghost_data.startswith(magic)),
            None,
        )
        if version is None:
            return None

        metadata_start = header_length + length_field_size
        if len(ghost_data) < metadata_start:
            return None

        metadata_length = int.from_bytes(ghost_data[header_length:metadata_start], 'big')
        metadata_end = metadata_start + metadata_length
        # A declared length that runs past the end of the file means truncation;
        # accepting it would report a negative encrypted size.
        if metadata_end > len(ghost_data):
            return None

        metadata = json.loads(ghost_data[metadata_start:metadata_end].decode('utf-8'))
        # The verification info is attached as a key, so only objects qualify.
        if not isinstance(metadata, dict):
            return None

        metadata["_verification_info"] = {
            "ghost_file_version": version,
            "file_size_bytes": len(ghost_data),
            "metadata_size_bytes": metadata_length,
            "encrypted_data_size_bytes": len(ghost_data) - metadata_end,
            "verification_timestamp": datetime.now().isoformat()
        }
        return metadata

    except (IndexError, KeyError, UnicodeDecodeError, json.JSONDecodeError):
        # UnicodeDecodeError was previously uncaught and escaped to the caller.
        return None
|
|
|
|
|
def extract_metadata_stamp(data: bytes, file_extension: str) -> Optional[Dict]:
    """
    Extracts a metadata stamp from a file using format-aware and non-destructive methods.

    Lookup order:

    1. ``png``: Pillow text entries ``GhostprintFingerprint`` then
       ``GhostprintProtection`` (the latter is tagged with
       ``_ghostprint_type = 'invisible_armor'``); then a raw chunk walk that
       reports a ``gpAI`` chunk. Data that does not start with the PNG
       signature yields ``None`` at this point.
    2. ``jpg``/``jpeg``: JSON stored in EXIF tag 0x9286; any error in this
       step yields ``None``.
    3. ``json``: the top-level ``_ghostprint_fingerprint`` key.
    4. ``png``: an embedded ``<!-- GHOSTPRINT_METADATA:... -->`` comment.
    5. Any extension: JSON between the ``--- GHOSTPRINT FINGERPRINT ---`` and
       ``--- END GHOSTPRINT ---`` sentinels (for pdf/png/jpg/jpeg only the last
       10000 bytes are searched).

    Args:
        data: Raw file bytes.
        file_extension: Extension without a dot; compared case-sensitively.

    Returns:
        The parsed stamp, or ``None`` when no stamp is found.
    """
    if file_extension == 'png':
        try:
            img = Image.open(BytesIO(data))

            metadata_str = img.info.get("GhostprintFingerprint")
            if metadata_str:
                return json.loads(metadata_str)

            protection_str = img.info.get("GhostprintProtection")
            if protection_str:
                metadata = json.loads(protection_str)
                metadata['_ghostprint_type'] = 'invisible_armor'
                return metadata
        except Exception as e:
            print(f"[DEBUG] PNG metadata extraction error: {e}")

        # Walk the raw chunk list looking for the custom gpAI chunk.
        try:
            if data[:8] != b'\x89PNG\r\n\x1a\n':
                return None

            pos = 8
            while pos < len(data) - 12:
                # Chunk layout: 4-byte length, 4-byte type, payload, 4-byte CRC.
                chunk_length = int.from_bytes(data[pos:pos + 4], 'big')
                chunk_type = data[pos + 4:pos + 8]

                if chunk_type == b'gpAI':
                    print("[DEBUG] Found gpAI chunk in PNG - this is an Invisible Armor protected image")
                    return {
                        "protection_type": "invisible_armor_png",
                        "has_gpAI_chunk": True,
                        "chunk_size": chunk_length,
                        "message": "This PNG contains Invisible Armor protection with embedded AI-confusing data"
                    }

                if chunk_type == b'IEND':
                    break
                pos += 12 + chunk_length
        except Exception as e:
            print(f"[DEBUG] PNG chunk parsing error: {e}")

    if file_extension in ['jpg', 'jpeg']:
        try:
            img = Image.open(BytesIO(data))
            raw_comment = img.getexif().get(0x9286)
            if raw_comment:
                # The tag value may arrive as bytes or as text; the bytes-only
                # .decode() call used to fail on text values.
                if isinstance(raw_comment, (bytes, bytearray)):
                    raw_comment = raw_comment.decode('utf-8')
                return json.loads(raw_comment)
        except Exception:
            return None

    if file_extension == 'json':
        try:
            json_data = json.loads(data.decode('utf-8'))
        except (json.JSONDecodeError, UnicodeDecodeError):
            json_data = None
        # Only a JSON object can carry the fingerprint key; arrays and scalars
        # previously raised AttributeError here. They now fall through to the
        # sentinel search below, like undecodable input does.
        if isinstance(json_data, dict):
            return json_data.get('_ghostprint_fingerprint')

    if file_extension == 'png':
        try:
            comment_marker = b'<!-- GHOSTPRINT_METADATA:'
            comment_end = b' -->'
            if comment_marker in data:
                start = data.rfind(comment_marker) + len(comment_marker)
                end = data.rfind(comment_end)
                if start < end:
                    metadata_str = data[start:end].decode('utf-8')
                    return json.loads(metadata_str)
        except Exception as e:
            print(f"[DEBUG] PNG comment metadata extraction error: {e}")

    try:
        if file_extension in ['pdf', 'png', 'jpg', 'jpeg']:
            # Binary formats: only the tail is decoded, dropping undecodable
            # bytes (errors='ignore' cannot raise, so no fallback is needed).
            tail_size = min(10000, len(data))
            text_data = data[-tail_size:].decode('utf-8', errors='ignore')
        else:
            text_data = data.decode('utf-8')

        start_sentinel = "--- GHOSTPRINT FINGERPRINT ---"
        end_sentinel = "--- END GHOSTPRINT ---"
        start_index = text_data.rfind(start_sentinel)
        end_index = text_data.rfind(end_sentinel)

        if start_index != -1 and end_index > start_index:
            metadata_str = text_data[start_index + len(start_sentinel):end_index].strip()
            return json.loads(metadata_str)
    except Exception as e:
        print(f"[DEBUG] Error extracting metadata stamp: {e}")

    return None
|
|
|
|
|
def _ghostprint_png_chunk(chunk_type: bytes, payload: bytes) -> bytes:
    """Serialize one PNG chunk: big-endian length, type, payload, CRC-32 of type+payload."""
    import zlib

    crc = zlib.crc32(chunk_type + payload) & 0xffffffff
    return len(payload).to_bytes(4, 'big') + chunk_type + payload + crc.to_bytes(4, 'big')


def create_preview_safe_armor(image_data: bytes, creator: str,
                              custom_watermark_text: Optional[str] = None,
                              ai_disclaimer_text: Optional[str] = None,
                              strength: float = 1.0) -> Tuple[bytes, Dict[str, Any]]:
    """
    Build a PNG that carries armor data in a custom ``gpAI`` chunk.

    Behaviour by input format (detected from the leading magic bytes):

    * PNG: the output of ``engine.apply_ultra_subtle_armor`` is re-encoded as
      PNG with a ``GhostprintProtection`` text entry, and a ``gpAI`` chunk
      holding the zlib-compressed armored bytes is inserted before ``IEND``.
      On any error the original bytes are returned.
    * JPEG: the image is converted to PNG, passed through
      ``engine.apply_invisible_armor_image_poisoning`` with ``strength``,
      saved with descriptive text entries, and a ``gpAI`` chunk holding
      zlib-compressed JSON metadata is inserted before ``IEND``. On any error
      the ultra-subtle armored bytes are returned instead.
    * Anything else: the input is returned unchanged.

    Args:
        image_data: Raw image bytes.
        creator: Creator name recorded in the metadata.
        custom_watermark_text: Written as a PNG text entry in the JPEG path only.
        ai_disclaimer_text: Disclaimer recorded in the metadata.
        strength: Armor strength; used in the JPEG path only.

    Returns:
        ``(protected_data, metadata)`` where ``metadata`` describes the
        protection (hashes of the original, protected and armored bytes,
        format-conversion flags, and fixed descriptive labels).
    """
    import zlib

    png_signature = b'\x89PNG\r\n\x1a\n'
    iend_chunk = b'\x00\x00\x00\x00IEND\xaeB`\x82'

    engine = InformationGeometryEngine()

    # NOTE(review): the returned fingerprint is not used below; the call is
    # kept in case fingerprint creation has side effects. Confirm.
    create_ghostprint_fingerprint(
        data=image_data,
        creator=creator,
        protection_level="preview_safe_armor",
        disclaimer=ai_disclaimer_text
    )

    original_was_jpeg = image_data.startswith(b'\xff\xd8\xff')
    is_png = image_data.startswith(png_signature)

    # Every format starts from the same armor pass (the former per-format
    # branches were identical); the JPEG path may replace it below.
    armored_data = engine.apply_ultra_subtle_armor(image_data)

    if is_png:
        try:
            # Opening the input first makes unreadable PNG data take the
            # fallback path, as before.
            Image.open(BytesIO(image_data))

            chunk_metadata = {
                "creator": creator,
                "timestamp": datetime.now().isoformat(),
                "protection_level": "preview_safe_armor",
                "protection_type": "dual_data_streams",
                "original_hash": hashlib.sha256(image_data).hexdigest(),
                "armored_hash": hashlib.sha256(armored_data).hexdigest(),
                "watermark_embedded": False,
                "ai_disclaimer": ai_disclaimer_text,
                "format_preserved": True,
                "format_conversion": "none",
                "human_visibility": "perfect_original_preview",
                "ai_protection": "hidden_armor_stream",
                "armor_location": "custom_chunks",
                "ai_confusion_level": "extreme",
                "crc_trap": True
            }

            info = PngImagePlugin.PngInfo()
            info.add_text("GhostprintProtection", json.dumps(chunk_metadata))

            encoded = BytesIO()
            Image.open(BytesIO(armored_data)).save(encoded, format='PNG', pnginfo=info)
            png_data = encoded.getvalue()

            output = BytesIO()
            output.write(png_signature)

            # Copy every chunk up to (not including) IEND.
            pos = 8
            while pos < len(png_data) - 12:
                chunk_length = int.from_bytes(png_data[pos:pos + 4], 'big')
                if png_data[pos + 4:pos + 8] == b'IEND':
                    break
                output.write(png_data[pos:pos + 12 + chunk_length])
                pos += 12 + chunk_length

            output.write(_ghostprint_png_chunk(b'gpAI', zlib.compress(armored_data, 9)))
            output.write(iend_chunk)

            protected_data = output.getvalue()

        except Exception as e:
            print(f"[ERROR] Preview-safe PNG armor failed: {e}")
            protected_data = image_data

    elif original_was_jpeg:
        try:
            print("[INFO] Converting JPEG to PNG for better armor compatibility...")

            img = Image.open(BytesIO(image_data))
            if img.mode not in ('RGB', 'L'):
                img = img.convert('RGB')

            png_output = BytesIO()
            img.save(png_output, format='PNG', optimize=True)
            png_data = png_output.getvalue()

            armored_data = engine.apply_invisible_armor_image_poisoning(png_data, strength=strength)

            img = Image.open(BytesIO(armored_data))
            info = PngImagePlugin.PngInfo()
            info.add_text("Creator", creator)
            if custom_watermark_text:
                info.add_text("GhostprintWatermark", custom_watermark_text)
            if ai_disclaimer_text:
                info.add_text("AIDisclaimer", ai_disclaimer_text)
            info.add_text("GhostprintProtected", "true")
            info.add_text("ProtectionTimestamp", datetime.now().isoformat())
            info.add_text("ProtectionStrength", str(strength))
            info.add_text("OriginalFormat", "JPEG")

            encoded = BytesIO()
            img.save(encoded, format='PNG', pnginfo=info)
            png_data_with_metadata = encoded.getvalue()

            # iend_pos addresses the 'IEND' type field; the chunk itself
            # starts 4 bytes earlier (at its length field).
            iend_pos = png_data_with_metadata.rfind(b'IEND')
            if iend_pos == -1:
                # Assume the final 12 bytes are the IEND chunk, whose type
                # field then sits 8 bytes from the end. (The earlier value of
                # len - 12 placed the split inside the preceding chunk's CRC.)
                iend_pos = len(png_data_with_metadata) - 8
            iend_chunk_start = iend_pos - 4

            chunk_metadata = {
                "creator": creator,
                "timestamp": datetime.now().isoformat(),
                "protection_level": "preview_safe_armor",
                "protection_type": "dual_data_streams",
                "original_hash": hashlib.sha256(image_data).hexdigest(),
                "armored_hash": hashlib.sha256(armored_data).hexdigest(),
                "watermark_embedded": bool(custom_watermark_text),
                "ai_disclaimer": ai_disclaimer_text,
                "original_format": "JPEG",
                "converted_to_png": True,
            }
            chunk_payload = zlib.compress(json.dumps(chunk_metadata).encode('utf-8'))

            output = BytesIO()
            output.write(png_signature)
            output.write(png_data_with_metadata[8:iend_chunk_start])
            output.write(_ghostprint_png_chunk(b'gpAI', chunk_payload))
            output.write(png_data_with_metadata[iend_chunk_start:])

            protected_data = output.getvalue()

        except Exception as e:
            print(f"[ERROR] JPEG to PNG armor conversion failed: {e}")
            protected_data = engine.apply_ultra_subtle_armor(image_data)

    else:
        # Unrecognised format: pass the input through untouched.
        protected_data = image_data

    # NOTE(review): "watermark_embedded" is always False here even when the
    # JPEG path wrote a GhostprintWatermark text entry. Confirm intent.
    metadata = {
        "creator": creator,
        "timestamp": datetime.now().isoformat(),
        "protection_level": "preview_safe_armor",
        "protection_type": "dual_data_streams",
        "original_hash": hashlib.sha256(image_data).hexdigest(),
        "protected_hash": hashlib.sha256(protected_data).hexdigest(),
        "armored_hash": hashlib.sha256(armored_data).hexdigest(),
        "watermark_embedded": False,
        "ai_disclaimer": ai_disclaimer_text,
        "format_preserved": not original_was_jpeg,
        "format_conversion": "JPEG_to_PNG" if original_was_jpeg else "none",
        "human_visibility": "perfect_original_preview",
        "ai_protection": "hidden_armor_stream",
        "armor_location": "custom_chunks",
        "ai_confusion_level": "extreme",
        "crc_trap": True
    }

    return protected_data, metadata
|
|
|
|
|
def create_dual_layer_protected_image(image_data: bytes, creator: str,
                                      custom_watermark_text: Optional[str] = None,
                                      ai_disclaimer_text: Optional[str] = None,
                                      strength: float = 1.0) -> Tuple[bytes, Dict[str, Any]]:
    """
    Apply dual-layer protection with adjustable strength.

    Thin alias: every argument is forwarded unchanged to
    ``create_preview_safe_armor`` and its result is returned as-is.
    """
    return create_preview_safe_armor(
        image_data=image_data,
        creator=creator,
        custom_watermark_text=custom_watermark_text,
        ai_disclaimer_text=ai_disclaimer_text,
        strength=strength,
    )
|
|
|
|
|
def create_artist_image_protection(image_data: bytes, creator: str,
                                   ai_disclaimer_text: Optional[str] = None,
                                   strength: float = 1.0,
                                   focus_parameter: float = 3.5,
                                   frequency_strategy: str = 'auto',
                                   enable_skil: bool = False,
                                   stochastic_alpha: float = 0.85,
                                   stochastic_beta: float = 0.15) -> Tuple[bytes, Dict[str, Any]]:
    """
    Streamlined primary entry point for applying Invisible Armor.

    JPEG input is first converted to PNG. The image is then written to a
    temporary file, processed by ``ArmorGenerator.apply_to_image`` in a single
    pass that also yields metrics, and re-saved as PNG with the metadata
    stored in a ``GhostprintProtection`` text entry. If the armor step fails,
    the (possibly converted) unarmored image is used and the error text is
    recorded under ``performance_metrics``.

    Args:
        image_data: Raw image bytes.
        creator: Creator name recorded in the metadata.
        ai_disclaimer_text: Disclaimer recorded in the metadata.
        strength: Armor strength passed to the generator.
        focus_parameter: Passed through to the generator.
        frequency_strategy: Passed through to the generator.
        enable_skil: Enable the stochastic SKIL defense layer.
        stochastic_alpha: Weight for deterministic armor component (default 0.85).
        stochastic_beta: Weight for stochastic mask component (default 0.15).

    Returns:
        ``(png_bytes, metadata)``.
    """
    import tempfile

    is_jpeg = image_data.startswith(b'\xff\xd8\xff')
    original_image_data = image_data

    if is_jpeg:
        print("[INFO] Converting JPEG to PNG for optimal armor compatibility.")
        img = Image.open(BytesIO(image_data))
        if img.mode != 'RGB':
            img = img.convert('RGB')
        png_buffer = BytesIO()
        img.save(png_buffer, format='PNG', optimize=True)
        image_data = png_buffer.getvalue()

    protected_data = None
    metrics = {}
    temp_input_path = None
    temp_output_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as temp_input:
            # Record the path before writing so cleanup covers a failed write.
            temp_input_path = temp_input.name
            temp_input.write(image_data)
        temp_output_path = temp_input_path + ".armored.png"

        armor_gen = ArmorGenerator()
        armor_config = ArmorConfig(
            enable_stochastic=enable_skil,
            stochastic_alpha=stochastic_alpha,
            stochastic_beta=stochastic_beta
        )

        metrics = armor_gen.apply_to_image(
            image_path=temp_input_path,
            out_path=temp_output_path,
            cfg=armor_config,
            strength=strength,
            focus_parameter=focus_parameter,
            frequency_strategy=frequency_strategy,
            return_metrics=True
        ) or {}

        with open(temp_output_path, "rb") as f:
            protected_data = f.read()
    except Exception as e:
        print(f"[ERROR] Core armor application failed: {e}")
        protected_data = image_data
        metrics = {"error": str(e)}
    finally:
        # Always remove the scratch files. Previously they leaked whenever the
        # armor step raised, and a failed removal discarded a good result.
        for scratch_path in (temp_input_path, temp_output_path):
            if scratch_path is None:
                continue
            try:
                os.remove(scratch_path)
            except OSError:
                pass

    metadata = {
        "creator": creator,
        "timestamp": datetime.now().isoformat(),
        "protection_type": "invisible_armor_v2",
        "original_hash": hashlib.sha256(original_image_data).hexdigest(),
        "protected_hash": hashlib.sha256(protected_data).hexdigest(),
        "ai_disclaimer": ai_disclaimer_text,
        "format_conversion": "JPEG_to_PNG" if is_jpeg else "none",
        "armor_config": {
            "strength": strength, "focus_parameter": focus_parameter,
            "frequency_strategy": frequency_strategy, "stealth_mode": True,
            "skil_defense_enabled": enable_skil,
            "stochastic_alpha": stochastic_alpha, "stochastic_beta": stochastic_beta
        },
        "performance_metrics": metrics
    }

    # Re-save so the metadata travels inside the PNG as a text entry.
    final_image = Image.open(BytesIO(protected_data))
    info = PngImagePlugin.PngInfo()
    info.add_text("GhostprintProtection", json.dumps(metadata))

    output_buffer = BytesIO()
    final_image.save(output_buffer, format='PNG', pnginfo=info)

    return output_buffer.getvalue(), metadata
|
|
|
|
|
# Public API of this module.
__all__ = [
    # Engine, fingerprint types and lens selection
    'InformationGeometryEngine',
    'GeometricFingerprint',
    'LensFunction',
    # Fingerprinting, poisoning and .ghost container helpers
    'create_ghostprint_fingerprint',
    'apply_integrated_ai_poisoning',
    'create_ghost_file_with_ai_poisoning',
    'read_ghost_file_with_poisoning',
    'analyze_ai_poisoning_effectiveness',
    'create_lite_fingerprinted_file',
    # Verification and stamp extraction
    'verify_ghost_file',
    'verify_watermark',
    'verify_content_integrity',
    'extract_metadata_stamp',
    'create_artist_image_protection',
    # Armor names.
    # NOTE(review): PureArmorSpec, ArmorMeta and analyze_image_bands are not
    # among the names imported from `armor` at the top of this file. Confirm
    # they are defined elsewhere, otherwise `from <module> import *` fails.
    "Ring",
    "ArmorConfig",
    "PureArmorSpec",
    "ArmorMeta",
    "ArmorGenerator",
    "apply_delta_autosize",
    "analyze_image_bands",
    "analyze_array_bands",
]