Skip to content

Commit

Permalink
[9.0] Simplify TransportStats assertions in v9 (elastic#114700)
Browse files Browse the repository at this point in the history
Backports elastic#114700 to 9.0

> Transport handling times were added in elastic#80581 (8.1), we don't need assertions for version prior to that in 9.0
  • Loading branch information
arteam committed Feb 5, 2025
1 parent 28ee4d1 commit 38760ed
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ static TransportVersion def(int id) {
public static final TransportVersion INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING = def(8_839_0_00);
public static final TransportVersion ML_INFERENCE_IBM_WATSONX_RERANK_ADDED = def(8_840_0_00);
public static final TransportVersion ELASTICSEARCH_9_0 = def(9_000_0_00);
public static final TransportVersion TRANSPORT_STATS_HANDLING_TIME_REQUIRED = def(9_002_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.UpdateForV9;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;

Expand Down Expand Up @@ -70,18 +69,16 @@ public TransportStats(StreamInput in) throws IOException {
rxSize = in.readVLong();
txCount = in.readVLong();
txSize = in.readVLong();
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_1_0) && in.readBoolean()) {
inboundHandlingTimeBucketFrequencies = new long[HandlingTimeTracker.BUCKET_COUNT];
for (int i = 0; i < inboundHandlingTimeBucketFrequencies.length; i++) {
inboundHandlingTimeBucketFrequencies[i] = in.readVLong();
}
outboundHandlingTimeBucketFrequencies = new long[HandlingTimeTracker.BUCKET_COUNT];
for (int i = 0; i < inboundHandlingTimeBucketFrequencies.length; i++) {
outboundHandlingTimeBucketFrequencies[i] = in.readVLong();
}
} else {
inboundHandlingTimeBucketFrequencies = new long[0];
outboundHandlingTimeBucketFrequencies = new long[0];
if (in.getTransportVersion().before(TransportVersions.TRANSPORT_STATS_HANDLING_TIME_REQUIRED)) {
in.readBoolean();
}
inboundHandlingTimeBucketFrequencies = new long[HandlingTimeTracker.BUCKET_COUNT];
for (int i = 0; i < inboundHandlingTimeBucketFrequencies.length; i++) {
inboundHandlingTimeBucketFrequencies[i] = in.readVLong();
}
outboundHandlingTimeBucketFrequencies = new long[HandlingTimeTracker.BUCKET_COUNT];
for (int i = 0; i < inboundHandlingTimeBucketFrequencies.length; i++) {
outboundHandlingTimeBucketFrequencies[i] = in.readVLong();
}
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_8_0)) {
transportActionStats = Collections.unmodifiableMap(in.readOrderedMap(StreamInput::readString, TransportActionStats::new));
Expand All @@ -99,15 +96,16 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(rxSize);
out.writeVLong(txCount);
out.writeVLong(txSize);
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_1_0)) {
assert (inboundHandlingTimeBucketFrequencies.length > 0) == (outboundHandlingTimeBucketFrequencies.length > 0);
out.writeBoolean(inboundHandlingTimeBucketFrequencies.length > 0);
for (long handlingTimeBucketFrequency : inboundHandlingTimeBucketFrequencies) {
out.writeVLong(handlingTimeBucketFrequency);
}
for (long handlingTimeBucketFrequency : outboundHandlingTimeBucketFrequencies) {
out.writeVLong(handlingTimeBucketFrequency);
}
assert inboundHandlingTimeBucketFrequencies.length == HandlingTimeTracker.BUCKET_COUNT;
assert outboundHandlingTimeBucketFrequencies.length == HandlingTimeTracker.BUCKET_COUNT;
if (out.getTransportVersion().before(TransportVersions.TRANSPORT_STATS_HANDLING_TIME_REQUIRED)) {
out.writeBoolean(true);
}
for (long handlingTimeBucketFrequency : inboundHandlingTimeBucketFrequencies) {
out.writeVLong(handlingTimeBucketFrequency);
}
for (long handlingTimeBucketFrequency : outboundHandlingTimeBucketFrequencies) {
out.writeVLong(handlingTimeBucketFrequency);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_8_0)) {
out.writeMap(transportActionStats, StreamOutput::writeWriteable);
Expand Down Expand Up @@ -166,24 +164,13 @@ public Map<String, TransportActionStats> getTransportActionStats() {
return transportActionStats;
}

@UpdateForV9(owner = UpdateForV9.Owner.DISTRIBUTED_COORDINATION)
// Review and simplify the if-else blocks containing this symbol once v9 is released
private static final boolean IMPOSSIBLE_IN_V9 = true;

private boolean assertHistogramsConsistent() {
assert inboundHandlingTimeBucketFrequencies.length == outboundHandlingTimeBucketFrequencies.length;
if (inboundHandlingTimeBucketFrequencies.length == 0) {
// Stats came from before v8.1
assert IMPOSSIBLE_IN_V9;
} else {
assert inboundHandlingTimeBucketFrequencies.length == HandlingTimeTracker.BUCKET_COUNT;
}
assert inboundHandlingTimeBucketFrequencies.length == HandlingTimeTracker.BUCKET_COUNT;
return true;
}

@Override
@UpdateForV9(owner = UpdateForV9.Owner.DISTRIBUTED_COORDINATION)
// review the "if" blocks checking for non-empty once we have
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerParams) {
return Iterators.concat(Iterators.single((builder, params) -> {
builder.startObject(Fields.TRANSPORT);
Expand All @@ -193,19 +180,10 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerP
builder.humanReadableField(Fields.RX_SIZE_IN_BYTES, Fields.RX_SIZE, ByteSizeValue.ofBytes(rxSize));
builder.field(Fields.TX_COUNT, txCount);
builder.humanReadableField(Fields.TX_SIZE_IN_BYTES, Fields.TX_SIZE, ByteSizeValue.ofBytes(txSize));
if (inboundHandlingTimeBucketFrequencies.length > 0) {
histogramToXContent(builder, inboundHandlingTimeBucketFrequencies, Fields.INBOUND_HANDLING_TIME_HISTOGRAM);
histogramToXContent(builder, outboundHandlingTimeBucketFrequencies, Fields.OUTBOUND_HANDLING_TIME_HISTOGRAM);
} else {
// Stats came from before v8.1
assert IMPOSSIBLE_IN_V9;
}
if (transportActionStats.isEmpty() == false) {
builder.startObject(Fields.ACTIONS);
} else {
// Stats came from before v8.8
assert IMPOSSIBLE_IN_V9;
}
assert inboundHandlingTimeBucketFrequencies.length > 0;
histogramToXContent(builder, inboundHandlingTimeBucketFrequencies, Fields.INBOUND_HANDLING_TIME_HISTOGRAM);
histogramToXContent(builder, outboundHandlingTimeBucketFrequencies, Fields.OUTBOUND_HANDLING_TIME_HISTOGRAM);
builder.startObject(Fields.ACTIONS);
return builder;
}),

Expand All @@ -215,12 +193,7 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerP
return builder;
}),

Iterators.single((builder, params) -> {
if (transportActionStats.isEmpty() == false) {
builder.endObject();
}
return builder.endObject();
})
Iterators.single((builder, params) -> { return builder.endObject().endObject(); })
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,50 +20,8 @@

public class TransportStatsTests extends ESTestCase {
public void testToXContent() {
assertEquals(
Strings.toString(
new TransportStats(1, 2, 3, ByteSizeUnit.MB.toBytes(4), 5, ByteSizeUnit.MB.toBytes(6), new long[0], new long[0], Map.of()),
false,
true
),
"""
{"transport":{"server_open":1,"total_outbound_connections":2,\
"rx_count":3,"rx_size":"4mb","rx_size_in_bytes":4194304,\
"tx_count":5,"tx_size":"6mb","tx_size_in_bytes":6291456\
}}"""
);

final var histogram = new long[HandlingTimeTracker.BUCKET_COUNT];
assertEquals(
Strings.toString(
new TransportStats(1, 2, 3, ByteSizeUnit.MB.toBytes(4), 5, ByteSizeUnit.MB.toBytes(6), histogram, histogram, Map.of()),
false,
true
),
"""
{"transport":{"server_open":1,"total_outbound_connections":2,\
"rx_count":3,"rx_size":"4mb","rx_size_in_bytes":4194304,\
"tx_count":5,"tx_size":"6mb","tx_size_in_bytes":6291456,\
"inbound_handling_time_histogram":[],\
"outbound_handling_time_histogram":[]\
}}"""
);

histogram[4] = 10;
assertEquals(
Strings.toString(
new TransportStats(1, 2, 3, ByteSizeUnit.MB.toBytes(4), 5, ByteSizeUnit.MB.toBytes(6), histogram, histogram, Map.of()),
false,
true
),
"""
{"transport":{"server_open":1,"total_outbound_connections":2,\
"rx_count":3,"rx_size":"4mb","rx_size_in_bytes":4194304,\
"tx_count":5,"tx_size":"6mb","tx_size_in_bytes":6291456,\
"inbound_handling_time_histogram":[{"ge":"8ms","ge_millis":8,"lt":"16ms","lt_millis":16,"count":10}],\
"outbound_handling_time_histogram":[{"ge":"8ms","ge_millis":8,"lt":"16ms","lt_millis":16,"count":10}]\
}}"""
);

final var requestSizeHistogram = new long[29];
requestSizeHistogram[2] = 9;
Expand All @@ -84,8 +42,8 @@ public void testToXContent() {
ByteSizeUnit.MB.toBytes(4),
5,
ByteSizeUnit.MB.toBytes(6),
new long[0],
new long[0],
histogram,
histogram,
Map.of("internal:test/action", exampleActionStats)
),
false,
Expand All @@ -95,6 +53,8 @@ public void testToXContent() {
{"transport":{"server_open":1,"total_outbound_connections":2,\
"rx_count":3,"rx_size":"4mb","rx_size_in_bytes":4194304,\
"tx_count":5,"tx_size":"6mb","tx_size_in_bytes":6291456,\
"inbound_handling_time_histogram":[{"ge":"8ms","ge_millis":8,"lt":"16ms","lt_millis":16,"count":10}],\
"outbound_handling_time_histogram":[{"ge":"8ms","ge_millis":8,"lt":"16ms","lt_millis":16,"count":10}],\
"actions":{"internal:test/action":%s}}}""", Strings.toString(exampleActionStats, false, true))
);
}
Expand Down

0 comments on commit 38760ed

Please sign in to comment.