Skip to content

Commit 5fc48d0

Browse files
authored
Fix some cache issue (#444)
* use a global fetch because it seems next patched fetch even ignore internals sometimes * fix SocketError: other side closed * Always await cache set and queue send before returning * remove unnecessary debug * never reject detached promises * fix lint * Create small-rivers-taste.md * fix writeTags using the not patched fetch
1 parent f469fcc commit 5fc48d0

File tree

9 files changed

+208
-122
lines changed

9 files changed

+208
-122
lines changed

.changeset/small-rivers-taste.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"open-next": patch
3+
---
4+
5+
Fix some cache issue

packages/open-next/src/adapters/cache.ts

+98-78
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { DetachedPromise } from "utils/promise.js";
2+
13
import { IncrementalCache } from "../cache/incremental/types.js";
24
import { TagCache } from "../cache/tag/types.js";
35
import { isBinaryContentType } from "./binary.js";
@@ -144,7 +146,8 @@ export default class S3Cache {
144146
value: value,
145147
} as CacheHandlerValue;
146148
} catch (e) {
147-
error("Failed to get fetch cache", e);
149+
// We can usually ignore errors here as they are usually due to cache not being found
150+
debug("Failed to get fetch cache", e);
148151
return null;
149152
}
150153
}
@@ -166,7 +169,7 @@ export default class S3Cache {
166169
// If some tags are stale we need to force revalidation
167170
return null;
168171
}
169-
const requestId = globalThis.__als.getStore() ?? "";
172+
const requestId = globalThis.__als.getStore()?.requestId ?? "";
170173
globalThis.lastModified[requestId] = _lastModified;
171174
if (cacheData?.type === "route") {
172175
return {
@@ -208,7 +211,8 @@ export default class S3Cache {
208211
return null;
209212
}
210213
} catch (e) {
211-
error("Failed to get body cache", e);
214+
// We can usually ignore errors here as they are usually due to cache not being found
215+
debug("Failed to get body cache", e);
212216
return null;
213217
}
214218
}
@@ -221,99 +225,115 @@ export default class S3Cache {
221225
if (globalThis.disableIncrementalCache) {
222226
return;
223227
}
224-
if (data?.kind === "ROUTE") {
225-
const { body, status, headers } = data;
226-
await globalThis.incrementalCache.set(
227-
key,
228-
{
229-
type: "route",
230-
body: body.toString(
231-
isBinaryContentType(String(headers["content-type"]))
232-
? "base64"
233-
: "utf8",
234-
),
235-
meta: {
236-
status,
237-
headers,
238-
},
239-
},
240-
false,
241-
);
242-
} else if (data?.kind === "PAGE") {
243-
const { html, pageData } = data;
244-
const isAppPath = typeof pageData === "string";
245-
if (isAppPath) {
246-
globalThis.incrementalCache.set(
228+
const detachedPromise = new DetachedPromise<void>();
229+
globalThis.__als.getStore()?.pendingPromises.push(detachedPromise);
230+
try {
231+
if (data?.kind === "ROUTE") {
232+
const { body, status, headers } = data;
233+
await globalThis.incrementalCache.set(
247234
key,
248235
{
249-
type: "app",
250-
html,
251-
rsc: pageData,
236+
type: "route",
237+
body: body.toString(
238+
isBinaryContentType(String(headers["content-type"]))
239+
? "base64"
240+
: "utf8",
241+
),
242+
meta: {
243+
status,
244+
headers,
245+
},
252246
},
253247
false,
254248
);
255-
} else {
256-
globalThis.incrementalCache.set(
249+
} else if (data?.kind === "PAGE") {
250+
const { html, pageData } = data;
251+
const isAppPath = typeof pageData === "string";
252+
if (isAppPath) {
253+
globalThis.incrementalCache.set(
254+
key,
255+
{
256+
type: "app",
257+
html,
258+
rsc: pageData,
259+
},
260+
false,
261+
);
262+
} else {
263+
globalThis.incrementalCache.set(
264+
key,
265+
{
266+
type: "page",
267+
html,
268+
json: pageData,
269+
},
270+
false,
271+
);
272+
}
273+
} else if (data?.kind === "FETCH") {
274+
await globalThis.incrementalCache.set<true>(key, data, true);
275+
} else if (data?.kind === "REDIRECT") {
276+
await globalThis.incrementalCache.set(
257277
key,
258278
{
259-
type: "page",
260-
html,
261-
json: pageData,
279+
type: "redirect",
280+
props: data.props,
262281
},
263282
false,
264283
);
284+
} else if (data === null || data === undefined) {
285+
await globalThis.incrementalCache.delete(key);
265286
}
266-
} else if (data?.kind === "FETCH") {
267-
await globalThis.incrementalCache.set<true>(key, data, true);
268-
} else if (data?.kind === "REDIRECT") {
269-
await globalThis.incrementalCache.set(
270-
key,
271-
{
272-
type: "redirect",
273-
props: data.props,
274-
},
275-
false,
276-
);
277-
} else if (data === null || data === undefined) {
278-
await globalThis.incrementalCache.delete(key);
279-
}
280-
// Write derivedTags to dynamodb
281-
// If we use an in house version of getDerivedTags in build we should use it here instead of next's one
282-
const derivedTags: string[] =
283-
data?.kind === "FETCH"
284-
? ctx?.tags ?? data?.data?.tags ?? [] // before version 14 next.js used data?.data?.tags so we keep it for backward compatibility
285-
: data?.kind === "PAGE"
286-
? data.headers?.["x-next-cache-tags"]?.split(",") ?? []
287-
: [];
288-
debug("derivedTags", derivedTags);
289-
// Get all tags stored in dynamodb for the given key
290-
// If any of the derived tags are not stored in dynamodb for the given key, write them
291-
const storedTags = await globalThis.tagCache.getByPath(key);
292-
const tagsToWrite = derivedTags.filter((tag) => !storedTags.includes(tag));
293-
if (tagsToWrite.length > 0) {
294-
await globalThis.tagCache.writeTags(
295-
tagsToWrite.map((tag) => ({
296-
path: key,
297-
tag: tag,
298-
})),
287+
// Write derivedTags to dynamodb
288+
// If we use an in house version of getDerivedTags in build we should use it here instead of next's one
289+
const derivedTags: string[] =
290+
data?.kind === "FETCH"
291+
? ctx?.tags ?? data?.data?.tags ?? [] // before version 14 next.js used data?.data?.tags so we keep it for backward compatibility
292+
: data?.kind === "PAGE"
293+
? data.headers?.["x-next-cache-tags"]?.split(",") ?? []
294+
: [];
295+
debug("derivedTags", derivedTags);
296+
// Get all tags stored in dynamodb for the given key
297+
// If any of the derived tags are not stored in dynamodb for the given key, write them
298+
const storedTags = await globalThis.tagCache.getByPath(key);
299+
const tagsToWrite = derivedTags.filter(
300+
(tag) => !storedTags.includes(tag),
299301
);
302+
if (tagsToWrite.length > 0) {
303+
await globalThis.tagCache.writeTags(
304+
tagsToWrite.map((tag) => ({
305+
path: key,
306+
tag: tag,
307+
})),
308+
);
309+
}
310+
debug("Finished setting cache");
311+
} catch (e) {
312+
error("Failed to set cache", e);
313+
} finally {
314+
// We need to resolve the promise even if there was an error
315+
detachedPromise.resolve();
300316
}
301317
}
302318

303319
public async revalidateTag(tag: string) {
304320
if (globalThis.disableDynamoDBCache || globalThis.disableIncrementalCache) {
305321
return;
306322
}
307-
debug("revalidateTag", tag);
308-
// Find all keys with the given tag
309-
const paths = await globalThis.tagCache.getByTag(tag);
310-
debug("Items", paths);
311-
// Update all keys with the given tag with revalidatedAt set to now
312-
await globalThis.tagCache.writeTags(
313-
paths?.map((path) => ({
314-
path: path,
315-
tag: tag,
316-
})) ?? [],
317-
);
323+
try {
324+
debug("revalidateTag", tag);
325+
// Find all keys with the given tag
326+
const paths = await globalThis.tagCache.getByTag(tag);
327+
debug("Items", paths);
328+
// Update all keys with the given tag with revalidatedAt set to now
329+
await globalThis.tagCache.writeTags(
330+
paths?.map((path) => ({
331+
path: path,
332+
tag: tag,
333+
})) ?? [],
334+
);
335+
} catch (e) {
336+
error("Failed to revalidate tag", e);
337+
}
318338
}
319339
}

packages/open-next/src/adapters/server-adapter.ts

+6
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@ setNodeEnv();
1111
setBuildIdEnv();
1212
setNextjsServerWorkingDirectory();
1313

14+
// Because next is messing with fetch, we have to make sure that we use an untouched version of fetch
15+
declare global {
16+
var internalFetch: typeof fetch;
17+
}
18+
globalThis.internalFetch = fetch;
19+
1420
/////////////
1521
// Handler //
1622
/////////////

packages/open-next/src/cache/tag/dynamodb-lite.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ const tagCache: TagCache = {
179179
for (const paramsChunk of toInsert) {
180180
await Promise.all(
181181
paramsChunk.map(async (params) => {
182-
const response = await awsClient.fetch(
182+
const response = await awsFetch(
183183
`https://dynamodb.${CACHE_BUCKET_REGION}.amazonaws.com`,
184184
{
185185
method: "POST",

packages/open-next/src/core/createMainHandler.ts

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import type { AsyncLocalStorage } from "node:async_hooks";
22

33
import type { OpenNextConfig } from "types/open-next";
4+
import { DetachedPromise } from "utils/promise";
45

56
import { debug } from "../adapters/logger";
67
import { generateUniqueId } from "../adapters/util";
@@ -20,7 +21,10 @@ declare global {
2021
var incrementalCache: IncrementalCache;
2122
var fnName: string | undefined;
2223
var serverId: string;
23-
var __als: AsyncLocalStorage<string>;
24+
var __als: AsyncLocalStorage<{
25+
requestId: string;
26+
pendingPromises: DetachedPromise<void>[];
27+
}>;
2428
}
2529

2630
export async function createMainHandler() {

packages/open-next/src/core/requestHandler.ts

+45-32
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,18 @@ import {
66
StreamCreator,
77
} from "http/index.js";
88
import { InternalEvent, InternalResult } from "types/open-next";
9+
import { DetachedPromise } from "utils/promise";
910

1011
import { debug, error, warn } from "../adapters/logger";
1112
import { convertRes, createServerResponse, proxyRequest } from "./routing/util";
1213
import routingHandler, { MiddlewareOutputEvent } from "./routingHandler";
1314
import { requestHandler, setNextjsPrebundledReact } from "./util";
1415

1516
// This is used to identify requests in the cache
16-
globalThis.__als = new AsyncLocalStorage<string>();
17+
globalThis.__als = new AsyncLocalStorage<{
18+
requestId: string;
19+
pendingPromises: DetachedPromise<any>[];
20+
}>();
1721

1822
export async function openNextHandler(
1923
internalEvent: InternalEvent,
@@ -81,37 +85,46 @@ export async function openNextHandler(
8185
remoteAddress: preprocessedEvent.remoteAddress,
8286
};
8387
const requestId = Math.random().toString(36);
84-
const internalResult = await globalThis.__als.run(requestId, async () => {
85-
const preprocessedResult = preprocessResult as MiddlewareOutputEvent;
86-
const req = new IncomingMessage(reqProps);
87-
const res = createServerResponse(
88-
preprocessedEvent,
89-
overwrittenResponseHeaders,
90-
responseStreaming,
91-
);
92-
93-
await processRequest(
94-
req,
95-
res,
96-
preprocessedEvent,
97-
preprocessedResult.isExternalRewrite,
98-
);
99-
100-
const { statusCode, headers, isBase64Encoded, body } = convertRes(res);
101-
102-
const internalResult = {
103-
type: internalEvent.type,
104-
statusCode,
105-
headers,
106-
body,
107-
isBase64Encoded,
108-
};
109-
110-
// reset lastModified. We need to do this to avoid memory leaks
111-
delete globalThis.lastModified[requestId];
112-
113-
return internalResult;
114-
});
88+
const pendingPromises: DetachedPromise<void>[] = [];
89+
const internalResult = await globalThis.__als.run(
90+
{ requestId, pendingPromises },
91+
async () => {
92+
const preprocessedResult = preprocessResult as MiddlewareOutputEvent;
93+
const req = new IncomingMessage(reqProps);
94+
const res = createServerResponse(
95+
preprocessedEvent,
96+
overwrittenResponseHeaders,
97+
responseStreaming,
98+
);
99+
100+
await processRequest(
101+
req,
102+
res,
103+
preprocessedEvent,
104+
preprocessedResult.isExternalRewrite,
105+
);
106+
107+
const { statusCode, headers, isBase64Encoded, body } = convertRes(res);
108+
109+
const internalResult = {
110+
type: internalEvent.type,
111+
statusCode,
112+
headers,
113+
body,
114+
isBase64Encoded,
115+
};
116+
117+
// reset lastModified. We need to do this to avoid memory leaks
118+
delete globalThis.lastModified[requestId];
119+
120+
// Wait for all promises to resolve
121+
// We are not catching errors here, because they are catched before
122+
// This may need to change in the future
123+
await Promise.all(pendingPromises.map((p) => p.promise));
124+
125+
return internalResult;
126+
},
127+
);
115128
return internalResult;
116129
}
117130
}

0 commit comments

Comments
 (0)