Skip to content

Commit

Permalink
Reduce number of get method variants in ShardGetService (#122175)
Browse files Browse the repository at this point in the history
  • Loading branch information
tlrx authored Feb 11, 2025
1 parent 6e6e42f commit e706193
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ private void getAndAddToResponse(
MultiGetRequest.Item item = request.items.get(location);
try {
GetResult getResult = indexShard.getService()
.get(
.mget(
item.id(),
item.storedFields(),
request.realtime(),
Expand Down
102 changes: 43 additions & 59 deletions server/src/main/java/org/elasticsearch/index/get/ShardGetService.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
Expand Down Expand Up @@ -93,7 +92,7 @@ public GetResult get(
FetchSourceContext fetchSourceContext,
boolean forceSyntheticSource
) throws IOException {
return get(
return doGet(
id,
gFields,
realtime,
Expand All @@ -107,7 +106,7 @@ public GetResult get(
);
}

public GetResult get(
public GetResult mget(
String id,
String[] gFields,
boolean realtime,
Expand All @@ -117,7 +116,7 @@ public GetResult get(
boolean forceSyntheticSource,
MultiEngineGet mget
) throws IOException {
return get(
return doGet(
id,
gFields,
realtime,
Expand All @@ -131,7 +130,7 @@ public GetResult get(
);
}

private GetResult get(
private GetResult doGet(
String id,
String[] gFields,
boolean realtime,
Expand All @@ -144,21 +143,40 @@ private GetResult get(
Function<Engine.Get, Engine.GetResult> engineGetOperator
) throws IOException {
currentMetric.inc();
final long now = System.nanoTime();
try {
long now = System.nanoTime();
GetResult getResult = innerGet(
id,
gFields,
realtime,
version,
versionType,
ifSeqNo,
ifPrimaryTerm,
fetchSourceContext,
forceSyntheticSource,
engineGetOperator
);
var engineGet = new Engine.Get(realtime, realtime, id).version(version)
.versionType(versionType)
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm);

final GetResult getResult;
try (Engine.GetResult get = engineGetOperator.apply(engineGet)) {
if (get == null) {
getResult = null;
} else if (get.exists() == false) {
getResult = new GetResult(
shardId.getIndexName(),
id,
UNASSIGNED_SEQ_NO,
UNASSIGNED_PRIMARY_TERM,
-1,
false,
null,
null,
null
);
} else {
// break between having loaded it from translog (so we only have _source), and having a document to load
getResult = innerGetFetch(
id,
gFields,
normalizeFetchSourceContent(fetchSourceContext, gFields),
get,
forceSyntheticSource
);
}
}
if (getResult != null && getResult.isExists()) {
existsMetric.inc(System.nanoTime() - now);
} else {
Expand All @@ -179,7 +197,7 @@ public GetResult getFromTranslog(
FetchSourceContext fetchSourceContext,
boolean forceSyntheticSource
) throws IOException {
return get(
return doGet(
id,
gFields,
realtime,
Expand All @@ -193,12 +211,8 @@ public GetResult getFromTranslog(
);
}

public GetResult getForUpdate(String id, long ifSeqNo, long ifPrimaryTerm) throws IOException {
return getForUpdate(id, ifSeqNo, ifPrimaryTerm, new String[] { RoutingFieldMapper.NAME });
}

public GetResult getForUpdate(String id, long ifSeqNo, long ifPrimaryTerm, String[] gFields) throws IOException {
return get(
return doGet(
id,
gFields,
true,
Expand Down Expand Up @@ -259,35 +273,6 @@ private static FetchSourceContext normalizeFetchSourceContent(@Nullable FetchSou
return FetchSourceContext.DO_NOT_FETCH_SOURCE;
}

private GetResult innerGet(
String id,
String[] gFields,
boolean realtime,
long version,
VersionType versionType,
long ifSeqNo,
long ifPrimaryTerm,
FetchSourceContext fetchSourceContext,
boolean forceSyntheticSource,
Function<Engine.Get, Engine.GetResult> engineGetOperator
) throws IOException {
fetchSourceContext = normalizeFetchSourceContent(fetchSourceContext, gFields);
var engineGet = new Engine.Get(realtime, realtime, id).version(version)
.versionType(versionType)
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm);
try (Engine.GetResult get = engineGetOperator.apply(engineGet)) {
if (get == null) {
return null;
}
if (get.exists() == false) {
return new GetResult(shardId.getIndexName(), id, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, -1, false, null, null, null);
}
// break between having loaded it from translog (so we only have _source), and having a document to load
return innerGetFetch(id, gFields, fetchSourceContext, get, forceSyntheticSource);
}
}

private GetResult innerGetFetch(
String id,
String[] storedFields,
Expand All @@ -298,7 +283,6 @@ private GetResult innerGetFetch(
assert get.exists() : "method should only be called if document could be retrieved";
// check first if stored fields to be loaded don't contain an object field
MappingLookup mappingLookup = mapperService.mappingLookup();
final IndexVersion indexVersion = indexSettings.getIndexVersionCreated();
final Set<String> storedFieldSet = new HashSet<>();
boolean hasInferenceMetadataFields = false;
if (storedFields != null) {
Expand Down Expand Up @@ -338,6 +322,9 @@ private GetResult innerGetFetch(
throw new ElasticsearchException("Failed to get id [" + id + "]", e);
}

final boolean supportDocValuesForIgnoredMetaField = indexSettings.getIndexVersionCreated()
.onOrAfter(IndexVersions.DOC_VALUES_FOR_IGNORED_META_FIELD);

// put stored fields into result objects
if (leafStoredFieldLoader.storedFields().isEmpty() == false) {
Set<String> needed = new HashSet<>();
Expand All @@ -351,8 +338,7 @@ private GetResult innerGetFetch(
if (false == needed.contains(entry.getKey())) {
continue;
}
if (IgnoredFieldMapper.NAME.equals(entry.getKey())
&& indexVersion.onOrAfter(IndexVersions.DOC_VALUES_FOR_IGNORED_META_FIELD)) {
if (IgnoredFieldMapper.NAME.equals(entry.getKey()) && supportDocValuesForIgnoredMetaField) {
continue;
}
MappedFieldType ft = mapperService.fieldType(entry.getKey());
Expand All @@ -371,9 +357,7 @@ private GetResult innerGetFetch(
// NOTE: when _ignored is requested via `stored_fields` we need to load it from doc values instead of loading it from stored fields.
// The _ignored field used to be stored, but as a result of supporting aggregations on it, it moved from using a stored field to
// using doc values.
if (indexVersion.onOrAfter(IndexVersions.DOC_VALUES_FOR_IGNORED_META_FIELD)
&& storedFields != null
&& Arrays.asList(storedFields).contains(IgnoredFieldMapper.NAME)) {
if (supportDocValuesForIgnoredMetaField && storedFields != null && Arrays.asList(storedFields).contains(IgnoredFieldMapper.NAME)) {
final DocumentField ignoredDocumentField = loadIgnoredMetadataField(docIdAndVersion);
if (ignoredDocumentField != null) {
if (metadataFields == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@

public class ShardGetServiceTests extends IndexShardTestCase {

private GetResult getForUpdate(IndexShard indexShard, String id, long ifSeqNo, long ifPrimaryTerm) throws IOException {
return indexShard.getService().getForUpdate(id, ifSeqNo, ifPrimaryTerm, new String[] { RoutingFieldMapper.NAME });
}

public void testGetForUpdate() throws IOException {
Settings settings = indexSettings(IndexVersion.current(), 1, 1).build();
IndexMetadata metadata = IndexMetadata.builder("test").putMapping("""
Expand All @@ -44,7 +48,7 @@ public void testGetForUpdate() throws IOException {
long translogInMemorySegmentCountExpected = 0;
Engine.IndexResult test = indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}");
assertTrue(primary.getEngine().refreshNeeded());
GetResult testGet = primary.getService().getForUpdate("0", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
GetResult testGet = getForUpdate(primary, "0", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
assertFalse(testGet.getFields().containsKey(RoutingFieldMapper.NAME));
assertEquals(testGet.sourceRef().utf8ToString(), "{\"foo\" : \"bar\"}");
assertEquals(translogInMemorySegmentCountExpected, translogInMemorySegmentCount.getAsLong());
Expand All @@ -54,7 +58,7 @@ public void testGetForUpdate() throws IOException {

Engine.IndexResult test1 = indexDoc(primary, "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar");
assertTrue(primary.getEngine().refreshNeeded());
GetResult testGet1 = primary.getService().getForUpdate("1", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
GetResult testGet1 = getForUpdate(primary, "1", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
assertEquals(testGet1.sourceRef().utf8ToString(), "{\"foo\" : \"baz\"}");
assertTrue(testGet1.getFields().containsKey(RoutingFieldMapper.NAME));
assertEquals("foobar", testGet1.getFields().get(RoutingFieldMapper.NAME).getValue());
Expand All @@ -70,19 +74,19 @@ public void testGetForUpdate() throws IOException {
// now again from the reader
Engine.IndexResult test2 = indexDoc(primary, "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar");
assertTrue(primary.getEngine().refreshNeeded());
testGet1 = primary.getService().getForUpdate("1", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
testGet1 = getForUpdate(primary, "1", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
assertEquals(testGet1.sourceRef().utf8ToString(), "{\"foo\" : \"baz\"}");
assertTrue(testGet1.getFields().containsKey(RoutingFieldMapper.NAME));
assertEquals("foobar", testGet1.getFields().get(RoutingFieldMapper.NAME).getValue());
assertEquals(translogInMemorySegmentCountExpected, translogInMemorySegmentCount.getAsLong());

final long primaryTerm = primary.getOperationPrimaryTerm();
testGet1 = primary.getService().getForUpdate("1", test2.getSeqNo(), primaryTerm);
testGet1 = getForUpdate(primary, "1", test2.getSeqNo(), primaryTerm);
assertEquals(testGet1.sourceRef().utf8ToString(), "{\"foo\" : \"baz\"}");
assertEquals(translogInMemorySegmentCountExpected, translogInMemorySegmentCount.getAsLong());

expectThrows(VersionConflictEngineException.class, () -> primary.getService().getForUpdate("1", test2.getSeqNo() + 1, primaryTerm));
expectThrows(VersionConflictEngineException.class, () -> primary.getService().getForUpdate("1", test2.getSeqNo(), primaryTerm + 1));
expectThrows(VersionConflictEngineException.class, () -> getForUpdate(primary, "1", test2.getSeqNo() + 1, primaryTerm));
expectThrows(VersionConflictEngineException.class, () -> getForUpdate(primary, "1", test2.getSeqNo(), primaryTerm + 1));
closeShards(primary);
}

Expand Down Expand Up @@ -183,7 +187,7 @@ private void runGetFromTranslogWithOptions(
Engine.IndexResult res = indexDoc(primary, "test", "0", docToIndex);
assertTrue(res.isCreated());
assertTrue(primary.getEngine().refreshNeeded());
GetResult testGet = primary.getService().getForUpdate("0", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
GetResult testGet = getForUpdate(primary, "0", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
assertFalse(testGet.getFields().containsKey(RoutingFieldMapper.NAME));
assertFalse(testGet.getFields().containsKey("foo"));
assertFalse(testGet.getFields().containsKey("bar"));
Expand All @@ -194,7 +198,7 @@ private void runGetFromTranslogWithOptions(

indexDoc(primary, "1", docToIndex, XContentType.JSON, "foobar");
assertTrue(primary.getEngine().refreshNeeded());
GetResult testGet1 = primary.getService().getForUpdate("1", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
GetResult testGet1 = getForUpdate(primary, "1", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
assertEquals(testGet1.sourceRef() == null ? "" : testGet1.sourceRef().utf8ToString(), expectedResult);
assertTrue(testGet1.getFields().containsKey(RoutingFieldMapper.NAME));
assertFalse(testGet.getFields().containsKey("foo"));
Expand Down Expand Up @@ -252,7 +256,7 @@ public void testTypelessGetForUpdate() throws IOException {
Engine.IndexResult indexResult = indexDoc(shard, "some_type", "0", "{\"foo\" : \"bar\"}");
assertTrue(indexResult.isCreated());

GetResult getResult = shard.getService().getForUpdate("0", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
GetResult getResult = getForUpdate(shard, "0", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
assertTrue(getResult.isExists());

closeShards(shard);
Expand Down

0 comments on commit e706193

Please sign in to comment.