Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lazy load dsm only when needed #5305

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/datadog-plugin-amqplib/src/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

const { TEXT_MAP } = require('../../../ext/formats')
const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
const { getAmqpMessageSize } = require('../../dd-trace/src/datastreams/processor')
const { getAmqpMessageSize } = require('../../dd-trace/src/datastreams')
const { getResourceName } = require('./util')

class AmqplibConsumerPlugin extends ConsumerPlugin {
Expand Down
3 changes: 1 addition & 2 deletions packages/datadog-plugin-amqplib/src/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
const { TEXT_MAP } = require('../../../ext/formats')
const { CLIENT_PORT_KEY } = require('../../dd-trace/src/constants')
const ProducerPlugin = require('../../dd-trace/src/plugins/producer')
const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway')
const { getAmqpMessageSize } = require('../../dd-trace/src/datastreams/processor')
const { DsmPathwayCodec, getAmqpMessageSize } = require('../../dd-trace/src/datastreams')
const { getResourceName } = require('./util')

class AmqplibProducerPlugin extends ProducerPlugin {
Expand Down
2 changes: 1 addition & 1 deletion packages/datadog-plugin-avsc/src/schema_iterator.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const {
const log = require('../../dd-trace/src/log')
const {
SchemaBuilder
} = require('../../dd-trace/src/datastreams/schemas/schema_builder')
} = require('../../dd-trace/src/datastreams')

class SchemaExtractor {
constructor (schema) {
Expand Down
5 changes: 1 addition & 4 deletions packages/datadog-plugin-aws-sdk/src/services/kinesis.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
'use strict'
const {
getSizeOrZero
} = require('../../../dd-trace/src/datastreams/processor')
const { DsmPathwayCodec } = require('../../../dd-trace/src/datastreams/pathway')
const { DsmPathwayCodec, getSizeOrZero } = require('../../../dd-trace/src/datastreams')
const log = require('../../../dd-trace/src/log')
const BaseAwsSdkPlugin = require('../base')
const { storage } = require('../../../datadog-core')
Expand Down
3 changes: 1 addition & 2 deletions packages/datadog-plugin-aws-sdk/src/services/sns.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
'use strict'
const { getHeadersSize } = require('../../../dd-trace/src/datastreams/processor')
const { DsmPathwayCodec } = require('../../../dd-trace/src/datastreams/pathway')
const { DsmPathwayCodec, getHeadersSize } = require('../../../dd-trace/src/datastreams')
const log = require('../../../dd-trace/src/log')
const BaseAwsSdkPlugin = require('../base')

Expand Down
3 changes: 1 addition & 2 deletions packages/datadog-plugin-aws-sdk/src/services/sqs.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
const log = require('../../../dd-trace/src/log')
const BaseAwsSdkPlugin = require('../base')
const { storage } = require('../../../datadog-core')
const { getHeadersSize } = require('../../../dd-trace/src/datastreams/processor')
const { DsmPathwayCodec } = require('../../../dd-trace/src/datastreams/pathway')
const { DsmPathwayCodec, getHeadersSize } = require('../../../dd-trace/src/datastreams')

class Sqs extends BaseAwsSdkPlugin {
static get id () { return 'sqs' }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict'

const { getMessageSize } = require('../../dd-trace/src/datastreams/processor')
const { getMessageSize } = require('../../dd-trace/src/datastreams')
const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')

class GoogleCloudPubsubConsumerPlugin extends ConsumerPlugin {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
'use strict'

const ProducerPlugin = require('../../dd-trace/src/plugins/producer')
const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway')
const { getHeadersSize } = require('../../dd-trace/src/datastreams/processor')
const { DsmPathwayCodec, getHeadersSize } = require('../../dd-trace/src/datastreams')

class GoogleCloudPubsubProducerPlugin extends ProducerPlugin {
static get id () { return 'google-cloud-pubsub' }
Expand Down
2 changes: 1 addition & 1 deletion packages/datadog-plugin-kafkajs/src/batch-consumer.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
const { getMessageSize } = require('../../dd-trace/src/datastreams/processor')
const { getMessageSize } = require('../../dd-trace/src/datastreams')

class KafkajsBatchConsumerPlugin extends ConsumerPlugin {
static get id () { return 'kafkajs' }
Expand Down
2 changes: 1 addition & 1 deletion packages/datadog-plugin-kafkajs/src/consumer.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict'

const dc = require('dc-polyfill')
const { getMessageSize } = require('../../dd-trace/src/datastreams/processor')
const { getMessageSize } = require('../../dd-trace/src/datastreams')
const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')

const afterStartCh = dc.channel('dd-trace:kafkajs:consumer:afterStart')
Expand Down
3 changes: 1 addition & 2 deletions packages/datadog-plugin-kafkajs/src/producer.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
'use strict'

const ProducerPlugin = require('../../dd-trace/src/plugins/producer')
const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway')
const { getMessageSize } = require('../../dd-trace/src/datastreams/processor')
const { DsmPathwayCodec, getMessageSize } = require('../../dd-trace/src/datastreams')

const BOOTSTRAP_SERVERS_KEY = 'messaging.kafka.bootstrap.servers'

Expand Down
2 changes: 1 addition & 1 deletion packages/datadog-plugin-kafkajs/test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const { expectSomeSpan, withDefaults } = require('../../dd-trace/test/plugins/he
const { ERROR_MESSAGE, ERROR_TYPE, ERROR_STACK } = require('../../dd-trace/src/constants')

const { expectedSchema, rawExpectedSchema } = require('./naming')
const DataStreamsContext = require('../../dd-trace/src/data_streams_context')
const DataStreamsContext = require('../../dd-trace/src/datastreams/context')
const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway')
const { ENTRY_PARENT_HASH, DataStreamsProcessor } = require('../../dd-trace/src/datastreams/processor')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const {
const log = require('../../dd-trace/src/log')
const {
SchemaBuilder
} = require('../../dd-trace/src/datastreams/schemas/schema_builder')
} = require('../../dd-trace/src/datastreams')

class SchemaExtractor {
constructor (schema) {
Expand Down
2 changes: 1 addition & 1 deletion packages/datadog-plugin-rhea/src/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
const { storage } = require('../../datadog-core')
const { getAmqpMessageSize } = require('../../dd-trace/src/datastreams/processor')
const { getAmqpMessageSize } = require('../../dd-trace/src/datastreams')

class RheaConsumerPlugin extends ConsumerPlugin {
static get id () { return 'rhea' }
Expand Down
3 changes: 1 addition & 2 deletions packages/datadog-plugin-rhea/src/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

const { CLIENT_PORT_KEY } = require('../../dd-trace/src/constants')
const ProducerPlugin = require('../../dd-trace/src/plugins/producer')
const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway')
const { getAmqpMessageSize } = require('../../dd-trace/src/datastreams/processor')
const { getAmqpMessageSize, DsmPathwayCodec } = require('../../dd-trace/src/datastreams')

class RheaProducerPlugin extends ProducerPlugin {
static get id () { return 'rhea' }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const DataStreamsContext = require('./data_streams_context')
const DataStreamsContext = require('./context')

class DataStreamsCheckpointer {
constructor (tracer) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
const { storage } = require('../../datadog-core')
const log = require('./log')
const { storage } = require('../../../datadog-core')
const log = require('../log')

function getDataStreamsContext () {
const store = storage('legacy').getStore()
Expand Down
104 changes: 104 additions & 0 deletions packages/dd-trace/src/datastreams/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
'use strict'

const {
getAmqpMessageSize,
getHeadersSize,
getMessageSize,
getSizeOrZero
} = require('./size')

// This is only needed because DSM code is spread across existing tracing
// plugins instead of having dedicated DSM plugins that are themselves
// lazy loaded.
//
// TODO: Remove this when DSM has been moved to dedicaed plugins.
function lazyClass (classGetter, methods = [], staticMethods = []) {
let constructorArgs
let ActiveClass

const LazyClass = function (...args) {
constructorArgs = args
}

const activate = () => {
return (ActiveClass = ActiveClass || classGetter())
}

for (const method of methods) {
LazyClass.prototype[method] = function (...args) {
const instance = activate() && new ActiveClass(...constructorArgs)

// Replace the whole prototype instead of only the method itself whenever
// any individual method is called to avoid running through this code
// again every time another method is called. This is not only more
// efficient but it also means that the class instance does not need to be
// stored for future calls to other methods.
Object.setPrototypeOf(this, instance)

return this[method](...args)
}
}

for (const method of staticMethods) {
LazyClass[method] = function (...args) {
LazyClass[method] = activate() && ActiveClass[method]

return LazyClass[method](...args)
}
}

return LazyClass
}

const DsmPathwayCodec = lazyClass(() => require('./pathway').DsmPathwayCodec, [], [
'encode',
'decode'
])

const DataStreamsCheckpointer = lazyClass(() => require('./checkpointer').DataStreamsCheckpointer, [
'setProduceCheckpoint',
'setConsumeCheckpoint'
])

const DataStreamsManager = lazyClass(() => require('./manager').DataStreamsManager, [
'setCheckpoint',
'decodeDataStreamsContext'
])

// TODO: Are all those methods actually public?
const DataStreamsProcessor = lazyClass(() => require('./processor').DataStreamsProcessor, [
'onInterval',
'bucketFromTimestamp',
'recordCheckpoint',
'setCheckpoint',
'recordOffset',
'setOffset',
'setUrl',
'trySampleSchema',
'canSampleSchema',
'getSchema'
])

const SchemaBuilder = lazyClass(() => require('./schemas/schema_builder').SchemaBuilder, [
'build',
'addProperty',
'shouldExtractSchema'
], [
'getCache',
'getSchemaDefinition',
'getSchema'
])

module.exports = {
DsmPathwayCodec,
DataStreamsCheckpointer,
DataStreamsManager,
DataStreamsProcessor,
SchemaBuilder,

// These are small functions so they are exposed directly and not lazy loaded.
getAmqpMessageSize,
getHeadersSize,
getMessageSize,
getSizeOrZero
}
27 changes: 27 additions & 0 deletions packages/dd-trace/src/datastreams/manager.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
'use strict'

const { DsmPathwayCodec } = require('./pathway')
const DataStreamsContext = require('./context')

class DataStreamsManager {
constructor (processor) {
this._dataStreamsProcessor = processor
}

setCheckpoint (edgeTags, span, payloadSize = 0) {
const ctx = this._dataStreamsProcessor.setCheckpoint(
edgeTags, span, DataStreamsContext.getDataStreamsContext(), payloadSize
)
DataStreamsContext.setDataStreamsContext(ctx)
return ctx
}

decodeDataStreamsContext (carrier) {
const ctx = DsmPathwayCodec.decode(carrier)
// we erase the previous context everytime we decode a new one
DataStreamsContext.setDataStreamsContext(ctx)
return ctx
}
}

module.exports = { DataStreamsManager }
45 changes: 1 addition & 44 deletions packages/dd-trace/src/datastreams/processor.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const { LogCollapsingLowestDenseDDSketch } = require('@datadog/sketches-js')
const { DsmPathwayCodec } = require('./pathway')
const { DataStreamsWriter } = require('./writer')
const { computePathwayHash } = require('./pathway')
const { types } = require('util')
const { getAmqpMessageSize, getHeadersSize, getMessageSize, getSizeOrZero } = require('./size')
const { PATHWAY_HASH } = require('../../../../ext/tags')
const { SchemaBuilder } = require('./schemas/schema_builder')
const { SchemaSampler } = require('./schemas/schema_sampler')
Expand Down Expand Up @@ -115,49 +115,6 @@ class StatsBucket {
}
}

function getSizeOrZero (obj) {
if (typeof obj === 'string') {
return Buffer.from(obj, 'utf-8').length
}
if (types.isArrayBuffer(obj)) {
return obj.byteLength
}
if (Buffer.isBuffer(obj)) {
return obj.length
}
if (Array.isArray(obj) && obj.length > 0) {
if (typeof obj[0] === 'number') return Buffer.from(obj).length
let payloadSize = 0
obj.forEach(item => {
payloadSize += getSizeOrZero(item)
})
return payloadSize
}
if (obj !== null && typeof obj === 'object') {
try {
return getHeadersSize(obj)
} catch {
// pass
}
}
return 0
}

function getHeadersSize (headers) {
if (headers === undefined) return 0
return Object.entries(headers).reduce((prev, [key, val]) => getSizeOrZero(key) + getSizeOrZero(val) + prev, 0)
}

function getMessageSize (message) {
const { key, value, headers } = message
return getSizeOrZero(key) + getSizeOrZero(value) + getHeadersSize(headers)
}

function getAmqpMessageSize (message) {
const { headers, content } = message
return getSizeOrZero(content) + getHeadersSize(headers)
}

class TimeBuckets extends Map {
forTime (time) {
if (!this.has(time)) {
Expand Down
53 changes: 53 additions & 0 deletions packages/dd-trace/src/datastreams/size.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
'use strict'
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The helpers in this file were copied from processor.js untouched.


const { types } = require('util')

function getSizeOrZero (obj) {
if (typeof obj === 'string') {
return Buffer.from(obj, 'utf-8').length
}
if (types.isArrayBuffer(obj)) {
return obj.byteLength
}
if (Buffer.isBuffer(obj)) {
return obj.length
}
if (Array.isArray(obj) && obj.length > 0) {
if (typeof obj[0] === 'number') return Buffer.from(obj).length
let payloadSize = 0
obj.forEach(item => {
payloadSize += getSizeOrZero(item)
})
return payloadSize
}
if (obj !== null && typeof obj === 'object') {
try {
return getHeadersSize(obj)
} catch {
// pass
}
}
return 0
}

function getHeadersSize (headers) {
if (headers === undefined) return 0
return Object.entries(headers).reduce((prev, [key, val]) => getSizeOrZero(key) + getSizeOrZero(val) + prev, 0)
}

function getMessageSize (message) {
const { key, value, headers } = message
return getSizeOrZero(key) + getSizeOrZero(value) + getHeadersSize(headers)
}

function getAmqpMessageSize (message) {
const { headers, content } = message
return getSizeOrZero(content) + getHeadersSize(headers)
}

module.exports = {
getMessageSize,
getHeadersSize,
getSizeOrZero,
getAmqpMessageSize
}
Loading
Loading