From a112411efed8487680d1e2e2b4ed4d4298cea523 Mon Sep 17 00:00:00 2001 From: Dax Pryce Date: Wed, 30 Aug 2023 13:51:22 -0700 Subject: [PATCH] Fixes #432, bug in using openmp with gcc and omp_get_num_threads() (#445) * Fixes #432, a bug in our use of OpenMP (observed with gcc): omp_get_num_threads() only reports the number of threads collaborating on the current code region, not the number available overall, so it cannot serve as the default thread count. This change uses omp_get_num_procs() for that default instead. I made this error when I transitioned us away from omp_get_num_procs() about 5 or 6 months ago, and only with bug #432 did I really get to see how problematic my naive expectations were. * Removed cosine distance metric from disk index until we can properly fix it in pqflashindex. Documented which distance metrics can be used with which vector dtypes in tables in the documentation. --- include/parameters.h | 2 +- python/src/_builder.py | 28 +++++++++++++++++++++++ python/src/dynamic_memory_index.cpp | 3 +-- python/src/static_disk_index.cpp | 5 ++-- python/src/static_memory_index.cpp | 6 ++--- python/tests/test_dynamic_memory_index.py | 24 +++++++++++++++++++ python/tests/test_static_disk_index.py | 19 ++++++++++++++- python/tests/test_static_memory_index.py | 21 +++++++++++++++++ src/index.cpp | 2 +- 9 files changed, 100 insertions(+), 10 deletions(-) diff --git a/include/parameters.h b/include/parameters.h index 209b9128c..4fec9ae08 100644 --- a/include/parameters.h +++ b/include/parameters.h @@ -83,7 +83,7 @@ class IndexWriteParametersBuilder IndexWriteParametersBuilder &with_num_threads(const uint32_t num_threads) { - _num_threads = num_threads == 0 ? omp_get_num_threads() : num_threads; + _num_threads = num_threads == 0 ? omp_get_num_procs() : num_threads; return *this; } diff --git a/python/src/_builder.py b/python/src/_builder.py index 18e9e9fa0..db2b200db 100644 --- a/python/src/_builder.py +++ b/python/src/_builder.py @@ -70,6 +70,15 @@ def build_disk_index( in the format DiskANN's PQ Flash Index builder requires. This temp folder is deleted upon index creation completion or error. 
+ ## Distance Metric and Vector Datatype Restrictions + | Metric \ Datatype | np.float32 | np.uint8 | np.int8 | + |-------------------|------------|----------|---------| + | L2 | ✅ | ✅ | ✅ | + | MIPS | ✅ | ❌ | ❌ | + | Cosine [^bug-in-disk-cosine] | ❌ | ❌ | ❌ | + + [^bug-in-disk-cosine]: For StaticDiskIndex, Cosine distances are not currently supported. + ### Parameters - **data**: Either a `str` representing a path to a DiskANN vector bin file, or a numpy.ndarray, of a supported dtype, in 2 dimensions. Note that `vector_dtype` must be provided if data is a `str` @@ -119,6 +128,12 @@ def build_disk_index( vector_bin_path, vector_dtype_actual = _valid_path_and_dtype( data, vector_dtype, index_directory, index_prefix ) + _assert(dap_metric != _native_dap.COSINE, "Cosine is currently not supported in StaticDiskIndex") + if dap_metric == _native_dap.INNER_PRODUCT: + _assert( + vector_dtype_actual == np.float32, + "Integral vector dtypes (np.uint8, np.int8) are not supported with distance metric mips" + ) num_points, dimensions = vectors_metadata_from_file(vector_bin_path) @@ -176,6 +191,14 @@ def build_memory_index( `diskannpy.DynamicMemoryIndex`, you **must** supply a valid value for the `tags` parameter. **Do not supply tags if the index is intended to be `diskannpy.StaticMemoryIndex`**! 
+ ## Distance Metric and Vector Datatype Restrictions + + | Metric \ Datatype | np.float32 | np.uint8 | np.int8 | + |-------------------|------------|----------|---------| + | L2 | ✅ | ✅ | ✅ | + | MIPS | ✅ | ❌ | ❌ | + | Cosine | ✅ | ✅ | ✅ | + ### Parameters - **data**: Either a `str` representing a path to an existing DiskANN vector bin file, or a numpy.ndarray of a @@ -232,6 +255,11 @@ def build_memory_index( vector_bin_path, vector_dtype_actual = _valid_path_and_dtype( data, vector_dtype, index_directory, index_prefix ) + if dap_metric == _native_dap.INNER_PRODUCT: + _assert( + vector_dtype_actual == np.float32, + "Integral vector dtypes (np.uint8, np.int8) are not supported with distance metric mips" + ) num_points, dimensions = vectors_metadata_from_file(vector_bin_path) diff --git a/python/src/dynamic_memory_index.cpp b/python/src/dynamic_memory_index.cpp index f92f4157e..3add2aa5c 100644 --- a/python/src/dynamic_memory_index.cpp +++ b/python/src/dynamic_memory_index.cpp @@ -34,8 +34,7 @@ diskann::Index dynamic_index_builder(const diskann:: const uint32_t initial_search_threads, const bool concurrent_consolidation) { - const uint32_t _initial_search_threads = - initial_search_threads != 0 ? initial_search_threads : omp_get_num_threads(); + const uint32_t _initial_search_threads = initial_search_threads != 0 ? initial_search_threads : omp_get_num_procs(); auto index_search_params = diskann::IndexSearchParams(initial_search_complexity, _initial_search_threads); return diskann::Index( diff --git a/python/src/static_disk_index.cpp b/python/src/static_disk_index.cpp index 654f8ec30..9e86b0ad5 100644 --- a/python/src/static_disk_index.cpp +++ b/python/src/static_disk_index.cpp @@ -14,7 +14,8 @@ StaticDiskIndex
::StaticDiskIndex(const diskann::Metric metric, const std::st const uint32_t cache_mechanism) : _reader(std::make_shared()), _index(_reader, metric) { - int load_success = _index.load(num_threads, index_path_prefix.c_str()); + const uint32_t _num_threads = num_threads != 0 ? num_threads : omp_get_num_procs(); + int load_success = _index.load(_num_threads, index_path_prefix.c_str()); if (load_success != 0) { throw std::runtime_error("index load failed."); @@ -22,7 +23,7 @@ StaticDiskIndex
::StaticDiskIndex(const diskann::Metric metric, const std::st if (cache_mechanism == 1) { std::string sample_file = index_path_prefix + std::string("_sample_data.bin"); - cache_sample_paths(num_nodes_to_cache, sample_file, num_threads); + cache_sample_paths(num_nodes_to_cache, sample_file, _num_threads); } else if (cache_mechanism == 2) { diff --git a/python/src/static_memory_index.cpp b/python/src/static_memory_index.cpp index 0dbb24dc3..23a349fac 100644 --- a/python/src/static_memory_index.cpp +++ b/python/src/static_memory_index.cpp @@ -17,7 +17,7 @@ diskann::Index static_index_builder(const diskann::Me { throw std::runtime_error("initial_search_complexity must be a positive uint32_t"); } - auto index_search_params = diskann::IndexSearchParams(initial_search_complexity, omp_get_num_threads()); + auto index_search_params = diskann::IndexSearchParams(initial_search_complexity, omp_get_num_procs()); return diskann::Index
(m, dimensions, num_points, nullptr, // index write params std::make_shared(index_search_params), // index search params @@ -36,7 +36,7 @@ StaticMemoryIndex
::StaticMemoryIndex(const diskann::Metric m, const std::str const uint32_t initial_search_complexity) : _index(static_index_builder
(m, num_points, dimensions, initial_search_complexity)) { - const uint32_t _num_threads = num_threads != 0 ? num_threads : omp_get_num_threads(); + const uint32_t _num_threads = num_threads != 0 ? num_threads : omp_get_num_procs(); _index.load(index_prefix.c_str(), _num_threads, initial_search_complexity); } @@ -56,7 +56,7 @@ NeighborsAndDistances StaticMemoryIndex
::batch_search( py::array_t &queries, const uint64_t num_queries, const uint64_t knn, const uint64_t complexity, const uint32_t num_threads) { - const uint32_t _num_threads = num_threads != 0 ? num_threads : omp_get_num_threads(); + const uint32_t _num_threads = num_threads != 0 ? num_threads : omp_get_num_procs(); py::array_t ids({num_queries, knn}); py::array_t dists({num_queries, knn}); std::vector
empty_vector; diff --git a/python/tests/test_dynamic_memory_index.py b/python/tests/test_dynamic_memory_index.py index 48c05443c..13d9b08db 100644 --- a/python/tests/test_dynamic_memory_index.py +++ b/python/tests/test_dynamic_memory_index.py @@ -40,6 +40,7 @@ def setUpClass(cls) -> None: build_random_vectors_and_memory_index(np.float32, "cosine", with_tags=True), build_random_vectors_and_memory_index(np.uint8, "cosine", with_tags=True), build_random_vectors_and_memory_index(np.int8, "cosine", with_tags=True), + build_random_vectors_and_memory_index(np.float32, "mips", with_tags=True), ] cls._example_ann_dir = cls._test_matrix[0][4] @@ -442,4 +443,27 @@ def _tiny_index(): warnings.simplefilter("error") # turns warnings into raised exceptions index.batch_insert(rng.random((2, 10), dtype=np.float32), np.array([15, 25], dtype=np.uint32)) + def test_zero_threads(self): + for ( + metric, + dtype, + query_vectors, + index_vectors, + ann_dir, + vector_bin_file, + generated_tags, + ) in self._test_matrix: + with self.subTest(msg=f"Testing dtype {dtype}"): + index = dap.DynamicMemoryIndex( + distance_metric="l2", + vector_dtype=dtype, + dimensions=10, + max_vectors=11_000, + complexity=64, + graph_degree=32, + num_threads=0, # explicitly asking it to use all available threads. 
+ ) + index.batch_insert(vectors=index_vectors, vector_ids=generated_tags, num_threads=0) + k = 5 + ids, dists = index.batch_search(query_vectors, k_neighbors=k, complexity=5, num_threads=0) diff --git a/python/tests/test_static_disk_index.py b/python/tests/test_static_disk_index.py index c36c581d2..35015276e 100644 --- a/python/tests/test_static_disk_index.py +++ b/python/tests/test_static_disk_index.py @@ -25,7 +25,7 @@ def _build_random_vectors_and_index(dtype, metric): complexity=32, search_memory_maximum=0.00003, build_memory_maximum=1, - num_threads=1, + num_threads=0, pq_disk_bytes=0, ) return metric, dtype, query_vectors, index_vectors, ann_dir @@ -38,6 +38,7 @@ def setUpClass(cls) -> None: _build_random_vectors_and_index(np.float32, "l2"), _build_random_vectors_and_index(np.uint8, "l2"), _build_random_vectors_and_index(np.int8, "l2"), + _build_random_vectors_and_index(np.float32, "mips"), ] cls._example_ann_dir = cls._test_matrix[0][4] @@ -149,3 +150,19 @@ def test_value_ranges_batch_search(self): index.batch_search( queries=np.array([[]], dtype=np.single), **kwargs ) + + def test_zero_threads(self): + for metric, dtype, query_vectors, index_vectors, ann_dir in self._test_matrix: + with self.subTest(msg=f"Testing dtype {dtype}"): + index = dap.StaticDiskIndex( + distance_metric="l2", + vector_dtype=dtype, + index_directory=ann_dir, + num_threads=0, # Issue #432 + num_nodes_to_cache=10, + ) + + k = 5 + ids, dists = index.batch_search( + query_vectors, k_neighbors=k, complexity=5, beam_width=2, num_threads=0 + ) \ No newline at end of file diff --git a/python/tests/test_static_memory_index.py b/python/tests/test_static_memory_index.py index ce12ed3bf..a04f98928 100644 --- a/python/tests/test_static_memory_index.py +++ b/python/tests/test_static_memory_index.py @@ -20,6 +20,7 @@ def setUpClass(cls) -> None: build_random_vectors_and_memory_index(np.float32, "cosine"), build_random_vectors_and_memory_index(np.uint8, "cosine"), 
build_random_vectors_and_memory_index(np.int8, "cosine"), + build_random_vectors_and_memory_index(np.float32, "mips"), ] cls._example_ann_dir = cls._test_matrix[0][4] @@ -165,3 +166,23 @@ def test_value_ranges_batch_search(self): index.batch_search( queries=np.array([[]], dtype=np.single), **kwargs ) + + def test_zero_threads(self): + for ( + metric, + dtype, + query_vectors, + index_vectors, + ann_dir, + vector_bin_file, + _, + ) in self._test_matrix: + with self.subTest(msg=f"Testing dtype {dtype}"): + index = dap.StaticMemoryIndex( + index_directory=ann_dir, + num_threads=0, + initial_search_complexity=32, + ) + + k = 5 + ids, dists = index.batch_search(query_vectors, k_neighbors=k, complexity=5, num_threads=0) \ No newline at end of file diff --git a/src/index.cpp b/src/index.cpp index 0b10cc9a0..478b86273 100644 --- a/src/index.cpp +++ b/src/index.cpp @@ -2370,7 +2370,7 @@ consolidation_report Index::consolidate_deletes(const IndexWrit const uint32_t range = params.max_degree; const uint32_t maxc = params.max_occlusion_size; const float alpha = params.alpha; - const uint32_t num_threads = params.num_threads == 0 ? omp_get_num_threads() : params.num_threads; + const uint32_t num_threads = params.num_threads == 0 ? omp_get_num_procs() : params.num_threads; uint32_t num_calls_to_process_delete = 0; diskann::Timer timer;