diff --git a/include/index.h b/include/index.h index bb694e39e..4ba3d6dbd 100644 --- a/include/index.h +++ b/include/index.h @@ -45,22 +45,31 @@ namespace diskann { }; status_code _status; size_t _active_points, _max_points, _empty_slots, _slots_released, - _delete_set_size; + _delete_set_size, _num_calls_to_process_delete; double _time; consolidation_report(status_code status, size_t active_points, size_t max_points, size_t empty_slots, size_t slots_released, size_t delete_set_size, - double time_secs) + size_t num_calls_to_process_delete, double time_secs) : _status(status), _active_points(active_points), _max_points(max_points), _empty_slots(empty_slots), _slots_released(slots_released), _delete_set_size(delete_set_size), + _num_calls_to_process_delete(num_calls_to_process_delete), _time(time_secs) { } }; template class Index { + /************************************************************************** + * + * Public functions acquire one or more of _update_lock, _consolidate_lock, + * _tag_lock, _delete_lock before calling protected functions which DO NOT + * acquire these locks. They might acquire locks on _locks[i] + * + **************************************************************************/ + public: // Constructor for Bulk operations and for creating the index object solely // for loading a prexisting index. @@ -125,7 +134,7 @@ namespace diskann { // Set starting point to a random point on a sphere of certain radius DISKANN_DLLEXPORT void set_start_point_at_random(T radius); - // For Bulk Index FastL2 search, we interleave the data with graph + // For FastL2 search on a static index, we interleave the data with graph DISKANN_DLLEXPORT void optimize_index_layout(); // For FastL2 search on optimized layout @@ -183,9 +192,9 @@ namespace diskann { // memory should be allocated for vec before calling this function DISKANN_DLLEXPORT int get_vector_by_tag(TagT &tag, T *vec); - DISKANN_DLLEXPORT void print_status() const; + DISKANN_DLLEXPORT void print_status(); - DISKANN_DLLEXPORT void count_nodes_at_bfs_levels() const; + DISKANN_DLLEXPORT void count_nodes_at_bfs_levels(); // This variable MUST be updated if the number of entries in the metadata // change. @@ -203,6 +212,7 @@ namespace diskann { Index &operator=(const Index &) = delete; // Use after _data and _nd have been populated + // Acquire exclusive _update_lock before calling void build_with_data_populated(Parameters ¶meters, const std::vector &tags); @@ -210,7 +220,7 @@ namespace diskann { // This is not visible to the user int generate_frozen_point(); - // determines navigating node of the graph by calculating medoid of data + // determines navigating node of the graph by calculating medoid of datafopt unsigned calculate_entry_point(); std::pair iterate_to_fixed_point( @@ -218,8 +228,9 @@ namespace diskann { const std::vector &init_ids, InMemQueryScratch *scratch, bool ret_frozen = true, bool search_invocation = false); - void search_for_point_and_add_links(int location, _u32 Lindex, - InMemQueryScratch *scratch); + void search_for_point_and_prune(int location, _u32 Lindex, + std::vector &pruned_list, + InMemQueryScratch *scratch); void prune_neighbors(const unsigned location, std::vector &pool, std::vector &pruned_list, @@ -230,6 +241,8 @@ namespace diskann { const float alpha, std::vector &pruned_list, InMemQueryScratch *scratch); + // Prunes candidates in @pool to a shorter list @result + // @pool must be sorted before calling void occlude_list( const unsigned location, std::vector &pool, const float alpha, const unsigned degree, const unsigned maxc, @@ -243,30 +256,34 @@ namespace diskann { void inter_insert(unsigned n, std::vector &pruned_list, InMemQueryScratch *scratch); + // Acquire exclusive _update_lock before calling void link(Parameters ¶meters); - // Acquire _tag_lock before calling - int reserve_location(); + // Acquire exclusive _tag_lock and _delete_lock before calling + int reserve_location(); + + // Acquire exclusive _tag_lock before calling size_t release_location(int location); - size_t release_locations(tsl::robin_set &locations); + size_t release_locations(const tsl::robin_set &locations); // Resize the index when no slots are left for insertion. - // MUST acquire _num_points_lock and _update_lock before calling. + // Acquire exclusive _update_lock and _tag_lock before calling. void resize(size_t new_max_points); - // Take an unique lock on _update_lock and _consolidate_lock - // before calling these functions. + // Acquire unique lock on _update_lock, _consolidate_lock, _tag_lock + // and _delete_lock before calling these functions. // Renumber nodes, update tag and location maps and compact the // graph, mode = _consolidated_order in case of lazy deletion and // _compacted_order in case of eager deletion DISKANN_DLLEXPORT void compact_data(); DISKANN_DLLEXPORT void compact_frozen_point(); - // Remove deleted nodes from adj list of node i and absorb edges from - // deleted neighbors Acquire _locks[i] prior to calling for thread-safety + // Remove deleted nodes from adjacency list of node loc + // Replace removed neighbors with second order neighbors. + // Also acquires _locks[i] for i = loc and out-neighbors of loc. void process_delete(const tsl::robin_set &old_delete_set, - size_t i, const unsigned &range, const unsigned &maxc, - const float &alpha, InMemQueryScratch *scratch); + size_t loc, const unsigned range, const unsigned maxc, + const float alpha, InMemQueryScratch *scratch); void initialize_query_scratch(uint32_t num_threads, uint32_t search_l, uint32_t indexing_l, uint32_t r, @@ -299,7 +316,7 @@ namespace diskann { // Data T *_data = nullptr; - char *_opt_graph; + char *_opt_graph = nullptr; // Graph related data structures std::vector> _final_graph; @@ -335,13 +352,6 @@ namespace diskann { // Query scratch data structures ConcurrentQueue *> _query_scratch; - // data structures, flags and locks for dynamic indexing - tsl::sparse_map _tag_to_location; - natural_number_map _location_to_tag; - - tsl::robin_set _delete_set; - natural_number_set _empty_slots; - // Flags for PQ based distance calculation bool _pq_dist = false; bool _use_opq = false; @@ -350,23 +360,38 @@ namespace diskann { bool _pq_generated = false; FixedChunkPQTable _pq_table; - bool _lazy_done = false; // true if lazy deletions have been made + // + // Data structures, locks and flags for dynamic indexing and tags + // + + // lazy_delete removes entry from _location_to_tag and _tag_to_location. If + // _location_to_tag does not resolve a location, infer that it was deleted. + tsl::sparse_map _tag_to_location; + natural_number_map _location_to_tag; + + // _empty_slots has unallocated slots and those freed by consolidate_delete. + // _delete_set has locations marked deleted by lazy_delete. Will not be + // immediately available for insert. consolidate_delete will release these + // slots to _empty_slots. + natural_number_set _empty_slots; + std::unique_ptr> _delete_set; + bool _data_compacted = true; // true if data has been compacted bool _is_saved = false; // Gopal. Checking if the index is already saved. bool _conc_consolidate = false; // use _lock while searching - // Per node lock, cardinality=max_points_ - std::vector _locks; - - // If acquiring multiple locks below, acquire locks in the order below + // Acquire locks in the order below when acquiring multiple locks std::shared_timed_mutex // RW mutex between save/load (exclusive lock) and _update_lock; // search/inserts/deletes/consolidate (shared lock) - std::shared_timed_mutex - _consolidate_lock; // Ensure only one consolidate is ever active - std::shared_timed_mutex - _tag_lock; // RW lock for _tag_to_location and _location_to_tag - std::shared_timed_mutex - _delete_lock; // RW Lock on _delete_set and _empty_slots + std::shared_timed_mutex // Ensure only one consolidate or compact_data is + _consolidate_lock; // ever active + std::shared_timed_mutex // RW lock for _tag_to_location, + _tag_lock; // _location_to_tag, _empty_slots, _nd, _max_points + std::shared_timed_mutex // RW Lock on _delete_set and _data_compacted + _delete_lock; // variable + + // Per node lock, cardinality=_max_points + std::vector _locks; static const float INDEX_GROWTH_FACTOR; }; diff --git a/include/scratch.h b/include/scratch.h index bf45f8936..89f116b67 100644 --- a/include/scratch.h +++ b/include/scratch.h @@ -48,14 +48,12 @@ namespace diskann { inline uint32_t get_maxc() { return _maxc; } - inline T *aligned_query() { return _aligned_query; } inline PQScratch *pq_scratch() { return _pq_scratch; } - inline std::vector &pool() { return _pool; } @@ -65,7 +63,6 @@ namespace diskann { inline std::vector &occlude_factor() { return _occlude_factor; } - inline tsl::robin_set &inserted_into_pool_rs() { return _inserted_into_pool_rs; } @@ -78,6 +75,15 @@ namespace diskann { inline std::vector &dist_scratch() { return _dist_scratch; } + inline tsl::robin_set &expanded_nodes_set() { + return _expanded_nodes_set; + } + inline std::vector &expanded_nodes_vec() { + return _expanded_nghrs_vec; + } + inline std::vector &occlude_list_output() { + return _occlude_list_output; + } private: uint32_t _L; @@ -116,6 +122,11 @@ namespace diskann { // _dist_scratch must be > R*GRAPH_SLACK_FACTOR for iterate_to_fp // _dist_scratch should be at least the size of id_scratch std::vector _dist_scratch; + + // Buffers used in process delete, capacity increases as needed + tsl::robin_set _expanded_nodes_set; + std::vector _expanded_nghrs_vec; + std::vector _occlude_list_output; }; // diff --git a/src/index.cpp b/src/index.cpp index 40c1178d2..bfb5a70f3 100644 --- a/src/index.cpp +++ b/src/index.cpp @@ -57,9 +57,10 @@ namespace diskann { const bool use_opq) : _dist_metric(m), _dim(dim), _max_points(max_points), _dynamic_index(dynamic_index), _enable_tags(enable_tags), - _conc_consolidate(concurrent_consolidate), _query_scratch(nullptr), - _pq_dist(pq_dist_build), _num_pq_chunks(num_pq_chunks), - _use_opq(use_opq), _indexingMaxC(DEFAULT_MAXC) { + _indexingMaxC(DEFAULT_MAXC), _query_scratch(nullptr), + _conc_consolidate(concurrent_consolidate), + _delete_set(new tsl::robin_set), _pq_dist(pq_dist_build), + _use_opq(use_opq), _num_pq_chunks(num_pq_chunks) { if (dynamic_index && !enable_tags) { throw ANNException("ERROR: Dynamic Indexing must have tags enabled.", -1, __FUNCSIG__, __FILE__, __LINE__); @@ -149,6 +150,9 @@ namespace diskann { aligned_free(this->_data); this->_data = nullptr; } + if (_opt_graph != nullptr) { + delete[] _opt_graph; + } ScratchStoreManager> manager(_query_scratch); manager.destroy(); @@ -237,22 +241,27 @@ namespace diskann { template _u64 Index::save_delete_list(const std::string &filename) { - if (_delete_set.size() == 0) { + if (_delete_set->size() == 0) { return 0; } std::unique_ptr<_u32[]> delete_list = - std::make_unique<_u32[]>(_delete_set.size()); + std::make_unique<_u32[]>(_delete_set->size()); _u32 i = 0; - for (auto &del : _delete_set) { + for (auto &del : *_delete_set) { delete_list[i++] = del; } - return save_bin<_u32>(filename, delete_list.get(), _delete_set.size(), 1); + return save_bin<_u32>(filename, delete_list.get(), _delete_set->size(), 1); } template void Index::save(const char *filename, bool compact_before_save) { diskann::Timer timer; + std::unique_lock ul(_update_lock); + std::unique_lock cl(_consolidate_lock); + std::unique_lock tl(_tag_lock); + std::unique_lock dl(_delete_lock); + if (compact_before_save) { compact_data(); compact_frozen_point(); @@ -264,11 +273,6 @@ namespace diskann { } } - std::unique_lock ul(_update_lock); - std::unique_lock cl(_consolidate_lock); - std::unique_lock tl(_tag_lock); - std::unique_lock dl(_delete_lock); - if (!_save_as_one_file) { std::string graph_file = std::string(filename); std::string tags_file = std::string(filename) + ".tags"; @@ -341,7 +345,7 @@ namespace diskann { _tag_to_location.reserve(num_data_points); for (_u32 i = 0; i < (_u32) num_data_points; i++) { TagT tag = *(tag_data + i); - if (_delete_set.find(i) == _delete_set.end()) { + if (_delete_set->find(i) == _delete_set->end()) { _location_to_tag.set(i, tag); _tag_to_location[tag] = i; } @@ -418,7 +422,7 @@ namespace diskann { #endif assert(ndim == 1); for (uint32_t i = 0; i < npts; i++) { - _delete_set.insert(delete_list[i]); + _delete_set->insert(delete_list[i]); } return npts; } @@ -434,7 +438,9 @@ namespace diskann { uint32_t search_l) { #endif std::unique_lock ul(_update_lock); + std::unique_lock cl(_consolidate_lock); std::unique_lock tl(_tag_lock); + std::unique_lock dl(_delete_lock); _has_built = true; @@ -485,8 +491,6 @@ namespace diskann { _empty_slots.insert((uint32_t) i); } - _lazy_done = _delete_set.size() != 0; - reposition_frozen_point_to_end(); diskann::cout << "Num frozen points:" << _num_frozen_pts << " _nd: " << _nd << " _start: " << _start @@ -714,10 +718,9 @@ namespace diskann { scratch->inserted_into_pool_rs(); boost::dynamic_bitset<> &inserted_into_pool_bs = scratch->inserted_into_pool_bs(); - auto &id_scratch = scratch->id_scratch(); - auto &dist_scratch = scratch->dist_scratch(); + std::vector &id_scratch = scratch->id_scratch(); + std::vector &dist_scratch = scratch->dist_scratch(); assert(id_scratch.size() == 0); - T *aligned_query = scratch->aligned_query(); memcpy(aligned_query, query, _dim * sizeof(T)); if (_normalize_vecs) { @@ -884,8 +887,9 @@ namespace diskann { } template - void Index::search_for_point_and_add_links( - int location, _u32 Lindex, InMemQueryScratch *scratch) { + void Index::search_for_point_and_prune( + int location, _u32 Lindex, std::vector &pruned_list, + InMemQueryScratch *scratch) { std::vector init_ids; init_ids.emplace_back(_start); @@ -898,43 +902,18 @@ namespace diskann { if (pool[i].id == (unsigned) location) { pool.erase(pool.begin() + i); i--; - } else if (_delete_set.find(pool[i].id) != _delete_set.end()) { - pool.erase(pool.begin() + i); - i--; } } - std::vector pruned_list; + if (pruned_list.size() > 0) { + throw diskann::ANNException("ERROR: non-empty pruned_list passed", -1, + __FUNCSIG__, __FILE__, __LINE__); + } + prune_neighbors(location, pool, pruned_list, scratch); assert(!pruned_list.empty()); assert(_final_graph.size() == _max_points + _num_frozen_pts); - - { - std::shared_lock tlock(_tag_lock, - std::defer_lock); - if (_conc_consolidate) - tlock.lock(); - - LockGuard guard(_locks[location]); - _final_graph[location].clear(); - _final_graph[location].shrink_to_fit(); - _final_graph[location].reserve( - (_u64) (_indexingRange * GRAPH_SLACK_FACTOR * 1.05)); - - for (auto link : pruned_list) { - if (_conc_consolidate) - if (!_location_to_tag.contains(link)) - continue; - _final_graph[location].emplace_back(link); - } - - if (_conc_consolidate) - tlock.unlock(); - } - - assert(_final_graph[location].size() <= _indexingRange); - inter_insert(location, pruned_list, scratch); } template @@ -948,15 +927,17 @@ namespace diskann { // Truncate pool at maxc and initialize scratch spaces assert(std::is_sorted(pool.begin(), pool.end())); + assert(result.size() == 0); if (pool.size() > maxc) pool.resize(maxc); - auto &occlude_factor = scratch->occlude_factor(); + std::vector &occlude_factor = scratch->occlude_factor(); // occlude_list can be called with the same scratch more than once by // search_for_point_and_add_link through inter_insert. occlude_factor.clear(); // Initialize occlude_factor to pool.size() many 0.0f values for correctness occlude_factor.insert(occlude_factor.end(), pool.size(), 0.0f); + float cur_alpha = 1; while (cur_alpha <= alpha && result.size() < degree) { // used for MIPS, where we store a value of eps in cur_alpha to @@ -1027,11 +1008,11 @@ namespace diskann { _max_observed_degree = (std::max)(_max_observed_degree, range); - // If using _pq_build, over-ride the PQ distances with actual distances + // If using _pq_build, over-write the PQ distances with actual distances if (_pq_dist) { - for (auto iter : pool) - iter.distance = _distance->compare( - _data + _aligned_dim * (size_t) iter.id, + for (auto& ngh : pool) + ngh.distance = _distance->compare( + _data + _aligned_dim * (size_t) ngh.id, _data + _aligned_dim * (size_t) location, (unsigned) _aligned_dim); } @@ -1078,14 +1059,15 @@ namespace diskann { des_pool.emplace_back(n); prune_needed = false; } else { + copy_of_neighbors.reserve(des_pool.size() + 1); copy_of_neighbors = des_pool; + copy_of_neighbors.push_back(n); prune_needed = true; } } } // des lock is released by this point if (prune_needed) { - copy_of_neighbors.push_back(n); tsl::robin_set dummy_visited(0); std::vector dummy_pool(0); @@ -1110,10 +1092,7 @@ namespace diskann { { LockGuard guard(_locks[des]); - _final_graph[des].clear(); - for (auto new_nbr : new_out_neighbors) { - _final_graph[des].emplace_back(new_nbr); - } + _final_graph[des] = new_out_neighbors; } } } @@ -1178,7 +1157,18 @@ namespace diskann { ScratchStoreManager> manager(_query_scratch); auto scratch = manager.scratch_space(); - search_for_point_and_add_links(node, _indexingQueueSize, scratch); + std::vector pruned_list; + search_for_point_and_prune(node, _indexingQueueSize, pruned_list, + scratch); + { + LockGuard guard(_locks[node]); + _final_graph[node].reserve( + (_u64) (_indexingRange * GRAPH_SLACK_FACTOR * 1.05)); + _final_graph[node] = pruned_list; + assert(_final_graph[node].size() <= _indexingRange); + } + + inter_insert(node, pruned_list, scratch); if (node_ctr % 100000 == 0) { diskann::cout << "\r" << (100.0 * node_ctr) / (visit_order.size()) @@ -1331,13 +1321,18 @@ namespace diskann { __FUNCSIG__, __FILE__, __LINE__); } - _nd = num_points_to_load; + std::unique_lock ul(_update_lock); + + { + std::unique_lock tl(_tag_lock); + _nd = num_points_to_load; - memcpy((char *) _data, (char *) data, _aligned_dim * _nd * sizeof(T)); + memcpy((char *) _data, (char *) data, _aligned_dim * _nd * sizeof(T)); - if (_normalize_vecs) { - for (uint64_t i = 0; i < num_points_to_load; i++) { - normalize(_data + _aligned_dim * i, _aligned_dim); + if (_normalize_vecs) { + for (uint64_t i = 0; i < num_points_to_load; i++) { + normalize(_data + _aligned_dim * i, _aligned_dim); + } } } @@ -1349,6 +1344,7 @@ namespace diskann { const size_t num_points_to_load, Parameters ¶meters, const std::vector &tags) { + std::unique_lock ul(_update_lock); if (num_points_to_load == 0) throw ANNException("Do not call build with 0 points", -1, __FUNCSIG__, __FILE__, __LINE__); @@ -1442,7 +1438,10 @@ namespace diskann { diskann::cout << "Using only first " << num_points_to_load << " from file.. " << std::endl; - _nd = num_points_to_load; + { + std::unique_lock tl(_tag_lock); + _nd = num_points_to_load; + } build_with_data_populated(parameters, tags); } @@ -1453,6 +1452,7 @@ namespace diskann { std::vector tags; if (_enable_tags) { + std::unique_lock tl(_tag_lock); if (tag_filename == nullptr) { throw ANNException("Tag filename is null, while _enable_tags is set", -1, __FUNCSIG__, __FILE__, __LINE__); @@ -1511,13 +1511,10 @@ namespace diskann { std::vector init_ids; init_ids.push_back(_start); - std::shared_lock lock(_update_lock); - auto retval = iterate_to_fixed_point(query, L, init_ids, scratch, true, true); - - auto &best_L_nodes = scratch->best_l_nodes(); + NeighborPriorityQueue &best_L_nodes = scratch->best_l_nodes(); size_t pos = 0; for (int i = 0; i < best_L_nodes.size(); ++i) { @@ -1610,11 +1607,13 @@ namespace diskann { template size_t Index::get_num_points() { + std::shared_lock tl(_tag_lock); return _nd; } template size_t Index::get_max_points() { + std::shared_lock tl(_tag_lock); return _max_points; } @@ -1652,57 +1651,83 @@ namespace diskann { return -2; } - std::unique_lock update_lock(_update_lock); + std::unique_lock ul(_update_lock); + std::unique_lock tl(_tag_lock); + std::unique_lock dl(_delete_lock); + if (_data_compacted) { for (unsigned slot = (unsigned) _nd; slot < _max_points; ++slot) { _empty_slots.insert(slot); } } - _lazy_done = false; return 0; } template inline void Index::process_delete( - const tsl::robin_set &old_delete_set, size_t i, - const unsigned &range, const unsigned &maxc, const float &alpha, + const tsl::robin_set &old_delete_set, size_t loc, + const unsigned range, const unsigned maxc, const float alpha, InMemQueryScratch *scratch) { - tsl::robin_set candidate_set; - std::vector expanded_nghrs; + tsl::robin_set &expanded_nodes_set = + scratch->expanded_nodes_set(); + std::vector &expanded_nghrs_vec = scratch->expanded_nodes_vec(); - bool modify = false; + // If this condition were not true, deadlock could result + assert(old_delete_set.find(loc) == old_delete_set.end()); + + std::vector adj_list; + { + // Acquire and release lock[loc] before acquiring locks for neighbors + std::unique_lock adj_list_lock; + if (_conc_consolidate) + adj_list_lock = std::unique_lock(_locks[loc]); + adj_list = _final_graph[loc]; + } - for (auto ngh : _final_graph[(_u32) i]) { - if (old_delete_set.find(ngh) != old_delete_set.end()) { + bool modify = false; + for (auto ngh : adj_list) { + if (old_delete_set.find(ngh) == old_delete_set.end()) { + expanded_nodes_set.insert(ngh); + } else { modify = true; - // Add outgoing links from + std::unique_lock ngh_lock; if (_conc_consolidate) - _locks[ngh].lock(); + ngh_lock = std::unique_lock(_locks[ngh]); for (auto j : _final_graph[ngh]) - if (old_delete_set.find(j) == old_delete_set.end()) - candidate_set.insert(j); - if (_conc_consolidate) - _locks[ngh].unlock(); - } else { - candidate_set.insert(ngh); + if (j != loc && old_delete_set.find(j) == old_delete_set.end()) + expanded_nodes_set.insert(j); } } + if (modify) { - for (auto j : candidate_set) { - expanded_nghrs.push_back( - Neighbor(j, _distance->compare(_data + _aligned_dim * i, - _data + _aligned_dim * (size_t) j, - (unsigned) _aligned_dim))); + if (expanded_nodes_set.size() <= range) { + std::unique_lock adj_list_lock(_locks[loc]); + _final_graph[loc].clear(); + for (auto &ngh : expanded_nodes_set) + _final_graph[loc].push_back(ngh); + } else { + // Create a pool of Neighbor candidates from the expanded_nodes_set + expanded_nghrs_vec.reserve(expanded_nodes_set.size()); + for (auto &ngh : expanded_nodes_set) { + expanded_nghrs_vec.emplace_back( + ngh, _distance->compare(_data + _aligned_dim * loc, + _data + _aligned_dim * ngh, + (unsigned) _aligned_dim)); + } + std::sort(expanded_nghrs_vec.begin(), expanded_nghrs_vec.end()); + std::vector &occlude_list_output = + scratch->occlude_list_output(); + occlude_list(loc, expanded_nghrs_vec, alpha, range, maxc, + occlude_list_output, scratch, &old_delete_set); + std::unique_lock adj_list_lock(_locks[loc]); + _final_graph[loc] = occlude_list_output; } - - std::sort(expanded_nghrs.begin(), expanded_nghrs.end()); - occlude_list(i, expanded_nghrs, alpha, range, maxc, _final_graph[i], - scratch, &old_delete_set); } } - + + // Returns number of live points left after consolidation template consolidation_report Index::consolidate_deletes( @@ -1721,13 +1746,13 @@ namespace diskann { throw ANNException(err, -1, __FUNCSIG__, __FILE__, __LINE__); } - if (_location_to_tag.size() + _delete_set.size() != _nd) { + if (_location_to_tag.size() + _delete_set->size() != _nd) { diskann::cerr << "Error: _location_to_tag.size (" - << _location_to_tag.size() << ") + _delete_set.size (" - << _delete_set.size() << ") != _nd(" << _nd << ") "; + << _location_to_tag.size() << ") + _delete_set->size (" + << _delete_set->size() << ") != _nd(" << _nd << ") "; return consolidation_report(diskann::consolidation_report::status_code:: INCONSISTENT_COUNT_ERROR, - 0, 0, 0, 0, 0, 0); + 0, 0, 0, 0, 0, 0, 0); } if (_location_to_tag.size() != _tag_to_location.size()) { @@ -1750,15 +1775,21 @@ namespace diskann { << std::endl; return consolidation_report( diskann::consolidation_report::status_code::LOCK_FAIL, 0, 0, 0, 0, 0, - 0); + 0, 0); } diskann::cout << "Starting consolidate_deletes... "; - tsl::robin_set old_delete_set; + std::unique_ptr> old_delete_set( + new tsl::robin_set); { std::unique_lock dl(_delete_lock); - _delete_set.swap(old_delete_set); + std::swap(_delete_set, old_delete_set); + } + + if (old_delete_set->find(_start) != old_delete_set->end()) { + throw diskann::ANNException("ERROR: start node has been deleted", -1, + __FUNCSIG__, __FILE__, __LINE__); } const unsigned range = params.Get("R"); @@ -1768,45 +1799,46 @@ namespace diskann { ? omp_get_num_threads() : params.Get("num_threads"); + unsigned num_calls_to_process_delete = 0; diskann::Timer timer; -#pragma omp parallel for num_threads(num_threads) schedule(dynamic, 8192) +#pragma omp parallel for num_threads(num_threads) schedule(dynamic, 8192) \ + reduction(+:num_calls_to_process_delete) for (_s64 loc = 0; loc < (_s64) _max_points; loc++) { - if (old_delete_set.find((_u32) loc) == old_delete_set.end() && + if (old_delete_set->find((_u32) loc) == old_delete_set->end() && !_empty_slots.is_in_set((_u32) loc)) { ScratchStoreManager> manager(_query_scratch); auto scratch = manager.scratch_space(); - if (_conc_consolidate) { - LockGuard adj_list_lock(_locks[loc]); - process_delete(old_delete_set, loc, range, maxc, alpha, scratch); - } else { - process_delete(old_delete_set, loc, range, maxc, alpha, scratch); - } + process_delete(*old_delete_set, loc, range, maxc, alpha, scratch); + num_calls_to_process_delete += 1; } } for (_s64 loc = _max_points; loc < (_s64) (_max_points + _num_frozen_pts); loc++) { - LockGuard adj_list_lock(_locks[loc]); ScratchStoreManager> manager(_query_scratch); auto scratch = manager.scratch_space(); - process_delete(old_delete_set, loc, range, maxc, alpha, scratch); + process_delete(*old_delete_set, loc, range, maxc, alpha, scratch); + num_calls_to_process_delete += 1; } std::unique_lock tl(_tag_lock); - size_t ret_nd = release_locations(old_delete_set); + size_t ret_nd = release_locations(*old_delete_set); + size_t max_points = _max_points; + size_t empty_slots_size = _empty_slots.size(); + + std::shared_lock dl(_delete_lock); + size_t delete_set_size = _delete_set->size(); + size_t old_delete_set_size = old_delete_set->size(); if (!_conc_consolidate) { update_lock.unlock(); } - if (_delete_set.size() == 0) - _lazy_done = false; - double duration = timer.elapsed() / 1000000.0; diskann::cout << " done in " << duration << " seconds." << std::endl; return consolidation_report( - diskann::consolidation_report::status_code::SUCCESS, ret_nd, - this->_max_points, _empty_slots.size(), old_delete_set.size(), - _delete_set.size(), duration); + diskann::consolidation_report::status_code::SUCCESS, ret_nd, max_points, + empty_slots_size, old_delete_set_size, delete_set_size, + num_calls_to_process_delete, duration); } template @@ -1850,13 +1882,11 @@ namespace diskann { return; } - std::unique_lock cl(_consolidate_lock); - - if (_delete_set.size() > 0) { + if (_delete_set->size() > 0) { throw ANNException( - "Can not compact data when index has non-trivial _delete_set of " + "Can not compact data when index has non-empty _delete_set of " "size: " + - std::to_string(_delete_set.size()), + std::to_string(_delete_set->size()), -1, __FUNCSIG__, __FILE__, __LINE__); } @@ -1880,44 +1910,32 @@ namespace diskann { new_location[old_location] = old_location; } - // If cur node is removed, replace it. - if (_delete_set.find(_start) != _delete_set.end()) { - diskann::cerr << "Replacing cur node which has been deleted... " - << std::flush; - auto old_ep = _start; - // First active neighbor of old cur node is new cur node - for (auto iter : _final_graph[_start]) - if (_delete_set.find(iter) != _delete_set.end()) { - _start = iter; - break; - } - if (_start == old_ep) { - throw diskann::ANNException( - "ERROR: Did not find a replacement for cur node.", -1, __FUNCSIG__, - __FILE__, __LINE__); - } else { - assert(_delete_set.find(_start) == _delete_set.end()); - } + // If start node is removed, throw an exception + if (_start < _max_points && !_location_to_tag.contains(_start)) { + throw diskann::ANNException("ERROR: Start node deleted.", -1, __FUNCSIG__, + __FILE__, __LINE__); } size_t num_dangling = 0; for (unsigned old = 0; old < _max_points + _num_frozen_pts; ++old) { + std::vector new_adj_list; + if ((new_location[old] < _max_points) // If point continues to exist || (old >= _max_points && old < _max_points + _num_frozen_pts)) { - for (size_t i = 0; i < _final_graph[old].size(); ++i) { - if (empty_locations.find(_final_graph[old][i]) != - empty_locations.end()) { + new_adj_list.reserve(_final_graph[old].size()); + for (auto ngh_iter : _final_graph[old]) { + if (empty_locations.find(ngh_iter) != empty_locations.end()) { ++num_dangling; diskann::cerr << "Error in compact_data(). _final_graph[" << old - << "][" << i << "] = " << _final_graph[old][i] + << "] has neighbor " << ngh_iter << " which is a location not associated with any tag." << std::endl; - _final_graph[old].erase(_final_graph[old].begin() + i); - i--; + } else { - _final_graph[old][i] = new_location[_final_graph[old][i]]; + new_adj_list.push_back(new_location[ngh_iter]); } } + _final_graph[old].swap(new_adj_list); // Move the data and adj list to the correct position if (new_location[old] != old) { @@ -1942,14 +1960,13 @@ namespace diskann { _tag_to_location[tag] = new_location[pos._key]; } _location_to_tag.clear(); - for (auto iter : _tag_to_location) { + for (const auto &iter : _tag_to_location) { _location_to_tag.set(iter.second, iter.first); } for (_u64 old = _nd; old < _max_points; ++old) { _final_graph[old].clear(); } - _delete_set.clear(); _empty_slots.clear(); for (auto i = _nd; i < _max_points; i++) { _empty_slots.insert((uint32_t) i); @@ -1960,6 +1977,9 @@ namespace diskann { << timer.elapsed() / 1000000. << "s." << std::endl; } + // + // Caller must hold unique _tag_lock and _delete_lock before calling this + // template int Index::reserve_location() { if (_nd >= _max_points) { @@ -1973,13 +1993,11 @@ namespace diskann { // consecutive locations. location = (unsigned) _nd; } else { - // no need of delete_lock here, _tag_lock will ensure lazy delete does - // not update empty slots assert(_empty_slots.size() != 0); assert(_empty_slots.size() + _nd == _max_points); location = _empty_slots.pop_any(); - _delete_set.erase(location); + _delete_set->erase(location); } ++_nd; @@ -2000,7 +2018,7 @@ namespace diskann { template size_t Index::release_locations( - tsl::robin_set &locations) { + const tsl::robin_set &locations) { for (auto location : locations) { if (_empty_slots.is_in_set(location)) throw ANNException( @@ -2102,29 +2120,34 @@ namespace diskann { std::shared_lock shared_ul(_update_lock); std::unique_lock tl(_tag_lock); + std::unique_lock dl(_delete_lock); // Find a vacant location in the data array to insert the new point auto location = reserve_location(); if (location == -1) { #if EXPAND_IF_FULL + dl.unlock(); tl.unlock(); shared_ul.unlock(); { std::unique_lock ul(_update_lock); tl.lock(); + dl.lock(); if (_nd >= _max_points) { auto new_max_points = (size_t) (_max_points * INDEX_GROWTH_FACTOR); resize(new_max_points); } + dl.unlock(); tl.unlock(); ul.unlock(); } shared_ul.lock(); tl.lock(); + dl.lock(); location = reserve_location(); if (location == -1) { @@ -2136,6 +2159,7 @@ namespace diskann { return -1; #endif } + dl.unlock(); // Insert tag and mapping to location if (_enable_tags) { @@ -2159,11 +2183,36 @@ namespace diskann { } // Find and add appropriate graph edges - std::vector pruned_list; - ScratchStoreManager> manager(_query_scratch); auto scratch = manager.scratch_space(); - search_for_point_and_add_links(location, _indexingQueueSize, scratch); + std::vector pruned_list; + search_for_point_and_prune(location, _indexingQueueSize, pruned_list, + scratch); + { + std::shared_lock tlock(_tag_lock, + std::defer_lock); + if (_conc_consolidate) + tlock.lock(); + + LockGuard guard(_locks[location]); + _final_graph[location].clear(); + _final_graph[location].reserve( + (_u64) (_indexingRange * GRAPH_SLACK_FACTOR * 1.05)); + + for (auto link : pruned_list) { + if (_conc_consolidate) + if (!_location_to_tag.contains(link)) + continue; + _final_graph[location].emplace_back(link); + } + assert(_final_graph[location].size() <= _indexingRange); + + if (_conc_consolidate) + tlock.unlock(); + } + + inter_insert(location, pruned_list, scratch); + return 0; } @@ -2172,7 +2221,6 @@ namespace diskann { std::shared_lock ul(_update_lock); std::unique_lock tl(_tag_lock); std::unique_lock dl(_delete_lock); - _lazy_done = true; _data_compacted = false; if (_tag_to_location.find(tag) == _tag_to_location.end()) { @@ -2182,7 +2230,7 @@ namespace diskann { assert(_tag_to_location[tag] < _max_points); const auto location = _tag_to_location[tag]; - _delete_set.insert(location); + _delete_set->insert(location); _location_to_tag.erase(location); _tag_to_location.erase(tag); @@ -2199,7 +2247,6 @@ namespace diskann { std::shared_lock ul(_update_lock); std::unique_lock tl(_tag_lock); std::unique_lock dl(_delete_lock); - _lazy_done = true; _data_compacted = false; for (auto tag : tags) { @@ -2207,7 +2254,7 @@ namespace diskann { failed_tags.push_back(tag); } else { const auto location = _tag_to_location[tag]; - _delete_set.insert(location); + _delete_set->insert(location); _location_to_tag.erase(location); _tag_to_location.erase(tag); } @@ -2229,7 +2276,12 @@ namespace diskann { } template - void Index::print_status() const { + void Index::print_status() { + std::shared_lock ul(_update_lock); + std::shared_lock cl(_consolidate_lock); + std::shared_lock tl(_tag_lock); + std::shared_lock dl(_delete_lock); + diskann::cout << "------------------- Index object: " << (uint64_t) this << " -------------------" << std::endl; diskann::cout << "Number of points: " << _nd << std::endl; @@ -2241,15 +2293,16 @@ namespace diskann { diskann::cout << "Number of empty slots: " << _empty_slots.size() << std::endl; diskann::cout << std::boolalpha - << "Data compacted: " << this->_data_compacted - << " Lazy done: " << this->_lazy_done << std::endl; + << "Data compacted: " << this->_data_compacted << std::endl; diskann::cout << "---------------------------------------------------------" "------------" << std::endl; } template - void Index::count_nodes_at_bfs_levels() const { + void Index::count_nodes_at_bfs_levels() { + std::unique_lock ul(_update_lock); + boost::dynamic_bitset<> visited(_max_points + _num_frozen_pts); size_t MAX_BFS_LEVELS = 32; @@ -2285,15 +2338,16 @@ namespace diskann { template void Index::optimize_index_layout() { // use after build or load - if (_dynamic_index) - throw ANNException( + if (_dynamic_index) { + throw diskann::ANNException( "Optimize_index_layout not implemented for dyanmic indices", -1, __FUNCSIG__, __FILE__, __LINE__); + } _data_len = (_aligned_dim + 1) * sizeof(float); _neighbor_len = (_max_observed_degree + 1) * sizeof(unsigned); _node_size = _data_len + _neighbor_len; - _opt_graph = (char *) malloc(_node_size * _nd); + _opt_graph = new char[_node_size * _nd]; DistanceFastL2 *dist_fast = (DistanceFastL2 *) _distance; for (unsigned i = 0; i < _nd; i++) { char *cur_node_offset = _opt_graph + i * _node_size; diff --git a/src/scratch.cpp b/src/scratch.cpp index fd888216e..adeb98477 100644 --- a/src/scratch.cpp +++ b/src/scratch.cpp @@ -53,6 +53,10 @@ namespace diskann { _id_scratch.clear(); _dist_scratch.clear(); + + _expanded_nodes_set.clear(); + _expanded_nghrs_vec.clear(); + _occlude_list_output.clear(); } template