Commit f3c408c

Handle potentially conflicting keys
1 parent 4b9a526 commit f3c408c

File tree

5 files changed: +241 -125 lines

.github/workflows/ci.yml

+23
@@ -0,0 +1,23 @@
+name: CI
+
+on:
+  push:
+    branches:
+      - master
+
+jobs:
+  build:
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: actions/checkout@v4
+      - name: Install pnpm
+        uses: pnpm/action-setup@v4
+      - name: Use Node.js 22
+        uses: actions/setup-node@v4
+        with:
+          node-version: 22
+      - name: Install, test
+        run: |
+          pnpm install
+          pnpm test

client/src/biDirectionalSync.ts

+84 -5
@@ -2,14 +2,17 @@ import { statSync } from "node:fs";
 import { readdir } from "node:fs/promises";
 import { join } from "node:path";
 import { logger } from "@s3-smart-sync/shared/logger.js";
-import { LOCAL_DIR } from "./consts.js";
+import { LOCAL_DIR, S3_BUCKET } from "./consts.js";
 import {
   convertAbsolutePathToKey,
+  deleteObject,
   download,
   listS3Files,
+  s3Client,
   upload,
 } from "./s3Operations.js";
 import { destroyTrayIcon } from "./trayWrapper.js";
+import { _Object, ListObjectsV2Command } from "@aws-sdk/client-s3";

 async function listLocalFiles(dir: string) {
   const files: Array<{
@@ -45,10 +48,7 @@ export async function biDirectionalSync() {
   logger.info("Starting full sync...");
   isSyncInProgress = true;

-  const [localFiles, [s3Files, noLastModifiedInfo]] = await Promise.all([
-    listLocalFiles(LOCAL_DIR),
-    listS3Files(),
-  ]);
+  const [preliminaryS3Files, noLastModifiedInfo] = await listS3Files();
   if (noLastModifiedInfo.length > 0) {
     logger.error(
       `No "last modified" date from S3 for file(s): ${noLastModifiedInfo.join(", ")}.\nPlease address this before starting the client again.`,
@@ -57,6 +57,15 @@
     process.exit(1);
   }

+  const fixConflictsPromises = await fixConflicts(preliminaryS3Files);
+  // It's possible that some things are attempted to be deleted twice for various reasons. Obviously, we don't care about errors thrown because something that caused the conflict was already deleted.
+  await Promise.allSettled(fixConflictsPromises);
+
+  const [localFiles, [s3Files]] = await Promise.all([
+    listLocalFiles(LOCAL_DIR),
+    listS3Files(),
+  ]);
+
   const localPromises = localFiles
     .map(({ key, lastModified }) => {
       const s3File = s3Files.find((s3File) => s3File.key === key);
@@ -95,3 +104,73 @@
   isSyncInProgress = false;
   logger.info("Done.\n");
 }
+
+async function fixConflicts(
+  s3Files: Array<{ key: string; lastModified: Date }>,
+) {
+  const deletePromises: Promise<void>[] = [];
+
+  const keyMap = new Map<string, Array<{ key: string; lastModified: Date }>>();
+  for (const file of s3Files) {
+    const baseKey = file.key.endsWith("/") ? file.key.slice(0, -1) : file.key;
+    if (!keyMap.has(baseKey)) {
+      keyMap.set(baseKey, []);
+    }
+    keyMap.get(baseKey)!.push(file);
+  }
+
+  // Find conflicts (where both a file and directory with the same baseKey exist)
+  for (const entries of keyMap.values()) {
+    if (entries.length === 2) {
+      const directory = entries[0]!.key.endsWith("/")
+        ? entries[0]!
+        : entries[1]!;
+      const file = entries[0]!.key.endsWith("/") ? entries[1]! : entries[0]!;
+
+      if (file.lastModified < directory.lastModified) {
+        logger.info(
+          `Resolving conflict: deleting file ${file.key} (keeping ${directory.key})`,
+        );
+        deletePromises.push(deleteObject(file.key));
+      } else {
+        logger.info(
+          `Resolving conflict: deleting directory ${directory.key} (keeping ${file.key})`,
+        );
+        deletePromises.push(deleteObject(directory.key));
+
+        // Also delete all objects found under this directory
+        let continuationToken: string | undefined = undefined;
+        do {
+          const { Contents, NextContinuationToken } = (await s3Client.send(
+            new ListObjectsV2Command({
+              Bucket: S3_BUCKET,
+              Prefix: directory.key,
+              ...(continuationToken
+                ? { ContinuationToken: continuationToken }
+                : {}),
+            }),
+          )) as {
+            Contents?: _Object[];
+            NextContinuationToken: string | undefined;
+          };
+
+          if (Contents && Contents.length > 0) {
+            for (const { Key } of Contents) {
+              if (Key) {
+                deletePromises.push(deleteObject(Key));
+              }
+            }
+          }
+
+          continuationToken = NextContinuationToken;
+        } while (continuationToken);
+      }
+    } else if (entries.length > 2) {
+      logger.error(
+        `Found ${entries.length} conflicting entries for "${entries[0]!.key}": ${entries.map((e) => e.key).join(", ")}`,
+      );
+    }
+  }
+
+  return deletePromises;
+}

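The resolution rule added in fixConflicts can be read in isolation: a file key and a directory key that differ only by a trailing "/" collapse to one base key, and the entry with the older LastModified date is the one scheduled for deletion (on a tie, the directory marker loses). A minimal standalone sketch of that comparison, with made-up keys and dates purely for illustration:

// Standalone illustration of the conflict rule above; sample data is hypothetical.
type S3Entry = { key: string; lastModified: Date };

function pickLoser(file: S3Entry, directory: S3Entry): S3Entry {
  // Same comparison as fixConflicts: the file is deleted only if it is
  // strictly older than the directory marker; otherwise (including ties)
  // the directory marker is deleted.
  return file.lastModified < directory.lastModified ? file : directory;
}

const file: S3Entry = { key: "notes", lastModified: new Date("2024-01-01") };
const directory: S3Entry = { key: "notes/", lastModified: new Date("2024-02-01") };

console.log(pickLoser(file, directory).key); // "notes" -> the older file is deleted

When the directory marker loses, the real code additionally pages through ListObjectsV2 with the directory key as Prefix so every object under it is deleted as well.
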
client/src/s3Operations.ts

+24 -15
@@ -9,7 +9,7 @@ import {
 import { Upload } from "@aws-sdk/lib-storage";
 import { logger } from "@s3-smart-sync/shared/logger.js";
 import { createReadStream, createWriteStream } from "node:fs";
-import { mkdir, stat, utimes } from "node:fs/promises";
+import { mkdir, rm, stat, utimes } from "node:fs/promises";
 import { dirname, join, relative } from "node:path";
 import { pipeline } from "stream/promises";
 import {
@@ -78,14 +78,30 @@ export async function deleteObject(key: string) {
 export async function download(key: string, localPath: string) {
   logger.info(`Downloading: ${key}`);

-  if (key.endsWith("/")) {
-    const { LastModified } = await s3Client.send(
-      new GetObjectCommand({
-        Bucket: S3_BUCKET,
-        Key: key,
-      }),
-    );
+  const { Body, LastModified } = await s3Client.send(
+    new GetObjectCommand({
+      Bucket: S3_BUCKET,
+      Key: key,
+    }),
+  );

+  // delete a possibly outdated local file that will be replaced by a directory
+  try {
+    const stats = await stat(localPath);
+    if (
+      stats.mtime < (LastModified ?? 0) &&
+      key.endsWith("/") &&
+      stats.isFile()
+    ) {
+      ignore(FileOperationType.Remove, localPath);
+      await rm(localPath);
+      unignore(FileOperationType.Remove, localPath);
+    }
+  } catch (_) {
+    // local file doesn't exist yet
+  }
+
+  if (key.endsWith("/")) {
     ignore(FileOperationType.Sync, localPath);
     try {
       await mkdir(localPath, { recursive: true });
@@ -98,13 +114,6 @@ export async function download(key: string, localPath: string) {
     return;
   }

-  const { Body, LastModified } = await s3Client.send(
-    new GetObjectCommand({
-      Bucket: S3_BUCKET,
-      Key: key,
-    }),
-  );
-
   if (Body) {
     // We don't manage ignoring potentially new created directories here because that would be a lot of overhead. Instead, if syncing is triggered, we let the upload of the directory handle breaking that chain. (via updating modification time and that timestamp then being the same)
     await mkdir(dirname(localPath), { recursive: true });

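The download() change reorders the directory branch: the object is fetched first so its LastModified is available, and a stale plain file occupying the target path is removed before the directory is created. A minimal sketch of that ordering, assuming only Node's fs/promises API (the ignore/unignore watcher bookkeeping from the real code is omitted, and ensureLocalDirectory is a hypothetical helper name):

// Sketch of the cleanup order for directory keys: remove an older plain file
// of the same name, then create the directory. Not the project's code.
import { mkdir, rm, stat } from "node:fs/promises";

async function ensureLocalDirectory(localPath: string, lastModified?: Date) {
  try {
    const stats = await stat(localPath);
    // A plain file is in the way and is older than the S3 directory marker.
    if (lastModified !== undefined && stats.isFile() && stats.mtime < lastModified) {
      await rm(localPath);
    }
  } catch {
    // Nothing exists at localPath yet; nothing to clean up.
  }
  await mkdir(localPath, { recursive: true });
}
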
client/tests/e2e.spec.mts

+31 -49
@@ -10,12 +10,14 @@ import {
   cleanupS3,
   createClientDirectories,
   createFile,
+  list,
   pause,
   sendSnsMessage,
   startClients,
   startServer,
   stopClients,
   stopServer,
+  upload,
   waitUntil,
 } from "./utilities.js";

@@ -25,8 +27,7 @@ const clientDirectories = await createClientDirectories(clientIds);
 describe("E2E Tests", () => {
   beforeAll(async () => {
     await startServer();
-    // startClients([0, 1]);
-    startClients([0, 1], [0, 1]);
+    startClients([0, 1]);
     await pause(1000);
   });

@@ -121,10 +122,8 @@

   it("should handle replacing a file with an empty directory", async () => {
     await createFile(0, "file-then-directory", "starts as a file");
-    await waitUntil(async () =>
-      expect(
-        await fileExists(join(clientDirectories[1], "file-then-directory")),
-      ).toBe(true),
+    await waitUntil(() =>
+      fileExists(join(clientDirectories[1], "file-then-directory")),
     );

     await rm(join(clientDirectories[0], "file-then-directory"));
@@ -137,25 +136,23 @@
     await pause(WATCHER_DEBOUNCE_DURATION + 300);
     await sendSnsMessage("file-then-directory/", "put");

-    await waitUntil(async () => {
-      const stats = await stat(
-        join(clientDirectories[1], "file-then-directory"),
-      );
-      return stats.isDirectory();
-    });
+    await waitUntil(async () =>
+      (
+        await stat(join(clientDirectories[1], "file-then-directory"))
+      ).isDirectory(),
+    );
   });

   it("should handle replacing an empty directory with a file", async () => {
     await mkdir(join(clientDirectories[0], "directory-then-file"));
     // First, the debounced upload. Then we have to wait for the upload to actually have finished
     await pause(WATCHER_DEBOUNCE_DURATION + 300);
     await sendSnsMessage("directory-then-file/", "put");
-    await waitUntil(async () => {
-      const stats = await stat(
-        join(clientDirectories[1], "directory-then-file"),
-      );
-      return stats.isDirectory();
-    });
+    await waitUntil(async () =>
+      (
+        await stat(join(clientDirectories[1], "directory-then-file"))
+      ).isDirectory(),
+    );

     await rm(join(clientDirectories[0], "directory-then-file"), {
       recursive: true,
@@ -174,35 +171,20 @@
     });
   });

-  // it("handles duplicate file/directory on S3", async () => {
-  //   await s3Client.send(
-  //     new PutObjectCommand({
-  //       Bucket: S3_BUCKET,
-  //       Key: "duplicate-file",
-  //       Body: Buffer.from(""),
-  //     }),
-  //   );
-  //   await s3Client.send(
-  //     new PutObjectCommand({
-  //       Bucket: S3_BUCKET,
-  //       Key: "duplicate-file/",
-  //       Body: Buffer.from(""),
-  //     }),
-  //   );
-
-  //   await sendSnsMessage("duplicate-file/", "put");
-  //   await waitUntil(async () => {
-  //     const stats = await stat(join(CLIENT_1_DIR, "duplicate-file/"));
-  //     return stats.isDirectory();
-  //   });
-
-  //   const { Contents } = await s3Client.send(
-  //     new ListObjectsV2Command({
-  //       Bucket: S3_BUCKET,
-  //       Prefix: "duplicate-file/",
-  //     }),
-  //   );
-  //   expect(Contents?.length).toBe(1);
-  //   expect(Contents?.[0]?.Key).toBe("duplicate-file/");
-  // });
+  it("handles duplicate file/directory on S3", async () => {
+    await stopClients([0, 1]);
+    await upload("duplicate-file/", "");
+    await upload("duplicate-file/nested/", "");
+    await upload("duplicate-file/nested/file.txt", "...");
+    await upload("duplicate-file", "");
+
+    startClients([0, 1]);
+    await waitUntil(() =>
+      readFile(join(clientDirectories[0], "duplicate-file"), "utf-8"),
+    );
+
+    const { Contents } = await list("duplicate-file");
+    expect(Contents?.length).toBe(1);
+    expect(Contents?.[0]?.Key).toBe("duplicate-file");
+  });
 });

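The re-enabled e2e test seeds the conflicting keys through the test suite's upload helper; the commented-out version it replaces did the same with raw PutObjectCommand calls. For reference, an equivalent standalone seeding script might look like the sketch below (bucket name, credentials, and the seedConflict name are assumptions, not part of the commit):

// Sketch: create the same file/directory key conflict with the AWS SDK directly.
// The newest upload ("duplicate-file") should survive conflict resolution.
import { PutObjectCommand, S3Client } from "@aws-sdk/client-s3";

const s3Client = new S3Client({}); // region/credentials from the environment
const Bucket = process.env.S3_BUCKET ?? "my-test-bucket"; // hypothetical bucket

async function seedConflict() {
  // Directory marker plus a nested object under it...
  await s3Client.send(
    new PutObjectCommand({ Bucket, Key: "duplicate-file/", Body: "" }),
  );
  await s3Client.send(
    new PutObjectCommand({
      Bucket,
      Key: "duplicate-file/nested/file.txt",
      Body: "...",
    }),
  );
  // ...and a plain file under the same base key, uploaded last so it is newest.
  await s3Client.send(
    new PutObjectCommand({ Bucket, Key: "duplicate-file", Body: "" }),
  );
}

seedConflict().catch((error) => {
  console.error(error);
  process.exit(1);
});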