3
3
import * as fs from 'fs' ;
4
4
import * as readline from 'readline' ;
5
5
import * as path from 'path' ;
6
+ import * as zlib from 'zlib' ;
6
7
import { StatMessageModel , StatsProvider , StatsListener } from './stats-provider' ;
7
8
9
+ interface ReplayStatMessageHeader {
10
+ encoding ?: string ;
11
+ }
12
+
13
+ export class ReplayResults {
14
+ statLinesRead : number = 0 ;
15
+ statEventsSent : number = 0 ;
16
+ }
17
+
8
18
export class ReplayStatsProvider extends StatsProvider {
9
19
private _replayFilePath : string ;
10
20
private _replayStreamReader : readline . Interface | null ;
11
21
private _simTickFreqency : number ;
12
22
private _simTickPeriod : number ;
13
23
private _simTickCurrent : number ;
14
24
private _simTimeoutId : NodeJS . Timeout | null ;
25
+ private _replayHeader : ReplayStatMessageHeader | undefined ;
26
+ private _base64Gzipped : boolean ;
15
27
private _pendingStats : StatMessageModel [ ] ;
28
+ private _replayResults : ReplayResults ;
29
+ private _onComplete : ( ( results : ReplayResults ) => void ) | undefined ;
16
30
17
31
// resume stream when lines drop below this threshold
18
32
private static readonly PENDING_STATS_BUFFER_MIN = 256 ;
19
33
// pause stream when lines exceed this threshold
20
34
private static readonly PENDING_STATS_BUFFER_MAX = ReplayStatsProvider . PENDING_STATS_BUFFER_MIN * 2 ;
35
+ // supported encodings
36
+ private readonly ENCODING_BASE64_GZIP = 'base64-gzip' ;
37
+ private readonly ENCODING_UTF8 = 'utf8' ;
21
38
22
39
// ticks per second (frequency)
23
40
private readonly MILLIS_PER_SECOND = 1000 ;
@@ -33,38 +50,53 @@ export class ReplayStatsProvider extends StatsProvider {
33
50
this . _simTickPeriod = this . _calcSimPeriod ( this . _simTickFreqency ) ;
34
51
this . _simTickCurrent = 0 ;
35
52
this . _simTimeoutId = null ;
53
+ this . _base64Gzipped = false ;
36
54
this . _pendingStats = [ ] ;
55
+ this . _replayResults = new ReplayResults ( ) ;
56
+ this . _onComplete = undefined ;
37
57
}
38
58
39
- public override start ( ) {
59
+ public override start ( ) : Promise < ReplayResults > {
40
60
this . stop ( ) ;
41
61
42
62
const fileStream = fs . createReadStream ( this . _replayFilePath ) ;
43
63
this . _replayStreamReader = readline . createInterface ( {
44
64
input : fileStream ,
45
65
crlfDelay : Infinity ,
46
66
} ) ;
47
-
48
- this . _replayStreamReader . on ( 'line ' , line => this . _onReadNextStatMessage ( line ) ) ;
49
- this . _replayStreamReader . on ( 'close ' , ( ) => this . _onCloseStream ( ) ) ;
67
+ this . _replayStreamReader . on ( 'line' , line => this . _onReadNextLineFromReplayStream ( line ) ) ;
68
+ this . _replayStreamReader . on ( 'close ' , ( ) => this . _onCloseReplayStream ( ) ) ;
69
+ this . _replayStreamReader . on ( 'error ' , ( ) => this . _errorCloseReplayStream ( 'Failed to read replay file.' ) ) ;
50
70
51
71
// begin simulation
52
72
this . _simTimeoutId = setTimeout ( ( ) => this . _updateSim ( ) , this . _simTickPeriod ) ;
53
73
this . _fireSpeedChanged ( ) ;
54
74
this . _firePauseChanged ( ) ;
75
+
76
+ return new Promise < ReplayResults > ( resolve => {
77
+ this . _onComplete = resolve ;
78
+ } ) ;
55
79
}
56
80
57
81
public override stop ( ) {
82
+ this . _fireStopped ( ) ;
58
83
if ( this . _simTimeoutId ) {
59
84
clearTimeout ( this . _simTimeoutId ) ;
60
85
}
61
- this . _replayStreamReader ?. close ( ) ;
86
+ if ( this . _onComplete ) {
87
+ this . _onComplete ( this . _replayResults ) ;
88
+ this . _onComplete = undefined ;
89
+ }
90
+ if ( this . _replayStreamReader ) {
91
+ this . _replayStreamReader . close ( ) ;
92
+ this . _replayStreamReader = null ;
93
+ }
62
94
this . _simTickFreqency = this . DEFAULT_SPEED ;
63
95
this . _simTickPeriod = this . _calcSimPeriod ( this . _simTickFreqency ) ;
64
96
this . _simTickCurrent = 0 ;
65
97
this . _simTimeoutId = null ;
98
+ this . _base64Gzipped = false ;
66
99
this . _pendingStats = [ ] ;
67
- this . _firePauseChanged ( ) ;
68
100
}
69
101
70
102
public override pause ( ) {
@@ -127,6 +159,7 @@ export class ReplayStatsProvider extends StatsProvider {
127
159
} else if ( nextStatsMessage . tick === this . _simTickCurrent ) {
128
160
// process and remove the message, then increment sim tick
129
161
this . setStats ( nextStatsMessage ) ;
162
+ this . _replayResults . statEventsSent ++ ;
130
163
this . _pendingStats . shift ( ) ;
131
164
this . _simTickCurrent ++ ;
132
165
}
@@ -138,37 +171,95 @@ export class ReplayStatsProvider extends StatsProvider {
138
171
// schedule next update as long as we have pending data to process or there's still a stream to read
139
172
if ( this . _replayStreamReader || this . _pendingStats . length > 0 ) {
140
173
this . _simTimeoutId = setTimeout ( ( ) => this . _updateSim ( ) , this . _simTickPeriod ) ;
174
+ } else {
175
+ // no more data to process
176
+ this . stop ( ) ;
141
177
}
142
178
}
143
179
144
- private _onReadNextStatMessage ( line : string ) {
145
- const statsMessageJson = JSON . parse ( line ) ;
146
- // seed sim tick with first message
147
- if ( this . _simTickCurrent === 0 ) {
148
- this . _simTickCurrent = statsMessageJson . tick ;
180
+ private _onReadNextLineFromReplayStream ( rawLine : string ) {
181
+ if ( this . _replayHeader === undefined ) {
182
+ try {
183
+ const headerJson = JSON . parse ( rawLine ) ;
184
+ if ( headerJson . tick ) {
185
+ this . _replayHeader = { } ; // no header, fall through to process this line as stat data
186
+ } else {
187
+ // first line was header, set encoding and return
188
+ this . _replayHeader = headerJson as ReplayStatMessageHeader ;
189
+ const encoding = this . _replayHeader . encoding ?? this . ENCODING_UTF8 ;
190
+ this . _base64Gzipped = encoding === this . ENCODING_BASE64_GZIP ;
191
+ return ;
192
+ }
193
+ } catch ( error ) {
194
+ this . _errorCloseReplayStream ( 'Failed to parse replay header.' ) ;
195
+ return ;
196
+ }
197
+ }
198
+
199
+ let decodedLine = rawLine ;
200
+ if ( this . _base64Gzipped ) {
201
+ try {
202
+ const buffer = Buffer . from ( rawLine , 'base64' ) ;
203
+ decodedLine = zlib . gunzipSync ( buffer ) . toString ( 'utf-8' ) ;
204
+ } catch ( error ) {
205
+ this . _errorCloseReplayStream ( 'Failed to decode replay data.' ) ;
206
+ return ;
207
+ }
208
+ }
209
+
210
+ try {
211
+ const jsonLine = JSON . parse ( decodedLine ) ;
212
+ const statMessage = jsonLine as StatMessageModel ;
213
+ // seed sim tick with first message
214
+ if ( this . _simTickCurrent === 0 ) {
215
+ this . _simTickCurrent = statMessage . tick ;
216
+ }
217
+ this . _replayResults . statLinesRead ++ ;
218
+ // add stats messages to queue
219
+ this . _pendingStats . push ( statMessage ) ;
220
+ // pause stream reader if we've got enough data for now
221
+ if ( this . _pendingStats . length > ReplayStatsProvider . PENDING_STATS_BUFFER_MAX ) {
222
+ this . _replayStreamReader ?. pause ( ) ;
223
+ }
224
+ } catch ( error ) {
225
+ this . _errorCloseReplayStream ( 'Failed to process replay data.' ) ;
149
226
}
150
- // add stats messages to queue
151
- this . _pendingStats . push ( statsMessageJson as StatMessageModel ) ;
152
- // pause stream reader if we've got enough data for now
153
- if ( this . _pendingStats . length > ReplayStatsProvider . PENDING_STATS_BUFFER_MAX ) {
154
- this . _replayStreamReader ?. pause ( ) ;
227
+ }
228
+
229
+ private _errorCloseReplayStream ( message : string ) {
230
+ if ( this . _replayStreamReader ) {
231
+ this . _replayStreamReader . close ( ) ;
232
+ this . _replayStreamReader = null ;
155
233
}
234
+ this . _fireNotification ( message ) ;
156
235
}
157
236
158
- private _onCloseStream ( ) {
237
+ private _onCloseReplayStream ( ) {
159
238
this . _replayStreamReader = null ;
160
239
}
161
240
162
241
private _fireSpeedChanged ( ) {
163
242
this . _statListeners . forEach ( ( listener : StatsListener ) => {
164
- listener . onSpeedUpdated ( this . _simTickFreqency ) ;
243
+ listener . onSpeedUpdated ?. ( this . _simTickFreqency ) ;
165
244
} ) ;
166
245
}
167
246
168
247
private _firePauseChanged ( ) {
169
248
this . _statListeners . forEach ( ( listener : StatsListener ) => {
170
249
// paused if no timeout id
171
- listener . onPauseUpdated ( this . _simTimeoutId == null ) ;
250
+ listener . onPauseUpdated ?.( this . _simTimeoutId == null ) ;
251
+ } ) ;
252
+ }
253
+
254
+ private _fireStopped ( ) {
255
+ this . _statListeners . forEach ( ( listener : StatsListener ) => {
256
+ listener . onStopped ?.( ) ;
257
+ } ) ;
258
+ }
259
+
260
+ private _fireNotification ( message : string ) {
261
+ this . _statListeners . forEach ( ( listener : StatsListener ) => {
262
+ listener . onNotification ?.( message ) ;
172
263
} ) ;
173
264
}
174
265
0 commit comments