diff --git a/notebooks/scala/Hyperspace ZOrderCoveringIndex.ipynb b/notebooks/scala/Hyperspace ZOrderCoveringIndex.ipynb
new file mode 100644
index 000000000..8eb634f30
--- /dev/null
+++ b/notebooks/scala/Hyperspace ZOrderCoveringIndex.ipynb
@@ -0,0 +1,281 @@
+{
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# Hyperspace ZOrderCoveringIndex"
+      ],
+      "metadata": {
+        "nteract": {
+          "transient": {
+            "deleting": false
+          }
+        }
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "val sessionId = scala.util.Random.nextInt(1000000)\r\n",
+        "val dataPath = s\"/hyperspacetest/data-$sessionId\"\r\n",
+        "val indexPath = s\"/hyperspacetest/index-$sessionId\"\r\n",
+        "spark.conf.set(\"spark.hyperspace.system.path\", indexPath)\r\n",
+        "\r\n",
+        "val numFiles = 100"
+      ],
+      "outputs": [],
+      "execution_count": null,
+      "metadata": {
+        "jupyter": {
+          "source_hidden": false,
+          "outputs_hidden": false
+        },
+        "nteract": {
+          "transient": {
+            "deleting": false
+          }
+        },
+        "microsoft": {},
+        "collapsed": true
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "### Data preparation"
+      ],
+      "metadata": {
+        "nteract": {
+          "transient": {
+            "deleting": false
+          }
+        }
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "spark.range(50000000).map { _ =>\r\n",
+        "  (scala.util.Random.nextInt(10000000).toLong, scala.util.Random.nextInt(1000000000), scala.util.Random.nextInt(10000000))\r\n",
+        "}.toDF(\"colA\", \"colB\", \"colC\").repartition(numFiles).write.mode(\"overwrite\").format(\"parquet\").save(dataPath)\r\n",
+        "\r\n",
+        "// 50M rows of random integers stored in numFiles Parquet files"
+      ],
+      "outputs": [],
+      "execution_count": null,
+      "metadata": {
+        "jupyter": {
+          "source_hidden": false,
+          "outputs_hidden": false
+        },
+        "nteract": {
+          "transient": {
+            "deleting": false
+          }
+        },
+        "collapsed": true
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "### Create index"
+      ],
+      "metadata": {
+        "nteract": {
+          "transient": {
+            "deleting": false
+          }
+        }
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "import com.microsoft.hyperspace.index.zordercovering._\r\n",
+        "import com.microsoft.hyperspace._\r\n",
+        "import com.microsoft.hyperspace.util.FileUtils\r\n",
+        "import org.apache.hadoop.fs.Path\r\n",
+        "\r\n",
+        "val totalSizeInBytes = FileUtils.getDirectorySize(new Path(dataPath))\r\n",
+        "val sizePerPartition = totalSizeInBytes / numFiles\r\n",
+        "spark.conf.set(\"spark.hyperspace.index.zorder.targetSourceBytesPerPartition\", sizePerPartition) // Default: 1G\r\n",
+        "// Adjusted the bytes-per-partition target of the Z-order index for this demonstration\r\n",
+        "\r\n",
+        "val df = spark.read.parquet(dataPath)\r\n",
+        "val hs = new Hyperspace(spark)\r\n",
+        "hs.createIndex(df, ZOrderCoveringIndexConfig(\"zorderTestIndex\", Seq(\"colA\", \"colB\"), Seq(\"colC\")))"
+      ],
+      "outputs": [],
+      "execution_count": null,
+      "metadata": {}
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "def measureDuration(f: => Unit): Unit = {\r\n",
+        "  val start = System.nanoTime\r\n",
+        "  f\r\n",
+        "  val durationInMS = (System.nanoTime - start) / 1000 / 1000\r\n",
+        "  println(\"duration(ms): \" + durationInMS)\r\n",
+        "}"
+      ],
+      "outputs": [],
+      "execution_count": null,
+      "metadata": {
+        "jupyter": {
+          "source_hidden": false,
+          "outputs_hidden": false
+        },
+        "nteract": {
+          "transient": {
+            "deleting": false
+          }
+        }
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "### Check performance with and without ZOrderCoveringIndex\r\n",
+        "\r\n",
+        "NOTE: The performance gain varies with the query type, data size, and computing environment.\r\n",
+        "As the test data is not huge, use a small compute configuration to see the improvement from Z-ordering."
+      ],
+      "metadata": {
+        "nteract": {
+          "transient": {
+            "deleting": false
+          }
+        }
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "spark.disableHyperspace // Run the query without the index\r\n",
+        "val filterQuery = df.filter(\"colA > 758647 AND colA < 779999 AND colB > 10537919 AND colB < 10599715\")\r\n",
+        "println(filterQuery.queryExecution.sparkPlan)\r\n",
+        "measureDuration(filterQuery.count)\r\n",
+        "measureDuration(filterQuery.count)"
+      ],
+      "outputs": [],
+      "execution_count": null,
+      "metadata": {
+        "jupyter": {
+          "source_hidden": false,
+          "outputs_hidden": false
+        },
+        "nteract": {
+          "transient": {
+            "deleting": false
+          }
+        }
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "spark.enableHyperspace // Enable Hyperspace so the query can use the Z-order index\r\n",
+        "val filterQuery = df.filter(\"colA > 758647 AND colA < 779999 AND colB > 10537919 AND colB < 10599715\")\r\n",
+        "println(filterQuery.queryExecution.sparkPlan)\r\n",
+        "measureDuration(filterQuery.count)\r\n",
+        "measureDuration(filterQuery.count)"
+      ],
+      "outputs": [],
+      "execution_count": null,
+      "metadata": {
+        "jupyter": {
+          "source_hidden": false,
+          "outputs_hidden": false
+        },
+        "nteract": {
+          "transient": {
+            "deleting": false
+          }
+        }
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "### Utility function for min/max skipping analysis\r\n",
+        "\r\n",
+        "We provide a min/max-based analysis utility function for any DataFrame.\r\n",
+        "The analysis function works only on numeric columns.\r\n",
+        "It collects the min/max values for each data file and generates an analysis report.\r\n"
+      ],
+      "metadata": {
+        "nteract": {
+          "transient": {
+            "deleting": false
+          }
+        }
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "import com.microsoft.hyperspace.util.MinMaxAnalysisUtil\r\n",
+        "val df = spark.read.parquet(dataPath)\r\n",
+        "\r\n",
+        "// Since the source data is randomly generated, all files must be read to find a value.\r\n",
+        "displayHTML(MinMaxAnalysisUtil.analyze(df, Seq(\"colA\", \"colB\"), format = \"html\")) // Formats \"text\" and \"html\" are available.\r\n"
+      ],
+      "outputs": [],
+      "execution_count": null,
+      "metadata": {
+        "jupyter": {
+          "source_hidden": false,
+          "outputs_hidden": false
+        },
+        "nteract": {
+          "transient": {
+            "deleting": false
+          }
+        }
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "// As the index data is Z-ordered, unnecessary files can be skipped based on min/max statistics.\r\n",
+        "displayHTML(MinMaxAnalysisUtil.analyzeIndex(spark, \"zorderTestIndex\", Seq(\"colA\", \"colB\"), format = \"html\"))"
+      ],
+      "outputs": [],
+      "execution_count": null,
+      "metadata": {
+        "jupyter": {
+          "source_hidden": false,
+          "outputs_hidden": false
+        },
+        "nteract": {
+          "transient": {
+            "deleting": false
+          }
+        }
+      }
+    }
+  ],
+  "metadata": {
+    "kernelspec": {
+      "name": "synapse_pyspark",
+      "language": "Python",
+      "display_name": "Synapse PySpark"
+    },
+    "language_info": {
+      "name": "scala"
+    },
+    "kernel_info": {
+      "name": "synapse_pyspark"
+    },
+    "description": null,
+    "save_output": true,
+    "synapse_widget": {
+      "version": "0.1",
+      "state": {}
+    }
+  },
+  "nbformat": 4,
+  "nbformat_minor": 2
+}
\ No newline at end of file