Skip to content

Commit d95cdcd

Browse files
committed
[fix][dingo-executor] Fix session leakage caused by GC deleting regions
1 parent b75c004 commit d95cdcd

File tree

19 files changed

+189
-63
lines changed

19 files changed

+189
-63
lines changed

dingo-calcite/src/main/codegen/config.fmpp

+3
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ data: {
6767
"io.dingodb.calcite.grammar.ddl.SqlAlterTruncatePart"
6868
"io.dingodb.calcite.grammar.ddl.SqlAlterSchema"
6969
"io.dingodb.calcite.grammar.ddl.SqlAnalyze"
70+
"io.dingodb.calcite.grammar.ddl.SqlBatchCreateTable"
7071
"io.dingodb.calcite.grammar.ddl.SqlCreateTenant"
7172
"io.dingodb.calcite.grammar.ddl.SqlCreateSchema"
7273
"io.dingodb.calcite.grammar.ddl.DingoSqlCreateView"
@@ -166,6 +167,8 @@ data: {
166167
keywords: [
167168
"ALGORITHM"
168169
"AUTO_RANDOM"
170+
"BATCH"
171+
"CANCEL"
169172
"INPLACE"
170173
"IF"
171174
"COPY"

dingo-calcite/src/main/codegen/includes/Admin.ftl

+3
Original file line numberDiff line numberDiff line change
@@ -39,5 +39,8 @@ SqlAdmin SqlAdmin(): {
3939
|
4040
<START_GC>
4141
{ return new SqlStartGc(s.end(this)); }
42+
| <BATCH> <CREATE> <TABLE>
43+
{ return new SqlBatchCreateTable(s.end(this), true); }
44+
| <CANCEL> <BATCH> <CREATE> <TABLE> { return new SqlBatchCreateTable(s.end(this), false); }
4245
)
4346
}

dingo-calcite/src/main/java/io/dingodb/calcite/DingoDdlExecutor.java

+26-6
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import io.dingodb.calcite.grammar.ddl.SqlAlterTenant;
4949
import io.dingodb.calcite.grammar.ddl.SqlAlterTruncatePart;
5050
import io.dingodb.calcite.grammar.ddl.SqlAlterUser;
51+
import io.dingodb.calcite.grammar.ddl.SqlBatchCreateTable;
5152
import io.dingodb.calcite.grammar.ddl.SqlCreateIndex;
5253
import io.dingodb.calcite.grammar.ddl.SqlCreateSchema;
5354
import io.dingodb.calcite.grammar.ddl.SqlCreateSequence;
@@ -518,9 +519,19 @@ public void execute(SqlCreateTable createT, CalcitePrepare.Context context) {
518519

519520
tableDefinition.setIndices(indexTableDefinitions);
520521
DdlService ddlService = DdlService.root();
521-
String connId = (String) context.getDataContext().get("connId");
522-
ddlService.createTableWithInfo(schema.getSchemaName(), tableDefinition,
523-
connId, create.getOriginalCreateSql());
522+
ActionType actionType;
523+
if (InfoSchemaService.root().getBatchCreateTable()) {
524+
tableDefinition.setSchemaState(SchemaState.SCHEMA_PUBLIC);
525+
List<IndexDefinition> indices = tableDefinition.getIndices();
526+
if (indices != null) {
527+
indices.forEach(index -> index.setSchemaState(SchemaState.SCHEMA_PUBLIC));
528+
}
529+
MetaService.root().createTables(schema.getSchemaId(), tableDefinition, indices);
530+
} else {
531+
String connId = (String) context.getDataContext().get("connId");
532+
ddlService.createTableWithInfo(schema.getSchemaName(), tableDefinition,
533+
connId, create.getOriginalCreateSql());
534+
}
524535

525536
RootCalciteSchema rootCalciteSchema = (RootCalciteSchema) context.getMutableRootSchema();
526537
RootSnapshotSchema rootSnapshotSchema = (RootSnapshotSchema) rootCalciteSchema.schema;
@@ -779,7 +790,7 @@ public void execute(DingoSqlCreateView sqlCreateView, CalcitePrepare.Context con
779790
String tableName = getTableName(sqlCreateView.name);
780791

781792
// Check table exist
782-
if (schema.getTable(tableName) != null) {
793+
if (schema.getTable(tableName) != null && !sqlCreateView.getReplace()) {
783794
throw DINGO_RESOURCE.tableExists(tableName).ex();
784795
}
785796
SqlNode query = renameColumns(sqlCreateView.columnList, sqlCreateView.query);
@@ -863,7 +874,7 @@ public void execute(DingoSqlCreateView sqlCreateView, CalcitePrepare.Context con
863874
properties.setProperty("algorithm", sqlCreateView.alg);
864875
tableDefinition.setProperties(properties);
865876
DdlService ddlService = DdlService.root();
866-
ddlService.createViewWithInfo(schemaName, tableDefinition, connId, null);
877+
ddlService.createViewWithInfo(schemaName, tableDefinition, connId, null, sqlCreateView.getReplace());
867878
}
868879

869880
public void execute(@NonNull SqlCreateUser sqlCreateUser, CalcitePrepare.Context context) {
@@ -1889,6 +1900,15 @@ public void execute(SqlAlterExchangePart sqlAlterExchangePart, CalcitePrepare.Co
18891900
}
18901901
}
18911902

1903+
public void execute(SqlBatchCreateTable sqlBatchCreateTable, CalcitePrepare.Context context) {
1904+
LogUtils.info(log, "DDL execute sql batch create table");
1905+
InfoSchemaService.root().setBatchCreateTable(sqlBatchCreateTable.batchCreateTable);
1906+
// increment schema version
1907+
if (!sqlBatchCreateTable.batchCreateTable) {
1908+
InfoSchemaService.root().genSchemaVersion(101);
1909+
}
1910+
}
1911+
18921912
public static void validatePartitionBy(
18931913
@NonNull List<String> keyList,
18941914
@NonNull TableDefinition tableDefinition,
@@ -2765,7 +2785,7 @@ public static DdlJob getRecoverJobBySql(String querySql, boolean table) {
27652785
} catch (Exception e) {
27662786
LogUtils.error(log, e.getMessage(), e);
27672787
} finally {
2768-
session.destroy();
2788+
SessionUtil.INSTANCE.closeSession(session);
27692789
}
27702790
return ddlJob;
27712791
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2021 DataCanvas
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.dingodb.calcite.grammar.ddl;
18+
19+
import org.apache.calcite.sql.SqlKind;
20+
import org.apache.calcite.sql.SqlOperator;
21+
import org.apache.calcite.sql.SqlSpecialOperator;
22+
import org.apache.calcite.sql.SqlWriter;
23+
import org.apache.calcite.sql.parser.SqlParserPos;
24+
25+
public class SqlBatchCreateTable extends SqlAdmin {
26+
27+
public boolean batchCreateTable;
28+
29+
private static final SqlOperator OPERATOR =
30+
new SqlSpecialOperator("ADMIN BATCH CREATE TABLE", SqlKind.OTHER_DDL);
31+
32+
public SqlBatchCreateTable(SqlParserPos pos, boolean batchCreateTable) {
33+
super(OPERATOR, pos);
34+
this.batchCreateTable = batchCreateTable;
35+
}
36+
37+
@Override
38+
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
39+
writer.keyword("ADMIN ");
40+
if (!batchCreateTable) {
41+
writer.keyword("CANCEL");
42+
}
43+
writer.keyword("BATCH CREATE TABLE");
44+
}
45+
46+
}

dingo-common/src/main/java/io/dingodb/common/ddl/ActionType.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,9 @@ public enum ActionType {
4747
ActionAlterCheckConstraint(45),
4848
ActionCreateTables(60),
4949
ActionResetAutoInc(61),
50-
ActionRecoverSchema(63),
50+
ActionRecoverSchema(63)
5151
;
52+
5253
private final int code;
5354

5455
ActionType(int code) {

dingo-common/src/main/java/io/dingodb/common/ddl/DdlJob.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ public class DdlJob {
7575

7676
private int priority;
7777
private long seqNu;
78+
private boolean replace;
7879

7980
@Builder
8081
public DdlJob(
@@ -92,7 +93,8 @@ public DdlJob(
9293
SchemaState schemaState, long snapshotVer, long realStartTs,
9394
long startTs, long dependencyId, String query, long version, int priority, long seqNu,
9495
List<Object> args,
95-
DingoErr err
96+
DingoErr err,
97+
boolean replace
9698
) {
9799
this.id = id;
98100
this.actionType = actionType;
@@ -117,6 +119,7 @@ public DdlJob(
117119
this.seqNu = seqNu;
118120
this.args = args;
119121
this.dingoErr = err;
122+
this.replace = replace;
120123
}
121124

122125
public DdlJob() {

dingo-common/src/main/java/io/dingodb/common/ddl/FieldTypeChecker.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public static DingoSqlException checkNullVal(String schemaName, String tableName
8989
} catch (SQLException e) {
9090
throw new DingoSqlException(e);
9191
} finally {
92-
session.destroy();
92+
SessionUtil.INSTANCE.closeSession(session);
9393
}
9494
}
9595

dingo-common/src/main/java/io/dingodb/common/metrics/DingoMetrics.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.codahale.metrics.jmx.JmxReporter;
2626
import io.dingodb.common.concurrent.Executors;
2727
import io.dingodb.common.ddl.RunningJobs;
28+
import io.dingodb.common.session.SessionUtil;
2829
import lombok.extern.slf4j.Slf4j;
2930
import org.checkerframework.checker.nullness.qual.NonNull;
3031

@@ -104,7 +105,7 @@ protected Integer loadValue() {
104105
metricRegistry.register("activeSessionCount", new CachedGauge<Integer>(1, TimeUnit.MINUTES) {
105106
@Override
106107
protected Integer loadValue() {
107-
return RunningJobs.runningJobs.size();
108+
return SessionUtil.INSTANCE.getSessionPool().getNumActive();
108109
}
109110
});
110111
metricRegistry.register("select-latency", new CachedGauge<Double>(5, TimeUnit.MINUTES) {

dingo-executor/src/main/java/io/dingodb/server/executor/ddl/DdlWorkerPool.java

-4
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,11 @@
1616

1717
package io.dingodb.server.executor.ddl;
1818

19-
import com.codahale.metrics.CachedGauge;
20-
import io.dingodb.common.log.LogUtils;
2119
import lombok.extern.slf4j.Slf4j;
2220
import org.apache.commons.pool2.PooledObjectFactory;
2321
import org.apache.commons.pool2.impl.GenericObjectPool;
2422
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
2523

26-
import java.util.concurrent.TimeUnit;
27-
2824
@Slf4j
2925
public class DdlWorkerPool extends GenericObjectPool<DdlWorker> {
3026

dingo-executor/src/main/java/io/dingodb/server/executor/ddl/JobTableUtil.java

+12-6
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141

4242
@Slf4j
4343
public final class JobTableUtil {
44-
// TODO
4544
private static final String updateDDLJobSQL = "update mysql.dingo_ddl_job set job_meta = '%s' where job_id = %d";
4645
private static final String getJobSQL = "select job_meta, processing, job_id from mysql.dingo_ddl_job where "
4746
+ "job_id in (select min(job_id) from mysql.dingo_ddl_job group by schema_ids, table_ids, processing) "
@@ -283,12 +282,19 @@ public static boolean gcDeleteDone(
283282
sql = String.format(sql, jobId, regionId, ts, Utils.quoteForSql(startKey.toString()),
284283
Utils.quoteForSql(endKey.toString()), Utils.quoteForSql(eleId), Utils.quoteForSql(eleType));
285284
Session session = SessionUtil.INSTANCE.getSession();
286-
session.setAutoCommit(false);
287-
session.executeUpdate(sql);
285+
try {
286+
session.setAutoCommit(false);
287+
session.executeUpdate(sql);
288288

289-
String removeSql = "delete from mysql.gc_delete_range where job_id=" + jobId;
290-
session.executeUpdate(removeSql);
291-
session.commit();
289+
String removeSql = "delete from mysql.gc_delete_range where job_id=" + jobId;
290+
session.executeUpdate(removeSql);
291+
session.commit();
292+
} catch (Exception e) {
293+
LogUtils.error(log, e.getMessage(), e);
294+
session.rollback();
295+
} finally {
296+
SessionUtil.INSTANCE.closeSession(session);
297+
}
292298
LogUtils.info(log, "gcDeleteDone, regionId:{}, jobId:{}", regionId, jobId);
293299
return true;
294300
}

dingo-executor/src/main/java/io/dingodb/server/executor/ddl/TableUtil.java

+10-12
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434

3535
import java.util.List;
3636

37+
import static io.dingodb.common.mysql.error.ErrorCode.ErrTableExists;
38+
3739
@Slf4j
3840
public final class TableUtil {
3941

@@ -75,32 +77,26 @@ public static Pair<TableDefinition, String> createTable(DdlJob ddlJob) {
7577
return Pair.of(tableInfo, "ErrInvalidDDLState");
7678
}
7779

78-
public static Pair<TableDefinition, String> createView(DdlJob ddlJob) {
80+
public static synchronized Pair<TableDefinition, String> createView(DdlJob ddlJob) {
7981
long schemaId = ddlJob.getSchemaId();
8082
TableDefinition tableInfo = (TableDefinition) ddlJob.getArgs().get(0);
8183
tableInfo.setSchemaState(SchemaState.SCHEMA_NONE);
8284
long tableId = ddlJob.getTableId();
8385
tableInfo.setPrepareTableId(tableId);
8486

85-
InfoSchemaService service = InfoSchemaService.root();
86-
Object tabObj = service.getTable(schemaId, tableInfo.getName());
87-
if (tabObj != null) {
88-
ddlJob.setState(JobState.jobStateCancelled);
89-
return Pair.of(null, "view has existed");
90-
}
9187
if (tableInfo.getSchemaState() == SchemaState.SCHEMA_NONE) {
9288
tableInfo.setSchemaState(SchemaState.SCHEMA_PUBLIC);
9389
MetaService metaService = MetaService.root();
94-
List<IndexDefinition> indices = tableInfo.getIndices();
95-
if (indices != null) {
96-
indices.forEach(index -> index.setSchemaState(SchemaState.SCHEMA_PUBLIC));
90+
InfoSchemaService service = InfoSchemaService.root();
91+
Object tabObj = service.getTable(schemaId, tableInfo.getName());
92+
if (tabObj != null && !ddlJob.isReplace()) {
93+
ddlJob.setState(JobState.jobStateCancelled);
94+
return Pair.of(null, "view has existed");
9795
}
9896
try {
99-
assert indices != null;
10097
metaService.createView(schemaId, tableInfo.getName(), tableInfo);
10198
return Pair.of(tableInfo, null);
10299
} catch (Exception e) {
103-
metaService.rollbackCreateTable(schemaId, tableInfo, indices);
104100
LogUtils.error(log, "[ddl-error]" + e.getMessage(), e);
105101
ddlJob.setState(JobState.jobStateCancelled);
106102
if (e instanceof NullPointerException) {
@@ -208,6 +204,8 @@ public static void recoverTable(
208204
});
209205
} catch (Exception e) {
210206
LogUtils.error(log, e.getMessage(), e);
207+
} finally {
208+
SessionUtil.INSTANCE.closeSession(session);
211209
}
212210

213211
// create table Info and set autoIncId

dingo-executor/src/main/java/io/dingodb/server/executor/schedule/LoadInfoSchemaTask.java

+12-10
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public static void loadInfo() {
111111
refreshMDLCheckTableInfo();
112112
DdlJobEventSource.forcePut(DdlJobEventSource.ddlJobEventSource.mdlCheckVerQueue, 1L);
113113
} catch (Exception e) {
114-
LogUtils.error(log, "refreshMDLCheckTableInfo error, reason:{}", e.getMessage());
114+
LogUtils.error(log, "refreshMDLCheckTableInfo error, reason:{}", e.getMessage(), e);
115115
}
116116
}
117117

@@ -233,21 +233,23 @@ public static LoadSchemaDiffs tryLoadSchemaDiffs(
233233
}
234234

235235
public static void refreshMDLCheckTableInfo() {
236-
Session session = SessionUtil.INSTANCE.getSession();
237-
InfoSchema is = InfoCache.infoCache.getLatest();
238-
if (is == null) {
239-
return;
240-
}
241-
long schemaVer = is.schemaMetaVersion;
242236
if (!ScopeVariables.runDdl()) {
243237
return;
244238
}
239+
InfoSchema is = InfoCache.infoCache.getLatest();
240+
MdlCheckTableInfo mdlCheckTableInfo;
245241
long start = System.currentTimeMillis();
246-
String sql = "select job_id, version, table_ids from mysql.dingo_mdl_info where version <= %d";
247-
sql = String.format(sql, schemaVer);
248-
MdlCheckTableInfo mdlCheckTableInfo = ExecutionEnvironment.INSTANCE.mdlCheckTableInfo;
242+
long schemaVer;
249243
List<Object[]> resList;
244+
Session session = SessionUtil.INSTANCE.getSession();
250245
try {
246+
if (is == null) {
247+
return;
248+
}
249+
schemaVer = is.schemaMetaVersion;
250+
String sql = "select job_id, version, table_ids from mysql.dingo_mdl_info where version <= %d";
251+
sql = String.format(sql, schemaVer);
252+
mdlCheckTableInfo = ExecutionEnvironment.INSTANCE.mdlCheckTableInfo;
251253
resList = session.executeQuery(sql);
252254
if (resList.isEmpty()) {
253255
LogUtils.debug(log, "[ddl] load mdl table info empty, ver:{}", schemaVer);

dingo-meta-api/src/main/java/io/dingodb/meta/DdlService.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ void createTableWithInfo(String schemaName,
4242
TableDefinition tableDefinition, String connId, String sql);
4343

4444
default void createViewWithInfo(String schemaName,
45-
TableDefinition tableDefinition, String connId, String sql) {
45+
TableDefinition tableDefinition, String connId, String sql, boolean replace) {
4646

4747
}
4848

dingo-meta-api/src/main/java/io/dingodb/meta/InfoSchemaService.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,9 @@ public interface InfoSchemaService {
4545
String nextGlobalID = "NextGlobalID";
4646
String mDDLJobHistoryKey = "DDLJobHistory";
4747
String mHistoryJobPrefix = String.format("%s:%s", DdlUtil.tenantPrefix, mDDLJobHistoryKey);
48-
4948
byte[] mHistoryJobPrefixKeys = mHistoryJobPrefix.getBytes();
49+
String createTables = String.format("%s:%s", DdlUtil.tenantPrefix, "createTables");
50+
byte[] mCreateTables = createTables.getBytes();
5051

5152
String mDdlTemplate = "%s:%s:%d";
5253

@@ -296,6 +297,8 @@ default void delSchemaDiff(long ver) {
296297

297298
}
298299

300+
void setBatchCreateTable(boolean batchCreateTable);
301+
299302
void updateTable(long schemaId, Object table);
300303

301304
default void updateReplicaTable(long schemaId, long tableId, Object table) {
@@ -306,6 +309,10 @@ default void updateIndex(long tableId, Object index) {
306309

307310
}
308311

312+
default boolean getBatchCreateTable() {
313+
return false;
314+
}
315+
309316
DdlJob getHistoryDDLJob(long jobId);
310317

311318
void addHistoryDDLJob(DdlJob job, boolean updateRawArgs);

0 commit comments

Comments
 (0)