Skip to content

Commit

Permalink
fix(diag): stats replay would stop when reaching eof, should go until…
Browse files Browse the repository at this point in the history
… buffer has been processed. (#271)
  • Loading branch information
chmeyer-ms committed Dec 12, 2024
1 parent 648c3d6 commit 4794b17
Show file tree
Hide file tree
Showing 8 changed files with 161 additions and 24 deletions.
6 changes: 3 additions & 3 deletions src/extension.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ export function activate(context: vscode.ExtensionContext) {
async () => {
const fileUri = await vscode.window.showOpenDialog({
canSelectMany: false,
openLabel: 'Select diagnostics capture to replay',
openLabel: 'Open',
filters: {
'MC Stats files': ['mcstats'],
'All files': ['*'],
'MC Stats Files': ['mcstats'],
'All Files': ['*'],
},
});
if (!fileUri || fileUri.length === 0) {
Expand Down
1 change: 0 additions & 1 deletion src/panels/home-view-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ export class HomeViewProvider implements vscode.WebviewViewProvider {

private _refreshProfilerCaptures(capturesBasePath: string, newCaptureFileName?: string) {
if (!capturesBasePath) {
console.error('Captures path is invalid.');
return;
}
fs.readdir(capturesBasePath, (err, files) => {
Expand Down
3 changes: 3 additions & 0 deletions src/panels/minecraft-diagnostics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ export class MinecraftDiagnosticsPanel {
};
this._panel.webview.postMessage(message);
},
onNotification: (message: string) => {
window.showInformationMessage(message);
},
};

this._statsTracker.addStatListener(this._statsCallback);
Expand Down
50 changes: 50 additions & 0 deletions src/stats/replay-stats-provider.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { describe, it, expect } from 'vitest';
import { ReplayStatsProvider } from './replay-stats-provider';
import { StatData, StatsListener } from './stats-provider';
import path from 'path';

describe('ReplayStatsProvider', () => {
it('should load base64-gzip encoded replay data and trigger events', async () => {
const replayFilePath = path.resolve('./test/diagnostics-replay-compressed.mcstats');
const replay = new ReplayStatsProvider(replayFilePath);
let statCount = 0;
let statsCallback: StatsListener = {
onStatUpdated: (stat: StatData) => {
statCount++;
expect(stat).toBeDefined();
},
};
replay.addStatListener(statsCallback);
await replay.start();
expect(statCount).toBeGreaterThan(0); // no idea how many are in there
});

it('shoud load uncompressed replay data and trigger events', async () => {
const replayFilePath = path.resolve('./test/diagnostics-replay-uncompressed.mcstats');
const replay = new ReplayStatsProvider(replayFilePath);
let statCount = 0;
let statsCallback: StatsListener = {
onStatUpdated: (stat: StatData) => {
statCount++;
expect(stat).toBeDefined();
},
};
replay.addStatListener(statsCallback);
await replay.start();
expect(statCount).toBeGreaterThan(0);
});

it('should fire notification on invalid file read', async () => {
const replayFilePath = path.resolve('./test/not-a-valid-file.mcstats');
const replay = new ReplayStatsProvider(replayFilePath);
let notification = '';
let statsCallback: StatsListener = {
onNotification: (message: string) => {
notification = message;
},
};
replay.addStatListener(statsCallback);
await replay.start();
expect(notification).toBe('Failed to read replay file.');
});
});
107 changes: 91 additions & 16 deletions src/stats/replay-stats-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,33 @@
import * as fs from 'fs';
import * as readline from 'readline';
import * as path from 'path';
import * as zlib from 'zlib';
//import { window } from 'vscode';
import { StatMessageModel, StatsProvider, StatsListener } from './stats-provider';

interface ReplayStatMessageHeader {
encoding: string; // base64 or empty
compression: string; // gzip or empty
}

export class ReplayStatsProvider extends StatsProvider {
private _replayFilePath: string;
private _replayStreamReader: readline.Interface | null;
private _simTickFreqency: number;
private _simTickPeriod: number;
private _simTickCurrent: number;
private _simTimeoutId: NodeJS.Timeout | null;
private _replayHeader: ReplayStatMessageHeader | undefined;
private _base64Gzipped: boolean;
private _pendingStats: StatMessageModel[];
private _onComplete: (() => void) | undefined;

// resume stream when lines drop below this threshold
private static readonly PENDING_STATS_BUFFER_MIN = 256;
// pause stream when lines exceed this threshold
private static readonly PENDING_STATS_BUFFER_MAX = ReplayStatsProvider.PENDING_STATS_BUFFER_MIN * 2;
// supported encodings
private readonly BASE64_GZIP_ENCODING = 'base64-gzip';

// ticks per second (frequency)
private readonly MILLIS_PER_SECOND = 1000;
Expand All @@ -33,7 +45,9 @@ export class ReplayStatsProvider extends StatsProvider {
this._simTickPeriod = this._calcSimPeriod(this._simTickFreqency);
this._simTickCurrent = 0;
this._simTimeoutId = null;
this._base64Gzipped = false;
this._pendingStats = [];
this._onComplete = undefined;
}

public override start() {
Expand All @@ -44,27 +58,39 @@ export class ReplayStatsProvider extends StatsProvider {
input: fileStream,
crlfDelay: Infinity,
});

this._replayStreamReader.on('line', line => this._onReadNextStatMessage(line));
this._replayStreamReader.on('close', () => this._onCloseStream());
this._replayStreamReader.on('error', () => this._errorCloseStream('Failed to read replay file.'));

// begin simulation
this._simTimeoutId = setTimeout(() => this._updateSim(), this._simTickPeriod);
this._fireSpeedChanged();
this._firePauseChanged();

return new Promise<void>(resolve => {
this._onComplete = resolve;
});
}

public override stop() {
this._fireStopped();
if (this._simTimeoutId) {
clearTimeout(this._simTimeoutId);
}
this._replayStreamReader?.close();
if (this._onComplete) {
this._onComplete();
this._onComplete = undefined;
}
if (this._replayStreamReader) {
this._replayStreamReader.close();
this._replayStreamReader = null;
}
this._simTickFreqency = this.DEFAULT_SPEED;
this._simTickPeriod = this._calcSimPeriod(this._simTickFreqency);
this._simTickCurrent = 0;
this._simTimeoutId = null;
this._base64Gzipped = false;
this._pendingStats = [];
this._firePauseChanged();
}

public override pause() {
Expand Down Expand Up @@ -138,37 +164,86 @@ export class ReplayStatsProvider extends StatsProvider {
// schedule next update as long as we have pending data to process or there's still a stream to read
if (this._replayStreamReader || this._pendingStats.length > 0) {
this._simTimeoutId = setTimeout(() => this._updateSim(), this._simTickPeriod);
} else {
// no more data to process
this.stop();
}
}

private _onReadNextStatMessage(line: string) {
const statsMessageJson = JSON.parse(line);
// seed sim tick with first message
if (this._simTickCurrent === 0) {
this._simTickCurrent = statsMessageJson.tick;
private _onReadNextStatMessage(rawLine: string) {
// first line is the header
if (this._replayHeader === undefined) {
try {
const headerJson = JSON.parse(rawLine);
this._replayHeader = headerJson as ReplayStatMessageHeader;
this._base64Gzipped = this._replayHeader.encoding === this.BASE64_GZIP_ENCODING;
} catch (error) {
this._errorCloseStream('Failed to parse replay header.');
return;
}
return;
}
// add stats messages to queue
this._pendingStats.push(statsMessageJson as StatMessageModel);
// pause stream reader if we've got enough data for now
if (this._pendingStats.length > ReplayStatsProvider.PENDING_STATS_BUFFER_MAX) {
this._replayStreamReader?.pause();
// could be base64 encoded and gzipped
let processedLine = rawLine;
if (this._base64Gzipped) {
try {
const buffer = Buffer.from(rawLine, 'base64');
processedLine = zlib.gunzipSync(buffer).toString('utf-8');
} catch (error) {
this._errorCloseStream('Failed to decode replay.');
return;
}
}
try {
const jsonLine = JSON.parse(processedLine);
const statMessage = jsonLine as StatMessageModel;
// seed sim tick with first message
if (this._simTickCurrent === 0) {
this._simTickCurrent = statMessage.tick;
}
// add stats messages to queue
this._pendingStats.push(statMessage);
// pause stream reader if we've got enough data for now
if (this._pendingStats.length > ReplayStatsProvider.PENDING_STATS_BUFFER_MAX) {
this._replayStreamReader?.pause();
}
} catch (error) {
this._errorCloseStream('Failed to parse replay.');
}
}

private _errorCloseStream(message: string) {
this._replayStreamReader?.close();
this._replayStreamReader = null;
this._fireNotification(message);
}

private _onCloseStream() {
this.stop();
this._replayStreamReader = null;
}

private _fireSpeedChanged() {
this._statListeners.forEach((listener: StatsListener) => {
listener.onSpeedUpdated(this._simTickFreqency);
listener.onSpeedUpdated?.(this._simTickFreqency);
});
}

private _firePauseChanged() {
this._statListeners.forEach((listener: StatsListener) => {
// paused if no timeout id
listener.onPauseUpdated(this._simTimeoutId == null);
listener.onPauseUpdated?.(this._simTimeoutId == null);
});
}

private _fireStopped() {
this._statListeners.forEach((listener: StatsListener) => {
listener.onStopped?.();
});
}

private _fireNotification(message: string) {
this._statListeners.forEach((listener: StatsListener) => {
listener.onNotification?.(message);
});
}

Expand Down
10 changes: 6 additions & 4 deletions src/stats/stats-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ export interface StatMessageModel {
}

export interface StatsListener {
onStatUpdated: (stat: StatData) => void;
onSpeedUpdated: (speed: number) => void;
onPauseUpdated: (paused: boolean) => void;
onStatUpdated?: (stat: StatData) => void;
onSpeedUpdated?: (speed: number) => void;
onPauseUpdated?: (paused: boolean) => void;
onStopped?: () => void;
onNotification?: (message: string) => void;
}

export class StatsProvider {
Expand Down Expand Up @@ -89,7 +91,7 @@ export class StatsProvider {
values: stat.values ?? [],
tick: tick,
};
listener.onStatUpdated(statData);
listener.onStatUpdated?.(statData);

if (stat.children) {
stat.children.forEach((child: StatDataModel) => {
Expand Down
4 changes: 4 additions & 0 deletions test/diagnostics-replay-compressed.mcstats
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"encoding":"base64-gzip"}
H4sIAAAAAAAACq2RMQvCMBCF/0q5ORRaULSbQ3EVlDqIyJEebTBeS3JVRPzvJg4iiIPS8fE+vntwNxCjj1BM5rNsMs0VyLUnKGAtKOWZWHJQ4EPwUOxuwHiKbYtcWzrobuBQKNCtsbUjfmcaYnIonUuEvHyBShYj11Ce0Q4Ub2T5/r6/qxdRIRtrMVmGuAkinyxs3+J4vhGXVdnfsi06j5cRVjny3eA0rVAfU6oHjWI6TpvQxj+kT+w3+bueImPIf1IP7xkEAUoCAAA=
H4sIAAAAAAAACq2SzWrDMBCEX8XoLEzkn6TxLYfQa6ElPRQTtvJiq5ElI61dQsi7V84hGIyhLbkIlhl9O1rthZGSJ1bk2yeRr1PO6NwhK9grAe0HNJQwznwoPCs+LsxAO6oNmErjUdreBIEz2ShdOTRTT40GHZB1EaGnBdPekKJzEAfQPY49RFJeyyu/Ow5glNYQPYfyLYB8tNNdA4/jPTDZQfwb9g7Ow/cDUjn0tncSX0CeYqx6CaSsieugjv8Q32x/g0/xHt2A7jguTThaZeqlqBoH1DfjlJjlK8EzscnL+6q1SjrrUVpTeTZtJZ3qaEbYrNZ8vc1+ff/Lfs4YQohkxZM0Txc500fjOBiFfj6aH/azqd4/AwAA
H4sIAAAAAAAACq2RQQuCQBCF/4rMWQSlorx1kK5BUYeQGNZBl7YxdsdCxP/e2iGE6FB4fLyPbx5MB6LVBdL5ahnPF7MQpL0RpLATlOxOLAmE4HxwkJ46YLwObYVcGDqrumFfhKAqbQpLPGZKYrIotQ2EnHyBMhYtrS/vaBoabsRJ3ud9+CYOyNoYDDY+7r3IBWtzq3A634TLDvHfsiNah48JVllydWMVbVFdIioahaJrjkrfDn+IXthv8rGeBkaT+6SeqVlmQ0oCAAA=
4 changes: 4 additions & 0 deletions test/diagnostics-replay-uncompressed.mcstats

Large diffs are not rendered by default.

0 comments on commit 4794b17

Please sign in to comment.