Skip to content

Commit

Permalink
Fix TSAN warnings in indexing_test (#5149)
Browse files Browse the repository at this point in the history
  • Loading branch information
takuro-sato authored Apr 3, 2023
1 parent 44cffc5 commit 274683f
Show file tree
Hide file tree
Showing 10 changed files with 34 additions and 18 deletions.
2 changes: 1 addition & 1 deletion .daily_canary
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
___ ___
(- -) (o o) | Y & +-
( V ) z O z O +---=---'
/--x-m- /--m-m---xXx--/--yY------
/--x-m- /--m-m---xXx--/--yY-----
2 changes: 1 addition & 1 deletion .threading_canary
Original file line number Diff line number Diff line change
@@ -1 +1 @@
This looks like a "job" for Threading Canary!!!!!!
This looks like a "job" for Threading Canary!!!!
1 change: 1 addition & 0 deletions include/ccf/indexing/strategies/seqnos_by_key_in_memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ namespace ccf::indexing::strategies

std::optional<SeqNoCollection> get_all_write_txs(const typename M::Key& key)
{
std::lock_guard<ccf::pal::Mutex> guard(current_txid_lock);
return get_write_txs_in_range(key, 0, current_txid.seqno);
}
};
Expand Down
7 changes: 6 additions & 1 deletion include/ccf/indexing/strategies/visit_each_entry_in_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "ccf/byte_vector.h"
#include "ccf/indexing/strategy.h"
#include "ccf/pal/locking.h"

namespace ccf::indexing::strategies
{
Expand All @@ -14,6 +15,10 @@ namespace ccf::indexing::strategies
{
protected:
std::string map_name;

// Protect access to current_txid
ccf::pal::Mutex current_txid_lock;

ccf::TxID current_txid = {};

virtual void visit_entry(
Expand All @@ -34,6 +39,6 @@ namespace ccf::indexing::strategies

nlohmann::json describe() override;

ccf::TxID get_indexed_watermark() const;
ccf::TxID get_indexed_watermark();
};
}
4 changes: 2 additions & 2 deletions src/apps/external_executor/external_executor_indexing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace externalexecutor

// We were already trying to fetch this. If it's finished fetching,
// parse and store the result
const auto fetch_result = bucket_value->fetch_result;
const auto fetch_result = bucket_value->fetch_result.load();
switch (fetch_result)
{
case (ccf::indexing::FetchResult::Fetching):
Expand Down Expand Up @@ -149,4 +149,4 @@ namespace externalexecutor
{
return impl_index->fetch_data(key);
}
}
}
5 changes: 3 additions & 2 deletions src/indexing/lfs_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ namespace ccf::indexing

struct FetchResult
{
enum
enum FetchResultType
{
Fetching,
Loaded,
NotFound,
Corrupt,
} fetch_result;
};
std::atomic<FetchResultType> fetch_result;

LFSKey key;

Expand Down
17 changes: 14 additions & 3 deletions src/indexing/strategies/seqnos_by_key_bucketed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,20 @@ namespace ccf::indexing::strategies
using BucketValue = std::pair<FetchResultPtr, SeqNoCollection>;
LRU<BucketKey, BucketValue> old_results;

// When results_access and current_txid_lock need to be
// taken at the same time, results_access should be first
ccf::pal::Mutex results_access;

std::string name;

std::shared_ptr<AbstractLFSAccess> lfs_access;

ccf::pal::Mutex& current_txid_lock;
ccf::TxID& current_txid;

Impl(
const std::string& name_,
ccf::pal::Mutex& current_txid_lock_,
ccf::TxID& current_txid_,
const std::shared_ptr<AbstractLFSAccess>& lfs_access_,
size_t seqnos_per_bucket_,
Expand All @@ -50,6 +55,7 @@ namespace ccf::indexing::strategies
old_results(max_buckets_),
name(name_),
lfs_access(lfs_access_),
current_txid_lock(current_txid_lock_),
current_txid(current_txid_)
{
if (lfs_access == nullptr)
Expand Down Expand Up @@ -166,7 +172,8 @@ namespace ccf::indexing::strategies
fmt::format("Range goes backwards: {} -> {}", from, to));
}

if (to > current_txid.seqno)
if (std::lock_guard<ccf::pal::Mutex> guard(current_txid_lock);
to > current_txid.seqno)
{
// If the requested range hasn't been populated yet, indicate
// that with nullopt
Expand Down Expand Up @@ -221,7 +228,7 @@ namespace ccf::indexing::strategies
// parse and store the result
if (bucket_value.first != nullptr)
{
const auto fetch_result = bucket_value.first->fetch_result;
const auto fetch_result = bucket_value.first->fetch_result.load();
switch (fetch_result)
{
case (FetchResult::Fetching):
Expand Down Expand Up @@ -266,7 +273,10 @@ namespace ccf::indexing::strategies
// for safety, and consistency with the simple indexing
// strategies currently used, this re-indexes everything from
// the start of time.
current_txid = {};
{
std::lock_guard<ccf::pal::Mutex> guard(current_txid_lock);
current_txid = {};
}
old_results.clear();
current_results.clear();

Expand Down Expand Up @@ -436,6 +446,7 @@ namespace ccf::indexing::strategies

impl = std::make_shared<Impl>(
get_name(),
current_txid_lock,
current_txid,
node_context.get_subsystem<AbstractLFSAccess>(),
seqnos_per_bucket_,
Expand Down
5 changes: 4 additions & 1 deletion src/indexing/strategies/visit_each_entry_in_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ namespace ccf::indexing::strategies
visit_entry(tx_id, k, v);
return true;
});
std::lock_guard<ccf::pal::Mutex> guard(current_txid_lock);
current_txid = tx_id;
}

std::optional<ccf::SeqNo> VisitEachEntryInMap::next_requested()
{
std::lock_guard<ccf::pal::Mutex> guard(current_txid_lock);
return current_txid.seqno + 1;
}

Expand All @@ -41,8 +43,9 @@ namespace ccf::indexing::strategies
return j;
}

ccf::TxID VisitEachEntryInMap::get_indexed_watermark() const
ccf::TxID VisitEachEntryInMap::get_indexed_watermark()
{
std::lock_guard<ccf::pal::Mutex> guard(current_txid_lock);
return current_txid;
}
}
4 changes: 2 additions & 2 deletions src/indexing/test/indexing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ TEST_CASE_TEMPLATE(
}

using namespace std::chrono_literals;
const auto max_multithread_run_time = 10s;
const auto max_multithread_run_time = 100s;

// Uses the real classes, and access + update them concurrently
TEST_CASE(
Expand Down Expand Up @@ -957,4 +957,4 @@ TEST_CASE(
ringbuffer_flusher.join();
index_ticker.join();
watchdog.join();
}
}
5 changes: 0 additions & 5 deletions tsan_env_suppressions
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,5 @@
deadlock:*/store.h
deadlock:*/untyped_map.h

# For indexing_test
race:*/include/ccf/indexing/strategies/seqnos_by_key_in_memory.h
race:*/indexing/strategies/visit_each_entry_in_map.cpp
race:*/indexing/strategies/seqnos_by_key_bucketed.cpp

# For governance_test
race:*/node/*rpc/*frontend.h

0 comments on commit 274683f

Please sign in to comment.