diff --git a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts index f87c2a143b..96760bfa30 100644 --- a/apps/webapp/app/v3/dynamicFlushScheduler.server.ts +++ b/apps/webapp/app/v3/dynamicFlushScheduler.server.ts @@ -1,70 +1,180 @@ import { nanoid } from "nanoid"; +import pLimit from "p-limit"; +import { Gauge } from "prom-client"; +import { metricsRegister } from "~/metrics.server"; +import { logger } from "~/services/logger.server"; export type DynamicFlushSchedulerConfig = { batchSize: number; flushInterval: number; + maxConcurrency?: number; callback: (flushId: string, batch: T[]) => Promise; }; export class DynamicFlushScheduler { - private batchQueue: T[][]; // Adjust the type according to your data structure private currentBatch: T[]; // Adjust the type according to your data structure private readonly BATCH_SIZE: number; private readonly FLUSH_INTERVAL: number; + private readonly MAX_CONCURRENCY: number; + private readonly concurrencyLimiter: ReturnType; private flushTimer: NodeJS.Timeout | null; private readonly callback: (flushId: string, batch: T[]) => Promise; + private isShuttingDown; + private failedBatchCount; constructor(config: DynamicFlushSchedulerConfig) { - this.batchQueue = []; this.currentBatch = []; this.BATCH_SIZE = config.batchSize; this.FLUSH_INTERVAL = config.flushInterval; - this.callback = config.callback; + this.MAX_CONCURRENCY = config.maxConcurrency || 1; + this.concurrencyLimiter = pLimit(this.MAX_CONCURRENCY); this.flushTimer = null; + this.callback = config.callback; + this.isShuttingDown = false; + this.failedBatchCount = 0; + + logger.info("Initializing DynamicFlushScheduler", { + batchSize: this.BATCH_SIZE, + flushInterval: this.FLUSH_INTERVAL, + maxConcurrency: this.MAX_CONCURRENCY, + }); + this.startFlushTimer(); + this.setupShutdownHandlers(); + + if (!process.env.VITEST) { + const scheduler = this; + new Gauge({ + name: "dynamic_flush_scheduler_batch_size", + help: "Number of items in the current dynamic flush scheduler batch", + collect() { + this.set(scheduler.currentBatch.length); + }, + registers: [metricsRegister], + }); + + new Gauge({ + name: "dynamic_flush_scheduler_failed_batches", + help: "Number of failed batches", + collect() { + this.set(scheduler.failedBatchCount); + }, + registers: [metricsRegister], + }); + } } - addToBatch(items: T[]): void { + /** + * + * If you want to fire and forget, don't await this method. + */ + async addToBatch(items: T[]): Promise { + // TODO: consider using concat. spread is not performant this.currentBatch.push(...items); + logger.debug("Adding items to batch", { + currentBatchSize: this.currentBatch.length, + itemsAdded: items.length, + }); if (this.currentBatch.length >= this.BATCH_SIZE) { - this.batchQueue.push(this.currentBatch); - this.currentBatch = []; - this.flushNextBatch(); + logger.debug("Batch size threshold reached, initiating flush", { + batchSize: this.BATCH_SIZE, + currentSize: this.currentBatch.length, + }); + await this.flushNextBatch(); this.resetFlushTimer(); } } private startFlushTimer(): void { this.flushTimer = setInterval(() => this.checkAndFlush(), this.FLUSH_INTERVAL); + logger.debug("Started flush timer", { interval: this.FLUSH_INTERVAL }); } - private resetFlushTimer(): void { + private setupShutdownHandlers() { + process.on("SIGTERM", this.shutdown.bind(this)); + process.on("SIGINT", this.shutdown.bind(this)); + logger.debug("Shutdown handlers configured"); + } + + private async shutdown(): Promise { + if (this.isShuttingDown) return; + this.isShuttingDown = true; + logger.info("Initiating shutdown of dynamic flush scheduler", { + remainingItems: this.currentBatch.length, + }); + + await this.checkAndFlush(); + this.clearTimer(); + + logger.info("Dynamic flush scheduler shutdown complete", { + totalFailedBatches: this.failedBatchCount, + }); + } + + private clearTimer(): void { if (this.flushTimer) { clearInterval(this.flushTimer); + logger.debug("Flush timer cleared"); } + } + + private resetFlushTimer(): void { + this.clearTimer(); this.startFlushTimer(); + logger.debug("Flush timer reset"); } - private checkAndFlush(): void { + private async checkAndFlush(): Promise { if (this.currentBatch.length > 0) { - this.batchQueue.push(this.currentBatch); - this.currentBatch = []; + logger.debug("Periodic flush check triggered", { + currentBatchSize: this.currentBatch.length, + }); + await this.flushNextBatch(); } - this.flushNextBatch(); } private async flushNextBatch(): Promise { - if (this.batchQueue.length === 0) return; - - const batchToFlush = this.batchQueue.shift(); - try { - await this.callback(nanoid(), batchToFlush!); - if (this.batchQueue.length > 0) { - this.flushNextBatch(); - } - } catch (error) { - console.error("Error inserting batch:", error); + if (this.currentBatch.length === 0) return; + + const batches: T[][] = []; + while (this.currentBatch.length > 0) { + batches.push(this.currentBatch.splice(0, this.BATCH_SIZE)); } + + logger.info("Starting batch flush", { + numberOfBatches: batches.length, + totalItems: batches.reduce((sum, batch) => sum + batch.length, 0), + }); + + // TODO: report plimit.activeCount and pLimit.pendingCount and pLimit.concurrency to /metrics + const promises = batches.map((batch) => + this.concurrencyLimiter(async () => { + const batchId = nanoid(); + try { + await this.callback(batchId, batch!); + } catch (error) { + logger.error("Error processing batch", { + batchId, + error, + batchSize: batch.length, + errorMessage: error instanceof Error ? error.message : "Unknown error", + }); + throw error; + } + }) + ); + + const results = await Promise.allSettled(promises); + + const failedBatches = results.filter((result) => result.status === "rejected").length; + this.failedBatchCount += failedBatches; + + logger.info("Batch flush complete", { + totalBatches: batches.length, + successfulBatches: batches.length - failedBatches, + failedBatches, + totalFailedBatches: this.failedBatchCount, + }); } } diff --git a/apps/webapp/package.json b/apps/webapp/package.json index 25cd5f3844..6f485ec7c7 100644 --- a/apps/webapp/package.json +++ b/apps/webapp/package.json @@ -45,18 +45,19 @@ "@codemirror/view": "^6.5.0", "@conform-to/react": "^0.6.1", "@conform-to/zod": "^0.6.1", - "@depot/sdk-node": "^1.0.0", "@depot/cli": "0.0.1-cli.2.80.0", + "@depot/sdk-node": "^1.0.0", "@electric-sql/react": "^0.3.5", "@headlessui/react": "^1.7.8", "@heroicons/react": "^2.0.12", - "@internal/run-engine": "workspace:*", - "@internal/zod-worker": "workspace:*", "@internal/redis": "workspace:*", "@internal/redis-worker": "workspace:*", + "@internal/run-engine": "workspace:*", + "@internal/zod-worker": "workspace:*", "@internationalized/date": "^3.5.1", "@lezer/highlight": "^1.1.6", "@opentelemetry/api": "1.9.0", + "@opentelemetry/api-logs": "0.52.1", "@opentelemetry/core": "1.25.1", "@opentelemetry/exporter-logs-otlp-http": "0.52.1", "@opentelemetry/exporter-trace-otlp-http": "0.52.1", @@ -69,7 +70,6 @@ "@opentelemetry/sdk-trace-base": "1.25.1", "@opentelemetry/sdk-trace-node": "1.25.1", "@opentelemetry/semantic-conventions": "1.25.1", - "@opentelemetry/api-logs": "0.52.1", "@popperjs/core": "^2.11.8", "@prisma/instrumentation": "^5.11.0", "@radix-ui/react-alert-dialog": "^1.0.4", @@ -145,6 +145,7 @@ "non.geist": "^1.0.2", "ohash": "^1.1.3", "openai": "^4.33.1", + "p-limit": "^6.2.0", "parse-duration": "^1.1.0", "posthog-js": "^1.93.3", "posthog-node": "^3.1.3", diff --git a/apps/webapp/test/dynamicFlushScheduler.test.ts b/apps/webapp/test/dynamicFlushScheduler.test.ts new file mode 100644 index 0000000000..d2a4392fdf --- /dev/null +++ b/apps/webapp/test/dynamicFlushScheduler.test.ts @@ -0,0 +1,90 @@ +import { describe, it, expect } from "vitest"; +import { DynamicFlushScheduler } from "../app/v3/dynamicFlushScheduler.server"; + +describe("DynamicFlushScheduler", () => { + beforeEach(() => { + vi.useFakeTimers(); + vi.clearAllMocks(); + vi.resetAllMocks(); + }); + + it("doesn't call callback when there are no items", () => { + const callback = vi.fn(); + const dynamicFlushScheduler = new DynamicFlushScheduler({ + batchSize: 3, + flushInterval: 5000, + callback, + }); + dynamicFlushScheduler.addToBatch([]); + + expect(callback).toBeCalledTimes(0); + }); + + it("calls callback once with batchSize items", async () => { + const callback = vi.fn(); + const dynamicFlushScheduler = new DynamicFlushScheduler({ + batchSize: 3, + flushInterval: 5000, + callback, + }); + const items = [1, 2, 3]; + await dynamicFlushScheduler.addToBatch(items); + + expect(callback).toBeCalledTimes(1); + expect(callback).toBeCalledWith(expect.any(String), [1, 2, 3]); + }); + + it("calls callback when flush interval is reached", async () => { + const callback = vi.fn(); + const dynamicFlushScheduler = new DynamicFlushScheduler({ + batchSize: 100, + flushInterval: 3000, + callback, + }); + const items = [1, 2, 3, 4, 5]; + dynamicFlushScheduler.addToBatch(items); + + await vi.advanceTimersByTimeAsync(3000); + + expect(callback).toBeCalledTimes(1); + expect(callback).toBeCalledWith(expect.any(String), [1, 2, 3, 4, 5]); + }); + + it("calls callback multiple times with the correct batch size", async () => { + const callback = vi.fn(); + const dynamicFlushScheduler = new DynamicFlushScheduler({ + batchSize: 3, + flushInterval: 10000, + callback, + }); + const items = [1, 2, 3, 4, 5, 6]; + await dynamicFlushScheduler.addToBatch(items); + + expect(callback).toHaveBeenCalledTimes(2); + expect(callback).toHaveBeenNthCalledWith(1, expect.any(String), [1, 2, 3]); + expect(callback).toHaveBeenNthCalledWith(2, expect.any(String), [4, 5, 6]); + }); + + it("handles SIGTERM signal correctly", async () => { + const callback = vi.fn(); + + const processOnMock = vi.fn(); + process.on = processOnMock; + + const dynamicFlushScheduler = new DynamicFlushScheduler({ + batchSize: 10, + flushInterval: 5000, + callback, + }); + + const items = [1, 2, 3, 4, 5, 6]; + await dynamicFlushScheduler.addToBatch(items); + + const sigtermHandler = processOnMock.mock.calls.find((call) => call[0] === "SIGTERM")[1]; + + await sigtermHandler(); + + expect(callback).toHaveBeenCalled(); + expect(callback).toHaveBeenCalledWith(expect.any(String), items); + }); +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 52ab355a58..e001bf94ab 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -547,6 +547,9 @@ importers: openai: specifier: ^4.33.1 version: 4.33.1 + p-limit: + specifier: ^6.2.0 + version: 6.2.0 parse-duration: specifier: ^1.1.0 version: 1.1.0