Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Configurable metrics #41

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/new-fans-teach.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/service-core': minor
---

Optimized metrics initialization and configuration for extensibility.
5 changes: 5 additions & 0 deletions .changeset/silent-tips-divide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/service-core': minor
---

Added ability to extend configuration collection process.
5 changes: 5 additions & 0 deletions .changeset/wicked-geese-teach.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/lib-services-framework': minor
---

Added ability for generic implementation registration and fetching on the Container.
64 changes: 55 additions & 9 deletions libs/lib-services/src/container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,34 @@ export type ContainerImplementationDefaultGenerators = {
[type in ContainerImplementation]: () => ContainerImplementationTypes[type];
};

/**
* Helper for identifying constructors
*/
export interface Abstract<T> {
prototype: T;
}
/**
* A basic class constructor
*/
export type Newable<T> = new (...args: never[]) => T;

/**
* Identifier used to get and register implementations
*/
export type ServiceIdentifier<T = unknown> = string | symbol | Newable<T> | Abstract<T> | ContainerImplementation;

const DEFAULT_GENERATORS: ContainerImplementationDefaultGenerators = {
[ContainerImplementation.REPORTER]: () => NoOpReporter,
[ContainerImplementation.PROBES]: () => createFSProbe(),
[ContainerImplementation.TERMINATION_HANDLER]: () => createTerminationHandler()
};

/**
* A container which provides means for registering and getting various
* function implementations.
*/
export class Container {
protected implementations: Partial<ContainerImplementationTypes>;
protected implementations: Map<ServiceIdentifier<any>, any>;

/**
* Manager for system health probes
Expand All @@ -54,13 +74,39 @@ export class Container {
}

constructor() {
this.implementations = {};
this.implementations = new Map();
}

/**
* Gets an implementation given an identifier.
* An exception is thrown if the implementation has not been registered.
* Core [ContainerImplementation] identifiers are mapped to their respective implementation types.
* This also allows for getting generic implementations (unknown to the core framework) which have been registered.
*/
getImplementation<T>(identifier: Newable<T> | Abstract<T>): T;
getImplementation<T extends ContainerImplementation>(identifier: T): ContainerImplementationTypes[T];
getImplementation<T>(identifier: ServiceIdentifier<T>): T;
getImplementation<T>(identifier: ServiceIdentifier<T>): T {
const implementation = this.implementations.get(identifier);
if (!implementation) {
throw new Error(`Implementation for ${String(identifier)} has not been registered.`);
}
return implementation;
}

getImplementation<Type extends ContainerImplementation>(type: Type) {
const implementation = this.implementations[type];
/**
* Gets an implementation given an identifier.
* Null is returned if the implementation has not been registered yet.
* Core [ContainerImplementation] identifiers are mapped to their respective implementation types.
* This also allows for getting generic implementations (unknown to the core framework) which have been registered.
*/
getOptional<T>(identifier: Newable<T> | Abstract<T>): T | null;
getOptional<T extends ContainerImplementation>(identifier: T): ContainerImplementationTypes[T] | null;
getOptional<T>(identifier: ServiceIdentifier<T>): T | null;
getOptional<T>(identifier: ServiceIdentifier<T>): T | null {
const implementation = this.implementations.get(identifier);
if (!implementation) {
throw new Error(`Implementation for ${type} has not been registered.`);
return null;
}
return implementation;
}
Expand All @@ -71,15 +117,15 @@ export class Container {
registerDefaults(options?: RegisterDefaultsOptions) {
_.difference(Object.values(ContainerImplementation), options?.skip ?? []).forEach((type) => {
const generator = DEFAULT_GENERATORS[type];
this.implementations[type] = generator() as any; // :(
this.register(type, generator());
});
}

/**
* Allows for overriding a default implementation
* Allows for registering core and generic implementations of services/helpers.
*/
register<Type extends ContainerImplementation>(type: Type, implementation: ContainerImplementationTypes[Type]) {
this.implementations[type] = implementation;
register<T>(identifier: ServiceIdentifier<T>, implementation: T) {
this.implementations.set(identifier, implementation);
}
}

Expand Down
2 changes: 0 additions & 2 deletions packages/service-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
"dependencies": {
"@js-sdsl/ordered-set": "^4.4.2",
"@opentelemetry/api": "~1.8.0",
"@opentelemetry/exporter-metrics-otlp-http": "^0.51.1",
"@opentelemetry/exporter-prometheus": "^0.51.1",
"@opentelemetry/resources": "^1.24.1",
"@opentelemetry/sdk-metrics": "1.24.1",
"@powersync/lib-services-framework": "workspace:*",
"@powersync/service-jpgwire": "workspace:*",
Expand Down
69 changes: 2 additions & 67 deletions packages/service-core/src/metrics/Metrics.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
import { Attributes, Counter, ObservableGauge, UpDownCounter, ValueType } from '@opentelemetry/api';
import { PrometheusExporter } from '@opentelemetry/exporter-prometheus';
import { MeterProvider, MetricReader, PeriodicExportingMetricReader } from '@opentelemetry/sdk-metrics';
import { OTLPMetricExporter } from '@opentelemetry/exporter-metrics-otlp-http';
import { MeterProvider } from '@opentelemetry/sdk-metrics';
import * as jpgwire from '@powersync/service-jpgwire';
import * as util from '../util/util-index.js';
import * as storage from '../storage/storage-index.js';
import { CorePowerSyncSystem } from '../system/CorePowerSyncSystem.js';
import { Resource } from '@opentelemetry/resources';
import { logger } from '@powersync/lib-services-framework';

export interface MetricsOptions {
Expand All @@ -16,8 +13,6 @@ export interface MetricsOptions {
}

export class Metrics {
private static instance: Metrics;

private prometheusExporter: PrometheusExporter;
private meterProvider: MeterProvider;

Expand Down Expand Up @@ -60,7 +55,7 @@ export class Metrics {
// Record on API pod
public concurrent_connections: UpDownCounter<Attributes>;

private constructor(meterProvider: MeterProvider, prometheusExporter: PrometheusExporter) {
constructor(meterProvider: MeterProvider, prometheusExporter: PrometheusExporter) {
this.meterProvider = meterProvider;
this.prometheusExporter = prometheusExporter;
const meter = meterProvider.getMeter('powersync');
Expand Down Expand Up @@ -132,66 +127,6 @@ export class Metrics {
this.concurrent_connections.add(0);
}

public static getInstance(): Metrics {
if (!Metrics.instance) {
throw new Error('Metrics have not been initialised');
}

return Metrics.instance;
}

public static async initialise(options: MetricsOptions): Promise<void> {
if (Metrics.instance) {
return;
}
logger.info('Configuring telemetry.');

logger.info(
`
Attention:
PowerSync collects completely anonymous telemetry regarding usage.
This information is used to shape our roadmap to better serve our customers.
You can learn more, including how to opt-out if you'd not like to participate in this anonymous program, by visiting the following URL:
https://docs.powersync.com/self-hosting/telemetry
Anonymous telemetry is currently: ${options.disable_telemetry_sharing ? 'disabled' : 'enabled'}
`.trim()
);

const configuredExporters: MetricReader[] = [];

const port: number = util.env.METRICS_PORT ?? 0;
const prometheusExporter = new PrometheusExporter({ port: port, preventServerStart: true });
configuredExporters.push(prometheusExporter);

if (!options.disable_telemetry_sharing) {
logger.info('Sharing anonymous telemetry');
const periodicExporter = new PeriodicExportingMetricReader({
exporter: new OTLPMetricExporter({
url: options.internal_metrics_endpoint
}),
exportIntervalMillis: 1000 * 60 * 5 // 5 minutes
});

configuredExporters.push(periodicExporter);
}

const meterProvider = new MeterProvider({
resource: new Resource({
['service']: 'PowerSync',
['instance_id']: options.powersync_instance_id
}),
readers: configuredExporters
});

if (port > 0) {
await prometheusExporter.startServer();
}

Metrics.instance = new Metrics(meterProvider, prometheusExporter);

logger.info('Telemetry configuration complete.');
}

public async shutdown(): Promise<void> {
await this.meterProvider.shutdown();
}
Expand Down
16 changes: 10 additions & 6 deletions packages/service-core/src/replication/WalStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ WHERE oid = $1::regclass`,
await batch.save({ tag: 'insert', sourceTable: table, before: undefined, after: record });
}
at += rows.length;
Metrics.getInstance().rows_replicated_total.add(rows.length);
container.getImplementation(Metrics).rows_replicated_total.add(rows.length);

await touch();
}
Expand Down Expand Up @@ -492,19 +492,21 @@ WHERE oid = $1::regclass`,
return null;
}

const metrics = container.getImplementation(Metrics);

if (msg.tag == 'insert') {
Metrics.getInstance().rows_replicated_total.add(1);
metrics.rows_replicated_total.add(1);
const baseRecord = util.constructAfterRecord(msg);
return await batch.save({ tag: 'insert', sourceTable: table, before: undefined, after: baseRecord });
} else if (msg.tag == 'update') {
Metrics.getInstance().rows_replicated_total.add(1);
metrics.rows_replicated_total.add(1);
// "before" may be null if the replica id columns are unchanged
// It's fine to treat that the same as an insert.
const before = util.constructBeforeRecord(msg);
const after = util.constructAfterRecord(msg);
return await batch.save({ tag: 'update', sourceTable: table, before: before, after: after });
} else if (msg.tag == 'delete') {
Metrics.getInstance().rows_replicated_total.add(1);
metrics.rows_replicated_total.add(1);
const before = util.constructBeforeRecord(msg)!;

return await batch.save({ tag: 'delete', sourceTable: table, before: before, after: undefined });
Expand Down Expand Up @@ -555,6 +557,8 @@ WHERE oid = $1::regclass`,
// Auto-activate as soon as initial replication is done
await this.storage.autoActivate();

const metrics = container.getImplementation(Metrics);

await this.storage.startBatch({}, async (batch) => {
// Replication never starts in the middle of a transaction
let inTx = false;
Expand All @@ -577,7 +581,7 @@ WHERE oid = $1::regclass`,
} else if (msg.tag == 'begin') {
inTx = true;
} else if (msg.tag == 'commit') {
Metrics.getInstance().transactions_replicated_total.add(1);
metrics.transactions_replicated_total.add(1);
inTx = false;
await batch.commit(msg.lsn!);
await this.ack(msg.lsn!, replicationStream);
Expand All @@ -602,7 +606,7 @@ WHERE oid = $1::regclass`,
}
}

Metrics.getInstance().chunks_replicated_total.add(1);
metrics.chunks_replicated_total.add(1);
}
});
}
Expand Down
8 changes: 5 additions & 3 deletions packages/service-core/src/routes/endpoints/socket-route.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { errors, logger, schema } from '@powersync/lib-services-framework';
import { container, errors, logger, schema } from '@powersync/lib-services-framework';
import { RequestParameters } from '@powersync/service-sync-rules';
import { serialize } from 'bson';

Expand Down Expand Up @@ -66,7 +66,9 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
observer.triggerCancel();
});

Metrics.getInstance().concurrent_connections.add(1);
const metrics = container.getImplementation(Metrics);

metrics.concurrent_connections.add(1);
const tracker = new RequestTracker();
try {
for await (const data of streamResponse({
Expand Down Expand Up @@ -134,7 +136,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
operations_synced: tracker.operationsSynced,
data_synced_bytes: tracker.dataSyncedBytes
});
Metrics.getInstance().concurrent_connections.add(-1);
metrics.concurrent_connections.add(-1);
}
}
});
9 changes: 5 additions & 4 deletions packages/service-core/src/routes/endpoints/sync-stream.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { errors, logger, router, schema } from '@powersync/lib-services-framework';
import { container, errors, logger, router, schema } from '@powersync/lib-services-framework';
import { RequestParameters } from '@powersync/service-sync-rules';
import { Readable } from 'stream';

Expand Down Expand Up @@ -43,10 +43,11 @@ export const syncStreamed = routeDefinition({
description: 'No sync rules available'
});
}
const metrics = container.getImplementation(Metrics);
const controller = new AbortController();
const tracker = new RequestTracker();
try {
Metrics.getInstance().concurrent_connections.add(1);
metrics.concurrent_connections.add(1);
const stream = Readable.from(
sync.transformToBytesTracked(
sync.ndjson(
Expand Down Expand Up @@ -89,7 +90,7 @@ export const syncStreamed = routeDefinition({
data: stream,
afterSend: async () => {
controller.abort();
Metrics.getInstance().concurrent_connections.add(-1);
metrics.concurrent_connections.add(-1);
logger.info(`Sync stream complete`, {
user_id: syncParams.user_id,
operations_synced: tracker.operationsSynced,
Expand All @@ -99,7 +100,7 @@ export const syncStreamed = routeDefinition({
});
} catch (ex) {
controller.abort();
Metrics.getInstance().concurrent_connections.add(-1);
metrics.concurrent_connections.add(-1);
}
}
});
Expand Down
6 changes: 3 additions & 3 deletions packages/service-core/src/sync/RequestTracker.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { container } from '@powersync/lib-services-framework';
import { Metrics } from '../metrics/Metrics.js';

/**
Expand All @@ -9,13 +10,12 @@ export class RequestTracker {

addOperationsSynced(operations: number) {
this.operationsSynced += operations;

Metrics.getInstance().operations_synced_total.add(operations);
container.getImplementation(Metrics).operations_synced_total.add(operations);
}

addDataSynced(bytes: number) {
this.dataSyncedBytes += bytes;

Metrics.getInstance().data_synced_bytes.add(bytes);
container.getImplementation(Metrics).data_synced_bytes.add(bytes);
}
}
1 change: 0 additions & 1 deletion packages/service-core/src/sync/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import * as storage from '../storage/storage-index.js';
import * as util from '../util/util-index.js';

import { logger } from '@powersync/lib-services-framework';
import { Metrics } from '../metrics/Metrics.js';
import { mergeAsyncIterables } from './merge.js';
import { TokenStreamOptions, tokenStream } from './util.js';
import { RequestTracker } from './RequestTracker.js';
Expand Down
1 change: 0 additions & 1 deletion packages/service-core/src/sync/util.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import * as timers from 'timers/promises';

import * as util from '../util/util-index.js';
import { Metrics } from '../metrics/Metrics.js';
import { RequestTracker } from './RequestTracker.js';

export type TokenStreamOptions = {
Expand Down
Loading
Loading