Skip to content

Commit

Permalink
nats listings: packaged client
Browse files Browse the repository at this point in the history
  • Loading branch information
williamstein committed Feb 17, 2025
1 parent 3a2c32b commit 2e9c543
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 32 deletions.
2 changes: 1 addition & 1 deletion src/packages/backend/path-watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ const logger = getLogger("backend:path-watcher");
const POLLING = true;

const DEFAULT_POLL_MS = parseInt(
process.env.COCALC_FS_WATCHER_POLL_INTERVAL_MS ?? "3000",
process.env.COCALC_FS_WATCHER_POLL_INTERVAL_MS ?? "2000",
);

const ChokidarOpts: WatchOptions = {
Expand Down
23 changes: 4 additions & 19 deletions src/packages/frontend/nats/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,7 @@ import type {
CallNatsServiceFunction,
CreateNatsServiceFunction,
} from "@cocalc/nats/service";
import {
createListingsClient,
getListingsKV,
getListingsTimesKV,
} from "@cocalc/nats/service/listings";
import { listingsClient } from "@cocalc/nats/service/listings";

export class NatsClient {
client: WebappClient;
Expand Down Expand Up @@ -440,28 +436,17 @@ export class NatsClient {
return await dko<T>({ env: await this.getEnv(), ...opts });
};

microservicesClient = async () => {
microservices = async () => {
const nc = await this.getConnection();
// @ts-ignore
const svcm = new Svcm(nc);
return svcm.client();
};

listings = (opts: { project_id: string; compute_server_id?: number }) => {
return createListingsClient(opts);
};

getListingsKV = async (opts: {
project_id: string;
compute_server_id?: number;
}) => {
return await getListingsKV(opts);
};

getListingsTimesKV = async (opts: {
listings = async (opts: {
project_id: string;
compute_server_id?: number;
}) => {
return await getListingsTimesKV(opts);
return await listingsClient(opts);
};
}
61 changes: 57 additions & 4 deletions src/packages/nats/service/listings.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
/*
Service for expressing interest in directory listings in a project or compute server.
Service for watching directory listings in a project or compute server.
*/

import { createServiceClient, createServiceHandler } from "./typed";
import type { DirectoryListingEntry } from "@cocalc/util/types";
import { dkv, type DKV } from "@cocalc/nats/sync/dkv";
import { EventEmitter } from "events";

// record info about at most this many files in a given directory
export const MAX_FILES_PER_DIRECTORY = 10;
Expand All @@ -19,9 +20,9 @@ export const INTEREST_CUTOFF_MS = 1000 * 30;

//export const INTEREST_CUTOFF_MS = 1000 * 60 * 10;

interface ListingsApi {
export interface ListingsApi {
// cause the directory listing key:value store to watch path
interest: (path: string) => Promise<void>;
watch: (path: string) => Promise<void>;

// just directly get the listing info now for this path
getListing: (opts: {
Expand Down Expand Up @@ -88,7 +89,7 @@ export async function getListingsKV(
export interface Times {
// time last files for a given directory were attempted to be updated
updated?: number;
// time user last expressed interest in a given directory
// time user requested to watch a given directory
interest?: number;
}

Expand All @@ -101,3 +102,55 @@ export async function getListingsTimesKV(
...opts,
});
}

/* Unified interface to the above components for clients */

export class ListingsClient extends EventEmitter {
options: { project_id: string; compute_server_id: number };
api: ListingsApi;
times: DKV<Times>;
listings: DKV<Listing>;

constructor({
project_id,
compute_server_id = 0,
}: {
project_id: string;
compute_server_id?: number;
}) {
super();
this.options = { project_id, compute_server_id };
}

init = async () => {
this.api = createListingsClient(this.options);
this.times = await getListingsTimesKV(this.options);
this.listings = await getListingsKV(this.options);
this.listings.on("change", (path) => this.emit("change", path));
};

get = (path: string): Listing | undefined => {
return this.listings.get(path);
};

getAll = () => this.listings.getAll();

close = () => {
this.times.close();
this.listings.close();
};

watch = async (path) => {
await this.api.watch(path);
};

getListing = async (opts) => {
return await this.api.getListing(opts);
};
}

export async function listingsClient(options) {
const C = new ListingsClient(options);
await C.init();
return C;
}
13 changes: 11 additions & 2 deletions src/packages/nats/service/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,16 @@ export async function waitForNatsService({
let d = 100;
let m = 100;
const start = Date.now();
let ping = await pingNatsService({ options, maxWait: m });
const getPing = async (m: number) => {
try {
return await pingNatsService({ options, maxWait: m });
} catch {
// ping can fail, e.g, if not connected to nats at all or the ping
// service isn't up yet.
return [] as ServiceIdentity[];
}
};
let ping = await getPing(m);
while (ping.length == 0) {
d = Math.min(10000, d * 1.3);
m = Math.min(1500, m * 1.3);
Expand All @@ -271,7 +280,7 @@ export async function waitForNatsService({
throw Error("timeout");
}
await delay(d);
ping = await pingNatsService({ options, maxWait: m });
ping = await getPing(m);
}
return ping;
}
2 changes: 1 addition & 1 deletion src/packages/nats/sync/dkv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export class DKV<T = any> extends EventEmitter {
const kvname = jsName({ account_id, project_id });
this.name = name + localLocationName(options);
this.sha1 = env.sha1 ?? sha1;
this.prefix = this.sha1(name);
this.prefix = this.sha1(this.name);
this.opts = {
name: kvname,
filter: `${this.prefix}.>`,
Expand Down
2 changes: 1 addition & 1 deletion src/packages/nats/sync/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export class KV<T = any> extends EventEmitter {
throw Error("env must be defined");
}
this.sha1 = env.sha1 ?? sha1;
this.prefix = this.sha1(name);
this.prefix = this.sha1(this.name);
this.generalKV = new GeneralKV({
name: kvname,
filter: `${this.prefix}.>`,
Expand Down
23 changes: 19 additions & 4 deletions src/packages/project/nats/listings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,21 @@
few hundred (ordered by recent) files in that directory, all relative
to the home directory.
DEVELOPMENT:
1. Setup project environment variables as usual
2. Stop listings service running in the project:
define COCALC_PROJECT_ID, COCALC_NATS_JWT, etc., as usual.
3. Start your own server
.../src/packages/project/nats$ node
> await require('@cocalc/project/nats/listings').init()
*/

import getListing from "@cocalc/backend/get-listing";
Expand All @@ -34,11 +49,11 @@ let listings: Listings | null;

const impl = {
// cause the directory listing key:value store to watch path
interest: async (path: string) => {
watch: async (path: string) => {
while (listings == null) {
await delay(3000);
}
listings.interest(path);
listings.watch(path);
},

getListing: async ({ path, hidden }) => {
Expand Down Expand Up @@ -164,8 +179,8 @@ class Listings {
}
};

interest = async (path: string) => {
logger.debug("interest", { path });
watch = async (path: string) => {
logger.debug("watch", { path });
path = canonicalPath(path);
this.times.set(path, { ...this.times.get(path), interest: Date.now() });
this.updateListing(path);
Expand Down

0 comments on commit 2e9c543

Please sign in to comment.