This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

[WIP] Bloom filter Quick Implementation #363

Open · wants to merge 21 commits into master
src/main/scala/com/microsoft/hyperspace/Hyperspace.scala (2 changes: 1 addition & 1 deletion)
@@ -39,7 +39,7 @@ class Hyperspace(spark: SparkSession) {
* @param df the DataFrame object to build index on.
* @param indexConfig the configuration of index to be created.
*/
def createIndex(df: DataFrame, indexConfig: IndexConfig): Unit = {
def createIndex(df: DataFrame, indexConfig: HyperSpaceIndexConfig): Unit = {
indexManager.create(df, indexConfig)
}
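For context, a minimal usage sketch of the widened createIndex signature. The BloomFilterIndexConfig constructor shape is an assumption inferred from the fields referenced later in this diff (indexedColumn, expectedNumItems, numBits), not the PR's actual definition:

```scala
import org.apache.spark.sql.SparkSession
import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index.{BloomFilterIndexConfig, IndexConfig}

val spark = SparkSession.builder().appName("hyperspace-bloom-sketch").getOrCreate()
val hs = new Hyperspace(spark)
val df = spark.read.parquet("/data/events") // hypothetical source path

// Existing covering index config still compiles against the widened signature.
hs.createIndex(df, IndexConfig("coveringIdx", Seq("userId"), Seq("eventType")))

// New Bloom filter index config, assumed as (indexName, indexedColumn, expectedNumItems, numBits).
// numBits = -1 defers sizing to Spark's default (see CreateActionBase below).
hs.createIndex(df, BloomFilterIndexConfig("bloomIdx", "userId", 1000000L, -1L))
```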

src/main/scala/com/microsoft/hyperspace/actions/CreateAction.scala (34 changes: 22 additions & 12 deletions)
@@ -16,9 +16,8 @@

package com.microsoft.hyperspace.actions

import scala.util.Try

import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.util.Try

import com.microsoft.hyperspace.{Hyperspace, HyperspaceException}
import com.microsoft.hyperspace.actions.Constants.States.{ACTIVE, CREATING, DOESNOTEXIST}
@@ -29,7 +28,7 @@ import com.microsoft.hyperspace.util.ResolverUtils
class CreateAction(
spark: SparkSession,
df: DataFrame,
indexConfig: IndexConfig,
indexConfig: HyperSpaceIndexConfig,
final override protected val logManager: IndexLogManager,
dataManager: IndexDataManager)
extends CreateActionBase(dataManager)
@@ -64,14 +63,19 @@ class CreateAction(
}
}

private def isValidIndexSchema(config: IndexConfig, dataFrame: DataFrame): Boolean = {
// Resolve index config columns from available column names present in the dataframe.
ResolverUtils
.resolve(
spark,
config.indexedColumns ++ config.includedColumns,
dataFrame.queryExecution.analyzed)
.isDefined
private def isValidIndexSchema(config: HyperSpaceIndexConfig, dataFrame: DataFrame): Boolean = {
// Resolve index config columns from available column names present in the schema.
config match {
case indexConfig: IndexConfig =>
ResolverUtils
.resolve(
spark,
indexConfig.indexedColumns ++ indexConfig.includedColumns,
dataFrame.queryExecution.analyzed)
.isDefined
case indexConfig: BloomFilterIndexConfig =>
ResolverUtils.resolve(spark, Seq(indexConfig.indexedColumn), dataFrame.queryExecution.analyzed).isDefined
}
}

// TODO: The following should be protected, but RefreshAction is calling CreateAction.op().
@@ -80,7 +84,13 @@

final override protected def event(appInfo: AppInfo, message: String): HyperspaceEvent = {
// LogEntry instantiation may fail if index config is invalid. Hence the 'Try'.

val index = Try(logEntry.asInstanceOf[IndexLogEntry]).toOption
CreateActionEvent(appInfo, indexConfig, index, df.queryExecution.logical.toString, message)
CreateActionEvent(
appInfo,
IndexConfigBundle(indexConfig),
index,
df.queryExecution.logical.toString,
message)
}
}
src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala (157 changes: 135 additions & 22 deletions)
@@ -16,10 +16,13 @@

package com.microsoft.hyperspace.actions

import org.apache.commons.io.output.ByteArrayOutputStream
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{Column, DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.functions.{approx_count_distinct, col, input_file_name, udf}
import org.apache.spark.sql.functions.{col, input_file_name}
import org.apache.spark.util.sketch.BloomFilter

import com.microsoft.hyperspace.{Hyperspace, HyperspaceException}
import com.microsoft.hyperspace.index._
@@ -56,7 +59,7 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
protected def getIndexLogEntry(
spark: SparkSession,
df: DataFrame,
indexConfig: IndexConfig,
indexConfig: HyperSpaceIndexConfig,
path: Path,
versionId: Int): IndexLogEntry = {
val hadoopConf = spark.sessionState.newHadoopConf()
@@ -90,13 +93,26 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)

IndexLogEntry(
indexConfig.indexName,
CoveringIndex(
CoveringIndex.Properties(
CoveringIndex.Properties
.Columns(resolvedIndexedColumns, resolvedIncludedColumns),
IndexLogEntry.schemaString(indexDataFrame.schema),
numBuckets,
coveringIndexProperties)),
indexConfig match {
case _: IndexConfig =>
HyperSpaceIndex.CoveringIndex(
HyperSpaceIndex.Properties.Covering(
HyperSpaceIndex.Properties.CommonProperties
.Columns(resolvedIndexedColumns, resolvedIncludedColumns),
IndexLogEntry.schemaString(indexDataFrame.schema),
numBuckets,
coveringIndexProperties))
case _: BloomFilterIndexConfig =>
val bloomIndexProperties =
(hasLineageProperty(spark) ++ hasParquetAsSourceFormatProperty(relation)).toMap
HyperSpaceIndex.BloomFilterIndex(
HyperSpaceIndex.Properties.BloomFilter(
HyperSpaceIndex.Properties.CommonProperties
.Columns(resolvedIndexedColumns, resolvedIndexedColumns),
IndexLogEntry.schemaString(indexDataFrame.schema),
bloomIndexProperties))
case _ => throw HyperspaceException("Invalid Index Config.")
},
Content.fromDirectory(absolutePath, fileIdTracker, hadoopConf),
Source(SparkPlan(sourcePlanProperties)),
Map())
@@ -105,6 +121,57 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
}
}
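
The metadata above is built through a new HyperSpaceIndex hierarchy that is defined elsewhere in this PR and not visible in this section. A rough sketch of the shape implied by the constructor calls, inferred purely for readability (names and field types are assumptions, not the PR's definitions):

```scala
// Inferred sketch only; the actual definitions live in the PR's index package.
object HyperSpaceIndexSketch {
  object Properties {
    object CommonProperties {
      // Resolved indexed and included column names.
      case class Columns(indexed: Seq[String], included: Seq[String])
    }
    case class Covering(
        columns: CommonProperties.Columns,
        schemaString: String,
        numBuckets: Int,
        properties: Map[String, String])
    case class BloomFilter(
        columns: CommonProperties.Columns,
        schemaString: String,
        properties: Map[String, String])
  }
  // Field name coveringProperties matches the copy(...) call seen in OptimizeAction below.
  case class CoveringIndex(coveringProperties: Properties.Covering)
  case class BloomFilterIndex(bloomProperties: Properties.BloomFilter)
}
```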

protected def write(
spark: SparkSession,
df: DataFrame,
indexConfig: HyperSpaceIndexConfig): Unit = {
indexConfig match {
case ind: IndexConfig => write(spark, df, ind)
case ind: BloomFilterIndexConfig => write(spark, df, ind)
case _ => throw HyperspaceException("No write op supported")
}
}

private def write(
spark: SparkSession,
df: DataFrame,
indexConfig: BloomFilterIndexConfig): Unit = {
val (_, resolvedIndexedColumn, _) =
prepareIndexDataFrame(spark, df, indexConfig)

require(
resolvedIndexedColumn.size == 1,
"A Bloom filter index must resolve to exactly one indexed column.")

val resolvedNumBits = indexConfig.numBits match {
case -1 => BloomFilter.create(indexConfig.expectedNumItems).bitSize()
case _ => indexConfig.numBits
}

// TODO: The relation is already created in begin(); avoid recreating it here.
// TODO: Consider using lineage to reduce the size of the index file.
val relations = getRelation(spark, df).createRelationMetadata(fileIdTracker)
val bloomData = relations.rootPaths.par.map(
path => {
val bfByteStream = new ByteArrayOutputStream()
val localBF = spark.read.schema(df.schema)
.format(relations.fileFormat)
.options(relations.options)
.load(path)
.select(resolvedIndexedColumn.head)
.stat
.bloomFilter(resolvedIndexedColumn.head, indexConfig.expectedNumItems, resolvedNumBits)
localBF.writeTo(bfByteStream)
bfByteStream.close()
(path, bfByteStream.toByteArray.map(_.toChar).mkString)
}
)
val bloomDF = spark.createDataFrame(
bloomData.seq
).toDF("FileName", "Data")
bloomDF.write.parquet(new Path(indexDataPath, "bf.parquet").toString)
}
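
Not part of this diff, but a sketch of how the per-file filters persisted above could be consumed, assuming the String-encoded bytes are decoded the same way they were written (byte to Char and back). The index path, column names, and probe value are hypothetical:

```scala
import java.io.ByteArrayInputStream

import org.apache.spark.sql.SparkSession
import org.apache.spark.util.sketch.BloomFilter

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val probeValue = "user-42" // e.g. the literal from an equality predicate

// One row per source file: FileName (String) and Data (serialized filter as a String).
val candidateFiles = spark.read
  .parquet("/indexes/bloomIdx/bf.parquet") // hypothetical index location
  .as[(String, String)]
  .collect()
  .filter { case (_, data) =>
    // Reverse the byte -> Char -> String encoding used when the filter was written.
    val bytes = data.toCharArray.map(_.toByte)
    val bf = BloomFilter.readFrom(new ByteArrayInputStream(bytes))
    // Keep only files whose filter reports the value might be present.
    bf.mightContain(probeValue)
  }
  .map(_._1)
```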

protected def write(spark: SparkSession, df: DataFrame, indexConfig: IndexConfig): Unit = {
val numBuckets = numBucketsForIndex(spark)

@@ -179,19 +246,29 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
}
}

private def prepareIndexDataFrame(
spark: SparkSession,
private def resolveBloomFilterIndexConfig(
df: DataFrame,
indexConfig: IndexConfig): (DataFrame, Seq[String], Seq[String]) = {
val (resolvedIndexedColumns, resolvedIncludedColumns) = resolveConfig(df, indexConfig)
val columnsFromIndexConfig =
resolvedIndexedColumns.map(_._1) ++ resolvedIncludedColumns.map(_._1)
indexConfig: BloomFilterIndexConfig): Seq[(String, Boolean)] = {
val spark = df.sparkSession
val plan = df.queryExecution.analyzed
val indexedColumn = Seq(indexConfig.indexedColumn)
val resolvedIndexedColumns = ResolverUtils.resolve(spark, indexedColumn, plan)

val prefixedIndexedColumns = SchemaUtils.prefixNestedFieldNames(resolvedIndexedColumns)
val prefixedIncludedColumns = SchemaUtils.prefixNestedFieldNames(resolvedIncludedColumns)
val prefixedColumnsFromIndexConfig = prefixedIndexedColumns ++ prefixedIncludedColumns
resolvedIndexedColumns match {
case Some(indexed) => indexed
case _ =>
throw HyperspaceException(
s"Columns '${indexedColumn}' could not be resolved " +
s"from available source columns '${df.schema.treeString}'")
}
}

val indexDF = if (hasLineage(spark)) {
private def resolveDataFrameForLineage(
spark: SparkSession,
df: DataFrame,
providedIndexColumns: Seq[String],
prefixedColumnsFromIndexConfig: Seq[String]): DataFrame = {
if (hasLineage(spark)) {
val relation = getRelation(spark, df)

// Lineage is captured using two sets of columns:
@@ -200,8 +277,8 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
// as columns if they are not already part of the schema.
val partitionColumns = relation.partitionSchema.map(_.name)
val missingPartitionColumns =
partitionColumns.filter(ResolverUtils.resolve(spark, _, columnsFromIndexConfig).isEmpty)
val allIndexColumns = columnsFromIndexConfig ++ missingPartitionColumns
partitionColumns.filter(ResolverUtils.resolve(spark, _, providedIndexColumns).isEmpty)
val allIndexColumns = providedIndexColumns ++ missingPartitionColumns

// File id value in DATA_FILE_ID_COLUMN column (lineage column) is stored as a
// Long data type value. Each source data file has a unique file id, assigned by
@@ -225,10 +302,46 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
.select(prepareColumns(allIndexColumns, prefixedAllIndexColumns) :+
col(IndexConstants.DATA_FILE_NAME_ID): _*)
} else {
df.select(prepareColumns(columnsFromIndexConfig, prefixedColumnsFromIndexConfig): _*)
df.select(prepareColumns(providedIndexColumns, prefixedColumnsFromIndexConfig): _*)
}
}

(indexDF, prefixedIndexedColumns, prefixedIncludedColumns)
private def prepareIndexDataFrame(
spark: SparkSession,
df: DataFrame,
indexConfig: HyperSpaceIndexConfig): (DataFrame, Seq[String], Seq[String]) = {
indexConfig match {
case coveringIndexConfig: IndexConfig =>
val (resolvedIndexedColumns, resolvedIncludedColumns) =
resolveConfig(df, coveringIndexConfig)
val columnsFromIndexConfig =
resolvedIndexedColumns.map(_._1) ++ resolvedIncludedColumns.map(_._1)

val prefixedIndexedColumns =
SchemaUtils.prefixNestedFieldNames(resolvedIndexedColumns)
val prefixedIncludedColumns =
SchemaUtils.prefixNestedFieldNames(resolvedIncludedColumns)
val prefixedColumnsFromIndexConfig =
prefixedIndexedColumns ++ prefixedIncludedColumns

val indexDF = resolveDataFrameForLineage(
spark, df, columnsFromIndexConfig, prefixedColumnsFromIndexConfig)

(indexDF, prefixedIndexedColumns, prefixedIncludedColumns)
case bloomIndexConfig: BloomFilterIndexConfig =>
val resolvedIndexColumn = resolveBloomFilterIndexConfig(df, bloomIndexConfig)

val columnsFromIndexConfig = resolvedIndexColumn.map(_._1)
val prefixedIndexedColumns = SchemaUtils.prefixNestedFieldNames(resolvedIndexColumn)

val indexDF = resolveDataFrameForLineage(
spark, df, columnsFromIndexConfig, prefixedIndexedColumns)

(indexDF, prefixedIndexedColumns, df.columns)
case _ =>
throw HyperspaceException(
s"Cannot prepare dataframe ${df.schema} for ${indexConfig.indexName}")
}
}

private def prepareColumns(
@@ -142,15 +142,21 @@ class OptimizeAction(
val hadoopConf = spark.sessionState.newHadoopConf()
val absolutePath = PathUtils.makeAbsolute(indexDataPath.toString, hadoopConf)
val newContent = Content.fromDirectory(absolutePath, fileIdTracker, hadoopConf)
val updatedDerivedDataset = previousIndexLogEntry.derivedDataset.copy(
properties = previousIndexLogEntry.derivedDataset.properties
.copy(
properties = Hyperspace
.getContext(spark)
.sourceProviderManager
.enrichIndexProperties(
previousIndexLogEntry.relations.head,
prevIndexProperties + (IndexConstants.INDEX_LOG_VERSION -> endId.toString))))
val updatedDerivedDataset = {
previousIndexLogEntry.derivedDataset match {
case coveringIndex: HyperSpaceIndex.CoveringIndex =>
coveringIndex.copy(
coveringProperties =
coveringIndex.properties.asInstanceOf[HyperSpaceIndex.Properties.Covering]
.copy(
properties = Hyperspace
.getContext(spark)
.sourceProviderManager
.enrichIndexProperties(
previousIndexLogEntry.relations.head,
prevIndexProperties + (IndexConstants.INDEX_LOG_VERSION -> endId.toString))))
}
}

if (filesToIgnore.nonEmpty) {
val filesToIgnoreDirectory = {
@@ -73,7 +73,7 @@ class CachingIndexCollectionManager(
indexCache.clear()
}

override def create(df: DataFrame, indexConfig: IndexConfig): Unit = {
override def create(df: DataFrame, indexConfig: HyperSpaceIndexConfig): Unit = {
clearCache()
super.create(df, indexConfig)
}