|
| 1 | +## 1.格式: |
| 2 | +``` |
| 3 | +CREATE TABLE tableName( |
| 4 | + colName colType, |
| 5 | + bb INT |
| 6 | + )WITH( |
| 7 | + type ='elasticsearch7', |
| 8 | + address ='ip:port[,ip:port]', |
| 9 | + index ='index', |
| 10 | + id = 'field[,field]', |
| 11 | + authMesh = 'true', |
| 12 | + userName = 'userName', |
| 13 | + password = 'password', |
| 14 | + parallelism ='1' |
| 15 | + ) |
| 16 | +``` |
| 17 | +## 2.支持的版本 |
| 18 | + elasticsearch `7.x` |
| 19 | + |
| 20 | +## 3.表结构定义 |
| 21 | + |
| 22 | +|参数名称|含义| |
| 23 | +|----|---| |
| 24 | +|tableName|在 sql 中使用的名称;即注册到flink-table-env上的名称| |
| 25 | +|colName|列名称| |
| 26 | +|colType|列类型 [colType支持的类型](../colType.md)| |
| 27 | + |
| 28 | +## 4.参数: |
| 29 | +|参数名称|含义|是否必填|默认值| |
| 30 | +|----|---|---|----| |
| 31 | +|type|表明 输出表类型[elasticsearch7]|是|| |
| 32 | +|address | 连接ES Http地址|是|| |
| 33 | +|index | 选择的ES上的index名称,支持静态索引和动态索引,动态索引示例: `{user_name}`|是|| |
| 34 | +|index_definition| 为ES定义索引的字段类型、别名以及shard数量|否| |
| 35 | +|id | 生成id的规则,根据字段名称定义文档ID|否|uuid| |
| 36 | +|authMesh | 是否进行用户名密码认证(xpack) | 否 | false| |
| 37 | +|userName | 用户名 | 否,authMesh='true'时为必填 || |
| 38 | +|password | 密码 | 否,authMesh='true'时为必填 || |
| 39 | +|parallelism | 并行度设置|否|1| |
| 40 | + |
| 41 | +## 5.完整样例: |
| 42 | +``` |
| 43 | +CREATE TABLE MyTable( |
| 44 | + channel varchar, |
| 45 | + pv int, |
| 46 | + )WITH( |
| 47 | + type ='kafka11', |
| 48 | + bootstrapServers ='172.16.8.107:9092', |
| 49 | + zookeeperQuorum ='172.16.8.107:2181/kafka', |
| 50 | + offsetReset ='latest', |
| 51 | + topic ='es_test', |
| 52 | + timezone='Asia/Shanghai', |
| 53 | + updateMode ='append', |
| 54 | + enableKeyPartitions ='false', |
| 55 | + topicIsPattern ='false', |
| 56 | + parallelism ='1' |
| 57 | + ); |
| 58 | +
|
| 59 | +CREATE TABLE MyResult( |
| 60 | + channel varchar, |
| 61 | + pv int |
| 62 | + )WITH( |
| 63 | + type ='elasticsearch7', |
| 64 | + address ='172.16.8.193:9200', |
| 65 | + authMesh='true', |
| 66 | + username='elastic', |
| 67 | + password='abc123', |
| 68 | + estype ='external', |
| 69 | + cluster ='docker-cluster', |
| 70 | + index ='myresult', |
| 71 | +-- index = '{pv}' # 动态索引写法 |
| 72 | + id ='pv', |
| 73 | + parallelism ='1' |
| 74 | + ); |
| 75 | +
|
| 76 | +CREATE TABLE sideTable( |
| 77 | + a varchar, |
| 78 | + b varchar, |
| 79 | + PRIMARY KEY(a) , |
| 80 | + PERIOD FOR SYSTEM_TIME |
| 81 | + )WITH( |
| 82 | + type ='elasticsearch7', |
| 83 | + address ='172.16.8.193:9200', |
| 84 | + index ='sidetest', |
| 85 | + authMesh='true', |
| 86 | + username='elastic', |
| 87 | + password='abc123', |
| 88 | + cache ='LRU', |
| 89 | + cacheSize ='10000', |
| 90 | + cacheTTLMs ='60000', |
| 91 | + partitionedJoin ='false', |
| 92 | + parallelism ='1' |
| 93 | + ); |
| 94 | +
|
| 95 | +insert |
| 96 | +into |
| 97 | + MyResult |
| 98 | + select |
| 99 | + w.b as pv, |
| 100 | + s.channel as channel |
| 101 | + from |
| 102 | + MyTable s |
| 103 | + join |
| 104 | + sideTable w |
| 105 | + on s.pv = w.a |
| 106 | + where |
| 107 | + w.a = '10' |
| 108 | + and s.channel='xc'; |
| 109 | + ``` |
| 110 | +## 6.结果数据 |
| 111 | +### 输入数据 |
| 112 | +``` |
| 113 | +{"channel":"xc26","pv":10,"xctime":1232312} |
| 114 | +``` |
| 115 | +### 输出数据 |
| 116 | +``` |
| 117 | +http://localhost:9200/myresult/_search |
| 118 | +{"_index":"myresult","_type":"external","_id":"8aX_DHIBn3B7OBuqFl-i","_score":1.0,"_source":{"pv":"10","channel":"xc26"}} |
| 119 | +``` |
0 commit comments