Skip to content

Commit 73983ad

Browse files
committed
Enhance NuFacet with the spark.job.resolvedInputsMap Spark session config property
1 parent 78946e4 commit 73983ad

File tree

3 files changed

+64
-21
lines changed

3 files changed

+64
-21
lines changed

integration/spark/.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,5 @@ bin/
44
*/spark-warehouse
55

66
.sdkmanrc
7+
8+
integration/spark/docker/notebooks/

integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/NuFacet.java

+29-21
Original file line numberDiff line numberDiff line change
@@ -6,48 +6,56 @@
66
package io.openlineage.spark.agent.facets;
77

88
import com.fasterxml.jackson.annotation.JsonProperty;
9+
import com.fasterxml.jackson.core.JsonProcessingException;
910
import io.openlineage.client.OpenLineage;
1011
import io.openlineage.spark.agent.Versions;
1112

12-
import java.util.NoSuchElementException;
13-
import java.util.Properties;
13+
import java.util.*;
14+
1415
import lombok.Getter;
1516
import lombok.NonNull;
1617
import io.openlineage.spark.api.OpenLineageContext;
1718
import lombok.extern.slf4j.Slf4j;
1819
import org.apache.spark.sql.SparkSession;
1920

21+
import static io.openlineage.spark.agent.util.NuFacetsUtils.getConfigValue;
22+
import static io.openlineage.spark.agent.util.NuFacetsUtils.parseJsonToMap;
23+
2024
/** Captures information related to the Apache Spark job. */
2125
@Getter
2226
@Slf4j
2327
public class NuFacet extends OpenLineage.DefaultRunFacet {
24-
// @JsonProperty("jobId")
25-
// @NonNull
26-
// private Integer jobId;
27-
28-
// @JsonProperty("jobDescription")
29-
// private String jobDescription;
3028

3129
@JsonProperty("jobNurn")
3230
private String jobNurn;
3331

34-
private String fetchJobNurn(OpenLineageContext olContext) {
35-
if (olContext.getSparkSession().isPresent()) {
36-
SparkSession sparkSession = olContext.getSparkSession().get();
37-
try {
38-
return sparkSession.conf().get("spark.job.name");
39-
} catch (NoSuchElementException e) {
40-
log.warn("spark.job.name property not found in the context");
41-
return null;
42-
}
43-
}
32+
/**
33+
* Resolved inputs for the job.
34+
* Map of input dataset NURNs by their location path
35+
*/
36+
@JsonProperty("resolvedInputs")
37+
private Map<String, String> resolvedInputs;
4438

45-
log.warn("spark.job.name property not found because the SparkContext could not be retrieved from OpenLineageContext");
46-
return null;
39+
private String getJobNurn(SparkSession sparkSession) {
40+
return getConfigValue("spark.job.name", sparkSession);
41+
}
42+
43+
private Map<String, String> getResolvedInputs(SparkSession sparkSession) {
44+
String resolvedInputsJson = getConfigValue("spark.job.resolvedInputsMap", sparkSession);
45+
try {
46+
return parseJsonToMap(resolvedInputsJson);
47+
} catch (JsonProcessingException e) {
48+
log.warn("Error parsing resolvedInputsJson JSON", e);
49+
return null;
50+
}
4751
}
4852

4953
public NuFacet(@NonNull OpenLineageContext olContext) {
5054
super(Versions.OPEN_LINEAGE_PRODUCER_URI);
51-
this.jobNurn = fetchJobNurn(olContext);
55+
if (olContext.getSparkSession().isPresent()) {
56+
SparkSession sparkSession = olContext.getSparkSession().get();
57+
this.jobNurn = getJobNurn(sparkSession);
58+
this.resolvedInputs = getResolvedInputs(sparkSession);
59+
}
5260
}
5361
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package io.openlineage.spark.agent.util;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.core.type.TypeReference;
5+
import com.fasterxml.jackson.databind.ObjectMapper;
6+
import lombok.extern.slf4j.Slf4j;
7+
import org.apache.spark.sql.SparkSession;
8+
9+
import java.util.HashMap;
10+
import java.util.Map;
11+
import java.util.NoSuchElementException;
12+
import java.util.Objects;
13+
14+
@Slf4j
15+
public class NuFacetsUtils {
16+
public static Map<String, String> parseJsonToMap(String jsonString) throws JsonProcessingException {
17+
if (Objects.isNull(jsonString)) {
18+
return null;
19+
}
20+
ObjectMapper mapper = new ObjectMapper();
21+
TypeReference<HashMap<String, String>> typeRef = new TypeReference<HashMap<String, String>>() {};
22+
return mapper.readValue(jsonString, typeRef);
23+
}
24+
25+
public static String getConfigValue(String key, SparkSession sparkSession) {
26+
try {
27+
return sparkSession.conf().get(key);
28+
} catch (NoSuchElementException e) {
29+
log.warn("Property {} not found in the context", key);
30+
return null;
31+
}
32+
}
33+
}

0 commit comments

Comments
 (0)