From b7e4aa9558671fbd331790586febc24ee3c244e3 Mon Sep 17 00:00:00 2001 From: tiezhu Date: Fri, 8 May 2020 14:58:18 +0800 Subject: [PATCH 1/4] =?UTF-8?q?fix=20standalone=E4=BB=BB=E5=8A=A1=E6=8F=90?= =?UTF-8?q?=E4=BA=A4-pluginLoadMode=20=E5=BC=82=E5=B8=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dtstack/flink/sql/launcher/executor/StandaloneExecutor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/launcher/src/main/java/com/dtstack/flink/sql/launcher/executor/StandaloneExecutor.java b/launcher/src/main/java/com/dtstack/flink/sql/launcher/executor/StandaloneExecutor.java index 686337633..05b3f198c 100644 --- a/launcher/src/main/java/com/dtstack/flink/sql/launcher/executor/StandaloneExecutor.java +++ b/launcher/src/main/java/com/dtstack/flink/sql/launcher/executor/StandaloneExecutor.java @@ -47,7 +47,7 @@ public StandaloneExecutor(JobParamsInfo jobParamsInfo) { public void exec() throws Exception { - Preconditions.checkArgument(!StringUtils.equalsIgnoreCase(jobParamsInfo.getPluginLoadMode(), EPluginLoadMode.CLASSPATH.name()), + Preconditions.checkArgument(StringUtils.equalsIgnoreCase(jobParamsInfo.getPluginLoadMode(), EPluginLoadMode.CLASSPATH.name()), "standalone only supports classpath mode"); JobGraph jobGraph = JobGraphBuildUtil.buildJobGraph(jobParamsInfo); From 5c6a06255b8e1ba51ee5791a0d741a372f62562b Mon Sep 17 00:00:00 2001 From: tiezhu Date: Fri, 8 May 2020 15:45:12 +0800 Subject: [PATCH 2/4] =?UTF-8?q?fix=20=E4=BF=AE=E5=A4=8Dhbase-side=20?= =?UTF-8?q?=E5=88=AB=E5=90=8D=E5=BC=82=E5=B8=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../flink/sql/exec/ExecuteProcessHelper.java | 21 +++++++------- .../sql/side/hbase/HbaseAsyncReqRow.java | 29 ++++++++++++++----- .../flink/sql/side/hbase/RowKeyBuilder.java | 2 +- 3 files changed, 33 insertions(+), 19 deletions(-) diff --git a/core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java b/core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java index 0c010fa47..43fbe2f59 100644 --- a/core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java +++ b/core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java @@ -56,7 +56,6 @@ import org.apache.commons.io.Charsets; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -65,7 +64,6 @@ import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.table.sinks.TableSink; -import org.apache.flink.types.Row; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,9 +79,10 @@ import java.util.Set; /** - * 任务执行时的流程方法 + * 任务执行时的流程方法 * Date: 2020/2/17 * Company: www.dtstack.com + * * @author maqi */ public class ExecuteProcessHelper { @@ -126,11 +125,11 @@ public static ParamsInfo parseParams(String[] args) throws Exception { .setConfProp(confProperties) .setJarUrlList(jarUrlList) .build(); - } /** - * 非local模式或者shipfile部署模式,remoteSqlPluginPath必填 + * 非local模式或者shipfile部署模式,remoteSqlPluginPath必填 + * * @param remoteSqlPluginPath * @param deployMode * @param pluginLoadMode @@ -147,7 +146,7 @@ public static boolean checkRemoteSqlPluginPath(String remoteSqlPluginPath, Strin public static StreamExecutionEnvironment getStreamExecution(ParamsInfo paramsInfo) throws Exception { StreamExecutionEnvironment env = ExecuteProcessHelper.getStreamExeEnv(paramsInfo.getConfProp(), paramsInfo.getDeployMode()); - StreamTableEnvironment tableEnv = getStreamTableEnv(env,paramsInfo.getConfProp()); + StreamTableEnvironment tableEnv = getStreamTableEnv(env, paramsInfo.getConfProp()); SqlParser.setLocalSqlPluginRoot(paramsInfo.getLocalSqlPluginPath()); SqlTree sqlTree = SqlParser.parseSql(paramsInfo.getSql()); @@ -188,7 +187,7 @@ public static List getExternalJarUrls(String addJarListStr) throws java.io. private static void sqlTranslation(String localSqlPluginPath, StreamTableEnvironment tableEnv, - SqlTree sqlTree,Map sideTableMap, + SqlTree sqlTree, Map sideTableMap, Map registerTableCache) throws Exception { SideSqlExec sideSqlExec = new SideSqlExec(); @@ -251,13 +250,14 @@ public static void registerUserDefinedFunction(SqlTree sqlTree, List jarUrl } /** - * 向Flink注册源表和结果表,返回执行时插件包的全路径 + * 向Flink注册源表和结果表,返回执行时插件包的全路径 + * * @param sqlTree * @param env * @param tableEnv * @param localSqlPluginPath * @param remoteSqlPluginPath - * @param pluginLoadMode 插件加载模式 classpath or shipfile + * @param pluginLoadMode 插件加载模式 classpath or shipfile * @param sideTableMap * @param registerTableCache * @return @@ -322,7 +322,8 @@ public static Set registerTable(SqlTree sqlTree, StreamExecutionEnvironment } /** - * perjob模式将job依赖的插件包路径存储到cacheFile,在外围将插件包路径传递给jobgraph + * perjob模式将job依赖的插件包路径存储到cacheFile,在外围将插件包路径传递给jobgraph + * * @param env * @param classPathSet */ diff --git a/hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java b/hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java index e99137021..64c366cdf 100644 --- a/hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java +++ b/hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java @@ -16,8 +16,6 @@ * limitations under the License. */ - - package com.dtstack.flink.sql.side.hbase; import com.dtstack.flink.sql.enums.ECacheContentType; @@ -42,9 +40,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collections; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -73,9 +69,9 @@ public class HbaseAsyncReqRow extends BaseAsyncReqRow { private transient AbstractRowKeyModeDealer rowKeyMode; - private String tableName; + private final String tableName; - private String[] colNames; + private final String[] colNames; public HbaseAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List outFieldInfoList, AbstractSideTableInfo sideTableInfo) { super(new HbaseAsyncSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo)); @@ -131,7 +127,7 @@ public void asyncInvoke(Tuple2 input, ResultFuture physicalFields) { + Collection values = physicalFields.values(); + Set keySet = physicalFields.keySet(); + if (!values.contains(realFieldName)) { + // TODO Error ? or Warn ? + LOG.warn(realFieldName + "不存在别名"); + } else { + for (String key : keySet) { + if (physicalFields.get(key).equals(realFieldName)) { + return key; + } + } + } + return realFieldName; + } + @Override public void close() throws Exception { super.close(); diff --git a/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/RowKeyBuilder.java b/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/RowKeyBuilder.java index 6acfcb760..f80bf9f22 100644 --- a/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/RowKeyBuilder.java +++ b/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/RowKeyBuilder.java @@ -101,7 +101,7 @@ public static String[] splitIgnoreQuotaBrackets(String str, String delimiter){ public ReplaceInfo getReplaceInfo(String field){ field = field.trim(); - if(field.length() <= 2){ + if(field.length() <= 0){ throw new RuntimeException(field + " \n" + "Format defined exceptions"); } From b2c9d7c2ef950dcc7b2dcb2eba35ecfa60441c75 Mon Sep 17 00:00:00 2001 From: liubin Date: Thu, 11 Jun 2020 19:56:48 +0800 Subject: [PATCH 3/4] =?UTF-8?q?local=E6=A8=A1=E5=BC=8Ftaskmanager.numberOf?= =?UTF-8?q?TaskSlots=E6=8C=87=E5=AE=9A=E5=A4=B1=E6=95=88=E9=97=AE=E9=A2=98?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dtstack/flink/sql/environment/MyLocalStreamEnvironment.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/com/dtstack/flink/sql/environment/MyLocalStreamEnvironment.java b/core/src/main/java/com/dtstack/flink/sql/environment/MyLocalStreamEnvironment.java index 769f9f462..863bb7796 100644 --- a/core/src/main/java/com/dtstack/flink/sql/environment/MyLocalStreamEnvironment.java +++ b/core/src/main/java/com/dtstack/flink/sql/environment/MyLocalStreamEnvironment.java @@ -105,13 +105,13 @@ public JobExecutionResult execute(StreamGraph streamGraph) throws Exception { configuration.addAll(jobGraph.getJobConfiguration()); configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), "512M"); - configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS.key(), jobGraph.getMaximumParallelism()); // add (and override) the settings with what the user defined configuration.addAll(this.conf); MiniClusterConfiguration.Builder configBuilder = new MiniClusterConfiguration.Builder(); configBuilder.setConfiguration(configuration); + configBuilder.setNumSlotsPerTaskManager(jobGraph.getMaximumParallelism()); if (LOG.isInfoEnabled()) { LOG.info("Running job on local embedded Flink mini cluster"); From 03473ede0aca3ee883d4a2db00c1a174110e20f6 Mon Sep 17 00:00:00 2001 From: zihjiang Date: Wed, 21 Oct 2020 18:32:26 +0800 Subject: [PATCH 4/4] =?UTF-8?q?fix-385=20=E4=BF=AE=E5=A4=8Des5=E5=92=8Ces?= =?UTF-8?q?=E7=9A=84sink=E6=8F=92=E4=BB=B6id=E5=8F=82=E6=95=B0=E4=B8=8D?= =?UTF-8?q?=E5=A1=AB=E6=97=B6,=20=E6=8A=A5=E7=A9=BA=E6=8C=87=E9=92=88?= =?UTF-8?q?=E5=BC=82=E5=B8=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sink/elasticsearch/ElasticsearchSink.java | 28 ++++++++----------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java b/elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java index c503dc7e9..3466a4bcc 100644 --- a/elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java +++ b/elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java @@ -45,6 +45,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * table output elastic5plugin @@ -172,24 +173,17 @@ public void setBulkFlushMaxActions(int bulkFlushMaxActions) { @Override public ElasticsearchSink genStreamSink(AbstractTargetTableInfo targetTableInfo) { - ElasticsearchTableInfo elasticsearchTableInfo = (ElasticsearchTableInfo) targetTableInfo; - esTableInfo = elasticsearchTableInfo; - clusterName = elasticsearchTableInfo.getClusterName(); - String address = elasticsearchTableInfo.getAddress(); - String[] addr = StringUtils.split(address, ","); - esAddressList = Arrays.asList(addr); - index = elasticsearchTableInfo.getIndex(); - type = elasticsearchTableInfo.getEsType(); - String id = elasticsearchTableInfo.getId(); - String[] idField = StringUtils.split(id, ","); - idIndexList = new ArrayList<>(); - - for(int i = 0; i < idField.length; ++i) { - idIndexList.add(Integer.valueOf(idField[i])); + esTableInfo = (ElasticsearchTableInfo) targetTableInfo; + clusterName = esTableInfo.getClusterName(); + index = esTableInfo.getIndex(); + type = esTableInfo.getEsType(); + columnTypes = esTableInfo.getFieldTypes(); + esAddressList = Arrays.asList(esTableInfo.getAddress().split(",")); + String id = esTableInfo.getId(); + + if (!org.apache.commons.lang.StringUtils.isEmpty(id)) { + idIndexList = Arrays.stream(org.apache.commons.lang.StringUtils.split(id, ",")).map(Integer::valueOf).collect(Collectors.toList()); } - - columnTypes = elasticsearchTableInfo.getFieldTypes(); - return this; } }