Skip to content

Commit

Permalink
camera reconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
isilcala committed Feb 17, 2025
1 parent eb06be6 commit 190f606
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 39 deletions.
14 changes: 14 additions & 0 deletions src/sentinelowl/core/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,18 @@
from ..config import CameraConfig


class CameraConnectionError(Exception):
"""Custom exception for camera connection errors"""

pass


class CameraHandler:
"""Handler for camera input"""

def __init__(self, config: CameraConfig):
self._retry_count = 0
self.max_retries = 3 # ✅ 新增最大重试次数
self.config = config
self.cap = None
self._initialize_camera()
Expand Down Expand Up @@ -39,6 +47,12 @@ def capture_frame(self) -> Optional[Tuple[bool, any]]:
return frame

def _reconnect_camera(self):
if self._retry_count >= self.max_retries:
raise CameraConnectionError(
f"Failed to reconnect after {self.max_retries} attempts"
)

self._retry_count += 1
"""Attempt to reconnect to the camera"""
if self.cap is not None:
self.cap.release()
Expand Down
33 changes: 14 additions & 19 deletions src/sentinelowl/scripts/validate.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,25 @@
import os
import cv2
import numpy as np
import onnxruntime as ort
from typing import Tuple


def load_model(model_path: str) -> ort.InferenceSession:
"""Load the ONNX model"""
try:
session = ort.InferenceSession(model_path, providers=["CPUExecutionProvider"])
print(f"✅ Model loaded successfully: {model_path}")
return session
except Exception as e:
print(f"❌ Failed to load model from {model_path}: {str(e)}")
raise
"""Load the ONNX model with path validation"""
if not os.path.exists(model_path):
raise FileNotFoundError(f"Model file not found: {model_path}")
print(f"Loading model from: {model_path}")
return ort.InferenceSession(model_path, providers=["CPUExecutionProvider"])


def preprocess_frame(
frame: np.ndarray, target_size: Tuple[int, int] = (28, 28)
) -> np.ndarray:
"""Preprocess the input frame for MNIST model"""
# Convert to grayscale
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# Resize to model input size
frame = cv2.resize(frame, target_size)
# Normalize to [0, 1]
frame = frame.astype(np.float32) / 255.0
# Add batch and channel dimensions
frame = np.expand_dims(frame, axis=(0, 1)) # Shape: (1, 1, 28, 28)
return frame

Expand All @@ -41,30 +35,31 @@ def predict(model: ort.InferenceSession, frame: np.ndarray) -> Tuple[int, float]


def main(camera_url: str, model_path: str):
"""Main validation function"""
"""Main validation function with URL validation"""
if not camera_url.startswith(("http://", "rtsp://", "file://")):
raise ValueError(f"Invalid camera URL: {camera_url}")

# Load model
model = load_model(model_path)
print(f"✅ Model loaded: {model_path}")

# Open camera stream
print(f"Connecting to camera: {camera_url}")
cap = cv2.VideoCapture(camera_url)
if not cap.isOpened():
print(f"Failed to open camera: {camera_url}")
print(f"Failed to open camera: {camera_url}")
return

print(f"✅ Camera connected: {camera_url}")

try:
# Capture a single frame
ret, frame = cap.read()
if not ret:
print("Failed to capture frame")
print("Failed to capture frame")
return

# Preprocess and predict
processed_frame = preprocess_frame(frame)
predicted_class, confidence = predict(model, processed_frame)
print(f"🔍 Predicted class: {predicted_class}, Confidence: {confidence:.4f}")
print(f"Predicted class: {predicted_class}, Confidence: {confidence:.4f}")

finally:
cap.release()
Expand Down
20 changes: 0 additions & 20 deletions src/sentinelowl/services/printer.py

This file was deleted.

13 changes: 13 additions & 0 deletions tests/test_camera.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import pytest
from sentinelowl.config import CameraConfig
from sentinelowl.core.camera import CameraHandler, CameraConnectionError


def test_camera_reconnect_failure():
"""Test camera reconnection failure handling"""
config = CameraConfig(url="invalid_url", reconnect_interval=1)
camera = CameraHandler(config)

with pytest.raises(CameraConnectionError):
for _ in range(5): # 超过最大重试次数
camera.capture_frame()

0 comments on commit 190f606

Please sign in to comment.