@@ -24,13 +24,6 @@ import type { ServerSessionId } from './sessions';
24
24
import { CSOTTimeoutContext , type TimeoutContext } from './timeout' ;
25
25
import { filterOptions , getTopology , type MongoDBNamespace , squashError } from './utils' ;
26
26
27
- /** @internal */
28
- const kCursorStream = Symbol ( 'cursorStream' ) ;
29
- /** @internal */
30
- const kClosed = Symbol ( 'closed' ) ;
31
- /** @internal */
32
- const kMode = Symbol ( 'mode' ) ;
33
-
34
27
const CHANGE_STREAM_OPTIONS = [
35
28
'resumeAfter' ,
36
29
'startAfter' ,
@@ -584,14 +577,14 @@ export class ChangeStream<
584
577
namespace : MongoDBNamespace ;
585
578
type : symbol ;
586
579
/** @internal */
587
- cursor : ChangeStreamCursor < TSchema , TChange > ;
580
+ private cursor : ChangeStreamCursor < TSchema , TChange > ;
588
581
streamOptions ?: CursorStreamOptions ;
589
582
/** @internal */
590
- [ kCursorStream ] ?: Readable & AsyncIterable < TChange > ;
583
+ private cursorStream ?: Readable & AsyncIterable < TChange > ;
591
584
/** @internal */
592
- [ kClosed ] : boolean ;
585
+ private isClosed : boolean ;
593
586
/** @internal */
594
- [ kMode ] : false | 'iterator' | 'emitter' ;
587
+ private mode : false | 'iterator' | 'emitter' ;
595
588
596
589
/** @event */
597
590
static readonly RESPONSE = RESPONSE ;
@@ -668,8 +661,8 @@ export class ChangeStream<
668
661
// Create contained Change Stream cursor
669
662
this . cursor = this . _createChangeStreamCursor ( options ) ;
670
663
671
- this [ kClosed ] = false ;
672
- this [ kMode ] = false ;
664
+ this . isClosed = false ;
665
+ this . mode = false ;
673
666
674
667
// Listen for any `change` listeners being added to ChangeStream
675
668
this . on ( 'newListener' , eventName => {
@@ -680,7 +673,7 @@ export class ChangeStream<
680
673
681
674
this . on ( 'removeListener' , eventName => {
682
675
if ( eventName === 'change' && this . listenerCount ( 'change' ) === 0 && this . cursor ) {
683
- this [ kCursorStream ] ?. removeAllListeners ( 'data' ) ;
676
+ this . cursorStream ?. removeAllListeners ( 'data' ) ;
684
677
}
685
678
} ) ;
686
679
@@ -692,11 +685,6 @@ export class ChangeStream<
692
685
}
693
686
}
694
687
695
- /** @internal */
696
- get cursorStream ( ) : ( Readable & AsyncIterable < TChange > ) | undefined {
697
- return this [ kCursorStream ] ;
698
- }
699
-
700
688
/** The cached resume token that is used to resume after the most recently returned change. */
701
689
get resumeToken ( ) : ResumeToken {
702
690
return this . cursor ?. resumeToken ;
@@ -826,8 +814,8 @@ export class ChangeStream<
826
814
}
827
815
828
816
/** Is the cursor closed */
829
- get closed ( ) : boolean {
830
- return this [ kClosed ] || this . cursor . closed ;
817
+ public get closed ( ) : boolean {
818
+ return this . isClosed || this . cursor . closed ;
831
819
}
832
820
833
821
/**
@@ -836,7 +824,7 @@ export class ChangeStream<
836
824
async close ( ) : Promise < void > {
837
825
this . timeoutContext ?. clear ( ) ;
838
826
this . timeoutContext = undefined ;
839
- this [ kClosed ] = true ;
827
+ this . isClosed = true ;
840
828
841
829
const cursor = this . cursor ;
842
830
try {
@@ -865,24 +853,24 @@ export class ChangeStream<
865
853
866
854
/** @internal */
867
855
private _setIsEmitter ( ) : void {
868
- if ( this [ kMode ] === 'iterator' ) {
856
+ if ( this . mode === 'iterator' ) {
869
857
// TODO(NODE-3485): Replace with MongoChangeStreamModeError
870
858
throw new MongoAPIError (
871
859
'ChangeStream cannot be used as an EventEmitter after being used as an iterator'
872
860
) ;
873
861
}
874
- this [ kMode ] = 'emitter' ;
862
+ this . mode = 'emitter' ;
875
863
}
876
864
877
865
/** @internal */
878
866
private _setIsIterator ( ) : void {
879
- if ( this [ kMode ] === 'emitter' ) {
867
+ if ( this . mode === 'emitter' ) {
880
868
// TODO(NODE-3485): Replace with MongoChangeStreamModeError
881
869
throw new MongoAPIError (
882
870
'ChangeStream cannot be used as an iterator after being used as an EventEmitter'
883
871
) ;
884
872
}
885
- this [ kMode ] = 'iterator' ;
873
+ this . mode = 'iterator' ;
886
874
}
887
875
888
876
/**
@@ -947,8 +935,8 @@ export class ChangeStream<
947
935
/** @internal */
948
936
private _streamEvents ( cursor : ChangeStreamCursor < TSchema , TChange > ) : void {
949
937
this . _setIsEmitter ( ) ;
950
- const stream = this [ kCursorStream ] ?? cursor . stream ( ) ;
951
- this [ kCursorStream ] = stream ;
938
+ const stream = this . cursorStream ?? cursor . stream ( ) ;
939
+ this . cursorStream = stream ;
952
940
stream . on ( 'data' , change => {
953
941
try {
954
942
const processedChange = this . _processChange ( change ) ;
@@ -963,18 +951,18 @@ export class ChangeStream<
963
951
964
952
/** @internal */
965
953
private _endStream ( ) : void {
966
- const cursorStream = this [ kCursorStream ] ;
954
+ const cursorStream = this . cursorStream ;
967
955
if ( cursorStream ) {
968
956
[ 'data' , 'close' , 'end' , 'error' ] . forEach ( event => cursorStream . removeAllListeners ( event ) ) ;
969
957
cursorStream . destroy ( ) ;
970
958
}
971
959
972
- this [ kCursorStream ] = undefined ;
960
+ this . cursorStream = undefined ;
973
961
}
974
962
975
963
/** @internal */
976
964
private _processChange ( change : TChange | null ) : TChange {
977
- if ( this [ kClosed ] ) {
965
+ if ( this . isClosed ) {
978
966
// TODO(NODE-3485): Replace with MongoChangeStreamClosedError
979
967
throw new MongoAPIError ( CHANGESTREAM_CLOSED_ERROR ) ;
980
968
}
@@ -1002,7 +990,7 @@ export class ChangeStream<
1002
990
/** @internal */
1003
991
private _processErrorStreamMode ( changeStreamError : AnyError , cursorInitialized : boolean ) {
1004
992
// If the change stream has been closed explicitly, do not process error.
1005
- if ( this [ kClosed ] ) return ;
993
+ if ( this . isClosed ) return ;
1006
994
1007
995
if (
1008
996
cursorInitialized &&
@@ -1034,7 +1022,7 @@ export class ChangeStream<
1034
1022
1035
1023
/** @internal */
1036
1024
private async _processErrorIteratorMode ( changeStreamError : AnyError , cursorInitialized : boolean ) {
1037
- if ( this [ kClosed ] ) {
1025
+ if ( this . isClosed ) {
1038
1026
// TODO(NODE-3485): Replace with MongoChangeStreamClosedError
1039
1027
throw new MongoAPIError ( CHANGESTREAM_CLOSED_ERROR ) ;
1040
1028
}
0 commit comments