Skip to content

Commit

Permalink
fix: use a singleton http agent for queue events (#391)
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos authored Nov 1, 2023
1 parent 23650d2 commit 84c627e
Show file tree
Hide file tree
Showing 14 changed files with 85 additions and 85 deletions.
8 changes: 5 additions & 3 deletions src/config.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import dotenv from 'dotenv'

type StorageBackendType = 'file' | 's3'
export type StorageBackendType = 'file' | 's3'

type StorageConfigType = {
version: string
Expand All @@ -12,7 +12,7 @@ type StorageConfigType = {
encryptionKey: string
fileSizeLimit: number
fileStoragePath?: string
globalS3Protocol?: 'http' | 'https' | string
globalS3Protocol: 'http' | 'https'
globalS3MaxSockets?: number
globalS3Bucket: string
globalS3Endpoint?: string
Expand Down Expand Up @@ -111,7 +111,9 @@ export function getConfig(): StorageConfigType {
fileSizeLimit: Number(getConfigFromEnv('FILE_SIZE_LIMIT')),
fileStoragePath: getOptionalConfigFromEnv('FILE_STORAGE_BACKEND_PATH'),
globalS3MaxSockets: parseInt(getOptionalConfigFromEnv('GLOBAL_S3_MAX_SOCKETS') || '200', 10),
globalS3Protocol: getOptionalConfigFromEnv('GLOBAL_S3_PROTOCOL') || 'https',
globalS3Protocol: (getOptionalConfigFromEnv('GLOBAL_S3_PROTOCOL') || 'https') as
| 'http'
| 'https',
globalS3Bucket: getConfigFromEnv('GLOBAL_S3_BUCKET'),
globalS3Endpoint: getOptionalConfigFromEnv('GLOBAL_S3_ENDPOINT'),
globalS3ForcePathStyle: getOptionalConfigFromEnv('GLOBAL_S3_FORCE_PATH_STYLE') === 'true',
Expand Down
6 changes: 2 additions & 4 deletions src/database/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,24 +85,22 @@ export class TenantConnection {
acquireConnectionTimeout: databaseConnectionTimeout,
})

DbActivePool.inc({ tenant_id: options.tenantId, is_external: isExternalPool.toString() })
DbActivePool.inc({ is_external: isExternalPool.toString() })

knexPool.client.pool.on('createSuccess', () => {
DbActiveConnection.inc({
tenant_id: options.tenantId,
is_external: isExternalPool.toString(),
})
})

knexPool.client.pool.on('destroySuccess', () => {
DbActiveConnection.dec({
tenant_id: options.tenantId,
is_external: isExternalPool.toString(),
})
})

knexPool.client.pool.on('poolDestroySuccess', () => {
DbActivePool.dec({ tenant_id: options.tenantId, is_external: isExternalPool.toString() })
DbActivePool.dec({ is_external: isExternalPool.toString() })
})

if (!isExternalPool) {
Expand Down
22 changes: 1 addition & 21 deletions src/http/plugins/metrics.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import fastifyPlugin from 'fastify-plugin'
import { MetricsRegistrar, RequestErrors } from '../../monitoring/metrics'
import { MetricsRegistrar } from '../../monitoring/metrics'
import fastifyMetrics from 'fastify-metrics'
import { getConfig } from '../../config'

Expand Down Expand Up @@ -33,26 +33,6 @@ export const metrics = ({ enabledEndpoint }: MetricsOptions) =>
},
registeredRoutesOnly: true,
groupStatusCodes: true,
customLabels: {
tenant_id: (req) => {
return req.tenantId
},
},
},
})

// Errors
fastify.addHook('onResponse', async (request, reply) => {
const error = (reply.raw as any).executionError || reply.executionError

if (error) {
RequestErrors.inc({
name: error.name || error.constructor.name,
tenant_id: request.tenantId,
path: request.routerPath,
method: request.routerMethod,
status: reply.statusCode,
})
}
})
})
5 changes: 4 additions & 1 deletion src/http/plugins/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import fastifyPlugin from 'fastify-plugin'
import { StorageBackendAdapter, createStorageBackend } from '../../storage/backend'
import { Storage } from '../../storage'
import { StorageKnexDB } from '../../storage/database'
import { getConfig } from '../../config'

declare module 'fastify' {
interface FastifyRequest {
Expand All @@ -10,8 +11,10 @@ declare module 'fastify' {
}
}

const { storageBackendType } = getConfig()

export const storage = fastifyPlugin(async (fastify) => {
const storageBackend = createStorageBackend()
const storageBackend = createStorageBackend(storageBackendType)

fastify.decorateRequest('storage', undefined)
fastify.addHook('preHandler', async (request) => {
Expand Down
5 changes: 1 addition & 4 deletions src/http/routes/tus/s3-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,8 @@ export class S3Store extends BaseS3Store {
const timer = S3UploadPart.startTimer()

const result = await super.uploadPart(metadata, readStream, partNumber)
const resource = UploadId.fromString(metadata.file.id)

timer({
tenant_id: resource.tenant,
})
timer()

return result
}
Expand Down
28 changes: 11 additions & 17 deletions src/monitoring/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,71 +6,65 @@ export const MetricsRegistrar = new Registry()
export const FileUploadStarted = new client.Gauge({
name: 'storage_api_upload_started',
help: 'Upload started',
labelNames: ['tenant_id', 'region', 'is_multipart'],
labelNames: ['region', 'is_multipart'],
})

export const FileUploadedSuccess = new client.Gauge({
name: 'storage_api_upload_success',
help: 'Successful uploads',
labelNames: ['tenant_id', 'region', 'is_multipart'],
labelNames: ['region', 'is_multipart'],
})

export const DbQueryPerformance = new client.Histogram({
name: 'storage_api_database_query_performance',
help: 'Database query performance',
labelNames: ['tenant_id', 'region', 'name'],
})

export const RequestErrors = new client.Gauge({
name: 'storage_api_request_errors',
labelNames: ['tenant_id', 'region', 'method', 'path', 'status', 'name'],
help: 'Response Errors',
labelNames: ['region', 'name'],
})

export const QueueJobSchedulingTime = new client.Histogram({
name: 'storage_api_queue_job_scheduled_time',
help: 'Time taken to schedule a job in the queue',
labelNames: ['region', 'name', 'tenant_id'],
labelNames: ['region', 'name'],
})

export const QueueJobScheduled = new client.Gauge({
name: 'storage_api_queue_job_scheduled',
help: 'Current number of pending messages in the queue',
labelNames: ['region', 'name', 'tenant_id'],
labelNames: ['region', 'name'],
})

export const QueueJobCompleted = new client.Gauge({
name: 'storage_api_queue_job_completed',
help: 'Current number of processed messages in the queue',
labelNames: ['tenant_id', 'region', 'name'],
labelNames: ['region', 'name'],
})

export const QueueJobRetryFailed = new client.Gauge({
name: 'storage_api_queue_job_retry_failed',
help: 'Current number of failed attempts messages in the queue',
labelNames: ['tenant_id', 'region', 'name'],
labelNames: ['region', 'name'],
})

export const QueueJobError = new client.Gauge({
name: 'storage_api_queue_job_error',
help: 'Current number of errored messages in the queue',
labelNames: ['tenant_id', 'region', 'name'],
labelNames: ['region', 'name'],
})

export const S3UploadPart = new client.Histogram({
name: 'storage_api_s3_upload_part',
help: 'S3 upload part performance',
labelNames: ['tenant_id', 'region'],
labelNames: ['region'],
})

export const DbActivePool = new client.Gauge({
name: 'storage_api_db_pool',
help: 'Number of database pools created',
labelNames: ['tenant_id', 'region', 'is_external'],
labelNames: ['region', 'is_external'],
})

export const DbActiveConnection = new client.Gauge({
name: 'storage_api_db_connections',
help: 'Number of database connections',
labelNames: ['tenant_id', 'region', 'is_external'],
labelNames: ['region', 'is_external'],
})
11 changes: 6 additions & 5 deletions src/queue/events/base-event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { getServiceKeyUser } from '../../database/tenant'
import { getPostgresConnection } from '../../database'
import { Storage } from '../../storage'
import { StorageKnexDB } from '../../storage/database'
import { createStorageBackend } from '../../storage/backend'
import { createAgent, createStorageBackend } from '../../storage/backend'
import { getConfig } from '../../config'
import { QueueJobScheduled, QueueJobSchedulingTime } from '../../monitoring/metrics'
import { logger } from '../../monitoring'
Expand All @@ -20,7 +20,8 @@ export interface BasePayload {

export type StaticThis<T> = { new (...args: any): T }

const { enableQueueEvents } = getConfig()
const { enableQueueEvents, storageBackendType, globalS3Protocol } = getConfig()
const httpAgent = createAgent(globalS3Protocol)

export abstract class BaseEvent<T extends Omit<BasePayload, '$version'>> {
public static readonly version: string = 'v1'
Expand Down Expand Up @@ -113,7 +114,9 @@ export abstract class BaseEvent<T extends Omit<BasePayload, '$version'>> {
host: payload.tenant.host,
})

const storageBackend = createStorageBackend()
const storageBackend = createStorageBackend(storageBackendType, {
httpAgent,
})

return new Storage(storageBackend, db)
}
Expand Down Expand Up @@ -145,12 +148,10 @@ export abstract class BaseEvent<T extends Omit<BasePayload, '$version'>> {

timer({
name: constructor.getQueueName(),
tenant_id: this.payload.tenant.ref,
})

QueueJobScheduled.inc({
name: constructor.getQueueName(),
tenant_id: this.payload.tenant.ref,
})

return res
Expand Down
3 changes: 0 additions & 3 deletions src/queue/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,12 @@ export abstract class Queue {
const res = await event.handle(job)

QueueJobCompleted.inc({
tenant_id: job.data.tenant.ref,
name: event.getQueueName(),
})

return res
} catch (e) {
QueueJobRetryFailed.inc({
tenant_id: job.data.tenant.ref,
name: event.getQueueName(),
})

Expand All @@ -106,7 +104,6 @@ export abstract class Queue {
}
if (dbJob.retrycount === dbJob.retrylimit) {
QueueJobError.inc({
tenant_id: job.data.tenant.ref,
name: event.getQueueName(),
})
}
Expand Down
25 changes: 19 additions & 6 deletions src/storage/backend/index.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,34 @@
import { StorageBackendAdapter } from './generic'
import { FileBackend } from './file'
import { S3Backend } from './s3'
import { getConfig } from '../../config'
import { S3Backend, S3ClientOptions } from './s3'
import { getConfig, StorageBackendType } from '../../config'

export * from './s3'
export * from './file'
export * from './generic'

const { region, globalS3Endpoint, globalS3ForcePathStyle, storageBackendType } = getConfig()
const { region, globalS3Endpoint, globalS3ForcePathStyle } = getConfig()

export function createStorageBackend() {
type ConfigForStorage<Type extends StorageBackendType> = Type extends 's3'
? S3ClientOptions
: undefined

export function createStorageBackend<Type extends StorageBackendType>(
type: Type,
config?: ConfigForStorage<Type>
) {
let storageBackend: StorageBackendAdapter

if (storageBackendType === 'file') {
if (type === 'file') {
storageBackend = new FileBackend()
} else {
storageBackend = new S3Backend(region, globalS3Endpoint, globalS3ForcePathStyle)
const defaultOptions: S3ClientOptions = {
region: region,
endpoint: globalS3Endpoint,
forcePathStyle: globalS3ForcePathStyle,
...(config ? config : {}),
}
storageBackend = new S3Backend(defaultOptions)
}

return storageBackend
Expand Down
45 changes: 31 additions & 14 deletions src/storage/backend/s3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,35 +25,52 @@ import Agent, { HttpsAgent } from 'agentkeepalive'

const { globalS3Protocol, globalS3MaxSockets } = getConfig()

/**
* Creates an agent for the given protocol
* @param protocol
*/
export function createAgent(protocol: 'http' | 'https') {
const agentOptions = {
maxSockets: globalS3MaxSockets,
keepAlive: true,
}

return protocol === 'http'
? { httpAgent: new Agent(agentOptions) }
: { httpsAgent: new HttpsAgent(agentOptions) }
}

export interface S3ClientOptions {
endpoint?: string
region?: string
forcePathStyle?: boolean
accessKey?: string
secretKey?: string
role?: string
httpAgent?: { httpAgent: Agent } | { httpsAgent: HttpsAgent }
}

/**
* S3Backend
* Interacts with an s3-compatible file system with this S3Adapter
*/
export class S3Backend implements StorageBackendAdapter {
client: S3Client

constructor(region: string, endpoint?: string | undefined, globalS3ForcePathStyle?: boolean) {
const agentOptions = {
maxSockets: globalS3MaxSockets,
keepAlive: true,
}

const agent =
globalS3Protocol === 'http'
? { httpAgent: new Agent(agentOptions) }
: { httpsAgent: new HttpsAgent(agentOptions) }
constructor(options: S3ClientOptions) {
const agent = options.httpAgent ? options.httpAgent : createAgent(globalS3Protocol)

const params: S3ClientConfig = {
region,
region: options.region,
runtime: 'node',
requestHandler: new NodeHttpHandler({
...agent,
}),
}
if (endpoint) {
params.endpoint = endpoint
if (options.endpoint) {
params.endpoint = options.endpoint
}
if (globalS3ForcePathStyle) {
if (options.forcePathStyle) {
params.forcePathStyle = true
}
this.client = new S3Client(params)
Expand Down
1 change: 0 additions & 1 deletion src/storage/database/knex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,6 @@ export class StorageKnexDB implements Database {
): Promise<Awaited<ReturnType<T>>> {
const timer = DbQueryPerformance.startTimer({
name: queryName,
tenant_id: this.options.tenantId,
})

let tnx = this.options.tnx
Expand Down
Loading

0 comments on commit 84c627e

Please sign in to comment.