Skip to content

Commit

Permalink
nats: starting project and initial listing -- improve
Browse files Browse the repository at this point in the history
  • Loading branch information
williamstein committed Feb 18, 2025
1 parent cd3a551 commit b7c2ca2
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 15 deletions.
86 changes: 78 additions & 8 deletions src/packages/frontend/project/nats/listings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import {
type ListingsApi,
MIN_INTEREST_INTERVAL_MS,
} from "@cocalc/nats/service/listings";
import { delay } from "awaiting";
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";

export const WATCH_THROTTLE_MS = MIN_INTEREST_INTERVAL_MS;

Expand All @@ -40,25 +42,93 @@ export class Listings extends EventEmitter {
this.init();
}

private init = async () => {
this.listingsClient = await listingsClient({
project_id: this.project_id,
compute_server_id: this.compute_server_id,
});
private createClient = async () => {
let d = 3000;
const MAX_DELAY_MS = 15000;
while (this.state != "closed") {
try {
this.listingsClient = await listingsClient({
project_id: this.project_id,
compute_server_id: this.compute_server_id,
});
// success!
return;
} catch (err) {
// console.log("creating listings client failed", err);
if (err.code == "PERMISSIONS_VIOLATION") {
try {
// console.log(
// `request update of our credentials to include ${this.project_id}, then try again`,
// );
await webapp_client.nats_client.hub.projects.addProjectPermission({
project_id: this.project_id,
});
continue;
} catch (err) {
// console.log("updating permissions failed", err);
d = Math.max(7000, d);
}
}
}
if (this.state == ("closed" as State)) return;
d = Math.min(MAX_DELAY_MS, d * 1.3);
await delay(d);
}
};

private init = reuseInFlight(async () => {
let start = Date.now();
await this.createClient();
// console.log("createClient finished in ", Date.now() - start, "ms");
if (this.state == "closed") return;
if (this.listingsClient == null) {
throw Error("bug");
}
this.listingsClient.on("change", (path) => {
this.emit("change", [path]);
});
// cause load of all cached data into redux
this.emit("change", Object.keys(this.listingsClient.getAll()));
// [ ] TODO: delete event for deleted paths
this.setState("ready");
};
});

// Watch directory for changes.
watch = async (path: string, force?): Promise<void> => {
if (this.state == "closed") {
return;
}
if (this.state != "ready") {
await this.init();
}
if (this.state != "ready") {
// failed forever or closed explicitly so don't care
return;
}
if (this.listingsClient == null) {
throw Error("listings not ready");
}
try {
await this.listingsClient?.watch(path, force);
} catch {}
await this.listingsClient.watch(path, force);
} catch (err) {
if (err.code == "503") {
// The listings service isn't running in the project right now,
// e.g., maybe the project isn't running at all.
// So watch is a no-op, as it does nothing when listing
// server doesn't exist. So we at least wait for a while
// e.g., maybe project is tarting, then try once more.
await this.listingsClient.api.nats.waitFor();
try {
await this.listingsClient.watch(path, force);
} catch (err) {
if (err.code != "503") {
throw err;
}
}
} else {
throw err;
}
}
};

get = async (
Expand Down
27 changes: 20 additions & 7 deletions src/packages/nats/service/listings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ export async function getListingsTimesKV(

export class ListingsClient extends EventEmitter {
options: { project_id: string; compute_server_id: number };
api: ListingsApi;
api: Awaited<ReturnType<typeof createListingsApiClient>>;
times?: DKV<Times>;
listings?: DKV<Listing>;

Expand All @@ -125,10 +125,19 @@ export class ListingsClient extends EventEmitter {
}

init = async () => {
this.api = createListingsApiClient(this.options);
this.times = await getListingsTimesKV(this.options);
this.listings = await getListingsKV(this.options);
this.listings.on("change", ({ key: path }) => this.emit("change", path));
try {
this.api = createListingsApiClient(this.options);
this.times = await getListingsTimesKV(this.options);
this.listings = await getListingsKV(this.options);
this.listings.on("change", this.handleListingsChange);
} catch (err) {
this.close();
throw err;
}
};

handleListingsChange = ({ key: path }) => {
this.emit("change", path);
};

get = (path: string): Listing | undefined => {
Expand All @@ -146,10 +155,14 @@ export class ListingsClient extends EventEmitter {
};

close = () => {
this.removeAllListeners();
this.times?.close();
delete this.times;
this.listings?.close();
delete this.listings;
if (this.listings != null) {
this.listings.removeListener("change", this.handleListingsChange);
this.listings.close();
delete this.listings;
}
};

watch = async (path, force = false) => {
Expand Down

0 comments on commit b7c2ca2

Please sign in to comment.