
Commit 1bb71a8

ITN coverage for SQL engine

1 parent: ef32134

20 files changed, +582 -4 lines
@@ -0,0 +1,209 @@
# Copyright © 2024 Cask Data, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

@BigQuery_Sink
Feature: BigQuery sink - Verification of BigQuery to BigQuery successful data transfer

  @BQ_SOURCE_JOINER_TEST @BQ_SOURCE_JOINER2_TEST @BQ_DELETE_JOIN @BQ_SINK_TEST @EXISTING_BQ_CONNECTION
  Scenario: Validate successful records transfer from BigQuery source to BigQuery sink using Join
    Given Open Datafusion Project to configure pipeline
    When Expand Plugin group in the LHS plugins list: "Source"
    When Select plugin: "BigQuery" from the plugins list as: "Source"
    When Expand Plugin group in the LHS plugins list: "Analytics"
    When Select plugin: "Joiner" from the plugins list as: "Analytics"
    Then Navigate to the properties page of plugin: "BigQuery"
    Then Click plugin property: "switch-useConnection"
    Then Click on the Browse Connections button
    Then Select connection: "bqConnectionName"
    Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
    And Replace input plugin property: "dataset" with value: "dataset"
    And Replace input plugin property: "table" with value: "bqSourceTable"
    Then Click on the Get Schema button
    Then Validate "BigQuery" plugin properties
    And Close the Plugin Properties page
    When Expand Plugin group in the LHS plugins list: "Source"
    When Select plugin: "BigQuery" from the plugins list as: "Source"
    When Expand Plugin group in the LHS plugins list: "Sink"
    When Select plugin: "BigQuery" from the plugins list as: "Sink"
    Then Connect plugins: "BigQuery" and "Joiner" to establish connection
    Then Connect plugins: "BigQuery2" and "Joiner" to establish connection
    Then Connect plugins: "Joiner" and "BigQuery3" to establish connection
    Then Navigate to the properties page of plugin: "BigQuery2"
    Then Click plugin property: "useConnection"
    Then Click on the Browse Connections button
    Then Select connection: "bqConnectionName"
    Then Enter input plugin property: "referenceName" with value: "BQRefName"
    Then Enter input plugin property: "dataset" with value: "dataset"
    Then Enter input plugin property: "table" with value: "bqSourceTable2"
    Then Validate "BigQuery2" plugin properties
    And Close the Plugin Properties page
    Then Navigate to the properties page of plugin: "Joiner"
    Then Select radio button plugin property: "conditionType" with value: "basic"
    Then Click on the Get Schema button
    Then Validate "Joiner" plugin properties
    Then Close the Plugin Properties page
    Then Navigate to the properties page of plugin: "BigQuery3"
    Then Click plugin property: "useConnection"
    Then Click on the Browse Connections button
    Then Select connection: "bqConnectionName"
    Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName"
    Then Enter input plugin property: "dataset" with value: "dataset"
    Then Enter input plugin property: "table" with value: "bqTargetTable"
    Then Validate "BigQuery3" plugin properties
    Then Close the Plugin Properties page
    Then Save the pipeline
    Then Preview and run the pipeline
    Then Wait till pipeline preview is in running state
    Then Open and capture pipeline preview logs
    Then Verify the preview run status of pipeline in the logs is "succeeded"
    Then Close the pipeline logs
    Then Close the preview
    Then Deploy the pipeline
    Then Click on "Configure" button
    Then Click on "Transformation Pushdown" button
    Then Click on "Enable Transformation Pushdown" button
    Then Enter input plugin property: "dataset" with value: "test_sqlengine"
    Then Click on "Advanced" button
    Then Click plugin property: "useConnection"
    Then Click on the Browse Connections button
    Then Select connection: "bqConnectionName"
    Then Click on "Save" button
    Then Run the Pipeline in Runtime
    Then Wait till pipeline is in running state
    Then Open and capture logs
    Then Close the pipeline logs
    Then Verify the pipeline status is "Succeeded"
    Then Validate The Data From BQ To BQ With Actual And Expected File for: "bqExpectedFileJoin"

  @BQ_SOURCE_SQLENGINE_TEST @BQ_SINK_TEST @EXISTING_BQ_CONNECTION
  Scenario: Validate successful records transfer from BigQuery source to BigQuery sink using group by
    Given Open Datafusion Project to configure pipeline
    When Expand Plugin group in the LHS plugins list: "Source"
    When Select plugin: "BigQuery" from the plugins list as: "Source"
    When Expand Plugin group in the LHS plugins list: "Sink"
    When Select plugin: "BigQuery" from the plugins list as: "Sink"
    When Expand Plugin group in the LHS plugins list: "Analytics"
    When Select plugin: "Group By" from the plugins list as: "Analytics"
    Then Navigate to the properties page of plugin: "BigQuery"
    Then Click plugin property: "switch-useConnection"
    Then Click on the Browse Connections button
    Then Select connection: "bqConnectionName"
    Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
    And Replace input plugin property: "dataset" with value: "dataset"
    And Replace input plugin property: "table" with value: "bqSourceTable"
    Then Click on the Get Schema button
    Then Validate "BigQuery" plugin properties
    And Close the Plugin Properties page
    Then Connect plugins: "BigQuery" and "Group By" to establish connection
    Then Connect plugins: "Group By" and "BigQuery2" to establish connection
    Then Navigate to the properties page of plugin: "Group By"
    Then Select dropdown plugin property: "groupByFields" with option value: "groupByValidFirstField"
    Then Press Escape Key
    Then Select dropdown plugin property: "groupByFields" with option value: "groupByValidSecondField"
    Then Press Escape Key
    Then Enter GroupBy plugin Fields to be Aggregate "groupByGcsAggregateFields"
    Then Click on the Get Schema button
    Then Click on the Validate button
    Then Close the Plugin Properties page
    Then Navigate to the properties page of plugin: "BigQuery2"
    Then Click plugin property: "useConnection"
    Then Click on the Browse Connections button
    Then Select connection: "bqConnectionName"
    Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName"
    Then Enter input plugin property: "dataset" with value: "dataset"
    Then Enter input plugin property: "table" with value: "bqTargetTable"
Then Validate "BigQuery" plugin properties
    And Close the Plugin Properties page
    Then Save the pipeline
    Then Preview and run the pipeline
    Then Wait till pipeline preview is in running state
    Then Open and capture pipeline preview logs
    Then Verify the preview run status of pipeline in the logs is "succeeded"
    Then Close the pipeline logs
    Then Close the preview
    Then Deploy the pipeline
    Then Click on "Configure" button
    Then Click on "Transformation Pushdown" button
    Then Click on "Enable Transformation Pushdown" button
    Then Enter input plugin property: "dataset" with value: "test_sqlengine"
    Then Click on "Advanced" button
    Then Click plugin property: "useConnection"
    Then Click on the Browse Connections button
    Then Select connection: "bqConnectionName"
    Then Click on "Save" button
    Then Run the Pipeline in Runtime
    Then Wait till pipeline is in running state
    Then Open and capture logs
    Then Close the pipeline logs
    Then Verify the pipeline status is "Succeeded"
    Then Validate The Data From BQ To BQ With Actual And Expected File for: "groupByTestOutputFile"

  @BQ_SOURCE_SQLENGINE_TEST @BQ_SINK_TEST @EXISTING_BQ_CONNECTION
  Scenario: Validate successful records transfer from BigQuery source to BigQuery sink using deduplicate
    Given Open Datafusion Project to configure pipeline
    When Expand Plugin group in the LHS plugins list: "Source"
    When Select plugin: "BigQuery" from the plugins list as: "Source"
    When Expand Plugin group in the LHS plugins list: "Sink"
    When Select plugin: "BigQuery" from the plugins list as: "Sink"
    When Expand Plugin group in the LHS plugins list: "Analytics"
    When Select plugin: "Deduplicate" from the plugins list as: "Analytics"
    Then Navigate to the properties page of plugin: "BigQuery"
    Then Click plugin property: "switch-useConnection"
    Then Click on the Browse Connections button
    Then Select connection: "bqConnectionName"
    Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
    And Replace input plugin property: "dataset" with value: "dataset"
    And Replace input plugin property: "table" with value: "bqSourceTable"
    Then Click on the Get Schema button
    Then Validate "BigQuery" plugin properties
    And Close the Plugin Properties page
    Then Connect plugins: "BigQuery" and "Deduplicate" to establish connection
    Then Connect plugins: "Deduplicate" and "BigQuery2" to establish connection
    Then Navigate to the properties page of plugin: "Deduplicate"
    Then Select dropdown plugin property: "uniqueFields" with option value: "DeduplicateValidFirstField"
    Then Press Escape Key
    Then Click on the Validate button
    Then Close the Plugin Properties page
    Then Navigate to the properties page of plugin: "BigQuery2"
    Then Click plugin property: "useConnection"
    Then Click on the Browse Connections button
    Then Select connection: "bqConnectionName"
    Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName"
    Then Enter input plugin property: "dataset" with value: "dataset"
    Then Enter input plugin property: "table" with value: "bqTargetTable"
Then Validate "BigQuery" plugin properties
    And Close the Plugin Properties page
    Then Save the pipeline
    Then Preview and run the pipeline
    Then Wait till pipeline preview is in running state
    Then Open and capture pipeline preview logs
    Then Verify the preview run status of pipeline in the logs is "succeeded"
    Then Close the pipeline logs
    Then Close the preview
    Then Deploy the pipeline
    Then Click on "Configure" button
    Then Click on "Transformation Pushdown" button
    Then Click on "Enable Transformation Pushdown" button
    Then Enter input plugin property: "dataset" with value: "test_sqlengine"
    Then Click on "Advanced" button
    Then Click plugin property: "useConnection"
    Then Click on the Browse Connections button
    Then Select connection: "bqConnectionName"
    Then Click on "Save" button
    Then Run the Pipeline in Runtime
    Then Wait till pipeline is in running state
    Then Open and capture logs
    Then Close the pipeline logs
    Then Verify the pipeline status is "Succeeded"
    Then Validate The Data From BQ To BQ With Actual And Expected File for: "deduplicateTestOutputFile"
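The final step of each scenario passes a property key ("bqExpectedFileJoin", "groupByTestOutputFile", "deduplicateTestOutputFile") to the step definition added in BigQuery.java below, which resolves it to a classpath resource holding the expected rows as line-delimited JSON. A minimal Java sketch of that resolution, with hypothetical property values and record fields (the helper keys each record on its "id" field); it assumes the cdap-e2e framework has already loaded the plugin-parameters bundle that PluginPropertyUtils reads:

import io.cdap.e2e.utils.PluginPropertyUtils;

import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;

public class ExpectedFileLookupSketch {
  // Hypothetical bundle entry:
  //   bqExpectedFileJoin=testdata/BQValidationExpectedFiles/expected_join
  // The resource itself holds one JSON object per line, for example:
  //   {"id":"1","name":"alice","total":42}
  //   {"id":"2","name":"bob","total":7}
  public static Path resolveExpectedFile(String propertyKey) throws URISyntaxException {
    // Same resolution the helper below performs: property key -> resource name -> classpath path.
    String resourceName = PluginPropertyUtils.pluginProp(propertyKey);
    return Paths.get(ExpectedFileLookupSketch.class.getResource("/" + resourceName).toURI());
  }
}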

src/e2e-test/java/io/cdap/plugin/bigquery/runners/sinkrunner/TestRunner.java

+2 -1
@@ -26,7 +26,8 @@
 @CucumberOptions(
   features = {"src/e2e-test/features"},
   glue = {"io.cdap.plugin.bigquery.stepsdesign", "io.cdap.plugin.gcs.stepsdesign",
-    "stepsdesign", "io.cdap.plugin.common.stepsdesign"},
+    "stepsdesign", "io.cdap.plugin.common.stepsdesign", "io.cdap.plugin.groupby.actions",
+    "io.cdap.plugin.groupby.locators", "io.cdap.plugin.groupby.stepsdesign"},
   tags = {"@BigQuery_Sink and not @CDAP-20830"},
   //TODO: Enable test once issue is fixed https://cdap.atlassian.net/browse/CDAP-20830
   monochrome = true,

src/e2e-test/java/io/cdap/plugin/bigquery/runners/sinkrunner/TestRunnerRequired.java

+2 -1
@@ -26,7 +26,8 @@
 @CucumberOptions(
   features = {"src/e2e-test/features"},
   glue = {"io.cdap.plugin.bigquery.stepsdesign", "io.cdap.plugin.gcs.stepsdesign",
-    "stepsdesign", "io.cdap.plugin.common.stepsdesign"},
+    "stepsdesign", "io.cdap.plugin.common.stepsdesign", "io.cdap.plugin.groupby.actions",
+    "io.cdap.plugin.groupby.locators", "io.cdap.plugin.groupby.stepsdesign"},
   tags = {"@BigQuery_Sink_Required"},
   monochrome = true,
   //TODO: Enable test once issue is fixed https://cdap.atlassian.net/browse/CDAP-20830

src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/BigQuery.java

+10
@@ -25,6 +25,7 @@
 import stepsdesign.BeforeActions;

 import java.io.IOException;
+import java.net.URISyntaxException;

 /**
  * BigQuery Plugin validation common step design.
@@ -44,4 +45,13 @@ public void validateTheValuesOfRecordsTransferredToBQsinkIsEqualToTheValuesFromS
     Assert.assertTrue("Value of records transferred to the BQ sink should be equal to the value " +
                         "of the records in the source table", recordsMatched);
   }
+
+  @Then("Validate The Data From BQ To BQ With Actual And Expected File for: {string}")
+  public void validateTheDataFromBQToBQWithActualAndExpectedFileFor(String expectedFile) throws IOException,
+    InterruptedException, URISyntaxException {
+    boolean recordsMatched = ValidationHelperSqlEngine.validateActualDataToExpectedData(
+      PluginPropertyUtils.pluginProp("bqTargetTable"),
+      PluginPropertyUtils.pluginProp(expectedFile));
+    Assert.assertTrue("Value of records in actual and expected file should be equal", recordsMatched);
+  }
 }
src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/ValidationHelperSqlEngine.java
@@ -0,0 +1,112 @@
/*
 * Copyright © 2024 Cask Data, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package io.cdap.plugin.bigquery.stepsdesign;

import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.TableResult;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import io.cdap.e2e.utils.BigQueryClient;
import io.cdap.e2e.utils.PluginPropertyUtils;
import io.cucumber.core.logging.Logger;
import io.cucumber.core.logging.LoggerFactory;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

/**
 * Validation helper for the BigQuery SQL engine (Transformation Pushdown) tests.
 */
public class ValidationHelperSqlEngine {

  private static final Logger LOG = LoggerFactory.getLogger(ValidationHelperSqlEngine.class);
  private static final Gson gson = new Gson();

  /**
   * Validates the actual data from a BigQuery table against the expected data from a file.
   *
   * @param table    The name of the BigQuery table to fetch data from
   * @param fileName The name of the file containing the expected data
   * @return True if the actual data matches the expected data, otherwise false
   */
  public static boolean validateActualDataToExpectedData(String table, String fileName) throws IOException,
    InterruptedException, URISyntaxException {
    // Initialize maps to store data from BigQuery and from the expected file, keyed by record id.
    Map<String, JsonObject> bigQueryMap = new HashMap<>();
    Map<String, JsonObject> fileMap = new HashMap<>();
    // Resolve the expected file from the test classpath.
    Path importExpectedFile = Paths.get(ValidationHelperSqlEngine.class.getResource("/" + fileName).toURI());

    getBigQueryTableData(table, bigQueryMap);
    getFileData(importExpectedFile.toString(), fileMap);

    // Compare the data from BigQuery with the data from the file.
    return bigQueryMap.equals(fileMap);
  }

  /**
   * Reads the expected data file (one JSON object per line) into the given map, keyed by the "id" field.
   */
  public static void getFileData(String fileName, Map<String, JsonObject> fileMap) {
    try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
      String line;
      while ((line = br.readLine()) != null) {
        JsonObject json = gson.fromJson(line, JsonObject.class);
        if (json.has("id")) { // Check if the JSON object has the "id" key
          JsonElement idElement = json.get("id");
          if (idElement.isJsonPrimitive()) {
            String idKey = idElement.getAsString();
            fileMap.put(idKey, json);
          } else {
            LOG.error("ID is not a JSON primitive in the expected file record");
          }
        }
      }
    } catch (IOException e) {
      LOG.error("Error reading the file: " + e.getMessage());
    }
  }

  private static void getBigQueryTableData(String targetTable, Map<String, JsonObject> bigQueryMap)
    throws IOException, InterruptedException {
    String dataset = PluginPropertyUtils.pluginProp("dataset");
    String projectId = PluginPropertyUtils.pluginProp("projectId");
    String selectQuery = "SELECT TO_JSON(t) FROM `" + projectId + "." + dataset + "." + targetTable + "` AS t";
    TableResult result = BigQueryClient.getQueryResult(selectQuery);

    for (FieldValueList row : result.iterateAll()) {
      JsonObject json = gson.fromJson(row.get(0).getStringValue(), JsonObject.class);
      if (json.has("id")) { // Check if the JSON object has the "id" key
        JsonElement idElement = json.get("id");
        if (idElement.isJsonPrimitive()) {
          String idKey = idElement.getAsString();
          bigQueryMap.put(idKey, json);
        } else {
          LOG.error("ID is not a JSON primitive in the BigQuery row");
        }
      } else {
        LOG.error("ID key not found in JSON object");
      }
    }
  }
}
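To make the comparison semantics concrete: rows fetched via BigQuery's TO_JSON and lines read from the expected file are both parsed into Gson JsonObjects and collected into maps keyed by "id", so the final bigQueryMap.equals(fileMap) ignores field order but is strict about value types. A self-contained sketch with hypothetical data:

import com.google.gson.Gson;
import com.google.gson.JsonObject;

import java.util.HashMap;
import java.util.Map;

public class KeyedJsonCompareSketch {
  public static void main(String[] args) {
    Gson gson = new Gson();
    Map<String, JsonObject> actual = new HashMap<>();
    Map<String, JsonObject> expected = new HashMap<>();

    // A row as BigQuery's TO_JSON(t) might render it (hypothetical values).
    JsonObject bqRow = gson.fromJson("{\"id\":\"1\",\"name\":\"alice\",\"total\":42}", JsonObject.class);
    actual.put(bqRow.get("id").getAsString(), bqRow);

    // The matching expected-file line; different key order, equality still holds.
    JsonObject fileRow = gson.fromJson("{\"name\":\"alice\",\"id\":\"1\",\"total\":42}", JsonObject.class);
    expected.put(fileRow.get("id").getAsString(), fileRow);

    System.out.println(actual.equals(expected)); // prints: true
  }
}

Note that a numeric total of 42 and a string "42" would not compare equal under Gson's JsonPrimitive equality, so schema drift between the sink table and the expected file surfaces as a validation failure rather than a silent pass.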
