Skip to content

Commit

Permalink
nats compute servers: change frontend to use new nats based manager
Browse files Browse the repository at this point in the history
  • Loading branch information
williamstein committed Feb 19, 2025
1 parent b7c2ca2 commit 1429472
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 151 deletions.
165 changes: 18 additions & 147 deletions src/packages/frontend/compute/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,159 +6,30 @@ are available and how they are used for a given project.
When doing dev from the browser console, do:
cc.client.project_client.computeServers('...project_id...')
cc.client.project_client.computeServers(cc.current().project_id)
*/

import { SYNCDB_PARAMS, decodeUUIDtoNum } from "@cocalc/util/compute/manager";
import { webapp_client } from "@cocalc/frontend/webapp-client";
import { redux } from "@cocalc/frontend/app-framework";
import debug from "debug";
import { once } from "@cocalc/util/async-utils";
import { EventEmitter } from "events";
import { excludeFromComputeServer } from "@cocalc/frontend/file-associations";

const log = debug("cocalc:frontend:compute:manager");

export class ComputeServersManager extends EventEmitter {
private sync_db;
private project_id;

constructor(project_id: string) {
super();
this.project_id = project_id;
this.sync_db = webapp_client.sync_db({
project_id,
...SYNCDB_PARAMS,
});
this.sync_db.on("change", () => {
this.emit("change");
});
// It's reasonable to have many clients, e.g., one for each open file
this.setMaxListeners(100);
log("created", this.project_id);
}

waitUntilReady = async () => {
const { sync_db } = this;
if (sync_db.get_state() == "init") {
// make sure project is running
redux.getActions("projects").start_project(this.project_id);

// now wait for syncdb to be ready
await once(sync_db, "ready");
}
if (sync_db.get_state() != "ready") {
throw Error("syncdb not ready");
}
};

close = () => {
delete computeServerManagerCache[this.project_id];
this.sync_db.close();
};

// save the current state to the backend. This is critical to do, e.g., before
// opening a file and after calling connectComputeServerToPath, since otherwise
// the project doesn't even know that the file should open on the compute server
// until after it has opened it, which is disconcerting and not efficient (but
// does mostly work, though it is intentionally making things really hard on ourselves).
save = async () => {
await this.sync_db.save();
};

getComputeServers = () => {
const servers = {};
const cursors = this.sync_db.get_cursors({ excludeSelf: "never" }).toJS();
for (const client_id in cursors) {
const server = cursors[client_id];
servers[decodeUUIDtoNum(client_id)] = {
time: server.time,
...server.locs[0],
};
}
return servers;
};

// Call this if you want the compute server with given id to
// connect and handle being the server for the given path.
connectComputeServerToPath = ({ id, path }: { id: number; path: string }) => {
if (id == 0) {
this.disconnectComputeServer({ path });
return;
}
assertSupportedPath(path);
this.sync_db.set({ id, path, open: true });
this.sync_db.commit();
};

// Call this if you want no compute servers to provide the backend server
// for given path.
disconnectComputeServer = ({ path }: { path: string }) => {
this.sync_db.delete({ path });
this.sync_db.commit();
};

// For interactive debugging -- display in the console how things are configured.
showStatus = () => {
console.log(JSON.stringify(this.sync_db.get().toJS(), undefined, 2));
};

// Returns the explicitly set server id for the given
// path, if one is set. Otherwise, return undefined
// if nothing is explicitly set for this path.
getServerIdForPath = async (path: string): Promise<number | undefined> => {
await this.waitUntilReady();
const { sync_db } = this;
return sync_db.get_one({ path })?.get("id");
};

// Get the server ids (as a map) for every file and every directory contained in path.
// NOTE/TODO: this just does a linear search through all paths with a server id; nothing clever.
getServerIdForSubtree = async (
path: string,
): Promise<{ [path: string]: number }> => {
const { sync_db } = this;
if (sync_db.get_state() == "init") {
await once(sync_db, "ready");
}
if (sync_db.get_state() != "ready") {
throw Error("syncdb not ready");
}
const x = sync_db.get();
const v: { [path: string]: number } = {};
if (x == null) {
return v;
}
const slash = path.endsWith("/") ? path : path + "/";
for (const y of x) {
const p = y.get("path");
if (p == path || p.startsWith(slash)) {
v[p] = y.get("id");
}
}
return v;
};
}

function assertSupportedPath(path: string) {
if (excludeFromComputeServer(path)) {
throw Error(
`Opening '${path}' on a compute server is not yet supported -- copy it to the project and open it there instead`,
);
}
}
import {
computeServerManager,
type ComputeServerManager,
} from "@cocalc/nats/compute/manager";

const computeServerManagerCache: {
[project_id: string]: ComputeServersManager;
[project_id: string]: ComputeServerManager;
} = {};

export const computeServers = (project_id: string) => {
// very simple cache with no ref counting or anything.
// close a manager only when closing the project.
export default function computeServers(
project_id: string,
): ComputeServerManager {
if (computeServerManagerCache[project_id]) {
return computeServerManagerCache[project_id];
}
const m = new ComputeServersManager(project_id);
computeServerManagerCache[project_id] = m;
return m;
};

export default computeServers;
const M = computeServerManager({ project_id });
computeServerManagerCache[project_id] = M;
M.on("close", () => {
delete computeServerManagerCache[project_id];
});
return M;
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export function create_jupyter_actions(
// Ensure meta_file isn't marked as deleted, which would block
// opening the syncdb, which is clearly not the user's intention
// at this point (since we're opening the ipynb file).
redux.getProjectStore(project_id)?.get_listings()?.undelete(syncdb_path);
redux.getProjectActions(project_id)?.setNotDeleted(syncdb_path);

const syncdb = new_syncdb({
...SYNCDB_OPTIONS,
Expand Down
10 changes: 10 additions & 0 deletions src/packages/frontend/nats/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ import type {
CreateNatsServiceFunction,
} from "@cocalc/nats/service";
import { listingsClient } from "@cocalc/nats/service/listings";
import {
computeServerManager,
type Options as ComputeServerManagerOptions,
} from "@cocalc/nats/compute/manager";

export class NatsClient {
client: WebappClient;
Expand Down Expand Up @@ -466,6 +470,12 @@ export class NatsClient {
}) => {
return await listingsClient(opts);
};

computeServerManager = async (options: ComputeServerManagerOptions) => {
const M = computeServerManager(options);
await M.init();
return M;
};
}

function setDeleted({ project_id, path, deleted }) {
Expand Down
2 changes: 1 addition & 1 deletion src/packages/frontend/project/nats/listings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ export class Listings extends EventEmitter {
};

private init = reuseInFlight(async () => {
let start = Date.now();
//let start = Date.now();
await this.createClient();
// console.log("createClient finished in ", Date.now() - start, "ms");
if (this.state == "closed") return;
Expand Down
1 change: 1 addition & 0 deletions src/packages/nats/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"./sync/*": "./dist/sync/*.js",
"./hub-api": "./dist/hub-api/index.js",
"./hub-api/*": "./dist/hub-api/*.js",
"./compute/*": "./dist/compute/*.js",
"./service": "./dist/service/index.js",
"./project-api": "./dist/project-api/index.js",
"./browser-api": "./dist/browser-api/index.js",
Expand Down
5 changes: 3 additions & 2 deletions src/packages/nats/sync/open-files.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ import { dkv, type DKV } from "@cocalc/nats/sync/dkv";
import { nanos } from "@cocalc/nats/util";
import { EventEmitter } from "events";

// 1 day
const MAX_AGE_MS = 1000 * 60 * 60 * 24;
// info about interest in open files (and also what was explicitly deleted) older
// than this is automatically purged.
const MAX_AGE_MS = 7 * (1000 * 60 * 60 * 24);

export interface Entry {
// path to file relative to HOME
Expand Down

0 comments on commit 1429472

Please sign in to comment.