Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lazy load dsm only when needed #5305

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/datadog-plugin-amqplib/src/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

const { TEXT_MAP } = require('../../../ext/formats')
const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
const { getAmqpMessageSize } = require('../../dd-trace/src/datastreams/processor')
const { getAmqpMessageSize } = require('../../dd-trace/src/datastreams')
const { getResourceName } = require('./util')

class AmqplibConsumerPlugin extends ConsumerPlugin {
Expand Down
3 changes: 1 addition & 2 deletions packages/datadog-plugin-amqplib/src/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
const { TEXT_MAP } = require('../../../ext/formats')
const { CLIENT_PORT_KEY } = require('../../dd-trace/src/constants')
const ProducerPlugin = require('../../dd-trace/src/plugins/producer')
const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway')
const { getAmqpMessageSize } = require('../../dd-trace/src/datastreams/processor')
const { DsmPathwayCodec, getAmqpMessageSize } = require('../../dd-trace/src/datastreams')
const { getResourceName } = require('./util')

class AmqplibProducerPlugin extends ProducerPlugin {
Expand Down
2 changes: 1 addition & 1 deletion packages/datadog-plugin-avsc/src/schema_iterator.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const {
const log = require('../../dd-trace/src/log')
const {
SchemaBuilder
} = require('../../dd-trace/src/datastreams/schemas/schema_builder')
} = require('../../dd-trace/src/datastreams')

class SchemaExtractor {
constructor (schema) {
Expand Down
5 changes: 1 addition & 4 deletions packages/datadog-plugin-aws-sdk/src/services/kinesis.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
'use strict'
const {
getSizeOrZero
} = require('../../../dd-trace/src/datastreams/processor')
const { DsmPathwayCodec } = require('../../../dd-trace/src/datastreams/pathway')
const { DsmPathwayCodec, getSizeOrZero } = require('../../../dd-trace/src/datastreams')
const log = require('../../../dd-trace/src/log')
const BaseAwsSdkPlugin = require('../base')
const { storage } = require('../../../datadog-core')
Expand Down
3 changes: 1 addition & 2 deletions packages/datadog-plugin-aws-sdk/src/services/sns.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
'use strict'
const { getHeadersSize } = require('../../../dd-trace/src/datastreams/processor')
const { DsmPathwayCodec } = require('../../../dd-trace/src/datastreams/pathway')
const { DsmPathwayCodec, getHeadersSize } = require('../../../dd-trace/src/datastreams')
const log = require('../../../dd-trace/src/log')
const BaseAwsSdkPlugin = require('../base')

Expand Down
3 changes: 1 addition & 2 deletions packages/datadog-plugin-aws-sdk/src/services/sqs.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
const log = require('../../../dd-trace/src/log')
const BaseAwsSdkPlugin = require('../base')
const { storage } = require('../../../datadog-core')
const { getHeadersSize } = require('../../../dd-trace/src/datastreams/processor')
const { DsmPathwayCodec } = require('../../../dd-trace/src/datastreams/pathway')
const { DsmPathwayCodec, getHeadersSize } = require('../../../dd-trace/src/datastreams')

class Sqs extends BaseAwsSdkPlugin {
static get id () { return 'sqs' }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict'

const { getMessageSize } = require('../../dd-trace/src/datastreams/processor')
const { getMessageSize } = require('../../dd-trace/src/datastreams')
const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')

class GoogleCloudPubsubConsumerPlugin extends ConsumerPlugin {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
'use strict'

const ProducerPlugin = require('../../dd-trace/src/plugins/producer')
const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway')
const { getHeadersSize } = require('../../dd-trace/src/datastreams/processor')
const { DsmPathwayCodec, getHeadersSize } = require('../../dd-trace/src/datastreams')

class GoogleCloudPubsubProducerPlugin extends ProducerPlugin {
static get id () { return 'google-cloud-pubsub' }
Expand Down
2 changes: 1 addition & 1 deletion packages/datadog-plugin-kafkajs/src/batch-consumer.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
const { getMessageSize } = require('../../dd-trace/src/datastreams/processor')
const { getMessageSize } = require('../../dd-trace/src/datastreams')

class KafkajsBatchConsumerPlugin extends ConsumerPlugin {
static get id () { return 'kafkajs' }
Expand Down
2 changes: 1 addition & 1 deletion packages/datadog-plugin-kafkajs/src/consumer.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict'

const dc = require('dc-polyfill')
const { getMessageSize } = require('../../dd-trace/src/datastreams/processor')
const { getMessageSize } = require('../../dd-trace/src/datastreams')
const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')

const afterStartCh = dc.channel('dd-trace:kafkajs:consumer:afterStart')
Expand Down
3 changes: 1 addition & 2 deletions packages/datadog-plugin-kafkajs/src/producer.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
'use strict'

const ProducerPlugin = require('../../dd-trace/src/plugins/producer')
const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway')
const { getMessageSize } = require('../../dd-trace/src/datastreams/processor')
const { DsmPathwayCodec, getMessageSize } = require('../../dd-trace/src/datastreams')

const BOOTSTRAP_SERVERS_KEY = 'messaging.kafka.bootstrap.servers'

Expand Down
2 changes: 1 addition & 1 deletion packages/datadog-plugin-kafkajs/test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const { expectSomeSpan, withDefaults } = require('../../dd-trace/test/plugins/he
const { ERROR_MESSAGE, ERROR_TYPE, ERROR_STACK } = require('../../dd-trace/src/constants')

const { expectedSchema, rawExpectedSchema } = require('./naming')
const DataStreamsContext = require('../../dd-trace/src/data_streams_context')
const DataStreamsContext = require('../../dd-trace/src/datastreams/context')
const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway')
const { ENTRY_PARENT_HASH, DataStreamsProcessor } = require('../../dd-trace/src/datastreams/processor')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const {
const log = require('../../dd-trace/src/log')
const {
SchemaBuilder
} = require('../../dd-trace/src/datastreams/schemas/schema_builder')
} = require('../../dd-trace/src/datastreams')

class SchemaExtractor {
constructor (schema) {
Expand Down
2 changes: 1 addition & 1 deletion packages/datadog-plugin-rhea/src/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
const { storage } = require('../../datadog-core')
const { getAmqpMessageSize } = require('../../dd-trace/src/datastreams/processor')
const { getAmqpMessageSize } = require('../../dd-trace/src/datastreams')

class RheaConsumerPlugin extends ConsumerPlugin {
static get id () { return 'rhea' }
Expand Down
3 changes: 1 addition & 2 deletions packages/datadog-plugin-rhea/src/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

const { CLIENT_PORT_KEY } = require('../../dd-trace/src/constants')
const ProducerPlugin = require('../../dd-trace/src/plugins/producer')
const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway')
const { getAmqpMessageSize } = require('../../dd-trace/src/datastreams/processor')
const { getAmqpMessageSize, DsmPathwayCodec } = require('../../dd-trace/src/datastreams')

class RheaProducerPlugin extends ProducerPlugin {
static get id () { return 'rhea' }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const DataStreamsContext = require('./data_streams_context')
const DataStreamsContext = require('./context')

class DataStreamsCheckpointer {
constructor (tracer) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
const { storage } = require('../../datadog-core')
const log = require('./log')
const { storage } = require('../../../datadog-core')
const log = require('../log')

function getDataStreamsContext () {
const store = storage('legacy').getStore()
Expand Down
92 changes: 92 additions & 0 deletions packages/dd-trace/src/datastreams/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
'use strict'

const {
getAmqpMessageSize,
getHeadersSize,
getMessageSize,
getSizeOrZero
} = require('./size')

function lazyClass (classGetter, methods = [], staticMethods = []) {
let constructorArgs

const LazyClass = function (...args) {
constructorArgs = args
}

for (const method of methods) {
LazyClass.prototype[method] = function (...args) {
const ActiveClass = classGetter()
const instance = new ActiveClass(...constructorArgs)

Object.setPrototypeOf(this, instance)

return this[method](...args)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment that explains how this replaces all prototype replaced methods in one go.

Please add a separate TODO (including a time frame for a lintrule similar to how unicorn does it https://github.com/sindresorhus/eslint-plugin-unicorn/blob/main/docs/rules/expiring-todo-comments.md) that this is going to be replaced with a system that encapsulates modules and have dedicates plugins for that module or feature.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually writing the comments made me realize the code can be simplified a bit so I refactored it. I also added the comments and TODO as requested, but I didn't add a time frame as this is not really something we do today, and I felt like this would need to be properly discussed and the linter capability added first.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any other concerns with the PR?

}

for (const method of staticMethods) {
LazyClass[method] = function (...args) {
const ActiveClass = classGetter()

for (const method of staticMethods) {
LazyClass[method] = ActiveClass[method]
}

return LazyClass[method](...args)
}
}

return LazyClass
}

const DsmPathwayCodec = lazyClass(() => require('./pathway').DsmPathwayCodec, [], [
'encode',
'decode'
])

const DataStreamsCheckpointer = lazyClass(() => require('./checkpointer').DataStreamsCheckpointer, [
'setProduceCheckpoint',
'setConsumeCheckpoint'
])

const DataStreamsManager = lazyClass(() => require('./manager').DataStreamsManager, [
'setCheckpoint',
'decodeDataStreamsContext'
])

// TODO: Are all those methods actually public?
const DataStreamsProcessor = lazyClass(() => require('./processor').DataStreamsProcessor, [
'onInterval',
'bucketFromTimestamp',
'recordCheckpoint',
'setCheckpoint',
'recordOffset',
'setOffset',
'setUrl',
'trySampleSchema',
'canSampleSchema',
'getSchema'
])

const SchemaBuilder = lazyClass(() => require('./schemas/schema_builder').SchemaBuilder, [
'build',
'addProperty',
'shouldExtractSchema'
], [
'getCache',
'getSchemaDefinition',
'getSchema'
])

module.exports = {
DsmPathwayCodec,
DataStreamsCheckpointer,
DataStreamsManager,
DataStreamsProcessor,
SchemaBuilder,
getAmqpMessageSize,
getHeadersSize,
getMessageSize,
getSizeOrZero
}
27 changes: 27 additions & 0 deletions packages/dd-trace/src/datastreams/manager.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
'use strict'

const { DsmPathwayCodec } = require('./pathway')
const DataStreamsContext = require('./context')

class DataStreamsManager {
constructor (processor) {
this._dataStreamsProcessor = processor
}

setCheckpoint (edgeTags, span, payloadSize = 0) {
const ctx = this._dataStreamsProcessor.setCheckpoint(
edgeTags, span, DataStreamsContext.getDataStreamsContext(), payloadSize
)
DataStreamsContext.setDataStreamsContext(ctx)
return ctx
}

decodeDataStreamsContext (carrier) {
const ctx = DsmPathwayCodec.decode(carrier)
// we erase the previous context everytime we decode a new one
DataStreamsContext.setDataStreamsContext(ctx)
return ctx
}
}

module.exports = { DataStreamsManager }
45 changes: 1 addition & 44 deletions packages/dd-trace/src/datastreams/processor.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const { LogCollapsingLowestDenseDDSketch } = require('@datadog/sketches-js')
const { DsmPathwayCodec } = require('./pathway')
const { DataStreamsWriter } = require('./writer')
const { computePathwayHash } = require('./pathway')
const { types } = require('util')
const { getAmqpMessageSize, getHeadersSize, getMessageSize, getSizeOrZero } = require('./size')
const { PATHWAY_HASH } = require('../../../../ext/tags')
const { SchemaBuilder } = require('./schemas/schema_builder')
const { SchemaSampler } = require('./schemas/schema_sampler')
Expand Down Expand Up @@ -115,49 +115,6 @@ class StatsBucket {
}
}

function getSizeOrZero (obj) {
if (typeof obj === 'string') {
return Buffer.from(obj, 'utf-8').length
}
if (types.isArrayBuffer(obj)) {
return obj.byteLength
}
if (Buffer.isBuffer(obj)) {
return obj.length
}
if (Array.isArray(obj) && obj.length > 0) {
if (typeof obj[0] === 'number') return Buffer.from(obj).length
let payloadSize = 0
obj.forEach(item => {
payloadSize += getSizeOrZero(item)
})
return payloadSize
}
if (obj !== null && typeof obj === 'object') {
try {
return getHeadersSize(obj)
} catch {
// pass
}
}
return 0
}

function getHeadersSize (headers) {
if (headers === undefined) return 0
return Object.entries(headers).reduce((prev, [key, val]) => getSizeOrZero(key) + getSizeOrZero(val) + prev, 0)
}

function getMessageSize (message) {
const { key, value, headers } = message
return getSizeOrZero(key) + getSizeOrZero(value) + getHeadersSize(headers)
}

function getAmqpMessageSize (message) {
const { headers, content } = message
return getSizeOrZero(content) + getHeadersSize(headers)
}

class TimeBuckets extends Map {
forTime (time) {
if (!this.has(time)) {
Expand Down
53 changes: 53 additions & 0 deletions packages/dd-trace/src/datastreams/size.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
'use strict'
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The helpers in this file were copied from processor.js untouched.


const { types } = require('util')

function getSizeOrZero (obj) {
if (typeof obj === 'string') {
return Buffer.from(obj, 'utf-8').length
}
if (types.isArrayBuffer(obj)) {
return obj.byteLength
}
if (Buffer.isBuffer(obj)) {
return obj.length
}
if (Array.isArray(obj) && obj.length > 0) {
if (typeof obj[0] === 'number') return Buffer.from(obj).length
let payloadSize = 0
obj.forEach(item => {
payloadSize += getSizeOrZero(item)
})
return payloadSize
}
if (obj !== null && typeof obj === 'object') {
try {
return getHeadersSize(obj)
} catch {
// pass
}
}
return 0
}

function getHeadersSize (headers) {
if (headers === undefined) return 0
return Object.entries(headers).reduce((prev, [key, val]) => getSizeOrZero(key) + getSizeOrZero(val) + prev, 0)
}

function getMessageSize (message) {
const { key, value, headers } = message
return getSizeOrZero(key) + getSizeOrZero(value) + getHeadersSize(headers)
}

function getAmqpMessageSize (message) {
const { headers, content } = message
return getSizeOrZero(content) + getHeadersSize(headers)
}

module.exports = {
getMessageSize,
getHeadersSize,
getSizeOrZero,
getAmqpMessageSize
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const pick = require('../../../../datadog-core/src/utils/src/pick')
const log = require('../../log')

const { DsmPathwayCodec } = require('../../datastreams/pathway')
const { DsmPathwayCodec } = require('../../datastreams')

const base64Key = 'dd-pathway-ctx-base64'
const logKeys = [base64Key]
Expand Down
Loading
Loading