Index nested fields #365
base: master
Changes from all commits
@@ -19,13 +19,15 @@ package com.microsoft.hyperspace.actions
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.LeafNode
-import org.apache.spark.sql.functions.input_file_name
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.functions.{col, input_file_name}
 import org.apache.spark.sql.types.StructType

 import com.microsoft.hyperspace.{Hyperspace, HyperspaceException}
 import com.microsoft.hyperspace.index._
 import com.microsoft.hyperspace.index.DataFrameWriterExtensions.Bucketizer
 import com.microsoft.hyperspace.index.sources.FileBasedRelation
-import com.microsoft.hyperspace.util.{HyperspaceConf, PathUtils, ResolverUtils}
+import com.microsoft.hyperspace.util.{HyperspaceConf, PathUtils, ResolverUtils, SchemaUtils}

 /**
  * CreateActionBase provides functionality to write dataframe as covering index.
@@ -73,7 +75,8 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
         LogicalPlanFingerprint.Properties(Seq(Signature(signatureProvider.name, s)))))

       val coveringIndexProperties =
-        (hasLineageProperty(spark) ++ hasParquetAsSourceFormatProperty(relation)).toMap
+        (hasLineageProperty(spark) ++ hasParquetAsSourceFormatProperty(relation) ++
+          usesNestedFieldsProperty(indexConfig)).toMap

       IndexLogEntry(
         indexConfig.indexName,
@@ -109,6 +112,14 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
     }
   }

+  private def usesNestedFieldsProperty(indexConfig: IndexConfig): Option[(String, String)] = {
+    if (SchemaUtils.hasNestedFields(indexConfig.indexedColumns ++ indexConfig.includedColumns)) {
+      Some(IndexConstants.USES_NESTED_FIELDS_PROPERTY -> "true")
+    } else {
+      None
+    }
+  }
+
   protected def write(spark: SparkSession, df: DataFrame, indexConfig: IndexConfig): Unit = {
     val numBuckets = numBucketsForIndex(spark)
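SchemaUtils is referenced here but is not part of this diff. As a rough mental model, and purely as an assumption about its behavior, hasNestedFields could be as simple as checking for dot notation in the configured column names:

```scala
// Sketch only: assumes nested fields are specified with dot notation, e.g. "nested.leaf".
// The actual SchemaUtils.hasNestedFields in this PR may use a different check.
def hasNestedFields(fieldNames: Seq[String]): Boolean =
  fieldNames.exists(_.contains("."))
```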
@@ -117,7 +128,7 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)

     // run job
     val repartitionedIndexDataFrame =
-      indexDataFrame.repartition(numBuckets, resolvedIndexedColumns.map(df(_)): _*)
+      indexDataFrame.repartition(numBuckets, resolvedIndexedColumns.map(c => col(s"$c")): _*)

     // Save the index with the number of buckets specified.
     repartitionedIndexDataFrame.write
@@ -144,9 +155,9 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
       df: DataFrame,
       indexConfig: IndexConfig): (Seq[String], Seq[String]) = {
     val spark = df.sparkSession
-    val dfColumnNames = df.schema.fieldNames
-    val indexedColumns = indexConfig.indexedColumns
-    val includedColumns = indexConfig.includedColumns
+    val dfColumnNames = SchemaUtils.flatten(df.schema)

Review comment: Could you add some comment here? for ...

+    val indexedColumns = SchemaUtils.unescapeFieldNames(indexConfig.indexedColumns)

Review comment: Could you add a comment explaining why "unescapeFieldNames" is required here? e.g. nested column names are stored escaped in the index log entry.

+    val includedColumns = SchemaUtils.unescapeFieldNames(indexConfig.includedColumns)
     val resolvedIndexedColumns = ResolverUtils.resolve(spark, indexedColumns, dfColumnNames)
     val resolvedIncludedColumns = ResolverUtils.resolve(spark, includedColumns, dfColumnNames)
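SchemaUtils.flatten, escapeFieldNames and unescapeFieldNames are defined elsewhere in the PR. A minimal sketch of what such helpers could look like, assuming dot-separated nested names and a hypothetical "__" escape token (neither detail is confirmed by this diff):

```scala
import org.apache.spark.sql.types.{StructField, StructType}

object SchemaUtilsSketch {
  // Flattens a possibly nested schema into dot-separated leaf names,
  // e.g. a schema {id, nested: {leaf}} -> Seq("id", "nested.leaf").
  def flatten(schema: StructType, prefix: Option[String] = None): Seq[String] =
    schema.fields.toSeq.flatMap {
      case StructField(name, st: StructType, _, _) =>
        flatten(st, Some(prefix.map(p => s"$p.$name").getOrElse(name)))
      case StructField(name, _, _, _) =>
        Seq(prefix.map(p => s"$p.$name").getOrElse(name))
    }

  // Hypothetical escape convention: dots cannot survive as flat index column names,
  // so "nested.leaf" would be stored as "nested__leaf" in the index log entry and index data.
  def escapeFieldNames(names: Seq[String]): Seq[String] = names.map(_.replace(".", "__"))

  // Inverse of escapeFieldNames: escaped names from the log entry are turned back into
  // dotted names so they can be resolved against the source dataframe's schema,
  // which is the point the review comment above asks to document.
  def unescapeFieldNames(names: Seq[String]): Seq[String] = names.map(_.replace("__", "."))
}
```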
@@ -177,8 +188,8 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
       // 2. If source data is partitioned, all partitioning key(s) are added to index schema
       //    as columns if they are not already part of the schema.
       val partitionColumns = relation.partitionSchema.map(_.name)
-      val missingPartitionColumns = partitionColumns.filter(
-        ResolverUtils.resolve(spark, _, columnsFromIndexConfig).isEmpty)
+      val missingPartitionColumns =
+        partitionColumns.filter(ResolverUtils.resolve(spark, _, columnsFromIndexConfig).isEmpty)
       val allIndexColumns = columnsFromIndexConfig ++ missingPartitionColumns

       // File id value in DATA_FILE_ID_COLUMN column (lineage column) is stored as a
@@ -202,10 +213,16 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
         .select(
           allIndexColumns.head,
           allIndexColumns.tail :+ IndexConstants.DATA_FILE_NAME_ID: _*)
+        .toDF(
+          SchemaUtils.escapeFieldNames(allIndexColumns) :+ IndexConstants.DATA_FILE_NAME_ID: _*)
     } else {
       df.select(columnsFromIndexConfig.head, columnsFromIndexConfig.tail: _*)
+        .toDF(SchemaUtils.escapeFieldNames(columnsFromIndexConfig): _*)
     }

-    (indexDF, resolvedIndexedColumns, resolvedIncludedColumns)
+    val escapedIndexedColumns = SchemaUtils.escapeFieldNames(resolvedIndexedColumns)
+    val escapedIncludedColumns = SchemaUtils.escapeFieldNames(resolvedIncludedColumns)
+
+    (indexDF, escapedIndexedColumns, escapedIncludedColumns)
   }
 }
@@ -109,4 +109,7 @@ object IndexConstants {
   // To provide multiple paths in the globbing pattern, separate them with commas, e.g.
   // "/temp/1/*, /temp/2/*"
   val GLOBBING_PATTERN_KEY = "spark.hyperspace.source.globbingPattern"
+
+  // Indicates whether the index has been built over a nested field.
+  private[hyperspace] val USES_NESTED_FIELDS_PROPERTY = "hasNestedFields"

Review comment: Could you move this up to around line 104?

 }
@@ -110,12 +110,23 @@ private[hyperspace] case class BucketUnionExec(children: Seq[SparkPlan], bucketS
   override def output: Seq[Attribute] = children.head.output

   override def outputPartitioning: Partitioning = {
-    assert(children.map(_.outputPartitioning).toSet.size == 1)
-    assert(children.head.outputPartitioning.isInstanceOf[HashPartitioning])
-    assert(
-      children.head.outputPartitioning
-        .asInstanceOf[HashPartitioning]
-        .numPartitions == bucketSpec.numBuckets)
-    children.head.outputPartitioning
+    val parts = children.map(_.outputPartitioning).distinct
+    assert(parts.forall(_.isInstanceOf[HashPartitioning]))
+    assert(parts.forall(_.numPartitions == bucketSpec.numBuckets))
+
+    val reduced = parts.reduceLeft { (a, b) =>

Review comment: Could you add some comments about ...

+      val h1 = a.asInstanceOf[HashPartitioning]
+      val h2 = b.asInstanceOf[HashPartitioning]
+      val h1Name = h1.references.head.name
+      val h2Name = h2.references.head.name
+      val same = h1Name.contains(h2Name) || h2Name.contains(h1Name)
+      assert(same)
+      if (h1Name.length > h2Name.length) {
+        h1
+      } else {
+        h2
+      }
+    }
+    reduced
   }
 }
@@ -16,19 +16,22 @@

 package com.microsoft.hyperspace.index.rules

+import scala.util.Try
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.analysis.CleanupAliases
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, GetStructField}
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.types.{DataType, StructType}

 import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
 import com.microsoft.hyperspace.actions.Constants
 import com.microsoft.hyperspace.index.IndexLogEntry
 import com.microsoft.hyperspace.index.rankers.FilterIndexRanker
 import com.microsoft.hyperspace.index.sources.FileBasedRelation
 import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEventLogging, HyperspaceIndexUsageEvent}
-import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils}
+import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils, SchemaUtils}

 /**
  * FilterIndex rule looks for opportunities in a logical plan to replace
@@ -113,8 +116,8 @@ object FilterIndexRule

     val candidateIndexes = allIndexes.filter { index =>
       indexCoversPlan(
-        outputColumns,
-        filterColumns,
+        SchemaUtils.escapeFieldNames(outputColumns),
+        SchemaUtils.escapeFieldNames(filterColumns),
         index.indexedColumns,
         index.includedColumns)
     }
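Rough intuition for why the plan-side names are escaped before the coverage check, again under the hypothetical "__" convention (the real escape format lives in SchemaUtils, outside this diff):

```scala
// The plan refers to nested fields with dots; the index log entry stores escaped names.
val filterColumns = Seq("nested.leaf", "id")      // from the query plan
val indexedColumns = Seq("nested__leaf", "id")    // as recorded in the index entry
// Escaping the plan-side names first makes the coverage (subset) check line up:
// Seq("nested__leaf", "id") matches indexedColumns, while the dotted "nested.leaf" never would.
```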
@@ -168,9 +171,19 @@ object ExtractFilterNode {
     val projectColumnNames = CleanupAliases(project)
       .asInstanceOf[Project]
       .projectList
-      .map(_.references.map(_.asInstanceOf[AttributeReference].name))
+      .map(PlanUtils.extractNamesFromExpression)
       .flatMap(_.toSeq)
-    val filterColumnNames = condition.references.map(_.name).toSeq
+    val filterColumnNames = PlanUtils

Review comment: Could you add some comments here? e.g. if both a nested column and its parent(?) column appear in the filter condition, only the nested column name is output. (The reason would be good to add, too.)

+      .extractNamesFromExpression(condition)
+      .toSeq
+      .sortBy(-_.length)
+      .foldLeft(Seq.empty[String]) { (acc, e) =>
+        if (!acc.exists(i => i.startsWith(e))) {
+          acc :+ e
+        } else {
+          acc
+        }
+      }

     Some(project, filter, projectColumnNames, filterColumnNames)
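A concrete illustration of what the sortBy + foldLeft does (hypothetical column names; the dedup logic is copied verbatim from the diff above):

```scala
// Hypothetical names extracted from a filter condition.
val names = Seq("a", "a.b.c", "d", "a.b")

val deduped = names
  .sortBy(-_.length)                       // longest (most deeply nested) names first
  .foldLeft(Seq.empty[String]) { (acc, e) =>
    if (!acc.exists(i => i.startsWith(e))) acc :+ e else acc
  }

// deduped == Seq("a.b.c", "d"): "a.b" and "a" are dropped because they are prefixes of the
// already-kept nested name "a.b.c", which is the behavior the review comment above asks to document.
```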