Skip to content

Commit 766d74d

Browse files
committed
Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
1 parent a29c06c commit 766d74d

File tree

3 files changed

+16
-3
lines changed

3 files changed

+16
-3
lines changed

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3737
import org.apache.flink.configuration.Configuration;
3838
import org.apache.flink.streaming.api.functions.async.ResultFuture;
39+
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
3940
import org.apache.flink.types.Row;
4041
import org.hbase.async.HBaseClient;
4142
import org.slf4j.Logger;
@@ -159,9 +160,12 @@ protected Row fillData(Row input, Object sideInput){
159160
Row row = new Row(sideInfo.getOutFieldInfoList().size());
160161
for(Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()){
161162
Object obj = input.getField(entry.getValue());
162-
if(obj instanceof Timestamp){
163+
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
164+
165+
if(obj instanceof Timestamp && isTimeIndicatorTypeInfo){
163166
obj = ((Timestamp)obj).getTime();
164167
}
168+
165169
row.setField(entry.getKey(), obj);
166170
}
167171

mysql/mysql-side/mysql-all-side/src/main/java/com/dtstack/flink/sql/side/mysql/MysqlAllReqRow.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
1515
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
1616
import org.apache.flink.configuration.Configuration;
17+
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
1718
import org.apache.flink.types.Row;
1819
import org.apache.flink.util.Collector;
1920
import org.slf4j.Logger;
@@ -66,7 +67,10 @@ protected Row fillData(Row input, Object sideInput) {
6667
Row row = new Row(sideInfo.getOutFieldInfoList().size());
6768
for(Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()){
6869
Object obj = input.getField(entry.getValue());
69-
if(obj instanceof Timestamp){
70+
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
71+
72+
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
73+
if(obj instanceof Timestamp && isTimeIndicatorTypeInfo){
7074
obj = ((Timestamp)obj).getTime();
7175
}
7276
row.setField(entry.getKey(), obj);

mysql/mysql-side/mysql-async-side/src/main/java/com/dtstack/flink/sql/side/mysql/MysqlAsyncReqRow.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,12 @@
3535
import io.vertx.ext.jdbc.JDBCClient;
3636
import io.vertx.ext.sql.SQLClient;
3737
import io.vertx.ext.sql.SQLConnection;
38+
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
3839
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3940
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
4041
import org.apache.flink.configuration.Configuration;
4142
import org.apache.flink.streaming.api.functions.async.ResultFuture;
43+
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
4244
import org.apache.flink.types.Row;
4345
import org.slf4j.Logger;
4446
import org.slf4j.LoggerFactory;
@@ -185,9 +187,12 @@ public Row fillData(Row input, Object line){
185187
Row row = new Row(sideInfo.getOutFieldInfoList().size());
186188
for(Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()){
187189
Object obj = input.getField(entry.getValue());
188-
if(obj instanceof Timestamp){
190+
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
191+
192+
if(obj instanceof Timestamp && isTimeIndicatorTypeInfo){
189193
obj = ((Timestamp)obj).getTime();
190194
}
195+
191196
row.setField(entry.getKey(), obj);
192197
}
193198

0 commit comments

Comments
 (0)