This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Refactoring for an extensible Index API #443

Merged: 23 commits into microsoft:master from the ds branch on Jun 9, 2021

Conversation

@clee704 commented May 17, 2021

What is the context for this pull request?

TODO

What changes were proposed in this pull request?

  • Introduce common interfaces for indexes, with which Hyperspace can
    manage various types of indexes (see the illustrative sketch after this
    list).
  • Adjust IndexStatistics so that implementation-specific fields can be
    added. For instance, included columns are now one such field.
  • Actions now work with generic indexes, not just covering indexes, which
    are the only type supported at the moment.
  • Existing rules still work only with covering indexes. New rules will be
    added along with new index types.
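
To make the first bullet concrete, here is an illustrative sketch of the kind of common interface such a refactoring could introduce; the trait name and members below are assumptions made for the example, not necessarily the exact API in this PR.

// Hypothetical sketch only: a common interface that every index type implements,
// so that actions and IndexStatistics can work with indexes generically.
trait Index {
  // A short identifier for the index type, e.g. "CoveringIndex".
  def kind: String
  // Columns the index is built on.
  def indexedColumns: Seq[String]
  // Implementation-specific fields surfaced through IndexStatistics
  // (for a covering index this could include the included columns).
  def statistics: Map[String, String]
}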

Does this PR introduce any user-facing change?

  • The serialization format of CoveringIndex, and thus of IndexLogEntry, has changed.
  • The format of IndexStatistics has changed, which means the format of the
    DataFrame returned by Hyperspace.indexes has also changed.
  • The user-facing interface for creating and managing indexes is unchanged.

How was this patch tested?

With existing unit tests

@clee704 changed the title from "[WIP] Data skipping index" to "[WIP] Data skipping indexes" on May 17, 2021
@clee704 force-pushed the ds branch 2 times, most recently from c90d93d to bfeddf1, on May 28, 2021 12:55
@clee704 changed the title from "[WIP] Data skipping indexes" to "Refactoring for an extensible Index API" on May 28, 2021
@clee704 requested a review from sezruby on May 28, 2021 13:41
@clee704 marked this pull request as ready for review on May 28, 2021 13:52
@sezruby (Collaborator) left a comment

No major comments other than naming :)

Could you rebase the change & update the PR description to capture the public API change? e.g.

before:
hs.createIndex(df, IndexConfig("indexName", Seq("indexedCol"), Seq("includedCol")))
after:
hs.createIndex(df, CoveringIndexConfig( ...
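
For context, a slightly fuller, hypothetical version of the same call; the CoveringIndexConfig constructor is assumed here to mirror IndexConfig's (indexName, indexedColumns, includedColumns) shape and may differ from the final API:

// Before: IndexConfig implicitly meant a covering index.
hs.createIndex(df, IndexConfig("myIndex", indexedColumns = Seq("colA"), includedColumns = Seq("colB")))

// After: the index type is explicit in the name of the config class.
hs.createIndex(df, CoveringIndexConfig("myIndex", indexedColumns = Seq("colA"), includedColumns = Seq("colB")))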

@@ -97,7 +97,8 @@ object RuleUtils {
     val deletedBytesRatio = 1 - commonBytes / entry.sourceFilesSizeInBytes.toFloat

     val deletedCnt = entry.sourceFileInfoSet.size - commonCnt
-    val isAppendAndDeleteCandidate = hybridScanDeleteEnabled && entry.hasLineageColumn &&
+    val isAppendAndDeleteCandidate = hybridScanDeleteEnabled &&
Collaborator

fyi change moved to CandidateIndexCollector

@andrei-ionescu (Contributor) left a comment

This is a very big PR with lots of changes and mixed concerns. I tried to cover it as best as I could.

Could you split it into multiple PRs?

I suggest at least creating a separate PR for the Python code. The changes related to the additional stats could also be a separate PR.

WDYT?

- Introduce common interfaces for indexes, with which Hyperspace can
  manage various types of indexes.
- Adjust IndexStatistics so that implementation-specific fields can be
  added. For instance, included columns are now one such field.
- Actions now work with generic indexes, not just covering indexes, which
  are the only type supported at the moment.
- Existing rules still work only with covering indexes. New rules will be
  added along with new index types.

Breaking changes:
- The serialization format of CoveringIndex has changed.
- IndexConfig is now a trait. To create a covering index, use
  CoveringIndexConfig.
- The format of IndexStatistics has changed. This means the format of the
  DataFrame returned by Hyperspace.indexes has also changed.
@clee704 (Author) commented Jun 3, 2021

> This is a very big PR with lots of changes and mixed concerns. I tried to cover it as best as I could.
>
> Could you split it into multiple PRs?
>
> I suggest at least creating a separate PR for the Python code. The changes related to the additional stats could also be a separate PR.
>
> WDYT?

I tried to split it, but the changes for the Python code and the additional stats are too small (< 100 lines), so splitting them out didn't make this PR much smaller. Also, to extract the Python changes, the renaming of IndexConfig to CoveringIndexConfig would have to be done at the same time, and that creates more changes than needed, because every occurrence of IndexConfig would have to be changed unless the IndexConfig trait exists, and that trait is only introduced in this PR.

@andrei-ionescu (Contributor) left a comment

@clee704

A PR with 100 lines is the perfect PR to review 😁.

Anyway, I understand that it is hard to split, and I'll try harder to understand the changes. In the meantime, could you at least mark the pieces of code that were just moved from one place to another with some comments? Thanks.

@clee704 (Author) commented Jun 4, 2021

> @clee704
>
> A PR with 100 lines is the perfect PR to review 😁.
>
> Anyway, I understand that it is hard to split, and I'll try harder to understand the changes. In the meantime, could you at least mark the pieces of code that were just moved from one place to another with some comments? Thanks.

Actually, it's ~20 lines and ~60 lines respectively, and the main point was that splitting didn't reduce the size of this PR much. It would be like turning 1000 lines of changes into 1100.

I'll add some comments to help reviewers.

Chungmin Lee added 2 commits June 4, 2021 16:29
And restore CoveringIndexConfig to IndexConfig, to keep the user interface compatibility
@clee704 (Author) commented Jun 4, 2021

I decided to keep IndexConfig and renamed the IndexConfig trait to IndexConfigTrait. The names might change in v1.0.
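
To illustrate the resulting shape, here is a rough sketch of how the kept class and the renamed trait might relate; the members shown are assumptions for the example, not the exact definitions in this PR:

// Hypothetical sketch: the trait is the extension point for new index types,
// while the existing IndexConfig keeps its name as the covering-index config.
trait IndexConfigTrait {
  def indexName: String
  def referencedColumns: Seq[String]
}

case class IndexConfig(
    indexName: String,
    indexedColumns: Seq[String],
    includedColumns: Seq[String] = Nil)
  extends IndexConfigTrait {
  override def referencedColumns: Seq[String] = indexedColumns ++ includedColumns
}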

Resolved (outdated) review comments on build.sbt, project/plugins.sbt, and python/run-tests.py.
Comment on lines -111 to -131
protected def write(spark: SparkSession, df: DataFrame, indexConfig: IndexConfig): Unit = {
  val numBuckets = numBucketsForIndex(spark)

  val (indexDataFrame, resolvedIndexedColumns, _) =
    prepareIndexDataFrame(spark, df, indexConfig)

  // Run job
  val repartitionedIndexDataFrame = {
    // We are repartitioning with normalized columns (e.g., flattened nested column).
    indexDataFrame.repartition(numBuckets, resolvedIndexedColumns.map(_.toNormalizedColumn): _*)
  }

  // Save the index with the number of buckets specified.
  repartitionedIndexDataFrame.write
    .saveWithBuckets(
      repartitionedIndexDataFrame,
      indexDataPath.toString,
      numBuckets,
      resolvedIndexedColumns.map(_.normalizedName),
      SaveMode.Overwrite)
}
Author

Moved to CoveringIndex.write

Comment on lines 208 to 281
def createIndexData(
    ctx: IndexerContext,
    sourceData: DataFrame,
    indexedColumns: Seq[String],
    includedColumns: Seq[String],
    hasLineageColumn: Boolean): (DataFrame, Seq[ResolvedColumn], Seq[ResolvedColumn]) = {
  val spark = ctx.spark
  val (resolvedIndexedColumns, resolvedIncludedColumns) =
    resolveConfig(sourceData, indexedColumns, includedColumns)
  val projectColumns = (resolvedIndexedColumns ++ resolvedIncludedColumns).map(_.toColumn)

  val indexData =
    if (hasLineageColumn) {
      val relation = IndexUtils.getRelation(spark, sourceData.queryExecution.optimizedPlan)

      // Lineage is captured using two sets of columns:
      // 1. DATA_FILE_ID_COLUMN column contains source data file id for each index record.
      // 2. If source data is partitioned, all partitioning key(s) are added to index schema
      //    as columns if they are not already part of the schema.
      val partitionColumnNames = relation.partitionSchema.map(_.name)
      val resolvedColumnNames = (resolvedIndexedColumns ++ resolvedIncludedColumns).map(_.name)
      val missingPartitionColumns =
        partitionColumnNames
          .filter(ResolverUtils.resolve(spark, _, resolvedColumnNames).isEmpty)
          .map(col)

      // File id value in DATA_FILE_ID_COLUMN column (lineage column) is stored as a
      // Long data type value. Each source data file has a unique file id, assigned by
      // Hyperspace. We populate lineage column by joining these file ids with index records.
      // The normalized path of source data file for each record is the join key.
      // We normalize paths by removing extra preceding `/` characters in them,
      // similar to the way they are stored in Content in an IndexLogEntry instance.
      // Path normalization example:
      //  - Original raw path (output of input_file_name() udf, before normalization):
      //    + file:///C:/hyperspace/src/test/part-00003.snappy.parquet
      //  - Normalized path (used in join):
      //    + file:/C:/hyperspace/src/test/part-00003.snappy.parquet
      import spark.implicits._
      val dataPathColumn = "_data_path"
      val lineagePairs = relation.lineagePairs(ctx.fileIdTracker)
      val lineageDF = lineagePairs.toDF(dataPathColumn, IndexConstants.DATA_FILE_NAME_ID)

      sourceData
        .withColumn(dataPathColumn, input_file_name())
        .join(lineageDF.hint("broadcast"), dataPathColumn)
        .select(projectColumns ++ missingPartitionColumns :+ col(
          IndexConstants.DATA_FILE_NAME_ID): _*)
    } else {
      sourceData.select(projectColumns: _*)
    }

  (indexData, resolvedIndexedColumns, resolvedIncludedColumns)
}

private def resolveConfig(
    df: DataFrame,
    indexedColumns: Seq[String],
    includedColumns: Seq[String]): (Seq[ResolvedColumn], Seq[ResolvedColumn]) = {
  val spark = df.sparkSession
  val plan = df.queryExecution.analyzed
  val resolvedIndexedColumns = ResolverUtils.resolve(spark, indexedColumns, plan)
  val resolvedIncludedColumns = ResolverUtils.resolve(spark, includedColumns, plan)

  (resolvedIndexedColumns, resolvedIncludedColumns) match {
    case (Some(indexed), Some(included)) => (indexed, included)
    case _ =>
      val unresolvedColumns = (indexedColumns ++ includedColumns)
        .map(c => (c, ResolverUtils.resolve(spark, Seq(c), plan).map(_.map(_.name))))
        .collect { case (c, r) if r.isEmpty => c }
      throw HyperspaceException(
        s"Columns '${unresolvedColumns.mkString(",")}' could not be resolved " +
          s"from available source columns:\n${df.schema.treeString}")
  }
}
Author

Moved from CreateActionBase
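
For orientation, a hypothetical call site for the moved method; the owning object and the argument values below are made up for illustration and may not match the PR exactly:

// Assuming createIndexData now lives on the CoveringIndex companion object.
val (indexData, indexedCols, includedCols) =
  CoveringIndex.createIndexData(
    ctx,
    sourceData = df,
    indexedColumns = Seq("colA"),
    includedColumns = Seq("colB"),
    hasLineageColumn = true)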

Comment on lines 550 to 549
def hasLineageColumn: Boolean = {
  derivedDataset.properties.properties.getOrElse(
    IndexConstants.LINEAGE_PROPERTY, IndexConstants.INDEX_LINEAGE_ENABLED_DEFAULT).toBoolean
}

def hasParquetAsSourceFormat: Boolean = {
  relations.head.fileFormat.equals("parquet") ||
    derivedDataset.properties.properties.getOrElse(
      IndexConstants.HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY, "false").toBoolean
}

Author

Moved to CoveringIndex

Comment on lines -518 to -512
def bucketSpec: BucketSpec =
  BucketSpec(
    numBuckets = numBuckets,
    bucketColumnNames = indexedColumns,
    sortColumnNames = indexedColumns)

Author

Moved to CoveringIndex

Comment on lines -447 to -438
def schema: StructType =
  DataType.fromJson(derivedDataset.properties.schemaString).asInstanceOf[StructType]

Author

Moved to CoveringIndex

        content.root.equals(that.content.root) &&
        source.equals(that.source) &&
        properties.equals(that.properties) &&
        state.equals(that.state)
    case _ => false
  }

  def numBuckets: Int = derivedDataset.properties.numBuckets
Author

Moved to CoveringIndex

import org.apache.spark.sql.DataFrame

import com.microsoft.hyperspace.util.HyperspaceConf

/**
* IndexConfig specifies the configuration of an index.
Collaborator

Could you revise the comment to indicate that IndexConfig is only for Covering index?

Collaborator

Can we move this IndexConfig (& trait) under com.microsoft.hyperspace.index.configs (or com.microsoft.hyperspace.index.types? Not sure which way is better.)
We can define the package object below for backward compatibility if we use a new package name:

package com.microsoft.hyperspace

package object index {
  val IndexConfig = configs.IndexConfig
  type IndexConfig = configs.IndexConfig
}

Author

> Could you revise the comment to indicate that IndexConfig is only for Covering index?

Done

Author

Regarding the namespace, please see my comment below.

@@ -131,6 +131,7 @@ object RuleUtils {
       index: IndexLogEntry,
       plan: LogicalPlan,
       useBucketSpec: Boolean): LogicalPlan = {
+    val ci = index.derivedDataset.asInstanceOf[CoveringIndex]
Collaborator

How can we extend these transformPlanToUse* functions for other index types?
Can we rename RuleUtils to CoveringIndexApplicator or CoveringIndexApplier, or move these functions to CoveringIndex? Or any other ideas?

@clee704 (Author) commented Jun 7, 2021

In this PR I'm trying to minimize the size of the changes. In another PR, I'll move the remaining covering-index-related code closer to CoveringIndex.

import com.microsoft.hyperspace.util.ResolverUtils
import com.microsoft.hyperspace.util.ResolverUtils.ResolvedColumn

case class CoveringIndex(
Collaborator

Can we create a new package? com.microsoft.hyperspace.index.types?

@clee704 (Author) commented Jun 7, 2021

I think horizontal packaging is better: putting CoveringIndex and IndexConfig (CoveringIndexConfig) in the same package because they are closely related.

In my working PR for data skipping indexes, I've created com.microsoft.hyperspace.index.dataskipping for classes such as DataSkippingIndex and DataSkippingIndexConfig.

This is similar to how source implementations are put in their own packages, e.g. com.microsoft.hyperspace.index.sources.delta has Delta Lake related implementations.
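
For illustration, the horizontal layout described here might look roughly like the following; only the dataskipping and sources.delta packages are mentioned above, so the covering-index package name is an assumption:

com.microsoft.hyperspace.index.covering      // CoveringIndex, CoveringIndexConfig (hypothetical package)
com.microsoft.hyperspace.index.dataskipping  // DataSkippingIndex, DataSkippingIndexConfig
com.microsoft.hyperspace.index.sources.delta // Delta Lake source support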

@clee704 (Author) commented Jun 7, 2021

Sorry, there were comments I had missed. I resolved some outdated comments regarding existing/moved code.

    override val indexedColumns: Seq[String],
    includedColumns: Seq[String],
    // See schemaJson for more information about the annotation.
    @JsonDeserialize(converter = classOf[StructTypeConverter.T]) schema: StructType,
Collaborator

Can we just use schemaString, as discussed in #456? Other than this, LGTM! + please rebase onto master.

Contributor

+1 if @clee704 is also fine with it :)

Author

Note that those macros are optional for this to work. Do you still prefer schemaString over just schema?

It seems schemaString has been used instead of just schema because of the Jackson serialization issue. Shouldn't we fix the issue in the right way, instead of working around it? In the entire code base, the string is constantly being converted to StructType with DataType.fromJson. With a few simple annotations, this is not necessary. Having a string in the field and converting it to a typed object every time it is accessed seems unusual and raises my eyebrows whenever I think about it. There would have to be reasons other than the Jackson issue to justify it.

I admit the practical gain might not be great, but it costs almost nothing. Also, however small, the gain is on the user's side, whereas the cost is on our side. Since the user base is orders of magnitude larger than us developing Hyperspace, shouldn't we prioritize them?

We already have many shim files, and they don't contribute much to the overall complexity of the project, as they are independent of each other and we can inspect them individually. It's like how linear complexity is cheap compared to the quadratic or exponential complexity found in other parts of the codebase.
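
As an aside, here is a minimal sketch of the annotation-based approach being argued for, assuming Jackson StdConverter-style converters; the converter and class names are made up for the example and are not necessarily the ones used in this PR:

import com.fasterxml.jackson.databind.annotation.{JsonDeserialize, JsonSerialize}
import com.fasterxml.jackson.databind.util.StdConverter
import org.apache.spark.sql.types.{DataType, StructType}

// Converters between StructType and its JSON representation.
class StructTypeToJson extends StdConverter[StructType, String] {
  override def convert(schema: StructType): String = schema.json
}

class JsonToStructType extends StdConverter[String, StructType] {
  override def convert(json: String): StructType =
    DataType.fromJson(json).asInstanceOf[StructType]
}

// The case class can then hold a StructType directly; the string form only
// exists in the serialized log entry, not in the in-memory object.
case class ExampleIndex(
    name: String,
    @JsonSerialize(converter = classOf[StructTypeToJson])
    @JsonDeserialize(converter = classOf[JsonToStructType])
    schema: StructType)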

Contributor

How about Relation.dataSchemaJson? Do we want to make it consistent? I would let @sezruby make the final decision.

Collaborator

+1 for Relation.dataSchemaJson? I'm okay w/ Shim + StructType if you prefer that.

Author

Let's handle that in another PR as it is not related.

@sezruby (Collaborator) left a comment

LGTM thanks for the great work, @clee704!

It would be good to capture the list of remaining refactoring tasks in the PR description, for reference.

@clee704 merged commit 9e1f702 into microsoft:master on Jun 9, 2021
@clee704 deleted the ds branch on Jun 9, 2021 08:38
@sezruby added this to the v0.5.0 milestone on Jun 10, 2021
@clee704 added the "enhancement" (New feature or request) label on Jun 15, 2021
Labels: breaking changes, enhancement (New feature or request)
4 participants