-
Notifications
You must be signed in to change notification settings - Fork 0
/
cv2_pipeline.py
101 lines (87 loc) · 3.41 KB
/
cv2_pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# importing required libraries
import cv2
import time
from threading import Thread # library for implementing multi-threaded processing
# defining a helper class for implementing multi-threaded processing
class WebcamStream:
def __init__(self, stream_id=0, frame_size=(640, 480)):
self.stream_id = stream_id # default is 0 for primary camera
# opening video capture stream
self.vcap = cv2.VideoCapture(self.stream_id)
self.vcap.set(cv2.CAP_PROP_FRAME_WIDTH, frame_size[0])
self.vcap.set(cv2.CAP_PROP_FRAME_HEIGHT, frame_size[1])
if self.vcap.isOpened() is False:
print("[Exiting]: Error accessing webcam stream.")
exit(0)
fps_input_stream = int(self.vcap.get(5))
print("FPS of webcam hardware/input stream: {}".format(fps_input_stream))
# reading a single frame from vcap stream for initializing
self.grabbed, self.frame = self.vcap.read()
if self.grabbed is False:
print("[Exiting] No more frames to read")
exit(0)
# self.stopped is set to False when frames are being read from self.vcap stream
self.stopped = True
# reference to the thread for reading next available frame from input stream
self.t = Thread(target=self.update, args=())
self.t.daemon = True # daemon threads keep running in the background while the program is executing
# method for starting the thread for grabbing next available frame in input stream
def start(self):
self.stopped = False
self.t.start()
# method for reading next frame
def update(self):
while True:
if self.stopped is True:
break
self.grabbed, self.frame = self.vcap.read()
if self.grabbed is False:
print("[Exiting] No more frames to read")
self.stopped = True
break
self.vcap.release()
# method for returning latest read frame
def read(self):
return self.frame
# method called to stop reading frames
def stop(self):
self.stopped = True
# initializing and starting multi-threaded webcam capture input stream 0
webcam_stream = WebcamStream(stream_id=0) # stream_id = 0 is for primary camera
webcam_stream.start()
# initializing and starting multi-threaded webcam capture input stream 1
webcam_stream_1 = WebcamStream(stream_id=4, frame_size=(1920, 1080)) # stream_id = 2 is for primary camera
webcam_stream_1.start()
# processing frames in input stream
num_frames_processed = 0
start = time.time()
while True:
if webcam_stream.stopped is True:
break
else:
frame = webcam_stream.read()
if webcam_stream_1.stopped is True:
break
else:
frame_1 = webcam_stream_1.read()
# adding a delay for simulating time taken for processing a frame
delay = 0.03 # delay value in seconds. so, delay=1 is equivalent to 1 second
time.sleep(delay)
num_frames_processed += 1
cv2.imshow("frame", frame)
cv2.imshow("frame_1", frame_1)
key = cv2.waitKey(1)
if key == ord("q"):
break
end = time.time()
webcam_stream.stop() # stop the webcam stream
# printing time elapsed and fps
elapsed = end - start
fps = num_frames_processed / elapsed
print(
"FPS: {} , Elapsed Time: {} , Frames Processed: {}".format(
fps, elapsed, num_frames_processed
)
)
# closing all windows
cv2.destroyAllWindows()