refactor: simplify journal and restore streamer cancellation #4549

Open · wants to merge 1 commit into base: main
4 changes: 2 additions & 2 deletions src/server/cluster/incoming_slot_migration.cc
@@ -59,7 +59,7 @@ class ClusterShardMigration {
JournalReader reader{source, 0};
TransactionReader tx_reader;

while (!cntx->IsCancelled()) {
while (cntx->IsRunning()) {
if (pause_) {
ThisFiber::SleepFor(100ms);
continue;
@@ -126,7 +126,7 @@ class ClusterShardMigration {

private:
void ExecuteTx(TransactionData&& tx_data, ExecutionState* cntx) {
if (cntx->IsCancelled()) {
if (!cntx->IsRunning()) {
return;
}
if (!tx_data.IsGlobalCmd()) {
21 changes: 11 additions & 10 deletions src/server/cluster/outgoing_slot_migration.cc
@@ -84,6 +84,8 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
}

void Cancel() {
// We don't care about errors during cancel
cntx_.SwitchErrorHandler([](auto ge) {});
Collaborator:
Are there any other places in the code where we do care about errors during cancel?
Should the caller decide whether error handling is needed after cancel is sent, or does it make more sense for ExecutionState to handle this, i.e. after cancel there is no switching to error?
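
A hypothetical sketch of the alternative suggested here (an assumption, not part of this PR): ExecutionState itself could ignore errors reported after cancellation, so callers would not need to install a no-op handler before calling Cancel().

  // Hypothetical variant of ExecutionState::ReportError (not this PR's code):
  // drop errors reported after cancellation instead of switching the state to ERROR.
  template <typename... T> GenericError ReportError(T... ts) {
    if (IsCancelled())
      return GetError();  // already cancelled: late errors are ignored
    return ReportErrorInternal(GenericError{std::forward<T>(ts)...});
  }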

// Close socket for clean disconnect.
CloseSocket();
streamer_.Cancel();
@@ -194,13 +196,12 @@ void OutgoingMigration::SyncFb() {
break;
}

last_error_ = cntx_.GetError();
cntx_.Reset(nullptr);

if (last_error_) {
LOG(ERROR) << last_error_.Format();
if (cntx_.IsError()) {
Contributor:
This changes the semantics here.

Before: we slept either on operation_canceled or if an error was reported via ReportError.

Now: we only sleep if there was an error. In other words, we no longer treat cancellation as an error, which is a mistake.

Cancellation is an error; that's why ECANCELED exists as an errno value, with ECANCELED == errc::operation_canceled.

I would like us to keep those semantics for clarity.
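
For reference, the relationship this comment relies on can be checked with the standard library alone (illustrative snippet, not part of the PR):

  #include <cassert>
  #include <system_error>

  int main() {
    std::error_code ec = std::make_error_code(std::errc::operation_canceled);  // ECANCELED
    assert(ec);                                   // truthy: cancellation is still an error code
    assert(ec == std::errc::operation_canceled);  // ...yet distinguishable from other errors
  }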

Contributor (author):
We have a cancel error as well. I don't see any problem: if you want to record a cancel error, you call ReportCancelError(); if you just want to cancel the process, you cancel the process.

last_error_ = cntx_.GetError();
LOG(ERROR) << last_error_;
ThisFiber::SleepFor(1000ms); // wait some time before next retry
}
cntx_.Reset(nullptr);

VLOG(1) << "Connecting to target node";
auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms;
@@ -246,7 +247,7 @@
}

OnAllShards([this](auto& migration) { migration->PrepareFlow(cf_->MyID()); });
if (cntx_.GetError()) {
if (cntx_.IsError()) {
Collaborator:
What about cancellation?
If the flow was cancelled, how do we exit here?

continue;
}

@@ -257,13 +258,13 @@
OnAllShards([](auto& migration) { migration->PrepareSync(); });
}

if (cntx_.GetError()) {
if (cntx_.IsError()) {
continue;
}

OnAllShards([](auto& migration) { migration->RunSync(); });

if (cntx_.GetError()) {
if (cntx_.IsError()) {
continue;
}

@@ -273,7 +274,7 @@
VLOG(1) << "Waiting for migration to finalize...";
ThisFiber::SleepFor(500ms);
}
if (cntx_.GetError()) {
if (cntx_.IsError()) {
continue;
}
break;
@@ -288,7 +289,7 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
LOG(INFO) << "Finalize migration for " << cf_->MyID() << " : " << migration_info_.node_info.id
<< " attempt " << attempt;
if (attempt > 1) {
if (cntx_.GetError()) {
if (cntx_.IsError()) {
Collaborator:
Also here, what about cancel? How do we exit?

return true;
}
auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms;
8 changes: 2 additions & 6 deletions src/server/common.cc
@@ -349,10 +349,6 @@ GenericError ExecutionState::GetError() const {
return err_;
}

const Cancellation* ExecutionState::GetCancellation() const {
return this;
}

void ExecutionState::ReportCancelError() {
Contributor:
This function is still being used (in replication and in other places). The issue here is that after this function is called, IsError() is true. This is a bug; the correct values should be IsError() == false and IsCancelled() == true.

You do not update the state to CANCELLED here, but that is also not needed. See my top comment regarding the 3 states you introduced.

Contributor (author):
If you report an error, even a cancel error, the state will be ERROR.

ReportError(std::make_error_code(errc::operation_canceled), "Context cancelled");
}
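
A small sketch of the behavior under discussion, assuming the ExecutionState API introduced in this PR (illustrative only, not code from the diff):

  ExecutionState cntx;
  cntx.ReportCancelError();       // records errc::operation_canceled via the error path
  assert(cntx.IsError());         // true: under the three-state design this is ERROR...
  assert(!cntx.IsCancelled());    // ...not CANCELLED, which is what the comment above flags

  ExecutionState cntx2;
  cntx2.Cancel();                 // plain cancellation, no error recorded
  assert(cntx2.IsCancelled());    // true
  assert(!cntx2.IsError());       // false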
@@ -363,7 +359,7 @@ void ExecutionState::Reset(ErrHandler handler) {
unique_lock lk{err_mu_};
err_ = {};
err_handler_ = std::move(handler);
Cancellation::flag_.store(false, std::memory_order_relaxed);
state_.store(State::RUN, std::memory_order_relaxed);
fb.swap(err_handler_fb_);
lk.unlock();
fb.JoinIfNeeded();
@@ -402,7 +398,7 @@ GenericError ExecutionState::ReportErrorInternal(GenericError&& err) {
// We can move err_handler_ because it should run at most once.
if (err_handler_)
err_handler_fb_ = fb2::Fiber("report_internal_error", std::move(err_handler_), err_);
Cancellation::Cancel();
state_.store(State::ERROR, std::memory_order_relaxed);
return err_;
}

59 changes: 28 additions & 31 deletions src/server/common.h
@@ -205,23 +205,6 @@ using AggregateStatus = AggregateValue<facade::OpStatus>;
static_assert(bool(facade::OpStatus::OK) == false,
"Default intitialization should be a falsy OK value");

// Simple wrapper interface around atomic cancellation flag.
struct Cancellation {
Cancellation() : flag_{false} {
}

void Cancel() {
flag_.store(true, std::memory_order_relaxed);
}

bool IsCancelled() const {
return flag_.load(std::memory_order_relaxed);
}

protected:
std::atomic_bool flag_;
};

// Error wrapper, that stores error_code and optional string message.
class GenericError {
public:
@@ -246,45 +229,58 @@ class GenericError {
// Thread safe utility to store the first non null generic error.
using AggregateGenericError = AggregateValue<GenericError>;

// ExecutionState is a utility for managing error reporting and cancellation for complex tasks.
//
// When submitting an error with `Error`, only the first is stored (as in aggregate values).
// Then a special error handler is run, if present, and the ExecutionState is cancelled. The error
// handler is run in a separate handler to free up the caller.
// ExecutionState is a thread-safe utility for managing error reporting and cancellation for complex
// tasks. There are 3 states: RUN, CANCELLED and ERROR. RUN and CANCELLED are plain states with no
// associated actions. When an error is reported, only the first one is stored and later ones are
// ignored. Then a special error handler is run, if present, and the ExecutionState switches to
// ERROR. The error handler is run in a separate fiber to free up the caller.
//
// ReportCancelError() reports an `errc::operation_canceled` error.
class ExecutionState : protected Cancellation {
class ExecutionState {
public:
using ErrHandler = std::function<void(const GenericError&)>;

ExecutionState() = default;
ExecutionState(ErrHandler err_handler)
: Cancellation{}, err_{}, err_handler_{std::move(err_handler)} {
ExecutionState(ErrHandler err_handler) : err_handler_{std::move(err_handler)} {
}

~ExecutionState();

// Cancels the context by submitting an `errc::operation_canceled` error.
void ReportCancelError();
using Cancellation::IsCancelled;
const Cancellation* GetCancellation() const;

bool IsRunning() const {
return state_.load(std::memory_order_relaxed) == State::RUN;
}

bool IsError() const {
return state_.load(std::memory_order_relaxed) == State::ERROR;
}

bool IsCancelled() const {
return state_.load(std::memory_order_relaxed) == State::CANCELLED;
}

void Cancel() {
state_.store(State::CANCELLED, std::memory_order_relaxed);
}

GenericError GetError() const;

// Report an error by submitting arguments for GenericError.
// If this is the first error that occured, then the error handler is run
// and the context is cancelled.
// and the context state set to ERROR.
template <typename... T> GenericError ReportError(T... ts) {
return ReportErrorInternal(GenericError{std::forward<T>(ts)...});
}

// Wait for error handler to stop, reset error and cancellation flag, assign new error handler.
// Wait for error handler to stop, reset error and state, assign new error handler.
void Reset(ErrHandler handler);

// Atomically replace the error handler if no error is present, and return the
// current stored error. This function can be used to transfer cleanup responsibility safely
//
// Beware, never do this manually in two steps. If you check for cancellation,
// Beware, never do this manually in two steps. If you check the state,
// set the error handler and initialize resources, then the new error handler
// will never run if the context was cancelled between the first two steps.
GenericError SwitchErrorHandler(ErrHandler handler);
@@ -293,9 +289,10 @@ class ExecutionState : protected Cancellation {
void JoinErrorHandler();

private:
// Report error.
GenericError ReportErrorInternal(GenericError&& err);

enum class State { RUN, CANCELLED, ERROR };
Contributor:
We do not need these states. You want to figure out if:

  1. Operation cancelled
  2. Operation cancelled with an error

A cancelled operation is an error. That's why we have ReportCancelError, which cancels the context with the error ECANCELED. Therefore you don't need any of the 3 states here. All you have to do is:

  bool IsError() const {
    return ec && !IsCancelled();
  }

  bool IsCancelled() const {
    return ec && err_ == ECANCELED;
  }

Contributor:
In your state machine:

RUN == (ec_ == std::error_code{})
CANCELLED == (ec_ == ECANCELED)
ERROR == (ec_ && !CANCELLED) 

Contributor (author):
If you use ec_, you need a mutex, and that's slow. Let's have a call.

std::atomic<State> state_{State::RUN};
GenericError err_;
ErrHandler err_handler_;
util::fb2::Fiber err_handler_fb_;
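For orientation, a minimal usage sketch of the new ExecutionState API declared above; DoOneStep is a hypothetical unit of work, everything else follows the declarations in this diff:

  ExecutionState cntx{[](const GenericError& err) {
    LOG(ERROR) << "background task failed: " << err.Format();  // runs in a separate fiber
  }};

  while (cntx.IsRunning()) {           // replaces the old !IsCancelled() polling
    std::error_code ec = DoOneStep();  // hypothetical unit of work
    if (ec)
      cntx.ReportError(ec);            // first error wins; state becomes ERROR and the loop exits
  }

  if (cntx.IsError())
    LOG(WARNING) << cntx.GetError().Format();
  cntx.JoinErrorHandler();             // wait for the error-handler fiber to finish
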
2 changes: 1 addition & 1 deletion src/server/dflycmd.cc
@@ -88,7 +88,7 @@ bool WaitReplicaFlowToCatchup(absl::Time end_time, const DflyCmd::ReplicaInfo* r
<< ", expecting " << shard->journal()->GetLsn();
return false;
}
if (replica->cntx.IsCancelled()) {
if (!replica->cntx.IsRunning()) {
return false;
}
VLOG(1) << "Replica lsn:" << flow->last_acked_lsn
33 changes: 17 additions & 16 deletions src/server/journal/streamer.cc
@@ -42,7 +42,7 @@ JournalStreamer::JournalStreamer(journal::Journal* journal, ExecutionState* cntx
}

JournalStreamer::~JournalStreamer() {
if (!cntx_->IsCancelled()) {
if (!cntx_->IsError()) {
DCHECK_EQ(in_flight_bytes_, 0u);
}
VLOG(1) << "~JournalStreamer";
@@ -83,7 +83,7 @@ void JournalStreamer::Cancel() {
VLOG(1) << "JournalStreamer::Cancel";
waker_.notifyAll();
journal_->UnregisterOnChange(journal_cb_id_);
if (!cntx_->IsCancelled()) {
if (!cntx_->IsError()) {
WaitForInflightToComplete();
}
}
@@ -134,10 +134,12 @@ void JournalStreamer::OnCompletion(std::error_code ec, size_t len) {
DVLOG(3) << "Completing " << in_flight_bytes_;
in_flight_bytes_ = 0;
pending_buf_.Pop();
if (ec && !IsStopped()) {
cntx_->ReportError(ec);
} else if (!pending_buf_.Empty() && !IsStopped()) {
AsyncWrite();
if (cntx_->IsRunning()) {
if (ec) {
cntx_->ReportError(ec);
} else if (!pending_buf_.Empty()) {
AsyncWrite();
}
}

// notify ThrottleIfNeeded or WaitForInflightToComplete that waits
@@ -149,7 +151,7 @@
}

void JournalStreamer::ThrottleIfNeeded() {
if (IsStopped() || !IsStalled())
if (!cntx_->IsRunning() || !IsStalled())
return;

auto next =
Expand All @@ -158,7 +160,7 @@ void JournalStreamer::ThrottleIfNeeded() {
size_t sent_start = total_sent_;

std::cv_status status =
waker_.await_until([this]() { return !IsStalled() || IsStopped(); }, next);
waker_.await_until([this]() { return !IsStalled() || !cntx_->IsRunning(); }, next);
if (status == std::cv_status::timeout) {
LOG(WARNING) << "Stream timed out, inflight bytes/sent start: " << inflight_start << "/"
<< sent_start << ", end: " << in_flight_bytes_ << "/" << total_sent_;
@@ -188,7 +190,7 @@ RestoreStreamer::RestoreStreamer(DbSlice* slice, cluster::SlotSet slots, journal
}

void RestoreStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) {
if (fiber_cancelled_)
if (!cntx_->IsRunning())
return;

VLOG(1) << "RestoreStreamer start";
@@ -206,16 +208,16 @@ void RestoreStreamer::Run() {
PrimeTable* pt = &db_array_[0]->prime;

do {
if (fiber_cancelled_)
if (!cntx_->IsRunning())
return;
cursor = pt->TraverseBuckets(cursor, [&](PrimeTable::bucket_iterator it) {
if (fiber_cancelled_) // Could be cancelled any time as Traverse may preempt
if (!cntx_->IsRunning()) // Could be cancelled any time as Traverse may preempt
return;

db_slice_->FlushChangeToEarlierCallbacks(0 /*db_id always 0 for cluster*/,
DbSlice::Iterator::FromPrime(it), snapshot_version_);

if (fiber_cancelled_) // Could have been cancelled in above call too
if (!cntx_->IsRunning()) // Could have been cancelled in above call too
return;

std::lock_guard guard(big_value_mu_);
@@ -231,7 +233,7 @@
ThisFiber::Yield();
last_yield = 0;
}
} while (cursor && !fiber_cancelled_);
} while (cursor);

VLOG(1) << "RestoreStreamer finished loop of " << my_slots_.ToSlotRanges().ToString()
<< ", shard " << db_slice_->shard_id() << ". Buckets looped " << stats_.buckets_loop;
@@ -252,8 +254,7 @@ void RestoreStreamer::SendFinalize(long attempt) {
writer.Write(entry);
Write(std::move(sink).str());

// TODO: is the intent here to flush everything?
//
// The DFLYMIGRATE ACK command has a timeout, so we want to send it only when the LSN is ready to be sent
ThrottleIfNeeded();
}

@@ -263,7 +264,7 @@ RestoreStreamer::~RestoreStreamer() {
void RestoreStreamer::Cancel() {
auto sver = snapshot_version_;
snapshot_version_ = 0; // to prevent double cancel in another fiber
fiber_cancelled_ = true;
cntx_->Cancel();
if (sver != 0) {
db_slice_->UnregisterOnChange(sver);
JournalStreamer::Cancel();
13 changes: 2 additions & 11 deletions src/server/journal/streamer.h
@@ -47,7 +47,7 @@ class JournalStreamer {
void ThrottleIfNeeded();

virtual bool ShouldWrite(const journal::JournalItem& item) const {
return !IsStopped();
return cntx_->IsRunning();
}

void WaitForInflightToComplete();
@@ -59,10 +59,6 @@
void AsyncWrite();
void OnCompletion(std::error_code ec, size_t len);

bool IsStopped() const {
return cntx_->IsCancelled();
}

bool IsStalled() const;

journal::Journal* journal_;
@@ -92,10 +88,6 @@ class RestoreStreamer : public JournalStreamer {

void SendFinalize(long attempt);

bool IsSnapshotFinished() const {
return snapshot_finished_;
}

private:
void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req);
bool ShouldWrite(const journal::JournalItem& item) const override;
@@ -122,8 +114,7 @@ class RestoreStreamer : public JournalStreamer {
DbTableArray db_array_;
uint64_t snapshot_version_ = 0;
cluster::SlotSet my_slots_;
bool fiber_cancelled_ = false;
bool snapshot_finished_ = false;

ThreadLocalMutex big_value_mu_;
Stats stats_;
};
2 changes: 1 addition & 1 deletion src/server/protocol_client.cc
@@ -158,7 +158,7 @@ error_code ProtocolClient::ConnectAndAuth(std::chrono::milliseconds connect_time
// The context closes sock_. So if the context error handler has already
// run we must not create a new socket. sock_mu_ syncs between the two
// functions.
if (!cntx->IsCancelled()) {
if (cntx->IsRunning()) {
if (sock_) {
LOG_IF(WARNING, sock_->Close()) << "Error closing socket";
sock_.reset(nullptr);