Skip to content

Commit

Permalink
Allow venice driver to load multiple clusters under the same schema (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
jogrogan authored Feb 13, 2025
1 parent bf2d446 commit 51a6e16
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 42 deletions.
10 changes: 5 additions & 5 deletions deploy/samples/venicedb.yaml
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
apiVersion: hoptimator.linkedin.com/v1alpha1
kind: Database
metadata:
name: venice-cluster0
name: venice
spec:
schema: VENICE-CLUSTER0
url: jdbc:venice://cluster=venice-cluster0;router.url=http://localhost:7777
schema: VENICE
url: jdbc:venice://clusters=venice-cluster0;router.url=http://localhost:7777
dialect: Calcite

---

apiVersion: hoptimator.linkedin.com/v1alpha1
kind: TableTemplate
metadata:
name: venice-template-cluster0
name: venice-template
spec:
databases:
- venice-cluster0
- venice
connector: |
connector = venice
storeName = {{table}}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.linkedin.hoptimator.venice;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
Expand Down Expand Up @@ -35,8 +37,10 @@ public ClusterSchema(Properties properties) {

public void populate() throws InterruptedException, ExecutionException, IOException {
tableMap.clear();
String cluster = properties.getProperty("cluster");
log.info("Loading Venice stores for cluster {}", cluster);
String clusterStr = properties.getProperty("clusters");
List<String> clusters = Arrays.asList(clusterStr.split(","));

log.info("Loading Venice stores for cluster {}", clusters);

String sslConfigPath = properties.getProperty("ssl-config-path");
Optional<SSLFactory> sslFactory = Optional.empty();
Expand All @@ -47,12 +51,14 @@ public void populate() throws InterruptedException, ExecutionException, IOExcept
sslFactory = Optional.of(SslUtils.getSSLFactory(sslProperties, sslFactoryClassName));
}

try (ControllerClient controllerClient = createControllerClient(cluster, sslFactory)) {
String[] stores = controllerClient.queryStoreList(false).getStores();
log.info("Loaded {} Venice stores.", stores.length);
for (String store : stores) {
StoreSchemaFetcher storeSchemaFetcher = createStoreSchemaFetcher(store);
tableMap.put(store, createVeniceStore(storeSchemaFetcher));
for (String cluster : clusters) {
try (ControllerClient controllerClient = createControllerClient(cluster, sslFactory)) {
String[] stores = controllerClient.queryStoreList(false).getStores();
log.info("Loaded {} Venice stores.", stores.length);
for (String store : stores) {
StoreSchemaFetcher storeSchemaFetcher = createStoreSchemaFetcher(store);
tableMap.put(store, createVeniceStore(storeSchemaFetcher));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Locale;
import java.util.Properties;

import org.apache.calcite.avatica.ConnectStringParser;
Expand All @@ -16,7 +15,6 @@
/** JDBC driver for Venice stores. */
public class VeniceDriver extends Driver {
public static final String CATALOG_NAME = "VENICE";
public static final String CONFIG_NAME = "venice.config";

static {
new VeniceDriver().register();
Expand All @@ -41,14 +39,7 @@ public Connection connect(String url, Properties props) throws SQLException {
Properties properties = new Properties();
properties.putAll(props); // in case the driver is loaded via getConnection()
properties.putAll(ConnectStringParser.parse(url.substring(getConnectStringPrefix().length())));
String cluster = properties.getProperty("cluster");
if (cluster == null) {
throw new IllegalArgumentException("Missing required cluster property. Need: jdbc:venice://cluster=...");
}
cluster = cluster.toUpperCase(Locale.ROOT);
if (!cluster.startsWith(CATALOG_NAME)) {
cluster = CATALOG_NAME + "-" + cluster;
}

try {
Connection connection = super.connect(url, props);
if (connection == null) {
Expand All @@ -60,7 +51,7 @@ public Connection connect(String url, Properties props) throws SQLException {
SchemaPlus rootSchema = calciteConnection.getRootSchema();
ClusterSchema schema = createClusterSchema(properties);
schema.populate();
rootSchema.add(cluster.toUpperCase(Locale.ROOT), schema);
rootSchema.add(CATALOG_NAME, schema);
return connection;
} catch (Exception e) {
throw new SQLException("Problem loading " + url, e);
Expand Down
14 changes: 7 additions & 7 deletions hoptimator-venice/src/test/resources/venice-ddl-insert-all.id
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
!set outputformat mysql
!use k8s

insert into "VENICE-CLUSTER0"."test-store-1" select * from "VENICE-CLUSTER0"."test-store";
insert into "VENICE"."test-store-1" select * from "VENICE"."test-store";
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
name: venice-cluster0-test-store-1
name: venice-test-store-1
namespace: flink
spec:
deploymentName: basic-session-deployment
job:
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
args:
- CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH ()
- CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
- CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH ()
- CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
- INSERT INTO `VENICE-CLUSTER0`.`test-store-1` (`KEY_id`, `intField`, `stringField`) SELECT * FROM `VENICE-CLUSTER0`.`test-store`
- CREATE DATABASE IF NOT EXISTS `VENICE` WITH ()
- CREATE TABLE IF NOT EXISTS `VENICE`.`test-store` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
- CREATE DATABASE IF NOT EXISTS `VENICE` WITH ()
- CREATE TABLE IF NOT EXISTS `VENICE`.`test-store-1` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
- INSERT INTO `VENICE`.`test-store-1` (`KEY_id`, `intField`, `stringField`) SELECT * FROM `VENICE`.`test-store`
jarURI: file:///opt/hoptimator-flink-runner.jar
parallelism: 1
upgradeMode: stateless
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
!set outputformat mysql
!use k8s

insert into "VENICE-CLUSTER0"."test-store-1" ("KEY_id", "intField") select "KEY_id", "stringField" from "VENICE-CLUSTER0"."test-store";
insert into "VENICE"."test-store-1" ("KEY_id", "intField") select "KEY_id", "stringField" from "VENICE"."test-store";
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
name: venice-cluster0-test-store-1
name: venice-test-store-1
namespace: flink
spec:
deploymentName: basic-session-deployment
job:
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
args:
- CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH ()
- CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
- CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH ()
- CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
- INSERT INTO `VENICE-CLUSTER0`.`test-store-1` (`KEY_id`, `intField`) SELECT `KEY_id`, CAST(`stringField` AS SIGNED) AS `intField` FROM `VENICE-CLUSTER0`.`test-store`
- CREATE DATABASE IF NOT EXISTS `VENICE` WITH ()
- CREATE TABLE IF NOT EXISTS `VENICE`.`test-store` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
- CREATE DATABASE IF NOT EXISTS `VENICE` WITH ()
- CREATE TABLE IF NOT EXISTS `VENICE`.`test-store-1` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
- INSERT INTO `VENICE`.`test-store-1` (`KEY_id`, `intField`) SELECT `KEY_id`, CAST(`stringField` AS SIGNED) AS `intField` FROM `VENICE`.`test-store`
jarURI: file:///opt/hoptimator-flink-runner.jar
parallelism: 1
upgradeMode: stateless
Expand Down
8 changes: 4 additions & 4 deletions hoptimator-venice/src/test/resources/venice-ddl-select.id
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
!set outputformat mysql
!use k8s

select * from "VENICE-CLUSTER0"."test-store-1";
select * from "VENICE"."test-store-1";
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
Expand All @@ -12,11 +12,11 @@ spec:
job:
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
args:
- CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH ()
- CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
- CREATE DATABASE IF NOT EXISTS `VENICE` WITH ()
- CREATE TABLE IF NOT EXISTS `VENICE`.`test-store-1` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
- CREATE DATABASE IF NOT EXISTS `PIPELINE` WITH ()
- CREATE TABLE IF NOT EXISTS `PIPELINE`.`SINK` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ()
- INSERT INTO `PIPELINE`.`SINK` (`KEY_id`, `intField`, `stringField`) SELECT * FROM `VENICE-CLUSTER0`.`test-store-1`
- INSERT INTO `PIPELINE`.`SINK` (`KEY_id`, `intField`, `stringField`) SELECT * FROM `VENICE`.`test-store-1`
jarURI: file:///opt/hoptimator-flink-runner.jar
parallelism: 1
upgradeMode: stateless
Expand Down

0 comments on commit 51a6e16

Please sign in to comment.