Skip to content

Commit e062b7b

Browse files
committed
es 8.15.1 support
1 parent fc80e3d commit e062b7b

File tree

7 files changed

+55
-45
lines changed

7 files changed

+55
-45
lines changed

.travis.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ jdk:
55

66
before_install:
77
- sudo rm -rf /var/lib/elasticsearch
8-
- curl https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.15.0-amd64.deb -o elasticsearch.deb && sudo dpkg -i --force-confnew elasticsearch.deb
8+
- curl https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.15.1-amd64.deb -o elasticsearch.deb && sudo dpkg -i --force-confnew elasticsearch.deb
99
- sudo cp ./src/test/resources/elasticsearch.yml /etc/elasticsearch/elasticsearch.yml
1010
- sudo cat /etc/elasticsearch/elasticsearch.yml
1111
- sudo java -version

pom.xml

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<modelVersion>4.0.0</modelVersion>
44
<groupId>org.nlpcn</groupId>
55
<artifactId>elasticsearch-sql</artifactId>
6-
<version>8.15.0.0</version>
6+
<version>8.15.1.0</version>
77
<packaging>jar</packaging>
88
<description>Query elasticsearch using SQL</description>
99
<name>elasticsearch-sql</name>
@@ -44,7 +44,7 @@
4444
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
4545
<runSuite>**/MainTestSuite.class</runSuite>
4646
<elasticsearch.plugin.name>sql</elasticsearch.plugin.name>
47-
<elasticsearch.version>8.15.0</elasticsearch.version>
47+
<elasticsearch.version>8.15.1</elasticsearch.version>
4848
<elasticsearch.plugin.classname>org.elasticsearch.plugin.nlpcn.SqlPlug</elasticsearch.plugin.classname>
4949
<druid.version>1.2.15</druid.version>
5050
<guava.version>32.0.0-jre</guava.version>

src/main/java/org/elasticsearch/plugin/nlpcn/ActionRequestRestExecuter.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import org.elasticsearch.action.search.SearchRequest;
66
import org.elasticsearch.client.internal.Client;
77
import org.elasticsearch.rest.RestChannel;
8-
import org.elasticsearch.rest.action.RestChunkedToXContentListener;
8+
import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;
99
import org.nlpcn.es4sql.exception.SqlParseException;
1010
import org.nlpcn.es4sql.query.SqlElasticDeleteByQueryRequestBuilder;
1111
import org.nlpcn.es4sql.query.SqlElasticRequestBuilder;
@@ -39,7 +39,7 @@ public void execute() throws Exception {
3939
executeJoinRequestAndSendResponse();
4040
}
4141
else if (request instanceof SearchRequest) {
42-
client.search((SearchRequest) request, new RestChunkedToXContentListener<>(channel));
42+
client.search((SearchRequest) request, new RestRefCountedChunkedToXContentListener<>(channel));
4343
} else if (requestBuilder instanceof SqlElasticDeleteByQueryRequestBuilder) {
4444
throw new UnsupportedOperationException("currently not support delete on elastic 2.0.0");
4545
}

src/main/java/org/elasticsearch/plugin/nlpcn/ElasticJoinExecutor.java

+4-8
Original file line numberDiff line numberDiff line change
@@ -49,14 +49,10 @@ protected ElasticJoinExecutor(JoinRequestBuilder requestBuilder) {
4949
&& (secondTableReturnedField == null || secondTableReturnedField.size() == 0);
5050
}
5151

52-
public void sendResponse(RestChannel channel){
53-
try {
54-
XContentBuilder builder = ElasticUtils.hitsAsXContentBuilder(results,metaResults);
55-
RestResponse bytesRestResponse = new RestResponse(RestStatus.OK, builder);
56-
channel.sendResponse(bytesRestResponse);
57-
} catch (IOException e) {
58-
e.printStackTrace();
59-
}
52+
public void sendResponse(RestChannel channel) throws IOException {
53+
XContentBuilder builder = ElasticUtils.hitsAsXContentBuilder(results, metaResults);
54+
RestResponse bytesRestResponse = new RestResponse(RestStatus.OK, builder);
55+
channel.sendResponse(bytesRestResponse);
6056
}
6157

6258
@Override

src/main/java/org/elasticsearch/plugin/nlpcn/RestSqlAction.java

+30-21
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import org.apache.logging.log4j.LogManager;
44
import org.apache.logging.log4j.Logger;
55
import org.elasticsearch.client.internal.node.NodeClient;
6+
import org.elasticsearch.rest.RestChannel;
67
import org.elasticsearch.rest.RestResponse;
78
import org.elasticsearch.xcontent.XContentParseException;
89
import org.elasticsearch.xcontent.XContentParser;
@@ -13,18 +14,18 @@
1314
import org.elasticsearch.rest.RestRequest;
1415
import org.elasticsearch.rest.RestStatus;
1516
import org.nlpcn.es4sql.SearchDao;
16-
import org.nlpcn.es4sql.exception.SqlParseException;
1717
import org.nlpcn.es4sql.query.QueryAction;
1818

1919
import java.io.IOException;
20-
import java.sql.SQLFeatureNotSupportedException;
2120
import java.util.Arrays;
2221
import java.util.Collections;
2322
import java.util.HashMap;
2423
import java.util.HashSet;
2524
import java.util.List;
2625
import java.util.Map;
26+
import java.util.Optional;
2727
import java.util.Set;
28+
import java.util.concurrent.ExecutorService;
2829

2930
import static org.elasticsearch.rest.RestRequest.Method.GET;
3031
import static org.elasticsearch.rest.RestRequest.Method.POST;
@@ -56,27 +57,39 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
5657
// LOGGER.warn("Please use json format params, like: {\"sql\":\"SELECT * FROM test\"}");
5758
}
5859

59-
String sql = request.param("sql");
60+
String sql = Optional.ofNullable(request.param("sql")).orElseGet(() -> request.content().utf8ToString());
61+
boolean useThreadPool = request.paramAsBoolean("useThreadPool", false);
6062

61-
if (sql == null) {
62-
sql = request.content().utf8ToString();
63+
if (useThreadPool) {
64+
ExecutorService executor = client.threadPool().executor("nlpcn_sql");
65+
return channel -> executor.execute(() -> doSqlRequest(request, client, sql, channel));
6366
}
67+
return channel -> doSqlRequest(request, client, sql, channel);
68+
}
69+
70+
@Override
71+
protected Set<String> responseParams() {
72+
Set<String> responseParams = new HashSet<>(super.responseParams());
73+
responseParams.addAll(Arrays.asList("sql", "flat", "separator", "_score", "_type", "_id", "_scroll_id", "newLine", "format", "showHeader", "quote", "useThreadPool"));
74+
return Collections.unmodifiableSet(responseParams);
75+
}
76+
77+
private void doSqlRequest(RestRequest request, NodeClient client, String sql, RestChannel channel) {
6478
try {
6579
SearchDao searchDao = new SearchDao(client);
66-
QueryAction queryAction = null;
6780

68-
queryAction = searchDao.explain(sql);//zhongshu-comment 语法解析,将sql字符串解析为一个Java查询对象
81+
//zhongshu-comment 语法解析,将sql字符串解析为一个Java查询对象
82+
QueryAction queryAction = searchDao.explain(sql);
6983

7084
// TODO add unit tests to explain. (rest level?)
7185
if (request.path().endsWith("/explain")) {
7286
final String jsonExplanation = queryAction.explain().explain();
73-
return channel -> channel.sendResponse(new RestResponse(RestStatus.OK, XContentType.JSON.mediaType(), jsonExplanation));
87+
channel.sendResponse(new RestResponse(RestStatus.OK, XContentType.JSON.mediaType(), jsonExplanation));
7488
} else {
7589
Map<String, String> params = request.params();
7690

7791
//zhongshu-comment 生成一个负责用rest方式查询es的对象RestExecutor,返回的实现类是:ElasticDefaultRestExecutor
7892
RestExecutor restExecutor = ActionRequestRestExecuterFactory.createExecutor(params.get("format"));
79-
final QueryAction finalQueryAction = queryAction;
8093
//doing this hack because elasticsearch throws exception for un-consumed props
8194
Map<String, String> additionalParams = new HashMap<>();
8295
for (String paramName : responseParams()) {
@@ -87,19 +100,15 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
87100
//zhongshu-comment restExecutor.execute()方法里会调用es查询的相关rest api
88101
//zhongshu-comment restExecutor.execute()方法的第1、4个参数是框架传进来的参数,第2、3个参数是可以自己生成的参数,所以要多注重一点
89102
//zhongshu-comment 默认调用的是ElasticDefaultRestExecutor这个子类
90-
//todo 这是什么语法:搜索java8 -> lambda表达式:https://blog.csdn.net/ioriogami/article/details/12782141
91-
return channel -> restExecutor.execute(client, additionalParams, finalQueryAction, channel);
103+
restExecutor.execute(client, additionalParams, queryAction, channel);
104+
}
105+
} catch (Exception e) {
106+
try {
107+
channel.sendResponse(new RestResponse(channel, e));
108+
} catch (Exception inner) {
109+
inner.addSuppressed(e);
110+
LOGGER.error("failed to send failure response", inner);
92111
}
93-
} catch (SqlParseException | SQLFeatureNotSupportedException e) {
94-
e.printStackTrace();
95112
}
96-
return null;
97-
}
98-
99-
@Override
100-
protected Set<String> responseParams() {
101-
Set<String> responseParams = new HashSet<>(super.responseParams());
102-
responseParams.addAll(Arrays.asList("sql", "flat", "separator", "_score", "_type", "_id", "_scroll_id", "newLine", "format", "showHeader", "quote"));
103-
return Collections.unmodifiableSet(responseParams);
104113
}
105114
}

src/main/java/org/elasticsearch/plugin/nlpcn/SqlPlug.java

+9
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,18 @@
77
import org.elasticsearch.common.settings.IndexScopedSettings;
88
import org.elasticsearch.common.settings.Settings;
99
import org.elasticsearch.common.settings.SettingsFilter;
10+
import org.elasticsearch.common.util.concurrent.EsExecutors;
1011
import org.elasticsearch.features.NodeFeature;
1112
import org.elasticsearch.plugins.ActionPlugin;
1213
import org.elasticsearch.plugins.Plugin;
1314
import org.elasticsearch.rest.RestController;
1415
import org.elasticsearch.rest.RestHandler;
16+
import org.elasticsearch.threadpool.ExecutorBuilder;
17+
import org.elasticsearch.threadpool.FixedExecutorBuilder;
1518

1619
import java.util.Collection;
1720
import java.util.Collections;
21+
import java.util.List;
1822
import java.util.function.Predicate;
1923
import java.util.function.Supplier;
2024

@@ -40,4 +44,9 @@ public Collection<?> createComponents(PluginServices services) {
4044
public Collection<RestHandler> getRestHandlers(Settings settings, NamedWriteableRegistry namedWriteableRegistry, RestController restController, ClusterSettings clusterSettings, IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<DiscoveryNodes> nodesInCluster, Predicate<NodeFeature> clusterSupportsFeature) {
4145
return Collections.singletonList(new RestSqlAction());
4246
}
47+
48+
@Override
49+
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
50+
return Collections.singletonList(new FixedExecutorBuilder(settings, "nlpcn_sql", 10, 100, "thread_pool.nlpcn_sql", EsExecutors.TaskTrackingConfig.DEFAULT));
51+
}
4352
}

src/main/java/org/elasticsearch/plugin/nlpcn/executors/ElasticDefaultRestExecutor.java

+7-11
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import org.elasticsearch.client.internal.Client;
1111
import org.elasticsearch.common.bytes.BytesReference;
1212
import org.elasticsearch.rest.RestResponse;
13-
import org.elasticsearch.rest.action.RestChunkedToXContentListener;
13+
import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;
1414
import org.elasticsearch.xcontent.XContentBuilder;
1515
import org.elasticsearch.reindex.BulkIndexByScrollResponseContentListener;
1616
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
@@ -63,7 +63,7 @@ public void execute(Client client, Map<String, String> params, QueryAction query
6363
} else if (request instanceof SearchRequest) {
6464
//zhongshu-comment 对应的QueryAction实现子类:DefaultQueryAction、AggregationQueryAction
6565
//zhongshu-comment 对应的SqlElasticRequestBuilder实现子类:SqlElasticSearchRequestBuilder
66-
client.search((SearchRequest) request, new RestChunkedToXContentListener<>(channel));
66+
client.search((SearchRequest) request, new RestRefCountedChunkedToXContentListener<>(channel));
6767
} else if (request instanceof DeleteByQueryRequest) {
6868
//zhongshu-comment 对应的QueryAction实现子类:DeleteQueryAction
6969
//zhongshu-comment 对应的SqlElasticRequestBuilder实现子类:SqlElasticDeleteByQueryRequestBuilder
@@ -73,7 +73,7 @@ public void execute(Client client, Map<String, String> params, QueryAction query
7373
//zhongshu-comment 对应的SqlElasticRequestBuilder实现子类:是一个匿名内部类,跳进去queryAction.explain()看
7474
requestBuilder.getBuilder().execute(new GetIndexRequestRestListener(channel, (GetIndexRequest) request));
7575
} else if (request instanceof SearchScrollRequest) {
76-
client.searchScroll((SearchScrollRequest) request, new RestChunkedToXContentListener<>(channel));
76+
client.searchScroll((SearchScrollRequest) request, new RestRefCountedChunkedToXContentListener<>(channel));
7777
} else {
7878
throw new Exception(String.format("Unsupported ActionRequest provided: %s", request.getClass().getName()));
7979
}
@@ -107,13 +107,9 @@ public String execute(Client client, Map<String, String> params, QueryAction que
107107

108108
}
109109

110-
private void sendDefaultResponse(SearchHits hits, RestChannel channel) {
111-
try {
112-
XContentBuilder builder = ElasticUtils.hitsAsXContentBuilder(hits, new MetaSearchResult());
113-
RestResponse bytesRestResponse = new RestResponse(RestStatus.OK, builder);
114-
channel.sendResponse(bytesRestResponse);
115-
} catch (IOException e) {
116-
e.printStackTrace();
117-
}
110+
private void sendDefaultResponse(SearchHits hits, RestChannel channel) throws IOException {
111+
XContentBuilder builder = ElasticUtils.hitsAsXContentBuilder(hits, new MetaSearchResult());
112+
RestResponse bytesRestResponse = new RestResponse(RestStatus.OK, builder);
113+
channel.sendResponse(bytesRestResponse);
118114
}
119115
}

0 commit comments

Comments
 (0)