Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: add ability to reinit incoming migration object #4756

Merged
merged 4 commits into from
Apr 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 41 additions & 25 deletions src/server/cluster/cluster_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, SinkReplyBuilder* builder

new_config = new_config->CloneWithChanges(enable_slots, disable_slots);

StartSlotMigrations(new_config->GetNewOutgoingMigrations(ClusterConfig::Current()));
StartNewSlotMigrations(*new_config);

SlotSet before =
ClusterConfig::Current() ? ClusterConfig::Current()->GetOwnedSlots() : SlotSet(true);
Expand Down Expand Up @@ -700,11 +700,23 @@ void ClusterFamily::DflyClusterFlushSlots(CmdArgList args, SinkReplyBuilder* bui
return builder->SendOk();
}

void ClusterFamily::StartSlotMigrations(std::vector<MigrationInfo> migrations) {
// Add validating and error processing
for (auto& m : migrations) {
auto outgoing_migration = CreateOutgoingMigration(std::move(m));
outgoing_migration->Start();
// Creates the migration jobs that `new_config` introduces relative to the
// currently active cluster config (ClusterConfig::Current()).
//
// Every new outgoing migration is registered in outgoing_migration_jobs_ and
// started immediately. Every new incoming migration is only registered in
// incoming_migrations_jobs_; nothing else is done with it here.
//
// Both job lists are modified while holding migration_mu_.
void ClusterFamily::StartNewSlotMigrations(const ClusterConfig& new_config) {
  // TODO Add validating and error processing
  auto out_migrations = new_config.GetNewOutgoingMigrations(ClusterConfig::Current());
  auto in_migrations = new_config.GetNewIncomingMigrations(ClusterConfig::Current());

  util::fb2::LockGuard lk(migration_mu_);

  for (auto& m : out_migrations) {
    auto migration = make_shared<OutgoingMigration>(std::move(m), this, server_family_);
    outgoing_migration_jobs_.emplace_back(migration);
    migration->Start();
  }

  // in_migrations is a local list that is not read again, so hand its node id
  // and slot ranges over to the job instead of copying them.
  for (auto& m : in_migrations) {
    incoming_migrations_jobs_.emplace_back(make_shared<IncomingSlotMigration>(
        std::move(m.node_info.id), &server_family_->service(), std::move(m.slot_ranges)));
  }
}

Expand Down Expand Up @@ -900,35 +912,39 @@ void ClusterFamily::InitMigration(CmdArgList args, SinkReplyBuilder* builder) {

SlotRanges slot_ranges(std::move(slots));

const auto& incoming_migrations = ClusterConfig::Current()->GetIncomingMigrations();
bool found = any_of(incoming_migrations.begin(), incoming_migrations.end(),
[source_id = source_id, &slot_ranges](const MigrationInfo& info) {
return info.node_info.id == source_id && info.slot_ranges == slot_ranges;
std::shared_ptr<IncomingSlotMigration> migration;
{
util::fb2::LockGuard lk(migration_mu_);

auto it = find_if(incoming_migrations_jobs_.begin(), incoming_migrations_jobs_.end(),
[source_id = source_id, &slot_ranges](const auto& migration) {
return migration->GetSourceID() == source_id &&
migration->GetSlots() == slot_ranges;
});
if (!found) {

if (it != incoming_migrations_jobs_.end()) {
migration = *it;
}
}

if (!migration) {
VLOG(1) << "Unrecognized incoming migration from " << source_id;
return builder->SendSimpleString(OutgoingMigration::kUnknownMigration);
}

VLOG(1) << "Init migration " << source_id;

util::fb2::LockGuard lk(migration_mu_);
auto was_removed = RemoveIncomingMigrationImpl(incoming_migrations_jobs_, source_id);
LOG_IF(WARNING, was_removed) << "Reinit issued for migration from:" << source_id;
if (migration->GetState() != MigrationState::C_CONNECTING) {
migration->Stop();
auto slots = migration->GetSlots();
LOG(INFO) << "Flushing slots during migration reinitialization " << migration->GetSourceID()
<< ", slots: " << slots.ToString();
DeleteSlots(slots);
}

incoming_migrations_jobs_.emplace_back(make_shared<IncomingSlotMigration>(
string(source_id), &server_family_->service(), std::move(slot_ranges), flows_num));
migration->Init(flows_num);

return builder->SendOk();
}

// Builds an OutgoingMigration for `info`, records it in outgoing_migration_jobs_
// while holding migration_mu_, and hands the shared pointer back to the caller.
std::shared_ptr<OutgoingMigration> ClusterFamily::CreateOutgoingMigration(MigrationInfo info) {
  util::fb2::LockGuard guard(migration_mu_);
  outgoing_migration_jobs_.emplace_back(
      make_shared<OutgoingMigration>(std::move(info), this, server_family_));
  return outgoing_migration_jobs_.back();
}

void ClusterFamily::DflyMigrateFlow(CmdArgList args, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
CmdArgParser parser{args};
Expand Down
6 changes: 1 addition & 5 deletions src/server/cluster/cluster_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class ClusterFamily {
std::shared_ptr<IncomingSlotMigration> GetIncomingMigration(std::string_view source_id)
ABSL_LOCKS_EXCLUDED(migration_mu_);

void StartSlotMigrations(std::vector<MigrationInfo> migrations);
void StartNewSlotMigrations(const ClusterConfig& new_config);

// must be destroyed while neither set_config_mu nor migration_mu_ is held
struct PreparedToRemoveOutgoingMigrations {
Expand All @@ -105,10 +105,6 @@ class ClusterFamily {
void RemoveIncomingMigrations(const std::vector<MigrationInfo>& migrations)
ABSL_LOCKS_EXCLUDED(migration_mu_);

// store info about migration and create unique session id
std::shared_ptr<OutgoingMigration> CreateOutgoingMigration(MigrationInfo info)
ABSL_LOCKS_EXCLUDED(migration_mu_);

mutable util::fb2::Mutex migration_mu_; // guard migrations operations
// holds all incoming slots migrations that are currently in progress.
std::vector<std::shared_ptr<IncomingSlotMigration>> incoming_migrations_jobs_
Expand Down
41 changes: 24 additions & 17 deletions src/server/cluster/incoming_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,17 +151,8 @@ class ClusterShardMigration {
atomic_bool pause_ = false;
};

IncomingSlotMigration::IncomingSlotMigration(string source_id, Service* se, SlotRanges slots,
uint32_t shards_num)
: source_id_(std::move(source_id)),
service_(*se),
slots_(std::move(slots)),
state_(MigrationState::C_CONNECTING),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that in this new flow, once the incoming migration object has been created but the connection between source and target has not yet been established, its state should be PRE_CONNECT or something similar. That would help us understand the state of the migration when we run the dflycluster migration status command.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's discuss migration states separately. I don't see how it can help us, but in any case, I think we need to make such changes for outgoing and incoming migrations together, so I would rather create another PR.

bc_(shards_num) {
shard_flows_.resize(shards_num);
for (unsigned i = 0; i < shards_num; ++i) {
shard_flows_[i].reset(new ClusterShardMigration(i, &service_, this, bc_));
}
// Stores the source node id, the service reference and the slot ranges of the
// migration. No shard flows are created here and bc_ starts with a zero count;
// Init() sizes both once the number of shards is known.
IncomingSlotMigration::IncomingSlotMigration(string source_id, Service* se, SlotRanges slots)
    : source_id_(std::move(source_id)),
      service_(*se),
      slots_(std::move(slots)),
      bc_(0) {
}

IncomingSlotMigration::~IncomingSlotMigration() {
Expand Down Expand Up @@ -203,7 +194,8 @@ bool IncomingSlotMigration::Join(long attempt) {
auto wait_res = bc_->WaitFor(wait_time);
if (is_attempt_correct) {
if (wait_res) {
state_.store(MigrationState::C_FINISHED);
util::fb2::LockGuard lk(state_mu_);
state_ = MigrationState::C_FINISHED;
keys_number_ = cluster::GetKeyCount(slots_);
} else {
LOG(WARNING) << "Can't join migration because of data after LSN for " << source_id_;
Expand All @@ -215,7 +207,8 @@ bool IncomingSlotMigration::Join(long attempt) {
}

void IncomingSlotMigration::Stop() {
string_view log_state = state_.load() == MigrationState::C_FINISHED ? "Finishing" : "Cancelling";
util::fb2::LockGuard lk(state_mu_);
string_view log_state = state_ == MigrationState::C_FINISHED ? "Finishing" : "Cancelling";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, when the migration is stopped, can we have some state that shows it is stopped? Otherwise we will show the last state (sync/connecting).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't want to. Adding additional states makes the code more complex; debugging commands should not add complexity.

LOG(INFO) << log_state << " incoming migration of slots " << slots_.ToString();
cntx_.Cancel();

Expand Down Expand Up @@ -244,17 +237,31 @@ void IncomingSlotMigration::Stop() {
}
}

void IncomingSlotMigration::StartFlow(uint32_t shard, util::FiberSocketBase* source) {
state_.store(MigrationState::C_SYNC);
void IncomingSlotMigration::Init(uint32_t shards_num) {
util::fb2::LockGuard lk(state_mu_);
cntx_.Reset(nullptr);
state_ = MigrationState::C_SYNC;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the state set here should be connecting, and StartFlow should set it to sync.


bc_ = BlockingCounter(shards_num);
shard_flows_.resize(shards_num);
for (unsigned i = 0; i < shards_num; ++i) {
shard_flows_[i].reset(new ClusterShardMigration(i, &service_, this, bc_));
}
}

void IncomingSlotMigration::StartFlow(uint32_t shard, util::FiberSocketBase* source) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we protect against StartFlow running at the same time as Stop, which can be triggered by the cluster config command?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ClusterShardMigration::Start and ClusterShardMigration::Cancel methods are synchronized via a mutex, so it's safe to call them simultaneously. If Start() was called before Cancel(), it's the normal flow; if Cancel() was called before Start(), the latter just returns.

shard_flows_[shard]->Start(&cntx_, source);
VLOG(1) << "Incoming flow " << shard << " finished for " << source_id_;
}

size_t IncomingSlotMigration::GetKeyCount() const {
if (state_.load() == MigrationState::C_FINISHED) {
return keys_number_;
{
util::fb2::LockGuard lk(state_mu_);
if (state_ == MigrationState::C_FINISHED) {
return keys_number_;
}
}

return cluster::GetKeyCount(slots_);
}

Expand Down
12 changes: 9 additions & 3 deletions src/server/cluster/incoming_slot_migration.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class ClusterShardMigration;
// manage migration process state and data
class IncomingSlotMigration {
public:
IncomingSlotMigration(std::string source_id, Service* se, SlotRanges slots, uint32_t shards_num);
IncomingSlotMigration(std::string source_id, Service* se, SlotRanges slots);
~IncomingSlotMigration();

// process data from DFLYMIGRATE FLOW cmd
Expand All @@ -34,8 +34,12 @@ class IncomingSlotMigration {
// Stop and join the migration, can be called even after migration is finished
void Stop();

// Init/Reinit migration
void Init(uint32_t shards_num);

MigrationState GetState() const {
return state_.load();
util::fb2::LockGuard lk(state_mu_);
return state_;
}

const SlotRanges& GetSlots() const {
Expand Down Expand Up @@ -70,12 +74,14 @@ class IncomingSlotMigration {
Service& service_;
std::vector<std::unique_ptr<ClusterShardMigration>> shard_flows_;
SlotRanges slots_;
std::atomic<MigrationState> state_ = MigrationState::C_CONNECTING;
ExecutionState cntx_;
mutable util::fb2::Mutex error_mu_;
dfly::GenericError last_error_ ABSL_GUARDED_BY(error_mu_);
std::atomic<size_t> errors_count_ = 0;

mutable util::fb2::Mutex state_mu_;
MigrationState state_ ABSL_GUARDED_BY(state_mu_) = MigrationState::C_CONNECTING;

// when the migration is finished we need to store the number of migrated keys,
// because new requests can add or remove keys and we would get incorrect statistics
size_t keys_number_ = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/server/cluster/outgoing_slot_migration.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include "server/transaction.h"

namespace dfly {
class DbSlice;

class ServerFamily;

namespace journal {
Expand Down
Loading