Skip to content

Commit 0707e37

Browse files
committed
merge release to github
2 parents e0a7da9 + c7b1e77 commit 0707e37

File tree

29 files changed

+624
-170
lines changed

29 files changed

+624
-170
lines changed

core/src/main/java/com/dtstack/flink/sql/GetPlan.java

+15
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@
2323
import com.dtstack.flink.sql.exec.ParamsInfo;
2424
import org.apache.commons.lang.exception.ExceptionUtils;
2525
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
import java.net.URL;
30+
import java.net.URLClassLoader;
2631

2732
/**
2833
* local模式获取sql任务的执行计划
@@ -32,16 +37,26 @@
3237
*/
3338
public class GetPlan {
3439

40+
private static final Logger LOG = LoggerFactory.getLogger(GetPlan.class);
41+
3542
public static String getExecutionPlan(String[] args) {
43+
ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader();
3644
try {
3745
long start = System.currentTimeMillis();
3846
ParamsInfo paramsInfo = ExecuteProcessHelper.parseParams(args);
47+
paramsInfo.setGetPlan(true);
48+
ClassLoader envClassLoader = StreamExecutionEnvironment.class.getClassLoader();
49+
ClassLoader plannerClassLoader = URLClassLoader.newInstance(new URL[0], envClassLoader);
50+
Thread.currentThread().setContextClassLoader(plannerClassLoader);
3951
StreamExecutionEnvironment env = ExecuteProcessHelper.getStreamExecution(paramsInfo);
4052
String executionPlan = env.getExecutionPlan();
4153
long end = System.currentTimeMillis();
4254
return ApiResult.createSuccessResultJsonStr(executionPlan, end - start);
4355
} catch (Exception e) {
56+
LOG.error("Get plan error", e);
4457
return ApiResult.createErrorResultJsonStr(ExceptionUtils.getFullStackTrace(e));
58+
} finally {
59+
Thread.currentThread().setContextClassLoader(currentClassLoader);
4560
}
4661
}
4762
}

core/src/main/java/com/dtstack/flink/sql/environment/StreamEnvConfigManager.java

-2
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,6 @@ public static void streamExecutionEnvironmentConfig(StreamExecutionEnvironment s
7373

7474
confProperties = PropertiesUtils.propertiesTrim(confProperties);
7575
streamEnv.getConfig().disableClosureCleaner();
76-
// Disables reusing object
77-
streamEnv.getConfig().enableObjectReuse();
7876

7977
Configuration globalJobParameters = new Configuration();
8078
//Configuration unsupported set properties key-value

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

+27-19
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,6 @@
1818

1919
package com.dtstack.flink.sql.exec;
2020

21-
import com.dtstack.flink.sql.parser.CreateFuncParser;
22-
import com.dtstack.flink.sql.parser.CreateTmpTableParser;
23-
import com.dtstack.flink.sql.parser.FlinkPlanner;
24-
import com.dtstack.flink.sql.parser.InsertSqlParser;
25-
import com.dtstack.flink.sql.parser.SqlParser;
26-
import com.dtstack.flink.sql.parser.SqlTree;
27-
import org.apache.flink.api.common.typeinfo.TypeInformation;
28-
import org.apache.flink.api.java.typeutils.RowTypeInfo;
29-
import org.apache.flink.streaming.api.datastream.DataStream;
30-
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
31-
import org.apache.flink.table.api.*;
32-
import org.apache.flink.table.api.java.StreamTableEnvironment;
33-
import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
34-
import org.apache.flink.table.sinks.TableSink;
35-
3621
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
3722
import com.dtstack.flink.sql.enums.ClusterMode;
3823
import com.dtstack.flink.sql.enums.ECacheType;
@@ -42,8 +27,14 @@
4227
import com.dtstack.flink.sql.function.FunctionManager;
4328
import com.dtstack.flink.sql.option.OptionParser;
4429
import com.dtstack.flink.sql.option.Options;
45-
import com.dtstack.flink.sql.side.SideSqlExec;
30+
import com.dtstack.flink.sql.parser.CreateFuncParser;
31+
import com.dtstack.flink.sql.parser.CreateTmpTableParser;
32+
import com.dtstack.flink.sql.parser.FlinkPlanner;
33+
import com.dtstack.flink.sql.parser.InsertSqlParser;
34+
import com.dtstack.flink.sql.parser.SqlParser;
35+
import com.dtstack.flink.sql.parser.SqlTree;
4636
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
37+
import com.dtstack.flink.sql.side.SideSqlExec;
4738
import com.dtstack.flink.sql.sink.StreamSinkFactory;
4839
import com.dtstack.flink.sql.source.StreamSourceFactory;
4940
import com.dtstack.flink.sql.table.AbstractSourceTableInfo;
@@ -62,6 +53,17 @@
6253
import org.apache.calcite.sql.SqlNode;
6354
import org.apache.commons.io.Charsets;
6455
import org.apache.commons.lang3.StringUtils;
56+
import org.apache.flink.api.common.typeinfo.TypeInformation;
57+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
58+
import org.apache.flink.streaming.api.datastream.DataStream;
59+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
60+
import org.apache.flink.table.api.EnvironmentSettings;
61+
import org.apache.flink.table.api.Table;
62+
import org.apache.flink.table.api.TableConfig;
63+
import org.apache.flink.table.api.TableEnvironment;
64+
import org.apache.flink.table.api.java.StreamTableEnvironment;
65+
import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
66+
import org.apache.flink.table.sinks.TableSink;
6567
import org.slf4j.Logger;
6668
import org.slf4j.LoggerFactory;
6769

@@ -71,13 +73,13 @@
7173
import java.net.URLClassLoader;
7274
import java.net.URLDecoder;
7375
import java.time.ZoneId;
76+
import java.util.ArrayList;
7477
import java.util.Arrays;
7578
import java.util.List;
7679
import java.util.Map;
7780
import java.util.Properties;
7881
import java.util.Set;
7982
import java.util.TimeZone;
80-
import java.util.ArrayList;
8183

8284
/**
8385
* 任务执行时的流程方法
@@ -158,7 +160,7 @@ public static StreamExecutionEnvironment getStreamExecution(ParamsInfo paramsInf
158160
Map<String, Table> registerTableCache = Maps.newHashMap();
159161

160162
//register udf
161-
ExecuteProcessHelper.registerUserDefinedFunction(sqlTree, paramsInfo.getJarUrlList(), tableEnv);
163+
ExecuteProcessHelper.registerUserDefinedFunction(sqlTree, paramsInfo.getJarUrlList(), tableEnv, paramsInfo.isGetPlan());
162164
//register table schema
163165
Set<URL> classPathSets = ExecuteProcessHelper.registerTable(sqlTree, env, tableEnv, paramsInfo.getLocalSqlPluginPath(),
164166
paramsInfo.getRemoteSqlPluginPath(), paramsInfo.getPluginLoadMode(), sideTableMap, registerTableCache);
@@ -243,13 +245,19 @@ private static void sqlTranslation(String localSqlPluginPath,
243245
}
244246
}
245247

246-
public static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarUrlList, TableEnvironment tableEnv)
248+
public static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarUrlList, TableEnvironment tableEnv, boolean getPlan)
247249
throws IllegalAccessException, InvocationTargetException {
248250
// udf和tableEnv须由同一个类加载器加载
249251
ClassLoader levelClassLoader = tableEnv.getClass().getClassLoader();
252+
ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader();
250253
URLClassLoader classLoader = null;
251254
List<CreateFuncParser.SqlParserResult> funcList = sqlTree.getFunctionList();
252255
for (CreateFuncParser.SqlParserResult funcInfo : funcList) {
256+
// 构建plan的情况下,udf和tableEnv不需要是同一个类加载器
257+
if (getPlan) {
258+
classLoader = ClassLoaderManager.loadExtraJar(jarUrlList, (URLClassLoader) currentClassLoader);
259+
}
260+
253261
//classloader
254262
if (classLoader == null) {
255263
classLoader = ClassLoaderManager.loadExtraJar(jarUrlList, (URLClassLoader) levelClassLoader);

core/src/main/java/com/dtstack/flink/sql/exec/ParamsInfo.java

+10
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.net.URL;
2323
import java.util.List;
24+
import java.util.Objects;
2425
import java.util.Properties;
2526

2627
/**
@@ -39,6 +40,7 @@ public class ParamsInfo {
3940
private String pluginLoadMode;
4041
private String deployMode;
4142
private Properties confProp;
43+
private boolean getPlan = false;
4244

4345
public ParamsInfo(String sql, String name, List<URL> jarUrlList, String localSqlPluginPath,
4446
String remoteSqlPluginPath, String pluginLoadMode, String deployMode, Properties confProp) {
@@ -52,6 +54,14 @@ public ParamsInfo(String sql, String name, List<URL> jarUrlList, String localSql
5254
this.confProp = confProp;
5355
}
5456

57+
public boolean isGetPlan() {
58+
return getPlan;
59+
}
60+
61+
public void setGetPlan(boolean getPlan) {
62+
this.getPlan = getPlan;
63+
}
64+
5565
public String getSql() {
5666
return sql;
5767
}

core/src/main/java/com/dtstack/flink/sql/parser/CreateTableParser.java

+4-17
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
* limitations under the License.
1717
*/
1818

19-
20-
2119
package com.dtstack.flink.sql.parser;
2220

2321
import com.dtstack.flink.sql.util.DtStringUtil;
@@ -41,8 +39,6 @@ public class CreateTableParser implements IParser {
4139

4240
private static final Pattern PATTERN = Pattern.compile(PATTERN_STR);
4341

44-
private static final Pattern PROP_PATTERN = Pattern.compile("^'\\s*(.+)\\s*'$");
45-
4642
public static CreateTableParser newInstance(){
4743
return new CreateTableParser();
4844
}
@@ -70,28 +66,19 @@ public void parseSql(String sql, SqlTree sqlTree) {
7066
}
7167
}
7268

73-
private Map parseProp(String propsStr){
74-
propsStr = propsStr.replaceAll("'\\s*,", "'|");
75-
String[] strs = propsStr.trim().split("\\|");
69+
private Map<String, Object> parseProp(String propsStr){
70+
List<String> strings = DtStringUtil.splitIgnoreQuota(propsStr.trim(), ',');
7671
Map<String, Object> propMap = Maps.newHashMap();
77-
for (String str : strs) {
72+
for (String str : strings) {
7873
List<String> ss = DtStringUtil.splitIgnoreQuota(str, '=');
7974
String key = ss.get(0).trim();
80-
String value = extractValue(ss.get(1).trim());
75+
String value = ss.get(1).trim().replaceAll("'", "").trim();
8176
propMap.put(key, value);
8277
}
8378

8479
return propMap;
8580
}
8681

87-
private String extractValue(String value) {
88-
Matcher matcher = PROP_PATTERN.matcher(value);
89-
if (matcher.find()) {
90-
return matcher.group(1);
91-
}
92-
throw new RuntimeException("[" + value + "] format is invalid");
93-
}
94-
9582
public static class SqlParserResult{
9683

9784
private String tableName;

core/src/main/java/com/dtstack/flink/sql/parser/SqlParser.java

+6-7
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.slf4j.Logger;
3131
import org.slf4j.LoggerFactory;
3232

33-
import java.util.ArrayList;
3433
import java.util.List;
3534
import java.util.Set;
3635
import java.util.regex.Matcher;
@@ -57,7 +56,7 @@ public static void setLocalSqlPluginRoot(String localSqlPluginRoot){
5756
LOCAL_SQL_PLUGIN_ROOT = localSqlPluginRoot;
5857
}
5958

60-
private static final Pattern ADD_FIlE_PATTERN = Pattern.compile("(?i).*add\\s+file\\s+.+");
59+
private static final Pattern ADD_FILE_AND_JAR_PATTERN = Pattern.compile("(?i).*add\\s+file\\s+.+|(?i).*add\\s+jar\\s+.+");
6160

6261
/**
6362
* flink support sql syntax
@@ -78,7 +77,7 @@ public static SqlTree parseSql(String sql, String pluginLoadMode) throws Excepti
7877
.replace("\t", " ").trim();
7978

8079
List<String> sqlArr = DtStringUtil.splitIgnoreQuota(sql, SQL_DELIMITER);
81-
sqlArr = removeAddFileStmt(sqlArr);
80+
sqlArr = removeAddFileAndJarStmt(sqlArr);
8281
SqlTree sqlTree = new SqlTree();
8382
AbstractTableInfoParser tableInfoParser = new AbstractTableInfoParser();
8483
for(String childSql : sqlArr){
@@ -166,12 +165,12 @@ public static SqlTree parseSql(String sql, String pluginLoadMode) throws Excepti
166165
}
167166

168167
/**
169-
* remove add file with statment etc. add file /etc/krb5.conf;
168+
* remove "add file" and "add jar" statements, e.g. add file /etc/krb5.conf, add jar xxx.jar;
170169
*/
171-
private static List<String> removeAddFileStmt(List<String> stmts) {
172-
List<String> cleanedStmts = new ArrayList<>();
170+
private static List<String> removeAddFileAndJarStmt(List<String> stmts) {
171+
List<String> cleanedStmts = Lists.newArrayList();
173172
for (String stmt : stmts) {
174-
Matcher matcher = ADD_FIlE_PATTERN.matcher(stmt);
173+
Matcher matcher = ADD_FILE_AND_JAR_PATTERN.matcher(stmt);
175174
if(!matcher.matches()) {
176175
cleanedStmts.add(stmt);
177176
}

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ public void asyncInvoke(Row row, ResultFuture<Row> resultFuture) throws Exceptio
191191
}
192192

193193
private Map<String, Object> parseInputParam(Row input) {
194-
Map<String, Object> inputParams = Maps.newHashMap();
194+
Map<String, Object> inputParams = Maps.newLinkedHashMap();
195195
for (int i = 0; i < sideInfo.getEqualValIndex().size(); i++) {
196196
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
197197
Object equalObj = input.getField(conValIndex);

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

+48-12
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@
4444
import org.apache.calcite.sql.parser.SqlParseException;
4545
import org.apache.commons.collections.CollectionUtils;
4646
import org.apache.commons.lang3.StringUtils;
47+
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
48+
import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo;
49+
import org.apache.flink.api.common.typeinfo.TypeHint;
4750
import org.apache.flink.api.common.typeinfo.TypeInformation;
4851
import org.apache.flink.api.java.typeutils.RowTypeInfo;
4952
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -53,16 +56,30 @@
5356
import org.apache.flink.table.api.java.StreamTableEnvironment;
5457
import org.apache.flink.table.catalog.ObjectIdentifier;
5558
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
59+
import org.apache.flink.table.runtime.typeutils.BigDecimalTypeInfo;
60+
import org.apache.flink.table.runtime.typeutils.LegacyLocalDateTimeTypeInfo;
61+
import org.apache.flink.table.types.logical.DecimalType;
62+
import org.apache.flink.table.types.logical.LegacyTypeInformationType;
5663
import org.apache.flink.table.types.logical.LogicalType;
64+
import org.apache.flink.table.types.logical.TimestampType;
5765
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
5866
import org.apache.flink.types.Row;
5967
import org.slf4j.Logger;
6068
import org.slf4j.LoggerFactory;
6169

70+
import java.sql.Timestamp;
6271
import java.time.LocalDateTime;
63-
import java.util.*;
64-
65-
import static org.apache.calcite.sql.SqlKind.*;
72+
import java.util.Arrays;
73+
import java.util.LinkedList;
74+
import java.util.List;
75+
import java.util.Map;
76+
import java.util.Queue;
77+
import java.util.Set;
78+
79+
import static org.apache.calcite.sql.SqlKind.AS;
80+
import static org.apache.calcite.sql.SqlKind.INSERT;
81+
import static org.apache.calcite.sql.SqlKind.SELECT;
82+
import static org.apache.calcite.sql.SqlKind.WITH_ITEM;
6683

6784
/**
6885
* Reason:
@@ -293,10 +310,7 @@ private Table getTableFromCache(Map<String, Table> localTableCache, String table
293310
*/
294311
private boolean checkJoinCondition(SqlNode conditionNode, String sideTableAlias, AbstractSideTableInfo sideTableInfo) {
295312
List<String> conditionFields = getConditionFields(conditionNode, sideTableAlias, sideTableInfo);
296-
if (CollectionUtils.isEqualCollection(conditionFields, convertPrimaryAlias(sideTableInfo))) {
297-
return true;
298-
}
299-
return false;
313+
return CollectionUtils.isEqualCollection(conditionFields, convertPrimaryAlias(sideTableInfo));
300314
}
301315

302316
private List<String> convertPrimaryAlias(AbstractSideTableInfo sideTableInfo) {
@@ -439,8 +453,17 @@ private void joinFun(Object pollObj,
439453

440454
int length = leftTable.getSchema().getFieldDataTypes().length;
441455
LogicalType[] logicalTypes = new LogicalType[length];
442-
for(int i=0; i<length; i++){
456+
for (int i = 0; i < length; i++) {
443457
logicalTypes[i] = leftTable.getSchema().getFieldDataTypes()[i].getLogicalType();
458+
if (logicalTypes[i] instanceof LegacyTypeInformationType &&
459+
((LegacyTypeInformationType<?>) logicalTypes[i]).getTypeInformation().getClass().equals(BigDecimalTypeInfo.class)) {
460+
logicalTypes[i] = new DecimalType(38, 18);
461+
}
462+
463+
if (logicalTypes[i] instanceof LegacyTypeInformationType &&
464+
(((LegacyTypeInformationType<?>) logicalTypes[i]).getTypeInformation().getClass().equals(LegacyLocalDateTimeTypeInfo.class))) {
465+
logicalTypes[i] = new TimestampType(TimestampType.MAX_PRECISION);
466+
}
444467
}
445468

446469
BaseRowTypeInfo leftBaseTypeInfo = new BaseRowTypeInfo(logicalTypes, leftTable.getSchema().getFieldNames());
@@ -479,7 +502,22 @@ private void joinFun(Object pollObj,
479502
targetTable = localTableCache.get(joinInfo.getLeftTableName());
480503
}
481504

482-
RowTypeInfo typeInfo = new RowTypeInfo(targetTable.getSchema().getFieldTypes(), targetTable.getSchema().getFieldNames());
505+
TypeInformation<?>[] fieldDataTypes = targetTable.getSchema().getFieldTypes();
506+
for (int i = 0; i < fieldDataTypes.length; i++) {
507+
if (fieldDataTypes[i].getClass().equals(BigDecimalTypeInfo.class)) {
508+
fieldDataTypes[i] = BasicTypeInfo.BIG_DEC_TYPE_INFO;
509+
}
510+
511+
if (fieldDataTypes[i].getClass().equals(LegacyLocalDateTimeTypeInfo.class)) {
512+
fieldDataTypes[i] = LocalTimeTypeInfo.LOCAL_DATE_TIME;
513+
}
514+
515+
if (fieldDataTypes[i].getClass().equals(TimeIndicatorTypeInfo.class)) {
516+
fieldDataTypes[i] = TypeInformation.of(new TypeHint<Timestamp>() {});
517+
}
518+
}
519+
520+
RowTypeInfo typeInfo = new RowTypeInfo(fieldDataTypes, targetTable.getSchema().getFieldNames());
483521

484522
DataStream adaptStream = tableEnv.toRetractStream(targetTable, typeInfo)
485523
.filter(f -> f.f0)
@@ -551,9 +589,7 @@ private boolean checkFieldsInfo(CreateTmpTableParser.SqlParserResult result, Tab
551589
String fieldType = filed[filed.length - 1].trim();
552590
Class fieldClass = ClassUtil.stringConvertClass(fieldType);
553591
Class tableField = table.getSchema().getFieldType(i).get().getTypeClass();
554-
if (fieldClass == tableField) {
555-
continue;
556-
} else {
592+
if (fieldClass != tableField) {
557593
return false;
558594
}
559595
}

0 commit comments

Comments
 (0)