通过建表语句中的 PERIOD FOR SYSTEM_TIME 将表标识为维表。其中 PRIMARY KEY(keyInfo) 中的 keyInfo 表示用来和源表进行关联的字段,维表 JOIN 的条件必须与 keyInfo 字段一致。
此外, 针对oracle char类型自动补齐的特性,我们允许为char指定长度,确保维表查询时能够匹配数据。
注意:Oracle维表使用的字段大小写,需要和Oracle中定义的保持一致。
-- Syntax template for declaring an Oracle dimension (side) table.
-- tableName, colName, colType, keyInfo, jdbcUrl, dbUserName and dbPwd are
-- placeholders to be replaced with real values.
CREATE TABLE tableName(
    colName colType,
    ...
    -- keyInfo: the column(s) used to join against the source table;
    -- the JOIN condition must use exactly these columns.
    PRIMARY KEY(keyInfo),
    -- Marks this table as a dimension table.
    PERIOD FOR SYSTEM_TIME
)WITH(
    type='oracle',
    url='jdbcUrl',
    userName='dbUserName',
    password='dbPwd',
    tableName='tableName',
    cache ='LRU',
    schema = 'MQTEST',
    parallelism ='1',
    partitionedJoin='false'
);
支持的 Oracle 版本:10g、11g
oracle独有的参数配置:
参数名称 | 含义 | 是否必填 | 默认值 |
---|---|---|---|
type | 维表类型, oracle | 是 | |
url | 连接数据库 jdbcUrl | 是 | |
userName | 连接用户名 | 是 | |
password | 连接密码 | 是 | |
schema | 表所属的 schema(模式名) | 否 | |
-- Example: Oracle dimension table declared with cache = 'ALL'.
CREATE TABLE sideTable(
    -- The Oracle column is defined as char(20); declaring the same length here
    -- lets dimension-table lookups match Oracle's padded CHAR values.
    ID char(20),
    NAME varchar,
    -- ID is the key used to join against the source table.
    PRIMARY KEY (ID),
    -- Marks this table as a dimension table.
    PERIOD FOR SYSTEM_TIME
)WITH(
    type='oracle',
    url = 'jdbc:oracle:thin:@172.16.8.178:1521:xe',
    userName = 'system',
    password = 'oracle',
    tableName = 'SIDETEST1',
    schema = 'dtstack',
    cache = 'ALL',
    cacheTTLMs ='60000'
);
-- Example: Oracle dimension table declared with cache = 'LRU' plus the
-- cache/async tuning properties.
CREATE TABLE sideTable(
    channel char,
    xccount int,
    -- channel is the key used to join against the source table.
    PRIMARY KEY(channel),
    -- Marks this table as a dimension table.
    PERIOD FOR SYSTEM_TIME
)WITH(
    type='oracle',
    url='jdbc:oracle:thin:@xx.xx.xx.xx:1521:orcl',
    userName='xx',
    password='xx',
    tableName='sidetest',
    cache ='LRU',
    cacheSize ='10000',
    cacheTTLMs ='60000',
    cacheMode='unordered',
    asyncCapacity='1000',
    asyncTimeout='10000',
    parallelism ='1',
    partitionedJoin='false',
    schema = 'MQTEST'
);
-- Kafka source table; its rows are the left side of the dimension-table join
-- in the INSERT statement of this example.
CREATE TABLE MyTable (
    id   varchar,
    name varchar
    -- Disabled columns retained from the original example:
    --ts timestamp,
    --tsDate Date
) WITH (
    type             = 'kafka11',
    bootstrapServers = '172.16.8.107:9092',
    zookeeperQuorum  = '172.16.8.107:2181/kafka',
    offsetReset      = 'latest',
    topic            = 'mqTest01',
    timezone         = 'Asia/Shanghai',
    topicIsPattern   = 'false',
    parallelism      = '1'
);
-- Oracle dimension table that MyTable is joined against on ID.
CREATE TABLE sideTable (
    -- char length is given explicitly so lookups match Oracle's padded CHAR values.
    ID   char(20),
    NAME varchar,
    PRIMARY KEY (ID),
    -- Marks this table as a dimension table.
    PERIOD FOR SYSTEM_TIME
) WITH (
    type          = 'oracle',
    url           = 'jdbc:oracle:thin:@172.16.8.178:1521:xe',
    userName      = 'system',
    password      = 'oracle',
    tableName     = 'SIDETEST1',
    --schema = 'dtstack',
    cache         = 'LRU',
    asyncPoolSize = '3'
);
-- Oracle result (sink) table that receives the joined rows.
CREATE TABLE MyResult (
    NAME varchar,
    ID   char(20),
    PRIMARY KEY (ID)
) WITH (
    -- Alternative sink type, left disabled:
    --type ='console',
    type      = 'oracle',
    url       = 'jdbc:oracle:thin:@172.16.8.178:1521:xe',
    userName  = 'system',
    password  = 'oracle',
    tableName = 'SINK_TEST',
    batchSize = '1'
);
-- Joins each MyTable row to the Oracle dimension table on ID and writes the
-- result to MyResult.
-- The SELECT list follows MyResult's declared column order (NAME, ID): with no
-- explicit column list the insert is positional, and both columns are string
-- types, so a swapped order would not be caught by type checking.
-- NOTE(review): with LEFT JOIN, s.ID is NULL for rows without a match, while
-- MyResult declares ID as its primary key; confirm this is intended.
INSERT INTO MyResult
SELECT
    m.name AS NAME,
    s.ID AS ID
FROM MyTable m
LEFT JOIN sideTable s
    ON m.id = s.ID