Skip to content

Commit

Permalink
Fix locking errors in streaming code (#214)
Browse files Browse the repository at this point in the history
* report number of process_delete calls in consolidation_report

* correct locking comments

* convert _delete_set member in index class to a unique_ptr

* Add shared delete_lock to process_delete; move adj list lock inside process_delete

* insert acqires exclusive delete_lock before calling reserve_location(); change comments on locking requirements for calling reserve_location()

* In search_for_points_and_add_links(), remove unnecessary checks of pool against delete_set. The function already checks the final results against _location_to_tag

* Copy by reference when scratch spaces are returned

* use reference to iterate over vector when modifying it's contents

* minor changes to help compiler optimize

* acquire shared delete lock to check delete set size

* change locking in process_delete to avoid deadlock

* remove _lazy_done member from Index class. It can be inferred from _delete_set.size()

* Add locks to compact_data; remove _delete_set->clear since it's supposed to be empty here; use iterators to renumber edges

* In process_delete, clear _final_graph[loc] before passing as output buffer to occlude_list

* Use a set to collect unique nodes for candidate pool to pass to occlude_list

* throw an exception if conslidate_delete or compact_data finds that the _start node has been deleted

* add shared locks to print_status in Index class

* fix to check on deleted start point

* acquire tag and delete lock in enable delete

* In process_delete, renamed iterator variable and conditioned the call to occlude_list on the candidate pool being larger than R

* In process_delete, acquire lock on adj list before clearing it

* Refactor protected member search_for_points_and_add_links so that tag_lock is moved to public functions

* acquire _locks[loc] later in process_delete control flow

* pre-allocating scratch space needed in process_delete
  • Loading branch information
harsha-simhadri authored Jan 31, 2023
1 parent 032cab4 commit 351bf79
Show file tree
Hide file tree
Showing 4 changed files with 307 additions and 213 deletions.
99 changes: 62 additions & 37 deletions include/index.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,31 @@ namespace diskann {
};
status_code _status;
size_t _active_points, _max_points, _empty_slots, _slots_released,
_delete_set_size;
_delete_set_size, _num_calls_to_process_delete;
double _time;

consolidation_report(status_code status, size_t active_points,
size_t max_points, size_t empty_slots,
size_t slots_released, size_t delete_set_size,
double time_secs)
size_t num_calls_to_process_delete, double time_secs)
: _status(status), _active_points(active_points),
_max_points(max_points), _empty_slots(empty_slots),
_slots_released(slots_released), _delete_set_size(delete_set_size),
_num_calls_to_process_delete(num_calls_to_process_delete),
_time(time_secs) {
}
};

template<typename T, typename TagT = uint32_t>
class Index {
/**************************************************************************
*
* Public functions acquire one or more of _update_lock, _consolidate_lock,
* _tag_lock, _delete_lock before calling protected functions which DO NOT
* acquire these locks. They might acquire locks on _locks[i]
*
**************************************************************************/

public:
// Constructor for Bulk operations and for creating the index object solely
// for loading a prexisting index.
Expand Down Expand Up @@ -125,7 +134,7 @@ namespace diskann {
// Set starting point to a random point on a sphere of certain radius
DISKANN_DLLEXPORT void set_start_point_at_random(T radius);

// For Bulk Index FastL2 search, we interleave the data with graph
// For FastL2 search on a static index, we interleave the data with graph
DISKANN_DLLEXPORT void optimize_index_layout();

// For FastL2 search on optimized layout
Expand Down Expand Up @@ -183,9 +192,9 @@ namespace diskann {
// memory should be allocated for vec before calling this function
DISKANN_DLLEXPORT int get_vector_by_tag(TagT &tag, T *vec);

DISKANN_DLLEXPORT void print_status() const;
DISKANN_DLLEXPORT void print_status();

DISKANN_DLLEXPORT void count_nodes_at_bfs_levels() const;
DISKANN_DLLEXPORT void count_nodes_at_bfs_levels();

// This variable MUST be updated if the number of entries in the metadata
// change.
Expand All @@ -203,23 +212,25 @@ namespace diskann {
Index<T, TagT> &operator=(const Index<T, TagT> &) = delete;

// Use after _data and _nd have been populated
// Acquire exclusive _update_lock before calling
void build_with_data_populated(Parameters &parameters,
const std::vector<TagT> &tags);

// generates 1 frozen point that will never be deleted from the graph
// This is not visible to the user
int generate_frozen_point();

// determines navigating node of the graph by calculating medoid of data
// determines navigating node of the graph by calculating medoid of datafopt
unsigned calculate_entry_point();

std::pair<uint32_t, uint32_t> iterate_to_fixed_point(
const T *node_coords, const unsigned Lindex,
const std::vector<unsigned> &init_ids, InMemQueryScratch<T> *scratch,
bool ret_frozen = true, bool search_invocation = false);

void search_for_point_and_add_links(int location, _u32 Lindex,
InMemQueryScratch<T> *scratch);
void search_for_point_and_prune(int location, _u32 Lindex,
std::vector<unsigned> &pruned_list,
InMemQueryScratch<T> *scratch);

void prune_neighbors(const unsigned location, std::vector<Neighbor> &pool,
std::vector<unsigned> &pruned_list,
Expand All @@ -230,6 +241,8 @@ namespace diskann {
const float alpha, std::vector<unsigned> &pruned_list,
InMemQueryScratch<T> *scratch);

// Prunes candidates in @pool to a shorter list @result
// @pool must be sorted before calling
void occlude_list(
const unsigned location, std::vector<Neighbor> &pool, const float alpha,
const unsigned degree, const unsigned maxc,
Expand All @@ -243,30 +256,34 @@ namespace diskann {
void inter_insert(unsigned n, std::vector<unsigned> &pruned_list,
InMemQueryScratch<T> *scratch);

// Acquire exclusive _update_lock before calling
void link(Parameters &parameters);

// Acquire _tag_lock before calling
int reserve_location();
// Acquire exclusive _tag_lock and _delete_lock before calling
int reserve_location();

// Acquire exclusive _tag_lock before calling
size_t release_location(int location);
size_t release_locations(tsl::robin_set<unsigned> &locations);
size_t release_locations(const tsl::robin_set<unsigned> &locations);

// Resize the index when no slots are left for insertion.
// MUST acquire _num_points_lock and _update_lock before calling.
// Acquire exclusive _update_lock and _tag_lock before calling.
void resize(size_t new_max_points);

// Take an unique lock on _update_lock and _consolidate_lock
// before calling these functions.
// Acquire unique lock on _update_lock, _consolidate_lock, _tag_lock
// and _delete_lock before calling these functions.
// Renumber nodes, update tag and location maps and compact the
// graph, mode = _consolidated_order in case of lazy deletion and
// _compacted_order in case of eager deletion
DISKANN_DLLEXPORT void compact_data();
DISKANN_DLLEXPORT void compact_frozen_point();

// Remove deleted nodes from adj list of node i and absorb edges from
// deleted neighbors Acquire _locks[i] prior to calling for thread-safety
// Remove deleted nodes from adjacency list of node loc
// Replace removed neighbors with second order neighbors.
// Also acquires _locks[i] for i = loc and out-neighbors of loc.
void process_delete(const tsl::robin_set<unsigned> &old_delete_set,
size_t i, const unsigned &range, const unsigned &maxc,
const float &alpha, InMemQueryScratch<T> *scratch);
size_t loc, const unsigned range, const unsigned maxc,
const float alpha, InMemQueryScratch<T> *scratch);

void initialize_query_scratch(uint32_t num_threads, uint32_t search_l,
uint32_t indexing_l, uint32_t r,
Expand Down Expand Up @@ -299,7 +316,7 @@ namespace diskann {

// Data
T *_data = nullptr;
char *_opt_graph;
char *_opt_graph = nullptr;

// Graph related data structures
std::vector<std::vector<unsigned>> _final_graph;
Expand Down Expand Up @@ -335,13 +352,6 @@ namespace diskann {
// Query scratch data structures
ConcurrentQueue<InMemQueryScratch<T> *> _query_scratch;

// data structures, flags and locks for dynamic indexing
tsl::sparse_map<TagT, unsigned> _tag_to_location;
natural_number_map<unsigned, TagT> _location_to_tag;

tsl::robin_set<unsigned> _delete_set;
natural_number_set<unsigned> _empty_slots;

// Flags for PQ based distance calculation
bool _pq_dist = false;
bool _use_opq = false;
Expand All @@ -350,23 +360,38 @@ namespace diskann {
bool _pq_generated = false;
FixedChunkPQTable _pq_table;

bool _lazy_done = false; // true if lazy deletions have been made
//
// Data structures, locks and flags for dynamic indexing and tags
//

// lazy_delete removes entry from _location_to_tag and _tag_to_location. If
// _location_to_tag does not resolve a location, infer that it was deleted.
tsl::sparse_map<TagT, unsigned> _tag_to_location;
natural_number_map<unsigned, TagT> _location_to_tag;

// _empty_slots has unallocated slots and those freed by consolidate_delete.
// _delete_set has locations marked deleted by lazy_delete. Will not be
// immediately available for insert. consolidate_delete will release these
// slots to _empty_slots.
natural_number_set<unsigned> _empty_slots;
std::unique_ptr<tsl::robin_set<unsigned>> _delete_set;

bool _data_compacted = true; // true if data has been compacted
bool _is_saved = false; // Gopal. Checking if the index is already saved.
bool _conc_consolidate = false; // use _lock while searching

// Per node lock, cardinality=max_points_
std::vector<non_recursive_mutex> _locks;

// If acquiring multiple locks below, acquire locks in the order below
// Acquire locks in the order below when acquiring multiple locks
std::shared_timed_mutex // RW mutex between save/load (exclusive lock) and
_update_lock; // search/inserts/deletes/consolidate (shared lock)
std::shared_timed_mutex
_consolidate_lock; // Ensure only one consolidate is ever active
std::shared_timed_mutex
_tag_lock; // RW lock for _tag_to_location and _location_to_tag
std::shared_timed_mutex
_delete_lock; // RW Lock on _delete_set and _empty_slots
std::shared_timed_mutex // Ensure only one consolidate or compact_data is
_consolidate_lock; // ever active
std::shared_timed_mutex // RW lock for _tag_to_location,
_tag_lock; // _location_to_tag, _empty_slots, _nd, _max_points
std::shared_timed_mutex // RW Lock on _delete_set and _data_compacted
_delete_lock; // variable

// Per node lock, cardinality=_max_points
std::vector<non_recursive_mutex> _locks;

static const float INDEX_GROWTH_FACTOR;
};
Expand Down
17 changes: 14 additions & 3 deletions include/scratch.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,12 @@ namespace diskann {
inline uint32_t get_maxc() {
return _maxc;
}

inline T *aligned_query() {
return _aligned_query;
}
inline PQScratch<T> *pq_scratch() {
return _pq_scratch;
}

inline std::vector<Neighbor> &pool() {
return _pool;
}
Expand All @@ -65,7 +63,6 @@ namespace diskann {
inline std::vector<float> &occlude_factor() {
return _occlude_factor;
}

inline tsl::robin_set<unsigned> &inserted_into_pool_rs() {
return _inserted_into_pool_rs;
}
Expand All @@ -78,6 +75,15 @@ namespace diskann {
inline std::vector<float> &dist_scratch() {
return _dist_scratch;
}
inline tsl::robin_set<unsigned> &expanded_nodes_set() {
return _expanded_nodes_set;
}
inline std::vector<Neighbor> &expanded_nodes_vec() {
return _expanded_nghrs_vec;
}
inline std::vector<unsigned> &occlude_list_output() {
return _occlude_list_output;
}

private:
uint32_t _L;
Expand Down Expand Up @@ -116,6 +122,11 @@ namespace diskann {
// _dist_scratch must be > R*GRAPH_SLACK_FACTOR for iterate_to_fp
// _dist_scratch should be at least the size of id_scratch
std::vector<float> _dist_scratch;

// Buffers used in process delete, capacity increases as needed
tsl::robin_set<unsigned> _expanded_nodes_set;
std::vector<Neighbor> _expanded_nghrs_vec;
std::vector<unsigned> _occlude_list_output;
};

//
Expand Down
Loading

0 comments on commit 351bf79

Please sign in to comment.