This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Add a notebook for ZOrderCoveringIndex #523

Open
wants to merge 1 commit into base: master
281 changes: 281 additions & 0 deletions notebooks/scala/Hyperspace ZOrderCoveringIndex.ipynb
@@ -0,0 +1,281 @@
{
"cells": [
{
"cell_type": "markdown",
"source": [
"# Hyperspace ZOrderCoveringIndex"
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"val sessionId = scala.util.Random.nextInt(1000000)\r\n",
"val dataPath = s\"/hyperspacetest/data-$sessionId\"\r\n",
"val indexPath = s\"/hyperspacetest/index-$sessionId\"\r\n",
"spark.conf.set(\"spark.hyperspace.system.path\", indexPath)\r\n",
"\r\n",
"val numFiles = 100"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"microsoft": {},
"collapsed": true
}
},
{
"cell_type": "markdown",
"source": [
"### Data preparation"
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"spark.range(50000000).map { _ =>\r\n",
" (scala.util.Random.nextInt(10000000).toLong, scala.util.Random.nextInt(1000000000), scala.util.Random.nextInt(10000000))\r\n",
"}.toDF(\"colA\", \"colB\", \"colC\").repartition(numFiles).write.mode(\"overwrite\").format(\"parquet\").save(dataPath)\r\n",
"\r\n",
"// 50M rows with random integers stored in numFiles parquet files"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": true
}
},
{
"cell_type": "markdown",
"source": [
"### Create index"
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"import com.microsoft.hyperspace.index.zordercovering._\r\n",
"import com.microsoft.hyperspace._\r\n",
"import com.microsoft.hyperspace.util.FileUtils\r\n",
"import org.apache.hadoop.fs.Path\r\n",
"\r\n",
"val totalSizeInBytes = FileUtils.getDirectorySize(new Path(dataPath))\r\n",
"val sizePerPartition = totalSizeInBytes / numFiles \r\n",
"spark.conf.set(\"spark.hyperspace.index.zorder.targetSourceBytesPerPartition\", sizePerPartition) // Default: 1G\r\n",
"// Changed per file size for z-order index for demonstration\r\n",
"\r\n",
"val df = spark.read.parquet(dataPath)\r\n",
"val hs = new Hyperspace(spark)\r\n",
"hs.createIndex(df, ZOrderCoveringIndexConfig(\"zorderTestIndex\", Seq(\"colA\", \"colB\"), Seq(\"colC\")))"
],
"outputs": [],
"execution_count": null,
"metadata": {}
},
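{
"cell_type": "code",
"source": [
"// Optionally, list the registered Hyperspace indexes to verify that \"zorderTestIndex\" was created.\r\n",
"hs.indexes.show(false)"
],
"outputs": [],
"execution_count": null,
"metadata": {}
},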
{
"cell_type": "code",
"source": [
"def measureDuration(f : => Unit) {\r\n",
" val start = System.nanoTime\r\n",
" f\r\n",
" val durationInMS = (System.nanoTime - start) / 1000 / 1000\r\n",
" println(\"duration(ms): \" + durationInMS)\r\n",
"}"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "markdown",
"source": [
"### Check performance with and without ZOrderCoveringIndex\r\n",
"\r\n",
"NOTE: performance gain will be different depending on query type, data size and computing environment. \r\n",
"As the test data is not huge, use small computing resource to see the improvement from Z-ordering."
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"spark.disableHyperspace\r\n",
"val filterQuery = df.filter(\"colA > 758647 AND colA < 779999 AND colB > 10537919 AND colB < 10599715\")\r\n",
"println(filterQuery.queryExecution.sparkPlan)\r\n",
"measureDuration(filterQuery.count)\r\n",
"measureDuration(filterQuery.count)"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"spark.enableHyperspace\r\n",
"val filterQuery = df.filter(\"colA > 758647 AND colA < 779999 AND colB > 10537919 AND colB < 10599715\")\r\n",
"println(filterQuery.queryExecution.sparkPlan)\r\n",
"measureDuration(filterQuery.count)\r\n",
"measureDuration(filterQuery.count)"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
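{
"cell_type": "code",
"source": [
"// Optionally, let Hyperspace explain whether \"zorderTestIndex\" is applied to the filter query.\r\n",
"hs.explain(filterQuery, verbose = true)"
],
"outputs": [],
"execution_count": null,
"metadata": {}
},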
{
"cell_type": "markdown",
"source": [
"### Utility function for min/max skipping analysis\r\n",
"\r\n",
"We provide min/max based analysis utility function for any DataFrame.\r\n",
"The analysis function only works for numeric columns. \r\n",
"It'll collect min/max for each data file and generate analysis result.\r\n"
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"import com.microsoft.hyperspace.util.MinMaxAnalysisUtil\r\n",
"val df = spark.read.parquet(dataPath) \r\n",
"\r\n",
"// Since source data is randomly generated, we need to check all files to find a value.\r\n",
"displayHTML(MinMaxAnalysisUtil.analyze(df, Seq(\"colA\", \"colB\"), format = \"html\")) // format \"text\" and \"html\" are available.\r\n"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"// As the index data is Z-ordered, we can skip reading unnecessary files based on min/max statistics.\r\n",
"displayHTML(MinMaxAnalysisUtil.analyzeIndex(spark, \"zorderTestIndex\", Seq(\"colA\", \"colB\"), format = \"html\")) "
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
}
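,
{
"cell_type": "code",
"source": [
"// Optionally, clean up the demo index when done: deleteIndex soft-deletes it and vacuumIndex removes its files.\r\n",
"hs.deleteIndex(\"zorderTestIndex\")\r\n",
"hs.vacuumIndex(\"zorderTestIndex\")"
],
"outputs": [],
"execution_count": null,
"metadata": {}
}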
],
"metadata": {
"kernelspec": {
"name": "synapse_pyspark",
"language": "Python",
"display_name": "Synapse PySpark"
},
"language_info": {
"name": "scala"
},
"kernel_info": {
"name": "synapse_pyspark"
},
"description": null,
"save_output": true,
"synapse_widget": {
"version": "0.1",
"state": {}
}
},
"nbformat": 4,
"nbformat_minor": 2
}