[PROPOSAL]: Data Skipping Indexes #441
Comments
Thanks @andrei-ionescu for the info. I just took a quick look at it and it doesn't seem straightforward to plug it in (@clee704 / @sezruby please correct me if I am wrong here), and since the project is fairly new, I am not sure if it's a good idea to have a dependency on it at this stage. Also, the index maintenance scenario seems different (no support for concurrent updates, etc.), but we will revisit this when the project becomes more mature. This proposal is a scoped-down version of #335 (and adds a few more variations) to support filter queries first.
@andrei-ionescu I've skimmed the overall XSkipper codebase and I don't think we could easily plug it in. We are planning to support similar features: bloom filter (BF), min/max, and a partition elimination index (similar to valueList). FYI @clee704 is working on this data skipping feature now.
Implementation added as #443.
This is a good direction to work on. Skipping indexes are very helpful in the context of very big datasets: terabytes of data spanning across millions of files. The start of the proposal is good, but it needs more information, and we need at least some partial answers to the following questions:
Some more information would be very helpful on how the plans will look. Can you provide some snippets of such plans altered to make use of skip indexes?
No, because the index data is expected to be very small compared to the source data and a full scan of index data (or using a cached one) should be fine. For example, suppose we have 100 TB of data, stored in files, each file being 1 GB. It means there are 100,000 files and our index data has 100,000 rows. It might seem large, but actually it's not compared to the 100 TB of data. If a full scan turns out to be too slow, then we might consider adding indexes to the index data. For example, we can index min/max sketches to speed up range queries.
There is no order, and having the index data ordered by file names won't help much because we don't support mutable files. All we support is deleted files and newly added files. For incremental refreshes, we just ignore deleted files and add new index data files for newly appended files. If deletion is a frequent operation, then we might consider partitioning the index data in the same way as the source data for a faster deletion of index data when an entire partition of source data is deleted, which seems to be a common operation in the field. For ordering by min/max values, please refer to the first answer above.
We'll start with a naive approach which is just applying the index regardless of the effectiveness. Except for the index processing time, the query plan doesn't get worse and the index processing time would be dwarfed by the query processing time. Later, we might add mechanisms to help users, such as notifying the expected effectiveness of an index based on the selected sketches.
In Hyperspace, a file is considered to be equal to another file only if their paths, sizes, and modified times are the same. The tuple (path, size, modified time) can identify files in cases like the one you mentioned. I'll update the examples.
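As a rough illustration (the class name is hypothetical, not an actual Hyperspace type), the identity described above behaves like a tuple of path, size, and modified time:

```scala
// Two files are considered the same only if path, size, and modification time all match.
case class FileIdentity(path: String, size: Long, modifiedTime: Long)

// A rewritten file at the same path gets a different identity.
val before = FileIdentity("/data/p0", 1024L, 1620000000000L)
val after  = FileIdentity("/data/p0", 2048L, 1620100000000L)
assert(before != after)
```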
For users, the only difference is that they provide a different type of
Adding new files is easy. New index data will be stored in new parquet files. Spark will be unusably slow with tens of millions of files to begin with, even without Hyperspace. In the first version of data skipping indexes, we will focus on a smaller number of files and will consider improving the performance for a large number of files.
What does SLA stand for? Anyway, incremental refresh can add files and optimize can reduce the number of files. This is more or less the same as covering indexes.
For
A basic example is given in the proposal. Please refer to the section "Data Skipping Index Basics". The transformed plan will be the same as the original plan, except that the relation node in the transformed plan will have fewer files than the original one.
Can you link me to the updated example? I couldn't find them. Thanks
So, it will add a new skipping index entry in the index with
Depending on the Spark cluster size, 10 million files can easily be processed in parallel, given what the biggest Spark clusters can hold. Now, Hyperspace is the key: if data skipping is well defined and implemented, then instead of using such a big cluster, a much smaller cluster may achieve the same result. We do have many such datasets: petabytes of data with millions of files. A data skipping index that works for small datasets is helpful, but having it work for very big datasets will be a game changer. So, please think big and try to accommodate the heavy scenarios.
SLA = Service Level Agreement. My question is: what is the time from a dataset being changed to the index being brought up to date with the new data? Or, what's the time for an incremental refresh in the case of a data skipping index? Does this update time stay constant as the data grows? I'm bringing these questions up because if my data update/append takes
I did read the example and it makes sense. I'm more interested in the Spark plan itself. Like how the
@rapoth, @imback82, @sezruby: I'm CC'ing you as the discussion is in part leaning on the product definition side.
It is in the PR description: #461. I've also updated the issue description.
That's right. Incremental refresh will only add index data files unless there are deleted source data files. If there are deleted source data files, the index data is rewritten without entries for those deleted files, but this should be a cheap operation compared to reading all source data files again.
The data skipping index per se will be scalable to more than 1 million files. The problem here is that the way we process files in Hyperspace is not optimized for it. We'll keep improving things in this regard.
Incremental refresh should be fast, because it only looks at new data and appends new index data files without accessing previous source data or index data.
At query time, the plan won't be much different from the original plan. The only difference would be that file scan nodes in the optimized plan will access fewer files.
Problem Statement
Add support for data skipping indexes.
Background and Motivation
So far, Hyperspace has supported only hash-partitioned covering indexes. Covering indexes are good for some workloads, but we should have more index types to facilitate a broader range of use cases.
Proposed Solution
Refactor existing code by moving covering index specific code into a single implementation for a new `Index` API, and implement data skipping indexes against that API.

Alternatives
The proposed solution requires heavy refactoring, as there is a lot of code specific to covering indexes scattered throughout the codebase. Alternatively, we could implement `DataSkippingIndex` without introducing a new API and write many if-elses depending on the index type. It might be quicker to implement the required features this way for now, but it will make the code hard to maintain in the long run.

Known/Potential Compatibility Issues
Regarding data compatibility, it is very likely that this will make the new version of Hyperspace incompatible with indexes created in the older versions. This is unfortunate but can be excused as Hyperspace is still in version zero. If a strong need arises, this can be mitigated by a migration utility that can be designed separately.
The API compatibility can be retained if we introduce new optional arguments to `IndexConfig`. However, `includedColumns` doesn't make sense for data skipping indexes, and newly added arguments won't make sense for covering indexes. Therefore, it might be wise to break compatibility now and keep things clean for the future.

Design
Data Skipping Index Basics
Data skipping indexes can filter out files that cannot contain the desired values in the indexed columns, based on statistics or sketches of the values in each file. Examples of such statistics or sketches include min/max values and bloom filters.
Suppose that we have sample data like below: a relation `df` which has two columns, `a` and `b`, and consists of two files, `/data/p0` and `/data/p1`.

A min-max data skipping index on a column `a` records, for each file, a file ID together with the minimum and maximum values of `a` in that file. The file ID column is used to map files to unique identifiers. Identifiers are created based on the file path, size, and modification time.
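For illustration, with made-up values (chosen so that `/data/p1` can be skipped for `a < 4`), the data and the index data might look like this:

`/data/p0`:

| a | b |
|---|---|
| 1 | x |
| 2 | y |
| 3 | z |

`/data/p1`:

| a | b |
|---|---|
| 4 | u |
| 7 | v |
| 9 | w |

Index data (file ID 0 refers to `/data/p0`, file ID 1 to `/data/p1`):

| file ID | min(a) | max(a) |
|---------|--------|--------|
| 0 | 1 | 3 |
| 1 | 4 | 9 |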
Then, when a query like `df.filter($"a" < 4)` is given, we know that we can skip the `/data/p1` file.

Note that the real implementation will contain file size and modified time columns for files, to distinguish files with the same name but different sizes/modified times and to support deleted/added source files.
Storing Index Data
The index data can be stored as parquet files, as is done for covering indexes. The difference here is that we don't need bucketing. Also, the data size is much smaller because there is only a single row for each file.
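As a rough sketch (not the actual Hyperspace implementation; the column names, source path, and index path below are made up for illustration), building min/max index data with a single row per source file and storing it as parquet could look like this:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{input_file_name, max, min}

val spark = SparkSession.builder().appName("minmax-index-sketch").getOrCreate()

// Tag every row with the file it came from and aggregate per file,
// so the index data ends up with exactly one row per source file.
val indexData = spark.read.parquet("/data")
  .withColumn("_file", input_file_name())
  .groupBy("_file")
  .agg(min("a").as("min_a"), max("a").as("max_a"))

// Plain parquet is enough; no bucketing is needed for a one-row-per-file dataset.
indexData.write.mode("overwrite").parquet("/indexes/myIndex/v0")
```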
To speed up range queries like the last example, it might be necessary to cache the data in main memory as sorted sequences ordered by `min` and `max`. This should be cheap to compute, as the data size is expected to be small.

If there are too many files, we might need to consider structures like a B-tree to access relevant files only. But this might not be a real issue; the index data should be orders of magnitude smaller than the source data.
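A minimal sketch of the in-memory layout described above, with made-up values: once the entries are sorted by `min`, the candidates for a predicate like `a < 4` form a prefix of the sequence, so they can be found without scanning every entry.

```scala
// Index entries cached in memory, sorted by min.
case class MinMaxEntry(file: String, min: Long, max: Long)

val cached: Vector[MinMaxEntry] =
  Vector(MinMaxEntry("/data/p1", 4L, 9L), MinMaxEntry("/data/p0", 1L, 3L))
    .sortBy(_.min)

// For a < bound, only files whose min is below the bound can match;
// they form a prefix of the min-sorted sequence.
def candidatesForLessThan(bound: Long): Vector[String] =
  cached.takeWhile(_.min < bound).map(_.file)

// candidatesForLessThan(4L) == Vector("/data/p0")
```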
Index Application
Data skipping indexes can be applied in three steps: find the data skipping indexes applicable to a relation under a filter, translate the filter predicates into constraints on the indexed columns, and evaluate those constraints against the index data to exclude files that cannot contain matching rows, replacing the relation with one that reads only the remaining files.
Unlike covering indexes, multiple data skipping indexes can be applied to the same relation.
Examples of constraints include `a < 4`, `a = 10`, or `a in (1, 2, 3)`. Combined with statistics or sketches like min/max values and/or bloom filters, this information can be used to weed out irrelevant files.

Theoretically, applying data skipping indexes for joins is possible. Still, the implementation would be complex, and the benefits over a traditional broadcast hash join are unclear, especially when the indexed columns have a low correlation with the partitioning columns. Therefore, we will focus on filters for now.
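To make the constraint evaluation above concrete, here is a minimal sketch (not the actual Hyperspace rule; the paths and column names match the made-up ones used earlier) of translating the constraint `a < 4` into a predicate over a min/max sketch and scanning only the surviving files:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("skipping-apply-sketch").getOrCreate()
import spark.implicits._

// a < 4 can only be satisfied by files whose min(a) is below 4.
val indexData = spark.read.parquet("/indexes/myIndex/v0")
val candidateFiles = indexData
  .filter($"min_a" < 4)
  .select("_file")
  .as[String]
  .collect()

// Scan only the surviving files; the rest of the query plan stays the same.
val pruned = spark.read.parquet(candidateFiles: _*).filter($"a" < 4)
```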
Hybrid scan is trivial and requires no special handling for data skipping indexes, as we only filter out irrelevant files and the remaining files are not touched.
The estimated cost/benefit of applying data skipping indexes - TODO
Index Maintenance
Given a dataframe (a single relation consisting of files), an index name, a column or columns to index, and a statistics/sketch type (min/max, bloom filter, etc.), we can create a data skipping index by either scanning every row in the files and building the specified statistics, or looking up existing statistics in the files.
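A purely hypothetical sketch of what the user-facing call could look like; `DataSkippingIndexConfig` and `MinMaxSketch` are placeholder names introduced here for illustration, not an existing Hyperspace API.

```scala
// Hypothetical config shapes, mirroring the inputs listed above:
// an index name, the column to index, and a sketch type.
case class MinMaxSketch(column: String)
case class DataSkippingIndexConfig(indexName: String, sketch: MinMaxSketch)

val config = DataSkippingIndexConfig("myIndex", MinMaxSketch("a"))
// hs.createIndex(df, config)  // analogous to the existing covering index createIndex call
```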
Incremental refresh is possible, and in fact, is not much different from a full refresh. During refresh, we drop rows for deleted files from the index data and add rows for added files. If there are no deleted files, incremental refresh can store new rows in new files, not touching old files. Full refresh can then merge such files later.
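A minimal sketch of the incremental refresh described above, assuming we already know which source files were appended or deleted since the last refresh (the file lists and paths below are made up for illustration):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, input_file_name, max, min}

val spark = SparkSession.builder().appName("refresh-sketch").getOrCreate()

val appendedFiles = Seq("/data/p2")   // hypothetical newly appended source files
val deletedFiles = Seq("/data/p1")    // hypothetical deleted source files

// Only the appended files are scanned; their index rows are appended
// as new parquet files without touching the existing index data files.
spark.read.parquet(appendedFiles: _*)
  .withColumn("_file", input_file_name())
  .groupBy("_file")
  .agg(min("a").as("min_a"), max("a").as("max_a"))
  .write.mode("append").parquet("/indexes/myIndex/v0")

// If some source files were deleted, rewrite the (small) index data
// without their entries; this is cheap compared to rescanning the source.
spark.read.parquet("/indexes/myIndex/v0")
  .filter(!col("_file").isin(deletedFiles: _*))
  .write.mode("overwrite").parquet("/indexes/myIndex/v1")
```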
Implementation
PRs
TODO
Check if Spark gives us the partition-pruned file lists when we apply data skipping indexes. If it doesn't, then we should do partition pruning ourselves prior to filtering out files to speed up the process. (done)