-
Notifications
You must be signed in to change notification settings - Fork 21
/
motion_detection_test.py
166 lines (124 loc) · 5.43 KB
/
motion_detection_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
from system.log_support import init_logger
import time
from system.motion_detection import MotionDetectorV3Traced
import config
import datetime as dts
import numpy as np
from system.camera_support import CameraConnectionSupport
import cv2 as cv
import imutils
class MotionDetectionTester(CameraConnectionSupport):
    """Interactive harness that runs the motion detector on a camera stream.

    Frames are read from the capture object provided by
    ``CameraConnectionSupport``, passed to a ``MotionDetectorV3Traced``
    instance and displayed in an OpenCV window. Frames on which motion is
    considered active are labelled with a countdown.
    """

    def __init__(self, camConnectionString, logger):
        """Set up the camera connection support and the motion detector.

        :param camConnectionString: connection string forwarded unchanged to
            ``CameraConnectionSupport.__init__``
        :param logger: logger forwarded unchanged to
            ``CameraConnectionSupport.__init__``
        """
        CameraConnectionSupport.__init__(self, camConnectionString, logger)

        # initializing motion detector
        self.detector = MotionDetectorV3Traced()
        self.detector.resizeBeforeDetect = False
        self.detector.multiFrameDetection = False

        # True while a motion episode is in progress; used to log the start
        # of an episode only once
        self.inMotionDetectedState = False

        # moment the camera connection was established (None until then)
        self.__camConnectionDts = None
        # latched to True once the initial wait interval has elapsed
        self.__canDetectMotion = False

    def canDetectMotion(self):
        """Tell whether the initial wait after connecting to the camera is over.

        Detection is allowed once
        ``config.INITIAL_WAIT_INTERVAL_BEFORE_MOTION_DETECTION_SECS`` seconds
        have passed since the camera connection timestamp. The positive
        answer is cached, so later calls return immediately.

        :return: True when motion detection may run, False otherwise
        """
        if self.__canDetectMotion:
            return True

        if self.__camConnectionDts is None:
            # not connected to the camera yet
            return False

        minDts = self.__camConnectionDts + dts.timedelta(
            seconds=config.INITIAL_WAIT_INTERVAL_BEFORE_MOTION_DETECTION_SECS
        )
        if minDts > self.utcNow():
            return False

        self.__canDetectMotion = True
        return True

    def loop(self):  # noqa
        """
        Main loop for motion detection tester.

        Reads frames until Esc or "q" is pressed in the preview window. The
        capture object and the OpenCV windows are released even when the loop
        is left through an exception.

        :return: None
        """
        self.logger.info("main loop started")

        try:
            while True:
                # initializing connection to camera
                if self.cap is None:
                    if not self._initCamera():
                        continue
                    self.__camConnectionDts = self.utcNow()

                # reading frames from camera
                ret, current_frame = self.cap.read()

                # if can't read current frame - going to the next loop
                # (the connection broke, or the stream came to an end)
                if (not ret) or (current_frame is None):
                    continue

                # NOTE(review): imutils.resize appears to honor only `width`
                # when both width and height are given - confirm
                current_frame = imutils.resize(current_frame, width=500, height=500)
                instant = time.time()  # get timestamp of the frame

                ############################################################
                # calculating width and height of current video stream     #
                ############################################################
                # ndarray images are indexed (rows, columns, ...), so axis 0
                # is the height and axis 1 is the width
                frameHeight, frameWidth = current_frame.shape[:2]

                if None in [self.frameWidth, self.frameHeight]:
                    self.frameWidth = frameWidth
                    self.frameHeight = frameHeight
                    self.nb_pixels = self.frameWidth * self.frameHeight
                    self.logger.info("self.width = {}".format(self.frameWidth))
                    self.logger.info("self.height = {}".format(self.frameHeight))
                    resolutionChanged = True
                else:
                    resolutionChanged = (
                        (self.frameWidth != frameWidth)
                        or (self.frameHeight != frameHeight)
                    )

                if resolutionChanged:
                    self.onFrameSizeUpdate(frameWidth, frameHeight)

                ########################
                # detecting motion     #
                ########################
                motionDetected = False

                # detection motion if can do it now
                if self.canDetectMotion():
                    if self.detector.motionDetected(current_frame):
                        self.trigger_time = instant  # Update the trigger_time
                        if not self.inMotionDetectedState:
                            # log only at the start of a motion episode
                            self.logger.info("something moved!")
                        motionDetected = True
                        self.inMotionDetectedState = True

                now = self.utcNow()

                # prolonging motion for minimal motion duration
                if (not motionDetected) and (self.detector.motionDetectionDts is not None):
                    minDuration = self.detector.motionDetectionDts + dts.timedelta(
                        seconds=config.MINIMAL_MOTION_DURATION
                    )
                    if minDuration > now:
                        motionDetected = True

                # clearing motion detection flag when needed
                if not motionDetected:
                    self.inMotionDetectedState = False

                if motionDetected:
                    # calculating left seconds for motion (used in the label);
                    # total_seconds() covers the whole delta, whereas .seconds
                    # ignores days and wraps around for negative deltas
                    elapsed = now - self.detector.motionDetectionDts
                    dx = config.MINIMAL_MOTION_DURATION - int(elapsed.total_seconds())

                    # adding label for frame with detected motion
                    text = "MOTION DETECTED [{}]".format(dx)
                    cv.putText(
                        current_frame,
                        text,
                        (10, 20),
                        cv.FONT_HERSHEY_SIMPLEX,
                        0.5,
                        (0, 0, 255),  # b g r
                        2
                    )

                # show current frame
                cv.imshow("frame", current_frame)

                # reading key and breaking loop when Esc or "q" key pressed
                key = cv.waitKey(1)
                if (key & 0xFF == ord("q")) or (key == 27):
                    break
        finally:
            # always give the camera and the preview window back
            if self.cap is not None:
                self.cap.release()
            cv.destroyAllWindows()

        self.logger.info("main loop finished")
def main():
    """Script entry point: create the logger, run the tester loop, log start and finish."""
    log = init_logger()
    log.info("app started")

    # the tester blocks inside loop() until the user quits the preview window
    MotionDetectionTester(config.cam, log).loop()

    log.info("app finished")


if __name__ == "__main__":
    main()