Skip to content

Commit e0a7da9

Browse files
committed
Merge remote-tracking branch 'git_flinkStreamSQL/1.10_release' into 1.10_release
2 parents 5d2d320 + d333331 commit e0a7da9

File tree

10 files changed

+44
-17
lines changed

10 files changed

+44
-17
lines changed

core/src/main/java/com/dtstack/flink/sql/side/BaseSideInfo.java

+20-3
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,15 @@ public abstract class BaseSideInfo implements Serializable{
7575

7676
protected AbstractSideCache sideCache;
7777

78+
protected JoinInfo joinInfo;
79+
7880
public BaseSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList,
7981
AbstractSideTableInfo sideTableInfo){
8082
this.rowTypeInfo = rowTypeInfo;
8183
this.outFieldInfoList = outFieldInfoList;
8284
this.joinType = joinInfo.getJoinType();
8385
this.sideTableInfo = sideTableInfo;
86+
this.joinInfo = joinInfo;
8487
parseSelectFields(joinInfo);
8588
buildEqualInfo(joinInfo, sideTableInfo);
8689
}
@@ -178,9 +181,7 @@ private void evalEquation(SqlIdentifier left, SqlIdentifier right, String sideTa
178181
*/
179182
private void evalConstantEquation(SqlLiteral literal, SqlIdentifier identifier) {
180183
String tableName = identifier.getComponent(0).getSimple();
181-
String sideTableName = sideTableInfo.getName();
182-
String errorMsg = "only support set side table constant field, error field " + identifier;
183-
Preconditions.checkState(tableName.equals(sideTableName), errorMsg);
184+
checkSupport(identifier);
184185
String fieldName = identifier.getComponent(1).getSimple();
185186
Object constant = literal.getValue();
186187
List<PredicateInfo> predicateInfos = sideTableInfo.getPredicateInfoes();
@@ -194,6 +195,22 @@ private void evalConstantEquation(SqlLiteral literal, SqlIdentifier identifier)
194195
predicateInfos.add(predicate);
195196
}
196197

198+
private void checkSupport(SqlIdentifier identifier) {
199+
String tableName = identifier.getComponent(0).getSimple();
200+
String sideTableName;
201+
String sideTableAlias;
202+
if (joinInfo.isLeftIsSideTable()) {
203+
sideTableName = joinInfo.getLeftTableName();
204+
sideTableAlias = joinInfo.getLeftTableAlias();
205+
} else {
206+
sideTableName = joinInfo.getRightTableName();
207+
sideTableAlias = joinInfo.getRightTableAlias();
208+
}
209+
boolean isSide = tableName.equals(sideTableName) || tableName.equals(sideTableAlias);
210+
String errorMsg = "only support set side table constant field, error field " + identifier;
211+
Preconditions.checkState(isSide, errorMsg);
212+
}
213+
197214
private void associateField(String sourceTableField, String sideTableField, SqlNode sqlNode) {
198215
String errorMsg = "can't deal equal field: " + sqlNode;
199216
equalFieldList.add(sideTableField);

core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -55,15 +55,15 @@ public class JoinInfo implements Serializable {
5555

5656
private String rightTableAlias;
5757

58-
private SqlNode leftNode;
58+
private transient SqlNode leftNode;
5959

60-
private SqlNode rightNode;
60+
private transient SqlNode rightNode;
6161

62-
private SqlNode condition;
62+
private transient SqlNode condition;
6363

64-
private SqlNode selectFields;
64+
private transient SqlNode selectFields;
6565

66-
private SqlNode selectNode;
66+
private transient SqlNode selectNode;
6767

6868
private JoinType joinType;
6969

core/src/main/java/com/dtstack/flink/sql/util/KrbUtils.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,11 @@
2020

2121
import org.apache.hadoop.conf.Configuration;
2222
import org.apache.hadoop.security.UserGroupInformation;
23+
import org.apache.hadoop.security.authentication.util.KerberosName;
2324
import org.slf4j.Logger;
2425
import org.slf4j.LoggerFactory;
26+
import sun.security.krb5.Config;
27+
import sun.security.krb5.KrbException;
2528

2629
import java.io.IOException;
2730

@@ -40,9 +43,16 @@ public class KrbUtils {
4043
// public static final String FALSE_STR = "false";
4144
// public static final String SUBJECT_ONLY_KEY = "javax.security.auth.useSubjectCredsOnly";
4245

43-
public static UserGroupInformation getUgi(String principal, String keytabPath, String krb5confPath) throws IOException {
46+
public static UserGroupInformation loginAndReturnUgi(String principal, String keytabPath, String krb5confPath) throws IOException {
4447
LOG.info("Kerberos login with principal: {} and keytab: {}", principal, keytabPath);
4548
System.setProperty(KRB5_CONF_KEY, krb5confPath);
49+
// 不刷新会读/etc/krb5.conf
50+
try {
51+
Config.refresh();
52+
KerberosName.resetDefaultRealm();
53+
} catch (KrbException e) {
54+
LOG.warn("resetting default realm failed, current default realm will still be used.", e);
55+
}
4656
// TODO 尚未探索出此选项的意义,以后研究明白方可打开
4757
// System.setProperty(SUBJECT_ONLY_KEY, FALSE_STR);
4858
Configuration configuration = new Configuration();

core/src/test/java/com/dtstack/flink/sql/util/KrbUtilsTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,12 @@
3030
**/
3131
public class KrbUtilsTest {
3232
@Test
33-
public void testGetUgi() throws IOException {
33+
public void testLoginAndReturnUgi() throws IOException {
3434
String principal = "";
3535
String keytabPath = "";
3636
String krb5confPath = "";
3737
try {
38-
KrbUtils.getUgi(principal, keytabPath, krb5confPath);
38+
KrbUtils.loginAndReturnUgi(principal, keytabPath, krb5confPath);
3939
} catch (IllegalArgumentException e) {
4040
Assert.assertEquals(e.getMessage(), "Can't get Kerberos realm");
4141
}

impala/impala-side/impala-all-side/src/main/java/com/dtstack/flink/sql/side/impala/ImpalaAllReqRow.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public Connection getConn(String dbUrl, String userName, String password) {
7171
String keyTabFilePath = impalaSideTableInfo.getKeyTabFilePath();
7272
String krb5FilePath = impalaSideTableInfo.getKrb5FilePath();
7373
String principal = impalaSideTableInfo.getPrincipal();
74-
UserGroupInformation ugi = KrbUtils.getUgi(principal, keyTabFilePath, krb5FilePath);
74+
UserGroupInformation ugi = KrbUtils.loginAndReturnUgi(principal, keyTabFilePath, krb5FilePath);
7575
connection = ugi.doAs(new PrivilegedExceptionAction<Connection>() {
7676
@Override
7777
public Connection run() throws SQLException {

impala/impala-side/impala-async-side/src/main/java/com/dtstack/flink/sql/side/impala/ImpalaAsyncReqRow.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public void open(Configuration parameters) throws Exception {
7070
String keyTabFilePath = impalaSideTableInfo.getKeyTabFilePath();
7171
String krb5FilePath = impalaSideTableInfo.getKrb5FilePath();
7272
String principal = impalaSideTableInfo.getPrincipal();
73-
ugi = KrbUtils.getUgi(principal, keyTabFilePath, krb5FilePath);
73+
ugi = KrbUtils.loginAndReturnUgi(principal, keyTabFilePath, krb5FilePath);
7474
openJdbc(parameters);
7575
} else {
7676
openJdbc(parameters);

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaOutputFormat.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ private void initScheduledTask(Long batchWaitInterval) {
208208

209209
private void openConnect() throws IOException {
210210
if (authMech == 1) {
211-
UserGroupInformation ugi = KrbUtils.getUgi(principal, keytabPath, krb5confPath);
211+
UserGroupInformation ugi = KrbUtils.loginAndReturnUgi(principal, keytabPath, krb5confPath);
212212
try {
213213
ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
214214
openJdbc();

kudu/kudu-side/kudu-all-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAllReqRow.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ private KuduClient getClient(KuduSideTableInfo tableInfo) throws IOException {
210210
}
211211

212212
if (tableInfo.isEnableKrb()) {
213-
UserGroupInformation ugi = KrbUtils.getUgi(tableInfo.getPrincipal(), tableInfo.getKeytab(), tableInfo.getKrb5conf());
213+
UserGroupInformation ugi = KrbUtils.loginAndReturnUgi(tableInfo.getPrincipal(), tableInfo.getKeytab(), tableInfo.getKrb5conf());
214214
return ugi.doAs(new PrivilegedAction<KuduClient>() {
215215
@Override
216216
public KuduClient run() {

kudu/kudu-side/kudu-async-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAsyncReqRow.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ private AsyncKuduClient getClient() throws IOException {
119119
}
120120

121121
if (kuduSideTableInfo.isEnableKrb()) {
122-
UserGroupInformation ugi = KrbUtils.getUgi(
122+
UserGroupInformation ugi = KrbUtils.loginAndReturnUgi(
123123
kuduSideTableInfo.getPrincipal(),
124124
kuduSideTableInfo.getKeytab(),
125125
kuduSideTableInfo.getKrb5conf()

kudu/kudu-sink/src/main/java/com/dtstack/flink/sql/sink/kudu/KuduOutputFormat.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ private void establishConnection() throws IOException {
116116
}
117117

118118
if (enableKrb) {
119-
UserGroupInformation ugi = KrbUtils.getUgi(
119+
UserGroupInformation ugi = KrbUtils.loginAndReturnUgi(
120120
principal,
121121
keytab,
122122
krb5conf

0 commit comments

Comments
 (0)