refactor: update jaeger api implementation for new trace modeling #5655

Merged · 18 commits · Mar 17, 2025
Changes from all commits
1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions src/common/catalog/src/consts.rs
@@ -135,5 +135,6 @@ pub fn is_readonly_schema(schema: &str) -> bool {
pub const TRACE_ID_COLUMN: &str = "trace_id";
pub const SPAN_ID_COLUMN: &str = "span_id";
pub const SPAN_NAME_COLUMN: &str = "span_name";
pub const SERVICE_NAME_COLUMN: &str = "service_name";
pub const PARENT_SPAN_ID_COLUMN: &str = "parent_span_id";
// ---- End of special table and fields ----
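
These constants give query code a single source of truth for the well-known trace columns. As a minimal sketch (not part of this diff; `service_filter` is a hypothetical helper), they combine with DataFusion expressions the same way the jaeger.rs changes below do:

```rust
use datafusion_expr::{col, lit, Expr};

// Value mirrors SERVICE_NAME_COLUMN above.
const SERVICE_NAME_COLUMN: &str = "service_name";

// Hypothetical helper: the WHERE clause used when listing the
// operations of one service.
fn service_filter(service_name: &str) -> Expr {
    col(SERVICE_NAME_COLUMN).eq(lit(service_name))
}

fn main() {
    // Prints roughly: service_name = Utf8("my-service")
    println!("{}", service_filter("my-service"));
}
```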
111 changes: 97 additions & 14 deletions src/frontend/src/instance/jaeger.rs
@@ -28,21 +28,22 @@ use common_recordbatch::adapter::RecordBatchStreamAdapter;
use datafusion::dataframe::DataFrame;
use datafusion::execution::context::SessionContext;
use datafusion::execution::SessionStateBuilder;
use datafusion_expr::{col, lit, lit_timestamp_nano, Expr};
use datafusion_expr::{col, lit, lit_timestamp_nano, wildcard, Expr};
use query::QueryEngineRef;
use serde_json::Value as JsonValue;
use servers::error::{
CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult,
TableNotFoundSnafu,
};
use servers::http::jaeger::{QueryTraceParams, FIND_TRACES_COLS};
use servers::http::jaeger::{QueryTraceParams, JAEGER_QUERY_TABLE_NAME_KEY};
use servers::otlp::trace::{
DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_KIND_COLUMN,
SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN, TRACE_TABLE_NAME,
};
use servers::query_handler::JaegerQueryHandler;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use table::requests::{TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1};
use table::table::adapter::DfTableProviderAdapter;

use super::Instance;
@@ -82,7 +83,19 @@ impl JaegerQueryHandler for Instance {
))));
}

// It's equivalent to `SELECT span_name, span_kind FROM {db}.{trace_table} WHERE service_name = '{service_name}'`.
// It's equivalent to
//
// ```
// SELECT
// span_name,
// span_kind
// FROM
// {db}.{trace_table}
// WHERE
// service_name = '{service_name}'
// ORDER BY
// timestamp
// ```.
Ok(query_trace_table(
ctx,
self.catalog_manager(),
@@ -101,9 +114,19 @@ }
}

async fn get_trace(&self, ctx: QueryContextRef, trace_id: &str) -> ServerResult<Output> {
// It's equivalent to `SELECT trace_id, timestamp, duration_nano, service_name, span_name, span_id, span_attributes, resource_attributes, parent_span_id
// FROM {db}.{trace_table} WHERE trace_id = '{trace_id}'`.
let selects: Vec<Expr> = FIND_TRACES_COLS.clone();
// It's equivalent to
//
// ```
// SELECT
// *
// FROM
// {db}.{trace_table}
// WHERE
// trace_id = '{trace_id}'
// ORDER BY
// timestamp
// ```.
let selects = vec![wildcard()];

let filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];

@@ -125,7 +148,7 @@
ctx: QueryContextRef,
query_params: QueryTraceParams,
) -> ServerResult<Output> {
let selects: Vec<Expr> = FIND_TRACES_COLS.clone();
let selects = vec![wildcard()];

let mut filters = vec![];

@@ -174,17 +197,34 @@ async fn query_trace_table(
tags: Option<HashMap<String, JsonValue>>,
distinct: bool,
) -> ServerResult<Output> {
let db = ctx.get_db_string();
let table_name = ctx
.extension(JAEGER_QUERY_TABLE_NAME_KEY)
.unwrap_or(TRACE_TABLE_NAME);

let table = catalog_manager
.table(ctx.current_catalog(), &db, TRACE_TABLE_NAME, Some(&ctx))
.table(
ctx.current_catalog(),
&ctx.current_schema(),
table_name,
Some(&ctx),
)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table: TRACE_TABLE_NAME,
table: table_name,
catalog: ctx.current_catalog(),
schema: db,
schema: ctx.current_schema(),
})?;

let is_data_model_v1 = table
.table_info()
.meta
.options
.extra_options
.get(TABLE_DATA_MODEL)
.map(|s| s.as_str())
== Some(TABLE_DATA_MODEL_TRACE_V1);

let df_context = create_df_context(query_engine, ctx.clone())?;

let dataframe = df_context
@@ -196,7 +236,9 @@ // Apply all filters.
// Apply all filters.
let dataframe = filters
.into_iter()
.chain(tags.map_or(Ok(vec![]), |t| tags_filters(&dataframe, t))?)
.chain(tags.map_or(Ok(vec![]), |t| {
tags_filters(&dataframe, t, is_data_model_v1)
})?)
.try_fold(dataframe, |df, expr| {
df.filter(expr).context(DataFusionSnafu)
})?;
@@ -205,7 +247,10 @@ let dataframe = if distinct {
let dataframe = if distinct {
dataframe.distinct().context(DataFusionSnafu)?
} else {
// For non-distinct queries, sort by timestamp to make the results stable.
dataframe
.sort_by(vec![col(TIMESTAMP_COLUMN)])
.context(DataFusionSnafu)?
};

// Apply the limit if needed.
@@ -237,7 +282,7 @@ fn create_df_context(
SessionStateBuilder::new_from_existing(query_engine.engine_state().session_state()).build(),
);

// The following JSON UDFs will be used for tags filters.
// The following JSON UDFs will be used for tags filters on v0 data model.
let udfs: Vec<FunctionRef> = vec![
Arc::new(JsonGetInt),
Arc::new(JsonGetFloat),
@@ -256,7 +301,7 @@ Ok(df_context)
Ok(df_context)
}

fn tags_filters(
fn json_tag_filters(
dataframe: &DataFrame,
tags: HashMap<String, JsonValue>,
) -> ServerResult<Vec<Expr>> {
@@ -322,3 +367,41 @@ fn tags_filters(

Ok(filters)
}

fn flatten_tag_filters(tags: HashMap<String, JsonValue>) -> ServerResult<Vec<Expr>> {
let filters = tags
.into_iter()
.filter_map(|(key, value)| {
let key = format!("\"span_attributes.{}\"", key);
match value {
JsonValue::String(value) => Some(col(key).eq(lit(value))),
JsonValue::Number(value) => {
if value.is_f64() {
// safe to unwrap as checked previously
Some(col(key).eq(lit(value.as_f64().unwrap())))
} else {
Some(col(key).eq(lit(value.as_i64().unwrap())))
}
}
JsonValue::Bool(value) => Some(col(key).eq(lit(value))),
JsonValue::Null => Some(col(key).is_null()),
// not supported at the moment
JsonValue::Array(_value) => None,
JsonValue::Object(_value) => None,
}
})
.collect();
Ok(filters)
}

fn tags_filters(
dataframe: &DataFrame,
tags: HashMap<String, JsonValue>,
is_data_model_v1: bool,
) -> ServerResult<Vec<Expr>> {
if is_data_model_v1 {
flatten_tag_filters(tags)
} else {
json_tag_filters(dataframe, tags)
}
}
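
To see what the v1 path produces, here is a hedged, self-contained sketch of `flatten_tag_filters` (numbers, arrays, and objects elided; the sample tags are made up). With the v1 model, span attributes live in flattened columns like `"span_attributes.http.method"`, so tag filters become plain column comparisons instead of the JSON UDF calls the v0 path needs:

```rust
use std::collections::HashMap;

use datafusion_expr::{col, lit, Expr};
use serde_json::{json, Value as JsonValue};

// Simplified copy of flatten_tag_filters above.
fn flatten_tag_filters(tags: HashMap<String, JsonValue>) -> Vec<Expr> {
    tags.into_iter()
        .filter_map(|(key, value)| {
            // Quote the column name so the dots in
            // "span_attributes.http.method" are not parsed as qualifiers.
            let key = format!("\"span_attributes.{}\"", key);
            match value {
                JsonValue::String(v) => Some(col(key).eq(lit(v))),
                JsonValue::Bool(v) => Some(col(key).eq(lit(v))),
                JsonValue::Null => Some(col(key).is_null()),
                _ => None, // elided in this sketch
            }
        })
        .collect()
}

fn main() {
    let mut tags = HashMap::new();
    tags.insert("http.method".to_string(), json!("GET"));
    tags.insert("error".to_string(), json!(true));
    for filter in flatten_tag_filters(tags) {
        // e.g. "span_attributes.http.method" = Utf8("GET")
        println!("{filter}");
    }
}
```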
17 changes: 13 additions & 4 deletions src/frontend/src/instance/otlp.rs
@@ -90,6 +90,8 @@ impl OpenTelemetryProtocolHandler for Instance {
.get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
interceptor_ref.pre_execute(ctx.clone())?;

let is_trace_v1_model = matches!(pipeline, PipelineWay::OtlpTraceDirectV1);

let (requests, rows) = otlp::trace::to_grpc_insert_requests(
request,
pipeline,
@@ -101,10 +103,17 @@

OTLP_TRACES_ROWS.inc_by(rows as u64);

self.handle_trace_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)
if is_trace_v1_model {
self.handle_trace_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)
} else {
self.handle_log_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)
}
}

#[tracing::instrument(skip_all)]
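
A small standalone sketch of the dispatch added above (the enum is trimmed; only `OtlpTraceDirectV1` is taken from the diff, the other variant name is made up for illustration): traces ingested under the v1 pipeline take the trace-insert path, everything else keeps the existing log-insert path.

```rust
// Trimmed stand-in for pipeline::PipelineWay.
#[allow(dead_code)]
enum PipelineWay {
    OtlpTraceDirectV1,
    OtlpLogDirect, // hypothetical non-trace-v1 variant
}

fn route(pipeline: &PipelineWay) -> &'static str {
    // Same check as above: only the trace-v1 model uses the trace path.
    if matches!(pipeline, PipelineWay::OtlpTraceDirectV1) {
        "handle_trace_inserts"
    } else {
        "handle_log_inserts"
    }
}

fn main() {
    assert_eq!(route(&PipelineWay::OtlpTraceDirectV1), "handle_trace_inserts");
    assert_eq!(route(&PipelineWay::OtlpLogDirect), "handle_log_inserts");
}
```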
16 changes: 13 additions & 3 deletions src/operator/src/insert.rs
@@ -28,7 +28,7 @@ use api::v1::{
use catalog::CatalogManagerRef;
use client::{OutputData, OutputMeta};
use common_catalog::consts::{
default_engine, PARENT_SPAN_ID_COLUMN, SPAN_NAME_COLUMN, TRACE_ID_COLUMN,
default_engine, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, TRACE_ID_COLUMN,
};
use common_grpc_expr::util::ColumnExpr;
use common_meta::cache::TableFlownodeSetCacheRef;
@@ -54,7 +54,10 @@ use store_api::metric_engine_consts::{
use store_api::mito_engine_options::{APPEND_MODE_KEY, MERGE_MODE_KEY};
use store_api::storage::{RegionId, TableId};
use table::metadata::TableInfo;
use table::requests::{InsertRequest as TableInsertRequest, AUTO_CREATE_TABLE_KEY, TTL_KEY};
use table::requests::{
InsertRequest as TableInsertRequest, AUTO_CREATE_TABLE_KEY, TABLE_DATA_MODEL,
TABLE_DATA_MODEL_TRACE_V1, TTL_KEY,
};
use table::table_reference::TableReference;
use table::TableRef;

@@ -578,7 +581,8 @@ impl Inserter {
// - trace_id: when searching by trace id
// - parent_span_id: when searching root span
// - service_name: when searching spans of a given service
let index_columns = [TRACE_ID_COLUMN, PARENT_SPAN_ID_COLUMN, SPAN_NAME_COLUMN];
let index_columns =
[TRACE_ID_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN];
for index_column in index_columns {
if let Some(col) = create_table
.column_defs
@@ -595,6 +599,12 @@ }
}
}

// Use table_options to mark the table's data model version.
create_table.table_options.insert(
TABLE_DATA_MODEL.to_string(),
TABLE_DATA_MODEL_TRACE_V1.to_string(),
);

let table = self
.create_physical_table(
create_table,
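
Hedged sketch of the marker round-trip: the writer above stores `TABLE_DATA_MODEL` in the table's extra options at creation time, and `query_trace_table` in jaeger.rs reads it back to pick the tag-filter strategy. The constant values below are placeholders, not the real ones from `table::requests`.

```rust
use std::collections::HashMap;

// Placeholder values; the real constants live in table::requests.
const TABLE_DATA_MODEL: &str = "table_data_model";
const TABLE_DATA_MODEL_TRACE_V1: &str = "greptime_trace_v1";

fn main() {
    // Writer side (insert.rs): mark the table when it is created.
    let mut table_options: HashMap<String, String> = HashMap::new();
    table_options.insert(
        TABLE_DATA_MODEL.to_string(),
        TABLE_DATA_MODEL_TRACE_V1.to_string(),
    );

    // Reader side (jaeger.rs): recover the flag the same way
    // query_trace_table does.
    let is_data_model_v1 =
        table_options.get(TABLE_DATA_MODEL).map(|s| s.as_str())
            == Some(TABLE_DATA_MODEL_TRACE_V1);
    assert!(is_data_model_v1);
}
```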
2 changes: 1 addition & 1 deletion src/pipeline/src/lib.rs
@@ -30,5 +30,5 @@ pub use etl::{
pub use manager::{
error, pipeline_operator, table, util, PipelineDefinition, PipelineInfo, PipelineRef,
PipelineTableRef, PipelineVersion, PipelineWay, SelectInfo,
GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME, GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME,
};