CREATE TABLE tableName(
colName cloType,
...
PRIMARY KEY(keyInfo),
PERIOD FOR SYSTEM_TIME
)WITH(
type='impala',
url='jdbcUrl',
userName='dbUserName',
password='dbPwd',
tableName='tableName',
cache ='LRU',
cacheSize ='10000',
cacheTTLMs ='60000',
parallelism ='1',
partitionedJoin='false'
);
2.10.0-cdh5.13.0
impala独有的参数配置
参数名称 | 含义 | 是否必填 | 默认值 |
---|---|---|---|
type | 表明维表的类型[impala] | 是 | |
url | 连接postgresql数据库 jdbcUrl | 是 | |
userName | postgresql连接用户名 | 是 | |
password | postgresql连接密码 | 是 | |
tableName | postgresql表名称 | 是 | |
authMech | 身份验证机制 (0, 1, 2, 3), 暂不支持kerberos | 是 | 0 |
principal | kerberos用于登录的principal(authMech=1时独有) | authMech=1为必填 | |
keyTabFilePath | keytab文件的路径(authMech=1时独有) | authMech=1为必填 | |
krb5FilePath | krb5.conf文件路径(authMech=1时独有) | authMech=1为必填 | |
krbServiceName | Impala服务器的Kerberos principal名称(authMech=1时独有) | authMech=1为必填 | |
krbRealm | Kerberos的域名(authMech=1时独有) | 否 | HADOOP.COM |
enablePartition | 是否支持分区 | 否 | false |
partitionfields | 分区字段名 | 否,enablePartition='true'时为必填 | |
partitionFieldTypes | 分区字段类型 | 否,enablePartition='true'时为必填 | |
partitionValues | 分区值 | 否,enablePartition='true'时为必填 | |
cache | 维表缓存策略(NONE/LRU/ALL) | 否 | NONE |
partitionedJoin | 是否在維表join之前先根据 設定的key 做一次keyby操作(可以減少维表的数据缓存量) | 否 | false |
// 定义全量维表
CREATE TABLE sideTable(
id INT,
name VARCHAR,
PRIMARY KEY(id) ,
PERIOD FOR SYSTEM_TIME
)WITH(
type ='mysql',
url ='jdbc:impala://localhost:21050/mqtest',
userName ='dtstack',
password ='1abc123',
tableName ='test_impala_all',
authMech='3',
cache ='ALL',
cacheTTLMs ='60000',
parallelism ='2',
partitionedJoin='false'
);
create table sideTable(
channel varchar,
xccount int,
PRIMARY KEY(channel),
PERIOD FOR SYSTEM_TIME
)WITH(
type='impala',
url='jdbc:impala://localhost:21050/mytest',
userName='dtstack',
password='abc123',
tableName='sidetest',
authMech='3',
cache ='LRU',
cacheSize ='10000',
cacheTTLMs ='60000',
parallelism ='1',
partitionedJoin='false'
);
CREATE TABLE MyTable(
id int,
name varchar
)WITH(
type ='kafka11',
bootstrapServers ='172.16.8.107:9092',
zookeeperQuorum ='172.16.8.107:2181/kafka',
offsetReset ='latest',
topic ='cannan_yctest01',
timezone='Asia/Shanghai',
enableKeyPartitions ='false',
topicIsPattern ='false',
parallelism ='1'
);
CREATE TABLE MyResult(
id INT,
name VARCHAR
)WITH(
type='impala',
url='jdbc:impala://localhost:21050/mytest',
userName='dtstack',
password='abc123',
tableName ='test_impala_zf',
updateMode ='append',
parallelism ='1',
batchSize ='100',
batchWaitInterval ='1000'
);
CREATE TABLE sideTable(
id INT,
name VARCHAR,
PRIMARY KEY(id) ,
PERIOD FOR SYSTEM_TIME
)WITH(
type='impala',
url='jdbc:impala://localhost:21050/mytest',
userName='dtstack',
password='abc123',
tableName ='test_impala_10',
partitionedJoin ='false',
cache ='LRU',
cacheSize ='10000',
cacheTTLMs ='60000',
asyncPoolSize ='3',
parallelism ='1'
);
insert
into
MyResult
select
m.id,
s.name
from
MyTable m
join
sideTable s
on m.id=s.id;
注:分区字段放在最后面,如下,name是分区字段,放在channel,xccount字段的后面
create table sideTable(
channel varchar,
xccount int,
name varchar,
PRIMARY KEY(channel),
PERIOD FOR SYSTEM_TIME
)WITH(
type='impala',
url='jdbc:impala://localhost:21050/mytest',
userName='dtstack',
password='abc123',
tableName='sidetest',
authMech='3',
cache ='LRU',
cacheSize ='10000',
cacheTTLMs ='60000',
parallelism ='1',
enablePartition='true',
partitionfields='name',
partitionFieldTypes='varchar',
partitionValues='{"name":["tom","jeck"]}',
partitionedJoin='false'
);