2
2
const debug = require ( 'debug' ) ( 'simple-peer' )
3
3
const getBrowserRTC = require ( 'get-browser-rtc' )
4
4
const randombytes = require ( 'randombytes' )
5
- const stream = require ( 'readable-stream ' )
5
+ const { Duplex } = require ( 'streamx ' )
6
6
const queueMicrotask = require ( 'queue-microtask' ) // TODO: remove when Node 10 is not supported
7
7
const errCode = require ( 'err-code' )
8
8
const { Buffer } = require ( 'buffer' )
@@ -25,14 +25,16 @@ function warn (message) {
25
25
* Duplex stream.
26
26
* @param {Object } opts
27
27
*/
28
- class Peer extends stream . Duplex {
28
+ class Peer extends Duplex {
29
29
constructor ( opts ) {
30
30
opts = Object . assign ( {
31
31
allowHalfOpen : false
32
32
} , opts )
33
33
34
34
super ( opts )
35
35
36
+ this . __objectMode = ! ! opts . objectMode // streamx is objectMode by default, so implement readable's fuctionality
37
+
36
38
this . _id = randombytes ( 4 ) . toString ( 'hex' ) . slice ( 0 , 7 )
37
39
this . _debug ( 'new peer %o' , opts )
38
40
@@ -52,8 +54,7 @@ class Peer extends stream.Duplex {
52
54
this . allowHalfTrickle = opts . allowHalfTrickle !== undefined ? opts . allowHalfTrickle : false
53
55
this . iceCompleteTimeout = opts . iceCompleteTimeout || ICECOMPLETE_TIMEOUT
54
56
55
- this . destroyed = false
56
- this . destroying = false
57
+ this . _destroying = false
57
58
this . _connected = false
58
59
59
60
this . remoteAddress = undefined
@@ -100,7 +101,7 @@ class Peer extends stream.Duplex {
100
101
try {
101
102
this . _pc = new ( this . _wrtc . RTCPeerConnection ) ( this . config )
102
103
} catch ( err ) {
103
- this . destroy ( errCode ( err , 'ERR_PC_CONSTRUCTOR' ) )
104
+ this . __destroy ( errCode ( err , 'ERR_PC_CONSTRUCTOR' ) )
104
105
return
105
106
}
106
107
@@ -127,7 +128,7 @@ class Peer extends stream.Duplex {
127
128
// HACK: Fix for odd Firefox behavior, see: https://github.com/feross/simple-peer/pull/783
128
129
if ( typeof this . _pc . peerIdentity === 'object' ) {
129
130
this . _pc . peerIdentity . catch ( err => {
130
- this . destroy ( errCode ( err , 'ERR_PC_PEER_IDENTITY' ) )
131
+ this . __destroy ( errCode ( err , 'ERR_PC_PEER_IDENTITY' ) )
131
132
} )
132
133
}
133
134
@@ -180,7 +181,7 @@ class Peer extends stream.Duplex {
180
181
}
181
182
182
183
signal ( data ) {
183
- if ( this . destroying ) return
184
+ if ( this . _destroying ) return
184
185
if ( this . destroyed ) throw errCode ( new Error ( 'cannot signal after peer is destroyed' ) , 'ERR_DESTROYED' )
185
186
if ( typeof data === 'string' ) {
186
187
try {
@@ -219,11 +220,11 @@ class Peer extends stream.Duplex {
219
220
if ( this . _pc . remoteDescription . type === 'offer' ) this . _createAnswer ( )
220
221
} )
221
222
. catch ( err => {
222
- this . destroy ( errCode ( err , 'ERR_SET_REMOTE_DESCRIPTION' ) )
223
+ this . __destroy ( errCode ( err , 'ERR_SET_REMOTE_DESCRIPTION' ) )
223
224
} )
224
225
}
225
226
if ( ! data . sdp && ! data . candidate && ! data . renegotiate && ! data . transceiverRequest ) {
226
- this . destroy ( errCode ( new Error ( 'signal() called with invalid signal data' ) , 'ERR_SIGNALING' ) )
227
+ this . __destroy ( errCode ( new Error ( 'signal() called with invalid signal data' ) , 'ERR_SIGNALING' ) )
227
228
}
228
229
}
229
230
@@ -234,7 +235,7 @@ class Peer extends stream.Duplex {
234
235
if ( ! iceCandidateObj . address || iceCandidateObj . address . endsWith ( '.local' ) ) {
235
236
warn ( 'Ignoring unsupported ICE candidate.' )
236
237
} else {
237
- this . destroy ( errCode ( err , 'ERR_ADD_ICE_CANDIDATE' ) )
238
+ this . __destroy ( errCode ( err , 'ERR_ADD_ICE_CANDIDATE' ) )
238
239
}
239
240
} )
240
241
}
@@ -244,7 +245,7 @@ class Peer extends stream.Duplex {
244
245
* @param {ArrayBufferView|ArrayBuffer|Buffer|string|Blob } chunk
245
246
*/
246
247
send ( chunk ) {
247
- if ( this . destroying ) return
248
+ if ( this . _destroying ) return
248
249
if ( this . destroyed ) throw errCode ( new Error ( 'cannot send after peer is destroyed' ) , 'ERR_DESTROYED' )
249
250
this . _channel . send ( chunk )
250
251
}
@@ -255,7 +256,7 @@ class Peer extends stream.Duplex {
255
256
* @param {Object } init
256
257
*/
257
258
addTransceiver ( kind , init ) {
258
- if ( this . destroying ) return
259
+ if ( this . _destroying ) return
259
260
if ( this . destroyed ) throw errCode ( new Error ( 'cannot addTransceiver after peer is destroyed' ) , 'ERR_DESTROYED' )
260
261
this . _debug ( 'addTransceiver()' )
261
262
@@ -264,7 +265,7 @@ class Peer extends stream.Duplex {
264
265
this . _pc . addTransceiver ( kind , init )
265
266
this . _needsNegotiation ( )
266
267
} catch ( err ) {
267
- this . destroy ( errCode ( err , 'ERR_ADD_TRANSCEIVER' ) )
268
+ this . __destroy ( errCode ( err , 'ERR_ADD_TRANSCEIVER' ) )
268
269
}
269
270
} else {
270
271
this . emit ( 'signal' , { // request initiator to renegotiate
@@ -279,7 +280,7 @@ class Peer extends stream.Duplex {
279
280
* @param {MediaStream } stream
280
281
*/
281
282
addStream ( stream ) {
282
- if ( this . destroying ) return
283
+ if ( this . _destroying ) return
283
284
if ( this . destroyed ) throw errCode ( new Error ( 'cannot addStream after peer is destroyed' ) , 'ERR_DESTROYED' )
284
285
this . _debug ( 'addStream()' )
285
286
@@ -294,7 +295,7 @@ class Peer extends stream.Duplex {
294
295
* @param {MediaStream } stream
295
296
*/
296
297
addTrack ( track , stream ) {
297
- if ( this . destroying ) return
298
+ if ( this . _destroying ) return
298
299
if ( this . destroyed ) throw errCode ( new Error ( 'cannot addTrack after peer is destroyed' ) , 'ERR_DESTROYED' )
299
300
this . _debug ( 'addTrack()' )
300
301
@@ -319,7 +320,7 @@ class Peer extends stream.Duplex {
319
320
* @param {MediaStream } stream
320
321
*/
321
322
replaceTrack ( oldTrack , newTrack , stream ) {
322
- if ( this . destroying ) return
323
+ if ( this . _destroying ) return
323
324
if ( this . destroyed ) throw errCode ( new Error ( 'cannot replaceTrack after peer is destroyed' ) , 'ERR_DESTROYED' )
324
325
this . _debug ( 'replaceTrack()' )
325
326
@@ -333,7 +334,7 @@ class Peer extends stream.Duplex {
333
334
if ( sender . replaceTrack != null ) {
334
335
sender . replaceTrack ( newTrack )
335
336
} else {
336
- this . destroy ( errCode ( new Error ( 'replaceTrack is not supported in this browser' ) , 'ERR_UNSUPPORTED_REPLACETRACK' ) )
337
+ this . __destroy ( errCode ( new Error ( 'replaceTrack is not supported in this browser' ) , 'ERR_UNSUPPORTED_REPLACETRACK' ) )
337
338
}
338
339
}
339
340
@@ -343,7 +344,7 @@ class Peer extends stream.Duplex {
343
344
* @param {MediaStream } stream
344
345
*/
345
346
removeTrack ( track , stream ) {
346
- if ( this . destroying ) return
347
+ if ( this . _destroying ) return
347
348
if ( this . destroyed ) throw errCode ( new Error ( 'cannot removeTrack after peer is destroyed' ) , 'ERR_DESTROYED' )
348
349
this . _debug ( 'removeSender()' )
349
350
@@ -359,7 +360,7 @@ class Peer extends stream.Duplex {
359
360
if ( err . name === 'NS_ERROR_UNEXPECTED' ) {
360
361
this . _sendersAwaitingStable . push ( sender ) // HACK: Firefox must wait until (signalingState === stable) https://bugzilla.mozilla.org/show_bug.cgi?id=1133874
361
362
} else {
362
- this . destroy ( errCode ( err , 'ERR_REMOVE_TRACK' ) )
363
+ this . __destroy ( errCode ( err , 'ERR_REMOVE_TRACK' ) )
363
364
}
364
365
}
365
366
this . _needsNegotiation ( )
@@ -370,7 +371,7 @@ class Peer extends stream.Duplex {
370
371
* @param {MediaStream } stream
371
372
*/
372
373
removeStream ( stream ) {
373
- if ( this . destroying ) return
374
+ if ( this . _destroying ) return
374
375
if ( this . destroyed ) throw errCode ( new Error ( 'cannot removeStream after peer is destroyed' ) , 'ERR_DESTROYED' )
375
376
this . _debug ( 'removeSenders()' )
376
377
@@ -396,7 +397,7 @@ class Peer extends stream.Duplex {
396
397
}
397
398
398
399
negotiate ( ) {
399
- if ( this . destroying ) return
400
+ if ( this . _destroying ) return
400
401
if ( this . destroyed ) throw errCode ( new Error ( 'cannot negotiate after peer is destroyed' ) , 'ERR_DESTROYED' )
401
402
402
403
if ( this . initiator ) {
@@ -424,30 +425,23 @@ class Peer extends stream.Duplex {
424
425
this . _isNegotiating = true
425
426
}
426
427
427
- // TODO: Delete this method once readable-stream is updated to contain a default
428
- // implementation of destroy() that automatically calls _destroy()
429
- // See: https://github.com/nodejs/readable-stream/issues/283
430
- destroy ( err ) {
431
- this . _destroy ( err , ( ) => { } )
428
+ _final ( cb ) {
429
+ if ( ! this . _readableState . ended ) this . push ( null )
430
+ cb ( null )
432
431
}
433
432
434
- _destroy ( err , cb ) {
435
- if ( this . destroyed || this . destroying ) return
436
- this . destroying = true
437
-
438
- this . _debug ( 'destroying (error: %s)' , err && ( err . message || err ) )
439
-
440
- queueMicrotask ( ( ) => { // allow events concurrent with the call to _destroy() to fire (see #692)
441
- this . destroyed = true
442
- this . destroying = false
443
-
444
- this . _debug ( 'destroy (error: %s)' , err && ( err . message || err ) )
433
+ __destroy ( err ) {
434
+ this . end ( )
435
+ this . _destroy ( ( ) => { } , err )
436
+ }
445
437
446
- this . readable = this . writable = false
438
+ _destroy ( cb , err ) {
439
+ if ( this . destroyed || this . _destroying ) return
440
+ this . _destroying = true
447
441
448
- if ( ! this . _readableState . ended ) this . push ( null )
449
- if ( ! this . _writableState . finished ) this . end ( )
442
+ this . _debug ( 'destroying (error: %s)' , err && ( err . message || err ) )
450
443
444
+ setTimeout ( ( ) => { // allow events concurrent with the call to _destroy() to fire (see #692)
451
445
this . _connected = false
452
446
this . _pcReady = false
453
447
this . _channelReady = false
@@ -492,19 +486,17 @@ class Peer extends stream.Duplex {
492
486
}
493
487
this . _pc = null
494
488
this . _channel = null
495
-
496
489
if ( err ) this . emit ( 'error' , err )
497
- this . emit ( 'close' )
498
490
cb ( )
499
- } )
491
+ } , 0 )
500
492
}
501
493
502
494
_setupData ( event ) {
503
495
if ( ! event . channel ) {
504
496
// In some situations `pc.createDataChannel()` returns `undefined` (in wrtc),
505
497
// which is invalid behavior. Handle it gracefully.
506
498
// See: https://github.com/feross/simple-peer/issues/163
507
- return this . destroy ( errCode ( new Error ( 'Data channel event is missing `channel` property' ) , 'ERR_DATA_CHANNEL' ) )
499
+ return this . __destroy ( errCode ( new Error ( 'Data channel event is missing `channel` property' ) , 'ERR_DATA_CHANNEL' ) )
508
500
}
509
501
510
502
this . _channel = event . channel
@@ -532,7 +524,7 @@ class Peer extends stream.Duplex {
532
524
const err = event . error instanceof Error
533
525
? event . error
534
526
: new Error ( `Datachannel error: ${ event . message } ${ event . filename } :${ event . lineno } :${ event . colno } ` )
535
- this . destroy ( errCode ( err , 'ERR_DATA_CHANNEL' ) )
527
+ this . __destroy ( errCode ( err , 'ERR_DATA_CHANNEL' ) )
536
528
}
537
529
538
530
// HACK: Chrome will sometimes get stuck in readyState "closing", let's check for this condition
@@ -548,16 +540,14 @@ class Peer extends stream.Duplex {
548
540
} , CHANNEL_CLOSING_TIMEOUT )
549
541
}
550
542
551
- _read ( ) { }
552
-
553
- _write ( chunk , encoding , cb ) {
543
+ _write ( chunk , cb ) {
554
544
if ( this . destroyed ) return cb ( errCode ( new Error ( 'cannot write after peer is destroyed' ) , 'ERR_DATA_CHANNEL' ) )
555
545
556
546
if ( this . _connected ) {
557
547
try {
558
548
this . send ( chunk )
559
549
} catch ( err ) {
560
- return this . destroy ( errCode ( err , 'ERR_DATA_CHANNEL' ) )
550
+ return this . __destroy ( errCode ( err , 'ERR_DATA_CHANNEL' ) )
561
551
}
562
552
if ( this . _channel . bufferedAmount > MAX_BUFFERED_AMOUNT ) {
563
553
this . _debug ( 'start backpressure: bufferedAmount %d' , this . _channel . bufferedAmount )
@@ -580,7 +570,7 @@ class Peer extends stream.Duplex {
580
570
// Wait a bit before destroying so the socket flushes.
581
571
// TODO: is there a more reliable way to accomplish this?
582
572
const destroySoon = ( ) => {
583
- setTimeout ( ( ) => this . destroy ( ) , 1000 )
573
+ setTimeout ( ( ) => this . __destroy ( ) , 1000 )
584
574
}
585
575
586
576
if ( this . _connected ) {
@@ -631,15 +621,15 @@ class Peer extends stream.Duplex {
631
621
}
632
622
633
623
const onError = err => {
634
- this . destroy ( errCode ( err , 'ERR_SET_LOCAL_DESCRIPTION' ) )
624
+ this . __destroy ( errCode ( err , 'ERR_SET_LOCAL_DESCRIPTION' ) )
635
625
}
636
626
637
627
this . _pc . setLocalDescription ( offer )
638
628
. then ( onSuccess )
639
629
. catch ( onError )
640
630
} )
641
631
. catch ( err => {
642
- this . destroy ( errCode ( err , 'ERR_CREATE_OFFER' ) )
632
+ this . __destroy ( errCode ( err , 'ERR_CREATE_OFFER' ) )
643
633
} )
644
634
}
645
635
@@ -681,22 +671,22 @@ class Peer extends stream.Duplex {
681
671
}
682
672
683
673
const onError = err => {
684
- this . destroy ( errCode ( err , 'ERR_SET_LOCAL_DESCRIPTION' ) )
674
+ this . __destroy ( errCode ( err , 'ERR_SET_LOCAL_DESCRIPTION' ) )
685
675
}
686
676
687
677
this . _pc . setLocalDescription ( answer )
688
678
. then ( onSuccess )
689
679
. catch ( onError )
690
680
} )
691
681
. catch ( err => {
692
- this . destroy ( errCode ( err , 'ERR_CREATE_ANSWER' ) )
682
+ this . __destroy ( errCode ( err , 'ERR_CREATE_ANSWER' ) )
693
683
} )
694
684
}
695
685
696
686
_onConnectionStateChange ( ) {
697
- if ( this . destroyed ) return
687
+ if ( this . destroyed || this . _destroying ) return
698
688
if ( this . _pc . connectionState === 'failed' ) {
699
- this . destroy ( errCode ( new Error ( 'Connection failed.' ) , 'ERR_CONNECTION_FAILURE' ) )
689
+ this . __destroy ( errCode ( new Error ( 'Connection failed.' ) , 'ERR_CONNECTION_FAILURE' ) )
700
690
}
701
691
}
702
692
@@ -717,10 +707,10 @@ class Peer extends stream.Duplex {
717
707
this . _maybeReady ( )
718
708
}
719
709
if ( iceConnectionState === 'failed' ) {
720
- this . destroy ( errCode ( new Error ( 'Ice connection failed.' ) , 'ERR_ICE_CONNECTION_FAILURE' ) )
710
+ this . __destroy ( errCode ( new Error ( 'Ice connection failed.' ) , 'ERR_ICE_CONNECTION_FAILURE' ) )
721
711
}
722
712
if ( iceConnectionState === 'closed' ) {
723
- this . destroy ( errCode ( new Error ( 'Ice connection closed.' ) , 'ERR_ICE_CONNECTION_CLOSED' ) )
713
+ this . __destroy ( errCode ( new Error ( 'Ice connection closed.' ) , 'ERR_ICE_CONNECTION_CLOSED' ) )
724
714
}
725
715
}
726
716
@@ -781,10 +771,10 @@ class Peer extends stream.Duplex {
781
771
782
772
// HACK: We can't rely on order here, for details see https://github.com/js-platform/node-webrtc/issues/339
783
773
const findCandidatePair = ( ) => {
784
- if ( this . destroyed ) return
774
+ if ( this . destroyed || this . _destroying ) return
785
775
786
776
this . getStats ( ( err , items ) => {
787
- if ( this . destroyed ) return
777
+ if ( this . destroyed || this . _destroying ) return
788
778
789
779
// Treat getStats error as non-fatal. It's not essential.
790
780
if ( err ) items = [ ]
@@ -889,7 +879,7 @@ class Peer extends stream.Duplex {
889
879
try {
890
880
this . send ( this . _chunk )
891
881
} catch ( err ) {
892
- return this . destroy ( errCode ( err , 'ERR_DATA_CHANNEL' ) )
882
+ return this . __destroy ( errCode ( err , 'ERR_DATA_CHANNEL' ) )
893
883
}
894
884
this . _chunk = null
895
885
this . _debug ( 'sent chunk from "write before connect"' )
@@ -972,7 +962,7 @@ class Peer extends stream.Duplex {
972
962
_onChannelMessage ( event ) {
973
963
if ( this . destroyed ) return
974
964
let data = event . data
975
- if ( data instanceof ArrayBuffer ) data = Buffer . from ( data )
965
+ if ( data instanceof ArrayBuffer || this . __objectMode === false ) data = Buffer . from ( data )
976
966
this . push ( data )
977
967
}
978
968
@@ -994,7 +984,7 @@ class Peer extends stream.Duplex {
994
984
_onChannelClose ( ) {
995
985
if ( this . destroyed ) return
996
986
this . _debug ( 'on channel close' )
997
- this . destroy ( )
987
+ this . __destroy ( )
998
988
}
999
989
1000
990
_onTrack ( event ) {
0 commit comments