
Data Skipping Index Part 2: Basics #461

Merged
merged 21 commits into from
Aug 9, 2021

Conversation


@clee704 clee704 commented Jun 10, 2021

This PR depends on other PRs - if you want to review this PR only, please look at the last commit.

What is the context for this pull request?

What changes were proposed in this pull request?

Implement DataSkippingIndex and MinMaxSketch.

The index data looks like this:

| _data_file_id | min_A_ | max_A_ |
|---------------|--------|--------|
| 146 | 1834 | 1845 |
| 763 | 9585 | 9596 |
| 351 | 4409 | 4421 |
| 148 | 1859 | 1870 |
| 758 | 9522 | 9534 |
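
Conceptually, this index data is a per-file aggregation of the indexed column. A minimal sketch of how such data could be computed, assuming a hypothetical `sourceDf` and using `input_file_name()` as a stand-in for the actual file-id column (the PR's implementation may differ):

    import org.apache.spark.sql.functions.{input_file_name, max, min}

    // One row per source data file, with the min and max of column A.
    val indexData = sourceDf
      .groupBy(input_file_name().as("_data_file_name"))
      .agg(min("A").as("min_A_"), max("A").as("max_A_"))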

The following table shows index creation times for CoveringIndex and DataSkippingIndex with varying numbers of files for the same dataset (TPC-H 1TB). Index configs are chosen so that they will be useful for TPC-H Q6. Measurements were done on Azure Synapse with 199 executors.

Index configurations:

  • CoveringIndex (indexed: l_shipdate, l_discount, l_quantity, included: l_extendedprice)
  • DataSkippingIndex (MinMaxSketch/ValueListSketch on l_shipdate/l_discount/l_quantity)

| Index type | 1K files | 10K files | 100K files |
|------------|----------|-----------|------------|
| CoveringIndex | 3m 5s | 3m 12s | 4m 54s |
| DataSkippingIndex | 27s | 54s | 2m 24s |

Does this PR introduce any user-facing change?

Kind of. Users can create data skipping indexes with hs.createIndex(df, DataSkippingIndexConfig(...)). However, they will not be utilized yet because there are no query optimizer rules for them.
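
For example, a usage sketch of the API above; the package paths and the exact DataSkippingIndexConfig arguments are assumptions based on this PR's description, not a final reference:

    import com.microsoft.hyperspace.Hyperspace
    import com.microsoft.hyperspace.index.dataskipping.DataSkippingIndexConfig
    import com.microsoft.hyperspace.index.dataskipping.sketch.MinMaxSketch

    val hs = new Hyperspace(spark)
    val df = spark.read.parquet("/data/lineitem")
    // Creates a data skipping index with a min/max sketch on l_shipdate.
    hs.createIndex(df, DataSkippingIndexConfig("dsIndex", MinMaxSketch("l_shipdate")))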

How was this patch tested?

With unit tests

@clee704 clee704 self-assigned this Jun 10, 2021
@clee704 clee704 added the enhancement New feature or request label Jun 15, 2021
@clee704 clee704 added this to the v0.5.0 milestone Jun 15, 2021

clee704 commented Jun 15, 2021

TODO: check if existing rules work in the presence of unsupported indexes... (they should, as if there are no such indexes at all)

@clee704 clee704 changed the title [WIP] Data Skipping Index Part 1 Data Skipping Index Part 1 Jun 15, 2021

sezruby commented Jun 15, 2021

> TODO: check if existing rules work in the presence of unsupported indexes... (they should, as if there are no such indexes at all)

I think we should add IndexTypeFilter at the beginning of the index filter list of FilterIndexRule / JoinIndexRule. WDYT?
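
A very rough sketch of that idea, with hypothetical names and signature (Hyperspace's actual rule/filter interfaces may differ): a filter placed at the front of a rule's filter chain that drops index types the rule cannot apply, so unsupported indexes behave as if they didn't exist.

    // Hypothetical helper: keep only candidate indexes of types the rule supports.
    object IndexTypeFilter {
      def apply[T](candidateIndexes: Seq[T])(isSupported: T => Boolean): Seq[T] =
        candidateIndexes.filter(isSupported)
    }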


sezruby commented Jun 15, 2021

Could you add an Implementation section and list the PR-level TODOs in the proposal,
so that we can track the remaining work and upcoming changes for data skipping indexes?

@clee704 clee704 marked this pull request as ready for review June 20, 2021 13:50
@andrei-ionescu andrei-ionescu (Contributor) left a comment

Added some feedback.

The main inconvenience for me was that important pieces of core code do not have scaladoc, so I couldn't easily understand what a given piece of code or class does.

Please don't exclude the heavy use cases, i.e., datasets with millions of files. Although this first PR may not be the place to address them, I'm just letting you know that those heavy use cases are real and coming very soon.

@clee704 clee704 changed the title Data Skipping Index Part 1 [WIP] Data Skipping Index Part 1 Jun 28, 2021
@clee704 clee704 marked this pull request as draft June 28, 2021 14:41
@clee704 clee704 force-pushed the ds1 branch 4 times, most recently from b3c04b4 to 8346880 Compare July 5, 2021 08:52
@clee704 clee704 changed the title [WIP] Data Skipping Index Part 1 Data Skipping Index Part 1 Jul 5, 2021
@clee704 clee704 marked this pull request as ready for review July 5, 2021 12:24
@clee704 clee704 requested a review from sezruby July 5, 2021 12:24

sezruby commented Jul 6, 2021

No major comments, but I'd like to wait for the Part 2 PR in case you want to fix the index creation code after that.

@clee704 clee704 changed the title Data Skipping Index Part 1 Data Skipping Index Part 2: Basics Jul 28, 2021
@clee704 clee704 marked this pull request as draft July 28, 2021 11:49
Implemented DataSkippingIndex and MinMaxSketch.
@clee704 clee704 requested a review from sezruby August 2, 2021 16:05
Comment on lines 141 to 145
val minRowCountPerFileData = indexData
  .groupBy(spark_partition_id())
  .count()
  .agg(min("count"))
  .collect()
Collaborator

It seems spark_partition_id() is non-deterministic. Is it worth getting the min count of partitions here?
Could you explain why you don't use the length of indexData.inputFiles here?

@clee704 clee704 Aug 3, 2021

It's computing the minimum value of the row counts of index data files that would be written. Grouping by spark_partition_id() is the same as grouping by the file to be written. groupBy() followed by count() computes the number of rows per group. agg(min("count")) computes the min.

The length of indexData.inputFiles is equal to the number of source data files; it is not related to the number of index data files to be written here.

Collaborator

But can we just use indexData.inputFiles.length / HyperspaceConf.DataSkipping.minRecordsPerIndexDataFile?
Or we could use the spark.sql.files.maxRecordsPerFile config, though I haven't tested it yet.

@clee704 clee704 Aug 3, 2021

That's basically running lines 152-153 unconditionally, and it can actually increase the number of index data files, which is what I want to avoid.

I don't understand how maxRecordsPerFile can be used. Can you elaborate on that?

@sezruby sezruby Aug 3, 2021

> That's basically running lines 152-153 unconditionally, and it can actually increase the number of index data files, which is what I want to avoid.

I don't get this. Could you explain more? If it's a max config, not min, then I think numIndexFiles = inputFiles / maxRecords should work.

The test code below generated 4 parquet files (but I haven't checked the exact behavior; maybe repartition is better for distribution):

import spark.implicits._  // needed for toDF on Seq
spark.conf.set("spark.sql.files.maxRecordsPerFile", 1)
Seq(1, 2, 3, 4).toDF("a").write.parquet("recordtest")

Author

Here we want to avoid too many files. Suppose that minRecordsPerIndexDataFile = 100 and there are 1,000 files with 10 records each. Because 10 < 100, we repartition the data into (1,000 * 10) / 100 = 100 partitions, so that there will be 100 files with 100 records each. It is also possible that there are 10 files with 1,000 records each; in that case, we don't want to repartition.

I've changed repartition to coalesce and the meaning of the config to make it simpler.
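
A worked sketch of the arithmetic above, using illustrative variable names (not the exact PR code) and minRecordsPerIndexDataFile = 100:

    val minRecordsPerIndexDataFile = 100L
    val totalRecords = indexData.count()  // 1,000 files * 10 records = 10,000
    val maxNumFiles =
      math.max(1L, totalRecords / minRecordsPerIndexDataFile).toInt  // 10,000 / 100 = 100
    // coalesce never increases the partition count: 1,000 tiny partitions shrink to 100,
    // while 10 partitions of 1,000 records each would simply stay as 10 files.
    val compacted = indexData.coalesce(maxNumFiles)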

@clee704 clee704 requested a review from sezruby August 4, 2021 02:12
indexData.cache()
val minRowCountPerFileConfig =
  HyperspaceConf.DataSkipping.minRecordsPerIndexDataFile(ctx.spark)
val maxNumFiles = toIntExact(math.max(1, indexData.count() / minRowCountPerFileConfig))
Collaborator

Is there any difference between indexData.count and indexData.inputFiles?
If not, we don't need indexData.cache.

Author

Using indexData.inputFiles.length can silently break the code if we change the way we create index data in the future, whereas indexData.count() will always be correct.

Collaborator

But indexData.count() requires indexData.cache, which is not free, and it will take longer than inputFiles...?

@clee704 clee704 Aug 4, 2021

I think we don't have to worry too much about the performance here, as index creation/refresh are less frequent operations and index data is usually small. cache() and count() should be fast enough.

indexData.inputFiles.length being equal to indexData.count() is a mere coincidence that happens to be true for the current implementation; we have no such design constraint. We could add one, but again I can't see the need at the moment. We can consider it if such a need arises.

@sezruby sezruby Aug 4, 2021

We could put the constraint (indexData.count == indexData.inputFiles.length) in a unit test (plus a comment here).
DataFrame caching is not as cheap and simple as you think; it involves many components like the optimizer, scheduler, block management, etc. Of course, problems rarely happen with this kind of small dataset, but I'd like to avoid using it if possible.
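
A minimal sketch of the suggested unit test, assuming a ScalaTest-style suite and a hypothetical helper createIndexData that builds the index DataFrame:

    test("index data has one row per source data file") {
      val indexData = createIndexData(sourceDf)
      assert(indexData.count() === indexData.inputFiles.length.toLong)
    }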

@clee704 clee704 Aug 4, 2021

My data says cache() doesn't involve noticeable overhead for 1TB/10K-file source data. It'd be really helpful if you could quantify your concern about caching.

val minRowCountPerFileConfig =
  HyperspaceConf.DataSkipping.minRecordsPerIndexDataFile(ctx.spark)
val maxNumFiles = toIntExact(math.max(1, indexData.count() / minRowCountPerFileConfig))
val repartitionedIndexData = indexData.coalesce(maxNumFiles)
Collaborator

OK, one question: the index data - is it already partitioned? Why are you worried about small files?
If it's maxRecordsPerFile, we don't need to care about it.

I think coalesce is not a good idea; it just controls the number of files, not the size or number of records in each file.
And its performance is sometimes worse than repartition.

@clee704 clee704 Aug 4, 2021

Because data skipping index data is often very small, and Spark is bad at working with many small files. Without repartitioning, the tiny index data is stored in too many files, e.g., 200 files, which is bad for performance.

I think coalesce is fine - it doesn't increase the number of files and I've verified that it evenly distributes rows over files. Also, the index data is much smaller than the source data. Even if there is a performance issue, it would be minor.

@sezruby sezruby Aug 4, 2021

Because of the join, it's partitioned (default 200). coalesce merges partitions rather than redistributing rows, since that's what it's optimized for.
So, as you said performance doesn't matter for a small index dataset, I would prefer repartition(maxNumFiles).
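
For illustration, the behavioral difference being discussed, as a minimal sketch (not project code):

    // After the join, indexData typically has spark.sql.shuffle.partitions (200) partitions.
    val merged   = indexData.coalesce(10)     // merges existing partitions; never increases the count
    val shuffled = indexData.repartition(10)  // full shuffle to exactly 10 partitions, rows spread evenly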

Author

Okay, then we need to bring back the branch because it shouldn't increase the number of files.

@sezruby sezruby Aug 4, 2021

I think it's okay to repartition with numPartitions based on the maxRowCountPerFiles config, as users can adjust it and the default value (100,000) looks reasonable. indexData.count() (plus the groupBy/count for minRowCountPerFileData) incurs Spark jobs and a caching operation, even though it's fast enough.
If a user changes the value to a smaller one, which increases the file count, then we could just let them do that. With the default value, it won't happen, as I don't think Spark can handle more than 20M files.

So I would avoid unnecessary operations if possible. cc @imback82 WDYT?

This is my preference:

    val maxRowCountPerFileConfig =
      HyperspaceConf.DataSkipping.maxRecordsPerIndexDataFile(ctx.spark)
    val maxNumFiles = toIntExact(math.max(1, indexData.inputFiles.length / maxRowCountPerFileConfig))
    val repartitionedIndexData = indexData.repartition(maxNumFiles)

@sezruby sezruby Aug 4, 2021

@clee704 I agree with your point, but what I'm trying to say is that your approach and the suggested approach do the same thing, while yours has additional operations that I'd like to avoid.

@clee704 clee704 Aug 4, 2021

No, it's not the same, as the suggested approach fixes the number of files, which can be larger than it was before the repartition.

@clee704 clee704 Aug 4, 2021

Anyway, this row-count-based repartitioning was only an approximation of a better approach based on the data size. I'll push a new commit to do that.

@sezruby sezruby Aug 4, 2021

With the default config (100,000) and the shuffle partition config (200), it only happens when there are more than 200 * 100,000 files. Considering a smaller shuffle partition number, say 1, numFiles would be between 1 and 10 for 100K-1M files.

And if a user wants to reduce the index file size, maybe because of large bloom filters, they might try a smaller config value, but it won't work because of the condition.

Author

That's why I said it was a suboptimal approach. For the best index application performance, the index data should be evenly distributed over files in terms of data size, because index data filtering is mostly I/O bound.

@clee704 clee704 requested a review from sezruby August 4, 2021 03:27
Comment on lines 141 to 145
indexData.cache()
indexData.count() // force cache
// Note: the actual index data size can be smaller due to compression.
val indexDataSize = indexData.queryExecution.optimizedPlan.stats.sizeInBytes
indexData.unpersist()
Collaborator

Seems we don't need to cache here; optimizedPlan.stats.sizeInBytes doesn't read from the cached data, it just estimates from the plan.

Author

If you don't cache, the reported size is close to the size of the source data because it doesn't take the aggregations into account. The difference is huge.

Even if you cache, I found that optimizedPlan.stats.sizeInBytes is somewhat too rough for this purpose. Please refer to the new commit.
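
For reference, one generic way an estimated size could be mapped to a file count (not necessarily what the new commit does); targetIndexDataFileSize is a hypothetical value in bytes:

    val targetIndexDataFileSize = 64L * 1024 * 1024  // assumed 64 MB target
    val numFiles = math.max(1L, (indexDataSize / BigInt(targetIndexDataFileSize)).toLong).toInt
    val repartitionedIndexData = indexData.repartition(numFiles)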

Author

I understand your concern about cache(). I'll check the performance with some real data.

@sezruby sezruby Aug 5, 2021

@clee704 Can we use the max record limit or a spark.hyperspace.index.dataskipping.numPartitions config? To me, this looks like unnecessary complexity.
In most cases, DS index data will be stored in 1 file. Even if it's not evenly distributed, it's okay. We don't need to measure the exact size; repartition distributes rows round-robin and the compression rate will differ, as you already mentioned.

The reason I made the comment - #461 (comment) - is that I just wanted to make it configurable.

Can we create an issue (backlog item) for this and deliver a simple initial version first?

@clee704 clee704 Aug 6, 2021

What is so complex about the code here? It's just a few lines of code, and it's there for the best user experience. If we fix the number of files with such a config, users would have to set the config value themselves every time they create an index. Most of the time, they won't know the final index size and would have to create the index twice. If they set the value to 1 and don't change it, there is a possibility of OOM or slow writes because the write is done by a single task. Therefore, I cannot accept using such a config. It could be used as a supplementary config in addition to the current implementation, though.

I don't know where you are getting such a strong concern about caching, but here are actual numbers:

Data: catalog_sales (from TPC-DS 1TB)
Stored size (parquet files): 138.4 GB
Row count: 1,439,980,416
Number of files: 1,837

10 executors / 40 cores:

| cache | sketch | # index data files | index creation time | index data size |
|-------|--------|--------------------|---------------------|-----------------|
| with cache | MinMaxSketch | 1 | 1m 4s | 30 KB |
| without cache | MinMaxSketch | 1 | 1m 2s | 30 KB |
| with cache | BloomFilterSketch | 4 | 1m 30s | 1 GB |
| without cache | BloomFilterSketch | 1 | 1m 40s | 1 GB |
| without cache | BloomFilterSketch | 4 | 1m 26s | 1 GB |

2 executors / 8 cores:

| cache | sketch | # index data files | index creation time | index data size |
|-------|--------|--------------------|---------------------|-----------------|
| with cache | MinMaxSketch | 1 | 4m 32s | 30 KB |
| without cache | MinMaxSketch | 1 | 4m 23s | 30 KB |
| with cache | BloomFilterSketch | 4 | 5m 15s | 1 GB |
| without cache | BloomFilterSketch | 1 | 5m 27s | 1 GB |
| without cache | BloomFilterSketch | 4 | 5m 11s | 1 GB |

"with cache" indicates the current PR's implementation is used. "without cache" is repartition by a fixed value controlled by a config without caching.

As you can see, the overhead is minimal and less than 5% of the total creation time. The reality would be worse for the "without cache" case - you have to try multiple times to create an index because you don't know the index size, and always repartitioning to a single partition doesn't work for all workloads (OOM or too-slow writes).

If you still think caching isn't right, could you please state more clearly what's so bad about it?

@sezruby sezruby Aug 6, 2021

Could you share the cluster size, and the results using a small cluster like 2 small nodes?

cc @imback82 @andrei-ionescu @rapoth @paryoja Could you add some comments on this?
I think using cache() to get the exact RDD byte size to determine the number of DS index files is unnecessary, since in most cases it would be 1.

So I suggested using the maxRecordsPerFile config or an index.dataskipping.numPartitions config, which does not need an additional Spark job or caching.

Just my opinion and I'm not saying it's a must; I'd like to hear from other folks.

+ I'm not worried about caching such a small dataset; a problem will rarely happen.
+ But what if there's no available storage memory on the executors? It will spill to disk, though it will still work. Or what if it evicts a user's cached RDD? I'd like to avoid these kinds of unexpected problems if possible.

Author

The data was for 10 executors / 40 cores. Most of the index creation time comes from reading and aggregating the source data. Using a small cluster won't change the overall results regarding caching. But I'll give the results with 2 executors.

Index data sizes can range from a few KB to tens of GB or more, depending on the number of source data files, the types of sketches, the number of distinct values in each column, etc. Repartitioning to 1 wouldn't work for large index data.

Author

There is another option: write the index data without any repartitioning, and check if the written index data files are too small (smaller than the configured target index data file size). If that's the case, read the files back and merge them. How about this?
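
A rough sketch of that write-first, merge-later idea, with hypothetical names (indexPath, targetFileSizeBytes) and no claim about the final implementation:

    import org.apache.hadoop.fs.Path

    indexData.write.parquet(indexPath)
    val dir = new Path(indexPath)
    val fs = dir.getFileSystem(spark.sparkContext.hadoopConfiguration)
    val files = fs.listStatus(dir).filter(_.getPath.getName.endsWith(".parquet"))
    val totalBytes = files.map(_.getLen).sum
    // If the written files are much smaller than the target size, read them back and merge.
    if (files.length > 1 && totalBytes / files.length < targetFileSizeBytes) {
      val numFiles = math.max(1L, totalBytes / targetFileSizeBytes).toInt
      spark.read.parquet(indexPath)
        .repartition(numFiles)
        .write.mode("overwrite").parquet(indexPath + "_merged")
    }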

Collaborator

Thanks for the update! Without repartitioning, each file would usually be small.
Since there seems to be no major concern from others, I'll approve the change.

@clee704 clee704 requested a review from sezruby August 5, 2021 09:31
@sezruby sezruby merged commit f94fda8 into microsoft:master Aug 9, 2021