-
Notifications
You must be signed in to change notification settings - Fork 298
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Temporary bridge between contexts and spans/scopes #8636
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,13 +3,17 @@ | |
import static datadog.trace.api.ConfigDefaults.DEFAULT_ASYNC_PROPAGATING; | ||
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.noopScope; | ||
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.noopSpan; | ||
import static datadog.trace.core.scopemanager.ContinuableScope.CONTEXT; | ||
import static datadog.trace.core.scopemanager.ContinuableScope.INSTRUMENTATION; | ||
import static datadog.trace.core.scopemanager.ContinuableScope.ITERATION; | ||
import static datadog.trace.core.scopemanager.ContinuableScope.MANUAL; | ||
import static java.util.concurrent.TimeUnit.MINUTES; | ||
import static java.util.concurrent.TimeUnit.NANOSECONDS; | ||
import static java.util.concurrent.TimeUnit.SECONDS; | ||
|
||
import datadog.context.Context; | ||
import datadog.context.ContextManager; | ||
import datadog.context.ContextScope; | ||
import datadog.trace.api.Config; | ||
import datadog.trace.api.Stateful; | ||
import datadog.trace.api.scopemanager.ExtendedScopeListener; | ||
|
@@ -40,7 +44,7 @@ | |
* from being reported even if all related spans are finished. It also delegates to other | ||
* ScopeInterceptors to provide additional functionality. | ||
*/ | ||
public final class ContinuableScopeManager implements ScopeStateAware { | ||
public final class ContinuableScopeManager implements ScopeStateAware, ContextManager { | ||
|
||
static final Logger log = LoggerFactory.getLogger(ContinuableScopeManager.class); | ||
static final RatelimitedLogger ratelimitedLog = new RatelimitedLogger(log, 1, MINUTES); | ||
|
@@ -83,6 +87,8 @@ public ContinuableScopeManager( | |
this.healthMetrics = healthMetrics; | ||
this.tlsScopeStack = new ScopeStackThreadLocal(profilingContextIntegration); | ||
this.profilingContextIntegration = profilingContextIntegration; | ||
|
||
ContextManager.register(this); | ||
} | ||
|
||
public AgentScope activateSpan(final AgentSpan span) { | ||
|
@@ -96,10 +102,12 @@ public AgentScope activateManualSpan(final AgentSpan span) { | |
public AgentScope.Continuation captureActiveSpan() { | ||
ContinuableScope activeScope = scopeStack().active(); | ||
if (null != activeScope && activeScope.isAsyncPropagating()) { | ||
return captureSpan(activeScope.span(), activeScope.source()); | ||
} else { | ||
return AgentTracer.noopContinuation(); | ||
AgentSpan span = activeScope.span(); | ||
if (span != null) { | ||
return captureSpan(span, activeScope.source()); | ||
} | ||
} | ||
return AgentTracer.noopContinuation(); | ||
} | ||
|
||
public AgentScope.Continuation captureSpan(final AgentSpan span) { | ||
|
@@ -111,14 +119,14 @@ private AgentScope.Continuation captureSpan(final AgentSpan span, byte source) { | |
} | ||
|
||
private AgentScope activate( | ||
final AgentSpan span, | ||
final Context context, | ||
final byte source, | ||
final boolean overrideAsyncPropagation, | ||
final boolean isAsyncPropagating) { | ||
ScopeStack scopeStack = scopeStack(); | ||
|
||
final ContinuableScope top = scopeStack.top; | ||
if (top != null && top.span.equals(span)) { | ||
if (top != null && top.context.equals(context)) { | ||
top.incrementReferences(); | ||
return top; | ||
} | ||
|
@@ -131,7 +139,7 @@ private AgentScope activate( | |
return noopScope(); | ||
} | ||
|
||
assert span != null; | ||
assert context != null; | ||
|
||
// Inherit the async propagation from the active scope unless the value is overridden | ||
boolean asyncPropagation = | ||
|
@@ -140,7 +148,7 @@ private AgentScope activate( | |
: top != null ? top.isAsyncPropagating() : DEFAULT_ASYNC_PROPAGATING; | ||
|
||
final ContinuableScope scope = | ||
new ContinuableScope(this, span, source, asyncPropagation, createScopeState(span)); | ||
new ContinuableScope(this, context, source, asyncPropagation, createScopeState(context)); | ||
scopeStack.push(scope); | ||
healthMetrics.onActivateScope(); | ||
|
||
|
@@ -153,26 +161,26 @@ private AgentScope activate( | |
* @param continuation {@code null} if a continuation is re-used | ||
*/ | ||
ContinuableScope continueSpan( | ||
final ScopeContinuation continuation, final AgentSpan span, final byte source) { | ||
final ScopeContinuation continuation, final Context context, final byte source) { | ||
ScopeStack scopeStack = scopeStack(); | ||
|
||
// optimization: if the top scope is already keeping the same span alive | ||
// then re-use that scope (avoids allocation) and cancel the continuation | ||
final ContinuableScope top = scopeStack.top; | ||
if (top != null && top.span.equals(span)) { | ||
if (top != null && top.context.equals(context)) { | ||
top.incrementReferences(); | ||
if (continuation != null) { | ||
continuation.cancelFromContinuedScopeClose(); | ||
} | ||
return top; | ||
} | ||
|
||
Stateful scopeState = createScopeState(span); | ||
Stateful scopeState = createScopeState(context); | ||
final ContinuableScope scope; | ||
if (continuation != null) { | ||
scope = new ContinuingScope(this, span, source, true, continuation, scopeState); | ||
scope = new ContinuingScope(this, context, source, true, continuation, scopeState); | ||
} else { | ||
scope = new ContinuableScope(this, span, source, true, scopeState); | ||
scope = new ContinuableScope(this, context, source, true, scopeState); | ||
} | ||
scopeStack.push(scope); | ||
|
||
|
@@ -202,8 +210,9 @@ public void closePrevious(final boolean finishSpan) { | |
} | ||
top.close(); | ||
scopeStack.cleanup(); | ||
if (finishSpan) { | ||
top.span.finishWithEndToEnd(); | ||
AgentSpan span = top.span(); | ||
if (finishSpan && span != null) { | ||
span.finishWithEndToEnd(); | ||
} | ||
} | ||
} | ||
|
@@ -261,7 +270,7 @@ public void rollbackActiveToCheckpoint() { | |
|
||
public AgentSpan activeSpan() { | ||
final ContinuableScope active = scopeStack().active(); | ||
return active == null ? null : active.span; | ||
return active == null ? null : active.span(); | ||
} | ||
|
||
/** Attach a listener to scope activation events */ | ||
|
@@ -289,11 +298,12 @@ private void addExtendedScopeListener(final ExtendedScopeListener listener) { | |
} | ||
} | ||
|
||
private Stateful createScopeState(AgentSpan span) { | ||
private Stateful createScopeState(Context context) { | ||
// currently this just manages things the profiler has to do per scope, but could be expanded | ||
// to encapsulate other scope lifecycle activities | ||
// FIXME DDSpanContext is always a ProfilerContext anyway... | ||
if (span.context() instanceof ProfilerContext) { | ||
AgentSpan span = AgentSpan.fromContext(context); | ||
if (span != null && span.context() instanceof ProfilerContext) { | ||
return profilingContextIntegration.newScopeState((ProfilerContext) span.context()); | ||
} | ||
return Stateful.DEFAULT; | ||
|
@@ -308,6 +318,22 @@ public ScopeState newScopeState() { | |
return new ContinuableScopeState(); | ||
} | ||
|
||
@Override | ||
public Context current() { | ||
final ContinuableScope active = scopeStack().active(); | ||
return active == null ? Context.root() : active.context; | ||
} | ||
|
||
@Override | ||
public ContextScope attach(Context context) { | ||
return activate(context, CONTEXT, false, true); | ||
} | ||
|
||
@Override | ||
public Context swap(Context context) { | ||
throw new UnsupportedOperationException("Not yet implemented"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it safe to throw an |
||
} | ||
|
||
private class ContinuableScopeState implements ScopeState { | ||
|
||
private ScopeStack localScopeStack = tlsScopeStack.initialValue(); | ||
|
@@ -383,11 +409,14 @@ public void run(Map<ScopeStack, ContinuableScope> rootIterationScopes) { | |
|
||
if (!rootScope.alive()) { // no need to track this anymore | ||
itr.remove(); | ||
} else if (NANOSECONDS.toMillis(rootScope.span.getStartTime()) < cutOff) { | ||
// mark scope as overdue to allow cleanup and avoid further spans being attached | ||
scopeStack.overdueRootScope = rootScope; | ||
rootScope.span.finishWithEndToEnd(); | ||
itr.remove(); | ||
} else { | ||
AgentSpan span = rootScope.span(); | ||
if (span != null && NANOSECONDS.toMillis(span.getStartTime()) < cutOff) { | ||
// mark scope as overdue to allow cleanup and avoid further spans being attached | ||
scopeStack.overdueRootScope = rootScope; | ||
span.finishWithEndToEnd(); | ||
itr.remove(); | ||
} | ||
} | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -210,6 +210,6 @@ default <T> T get(@Nonnull ContextKey<T> key) { | |
|
||
@Override | ||
default <T> Context with(@Nonnull ContextKey<T> key, @Nullable T value) { | ||
return Context.root().with(SPAN_KEY, this, key, value); | ||
return SPAN_KEY == key ? (Context) value : Context.root().with(SPAN_KEY, this, key, value); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Interesting optimization! 👍 |
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we dump only
span()
here instead of full context?