-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathreplay-stats-provider.ts
178 lines (156 loc) · 6.33 KB
/
replay-stats-provider.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
// Copyright (C) Microsoft Corporation. All rights reserved.
import * as fs from 'fs';
import * as readline from 'readline';
import * as path from 'path';
import { StatMessageModel, StatsProvider, StatsListener } from './stats-provider';
/**
 * Stats provider that replays stat messages from a file on disk.
 *
 * The file is read line by line; every line is expected to be one JSON encoded
 * stat message carrying a numeric `tick`. Messages are buffered in a bounded
 * queue and handed to `setStats` by a timer-driven simulation clock that
 * advances one tick per timer period, so playback speed is controlled purely
 * by the timer period (see `faster`, `slower` and `setSpeed`).
 */
export class ReplayStatsProvider extends StatsProvider {
    private _replayFilePath: string;
    // underlying file stream; tracked so it can be destroyed when a replay is stopped early
    private _replayFileStream: fs.ReadStream | null;
    // line reader for the current replay; null when no file is being read
    private _replayStreamReader: readline.Interface | null;
    private _simTickFrequency: number;
    private _simTickPeriod: number;
    // tick the simulation clock is currently at; 0 doubles as "not seeded yet"
    private _simTickCurrent: number;
    // pending simulation timer; null means the simulation is paused or stopped
    private _simTimeoutId: NodeJS.Timeout | null;
    private _pendingStats: StatMessageModel[];

    // resume stream when lines drop below this threshold
    private static readonly PENDING_STATS_BUFFER_MIN = 256;
    // pause stream when lines exceed this threshold
    private static readonly PENDING_STATS_BUFFER_MAX = ReplayStatsProvider.PENDING_STATS_BUFFER_MIN * 2;

    private readonly MILLIS_PER_SECOND = 1000;
    // playback speeds are expressed in ticks per second (frequency)
    private readonly DEFAULT_SPEED = 20;
    private readonly MIN_SPEED = 5;
    private readonly MAX_SPEED = 160;

    /**
     * @param replayFilePath Path of the replay file. Its base name and the full
     * path are forwarded to the `StatsProvider` constructor.
     */
    constructor(replayFilePath: string) {
        super(path.basename(replayFilePath), replayFilePath);
        this._replayFilePath = replayFilePath;
        this._replayFileStream = null;
        this._replayStreamReader = null;
        this._simTickFrequency = this.DEFAULT_SPEED;
        this._simTickPeriod = this._calcSimPeriod(this._simTickFrequency);
        this._simTickCurrent = 0;
        this._simTimeoutId = null;
        this._pendingStats = [];
    }

    /**
     * Starts the replay from the beginning of the file. Any replay already in
     * progress is stopped first, which also resets the speed to the default.
     */
    public override start() {
        this.stop();

        const fileStream = fs.createReadStream(this._replayFilePath);
        const reader = readline.createInterface({
            input: fileStream,
            crlfDelay: Infinity,
        });
        this._replayFileStream = fileStream;
        this._replayStreamReader = reader;

        // Without an 'error' listener a missing or unreadable file surfaces as an
        // unhandled 'error' event. Tear the stream down instead so the simulation
        // drains whatever was buffered and then ends.
        fileStream.on('error', (err: Error) => {
            if (this._replayFileStream === fileStream) {
                console.error(`ReplayStatsProvider: failed to read replay file: ${err.message}`);
                this._closeReplayStream();
            }
        });

        // A closed readline interface can still emit already-buffered 'line'
        // events, so only accept events from the reader that is still current.
        reader.on('line', (line: string) => {
            if (this._replayStreamReader === reader) {
                this._onReadNextStatMessage(line);
            }
        });
        reader.on('close', () => {
            if (this._replayStreamReader === reader) {
                this._onCloseStream();
            }
        });

        // begin simulation
        this._scheduleNextUpdate();
        this._fireSpeedChanged();
        this._firePauseChanged();
    }

    /**
     * Stops the replay: cancels the simulation timer, releases the file, drops
     * all buffered messages and resets tick and speed to their initial values.
     */
    public override stop() {
        if (this._simTimeoutId !== null) {
            clearTimeout(this._simTimeoutId);
        }
        this._closeReplayStream();
        this._simTickFrequency = this.DEFAULT_SPEED;
        this._simTickPeriod = this._calcSimPeriod(this._simTickFrequency);
        this._simTickCurrent = 0;
        this._simTimeoutId = null;
        this._pendingStats = [];
        this._firePauseChanged();
    }

    /** Pauses the simulation clock; buffered messages and the file position are kept. */
    public override pause() {
        if (this._simTimeoutId !== null) {
            clearTimeout(this._simTimeoutId);
            this._simTimeoutId = null;
        }
        this._firePauseChanged();
    }

    /**
     * Resumes a paused replay, or starts it if the simulation clock has not been
     * seeded yet. Calling this while the simulation is already running is a no-op
     * apart from the listener notification.
     */
    public override resume() {
        if (this._simTickCurrent === 0) {
            this.start();
        } else if (this._simTimeoutId === null) {
            // only schedule when no timer is pending, otherwise two timer chains
            // would run side by side and double the playback speed
            this._scheduleNextUpdate();
        }
        this._firePauseChanged();
    }

    /** Doubles the playback speed, capped at the maximum speed. */
    public override faster() {
        this._applySpeed(this._simTickFrequency * 2);
    }

    /** Halves the playback speed, floored at the minimum speed. */
    public override slower() {
        this._applySpeed(this._simTickFrequency / 2);
    }

    /**
     * Sets the playback speed.
     *
     * @param speed Ticks per second as a base-10 integer string. The value is
     * clamped to the supported range. Input that does not parse as a number
     * leaves the current speed unchanged; listeners are still notified so they
     * can resynchronise with the effective speed.
     */
    public override setSpeed(speed: string) {
        const requested = parseInt(speed, 10);
        if (Number.isNaN(requested)) {
            this._fireSpeedChanged();
            return;
        }
        this._applySpeed(requested);
    }

    /** @returns Always `true`. */
    public override manualControl(): boolean {
        return true;
    }

    /**
     * One step of the simulation clock: delivers the head of the queue if it is
     * due, keeps the read buffer topped up and schedules the next step.
     */
    private _updateSim() {
        // the timer that invoked this step has fired, so its id is stale
        this._simTimeoutId = null;

        const nextStatsMessage = this._pendingStats[0];
        if (nextStatsMessage) {
            if (nextStatsMessage.tick > this._simTickCurrent) {
                // not ready to process this message yet; advance the clock so a
                // gap in the recorded ticks is waited out instead of stalling
                this._simTickCurrent++;
            } else {
                // message is due (or overdue, e.g. a repeated or rewound tick):
                // deliver it and resynchronise the clock to just after it
                this.setStats(nextStatsMessage);
                this._pendingStats.shift();
                this._simTickCurrent = nextStatsMessage.tick + 1;
            }
        }

        // resume stream if we're running low on data
        if (this._pendingStats.length < ReplayStatsProvider.PENDING_STATS_BUFFER_MIN) {
            this._replayStreamReader?.resume();
        }

        // schedule next update as long as we have pending data to process or there's still a stream to read
        if (this._replayStreamReader || this._pendingStats.length > 0) {
            this._scheduleNextUpdate();
        } else {
            // replay finished: no timer is pending any more, let listeners know
            this._firePauseChanged();
        }
    }

    /**
     * Parses one line of the replay file and queues it for playback. Blank lines,
     * lines that are not valid JSON and messages without a finite numeric `tick`
     * are skipped.
     */
    private _onReadNextStatMessage(line: string) {
        if (line.trim().length === 0) {
            return;
        }

        let statsMessageJson: unknown;
        try {
            statsMessageJson = JSON.parse(line);
        } catch {
            console.warn('ReplayStatsProvider: skipping replay line that is not valid JSON');
            return;
        }
        if (!ReplayStatsProvider._hasNumericTick(statsMessageJson)) {
            console.warn('ReplayStatsProvider: skipping replay message without a numeric tick');
            return;
        }

        // seed sim tick with first message
        if (this._simTickCurrent === 0) {
            this._simTickCurrent = statsMessageJson.tick;
        }

        // add stats messages to queue
        this._pendingStats.push(statsMessageJson);

        // pause stream reader if we've got enough data for now
        if (this._pendingStats.length > ReplayStatsProvider.PENDING_STATS_BUFFER_MAX) {
            this._replayStreamReader?.pause();
        }
    }

    /** Called when the current reader closes; forgets the reader and its file stream. */
    private _onCloseStream() {
        this._replayStreamReader = null;
        this._replayFileStream = null;
    }

    /**
     * Detaches and closes the current reader and destroys its file stream.
     * Closing the readline interface alone does not release the file, so a replay
     * stopped before end-of-file would otherwise keep the descriptor open.
     */
    private _closeReplayStream() {
        const reader = this._replayStreamReader;
        const fileStream = this._replayFileStream;
        // detach first so events emitted while closing are ignored
        this._replayStreamReader = null;
        this._replayFileStream = null;
        reader?.close();
        fileStream?.destroy();
    }

    /** Arms the timer for the next simulation step using the current tick period. */
    private _scheduleNextUpdate() {
        this._simTimeoutId = setTimeout(() => this._updateSim(), this._simTickPeriod);
    }

    /** Clamps `frequency` to the supported range, applies it and notifies listeners. */
    private _applySpeed(frequency: number) {
        this._simTickFrequency = Math.min(this.MAX_SPEED, Math.max(this.MIN_SPEED, frequency));
        this._simTickPeriod = this._calcSimPeriod(this._simTickFrequency);
        this._fireSpeedChanged();
    }

    /**
     * Minimal shape check for a parsed replay line. Only the `tick` field, which
     * is the one field this class reads, is verified; the rest of the message is
     * passed through unchecked.
     */
    private static _hasNumericTick(value: unknown): value is StatMessageModel {
        return (
            typeof value === 'object' &&
            value !== null &&
            Number.isFinite((value as { tick?: unknown }).tick)
        );
    }

    /** Notifies every listener of the current playback speed (ticks per second). */
    private _fireSpeedChanged() {
        this._statListeners.forEach((listener: StatsListener) => {
            listener.onSpeedUpdated(this._simTickFrequency);
        });
    }

    /** Notifies every listener whether the simulation is currently paused. */
    private _firePauseChanged() {
        this._statListeners.forEach((listener: StatsListener) => {
            // paused if no timeout id
            listener.onPauseUpdated(this._simTimeoutId == null);
        });
    }

    /** Converts a frequency in ticks per second into a timer period in milliseconds. */
    private _calcSimPeriod(simFrequency: number): number {
        return this.MILLIS_PER_SECOND / simFrequency;
    }
}