Skip to content

Commit

Permalink
HIVE-11139: Emit more lineage information (Jimmy, reviewed by Szehon)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jimmy Xiang committed Jul 2, 2015
1 parent 470d9c8 commit cdd1c7b
Show file tree
Hide file tree
Showing 122 changed files with 7,398 additions and 498 deletions.
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@
<felix.version>2.4.0</felix.version>
<curator.version>2.6.0</curator.version>
<jsr305.version>3.0.0</jsr305.version>
<gson.version>2.2.4</gson.version>
</properties>

<repositories>
Expand Down
5 changes: 5 additions & 0 deletions ql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,11 @@
<artifactId>JavaEWAH</artifactId>
<version>${javaewah.version}</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>${gson.version}</version>
</dependency>
<dependency>
<groupId>org.iq80.snappy</groupId>
<artifactId>snappy</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion ql/src/java/org/apache/hadoop/hive/ql/Driver.java
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ public int compile(String command, boolean resetTaskIds) {
String operationName = ctx.getExplain() ?
HiveOperation.EXPLAIN.getOperationName() : SessionState.get().getCommandType();
plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId,
operationName);
operationName, getSchema(sem, conf));

conf.setVar(HiveConf.ConfVars.HIVEQUERYSTRING, queryStr);

Expand Down
9 changes: 8 additions & 1 deletion ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.ExplainTask;
import org.apache.hadoop.hive.ql.exec.FetchTask;
Expand Down Expand Up @@ -89,6 +90,7 @@ public class QueryPlan implements Serializable {
protected LineageInfo linfo;
private TableAccessInfo tableAccessInfo;
private ColumnAccessInfo columnAccessInfo;
private Schema resultSchema;

private HashMap<String, String> idToTableNameMap;

Expand All @@ -111,7 +113,7 @@ public QueryPlan() {
}

public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId,
String operationName) {
String operationName, Schema resultSchema) {
this.queryString = queryString;

rootTasks = new ArrayList<Task<? extends Serializable>>();
Expand All @@ -133,6 +135,7 @@ public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, S
queryProperties = sem.getQueryProperties();
queryStartTime = startTime;
this.operationName = operationName;
this.resultSchema = resultSchema;
}

public String getQueryStr() {
Expand Down Expand Up @@ -683,6 +686,10 @@ public void setOutputs(HashSet<WriteEntity> outputs) {
this.outputs = outputs;
}

/**
 * @return the schema of the query result, as recorded at plan-construction time
 */
public Schema getResultSchema() {
  return this.resultSchema;
}

public HashMap<String, String> getIdToTableNameMap() {
return idToTableNameMap;
}
Expand Down
12 changes: 12 additions & 0 deletions ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.TaskRunner;
import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx.Index;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.security.UserGroupInformation;
Expand All @@ -47,6 +48,7 @@ static public enum HookType {
private Set<ReadEntity> inputs;
private Set<WriteEntity> outputs;
private LineageInfo linfo;
private Index depMap;
private UserGroupInformation ugi;
private HookType hookType;
final private Map<String, ContentSummary> inputPathToContentSummary;
Expand All @@ -67,8 +69,10 @@ public HookContext(QueryPlan queryPlan, HiveConf conf,
outputs = queryPlan.getOutputs();
ugi = Utils.getUGI();
linfo= null;
depMap = null;
if(SessionState.get() != null){
linfo = SessionState.get().getLineageState().getLineageInfo();
depMap = SessionState.get().getLineageState().getIndex();
}
this.userName = userName;
this.ipAddress = ipAddress;
Expand Down Expand Up @@ -127,6 +131,14 @@ public void setLinfo(LineageInfo linfo) {
this.linfo = linfo;
}

/**
 * @return the lineage dependency index captured from the session's lineage
 *         state when this context was built; may be null if no session state
 *         was available
 */
public Index getIndex() {
  return this.depMap;
}

/**
 * Replaces the lineage dependency index held by this hook context.
 *
 * @param index the lineage index to associate with this context
 */
public void setIndex(Index index) {
  this.depMap = index;
}

public UserGroupInformation getUgi() {
return ugi;
}
Expand Down
96 changes: 96 additions & 0 deletions ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
import java.io.Serializable;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.collections.SetUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
Expand Down Expand Up @@ -260,6 +263,25 @@ public void setColumn(FieldSchema column) {
public String toString() {
return tabAlias + ":" + column;
}

/**
 * Hash consistent with {@link #equals(Object)}: derived from the same two
 * fields (column, tabAlias). Uses the standard 31-multiplier combination
 * instead of plain addition — addition is commutative, so two objects whose
 * field hashes are swapped would otherwise collide.
 */
@Override
public int hashCode() {
  int result = column != null ? column.hashCode() : 7;
  result = 31 * result + (tabAlias != null ? tabAlias.hashCode() : 11);
  return result;
}

/**
 * Two BaseColumnInfo instances are equal when their column fields are equal
 * and their table-alias fields are equal (each comparison is null-safe).
 * A null or non-BaseColumnInfo argument is never equal.
 */
@Override
public boolean equals(Object obj) {
  if (obj == this) {
    return true;
  }
  if (!(obj instanceof BaseColumnInfo)) {
    // also rejects null
    return false;
  }
  BaseColumnInfo other = (BaseColumnInfo) obj;
  boolean sameColumn =
      (column == null) ? (other.column == null) : column.equals(other.column);
  if (!sameColumn) {
    return false;
  }
  return (tabAlias == null) ? (other.tabAlias == null) : tabAlias.equals(other.tabAlias);
}
}

public static class TableAliasInfo implements Serializable {
Expand Down Expand Up @@ -311,6 +333,25 @@ public void setTable(Table table) {
public String toString() {
return table.getDbName() + "." + table.getTableName() + "(" + alias + ")";
}

/**
 * Hash consistent with {@link #equals(Object)}: derived from the same two
 * fields (alias, table). Uses the standard 31-multiplier combination instead
 * of plain addition — addition is commutative, so two objects whose field
 * hashes are swapped would otherwise collide.
 */
@Override
public int hashCode() {
  int result = alias != null ? alias.hashCode() : 7;
  result = 31 * result + (table != null ? table.hashCode() : 11);
  return result;
}

/**
 * Two TableAliasInfo instances are equal when their alias strings match
 * (null-safe, via commons-lang StringUtils) and their table fields are equal
 * (null-safe). A null or non-TableAliasInfo argument is never equal.
 */
@Override
public boolean equals(Object obj) {
  if (obj == this) {
    return true;
  }
  if (!(obj instanceof TableAliasInfo)) {
    // also rejects null
    return false;
  }
  TableAliasInfo other = (TableAliasInfo) obj;
  if (!StringUtils.equals(alias, other.alias)) {
    return false;
  }
  return (table == null) ? (other.table == null) : table.equals(other.table);
}
}

/**
Expand Down Expand Up @@ -386,6 +427,61 @@ public String toString() {
}
}

/**
 * This class tracks the predicate information for an operator: the
 * predicate's expression string and the set of base table columns the
 * expression depends on.
 */
public static class Predicate {

  /**
   * Expression string for the predicate.
   */
  private String expr;

  /**
   * The set of base columns that the predicate depends on.
   * Insertion-ordered (LinkedHashSet); never reassigned — callers populate
   * it through the live reference returned by {@link #getBaseCols()}.
   */
  private Set<BaseColumnInfo> baseCols = new LinkedHashSet<BaseColumnInfo>();

  /**
   * @return the expr
   */
  public String getExpr() {
    return expr;
  }

  /**
   * @param expr the expr to set
   */
  public void setExpr(String expr) {
    this.expr = expr;
  }

  /**
   * @return the baseCols (the live, mutable set — additions by the caller
   *         are reflected in this Predicate)
   */
  public Set<BaseColumnInfo> getBaseCols() {
    return baseCols;
  }

  @Override
  public int hashCode() {
    // 31-multiplier combination instead of plain addition: addition is
    // commutative, so swapped field hashes would otherwise collide.
    // Stays consistent with equals(), which compares the same two fields.
    int result = baseCols.hashCode();
    result = 31 * result + (expr != null ? expr.hashCode() : 11);
    return result;
  }

  @Override
  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (!(obj instanceof Predicate)) {
      return false;
    }
    Predicate cond = (Predicate) obj;
    // baseCols is field-initialized and never reassigned, so both operands
    // are non-null and Set.equals performs the same element-wise comparison
    // as SetUtils.isEqualSet, without the commons-collections call.
    return StringUtils.equals(cond.expr, expr)
        && baseCols.equals(cond.baseCols);
  }
}

/**
* The map contains an index from the (datacontainer, columnname) to the
* dependency vector for that tuple. This is used to generate the
Expand Down
Loading

0 comments on commit cdd1c7b

Please sign in to comment.