
Data Skipping Index Part 3-2: Rule #482

Merged · 10 commits · Aug 31, 2021
3 changes: 3 additions & 0 deletions build.sbt
@@ -127,6 +127,9 @@ ThisBuild / Test / fork := true

ThisBuild / Test / javaOptions += "-Xmx1024m"

+// Needed to test both non-codegen and codegen parts of expressions
+ThisBuild / Test / envVars += "SPARK_TESTING" -> "1"

ThisBuild / coverageExcludedPackages := "com\\.fasterxml.*;com\\.microsoft\\.hyperspace\\.shim"

/**
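Note on the build.sbt change above (context, not part of the diff): Spark honors the spark.sql.codegen.factoryMode setting only when Utils.isTesting is true, which in turn requires the SPARK_TESTING environment variable (or the spark.testing system property) to be set. With SPARK_TESTING=1, a suite can force both evaluation paths. A minimal sketch, with a made-up test entry point and DataFrame:

import org.apache.spark.sql.SparkSession

object CodegenModeCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").getOrCreate()
    import spark.implicits._
    val df = Seq(1, 2, 3).toDF("a")
    // With SPARK_TESTING set, each mode is honored instead of Spark silently
    // using the default FALLBACK behavior.
    for (mode <- Seq("CODEGEN_ONLY", "NO_CODEGEN")) {
      spark.conf.set("spark.sql.codegen.factoryMode", mode)
      assert(df.selectExpr("a + 1").as[Int].collect().sameElements(Array(2, 3, 4)))
    }
    spark.stop()
  }
}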
@@ -14,15 +14,11 @@
* limitations under the License.
*/

-package com.microsoft.hyperspace.index.dataskipping.util
+package com.microsoft.hyperspace.shim

-import org.apache.spark.sql.catalyst.expressions.{Expression, ExprId}
+import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions.aggregate.First

-trait ExprMatcher {
-
-  /**
-   * Returns true if the given expression matches the expression this matcher
-   * is associated with.
-   */
-  def apply(e: Expression): Boolean
+object FirstNullSafe {
+  def apply(child: Expression): First = First(child, Literal(false))
 }
@@ -0,0 +1,24 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.shim

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.aggregate.First

object FirstNullSafe {
def apply(child: Expression): First = First(child, false)
}
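The two hunks above appear to be the per-Spark-version shim sources for FirstNullSafe (the file paths are not captured in this view): the First aggregate's ignoreNulls argument is an Expression in older Spark versions (hence Literal(false)) and a plain Boolean in newer ones. Call sites can then build the aggregate without version checks. A minimal usage sketch, where firstNullSafeCol is a hypothetical helper:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col
import com.microsoft.hyperspace.shim.FirstNullSafe

// Wraps FirstNullSafe into a Column usable in df.agg(...); equivalent to
// first(name) with ignoreNulls = false on either Spark version.
def firstNullSafeCol(name: String): Column =
  new Column(FirstNullSafe(col(name).expr).toAggregateExpression())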
11 changes: 1 addition & 10 deletions src/main/scala/com/microsoft/hyperspace/Hyperspace.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.IndexConstants.{OPTIMIZE_MODE_QUICK, REFRESH_MODE_FULL}
import com.microsoft.hyperspace.index.plananalysis.{CandidateIndexAnalyzer, PlanAnalyzer}
-import com.microsoft.hyperspace.index.rules.ApplyHyperspace
+import com.microsoft.hyperspace.index.rules.ApplyHyperspace.withHyperspaceRuleDisabled
import com.microsoft.hyperspace.index.sources.FileBasedSourceProviderManager

class Hyperspace(spark: SparkSession) {
@@ -189,15 +189,6 @@ class Hyperspace(spark: SparkSession) {
      }
    }
  }
-
-  private def withHyperspaceRuleDisabled(f: => Unit): Unit = {
-    try {
-      ApplyHyperspace.disableForIndexMaintenance.set(true)
-      f
-    } finally {
-      ApplyHyperspace.disableForIndexMaintenance.set(false)
-    }
-  }
}

object Hyperspace extends ActiveSparkSession {
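The helper is removed here because it now lives on the ApplyHyperspace object and is brought in via the changed import above. The relocated definition is not shown in this diff; presumably it mirrors the removed body, along the lines of:

// Sketch of the relocated helper (names taken from the removed code above;
// the actual definition inside ApplyHyperspace is not shown in this diff).
def withHyperspaceRuleDisabled(f: => Unit): Unit = {
  try {
    ApplyHyperspace.disableForIndexMaintenance.set(true)
    f
  } finally {
    ApplyHyperspace.disableForIndexMaintenance.set(false)
  }
}

Moving it onto ApplyHyperspace lets other index-maintenance code paths reuse the guard instead of each class keeping a private copy.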
18 changes: 18 additions & 0 deletions src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala
@@ -128,4 +128,22 @@ object IndexConstants {
  val DATASKIPPING_TARGET_INDEX_DATA_FILE_SIZE =
    "spark.hyperspace.index.dataskipping.targetIndexDataFileSize"
  val DATASKIPPING_TARGET_INDEX_DATA_FILE_SIZE_DEFAULT = "268435456" // 256 MiB
+
+  /**
+   * Maximum number of index data files.
+   *
+   * The number of index data files determined by targetIndexDataFileSize is
+   * capped by this value.
+   */
+  val DATASKIPPING_MAX_INDEX_DATA_FILE_COUNT =
+    "spark.hyperspace.index.dataskipping.maxIndexDataFileCount"
+  val DATASKIPPING_MAX_INDEX_DATA_FILE_COUNT_DEFAULT = "10000"
+
+  /**
+   * If set to true, partition sketches for partition columns are included when
+   * creating data skipping indexes. This does not affect existing indexes.
+   */
+  val DATASKIPPING_AUTO_PARTITION_SKETCH =
+    "spark.hyperspace.index.dataskipping.autoPartitionSketch"
+  val DATASKIPPING_AUTO_PARTITION_SKETCH_DEFAULT = "true"
}
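Both new options are presumably read like the other Hyperspace settings, i.e. from the Spark session conf. A usage sketch (the values are arbitrary examples):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").getOrCreate()
// Cap the number of index data files regardless of the target file size.
spark.conf.set("spark.hyperspace.index.dataskipping.maxIndexDataFileCount", "2000")
// Opt out of automatic partition sketches for newly created indexes.
spark.conf.set("spark.hyperspace.index.dataskipping.autoPartitionSketch", "false")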
@@ -16,7 +16,8 @@

package com.microsoft.hyperspace.index

-import org.apache.spark.sql.execution.datasources.InMemoryFileIndex
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.datasources.{FileIndex, InMemoryFileIndex}

import com.microsoft.hyperspace.index.plananalysis.FilterReason

@@ -68,4 +69,17 @@ object IndexLogEntryTags {
  // If it's enabled, FILTER_REASONS and APPLIED_INDEX_RULES info will be tagged.
  val INDEX_PLAN_ANALYSIS_ENABLED: IndexLogEntryTag[Boolean] =
    IndexLogEntryTag[Boolean]("indexPlanAnalysisEnabled")
+
+  // DATASKIPPING_INDEX_PREDICATE stores the index predicate translated
+  // from the plan's filter or join condition.
+  val DATASKIPPING_INDEX_PREDICATE: IndexLogEntryTag[Option[Expression]] =
+    IndexLogEntryTag[Option[Expression]]("dataskippingIndexPredicate")
+
+  // DATASKIPPING_INDEX_FILEINDEX stores the InMemoryFileIndex for the index data.
+  val DATASKIPPING_INDEX_FILEINDEX: IndexLogEntryTag[InMemoryFileIndex] =
+    IndexLogEntryTag[InMemoryFileIndex]("dataskippingIndexRelation")
+
+  // DATASKIPPING_SOURCE_FILEINDEX stores the FileIndex for the source data.
+  val DATASKIPPING_SOURCE_FILEINDEX: IndexLogEntryTag[FileIndex] =
+    IndexLogEntryTag[FileIndex]("dataskippingSourceRelation")
}
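These tags cache per-plan state on the index log entry so the rule does not retranslate predicates or re-list index files on every invocation. A sketch of the intended pattern, using the existing getTagValue/setTagValue tag API (the helper below is hypothetical):

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags}

// Returns the cached index predicate for this plan, computing and caching it
// on the first call. getTagValue yields Option[Option[Expression]] here: the
// outer Option is cache presence, the inner one is the predicate itself.
def cachedPredicate(
    entry: IndexLogEntry,
    plan: LogicalPlan,
    translate: => Option[Expression]): Option[Expression] = {
  entry.getTagValue(plan, IndexLogEntryTags.DATASKIPPING_INDEX_PREDICATE) match {
    case Some(cached) => cached
    case None =>
      val p = translate
      entry.setTagValue(plan, IndexLogEntryTags.DATASKIPPING_INDEX_PREDICATE, p)
      p
  }
}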
@@ -18,6 +18,7 @@ package com.microsoft.hyperspace.index

import java.net.URLDecoder

+import org.apache.hadoop.fs.Path
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf

@@ -64,4 +65,12 @@ object IndexUtils {
   */
  lazy val decodeInputFileName = udf(
    (p: String) => URLDecoder.decode(p.replace("+", "%2B"), "UTF-8"))
+
+  /**
+   * Returns the path part of the URI-like string.
+   *
+   * This can be used to compare the results of input_file_name() and the paths
+   * stored in FileIdTracker.
+   */
+  lazy val getPath = udf((p: String) => new Path(p).toUri.getPath)
}
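The existing decodeInputFileName udf normalizes percent-encoding; getPath complements it by keeping only the path component, so values from input_file_name() become comparable to the paths kept by FileIdTracker. A usage sketch (the dataset path is a placeholder, and it assumes IndexUtils is visible from the calling package):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.input_file_name
import com.microsoft.hyperspace.index.IndexUtils

val spark = SparkSession.builder().master("local[2]").getOrCreate()
val withPaths = spark.read.parquet("/data/events")
  .withColumn("file_path", IndexUtils.getPath(input_file_name()))
// "file_path" now holds a scheme-less path such as
// "/data/events/part-00000.parquet", comparable to FileIdTracker entries.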