From 6f6533316b1eb3f47d9510fe5d919e74f27a4b67 Mon Sep 17 00:00:00 2001
From: Borys
Date: Mon, 3 Feb 2025 14:05:43 +0200
Subject: [PATCH 1/2] refactor: simplify journal and restore streamer cancelation

---
 src/server/cluster/incoming_slot_migration.cc |  4 +-
 src/server/cluster/outgoing_slot_migration.cc | 21 +++----
 src/server/common.cc                          |  8 +--
 src/server/common.h                           | 59 +++++++++----------
 src/server/dflycmd.cc                         |  2 +-
 src/server/journal/streamer.cc                | 33 ++++++-----
 src/server/journal/streamer.h                 | 13 +----
 src/server/protocol_client.cc                 |  2 +-
 src/server/rdb_save.cc                        | 12 ++--
 src/server/replica.cc                         | 22 +++----
 src/server/snapshot.cc                        |  6 +-
 11 files changed, 84 insertions(+), 98 deletions(-)

diff --git a/src/server/cluster/incoming_slot_migration.cc b/src/server/cluster/incoming_slot_migration.cc
index 4b75dcc8ba16..50ec68466fbe 100644
--- a/src/server/cluster/incoming_slot_migration.cc
+++ b/src/server/cluster/incoming_slot_migration.cc
@@ -59,7 +59,7 @@ class ClusterShardMigration {
     JournalReader reader{source, 0};
     TransactionReader tx_reader;
 
-    while (!cntx->IsCancelled()) {
+    while (cntx->IsRunning()) {
       if (pause_) {
         ThisFiber::SleepFor(100ms);
         continue;
@@ -126,7 +126,7 @@ class ClusterShardMigration {
 
  private:
   void ExecuteTx(TransactionData&& tx_data, ExecutionState* cntx) {
-    if (cntx->IsCancelled()) {
+    if (!cntx->IsRunning()) {
       return;
     }
     if (!tx_data.IsGlobalCmd()) {
diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc
index fc5d25bf56e0..046f258406e1 100644
--- a/src/server/cluster/outgoing_slot_migration.cc
+++ b/src/server/cluster/outgoing_slot_migration.cc
@@ -84,6 +84,8 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
   }
 
   void Cancel() {
+    // We don't care about errors during cancel
+    cntx_.SwitchErrorHandler([](auto ge) {});
     // Close socket for clean disconnect.
     CloseSocket();
     streamer_.Cancel();
@@ -194,13 +196,12 @@ void OutgoingMigration::SyncFb() {
       break;
     }
 
-    last_error_ = cntx_.GetError();
-    cntx_.Reset(nullptr);
-
-    if (last_error_) {
-      LOG(ERROR) << last_error_.Format();
+    if (cntx_.IsError()) {
+      last_error_ = cntx_.GetError();
+      LOG(ERROR) << last_error_;
       ThisFiber::SleepFor(1000ms);  // wait some time before next retry
     }
+    cntx_.Reset(nullptr);
 
     VLOG(1) << "Connecting to target node";
     auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms;
@@ -246,7 +247,7 @@ void OutgoingMigration::SyncFb() {
     }
 
     OnAllShards([this](auto& migration) { migration->PrepareFlow(cf_->MyID()); });
-    if (cntx_.GetError()) {
+    if (cntx_.IsError()) {
       continue;
     }
 
@@ -257,13 +258,13 @@ void OutgoingMigration::SyncFb() {
       OnAllShards([](auto& migration) { migration->PrepareSync(); });
     }
 
-    if (cntx_.GetError()) {
+    if (cntx_.IsError()) {
      continue;
    }
 
     OnAllShards([](auto& migration) { migration->RunSync(); });
 
-    if (cntx_.GetError()) {
+    if (cntx_.IsError()) {
       continue;
     }
 
@@ -273,7 +274,7 @@ void OutgoingMigration::SyncFb() {
       VLOG(1) << "Waiting for migration to finalize...";
       ThisFiber::SleepFor(500ms);
     }
-    if (cntx_.GetError()) {
+    if (cntx_.IsError()) {
       continue;
     }
     break;
@@ -288,7 +289,7 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
   LOG(INFO) << "Finalize migration for " << cf_->MyID() << " : " << migration_info_.node_info.id
             << " attempt " << attempt;
   if (attempt > 1) {
-    if (cntx_.GetError()) {
+    if (cntx_.IsError()) {
       return true;
     }
     auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms;
diff --git a/src/server/common.cc b/src/server/common.cc
index 7285ba5b1520..5537d45132cc 100644
--- a/src/server/common.cc
+++ b/src/server/common.cc
@@ -349,10 +349,6 @@ GenericError ExecutionState::GetError() const {
   return err_;
 }
 
-const Cancellation* ExecutionState::GetCancellation() const {
-  return this;
-}
-
 void ExecutionState::ReportCancelError() {
   ReportError(std::make_error_code(errc::operation_canceled), "Context cancelled");
 }
@@ -363,7 +359,7 @@ void ExecutionState::Reset(ErrHandler handler) {
   unique_lock lk{err_mu_};
   err_ = {};
   err_handler_ = std::move(handler);
-  Cancellation::flag_.store(false, std::memory_order_relaxed);
+  state_.store(State::RUN, std::memory_order_relaxed);
   fb.swap(err_handler_fb_);
   lk.unlock();
   fb.JoinIfNeeded();
@@ -402,7 +398,7 @@ GenericError ExecutionState::ReportErrorInternal(GenericError&& err) {
   // We can move err_handler_ because it should run at most once.
   if (err_handler_)
     err_handler_fb_ = fb2::Fiber("report_internal_error", std::move(err_handler_), err_);
-  Cancellation::Cancel();
+  state_.store(State::ERROR, std::memory_order_relaxed);
 
   return err_;
 }
diff --git a/src/server/common.h b/src/server/common.h
index 828a53c63bc5..96fb21087312 100644
--- a/src/server/common.h
+++ b/src/server/common.h
@@ -205,23 +205,6 @@ using AggregateStatus = AggregateValue<facade::OpStatus>;
 static_assert(bool(facade::OpStatus::OK) == false,
               "Default intitialization should be a falsy OK value");
 
-// Simple wrapper interface around atomic cancellation flag.
-struct Cancellation {
-  Cancellation() : flag_{false} {
-  }
-
-  void Cancel() {
-    flag_.store(true, std::memory_order_relaxed);
-  }
-
-  bool IsCancelled() const {
-    return flag_.load(std::memory_order_relaxed);
-  }
-
- protected:
-  std::atomic_bool flag_;
-};
-
 // Error wrapper, that stores error_code and optional string message.
 class GenericError {
  public:
@@ -246,45 +229,58 @@ class GenericError {
 // Thread safe utility to store the first non null generic error.
 using AggregateGenericError = AggregateValue<GenericError>;
 
-// ExecutionState is a utility for managing error reporting and cancellation for complex tasks.
-//
-// When submitting an error with `Error`, only the first is stored (as in aggregate values).
-// Then a special error handler is run, if present, and the ExecutionState is cancelled. The error
-// handler is run in a separate handler to free up the caller.
+// ExecutionState is a thread-safe utility for managing error reporting and cancellation for
+// complex tasks. There are 3 states: RUN, CANCELLED and ERROR. RUN and CANCELLED are plain states
+// without any actions. When an error is reported, only the first one is stored and the rest are
+// ignored. Then a special error handler is run, if present, and the ExecutionState moves to ERROR.
+// The error handler is run in a separate fiber to free up the caller.
 //
 // ReportCancelError() reporting an `errc::operation_canceled` error.
-class ExecutionState : protected Cancellation {
+class ExecutionState {
  public:
   using ErrHandler = std::function<void(const GenericError&)>;
 
   ExecutionState() = default;
-  ExecutionState(ErrHandler err_handler)
-      : Cancellation{}, err_{}, err_handler_{std::move(err_handler)} {
+  ExecutionState(ErrHandler err_handler) : err_handler_{std::move(err_handler)} {
   }
 
   ~ExecutionState();
 
   // Cancels the context by submitting an `errc::operation_canceled` error.
   void ReportCancelError();
-  using Cancellation::IsCancelled;
-  const Cancellation* GetCancellation() const;
+
+  bool IsRunning() const {
+    return state_.load(std::memory_order_relaxed) == State::RUN;
+  }
+
+  bool IsError() const {
+    return state_.load(std::memory_order_relaxed) == State::ERROR;
+  }
+
+  bool IsCancelled() const {
+    return state_.load(std::memory_order_relaxed) == State::CANCELLED;
+  }
+
+  void Cancel() {
+    state_.store(State::CANCELLED, std::memory_order_relaxed);
+  }
 
   GenericError GetError() const;
 
   // Report an error by submitting arguments for GenericError.
   // If this is the first error that occured, then the error handler is run
-  // and the context is cancelled.
+  // and the context state is set to ERROR.
   template <typename... T> GenericError ReportError(T... ts) {
     return ReportErrorInternal(GenericError{std::forward<T>(ts)...});
   }
 
-  // Wait for error handler to stop, reset error and cancellation flag, assign new error handler.
+  // Wait for error handler to stop, reset error and state, assign new error handler.
   void Reset(ErrHandler handler);
 
   // Atomically replace the error handler if no error is present, and return the
   // current stored error. This function can be used to transfer cleanup responsibility safely
   //
-  // Beware, never do this manually in two steps. If you check for cancellation,
+  // Beware, never do this manually in two steps. If you check the state,
   // set the error handler and initialize resources, then the new error handler
   // will never run if the context was cancelled between the first two steps.
   GenericError SwitchErrorHandler(ErrHandler handler);
@@ -293,9 +289,10 @@ class ExecutionState {
   void JoinErrorHandler();
 
  private:
-  // Report error.
   GenericError ReportErrorInternal(GenericError&& err);
 
+  enum class State { RUN, CANCELLED, ERROR };
+  std::atomic<State> state_{State::RUN};
   GenericError err_;
   ErrHandler err_handler_;
   util::fb2::Fiber err_handler_fb_;
diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc
index 302e6bc2291f..a69f8c493b93 100644
--- a/src/server/dflycmd.cc
+++ b/src/server/dflycmd.cc
@@ -88,7 +88,7 @@ bool WaitReplicaFlowToCatchup(absl::Time end_time, const DflyCmd::ReplicaInfo* r
             << ", expecting " << shard->journal()->GetLsn();
     return false;
   }
-  if (replica->cntx.IsCancelled()) {
+  if (!replica->cntx.IsRunning()) {
     return false;
   }
   VLOG(1) << "Replica lsn:" << flow->last_acked_lsn
diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc
index 18210f1b77e6..619303cd1786 100644
--- a/src/server/journal/streamer.cc
+++ b/src/server/journal/streamer.cc
@@ -42,7 +42,7 @@ JournalStreamer::JournalStreamer(journal::Journal* journal, ExecutionState* cntx
 }
 
 JournalStreamer::~JournalStreamer() {
-  if (!cntx_->IsCancelled()) {
+  if (!cntx_->IsError()) {
     DCHECK_EQ(in_flight_bytes_, 0u);
   }
   VLOG(1) << "~JournalStreamer";
@@ -83,7 +83,7 @@ void JournalStreamer::Cancel() {
   VLOG(1) << "JournalStreamer::Cancel";
   waker_.notifyAll();
   journal_->UnregisterOnChange(journal_cb_id_);
-  if (!cntx_->IsCancelled()) {
+  if (!cntx_->IsError()) {
     WaitForInflightToComplete();
   }
 }
@@ -134,10 +134,12 @@ void JournalStreamer::OnCompletion(std::error_code ec, size_t len) {
   DVLOG(3) << "Completing " << in_flight_bytes_;
   in_flight_bytes_ = 0;
   pending_buf_.Pop();
-  if (ec && !IsStopped()) {
-    cntx_->ReportError(ec);
-  } else if (!pending_buf_.Empty() && !IsStopped()) {
-    AsyncWrite();
+  if (cntx_->IsRunning()) {
+    if (ec) {
+      cntx_->ReportError(ec);
+    } else if (!pending_buf_.Empty()) {
+      AsyncWrite();
+    }
   }
 
   // notify ThrottleIfNeeded or WaitForInflightToComplete that waits
@@ -149,7 +151,7 @@ void JournalStreamer::OnCompletion(std::error_code ec, size_t len) {
 }
 
 void JournalStreamer::ThrottleIfNeeded() {
-  if (IsStopped() || !IsStalled())
+  if (!cntx_->IsRunning() || !IsStalled())
     return;
 
   auto next =
@@ -158,7 +160,7 @@ void JournalStreamer::ThrottleIfNeeded() {
   size_t sent_start = total_sent_;
 
   std::cv_status status =
-      waker_.await_until([this]() { return !IsStalled() || IsStopped(); }, next);
+      waker_.await_until([this]() { return !IsStalled() || !cntx_->IsRunning(); }, next);
   if (status == std::cv_status::timeout) {
     LOG(WARNING) << "Stream timed out, inflight bytes/sent start: " << inflight_start << "/"
                  << sent_start << ", end: " << in_flight_bytes_ << "/" << total_sent_;
@@ -188,7 +190,7 @@ RestoreStreamer::RestoreStreamer(DbSlice* slice, cluster::SlotSet slots, journal
 }
 
 void RestoreStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) {
-  if (fiber_cancelled_)
+  if (!cntx_->IsRunning())
     return;
 
   VLOG(1) << "RestoreStreamer start";
@@ -206,16 +208,16 @@ void RestoreStreamer::Run() {
   PrimeTable* pt = &db_array_[0]->prime;
 
   do {
-    if (fiber_cancelled_)
+    if (!cntx_->IsRunning())
       return;
 
     cursor = pt->TraverseBuckets(cursor, [&](PrimeTable::bucket_iterator it) {
-      if (fiber_cancelled_)  // Could be cancelled any time as Traverse may preempt
+      if (!cntx_->IsRunning())  // Could be cancelled any time as Traverse may preempt
        return;
      db_slice_->FlushChangeToEarlierCallbacks(0 /*db_id always 0 for cluster*/,
                                                DbSlice::Iterator::FromPrime(it), snapshot_version_);
 
-      if (fiber_cancelled_)  // Could have been cancelled in above call too
+      if (!cntx_->IsRunning())  // Could have been cancelled in above call too
        return;
 
       std::lock_guard guard(big_value_mu_);
@@ -231,7 +233,7 @@ void RestoreStreamer::Run() {
       ThisFiber::Yield();
       last_yield = 0;
     }
-  } while (cursor && !fiber_cancelled_);
+  } while (cursor);
 
   VLOG(1) << "RestoreStreamer finished loop of " << my_slots_.ToSlotRanges().ToString()
           << ", shard " << db_slice_->shard_id() << ". Buckets looped " << stats_.buckets_loop;
@@ -252,8 +254,7 @@ void RestoreStreamer::SendFinalize(long attempt) {
   writer.Write(entry);
   Write(std::move(sink).str());
 
-  // TODO: is the intent here to flush everything?
-  //
+  // DFLYMIGRATE ACK command has a timeout, so we want to send it only when the LSN is ready to be sent
   ThrottleIfNeeded();
 }
 
@@ -263,7 +264,7 @@ RestoreStreamer::~RestoreStreamer() {
 }
 
 void RestoreStreamer::Cancel() {
   auto sver = snapshot_version_;
   snapshot_version_ = 0;  // to prevent double cancel in another fiber
-  fiber_cancelled_ = true;
+  cntx_->Cancel();
   if (sver != 0) {
     db_slice_->UnregisterOnChange(sver);
     JournalStreamer::Cancel();
diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h
index 73bc2a8343fc..46409d2f2016 100644
--- a/src/server/journal/streamer.h
+++ b/src/server/journal/streamer.h
@@ -47,7 +47,7 @@ class JournalStreamer {
   void ThrottleIfNeeded();
 
   virtual bool ShouldWrite(const journal::JournalItem& item) const {
-    return !IsStopped();
+    return cntx_->IsRunning();
   }
 
   void WaitForInflightToComplete();
@@ -59,10 +59,6 @@ class JournalStreamer {
   void AsyncWrite();
   void OnCompletion(std::error_code ec, size_t len);
 
-  bool IsStopped() const {
-    return cntx_->IsCancelled();
-  }
-
   bool IsStalled() const;
 
   journal::Journal* journal_;
@@ -92,10 +88,6 @@ class RestoreStreamer : public JournalStreamer {
 
   void SendFinalize(long attempt);
 
-  bool IsSnapshotFinished() const {
-    return snapshot_finished_;
-  }
-
  private:
   void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req);
   bool ShouldWrite(const journal::JournalItem& item) const override;
@@ -122,8 +114,7 @@ class RestoreStreamer : public JournalStreamer {
   DbTableArray db_array_;
   uint64_t snapshot_version_ = 0;
   cluster::SlotSet my_slots_;
-  bool fiber_cancelled_ = false;
-  bool snapshot_finished_ = false;
+
   ThreadLocalMutex big_value_mu_;
   Stats stats_;
 };
diff --git a/src/server/protocol_client.cc b/src/server/protocol_client.cc
index 3ffcb0c63dc3..70901d4958b6 100644
--- a/src/server/protocol_client.cc
+++ b/src/server/protocol_client.cc
@@ -158,7 +158,7 @@ error_code ProtocolClient::ConnectAndAuth(std::chrono::milliseconds connect_time
     // The context closes sock_. So if the context error handler has already
     // run we must not create a new socket. sock_mu_ syncs between the two
     // functions.
-    if (!cntx->IsCancelled()) {
+    if (cntx->IsRunning()) {
       if (sock_) {
         LOG_IF(WARNING, sock_->Close()) << "Error closing socket";
         sock_.reset(nullptr);
diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc
index f904fe4bf10f..f084ee750e1a 100644
--- a/src/server/rdb_save.cc
+++ b/src/server/rdb_save.cc
@@ -1061,7 +1061,7 @@ class RdbSaver::Impl final : public SliceSnapshot::SnapshotDataConsumerInterface
   void Finalize() override;
 
   // used only for legacy rdb save flows.
-  error_code ConsumeChannel(const Cancellation* cll);
+  error_code ConsumeChannel(const ExecutionState* cll);
 
   void FillFreqMap(RdbTypeFreqMap* dest) const;
 
@@ -1161,7 +1161,7 @@ error_code RdbSaver::Impl::SaveAuxFieldStrStr(string_view key, string_view val)
   return error_code{};
 }
 
-error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) {
+error_code RdbSaver::Impl::ConsumeChannel(const ExecutionState* es) {
   error_code io_error;
   string record;
 
@@ -1170,11 +1170,11 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) {
   // we can not exit on io-error since we spawn fibers that push data.
   // TODO: we may signal them to stop processing and exit asap in case of the error.
   while (channel_->Pop(record)) {
-    if (io_error || cll->IsCancelled())
+    if (io_error || (!es->IsRunning()))
       continue;
 
     do {
-      if (cll->IsCancelled())
+      if (!es->IsRunning())
        continue;
 
       auto start = absl::GetCurrentTimeNanos();
@@ -1258,7 +1258,7 @@ void RdbSaver::Impl::WaitForSnapshottingFinish(EngineShard* shard) {
 }
 
 void RdbSaver::Impl::ConsumeData(std::string data, ExecutionState* cntx) {
-  if (cntx->IsCancelled()) {
+  if (!cntx->IsRunning()) {
     return;
   }
   if (channel_) {  // Rdb write to channel
@@ -1468,7 +1468,7 @@ error_code RdbSaver::SaveBody(const ExecutionState& cntx) {
 
   if (save_mode_ == SaveMode::RDB) {
     VLOG(1) << "SaveBody , snapshots count: " << impl_->Size();
-    error_code io_error = impl_->ConsumeChannel(cntx.GetCancellation());
+    error_code io_error = impl_->ConsumeChannel(&cntx);
     if (io_error) {
       return io_error;
     }
diff --git a/src/server/replica.cc b/src/server/replica.cc
index 72f35208dcde..d7de6d96d151 100644
--- a/src/server/replica.cc
+++ b/src/server/replica.cc
@@ -92,7 +92,7 @@ error_code Replica::Start(facade::SinkReplyBuilder* builder) {
   CHECK(mythread);
 
   auto check_connection_error = [this, builder](error_code ec, const char* msg) -> error_code {
-    if (cntx_.IsCancelled()) {
+    if (!cntx_.IsRunning()) {
       builder->SendError("replication cancelled");
       return std::make_error_code(errc::operation_canceled);
     }
@@ -578,7 +578,7 @@ error_code Replica::InitiateDflySync() {
   sync_block->Wait();
 
   // Check if we woke up due to cancellation.
-  if (cntx_.IsCancelled())
+  if (!cntx_.IsRunning())
     return cntx_.GetError();
 
   RdbLoader::PerformPostLoad(&service_);
@@ -865,7 +865,7 @@ void DflyShardReplica::StableSyncDflyReadFb(ExecutionState* cntx) {
 
   acks_fb_ = fb2::Fiber("shard_acks", &DflyShardReplica::StableSyncDflyAcksFb, this, cntx);
 
-  while (!cntx->IsCancelled()) {
+  while (cntx->IsRunning()) {
     auto tx_data = tx_reader.NextTxData(&reader, cntx);
     if (!tx_data)
       break;
@@ -893,7 +893,7 @@ void Replica::RedisStreamAcksFb() {
   std::string ack_cmd;
   auto next_ack_tp = std::chrono::steady_clock::now();
 
-  while (!cntx_.IsCancelled()) {
+  while (cntx_.IsRunning()) {
     VLOG(2) << "Sending an ACK with offset=" << repl_offs_;
     ack_cmd = absl::StrCat("REPLCONF ACK ", repl_offs_);
     next_ack_tp = std::chrono::steady_clock::now() + ack_time_max_interval;
@@ -904,7 +904,7 @@ void Replica::RedisStreamAcksFb() {
     ack_offs_ = repl_offs_;
 
     replica_waker_.await_until(
-        [&]() { return repl_offs_ > ack_offs_ + kAckRecordMaxInterval || cntx_.IsCancelled(); },
+        [&]() { return repl_offs_ > ack_offs_ + kAckRecordMaxInterval || (!cntx_.IsRunning()); },
         next_ack_tp);
   }
 }
@@ -919,7 +919,7 @@ void DflyShardReplica::StableSyncDflyAcksFb(ExecutionState* cntx) {
   auto next_ack_tp = std::chrono::steady_clock::now();
   uint64_t current_offset;
 
-  while (!cntx->IsCancelled()) {
+  while (cntx->IsRunning()) {
     // Handle ACKs with the master. PING opcodes from the master mean we should immediately
     // answer.
     current_offset = journal_rec_executed_.load(std::memory_order_relaxed);
@@ -937,7 +937,7 @@ void DflyShardReplica::StableSyncDflyAcksFb(ExecutionState* cntx) {
         [&]() {
           return journal_rec_executed_.load(std::memory_order_relaxed) >
                      ack_offs_ + kAckRecordMaxInterval ||
-                 force_ping_ || cntx->IsCancelled();
+                 force_ping_ || (!cntx->IsRunning());
         },
         next_ack_tp);
   }
@@ -961,7 +961,7 @@ DflyShardReplica::~DflyShardReplica() {
 }
 
 void DflyShardReplica::ExecuteTx(TransactionData&& tx_data, ExecutionState* cntx) {
-  if (cntx->IsCancelled()) {
+  if (!cntx->IsRunning()) {
     return;
   }
 
@@ -982,7 +982,7 @@ void DflyShardReplica::ExecuteTx(TransactionData&& tx_data, ExecutionState* cntx
     // and replica recieved all the commands from all shards.
     multi_shard_data.block->Wait();
     // Check if we woke up due to cancellation.
-    if (cntx_.IsCancelled())
+    if (!cntx_.IsRunning())
       return;
 
     VLOG(2) << "Execute txid: " << tx_data.txid << " block wait finished";
@@ -990,7 +990,7 @@ void DflyShardReplica::ExecuteTx(TransactionData&& tx_data, ExecutionState* cntx
     // Wait until all shards flows get to execution step of this transaction.
     multi_shard_data.barrier.Wait();
     // Check if we woke up due to cancellation.
-    if (cntx_.IsCancelled())
+    if (!cntx_.IsRunning())
       return;
     // Global command will be executed only from one flow fiber. This ensure corectness of data in
     // replica.
@@ -1001,7 +1001,7 @@ void DflyShardReplica::ExecuteTx(TransactionData&& tx_data, ExecutionState* cntx
     // executed.
     multi_shard_data.barrier.Wait();
     // Check if we woke up due to cancellation.
-    if (cntx_.IsCancelled())
+    if (!cntx_.IsRunning())
       return;
 
     // Erase from map can be done only after all flow fibers executed the transaction commands.
diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc
index 289870105b17..29b86de590bd 100644
--- a/src/server/snapshot.cc
+++ b/src/server/snapshot.cc
@@ -158,7 +158,7 @@ void SliceSnapshot::IterateBucketsFb(bool send_full_sync_cut) {
   }
 
   for (DbIndex db_indx = 0; db_indx < db_array_.size(); ++db_indx) {
-    if (cntx_->IsCancelled())
+    if (!cntx_->IsRunning())
       return;
 
     if (!db_array_[db_indx])
@@ -169,7 +169,7 @@ void SliceSnapshot::IterateBucketsFb(bool send_full_sync_cut) {
     VLOG(1) << "Start traversing " << pt->size() << " items for index " << db_indx;
 
     do {
-      if (cntx_->IsCancelled()) {
+      if (!cntx_->IsRunning()) {
         return;
       }
 
@@ -213,7 +213,7 @@ void SliceSnapshot::SwitchIncrementalFb(LSN lsn) {
   VLOG(1) << "Starting incremental snapshot from lsn=" << lsn;
 
   // The replica sends the LSN of the next entry is wants to receive.
-  while (!cntx_->IsCancelled() && journal->IsLSNInBuffer(lsn)) {
+  while (cntx_->IsRunning() && journal->IsLSNInBuffer(lsn)) {
     serializer_->WriteJournalEntry(journal->GetEntry(lsn));
     PushSerialized(false);
     lsn++;

From 8dafc983dde3dc444ee8c2c0c19cc906e3e6b71b Mon Sep 17 00:00:00 2001
From: Borys
Date: Thu, 20 Feb 2025 10:45:37 +0200
Subject: [PATCH 2/2] refactor: address comments

---
 src/server/cluster/outgoing_slot_migration.cc | 12 +++++-------
 src/server/common.cc                          |  6 +++++-
 src/server/common.h                           |  5 ++++-
 3 files changed, 14 insertions(+), 9 deletions(-)

diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc
index 046f258406e1..7854319a487f 100644
--- a/src/server/cluster/outgoing_slot_migration.cc
+++ b/src/server/cluster/outgoing_slot_migration.cc
@@ -84,8 +84,6 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
   }
 
   void Cancel() {
-    // We don't care about errors during cancel
-    cntx_.SwitchErrorHandler([](auto ge) {});
     // Close socket for clean disconnect.
     CloseSocket();
     streamer_.Cancel();
@@ -247,7 +245,7 @@ void OutgoingMigration::SyncFb() {
     }
 
     OnAllShards([this](auto& migration) { migration->PrepareFlow(cf_->MyID()); });
-    if (cntx_.IsError()) {
+    if (!cntx_.IsRunning()) {
       continue;
     }
 
@@ -258,13 +256,13 @@ void OutgoingMigration::SyncFb() {
       OnAllShards([](auto& migration) { migration->PrepareSync(); });
     }
 
-    if (cntx_.IsError()) {
+    if (!cntx_.IsRunning()) {
       continue;
     }
 
     OnAllShards([](auto& migration) { migration->RunSync(); });
 
-    if (cntx_.IsError()) {
+    if (!cntx_.IsRunning()) {
       continue;
     }
 
@@ -274,7 +272,7 @@ void OutgoingMigration::SyncFb() {
       VLOG(1) << "Waiting for migration to finalize...";
       ThisFiber::SleepFor(500ms);
     }
-    if (cntx_.IsError()) {
+    if (!cntx_.IsRunning()) {
       continue;
     }
     break;
@@ -289,7 +287,7 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
   LOG(INFO) << "Finalize migration for " << cf_->MyID() << " : " << migration_info_.node_info.id
             << " attempt " << attempt;
   if (attempt > 1) {
-    if (cntx_.IsError()) {
+    if (!cntx_.IsRunning()) {
       return true;
     }
     auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms;
diff --git a/src/server/common.cc b/src/server/common.cc
index 5537d45132cc..30b8ee663c9a 100644
--- a/src/server/common.cc
+++ b/src/server/common.cc
@@ -350,7 +350,7 @@ GenericError ExecutionState::GetError() const {
 }
 
 void ExecutionState::ReportCancelError() {
-  ReportError(std::make_error_code(errc::operation_canceled), "Context cancelled");
+  ReportError(std::make_error_code(errc::operation_canceled), "ExecutionState cancelled");
 }
 
 void ExecutionState::Reset(ErrHandler handler) {
@@ -384,6 +384,10 @@ void ExecutionState::JoinErrorHandler() {
 }
 
 GenericError ExecutionState::ReportErrorInternal(GenericError&& err) {
+  if (IsCancelled()) {
+    LOG_IF(INFO, err != errc::operation_canceled) << err.Format();
+    return {};
+  }
   lock_guard lk{err_mu_};
   if (err_)
     return err_;
diff --git a/src/server/common.h b/src/server/common.h
index 96fb21087312..cb811707892f 100644
--- a/src/server/common.h
+++ b/src/server/common.h
@@ -234,6 +234,7 @@ using AggregateGenericError = AggregateValue<GenericError>;
 // without any actions. When an error is reported, only the first one is stored and the rest are
 // ignored. Then a special error handler is run, if present, and the ExecutionState moves to ERROR.
 // The error handler is run in a separate fiber to free up the caller.
+// If the state is CANCELLED, all errors are ignored.
 //
 // ReportCancelError() reporting an `errc::operation_canceled` error.
 class ExecutionState {
@@ -246,7 +247,8 @@ class ExecutionState {
 
   ~ExecutionState();
 
-  // Cancels the context by submitting an `errc::operation_canceled` error.
+  // Reports a cancel error to the context by submitting an `errc::operation_canceled` error.
+  // If the state is CANCELLED, does nothing.
   void ReportCancelError();
 
   bool IsRunning() const {
@@ -270,6 +272,7 @@ class ExecutionState {
   // Report an error by submitting arguments for GenericError.
   // If this is the first error that occured, then the error handler is run
   // and the context state is set to ERROR.
+  // If the state is CANCELLED, does nothing.
   template <typename... T> GenericError ReportError(T... ts) {
     return ReportErrorInternal(GenericError{std::forward<T>(ts)...});
   }