diff --git a/packages/datadog-plugin-amqplib/src/consumer.js b/packages/datadog-plugin-amqplib/src/consumer.js index accd04568b1..9cf3405a7c5 100644 --- a/packages/datadog-plugin-amqplib/src/consumer.js +++ b/packages/datadog-plugin-amqplib/src/consumer.js @@ -2,7 +2,7 @@ const { TEXT_MAP } = require('../../../ext/formats') const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer') -const { getAmqpMessageSize } = require('../../dd-trace/src/datastreams/processor') +const { getAmqpMessageSize } = require('../../dd-trace/src/datastreams') const { getResourceName } = require('./util') class AmqplibConsumerPlugin extends ConsumerPlugin { diff --git a/packages/datadog-plugin-amqplib/src/producer.js b/packages/datadog-plugin-amqplib/src/producer.js index 02f27b590be..a59890d7e3d 100644 --- a/packages/datadog-plugin-amqplib/src/producer.js +++ b/packages/datadog-plugin-amqplib/src/producer.js @@ -3,8 +3,7 @@ const { TEXT_MAP } = require('../../../ext/formats') const { CLIENT_PORT_KEY } = require('../../dd-trace/src/constants') const ProducerPlugin = require('../../dd-trace/src/plugins/producer') -const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway') -const { getAmqpMessageSize } = require('../../dd-trace/src/datastreams/processor') +const { DsmPathwayCodec, getAmqpMessageSize } = require('../../dd-trace/src/datastreams') const { getResourceName } = require('./util') class AmqplibProducerPlugin extends ProducerPlugin { diff --git a/packages/datadog-plugin-avsc/src/schema_iterator.js b/packages/datadog-plugin-avsc/src/schema_iterator.js index 0b4874ceea8..096b64631e9 100644 --- a/packages/datadog-plugin-avsc/src/schema_iterator.js +++ b/packages/datadog-plugin-avsc/src/schema_iterator.js @@ -10,7 +10,7 @@ const { const log = require('../../dd-trace/src/log') const { SchemaBuilder -} = require('../../dd-trace/src/datastreams/schemas/schema_builder') +} = require('../../dd-trace/src/datastreams') class SchemaExtractor { constructor (schema) 
{ diff --git a/packages/datadog-plugin-aws-sdk/src/services/kinesis.js b/packages/datadog-plugin-aws-sdk/src/services/kinesis.js index 0bd457a90f6..45033696ec1 100644 --- a/packages/datadog-plugin-aws-sdk/src/services/kinesis.js +++ b/packages/datadog-plugin-aws-sdk/src/services/kinesis.js @@ -1,8 +1,5 @@ 'use strict' -const { - getSizeOrZero -} = require('../../../dd-trace/src/datastreams/processor') -const { DsmPathwayCodec } = require('../../../dd-trace/src/datastreams/pathway') +const { DsmPathwayCodec, getSizeOrZero } = require('../../../dd-trace/src/datastreams') const log = require('../../../dd-trace/src/log') const BaseAwsSdkPlugin = require('../base') const { storage } = require('../../../datadog-core') diff --git a/packages/datadog-plugin-aws-sdk/src/services/sns.js b/packages/datadog-plugin-aws-sdk/src/services/sns.js index 4e2b16f1d18..f5822f976c7 100644 --- a/packages/datadog-plugin-aws-sdk/src/services/sns.js +++ b/packages/datadog-plugin-aws-sdk/src/services/sns.js @@ -1,6 +1,5 @@ 'use strict' -const { getHeadersSize } = require('../../../dd-trace/src/datastreams/processor') -const { DsmPathwayCodec } = require('../../../dd-trace/src/datastreams/pathway') +const { DsmPathwayCodec, getHeadersSize } = require('../../../dd-trace/src/datastreams') const log = require('../../../dd-trace/src/log') const BaseAwsSdkPlugin = require('../base') diff --git a/packages/datadog-plugin-aws-sdk/src/services/sqs.js b/packages/datadog-plugin-aws-sdk/src/services/sqs.js index 092465cf67f..dc64c0a9eaf 100644 --- a/packages/datadog-plugin-aws-sdk/src/services/sqs.js +++ b/packages/datadog-plugin-aws-sdk/src/services/sqs.js @@ -3,8 +3,7 @@ const log = require('../../../dd-trace/src/log') const BaseAwsSdkPlugin = require('../base') const { storage } = require('../../../datadog-core') -const { getHeadersSize } = require('../../../dd-trace/src/datastreams/processor') -const { DsmPathwayCodec } = require('../../../dd-trace/src/datastreams/pathway') +const { DsmPathwayCodec, 
getHeadersSize } = require('../../../dd-trace/src/datastreams') class Sqs extends BaseAwsSdkPlugin { static get id () { return 'sqs' } diff --git a/packages/datadog-plugin-google-cloud-pubsub/src/consumer.js b/packages/datadog-plugin-google-cloud-pubsub/src/consumer.js index 84c4122ec57..37a1ea0a4ab 100644 --- a/packages/datadog-plugin-google-cloud-pubsub/src/consumer.js +++ b/packages/datadog-plugin-google-cloud-pubsub/src/consumer.js @@ -1,6 +1,6 @@ 'use strict' -const { getMessageSize } = require('../../dd-trace/src/datastreams/processor') +const { getMessageSize } = require('../../dd-trace/src/datastreams') const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer') class GoogleCloudPubsubConsumerPlugin extends ConsumerPlugin { diff --git a/packages/datadog-plugin-google-cloud-pubsub/src/producer.js b/packages/datadog-plugin-google-cloud-pubsub/src/producer.js index b6261ee85b6..646710f3268 100644 --- a/packages/datadog-plugin-google-cloud-pubsub/src/producer.js +++ b/packages/datadog-plugin-google-cloud-pubsub/src/producer.js @@ -1,8 +1,7 @@ 'use strict' const ProducerPlugin = require('../../dd-trace/src/plugins/producer') -const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway') -const { getHeadersSize } = require('../../dd-trace/src/datastreams/processor') +const { DsmPathwayCodec, getHeadersSize } = require('../../dd-trace/src/datastreams') class GoogleCloudPubsubProducerPlugin extends ProducerPlugin { static get id () { return 'google-cloud-pubsub' } diff --git a/packages/datadog-plugin-kafkajs/src/batch-consumer.js b/packages/datadog-plugin-kafkajs/src/batch-consumer.js index e0228a018c2..44eb960fa36 100644 --- a/packages/datadog-plugin-kafkajs/src/batch-consumer.js +++ b/packages/datadog-plugin-kafkajs/src/batch-consumer.js @@ -1,5 +1,5 @@ const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer') -const { getMessageSize } = require('../../dd-trace/src/datastreams/processor') +const { getMessageSize } = 
require('../../dd-trace/src/datastreams') class KafkajsBatchConsumerPlugin extends ConsumerPlugin { static get id () { return 'kafkajs' } diff --git a/packages/datadog-plugin-kafkajs/src/consumer.js b/packages/datadog-plugin-kafkajs/src/consumer.js index ee04c5eb60c..3f21cf84299 100644 --- a/packages/datadog-plugin-kafkajs/src/consumer.js +++ b/packages/datadog-plugin-kafkajs/src/consumer.js @@ -1,7 +1,7 @@ 'use strict' const dc = require('dc-polyfill') -const { getMessageSize } = require('../../dd-trace/src/datastreams/processor') +const { getMessageSize } = require('../../dd-trace/src/datastreams') const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer') const afterStartCh = dc.channel('dd-trace:kafkajs:consumer:afterStart') diff --git a/packages/datadog-plugin-kafkajs/src/producer.js b/packages/datadog-plugin-kafkajs/src/producer.js index aa12357b4cf..c70c8ee2648 100644 --- a/packages/datadog-plugin-kafkajs/src/producer.js +++ b/packages/datadog-plugin-kafkajs/src/producer.js @@ -1,8 +1,7 @@ 'use strict' const ProducerPlugin = require('../../dd-trace/src/plugins/producer') -const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway') -const { getMessageSize } = require('../../dd-trace/src/datastreams/processor') +const { DsmPathwayCodec, getMessageSize } = require('../../dd-trace/src/datastreams') const BOOTSTRAP_SERVERS_KEY = 'messaging.kafka.bootstrap.servers' diff --git a/packages/datadog-plugin-kafkajs/test/index.spec.js b/packages/datadog-plugin-kafkajs/test/index.spec.js index f67279bdd9f..8d5d77503ea 100644 --- a/packages/datadog-plugin-kafkajs/test/index.spec.js +++ b/packages/datadog-plugin-kafkajs/test/index.spec.js @@ -8,7 +8,7 @@ const { expectSomeSpan, withDefaults } = require('../../dd-trace/test/plugins/he const { ERROR_MESSAGE, ERROR_TYPE, ERROR_STACK } = require('../../dd-trace/src/constants') const { expectedSchema, rawExpectedSchema } = require('./naming') -const DataStreamsContext = 
require('../../dd-trace/src/data_streams_context') +const DataStreamsContext = require('../../dd-trace/src/datastreams/context') const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway') const { ENTRY_PARENT_HASH, DataStreamsProcessor } = require('../../dd-trace/src/datastreams/processor') diff --git a/packages/datadog-plugin-protobufjs/src/schema_iterator.js b/packages/datadog-plugin-protobufjs/src/schema_iterator.js index ea3c8ba2bf0..cbf12ff0016 100644 --- a/packages/datadog-plugin-protobufjs/src/schema_iterator.js +++ b/packages/datadog-plugin-protobufjs/src/schema_iterator.js @@ -10,7 +10,7 @@ const { const log = require('../../dd-trace/src/log') const { SchemaBuilder -} = require('../../dd-trace/src/datastreams/schemas/schema_builder') +} = require('../../dd-trace/src/datastreams') class SchemaExtractor { constructor (schema) { diff --git a/packages/datadog-plugin-rhea/src/consumer.js b/packages/datadog-plugin-rhea/src/consumer.js index a504c94029d..84d4bb03298 100644 --- a/packages/datadog-plugin-rhea/src/consumer.js +++ b/packages/datadog-plugin-rhea/src/consumer.js @@ -2,7 +2,7 @@ const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer') const { storage } = require('../../datadog-core') -const { getAmqpMessageSize } = require('../../dd-trace/src/datastreams/processor') +const { getAmqpMessageSize } = require('../../dd-trace/src/datastreams') class RheaConsumerPlugin extends ConsumerPlugin { static get id () { return 'rhea' } diff --git a/packages/datadog-plugin-rhea/src/producer.js b/packages/datadog-plugin-rhea/src/producer.js index 8f2116d3d75..324b0f373b8 100644 --- a/packages/datadog-plugin-rhea/src/producer.js +++ b/packages/datadog-plugin-rhea/src/producer.js @@ -2,8 +2,7 @@ const { CLIENT_PORT_KEY } = require('../../dd-trace/src/constants') const ProducerPlugin = require('../../dd-trace/src/plugins/producer') -const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway') -const { getAmqpMessageSize 
} = require('../../dd-trace/src/datastreams/processor') +const { getAmqpMessageSize, DsmPathwayCodec } = require('../../dd-trace/src/datastreams') class RheaProducerPlugin extends ProducerPlugin { static get id () { return 'rhea' } diff --git a/packages/dd-trace/src/data_streams.js b/packages/dd-trace/src/datastreams/checkpointer.js similarity index 94% rename from packages/dd-trace/src/data_streams.js rename to packages/dd-trace/src/datastreams/checkpointer.js index ae3d4c22316..1b6e2a28c0f 100644 --- a/packages/dd-trace/src/data_streams.js +++ b/packages/dd-trace/src/datastreams/checkpointer.js @@ -1,4 +1,4 @@ -const DataStreamsContext = require('./data_streams_context') +const DataStreamsContext = require('./context') class DataStreamsCheckpointer { constructor (tracer) { diff --git a/packages/dd-trace/src/data_streams_context.js b/packages/dd-trace/src/datastreams/context.js similarity index 84% rename from packages/dd-trace/src/data_streams_context.js rename to packages/dd-trace/src/datastreams/context.js index b266eb2cf61..26b9c368da8 100644 --- a/packages/dd-trace/src/data_streams_context.js +++ b/packages/dd-trace/src/datastreams/context.js @@ -1,5 +1,5 @@ -const { storage } = require('../../datadog-core') -const log = require('./log') +const { storage } = require('../../../datadog-core') +const log = require('../log') function getDataStreamsContext () { const store = storage('legacy').getStore() diff --git a/packages/dd-trace/src/datastreams/index.js b/packages/dd-trace/src/datastreams/index.js new file mode 100644 index 00000000000..1704e0ea127 --- /dev/null +++ b/packages/dd-trace/src/datastreams/index.js @@ -0,0 +1,104 @@ +'use strict' + +const { + getAmqpMessageSize, + getHeadersSize, + getMessageSize, + getSizeOrZero +} = require('./size') + +// This is only needed because DSM code is spread across existing tracing +// plugins instead of having dedicated DSM plugins that are themselves +// lazy loaded. 
+// +// TODO: Remove this when DSM has been moved to dedicated plugins. +function lazyClass (classGetter, methods = [], staticMethods = []) { + let constructorArgs + let ActiveClass + + const LazyClass = function (...args) { + constructorArgs = args + } + + const activate = () => { + return (ActiveClass = ActiveClass || classGetter()) + } + + for (const method of methods) { + LazyClass.prototype[method] = function (...args) { + const instance = activate() && new ActiveClass(...constructorArgs) + + // Replace the whole prototype instead of only the method itself whenever + // any individual method is called to avoid running through this code + // again every time another method is called. This is not only more + // efficient but it also means that the class instance does not need to be + // stored for future calls to other methods. + Object.setPrototypeOf(this, instance) + + return this[method](...args) + } + } + + for (const method of staticMethods) { + LazyClass[method] = function (...args) { + LazyClass[method] = activate() && ActiveClass[method] + + return LazyClass[method](...args) + } + } + + return LazyClass +} + +const DsmPathwayCodec = lazyClass(() => require('./pathway').DsmPathwayCodec, [], [ + 'encode', + 'decode' +]) + +const DataStreamsCheckpointer = lazyClass(() => require('./checkpointer').DataStreamsCheckpointer, [ + 'setProduceCheckpoint', + 'setConsumeCheckpoint' +]) + +const DataStreamsManager = lazyClass(() => require('./manager').DataStreamsManager, [ + 'setCheckpoint', + 'decodeDataStreamsContext' +]) + +// TODO: Are all those methods actually public? 
+const DataStreamsProcessor = lazyClass(() => require('./processor').DataStreamsProcessor, [ + 'onInterval', + 'bucketFromTimestamp', + 'recordCheckpoint', + 'setCheckpoint', + 'recordOffset', + 'setOffset', + 'setUrl', + 'trySampleSchema', + 'canSampleSchema', + 'getSchema' +]) + +const SchemaBuilder = lazyClass(() => require('./schemas/schema_builder').SchemaBuilder, [ + 'build', + 'addProperty', + 'shouldExtractSchema' +], [ + 'getCache', + 'getSchemaDefinition', + 'getSchema' +]) + +module.exports = { + DsmPathwayCodec, + DataStreamsCheckpointer, + DataStreamsManager, + DataStreamsProcessor, + SchemaBuilder, + + // These are small functions so they are exposed directly and not lazy loaded. + getAmqpMessageSize, + getHeadersSize, + getMessageSize, + getSizeOrZero +} diff --git a/packages/dd-trace/src/datastreams/manager.js b/packages/dd-trace/src/datastreams/manager.js new file mode 100644 index 00000000000..513d1026a08 --- /dev/null +++ b/packages/dd-trace/src/datastreams/manager.js @@ -0,0 +1,27 @@ +'use strict' + +const { DsmPathwayCodec } = require('./pathway') +const DataStreamsContext = require('./context') + +class DataStreamsManager { + constructor (processor) { + this._dataStreamsProcessor = processor + } + + setCheckpoint (edgeTags, span, payloadSize = 0) { + const ctx = this._dataStreamsProcessor.setCheckpoint( + edgeTags, span, DataStreamsContext.getDataStreamsContext(), payloadSize + ) + DataStreamsContext.setDataStreamsContext(ctx) + return ctx + } + + decodeDataStreamsContext (carrier) { + const ctx = DsmPathwayCodec.decode(carrier) + // we erase the previous context every time we decode a new one + DataStreamsContext.setDataStreamsContext(ctx) + return ctx + } +} + +module.exports = { DataStreamsManager } diff --git a/packages/dd-trace/src/datastreams/processor.js b/packages/dd-trace/src/datastreams/processor.js index d997ba098ae..8c79da25c83 100644 --- a/packages/dd-trace/src/datastreams/processor.js +++ 
b/packages/dd-trace/src/datastreams/processor.js @@ -5,7 +5,7 @@ const { LogCollapsingLowestDenseDDSketch } = require('@datadog/sketches-js') const { DsmPathwayCodec } = require('./pathway') const { DataStreamsWriter } = require('./writer') const { computePathwayHash } = require('./pathway') -const { types } = require('util') +const { getAmqpMessageSize, getHeadersSize, getMessageSize, getSizeOrZero } = require('./size') const { PATHWAY_HASH } = require('../../../../ext/tags') const { SchemaBuilder } = require('./schemas/schema_builder') const { SchemaSampler } = require('./schemas/schema_sampler') @@ -115,49 +115,6 @@ class StatsBucket { } } -function getSizeOrZero (obj) { - if (typeof obj === 'string') { - return Buffer.from(obj, 'utf-8').length - } - if (types.isArrayBuffer(obj)) { - return obj.byteLength - } - if (Buffer.isBuffer(obj)) { - return obj.length - } - if (Array.isArray(obj) && obj.length > 0) { - if (typeof obj[0] === 'number') return Buffer.from(obj).length - let payloadSize = 0 - obj.forEach(item => { - payloadSize += getSizeOrZero(item) - }) - return payloadSize - } - if (obj !== null && typeof obj === 'object') { - try { - return getHeadersSize(obj) - } catch { - // pass - } - } - return 0 -} - -function getHeadersSize (headers) { - if (headers === undefined) return 0 - return Object.entries(headers).reduce((prev, [key, val]) => getSizeOrZero(key) + getSizeOrZero(val) + prev, 0) -} - -function getMessageSize (message) { - const { key, value, headers } = message - return getSizeOrZero(key) + getSizeOrZero(value) + getHeadersSize(headers) -} - -function getAmqpMessageSize (message) { - const { headers, content } = message - return getSizeOrZero(content) + getHeadersSize(headers) -} - class TimeBuckets extends Map { forTime (time) { if (!this.has(time)) { diff --git a/packages/dd-trace/src/datastreams/size.js b/packages/dd-trace/src/datastreams/size.js new file mode 100644 index 00000000000..7918d2aa1e1 --- /dev/null +++ 
b/packages/dd-trace/src/datastreams/size.js @@ -0,0 +1,53 @@ +'use strict' + +const { types } = require('util') + +function getSizeOrZero (obj) { + if (typeof obj === 'string') { + return Buffer.from(obj, 'utf-8').length + } + if (types.isArrayBuffer(obj)) { + return obj.byteLength + } + if (Buffer.isBuffer(obj)) { + return obj.length + } + if (Array.isArray(obj) && obj.length > 0) { + if (typeof obj[0] === 'number') return Buffer.from(obj).length + let payloadSize = 0 + obj.forEach(item => { + payloadSize += getSizeOrZero(item) + }) + return payloadSize + } + if (obj !== null && typeof obj === 'object') { + try { + return getHeadersSize(obj) + } catch { + // pass + } + } + return 0 +} + +function getHeadersSize (headers) { + if (headers === undefined) return 0 + return Object.entries(headers).reduce((prev, [key, val]) => getSizeOrZero(key) + getSizeOrZero(val) + prev, 0) +} + +function getMessageSize (message) { + const { key, value, headers } = message + return getSizeOrZero(key) + getSizeOrZero(value) + getHeadersSize(headers) +} + +function getAmqpMessageSize (message) { + const { headers, content } = message + return getSizeOrZero(content) + getHeadersSize(headers) +} + +module.exports = { + getMessageSize, + getHeadersSize, + getSizeOrZero, + getAmqpMessageSize +} diff --git a/packages/dd-trace/src/opentracing/propagation/text_map_dsm.js b/packages/dd-trace/src/opentracing/propagation/text_map_dsm.js index d44518fe828..109746a620a 100644 --- a/packages/dd-trace/src/opentracing/propagation/text_map_dsm.js +++ b/packages/dd-trace/src/opentracing/propagation/text_map_dsm.js @@ -1,7 +1,7 @@ const pick = require('../../../../datadog-core/src/utils/src/pick') const log = require('../../log') -const { DsmPathwayCodec } = require('../../datastreams/pathway') +const { DsmPathwayCodec } = require('../../datastreams') const base64Key = 'dd-pathway-ctx-base64' const logKeys = [base64Key] diff --git a/packages/dd-trace/src/tracer.js b/packages/dd-trace/src/tracer.js index 
243e25575a5..ed8f7d1953b 100644 --- a/packages/dd-trace/src/tracer.js +++ b/packages/dd-trace/src/tracer.js @@ -6,10 +6,7 @@ const Scope = require('./scope') const { isError } = require('./util') const { setStartupLogConfig } = require('./startup-log') const { ERROR_MESSAGE, ERROR_TYPE, ERROR_STACK } = require('../../dd-trace/src/constants') -const { DataStreamsProcessor } = require('./datastreams/processor') -const { DsmPathwayCodec } = require('./datastreams/pathway') -const DataStreamsContext = require('./data_streams_context') -const { DataStreamsCheckpointer } = require('./data_streams') +const { DataStreamsCheckpointer, DataStreamsManager, DataStreamsProcessor } = require('./datastreams') const { flushStartupLogs } = require('../../datadog-instrumentations/src/check_require_cache') const log = require('./log/writer') @@ -22,6 +19,7 @@ class DatadogTracer extends Tracer { constructor (config, prioritySampler) { super(config, prioritySampler) this._dataStreamsProcessor = new DataStreamsProcessor(config) + this._dataStreamsManager = new DataStreamsManager(this._dataStreamsProcessor) this.dataStreamsCheckpointer = new DataStreamsCheckpointer(this) this._scope = new Scope() setStartupLogConfig(config) @@ -35,18 +33,11 @@ class DatadogTracer extends Tracer { // todo[piochelepiotr] These two methods are not related to the tracer, but to data streams monitoring. // They should be moved outside of the tracer in the future. 
setCheckpoint (edgeTags, span, payloadSize = 0) { - const ctx = this._dataStreamsProcessor.setCheckpoint( - edgeTags, span, DataStreamsContext.getDataStreamsContext(), payloadSize - ) - DataStreamsContext.setDataStreamsContext(ctx) - return ctx + return this._dataStreamsManager.setCheckpoint(edgeTags, span, payloadSize) } decodeDataStreamsContext (carrier) { - const ctx = DsmPathwayCodec.decode(carrier) - // we erase the previous context everytime we decode a new one - DataStreamsContext.setDataStreamsContext(ctx) - return ctx + return this._dataStreamsManager.decodeDataStreamsContext(carrier) } setOffset (offsetData) {