Skip to content

Commit 5b1e63c

Browse files
authored
New functionality to receive data from socket (#4)
* Added Try/Except block around header delimiter Headers should end with a carriage return/line feed at the end of each line and then a blank line (with a CRLF) to end the header. In some cases, for example a 406 reponse, the header will not have the blank line and instead the server will close the connection. In this case the code raises a ValueError exception which is not very helpfully. Inserting this try/except block adds clarity to what failed. * Removed .pyc file, added .gitignore * GET_PARAM now callsback Best I can tell, GET_PARAMETER is skipped being sent to callback because it is part of the heartbeat, and will get called every so often (current default 10s). I suppose I understand the intent, but not sure its the right approach, with the callback parameter, I want to see all traffic between server and client. Also if CSeq isn't in response, teardown() and raise error. * Break recv_msg into two functions _recv_msg will continously poll for incoming data, _parse_msg will try to pull out any data that requires action. * Turn off blocking on the socket * _parse_msg now waits for full message before sending it out to be processed.
1 parent 3fdc083 commit 5b1e63c

File tree

2 files changed

+52
-26
lines changed

2 files changed

+52
-26
lines changed

README.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ Getting Started
1212
myrtsp.do_describe()
1313
while myrtsp.state != 'describe':
1414
time.sleep(0.1)
15-
myrtsp.do_setup(rtsp.track_id_str)
15+
myrtsp.TRANSPORT_TYPE_LIST = ['rtp_over_udp','rtp_over_tcp']
16+
myrtsp.do_setup(track_id)
1617
while myrtsp.state != 'setup':
1718
time.sleep(0.1)
1819
#Open socket to capture frames here

rtsp.py

+50-25
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ def __init__(self, url, dest_ip='', callback=None, socks=None):
6161
self._parsed_url.path
6262
self._session_id = ''
6363
self._sock = None
64-
self._socks = socks
64+
self._socks = socks
6565
self.cur_range = 'npt=end-'
6666
self.cur_scale = 1
6767
self.location = ''
@@ -108,7 +108,8 @@ def close(self):
108108
def run(self):
109109
try:
110110
while self.running:
111-
self.response = msg = self.recv_msg()
111+
self._recv_msg()
112+
self.response = msg = self._parse_msg()
112113
if msg.startswith('RTSP'):
113114
self._process_response(msg)
114115
elif msg.startswith('ANNOUNCE'):
@@ -146,6 +147,9 @@ def _connect_server(self):
146147
try:
147148
self._sock = self._socks or socket.socket(socket.AF_INET, socket.SOCK_STREAM)
148149
self._sock.connect((self._parsed_url.hostname, self._server_port))
150+
# Turning off blocking here, as the socket is currently monitored
151+
# in its own thread.
152+
self._sock.setblocking(0)
149153
except socket.error as e:
150154
raise RTSPNetError('socket error: %s [%s:%d]' %
151155
(e, self._parsed_url.hostname, self._server_port))
@@ -165,29 +169,36 @@ def _update_dest_ip(self):
165169
self._dest_ip = self._sock.getsockname()[0]
166170
self._callback('DEST_IP: %s\n' % self._dest_ip)
167171

168-
def recv_msg(self):
169-
'''A complete response message or
170-
an ANNOUNCE notification message is received'''
172+
def _recv_msg(self):
173+
'''Continously check for new data and put it in
174+
cache.'''
171175
try:
172-
while not (not self.running or HEADER_END_STR in self.cache()):
173-
more = self._sock.recv(2048)
174-
if not more:
175-
break
176-
self.cache(more.decode())
176+
more = self._sock.recv(2048)
177+
self.cache(more.decode())
177178
except socket.error as e:
178179
RTSPNetError('Receive data error: %s' % e)
179180

181+
def _parse_msg(self):
182+
'''Read through the cache and pull out a complete
183+
response or ANNOUNCE notification message'''
180184
msg = ''
181-
if self.cache():
182-
tmp = self.cache()
185+
tmp = self.cache()
186+
if tmp:
183187
try:
184-
(msg, tmp) = tmp.split(HEADER_END_STR, 1)
188+
# Check here for a header, if the cache isn't empty and there
189+
# isn't a HEADER_END_STR, then there isn't a proper header in
190+
# the response. For now this will generate an error and fail.
191+
(header, body) = tmp.split(HEADER_END_STR, 1)
185192
except ValueError as e:
186-
self._callback(self._get_time_str() + '\n' + tmp)
187-
raise RTSPError('Response did not contain double CRLF')
188-
content_length = self._get_content_length(msg)
189-
msg += HEADER_END_STR + tmp[:content_length]
190-
self.set_cache(tmp[content_length:])
193+
self._callback(self._get_time_str() + '\n' + tmp)
194+
raise RTSPError('Response did not contain double CRLF')
195+
content_length = self._get_content_length(header)
196+
# If the body of the message is less than the given content_length
197+
# then the full message hasn't been received so bail.
198+
if (len(body) < content_length):
199+
return ''
200+
msg = header + HEADER_END_STR + body[:content_length]
201+
self.set_cache(body[content_length:])
191202
return msg
192203

193204
def _add_auth(self, msg):
@@ -254,9 +265,21 @@ def _get_time_str(self):
254265
def _process_response(self, msg):
255266
'''Process the response message'''
256267
status, headers, body = self._parse_response(msg)
257-
rsp_cseq = int(headers['cseq'])
258-
if self._cseq_map[rsp_cseq] != 'GET_PARAMETER':
268+
try:
269+
rsp_cseq = int(headers['cseq'])
270+
except KeyError as e:
259271
self._callback(self._get_time_str() + '\n' + msg)
272+
self.do_teardown()
273+
raise RTSPError('Unexpected response from server')
274+
275+
# Best I can tell, GET_PARAMETER is skipped being sent to callback
276+
# because it is part of the heartbeat, and will get called every so
277+
# often (current default 10s). I suppose I understand the intent, but
278+
# not sure its the right approach, with the callback parameter, I
279+
# want to see all traffic between server and client.
280+
#if self._cseq_map[rsp_cseq] != 'GET_PARAMETER':
281+
# self._callback(self._get_time_str() + '\n' + msg)
282+
self._callback(self._get_time_str() + '\n' + msg)
260283
if status == 401 and not self._auth:
261284
self._add_auth(headers['www-authenticate'])
262285
self.do_replay_request()
@@ -325,8 +348,9 @@ def _sendmsg(self, method, url, headers):
325348
for (k, v) in list(headers.items()):
326349
msg += END_OF_LINE + '%s: %s'%(k, str(v))
327350
msg += HEADER_END_STR # End headers
328-
if method != 'GET_PARAMETER' or 'x-RetransSeq' in headers:
329-
self._callback(self._get_time_str() + END_OF_LINE + msg)
351+
#if method != 'GET_PARAMETER' or 'x-RetransSeq' in headers:
352+
# self._callback(self._get_time_str() + END_OF_LINE + msg)
353+
self._callback(self._get_time_str() + END_OF_LINE + msg)
330354
try:
331355
self._sock.send(msg.encode())
332356
except socket.error as e:
@@ -363,13 +387,14 @@ def do_describe(self, headers={}):
363387
self._sendmsg('DESCRIBE', self._orig_url, headers)
364388

365389
def do_setup(self, track_id_str=None, headers={}):
390+
#TODO: Currently issues SETUP for all tracks but doesn't keep track
391+
# of all sessions or teardown all of them.
366392
if self._auth:
367393
headers['Authorization'] = self._auth
368394
headers['Transport'] = self._get_transport_type()
369-
#TODO: Currently issues SETUP for all tracks but doesn't keep track
370-
# of all sessions or teardown all of them.
395+
# If a string is supplied, it must contain the proceeding '/'
371396
if isinstance(track_id_str,str):
372-
self._sendmsg('SETUP', self._orig_url+'/'+track_id_str, headers)
397+
self._sendmsg('SETUP', self._orig_url + track_id_str, headers)
373398
elif isinstance(track_id_str, int):
374399
self._sendmsg('SETUP', self._orig_url +
375400
'/' +

0 commit comments

Comments
 (0)