Skip to content

Commit a507021

Browse files
authored
fix: correct target stats when sockets are reused (#576)
This PR attempts to fix incorrect stats due to the reuse of target sockets for HTTP(S) protocols. Based on #572 Note: I was forced to upgrade `actions/cache` as `v2` was deprecated and it wouldn't run with it. I also had to edit eslint config to run with `.` instead of `src` and excluded `tests`, because with `src` the CI was failing (no idea why).
1 parent 6c6bb09 commit a507021

9 files changed

+55
-28
lines changed
+9-8
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
const path = require('path');
2-
const fs = require('fs');
31
const { execSync } = require('child_process');
2+
const fs = require('fs');
3+
const path = require('path');
44

55
const PKG_JSON_PATH = path.join(__dirname, '..', '..', 'package.json');
66

7+
// eslint-disable-next-line import/no-dynamic-require
78
const pkgJson = require(PKG_JSON_PATH);
89

910
const PACKAGE_NAME = pkgJson.name;
@@ -13,20 +14,20 @@ const nextVersion = getNextVersion(VERSION);
1314
console.log(`before-deploy: Setting version to ${nextVersion}`);
1415
pkgJson.version = nextVersion;
1516

16-
fs.writeFileSync(PKG_JSON_PATH, JSON.stringify(pkgJson, null, 2) + '\n');
17+
fs.writeFileSync(PKG_JSON_PATH, `${JSON.stringify(pkgJson, null, 2)}\n`);
1718

1819
function getNextVersion(version) {
19-
const versionString = execSync(`npm show ${PACKAGE_NAME} versions --json`, { encoding: 'utf8'});
20+
const versionString = execSync(`npm show ${PACKAGE_NAME} versions --json`, { encoding: 'utf8' });
2021
const versions = JSON.parse(versionString);
2122

22-
if (versions.some(v => v === VERSION)) {
23+
if (versions.some((v) => v === VERSION)) {
2324
console.error(`before-deploy: A release with version ${VERSION} already exists. Please increment version accordingly.`);
2425
process.exit(1);
2526
}
2627

2728
const prereleaseNumbers = versions
28-
.filter(v => (v.startsWith(VERSION) && v.includes('-')))
29-
.map(v => Number(v.match(/\.(\d+)$/)[1]));
29+
.filter((v) => (v.startsWith(VERSION) && v.includes('-')))
30+
.map((v) => Number(v.match(/\.(\d+)$/)[1]));
3031
const lastPrereleaseNumber = Math.max(-1, ...prereleaseNumbers);
31-
return `${version}-beta.${lastPrereleaseNumber + 1}`
32+
return `${version}-beta.${lastPrereleaseNumber + 1}`;
3233
}

.github/workflows/check.yaml

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ jobs:
2828
-
2929
name: Cache Node Modules
3030
if: ${{ matrix.node-version == 18 }}
31-
uses: actions/cache@v2
31+
uses: actions/cache@v4
3232
with:
3333
path: |
3434
node_modules
@@ -64,7 +64,7 @@ jobs:
6464
node-version: 18
6565
-
6666
name: Load Cache
67-
uses: actions/cache@v2
67+
uses: actions/cache@v4
6868
with:
6969
path: |
7070
node_modules

.github/workflows/release.yaml

+3-3
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ jobs:
3232
-
3333
name: Cache Node Modules
3434
if: ${{ matrix.node-version == 18 }}
35-
uses: actions/cache@v2
35+
uses: actions/cache@v4
3636
with:
3737
path: |
3838
node_modules
@@ -67,7 +67,7 @@ jobs:
6767
node-version: 18
6868
-
6969
name: Load Cache
70-
uses: actions/cache@v2
70+
uses: actions/cache@v4
7171
with:
7272
path: |
7373
node_modules
@@ -93,7 +93,7 @@ jobs:
9393
registry-url: https://registry.npmjs.org/
9494
-
9595
name: Load Cache
96-
uses: actions/cache@v2
96+
uses: actions/cache@v4
9797
with:
9898
path: |
9999
node_modules

eslint.config.mjs

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import apify from '@apify/eslint-config';
22

33
// eslint-disable-next-line import/no-default-export
44
export default [
5-
{ ignores: ['**/dist'] }, // Ignores need to happen first
5+
{ ignores: ['**/dist', 'test'] }, // Ignores need to happen first
66
...apify,
77
{
88
languageOptions: {

jest.config.ts

+1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { Config } from '@jest/types';
22

3+
// eslint-disable-next-line import/no-default-export
34
export default (): Config.InitialOptions => ({
45
verbose: true,
56
preset: 'ts-jest',

package.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@
3737
"prepublishOnly": "npm run build",
3838
"local-proxy": "node ./dist/run_locally.js",
3939
"test": "nyc cross-env NODE_OPTIONS=--insecure-http-parser mocha --bail",
40-
"lint": "eslint src",
41-
"lint:fix": "eslint src --fix"
40+
"lint": "eslint .",
41+
"lint:fix": "eslint . --fix"
4242
},
4343
"engines": {
4444
"node": ">=14"

src/chain.ts

+8-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import type { URL } from 'url';
77

88
import type { Socket } from './socket';
99
import { badGatewayStatusCodes, createCustomStatusHttpResponse, errorCodeToStatusCode } from './statuses';
10+
import type { SocketWithPreviousStats } from './utils/count_target_bytes';
1011
import { countTargetBytes } from './utils/count_target_bytes';
1112
import { getBasicAuthorizationHeader } from './utils/get_basic';
1213

@@ -85,9 +86,15 @@ export const chain = (
8586
const fn = proxy.protocol === 'https:' ? https.request : http.request;
8687
const client = fn(proxy.origin, options as unknown as http.ClientRequestArgs);
8788

88-
client.on('connect', (response, targetSocket, clientHead) => {
89+
client.once('socket', (targetSocket: SocketWithPreviousStats) => {
90+
// Socket can be re-used by multiple requests.
91+
// That's why we need to track the previous stats.
92+
targetSocket.previousBytesRead = targetSocket.bytesRead;
93+
targetSocket.previousBytesWritten = targetSocket.bytesWritten;
8994
countTargetBytes(sourceSocket, targetSocket);
95+
});
9096

97+
client.on('connect', (response, targetSocket, clientHead) => {
9198
if (sourceSocket.readyState !== 'open') {
9299
// Sanity check, should never reach.
93100
targetSocket.destroy();

src/forward.ts

+7-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import type { URL } from 'url';
66
import util from 'util';
77

88
import { badGatewayStatusCodes, errorCodeToStatusCode } from './statuses';
9+
import type { SocketWithPreviousStats } from './utils/count_target_bytes';
910
import { countTargetBytes } from './utils/count_target_bytes';
1011
import { getBasicAuthorizationHeader } from './utils/get_basic';
1112
import { validHeadersOnly } from './utils/valid_headers_only';
@@ -114,8 +115,12 @@ export const forward = async (
114115
}
115116
});
116117

117-
client.once('socket', (socket) => {
118-
countTargetBytes(request.socket, socket);
118+
client.once('socket', (socket: SocketWithPreviousStats) => {
119+
// Socket can be re-used by multiple requests.
120+
// That's why we need to track the previous stats.
121+
socket.previousBytesRead = socket.bytesRead;
122+
socket.previousBytesWritten = socket.bytesWritten;
123+
countTargetBytes(request.socket, socket, (handler) => response.once('close', handler));
119124
});
120125

121126
// Can't use pipeline here as it automatically destroys the streams

src/utils/count_target_bytes.ts

+22-9
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,27 @@ const calculateTargetStats = Symbol('calculateTargetStats');
77

88
type Stats = { bytesWritten: number | null, bytesRead: number | null };
99

10+
/**
11+
* Socket object extended with previous read and written bytes.
12+
* Necessary due to target socket re-use.
13+
*/
14+
export type SocketWithPreviousStats = net.Socket & { previousBytesWritten?: number, previousBytesRead?: number };
15+
1016
interface Extras {
1117
[targetBytesWritten]: number;
1218
[targetBytesRead]: number;
13-
[targets]: Set<net.Socket>;
19+
[targets]: Set<SocketWithPreviousStats>;
1420
[calculateTargetStats]: () => Stats;
1521
}
1622

1723
// @ts-expect-error TS is not aware that `source` is used in the assertion.
1824
function typeSocket(source: unknown): asserts source is net.Socket & Extras {}
1925

20-
export const countTargetBytes = (source: net.Socket, target: net.Socket): void => {
26+
export const countTargetBytes = (
27+
source: net.Socket,
28+
target: SocketWithPreviousStats,
29+
registerCloseHandler?: (handler: () => void) => void,
30+
): void => {
2131
typeSocket(source);
2232

2333
source[targetBytesWritten] = source[targetBytesWritten] || 0;
@@ -26,21 +36,24 @@ export const countTargetBytes = (source: net.Socket, target: net.Socket): void =
2636

2737
source[targets].add(target);
2838

29-
target.once('close', () => {
30-
source[targetBytesWritten] += target.bytesWritten;
31-
source[targetBytesRead] += target.bytesRead;
32-
39+
const closeHandler = () => {
40+
source[targetBytesWritten] += (target.bytesWritten - (target.previousBytesWritten || 0));
41+
source[targetBytesRead] += (target.bytesRead - (target.previousBytesRead || 0));
3342
source[targets].delete(target);
34-
});
43+
};
44+
if (!registerCloseHandler) {
45+
registerCloseHandler = (handler: () => void) => target.once('close', handler);
46+
}
47+
registerCloseHandler(closeHandler);
3548

3649
if (!source[calculateTargetStats]) {
3750
source[calculateTargetStats] = () => {
3851
let bytesWritten = source[targetBytesWritten];
3952
let bytesRead = source[targetBytesRead];
4053

4154
for (const socket of source[targets]) {
42-
bytesWritten += socket.bytesWritten;
43-
bytesRead += socket.bytesRead;
55+
bytesWritten += (socket.bytesWritten - (socket.previousBytesWritten || 0));
56+
bytesRead += (socket.bytesRead - (socket.previousBytesRead || 0));
4457
}
4558

4659
return {

0 commit comments

Comments
 (0)