""" Real-Time Interview Recording and Violation Detection System UPDATED VERSION: - Fixed cv2.FONT_HERSHEY_BOLD error (use FONT_HERSHEY_SIMPLEX) - Captures violation images - Continues to next question after violation - Stores violation metadata for display in results """ import cv2 import numpy as np import threading import time import tempfile import os import speech_recognition as sr import warnings from collections import deque warnings.filterwarnings('ignore') os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' class RecordingSystem: """Handles video/audio recording with real-time violation detection""" def __init__(self, models_dict): """ Initialize recording system with loaded models Args: models_dict: Dictionary containing pre-loaded AI models """ self.models = models_dict self.violation_detected = False self.violation_reason = "" # Frame boundaries (for sitting position: left, right, top only) self.frame_margin = 50 # pixels from edge # Position adjustment tracking self.position_adjusted = False self.baseline_environment = None # Store initial environment scan # Violation storage directory self.violation_images_dir = tempfile.mkdtemp(prefix="violations_") # Initialize pose detection if available try: import mediapipe as mp self.mp_pose = mp.solutions.pose self.pose_detector = self.mp_pose.Pose( static_image_mode=False, model_complexity=1, smooth_landmarks=True, min_detection_confidence=0.5, min_tracking_confidence=0.5 ) self.pose_available = True except: self.pose_detector = None self.pose_available = False def save_violation_image(self, frame, question_number, violation_reason): """ Save an image of the violation for later display FIXED: Changed cv2.FONT_HERSHEY_BOLD to cv2.FONT_HERSHEY_SIMPLEX Args: frame: BGR image frame showing the violation question_number: Current question number violation_reason: Description of the violation Returns: Path to saved violation image """ try: # Create filename with timestamp timestamp = int(time.time() * 1000) filename = f"violation_q{question_number}_{timestamp}.jpg" filepath = os.path.join(self.violation_images_dir, filename) # Add violation text overlay to image overlay_frame = frame.copy() h, w = overlay_frame.shape[:2] # Add semi-transparent red overlay red_overlay = overlay_frame.copy() cv2.rectangle(red_overlay, (0, 0), (w, h), (0, 0, 255), -1) overlay_frame = cv2.addWeighted(overlay_frame, 0.7, red_overlay, 0.3, 0) # Add thick red border cv2.rectangle(overlay_frame, (0, 0), (w-1, h-1), (0, 0, 255), 10) # Add violation text with background - FIXED FONT text = "VIOLATION DETECTED" cv2.rectangle(overlay_frame, (0, 0), (w, 80), (0, 0, 0), -1) cv2.putText(overlay_frame, text, (w//2 - 200, 50), cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 255), 3) # FIXED: Was FONT_HERSHEY_BOLD # Add violation reason at bottom cv2.rectangle(overlay_frame, (0, h-100), (w, h), (0, 0, 0), -1) # Split long violation text into multiple lines words = violation_reason.split() lines = [] current_line = "" for word in words: test_line = current_line + " " + word if current_line else word if len(test_line) > 50: lines.append(current_line) current_line = word else: current_line = test_line if current_line: lines.append(current_line) # Draw violation reason lines y_offset = h - 90 for line in lines[:2]: # Max 2 lines cv2.putText(overlay_frame, line, (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) y_offset += 30 # Save image cv2.imwrite(filepath, overlay_frame) return filepath except Exception as e: print(f"Error saving violation image: {e}") return None def scan_environment(self, frame): """ Scan and catalog the environment before test starts """ if self.models['yolo'] is None: return {'objects': [], 'positions': []} try: results = self.models['yolo'].predict(frame, conf=0.25, verbose=False) environment_data = { 'objects': [], 'positions': [], 'person_position': None } if results and len(results) > 0: names = self.models['yolo'].names boxes = results[0].boxes for box in boxes: cls_id = int(box.cls[0]) obj_name = names[cls_id] x1, y1, x2, y2 = box.xyxy[0].cpu().numpy() environment_data['objects'].append(obj_name) environment_data['positions'].append({ 'name': obj_name, 'bbox': (int(x1), int(y1), int(x2), int(y2)), 'center': (int((x1+x2)/2), int((y1+y2)/2)) }) if obj_name == 'person': environment_data['person_position'] = (int((x1+x2)/2), int((y1+y2)/2)) return environment_data except Exception as e: return {'objects': [], 'positions': []} def detect_new_objects(self, frame): """ Detect NEW objects that weren't in baseline environment """ if self.models['yolo'] is None or self.baseline_environment is None: return False, [] try: results = self.models['yolo'].predict(frame, conf=0.25, verbose=False) if results and len(results) > 0: names = self.models['yolo'].names boxes = results[0].boxes current_objects = [] for box in boxes: cls_id = int(box.cls[0]) obj_name = names[cls_id] x1, y1, x2, y2 = box.xyxy[0].cpu().numpy() current_center = (int((x1+x2)/2), int((y1+y2)/2)) current_objects.append({ 'name': obj_name, 'center': current_center, 'bbox': (int(x1), int(y1), int(x2), int(y2)) }) baseline_objects = self.baseline_environment['positions'] new_items = [] for curr_obj in current_objects: if curr_obj['name'] == 'person': continue is_baseline = False for base_obj in baseline_objects: if curr_obj['name'] == base_obj['name']: dist = np.sqrt( (curr_obj['center'][0] - base_obj['center'][0])**2 + (curr_obj['center'][1] - base_obj['center'][1])**2 ) if dist < 100: is_baseline = True break if not is_baseline: new_items.append(curr_obj['name']) if new_items: return True, list(set(new_items)) return False, [] except Exception as e: return False, [] def detect_suspicious_movements(self, frame): """Detect suspicious hand movements""" if self.models['hands'] is None: return False, "" rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) h, w = frame.shape[:2] try: hand_results = self.models['hands'].process(rgb_frame) if hand_results.multi_hand_landmarks: for hand_landmarks in hand_results.multi_hand_landmarks: wrist = hand_landmarks.landmark[0] index_tip = hand_landmarks.landmark[8] wrist_y = wrist.y * h tip_y = index_tip.y * h if wrist_y > h * 0.75: return True, "Hand movement below desk level detected" if wrist_y < h * 0.15: return True, "Suspicious hand movement at top of frame" except Exception as e: pass return False, "" def calculate_eye_gaze(self, face_landmarks, frame_shape): """Calculate if eyes are looking at camera""" h, w = frame_shape[:2] left_eye_indices = [468, 469, 470, 471, 472] right_eye_indices = [473, 474, 475, 476, 477] left_eye_center = [33, 133, 157, 158, 159, 160, 161, 163, 144, 145, 153, 154, 155] right_eye_center = [362, 263, 387, 386, 385, 384, 398, 382, 381, 380, 373, 374, 390] landmarks = face_landmarks.landmark left_iris_x = np.mean([landmarks[i].x for i in left_eye_indices if i < len(landmarks)]) left_eye_x = np.mean([landmarks[i].x for i in left_eye_center if i < len(landmarks)]) right_iris_x = np.mean([landmarks[i].x for i in right_eye_indices if i < len(landmarks)]) right_eye_x = np.mean([landmarks[i].x for i in right_eye_center if i < len(landmarks)]) left_gaze_ratio = (left_iris_x - left_eye_x) if left_iris_x and left_eye_x else 0 right_gaze_ratio = (right_iris_x - right_eye_x) if right_iris_x and right_eye_x else 0 avg_gaze = (left_gaze_ratio + right_gaze_ratio) / 2 return abs(avg_gaze) < 0.02 def estimate_head_pose(self, face_landmarks, frame_shape): """Estimate head pose angles""" h, w = frame_shape[:2] landmarks_3d = np.array([(lm.x * w, lm.y * h, lm.z) for lm in face_landmarks.landmark]) required_indices = [1, 33, 263, 61, 291] image_points = np.array([landmarks_3d[i] for i in required_indices], dtype="double") model_points = np.array([ (0.0, 0.0, 0.0), (-30.0, -125.0, -30.0), (30.0, -125.0, -30.0), (-60.0, -70.0, -60.0), (60.0, -70.0, -60.0) ]) focal_length = w center = (w / 2, h / 2) camera_matrix = np.array([ [focal_length, 0, center[0]], [0, focal_length, center[1]], [0, 0, 1] ], dtype="double") dist_coeffs = np.zeros((4, 1)) success, rotation_vector, _ = cv2.solvePnP( model_points, image_points, camera_matrix, dist_coeffs, flags=cv2.SOLVEPNP_ITERATIVE ) if success: rmat, _ = cv2.Rodrigues(rotation_vector) pose_mat = cv2.hconcat((rmat, rotation_vector)) _, _, _, _, _, _, euler = cv2.decomposeProjectionMatrix(pose_mat) yaw, pitch, roll = [float(a) for a in euler] return yaw, pitch, roll return 0, 0, 0 def detect_blink(self, face_landmarks): """Detect if eye is blinking""" upper_lid = face_landmarks.landmark[159] lower_lid = face_landmarks.landmark[145] eye_openness = abs(upper_lid.y - lower_lid.y) return eye_openness < 0.01 def analyze_lighting(self, frame): """Analyze lighting conditions""" gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) mean_brightness = np.mean(gray) std_brightness = np.std(gray) if mean_brightness < 60: return "Too Dark", mean_brightness elif mean_brightness > 200: return "Too Bright", mean_brightness elif std_brightness < 25: return "Low Contrast", mean_brightness else: return "Good", mean_brightness def check_frame_boundaries(self, frame, face_box): """Check if person is within frame boundaries""" if face_box is None: return False, "No face detected", "NO_FACE" h, w = frame.shape[:2] margin = self.frame_margin x, y, fw, fh = face_box face_center_x = x + fw // 2 face_top = y face_left = x face_right = x + fw if face_left < margin: return False, "Person too close to LEFT edge", "LEFT_VIOLATION" if face_right > (w - margin): return False, "Person too close to RIGHT edge", "RIGHT_VIOLATION" if face_top < margin: return False, "Person too close to TOP edge", "TOP_VIOLATION" return True, "Within boundaries", "OK" def detect_person_outside_frame(self, frame): """Detect if any person/living being is outside boundaries""" if self.models['yolo'] is None: return False, "", "" h, w = frame.shape[:2] margin = self.frame_margin try: results = self.models['yolo'].predict(frame, conf=0.4, verbose=False) if results and len(results) > 0: names = self.models['yolo'].names boxes = results[0].boxes living_beings = ['person', 'cat', 'dog', 'bird', 'horse', 'sheep', 'cow', 'elephant', 'bear', 'zebra', 'giraffe'] for i, box in enumerate(boxes): cls_id = int(box.cls[0]) obj_name = names[cls_id] if obj_name.lower() in living_beings: x1, y1, x2, y2 = box.xyxy[0].cpu().numpy() if x1 < margin or x2 < margin: return True, obj_name, "LEFT" if x1 > (w - margin) or x2 > (w - margin): return True, obj_name, "RIGHT" if y1 < margin or y2 < margin: return True, obj_name, "TOP" except Exception as e: pass return False, "", "" def detect_multiple_bodies(self, frame, num_faces): """Detect multiple bodies using pose and hand detection""" rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) body_count = 0 detected_parts = [] if self.pose_available and self.pose_detector: try: pose_results = self.pose_detector.process(rgb_frame) if pose_results.pose_landmarks: body_count += 1 detected_parts.append("body") landmarks = pose_results.pose_landmarks.landmark visible_shoulders = sum(1 for idx in [11, 12] if landmarks[idx].visibility > 0.5) visible_elbows = sum(1 for idx in [13, 14] if landmarks[idx].visibility > 0.5) if visible_shoulders > 2 or visible_elbows > 2: return True, "Multiple body parts detected (extra shoulders/arms)", body_count + 1 except Exception as e: pass if self.models['hands'] is not None: try: hand_results = self.models['hands'].process(rgb_frame) if hand_results.multi_hand_landmarks: num_hands = len(hand_results.multi_hand_landmarks) if num_hands > 2: detected_parts.append(f"{num_hands} hands") return True, f"Multiple persons detected ({num_hands} hands visible)", 2 if num_hands == 2: hand1 = hand_results.multi_hand_landmarks[0].landmark[0] hand2 = hand_results.multi_hand_landmarks[1].landmark[0] distance = np.sqrt((hand1.x - hand2.x)**2 + (hand1.y - hand2.y)**2) if distance > 0.7: detected_parts.append("widely separated hands") return True, "Suspicious hand positions (possible multiple persons)", 2 except Exception as e: pass if num_faces == 1 and body_count > 1: return True, "Body parts from multiple persons detected", 2 if num_faces > 1: return True, f"Multiple persons detected ({num_faces} faces)", num_faces return False, "", max(num_faces, body_count) def detect_hands_outside_main_person(self, frame, face_box): """Detect hands outside main person's area""" if self.models['hands'] is None or face_box is None: return False, "" rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) h, w = frame.shape[:2] try: hand_results = self.models['hands'].process(rgb_frame) if hand_results.multi_hand_landmarks: x, y, fw, fh = face_box expected_left = max(0, x - fw) expected_right = min(w, x + fw * 2) expected_top = max(0, y - fh) expected_bottom = min(h, y + fh * 4) for hand_landmarks in hand_results.multi_hand_landmarks: hand_x = hand_landmarks.landmark[0].x * w hand_y = hand_landmarks.landmark[0].y * h if (hand_x < expected_left - 50 or hand_x > expected_right + 50 or hand_y < expected_top - 50 or hand_y > expected_bottom + 50): return True, "Hand detected outside main person's area" except Exception as e: pass return False, "" def has_skin_tone(self, region): """Check if region contains skin-like colors""" if region.size == 0: return False hsv = cv2.cvtColor(region, cv2.COLOR_BGR2HSV) lower_skin1 = np.array([0, 20, 70], dtype=np.uint8) upper_skin1 = np.array([20, 255, 255], dtype=np.uint8) lower_skin2 = np.array([0, 20, 0], dtype=np.uint8) upper_skin2 = np.array([20, 150, 255], dtype=np.uint8) mask1 = cv2.inRange(hsv, lower_skin1, upper_skin1) mask2 = cv2.inRange(hsv, lower_skin2, upper_skin2) mask = cv2.bitwise_or(mask1, mask2) skin_ratio = np.sum(mask > 0) / mask.size return skin_ratio > 0.3 def detect_intrusion_at_edges(self, frame, face_box): """Detect body parts intruding from frame edges""" if face_box is None: return False, "" h, w = frame.shape[:2] x, y, fw, fh = face_box edge_width = 80 left_region = frame[:, :edge_width] right_region = frame[:, w-edge_width:] top_left = frame[:edge_width, :w//3] top_right = frame[:edge_width, 2*w//3:] face_center_x = x + fw // 2 face_far_from_left = face_center_x > w * 0.3 face_far_from_right = face_center_x < w * 0.7 if face_far_from_left and self.has_skin_tone(left_region): if self.models['hands']: rgb_region = cv2.cvtColor(left_region, cv2.COLOR_BGR2RGB) try: result = self.models['hands'].process(rgb_region) if result.multi_hand_landmarks: return True, "Body part detected at left edge (another person)" except: pass if face_far_from_right and self.has_skin_tone(right_region): if self.models['hands']: rgb_region = cv2.cvtColor(right_region, cv2.COLOR_BGR2RGB) try: result = self.models['hands'].process(rgb_region) if result.multi_hand_landmarks: return True, "Body part detected at right edge (another person)" except: pass if y > h * 0.2: if self.has_skin_tone(top_left) or self.has_skin_tone(top_right): return True, "Body part detected at top edge (another person)" return False, "" def draw_frame_boundaries(self, frame): """Draw visible frame boundaries""" h, w = frame.shape[:2] margin = self.frame_margin overlay = frame.copy() cv2.line(overlay, (margin, 0), (margin, h), (0, 255, 0), 3) cv2.line(overlay, (w - margin, 0), (w - margin, h), (0, 255, 0), 3) cv2.line(overlay, (0, margin), (w, margin), (0, 255, 0), 3) cv2.rectangle(overlay, (margin, margin), (w - margin, h), (0, 255, 0), 2) frame_with_boundaries = cv2.addWeighted(frame, 0.7, overlay, 0.3, 0) corner_size = 30 cv2.line(frame_with_boundaries, (margin, margin), (margin + corner_size, margin), (0, 255, 0), 3) cv2.line(frame_with_boundaries, (margin, margin), (margin, margin + corner_size), (0, 255, 0), 3) cv2.line(frame_with_boundaries, (w - margin, margin), (w - margin - corner_size, margin), (0, 255, 0), 3) cv2.line(frame_with_boundaries, (w - margin, margin), (w - margin, margin + corner_size), (0, 255, 0), 3) cv2.putText(frame_with_boundaries, "Stay within GREEN boundaries", (w//2 - 200, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2) return frame_with_boundaries def pre_test_setup_phase(self, ui_callbacks, timeout=60): """ ONE-TIME pre-test setup phase with environment scanning """ if self.position_adjusted: return True cap = cv2.VideoCapture(0) if not cap.isOpened(): return False start_time = time.time() position_ok_counter = 0 required_stable_frames = 30 ui_callbacks['countdown_update']("šŸ“ø ONE-TIME SETUP: Adjust your position within the GREEN frame") while (time.time() - start_time) < timeout: ret, frame = cap.read() if not ret: continue rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) h, w = frame.shape[:2] frame_with_boundaries = self.draw_frame_boundaries(frame) face_box = None is_ready = False status_message = "Detecting face..." status_color = (255, 165, 0) if self.models['face_mesh'] is not None: face_results = self.models['face_mesh'].process(rgb_frame) if face_results.multi_face_landmarks: num_faces = len(face_results.multi_face_landmarks) if num_faces > 1: status_message = "āš ļø Multiple faces detected! Only ONE person allowed" status_color = (0, 0, 255) position_ok_counter = 0 elif num_faces == 1: face_landmarks = face_results.multi_face_landmarks[0] landmarks_2d = np.array([(lm.x * w, lm.y * h) for lm in face_landmarks.landmark]) x_coords = landmarks_2d[:, 0] y_coords = landmarks_2d[:, 1] face_box = (int(np.min(x_coords)), int(np.min(y_coords)), int(np.max(x_coords) - np.min(x_coords)), int(np.max(y_coords) - np.min(y_coords))) within_bounds, boundary_msg, boundary_status = self.check_frame_boundaries(frame, face_box) outside_detected, obj_type, location = self.detect_person_outside_frame(frame) if outside_detected: status_message = f"āš ļø {obj_type.upper()} detected outside frame ({location} side)!" status_color = (0, 0, 255) position_ok_counter = 0 elif not within_bounds: status_message = f"āš ļø {boundary_msg} - Please adjust!" status_color = (0, 0, 255) position_ok_counter = 0 if boundary_status == "LEFT_VIOLATION": cv2.rectangle(frame_with_boundaries, (0, 0), (self.frame_margin, h), (0, 0, 255), -1) elif boundary_status == "RIGHT_VIOLATION": cv2.rectangle(frame_with_boundaries, (w - self.frame_margin, 0), (w, h), (0, 0, 255), -1) elif boundary_status == "TOP_VIOLATION": cv2.rectangle(frame_with_boundaries, (0, 0), (w, self.frame_margin), (0, 0, 255), -1) else: position_ok_counter += 1 progress = min(100, int((position_ok_counter / required_stable_frames) * 100)) status_message = f"āœ… Good position! Hold steady... {progress}%" status_color = (0, 255, 0) if position_ok_counter >= required_stable_frames: is_ready = True else: status_message = "āŒ No face detected - Please position yourself in frame" status_color = (0, 0, 255) position_ok_counter = 0 overlay_height = 140 overlay = frame_with_boundaries.copy() cv2.rectangle(overlay, (0, h - overlay_height), (w, h), (0, 0, 0), -1) frame_with_boundaries = cv2.addWeighted(frame_with_boundaries, 0.7, overlay, 0.3, 0) cv2.putText(frame_with_boundaries, status_message, (10, h - 110), cv2.FONT_HERSHEY_SIMPLEX, 0.6, status_color, 2) cv2.putText(frame_with_boundaries, "Instructions:", (10, h - 80), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) cv2.putText(frame_with_boundaries, "• Keep your face within GREEN boundaries", (10, h - 60), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1) cv2.putText(frame_with_boundaries, "• Ensure no one else is visible", (10, h - 40), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1) cv2.putText(frame_with_boundaries, "• Remove all unauthorized items from view", (10, h - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1) ui_callbacks['video_update'](cv2.resize(frame_with_boundaries, (640, 480))) elapsed = int(time.time() - start_time) ui_callbacks['timer_update'](f"ā±ļø Setup time: {elapsed}s / {timeout}s") if is_ready: ui_callbacks['countdown_update']("šŸ” Scanning environment... Please stay still") time.sleep(1) baseline_frames = [] for _ in range(10): ret, scan_frame = cap.read() if ret: baseline_frames.append(scan_frame) time.sleep(0.1) if baseline_frames: self.baseline_environment = self.scan_environment(baseline_frames[len(baseline_frames)//2]) success_frame = frame_with_boundaries.copy() cv2.rectangle(success_frame, (0, 0), (w, h), (0, 255, 0), 10) cv2.putText(success_frame, "SETUP COMPLETE!", (w//2 - 180, h//2 - 20), cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 255, 0), 3) cv2.putText(success_frame, "Test will begin shortly...", (w//2 - 180, h//2 + 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2) ui_callbacks['video_update'](cv2.resize(success_frame, (640, 480))) time.sleep(3) cap.release() ui_callbacks['countdown_update']('') self.position_adjusted = True return True time.sleep(0.03) cap.release() ui_callbacks['countdown_update']('āš ļø Setup timeout - Please try again') return False def record_interview(self, question_data, duration, ui_callbacks): """ DEPRECATED: Use record_continuous_interview() instead Kept for backward compatibility """ result = self.record_continuous_interview([question_data], duration, ui_callbacks) if isinstance(result, dict) and 'questions_results' in result: if result['questions_results']: first_result = result['questions_results'][0] first_result['video_path'] = result.get('session_video_path', '') first_result['violation_detected'] = len(first_result.get('violations', [])) > 0 first_result['violation_reason'] = first_result['violations'][0]['reason'] if first_result.get('violations') else '' return first_result return {"error": "Recording failed"} def record_audio_to_file(self, duration, path): """Record audio to WAV file""" r = sr.Recognizer() try: with sr.Microphone() as source: r.adjust_for_ambient_noise(source, duration=0.6) audio = r.record(source, duration=duration) with open(path, "wb") as f: f.write(audio.get_wav_data()) return path except: return None def transcribe_audio(self, path): """Transcribe audio file to text""" r = sr.Recognizer() try: with sr.AudioFile(path) as source: audio = r.record(source) text = r.recognize_google(audio) return text if text.strip() else "[Could not understand audio]" except sr.UnknownValueError: return "[Could not understand audio]" except sr.RequestError: return "[Speech recognition service unavailable]" except: return "[Could not understand audio]" def record_continuous_interview(self, questions_list, duration_per_question, ui_callbacks): """ Record ALL questions continuously - continues even if violations occur Captures violation images and stores them for display in results """ # ========== PRE-TEST SETUP ========== ui_callbacks['status_update']("**šŸ”§ Initializing test environment...**") setup_success = self.pre_test_setup_phase(ui_callbacks, timeout=90) if not setup_success: return {"error": "Setup phase failed or timeout"} # ========== INSTRUCTIONS ========== ui_callbacks['countdown_update']("āœ… Setup complete! Please read the instructions...") ui_callbacks['status_update'](f""" **šŸ“‹ TEST INSTRUCTIONS:** - You will answer **{len(questions_list)} questions** continuously - Each question has **{duration_per_question} seconds** to answer - **Important:** Even if a violation is detected, the interview will continue - All violations will be reviewed at the end - Stay within boundaries and maintain focus throughout **The test will begin in 10 seconds...** """) time.sleep(10) # ========== START RECORDING ========== all_results = [] for i in range(3, 0, -1): ui_callbacks['countdown_update'](f"šŸŽ¬ Test starts in {i}...") time.sleep(1) ui_callbacks['countdown_update']('') cap = cv2.VideoCapture(0) if not cap.isOpened(): return {"error": "Unable to access camera"} session_video_temp = tempfile.NamedTemporaryFile(delete=False, suffix=".avi") session_video_path = session_video_temp.name session_video_temp.close() fourcc = cv2.VideoWriter_fourcc(*"XVID") out = cv2.VideoWriter(session_video_path, fourcc, 15.0, (640, 480)) session_start_time = time.time() session_violations = [] # ========== LOOP THROUGH ALL QUESTIONS ========== for q_idx, question_data in enumerate(questions_list): ui_callbacks['countdown_update'](f"šŸ“ Question {q_idx + 1} of {len(questions_list)}") question_text = question_data.get('question', 'No question text') question_tip = question_data.get('tip', 'Speak clearly and confidently') ui_callbacks['question_update'](q_idx + 1, question_text, question_tip) ui_callbacks['status_update'](f""" **ā±ļø Recording Question {q_idx + 1}** Time to answer: **{duration_per_question} seconds** """) for i in range(3, 0, -1): ui_callbacks['timer_update'](f"ā±ļø Starting in {i}s...") time.sleep(1) audio_temp = tempfile.NamedTemporaryFile(delete=False, suffix=".wav") audio_path = audio_temp.name audio_temp.close() audio_thread = threading.Thread( target=lambda path=audio_path: self.record_audio_to_file(duration_per_question, path), daemon=True ) audio_thread.start() # Question recording state question_start_time = time.time() frames = [] question_violations = [] # Store violations for THIS question no_face_start = None look_away_start = None eye_contact_frames = 0 total_frames = 0 blink_count = 0 prev_blink = False face_box = None # ========== RECORDING LOOP FOR THIS QUESTION ========== while (time.time() - question_start_time) < duration_per_question: ret, frame = cap.read() if not ret: break out.write(frame) frames.append(frame.copy()) rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) h, w, _ = frame.shape total_frames += 1 lighting_status, brightness = self.analyze_lighting(frame) num_faces = 0 looking_at_camera = False attention_status = "No Face" # ========== FACE DETECTION & VIOLATION CHECKS ========== if self.models['face_mesh'] is not None: face_results = self.models['face_mesh'].process(rgb_frame) if face_results.multi_face_landmarks: num_faces = len(face_results.multi_face_landmarks) # Check multiple bodies is_multi_body, multi_msg, body_count = self.detect_multiple_bodies(frame, num_faces) if is_multi_body: violation_img_path = self.save_violation_image(frame, q_idx + 1, multi_msg) question_violations.append({ 'reason': multi_msg, 'timestamp': time.time() - question_start_time, 'image_path': violation_img_path }) # Continue to next question instead of breaking break if num_faces > 1: violation_msg = f"Multiple persons detected ({num_faces} faces)" violation_img_path = self.save_violation_image(frame, q_idx + 1, violation_msg) question_violations.append({ 'reason': violation_msg, 'timestamp': time.time() - question_start_time, 'image_path': violation_img_path }) break elif num_faces == 1: no_face_start = None face_landmarks = face_results.multi_face_landmarks[0] try: landmarks_2d = np.array([(lm.x * w, lm.y * h) for lm in face_landmarks.landmark]) x_coords = landmarks_2d[:, 0] y_coords = landmarks_2d[:, 1] face_box = (int(np.min(x_coords)), int(np.min(y_coords)), int(np.max(x_coords) - np.min(x_coords)), int(np.max(y_coords) - np.min(y_coords))) # Check boundaries within_bounds, boundary_msg, boundary_status = self.check_frame_boundaries(frame, face_box) if not within_bounds: violation_img_path = self.save_violation_image(frame, q_idx + 1, boundary_msg) question_violations.append({ 'reason': boundary_msg, 'timestamp': time.time() - question_start_time, 'image_path': violation_img_path }) break # Check person outside frame outside_detected, obj_type, location = self.detect_person_outside_frame(frame) if outside_detected: violation_msg = f"{obj_type.upper()} detected outside frame ({location} side)" violation_img_path = self.save_violation_image(frame, q_idx + 1, violation_msg) question_violations.append({ 'reason': violation_msg, 'timestamp': time.time() - question_start_time, 'image_path': violation_img_path }) break # Check intrusions is_intrusion, intrusion_msg = self.detect_intrusion_at_edges(frame, face_box) if is_intrusion: violation_img_path = self.save_violation_image(frame, q_idx + 1, intrusion_msg) question_violations.append({ 'reason': intrusion_msg, 'timestamp': time.time() - question_start_time, 'image_path': violation_img_path }) break # Check hands outside is_hand_violation, hand_msg = self.detect_hands_outside_main_person(frame, face_box) if is_hand_violation: violation_img_path = self.save_violation_image(frame, q_idx + 1, hand_msg) question_violations.append({ 'reason': hand_msg, 'timestamp': time.time() - question_start_time, 'image_path': violation_img_path }) break # Suspicious movements is_suspicious, sus_msg = self.detect_suspicious_movements(frame) if is_suspicious: violation_img_path = self.save_violation_image(frame, q_idx + 1, sus_msg) question_violations.append({ 'reason': sus_msg, 'timestamp': time.time() - question_start_time, 'image_path': violation_img_path }) break yaw, pitch, roll = self.estimate_head_pose(face_landmarks, frame.shape) gaze_centered = self.calculate_eye_gaze(face_landmarks, frame.shape) is_blink = self.detect_blink(face_landmarks) if is_blink and not prev_blink: blink_count += 1 prev_blink = is_blink head_looking_forward = abs(yaw) <= 20 and abs(pitch) <= 20 if head_looking_forward and gaze_centered: look_away_start = None looking_at_camera = True eye_contact_frames += 1 attention_status = "Looking at Camera āœ“" else: if look_away_start is None: look_away_start = time.time() attention_status = "Looking Away" else: elapsed = time.time() - look_away_start if elapsed > 2.0: violation_msg = "Looking away for >2 seconds" violation_img_path = self.save_violation_image(frame, q_idx + 1, violation_msg) question_violations.append({ 'reason': violation_msg, 'timestamp': time.time() - question_start_time, 'image_path': violation_img_path }) break else: attention_status = f"Looking Away ({elapsed:.1f}s)" except: attention_status = "Face Error" else: if no_face_start is None: no_face_start = time.time() attention_status = "No Face Visible" else: elapsed = time.time() - no_face_start if elapsed > 2.0: violation_msg = "No face visible for >2 seconds" violation_img_path = self.save_violation_image(frame, q_idx + 1, violation_msg) question_violations.append({ 'reason': violation_msg, 'timestamp': time.time() - question_start_time, 'image_path': violation_img_path }) break else: attention_status = f"No Face ({elapsed:.1f}s)" # Check for new objects if total_frames % 20 == 0: new_detected, new_items = self.detect_new_objects(frame) if new_detected: violation_msg = f"New item(s) brought into view: {', '.join(new_items)}" violation_img_path = self.save_violation_image(frame, q_idx + 1, violation_msg) question_violations.append({ 'reason': violation_msg, 'timestamp': time.time() - question_start_time, 'image_path': violation_img_path }) break # Display frame overlay = frame.copy() cv2.rectangle(overlay, (0, 0), (w, 120), (0, 0, 0), -1) frame_display = cv2.addWeighted(frame, 0.6, overlay, 0.4, 0) # Show violation warning if any occurred status_color = (0, 255, 0) if len(question_violations) == 0 else (0, 165, 255) violation_text = f" | āš ļø {len(question_violations)} violation(s)" if question_violations else "" cv2.putText(frame_display, f"Q{q_idx+1}/{len(questions_list)} - {attention_status}{violation_text}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, status_color, 2) cv2.putText(frame_display, f"Lighting: {lighting_status}", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2) cv2.putText(frame_display, f"Eye Contact: {int((eye_contact_frames/max(total_frames,1))*100)}%", (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2) elapsed_q = time.time() - question_start_time remaining = max(0, int(duration_per_question - elapsed_q)) cv2.putText(frame_display, f"Time: {remaining}s", (10, 115), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2) ui_callbacks['video_update'](cv2.resize(frame_display, (480, 360))) eye_contact_pct = (eye_contact_frames / max(total_frames, 1)) * 100 status_text = f""" **Question {q_idx + 1} of {len(questions_list)}** šŸ‘ļø **Eye Contact:** {eye_contact_pct:.1f}% 😓 **Blinks:** {blink_count} šŸ’” **Lighting:** {lighting_status} āš ļø **Status:** {attention_status} """ if question_violations: status_text += f"\n\nāš ļø **Violations in this question:** {len(question_violations)}" ui_callbacks['status_update'](status_text) overall_progress = (q_idx + (elapsed_q / duration_per_question)) / len(questions_list) overall_progress = max(0.0, min(1.0, overall_progress)) ui_callbacks['progress_update'](overall_progress) ui_callbacks['timer_update'](f"šŸŽ„ Q{q_idx+1}/{len(questions_list)} - {remaining}s remaining") time.sleep(0.05) # Wait for audio audio_thread.join(timeout=duration_per_question + 5) # Transcribe transcript = "" if os.path.exists(audio_path): transcript = self.transcribe_audio(audio_path) # Add violations to session list if question_violations: session_violations.extend([f"Q{q_idx+1}: {v['reason']}" for v in question_violations]) # Store results for this question question_result = { 'question_number': q_idx + 1, 'question_text': question_data.get('question', ''), 'audio_path': audio_path, 'frames': frames, 'violations': question_violations, # Now includes image paths 'violation_detected': len(question_violations) > 0, 'eye_contact_pct': (eye_contact_frames / max(total_frames, 1)) * 100, 'blink_count': blink_count, 'face_box': face_box, 'transcript': transcript, 'lighting_status': lighting_status } all_results.append(question_result) # Show message and continue to next question if question_violations: ui_callbacks['countdown_update'](f"āš ļø Violation detected in Q{q_idx + 1}! Continuing to next question in 3s...") time.sleep(3) elif q_idx < len(questions_list) - 1: ui_callbacks['countdown_update'](f"āœ… Question {q_idx + 1} complete! Next question in 3s...") time.sleep(3) # Cleanup cap.release() out.release() # Clear UI ui_callbacks['video_update'](None) ui_callbacks['progress_update'](1.0) # Final message total_violations = sum(len(r.get('violations', [])) for r in all_results) if total_violations > 0: ui_callbacks['countdown_update'](f"āš ļø TEST COMPLETED WITH {total_violations} VIOLATION(S)") ui_callbacks['status_update'](f"**āš ļø {total_violations} violation(s) detected across all questions. Review results below.**") else: ui_callbacks['countdown_update']("āœ… TEST COMPLETED SUCCESSFULLY!") ui_callbacks['status_update']("**All questions answered with no violations. Processing results...**") ui_callbacks['timer_update']("") # Return comprehensive results return { 'questions_results': all_results, 'session_video_path': session_video_path, 'total_questions': len(questions_list), 'completed_questions': len(all_results), 'session_violations': session_violations, 'total_violations': total_violations, 'violation_images_dir': self.violation_images_dir, 'session_duration': time.time() - session_start_time } ####