@@ -140,14 +140,7 @@ public void Connect(EndPoint endPoint)
140
140
}
141
141
catch ( SocketException ex )
142
142
{
143
- lock ( _isConnectedLock )
144
- {
145
- if ( _isConnected )
146
- {
147
- HandleCommunicationError ( _socket , ex ) ;
148
- _isConnected = false ;
149
- }
150
- }
143
+ HandleCommunicationError ( _socket , ex ) ;
151
144
}
152
145
catch ( ObjectDisposedException )
153
146
{
@@ -179,14 +172,7 @@ public void Send(byte[] message)
179
172
}
180
173
catch ( SocketException ex )
181
174
{
182
- lock ( _isConnectedLock )
183
- {
184
- if ( _isConnected )
185
- {
186
- HandleCommunicationError ( _socket , ex ) ;
187
- _isConnected = false ;
188
- }
189
- }
175
+ HandleCommunicationError ( _socket , ex ) ;
190
176
}
191
177
catch ( ObjectDisposedException )
192
178
{
@@ -222,15 +208,7 @@ public byte[] SendReceive(byte[] message)
222
208
}
223
209
catch ( SocketException ex )
224
210
{
225
- lock ( _isConnectedLock )
226
- {
227
- if ( _isConnected )
228
- {
229
- HandleCommunicationError ( _socket , ex ) ;
230
- _isConnected = false ;
231
- }
232
- }
233
-
211
+ HandleCommunicationError ( _socket , ex ) ;
234
212
return null ;
235
213
}
236
214
catch ( ObjectDisposedException )
@@ -273,10 +251,7 @@ public void Close()
273
251
_socket . Close ( ) ;
274
252
275
253
// No longer connected
276
- lock ( _isConnectedLock )
277
- {
278
- _isConnected = false ;
279
- }
254
+ _isConnected = false ;
280
255
}
281
256
282
257
/// <summary>
@@ -322,15 +297,7 @@ private void ConnectCallback(IAsyncResult asyncResult)
322
297
}
323
298
catch ( SocketException ex )
324
299
{
325
- lock ( _isConnectedLock )
326
- {
327
- if ( _isConnected )
328
- {
329
- HandleCommunicationError ( _socket , ex ) ;
330
- _isConnected = false ;
331
- }
332
- }
333
-
300
+ HandleCommunicationError ( _socket , ex ) ;
334
301
return ;
335
302
}
336
303
catch ( ObjectDisposedException )
@@ -355,13 +322,55 @@ private void ConnectCallback(IAsyncResult asyncResult)
355
322
// Post a receive to the socket as the client will be continuously receiving messages to be pushed to the queue
356
323
_socket . BeginReceive ( messageState . Buffer , 0 , messageState . Buffer . Length , 0 , ReceiveCallback , messageState ) ;
357
324
325
+ // Spin up the keep-alive
326
+ KeepAlive ( _socket ) ;
327
+
358
328
// Process all incoming messages
359
329
var processMessageState = _messageStatePool . Pop ( ) ;
360
330
processMessageState . Handler = _socket ;
361
331
362
332
ProcessReceivedMessage ( processMessageState ) ;
363
333
}
364
334
335
+ private void KeepAlive ( Socket handler )
336
+ {
337
+ int availableTest = 0 ;
338
+
339
+ // If the socket is disposed, we're done
340
+ try
341
+ {
342
+ availableTest = handler . Available ;
343
+ }
344
+ catch ( ObjectDisposedException )
345
+ {
346
+ // Peace out!
347
+ return ;
348
+ }
349
+
350
+ // Do the keep-alive
351
+ try
352
+ {
353
+ handler . BeginSend ( _controlBytesPlaceholder , 0 , _controlBytesPlaceholder . Length , 0 , KeepAliveCallback , handler ) ;
354
+ }
355
+ catch ( SocketException ex )
356
+ {
357
+ HandleCommunicationError ( handler , ex ) ;
358
+ }
359
+ catch ( ObjectDisposedException )
360
+ {
361
+ // If disposed, handle communication error was already done and we're just catching up on other threads. Supress it.
362
+ }
363
+ }
364
+
365
+ private void KeepAliveCallback ( IAsyncResult asyncResult )
366
+ {
367
+ SendCallback ( asyncResult ) ;
368
+
369
+ Thread . Sleep ( 1000 ) ;
370
+
371
+ KeepAlive ( ( Socket ) asyncResult . AsyncState ) ;
372
+ }
373
+
365
374
private void SendCallback ( IAsyncResult asyncResult )
366
375
{
367
376
// Get the socket to complete on
@@ -374,15 +383,7 @@ private void SendCallback(IAsyncResult asyncResult)
374
383
}
375
384
catch ( SocketException ex )
376
385
{
377
- lock ( _isConnectedLock )
378
- {
379
- if ( _isConnected )
380
- {
381
- HandleCommunicationError ( _socket , ex ) ;
382
- _isConnected = false ;
383
- }
384
- }
385
-
386
+ HandleCommunicationError ( _socket , ex ) ;
386
387
return ;
387
388
}
388
389
catch ( ObjectDisposedException )
@@ -405,15 +406,7 @@ private void ReceiveCallback(IAsyncResult asyncResult)
405
406
}
406
407
catch ( SocketException ex )
407
408
{
408
- lock ( _isConnectedLock )
409
- {
410
- if ( _isConnected )
411
- {
412
- HandleCommunicationError ( _socket , ex ) ;
413
- _isConnected = false ;
414
- }
415
- }
416
-
409
+ HandleCommunicationError ( _socket , ex ) ;
417
410
return ;
418
411
}
419
412
catch ( ObjectDisposedException )
@@ -491,27 +484,34 @@ private void ProcessReceivedMessage(MessageState messageState)
491
484
492
485
// Have control bytes, get message bytes
493
486
494
- // Initialize buffer if needed
495
- if ( messageState . Buffer == null )
487
+ // SPECIAL CASE: if empty message, skip a bunch of stuff
488
+ if ( messageState . BytesToRead != 0 )
496
489
{
497
- messageState . Buffer = new byte [ messageState . BytesToRead ] ;
498
- }
490
+ // Initialize buffer if needed
491
+ if ( messageState . Buffer == null )
492
+ {
493
+ messageState . Buffer = new byte [ messageState . BytesToRead ] ;
494
+ }
499
495
500
- var bytesAvailable = bytesRead - currentOffset ;
496
+ var bytesAvailable = bytesRead - currentOffset ;
501
497
502
- var bytesToCopy = Math . Min ( messageState . BytesToRead , bytesAvailable ) ;
498
+ var bytesToCopy = Math . Min ( messageState . BytesToRead , bytesAvailable ) ;
503
499
504
- // Copy bytes to buffer
505
- Buffer . BlockCopy ( buffer , currentOffset , messageState . Buffer , messageState . Buffer . Length - messageState . BytesToRead , bytesToCopy ) ;
500
+ // Copy bytes to buffer
501
+ Buffer . BlockCopy ( buffer , currentOffset , messageState . Buffer , messageState . Buffer . Length - messageState . BytesToRead , bytesToCopy ) ;
506
502
507
- currentOffset += bytesToCopy ;
508
- messageState . BytesToRead -= bytesToCopy ;
503
+ currentOffset += bytesToCopy ;
504
+ messageState . BytesToRead -= bytesToCopy ;
505
+ }
509
506
510
507
// Check if we're done
511
508
if ( messageState . BytesToRead == 0 )
512
509
{
513
- // Done, add to complete received messages
514
- CompleteMessage ( messageState . Handler , messageState . ThreadId , messageState . Buffer ) ;
510
+ if ( messageState . Buffer != null )
511
+ {
512
+ // Done, add to complete received messages
513
+ CompleteMessage ( messageState . Handler , messageState . ThreadId , messageState . Buffer ) ;
514
+ }
515
515
516
516
// Reset message state
517
517
messageState . Buffer = null ;
@@ -566,24 +566,30 @@ private void CompleteMessage(Socket handler, int threadId, byte[] message)
566
566
/// <param name="ex">The exception that the socket raised.</param>
567
567
private void HandleCommunicationError ( Socket socket , Exception ex )
568
568
{
569
- // Close the socket
570
- try
571
- {
572
- socket . Shutdown ( SocketShutdown . Both ) ;
573
- }
574
- catch ( SocketException )
569
+ lock ( socket )
575
570
{
576
- // Socket was not able to be shutdown, likely because it was never opened
577
- }
578
- catch ( ObjectDisposedException )
579
- {
580
- // Socket was already closed/disposed, so return out to prevent raising the Error event multiple times
581
- // This is most likely to happen when an error occurs during heavily multithreaded use
582
- return ;
571
+ // Close the socket
572
+ try
573
+ {
574
+ socket . Shutdown ( SocketShutdown . Both ) ;
575
+ }
576
+ catch ( SocketException )
577
+ {
578
+ // Socket was not able to be shutdown, likely because it was never opened
579
+ }
580
+ catch ( ObjectDisposedException )
581
+ {
582
+ // Socket was already closed/disposed, so return out to prevent raising the Error event multiple times
583
+ // This is most likely to happen when an error occurs during heavily multithreaded use
584
+ return ;
585
+ }
586
+
587
+ // Close / dispose the socket
588
+ socket . Close ( ) ;
583
589
}
584
590
585
- // Close / dispose the socket
586
- socket . Close ( ) ;
591
+ // No longer connected
592
+ _isConnected = false ;
587
593
588
594
// Clear receive queue for this client
589
595
_receiveBufferQueue . Clear ( ) ;
0 commit comments