|
|
|
|
|
""" |
|
|
Test script to verify video processing functionality. |
|
|
Creates a synthetic test video and tests the prediction pipeline. |
|
|
""" |
|
|
|
|
|
import sys |
|
|
import tempfile |
|
|
import logging |
|
|
from pathlib import Path |
|
|
import numpy as np |
|
|
from PIL import Image, ImageDraw |
|
|
import cv2 |
|
|
|
|
|
|
|
|
# Route INFO-and-above records to the root handler with a timestamped,
# level-tagged layout so each test step is visible on the console.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
|
|
|
|
|
def _gradient_background(width: int, height: int) -> np.ndarray:
    """Build the vertical colour gradient that backs every frame.

    Row ``y`` gets intensity ``int(255 * y / height)``; the three channels
    hold ``intensity // 3``, ``intensity // 2`` and ``intensity``.
    """
    intensity = np.array(
        [int(255 * (y / height)) for y in range(height)], dtype=np.int64
    )
    rows = np.stack(
        [intensity // 3, intensity // 2, intensity], axis=-1
    ).astype(np.uint8)
    # Replicate each row colour across the full width -> (height, width, 3).
    return np.repeat(rows[:, np.newaxis, :], width, axis=1)


def create_synthetic_video(output_path: Path, duration_seconds: float = 2.0, fps: int = 24) -> Path:
    """Create a synthetic test video with simple animation.

    Each 640x480 frame shows a vertical gradient, a yellow circle that moves
    from 20% to 80% of the frame width while its radius oscillates, and two
    text labels (the frame index and "Synthetic Action"). Frames are encoded
    with the ``mp4v`` FourCC.

    Args:
        output_path: Destination file for the encoded video.
        duration_seconds: Clip length; the frame count is
            ``int(duration_seconds * fps)``.
        fps: Frame rate passed to the video writer.

    Returns:
        ``output_path``, unchanged, for convenient chaining.

    Raises:
        RuntimeError: If OpenCV cannot open a writer for ``output_path``
            (for example because the codec is unavailable).
    """
    width, height = 640, 480
    total_frames = int(duration_seconds * fps)

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(str(output_path), fourcc, fps, (width, height))
    if not out.isOpened():
        # Without this check every write() below would be silently dropped.
        out.release()
        raise RuntimeError(f"Could not open video writer for {output_path}")

    logging.info(f"Creating synthetic video: {total_frames} frames at {fps} FPS")

    # The gradient does not depend on the frame index, so build it once.
    background = _gradient_background(width, height)
    center_y = height // 2

    try:
        for frame_num in range(total_frames):
            # Circle sweeps left-to-right while its radius pulses.
            center_x = int(width * (0.2 + 0.6 * frame_num / total_frames))
            radius = 30 + int(20 * np.sin(frame_num * 0.3))

            # Draw on a private copy so the shared background stays pristine.
            pil_frame = Image.fromarray(background.copy())
            draw = ImageDraw.Draw(pil_frame)

            left = center_x - radius
            top = center_y - radius
            right = center_x + radius
            bottom = center_y + radius
            draw.ellipse([left, top, right, bottom], fill=(255, 255, 0))

            draw.text((50, 50), f"Frame {frame_num}", fill=(255, 255, 255))
            draw.text((50, 80), "Synthetic Action", fill=(255, 255, 255))

            # Pillow produces RGB; OpenCV's writer expects BGR channel order.
            frame_bgr = cv2.cvtColor(np.array(pil_frame), cv2.COLOR_RGB2BGR)
            out.write(frame_bgr)
    finally:
        # Always finalize the container, even if rendering a frame fails.
        out.release()

    logging.info(f"✓ Created synthetic video: {output_path}")
    return output_path
|
|
|
|
|
def test_video_reading() -> bool:
    """Test video reading functionality without full model inference.

    Writes a short synthetic clip to a temporary directory, reads frames
    back with ``predict._read_video_frames``, normalizes them with
    ``predict.normalize_frames`` and checks the count, size and mode of the
    normalized frames.

    Returns:
        True if every check passes; False if a check fails or any exception
        is raised (the exception is logged with its traceback).
    """
    logging.info("=== Testing Video Reading ===")

    required_frames = 8
    expected_size = (224, 224)

    try:
        from predict import _read_video_frames, normalize_frames

        with tempfile.TemporaryDirectory() as tmp_dir:
            tmp_path = Path(tmp_dir)
            video_path = tmp_path / "test_video.mp4"

            create_synthetic_video(video_path, duration_seconds=1.0, fps=12)

            logging.info("Testing frame reading...")
            frames = _read_video_frames(video_path, num_frames=required_frames)

            if not frames:
                logging.error("✗ No frames extracted")
                return False

            logging.info(f"✓ Extracted {len(frames)} frames")

            logging.info("Testing frame normalization...")
            normalized = normalize_frames(frames, required_frames=required_frames)

            if len(normalized) != required_frames:
                logging.error(f"✗ Expected {required_frames} frames, got {len(normalized)}")
                return False

            logging.info("✓ Frame normalization successful")

            # Frames expose .size / .mode — presumably PIL images; confirm
            # against predict.normalize_frames.
            for i, frame in enumerate(normalized):
                if frame.size != expected_size:
                    logging.error(f"✗ Frame {i} has wrong size: {frame.size}")
                    return False
                if frame.mode != 'RGB':
                    logging.error(f"✗ Frame {i} has wrong mode: {frame.mode}")
                    return False

            logging.info("✓ All frames have correct properties")
            return True

    except Exception as e:
        logging.error(f"✗ Video reading test failed: {e}")
        # Keep the traceback, matching how test_full_pipeline reports errors.
        logging.exception("Full error traceback:")
        return False
|
|
|
|
|
def test_tensor_creation() -> bool:
    """Test tensor creation from frames.

    Builds eight solid-colour 224x224 RGB images, passes them to
    ``predict.create_tensor_from_frames`` with ``processor=None`` and checks
    that the result has shape ``(1, 3, 8, 224, 224)``. Values outside
    ``[0, 1]`` are reported as a warning only and do not fail the test.

    Returns:
        True if the shape check passes; False if it fails or any exception
        is raised (the exception is logged with its traceback).
    """
    logging.info("=== Testing Tensor Creation ===")

    try:
        from predict import create_tensor_from_frames
        # NOTE(review): torch is not referenced below; presumably imported so
        # a missing dependency fails this test early — confirm.
        import torch

        frames = [
            Image.new('RGB', (224, 224), (i * 30 % 255, 100, 150))
            for i in range(8)
        ]

        logging.info("Testing tensor creation...")
        tensor = create_tensor_from_frames(frames, processor=None)

        expected_shape = (1, 3, 8, 224, 224)
        if tensor.shape != expected_shape:
            logging.error(f"✗ Expected shape {expected_shape}, got {tensor.shape}")
            return False

        logging.info(f"✓ Tensor created with correct shape: {tensor.shape}")

        # Reduce once and reuse the extrema for both the check and the message.
        low, high = tensor.min(), tensor.max()
        if low < 0 or high > 1:
            logging.warning(f"⚠ Tensor values outside [0,1]: [{low:.3f}, {high:.3f}]")

        logging.info("✓ Tensor creation successful")
        return True

    except Exception as e:
        logging.error(f"✗ Tensor creation test failed: {e}")
        # Keep the traceback, matching how test_full_pipeline reports errors.
        logging.exception("Full error traceback:")
        return False
|
|
|
|
|
def test_full_pipeline() -> bool:
    """Test the complete prediction pipeline with a synthetic video.

    Writes a two-second synthetic clip to a temporary directory, runs
    ``predict.predict_actions`` on it and validates the returned
    ``(label, confidence)`` pairs: exactly ``top_k`` entries, ``str`` labels,
    ``float`` confidences within ``[0, 1]``.

    Returns:
        True if every check passes; False if a check fails or any exception
        is raised (the exception is logged with its traceback).
    """
    logging.info("=== Testing Full Pipeline ===")

    # Single source of truth for the request size and the length check.
    top_k = 3

    try:
        from predict import predict_actions

        with tempfile.TemporaryDirectory() as tmp_dir:
            tmp_path = Path(tmp_dir)
            video_path = tmp_path / "test_video.mp4"

            create_synthetic_video(video_path, duration_seconds=2.0, fps=15)

            logging.info("Running full prediction pipeline...")

            results = predict_actions(str(video_path), top_k=top_k)

            if not results:
                logging.error("✗ No predictions returned")
                return False

            logging.info(f"✓ Got {len(results)} predictions")

            for i, (label, confidence) in enumerate(results, 1):
                logging.info(f"  {i}. {label}: {confidence:.3f}")

            if len(results) != top_k:
                logging.error(f"✗ Expected {top_k} results, got {len(results)}")
                return False

            for label, confidence in results:
                if not isinstance(label, str) or not isinstance(confidence, float):
                    logging.error(f"✗ Invalid result format: {label}, {confidence}")
                    return False
                if confidence < 0 or confidence > 1:
                    logging.error(f"✗ Invalid confidence: {confidence}")
                    return False

            logging.info("✓ Full pipeline test successful")
            return True

    except Exception as e:
        logging.error(f"✗ Full pipeline test failed: {e}")
        logging.exception("Full error traceback:")
        return False
|
|
|
|
|
def main() -> int:
    """Run all tests.

    Executes each test function in order, printing a per-test verdict and a
    final summary. A test that raises is reported as crashed (with its
    traceback logged) and counted as not passed.

    Returns:
        0 if every test passed, 1 otherwise; intended as a process exit code.
    """
    print("🧪 Video Processing Test Suite")
    print("=" * 50)

    tests = [
        ("Video Reading", test_video_reading),
        ("Tensor Creation", test_tensor_creation),
        ("Full Pipeline", test_full_pipeline),
    ]

    passed = 0
    total = len(tests)

    for test_name, test_func in tests:
        print(f"\n📋 Running: {test_name}")
        print("-" * 30)

        try:
            if test_func():
                print(f"✅ {test_name} PASSED")
                passed += 1
            else:
                print(f"❌ {test_name} FAILED")
        except Exception as e:
            # Boundary handler: one crashing test must not abort the others.
            print(f"💥 {test_name} CRASHED: {e}")
            logging.exception(f"Test {test_name} crashed:")

    print(f"\n📊 Test Results: {passed}/{total} tests passed")

    if passed == total:
        print("🎉 All tests passed! Video processing is working correctly.")
        return 0
    else:
        print("⚠️ Some tests failed. Check the logs above for details.")
        return 1
|
|
|
|
|
if __name__ == "__main__":
    # sys.exit is always available; the bare exit() helper is injected by the
    # site module and is missing under `python -S` or in frozen builds.
    sys.exit(main())
|
|
|