Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apache HTTP Client 5: Fix content lengths #7463

Closed
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface;
import static io.opentelemetry.javaagent.instrumentation.apachehttpclient.v5_0.ApacheHttpClientSingletons.createOrGetBytesTransferMetrics;
import static io.opentelemetry.javaagent.instrumentation.apachehttpclient.v5_0.ApacheHttpClientSingletons.instrumenter;
import static java.util.logging.Level.FINE;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
Expand All @@ -20,17 +21,22 @@
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
import org.apache.hc.core5.http.nio.CapacityChannel;
import org.apache.hc.core5.http.nio.DataStreamChannel;
import org.apache.hc.core5.http.nio.RequestChannel;
import org.apache.hc.core5.http.protocol.BasicHttpContext;
Expand Down Expand Up @@ -69,6 +75,7 @@ public static class ClientAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void methodEnter(
@Advice.Argument(value = 0, readOnly = false) AsyncRequestProducer requestProducer,
@Advice.Argument(value = 1, readOnly = false) AsyncResponseConsumer<?> responseConsumer,
@Advice.Argument(value = 3, readOnly = false) HttpContext httpContext,
@Advice.Argument(value = 4, readOnly = false) FutureCallback<?> futureCallback) {

Expand All @@ -80,23 +87,81 @@ public static void methodEnter(
WrappedFutureCallback<?> wrappedFutureCallback =
new WrappedFutureCallback<>(parentContext, httpContext, futureCallback);
requestProducer =
new DelegatingRequestProducer(parentContext, requestProducer, wrappedFutureCallback);
new WrappedRequestProducer(parentContext, requestProducer, wrappedFutureCallback);
responseConsumer = new WrappedResponseConsumer<>(parentContext, responseConsumer);
futureCallback = wrappedFutureCallback;
}
}

public static class DelegatingRequestProducer implements AsyncRequestProducer {
public static class WrappedResponseConsumer<T> implements AsyncResponseConsumer<T> {
private final AsyncResponseConsumer<T> delegate;
private final Context parentContext;

public WrappedResponseConsumer(Context parentContext, AsyncResponseConsumer<T> delegate) {
this.parentContext = parentContext;
this.delegate = delegate;
}

@Override
public void consumeResponse(
HttpResponse httpResponse,
EntityDetails entityDetails,
HttpContext httpContext,
FutureCallback<T> futureCallback)
throws HttpException, IOException {
if (entityDetails != null) {
BytesTransferMetrics metrics = createOrGetBytesTransferMetrics(parentContext);
metrics.setResponseContentLength(entityDetails.getContentLength());
}
delegate.consumeResponse(httpResponse, entityDetails, httpContext, futureCallback);
}

@Override
public void informationResponse(HttpResponse httpResponse, HttpContext httpContext)
throws HttpException, IOException {
delegate.informationResponse(httpResponse, httpContext);
}

@Override
public void failed(Exception e) {
delegate.failed(e);
}

@Override
public void updateCapacity(CapacityChannel capacityChannel) throws IOException {
delegate.updateCapacity(capacityChannel);
}

@Override
public void consume(ByteBuffer byteBuffer) throws IOException {
if (byteBuffer.hasRemaining()) {
BytesTransferMetrics metrics = createOrGetBytesTransferMetrics(parentContext);
metrics.addResponseBytes(byteBuffer.limit());
}
delegate.consume(byteBuffer);
}

@Override
public void streamEnd(List<? extends Header> list) throws HttpException, IOException {
delegate.streamEnd(list);
}

@Override
public void releaseResources() {
delegate.releaseResources();
}
}

public static class WrappedRequestProducer implements AsyncRequestProducer {
private final Context parentContext;
private final AsyncRequestProducer delegate;
private final WrappedFutureCallback<?> wrappedFutureCallback;
private final WrappedFutureCallback<?> callback;

public DelegatingRequestProducer(
Context parentContext,
AsyncRequestProducer delegate,
WrappedFutureCallback<?> wrappedFutureCallback) {
public WrappedRequestProducer(
Context parentContext, AsyncRequestProducer delegate, WrappedFutureCallback<?> callback) {
this.parentContext = parentContext;
this.delegate = delegate;
this.wrappedFutureCallback = wrappedFutureCallback;
this.callback = callback;
}

@Override
Expand All @@ -107,8 +172,7 @@ public void failed(Exception ex) {
@Override
public void sendRequest(RequestChannel channel, HttpContext context)
throws HttpException, IOException {
DelegatingRequestChannel requestChannel =
new DelegatingRequestChannel(channel, parentContext, wrappedFutureCallback);
RequestChannel requestChannel = new WrappedRequestChannel(channel, parentContext, callback);
delegate.sendRequest(requestChannel, context);
}

Expand All @@ -124,7 +188,7 @@ public int available() {

@Override
public void produce(DataStreamChannel channel) throws IOException {
delegate.produce(channel);
delegate.produce(new WrappedDataStreamChannel(parentContext, channel));
}

@Override
Expand All @@ -133,12 +197,44 @@ public void releaseResources() {
}
}

public static class DelegatingRequestChannel implements RequestChannel {
public static class WrappedDataStreamChannel implements DataStreamChannel {
private final Context parentContext;
private final DataStreamChannel delegate;

public WrappedDataStreamChannel(Context parentContext, DataStreamChannel delegate) {
this.parentContext = parentContext;
this.delegate = delegate;
}

@Override
public void requestOutput() {
delegate.requestOutput();
}

@Override
public int write(ByteBuffer byteBuffer) throws IOException {
BytesTransferMetrics metrics = createOrGetBytesTransferMetrics(parentContext);
metrics.addRequestBytes(byteBuffer.limit());
return delegate.write(byteBuffer);
}

@Override
public void endStream() throws IOException {
delegate.endStream();
}

@Override
public void endStream(List<? extends Header> list) throws IOException {
delegate.endStream(list);
}
}

public static class WrappedRequestChannel implements RequestChannel {
private final RequestChannel delegate;
private final Context parentContext;
private final WrappedFutureCallback<?> wrappedFutureCallback;

public DelegatingRequestChannel(
public WrappedRequestChannel(
RequestChannel requestChannel,
Context parentContext,
WrappedFutureCallback<?> wrappedFutureCallback) {
Expand All @@ -150,30 +246,34 @@ public DelegatingRequestChannel(
@Override
public void sendRequest(HttpRequest request, EntityDetails entityDetails, HttpContext context)
throws HttpException, IOException {
if (instrumenter().shouldStart(parentContext, request)) {
wrappedFutureCallback.context = instrumenter().start(parentContext, request);
wrappedFutureCallback.httpRequest = request;
if (entityDetails != null) {
BytesTransferMetrics metrics = createOrGetBytesTransferMetrics(parentContext);
metrics.setRequestContentLength(entityDetails.getContentLength());
}
ApacheHttpClientRequest otelRequest = new ApacheHttpClientRequest(parentContext, request);
if (instrumenter().shouldStart(parentContext, otelRequest)) {
wrappedFutureCallback.context = instrumenter().start(parentContext, otelRequest);
wrappedFutureCallback.otelRequest = otelRequest;
}

delegate.sendRequest(request, entityDetails, context);
}
}

public static class WrappedFutureCallback<T> implements FutureCallback<T> {

private static final Logger logger = Logger.getLogger(WrappedFutureCallback.class.getName());

private final Context parentContext;
private final HttpContext httpContext;
private final HttpCoreContext httpContext;
private final FutureCallback<T> delegate;

private volatile Context context;
private volatile HttpRequest httpRequest;
private volatile ApacheHttpClientRequest otelRequest;

public WrappedFutureCallback(
Context parentContext, HttpContext httpContext, FutureCallback<T> delegate) {
this.parentContext = parentContext;
this.httpContext = httpContext;
this.httpContext = HttpCoreContext.adapt(httpContext);
// Note: this can be null in real life, so we have to handle this carefully
this.delegate = delegate;
}
Expand All @@ -187,7 +287,7 @@ public void completed(T result) {
return;
}

instrumenter().end(context, httpRequest, getResponseFromHttpContext(), null);
instrumenter().end(context, otelRequest, getResponse(), null);

if (parentContext == null) {
completeDelegate(result);
Expand All @@ -209,7 +309,7 @@ public void failed(Exception ex) {
}

// end span before calling delegate
instrumenter().end(context, httpRequest, getResponseFromHttpContext(), ex);
instrumenter().end(context, otelRequest, getResponse(), ex);

if (parentContext == null) {
failDelegate(ex);
Expand All @@ -232,7 +332,7 @@ public void cancelled() {

// TODO (trask) add "canceled" span attribute
// end span before calling delegate
instrumenter().end(context, httpRequest, getResponseFromHttpContext(), null);
instrumenter().end(context, otelRequest, getResponse(), null);

if (parentContext == null) {
cancelDelegate();
Expand Down Expand Up @@ -263,8 +363,8 @@ private void cancelDelegate() {
}

@Nullable
private HttpResponse getResponseFromHttpContext() {
return (HttpResponse) httpContext.getAttribute(HttpCoreContext.HTTP_RESPONSE);
private HttpResponse getResponse() {
return httpContext.getResponse();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.apachehttpclient.v5_0;

import static io.opentelemetry.javaagent.instrumentation.apachehttpclient.v5_0.ApacheHttpClientSingletons.getBytesTransferMetrics;

import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import org.apache.hc.core5.http.HttpResponse;

public class ApacheHttpClientContentLengthAttributesGetter
implements AttributesExtractor<ApacheHttpClientRequest, HttpResponse> {

@Override
public void onStart(
AttributesBuilder attributes, Context parentContext, ApacheHttpClientRequest request) {}

@Override
public void onEnd(
AttributesBuilder attributes,
Context context,
ApacheHttpClientRequest request,
HttpResponse response,
Throwable error) {
Context parentContext = request.getParentContext();
BytesTransferMetrics metrics = getBytesTransferMetrics(parentContext);
if (metrics != null) {
Long responseLength = metrics.getResponseContentLength();
if (responseLength != null) {
attributes.put(SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH, responseLength);
}
Long requestLength = metrics.getRequestContentLength();
if (requestLength != null) {
attributes.put(SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH, requestLength);
}
}
}
}
Loading