Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add PacificA data replication consistency scheme #2994

Open
wants to merge 12 commits into
base: unstable
Choose a base branch
from

Conversation

buzhimingyonghu
Copy link
Contributor

@buzhimingyonghu buzhimingyonghu commented Jan 13, 2025

PacificA 协议概述

PacificA 协议简单来说分为两部分:

  1. 数据复制
  2. 配置管理

由于在 Pika 中,配置管理主要由 pika_sentinel 负责,本文主要关注通过主从模式的数据复制及其与 pika_sentinel 配合的协调过程。

在 Pika 中的应用

主要分为三个部分:

  1. PacificA 中主从模式的数据一致流程处理
  2. 分布式日志型存储系统的设计
  3. 故障恢复后的协调状态

启动 PacificA

在 Pika 中,建立普通主从连接的命令为:

slaveof <ip> <port>

如果需要启动 PacificA 协议,需要增加 strong 参数:

slaveof <ip> <port> strong

当从节点执行上述命令时,会触发 slaveofcmd,读取相关参数,并由 pika_server 保存这些信息,随后异步交由 PikaAuxiliaryThread 线程(以下简称 PAT)处理。
PAT 是 PacificA 协议中的核心辅助线程,负责:

- 状态机状态切换
- 主从之间的心跳发送及超时检查
- 主从之间的同步任务

PacificA 主从模式的数据一致流程

主从建立连接的四个阶段

  1. MetaSync:主从元数据的同步和检查
  2. TrySync:判断数据完整性,选择全量同步或增量同步
  3. Candidate:从节点作为候选者,追加完整的准备列表
  4. BinlogSync:正式加入集群,开始进行数据复制

image
下面是基本的数据结构:
image

MetaSync 阶段

image

从节点的 PAT 线程通过发送 MetaReq 请求与主节点建立连接,其中包含 is_consistency 字段,表示强一致性请求。
主节点收到请求后,若 consistency 标记为 true,则会:

  1. 设置所有数据库的 consistency 标记
  2. 初始化上下文
  3. 判断是否需要进入协调状态

image

随后,从节点收到主节点返回的 MetaSyncRes,并执行以下操作:

  1. 比较本地和主节点的数据库结构 (db_structs) 是否一致

  2. 如果本地 replication_id 与主节点不一致,且本地 replication_id 为空,则执行全量同步;否则进行增量同步

  3. 根据同步类型更新从节点的状态:

    • 全量同步:设置状态为 kTryDBSync
    • 增量同步:设置状态为 kTryConnect

TrySync 阶段

image

全量同步完成后,从节点更新自身的 committedID 和 preparedID,并发送 TrySyncReq 请求,携带 committedID 确认日志一致性。
主节点验证后,返回包含主节点 preparedID 的 TrySyncRes,从节点将 preparedID 与主节点对齐,完成增量同步。

流程总结:

  1. 从节点发送 TrySyncReq,带有 committedID。

  2. 主节点检查 committedID:

    • 若主节点 committedID 大于从节点,表示同步正常。
    • 若从节点 committedID 大于主节点,表示选主失败。
  3. 主节点返回 TrySyncRes,包含主节点的 preparedID,从节点需对齐。

BinlogSync 阶段

image

主节点收到从节点的第一次 binlog 请求后,将从节点设置为候选者状态,并追加日志。
主节点通过心跳包和 binlog 数据通知从节点,将日志分阶段写入本地:

  • 从节点收到 binlogSync 后,先写入 binlog,等待主节点通知哪些请求已提交。
  • 主节点收到所有从节点确认后,将请求标记为已提交,更新提交点,确保与所有从节点保持一致。

分布式日志型存储系统的设计

PacificA 中采用逻辑复制的方式,具体包括:

  1. 状态的一致性:所有副本逻辑上保持相同的状态,并可处理相同类型的更新和查询。
  2. 日志记录:系统在接收到更新请求时,首先将其写入日志中,确保即使系统故障也能通过日志恢复数据。
  3. 内存数据结构更新:日志记录完成后,将更新应用到内存中的数据结构中。
  4. 定期创建检查点:防止内存溢出,定期将数据快照保存到磁盘,形成持久化检查点。
  5. 日志截断:检查点创建后,删除已存储到检查点的日志,优化存储需求。

image

故障恢复后的协调状态

image

初始状态

  • A 是主节点,B、C 和 D 是副本节点。
  • committedB 是 committedA 的子集,committedA 是所有副本 prepared 的子集。

故障恢复

当主节点 A 故障时:

  1. 系统重新配置,将 B 提升为新主节点。
  2. B 完成协调后,新的 committedB 与旧的 preparedB 保持一致,所有副本的 preparedID 与新主节点的 preparedID 对齐。

一次写请求的操作流程

  1. 写 binlog:等待所有从节点追加日志后,执行写入数据库请求。
  2. 主节点处理 binlog 请求:
    • 若非一致性模式,按传统主从复制执行。
    • 若为一致性模式:
  3. coordinator_ 追加日志并记录 offset。
  4. 等待从节点同步,更新主节点的 committedID。
  5. 若同步失败(超时 10 秒),记录信息并退出。

image

Summary by CodeRabbit

Release Notes

  • New Features

    • Added support for strong consistency mode during replication.
    • Introduced new log management and coordination mechanisms for improved synchronization.
    • Enhanced binlog and offset tracking during replication.
    • New methods for managing log offsets and committed/prepared transaction states.
    • Added new fields in protocol buffer definitions to support consistency metadata.
  • Improvements

    • Refined error handling for consistency and synchronization scenarios.
    • Added more granular control over replication state and log management.
    • Improved tracking of server roles and consistency states.
  • Technical Enhancements

    • Extended protocol buffer definitions to support consistency metadata.
    • Implemented more robust consensus and coordination processes.
    • Enhanced the handling of synchronization responses and prepared IDs during replication.
  • Compatibility

    • New optional parameters added to existing methods.
    • Backward compatible with previous replication mechanisms.

Copy link

coderabbitai bot commented Jan 13, 2025

Walkthrough

The pull request introduces comprehensive enhancements to Pika's replication and consensus mechanisms, focusing on adding consistency management capabilities. The changes span multiple files and classes, introducing new methods and variables to support strong consistency, log management, and synchronization processes. Key additions include consistency flags, log offset tracking, new methods for managing log entries, and improved error handling for replication scenarios. The modifications aim to provide more robust and flexible replication control across the distributed database system.

Changes

File Change Summary
include/pika_admin.h Added is_consistency_cmd_ member variable to SlaveofCmd
include/pika_binlog.h Updated Put method signatures with log offset and consistency parameters
include/pika_command.h Added kConsistencyTimeout enum value
include/pika_consensus.h Added new Log class, multiple methods in ConsensusCoordinator for log and consistency management
include/pika_define.h Added KCandidate to SlaveState enum, new WriteTask constructor
include/pika_rm.h Added multiple methods to SyncMasterDB for log and consistency management
include/pika_server.h Added last_role_, is_consistency_ variables and related methods
include/pika_slave_node.h Updated log offset initializations, added target_offset
src/pika_admin.cc Updated DoInitial and SetMaster methods to support "strong" consistency flag
src/pika_binlog.cc Implemented new Put method and updated existing Put method for consistency
src/pika_command.cc Modified InternalProcessCommand for consistency checks in command processing
src/pika_consensus.cc Added methods for consistency management and log handling in ConsensusCoordinator
src/pika_db.cc Updated master offset handling to set prepared and committed IDs
src/pika_inner_message.proto Enhanced InnerRequest and InnerResponse messages with new fields for consistency and IDs
src/pika_repl_bgworker.cc Added acknowledgment handling based on consistency checks
src/pika_repl_client.cc Enhanced SendMetaSync and SendTrySync methods for consistency handling
src/pika_repl_client_conn.cc Improved HandleTrySyncResponse for managing prepared IDs
src/pika_repl_server.cc Updated BuildBinlogSyncResp to include committed ID in consistency state
src/pika_repl_server_conn.cc Enhanced methods to manage consistency state during synchronization
src/pika_rm.cc Added methods for consistency checks and log management in SyncMasterDB
src/pika_server.cc Updated methods to track server role and consistency state

Sequence Diagram

sequenceDiagram
    participant Client
    participant PikaServer
    participant ConsensusCoordinator
    participant SyncMasterDB
    participant Binlog

    Client->>PikaServer: Send Command with "strong" flag
    PikaServer->>ConsensusCoordinator: Set Consistency Mode
    ConsensusCoordinator->>SyncMasterDB: Initialize Context
    SyncMasterDB->>Binlog: Append Log Entry
    Binlog-->>SyncMasterDB: Log Offset
    SyncMasterDB->>ConsensusCoordinator: Update Committed ID
    ConsensusCoordinator-->>PikaServer: Confirm Consistency
    PikaServer-->>Client: Command Response
Loading

Poem

🐰 In the realm of distributed delight,
Where rabbits sync with all their might,
Consistency now takes the stage,
A stronger dance on replication's page!
Log by log, we hop with grace 🌟

Finishing Touches

  • 📝 Generate Docstrings (Beta)

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Beta)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@github-actions github-actions bot added the ✏️ Feature New feature or request label Jan 13, 2025
@buzhimingyonghu
Copy link
Contributor Author

buzhimingyonghu commented Jan 13, 2025

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 18

🔭 Outside diff range comments (1)
src/pika_command.cc (1)

Line range hint 860-885: Inconsistent method calls after updating InternalProcessCommand signature

The method InternalProcessCommand now requires a bool is_consistency parameter, but the call in ProcessCommand does not include it, leading to a mismatch and potential compilation errors.

Apply this diff to update the method call:

 void Cmd::ProcessCommand(const HintKeys& hint_keys) {
   if (stage_ == kNone) {
-    InternalProcessCommand(hint_keys);
+    InternalProcessCommand(hint_keys, g_pika_server->IsConsistency());
   } else {
     if (stage_ == kBinlogStage) {
       DoBinlog();
     } else if (stage_ == kExecuteStage) {
       DoCommand(hint_keys);
     }
   }
 }

Ensure that g_pika_server->IsConsistency() correctly provides the consistency flag.

🧹 Nitpick comments (23)
src/pika_repl_bgworker.cc (2)

91-92: Initialize ack_end to avoid potential undefined behavior.

The variable ack_end is declared but not initialized. While it may be assigned later in the code, it's good practice to initialize variables upon declaration to prevent potential undefined behavior.

Apply this diff to initialize ack_end:

 LogOffset ack_start;
+ LogOffset ack_end = LogOffset();

214-218: Handle error codes consistently in function returns.

The function HandleWriteBinlog returns an int, but in the null check added, a -1 is returned. Ensure that all error codes are handled consistently according to the function's expected return values.

Apply this diff to return the appropriate error code:

 if (!db) {
   LOG(WARNING) << worker->db_name_ << " Not found.";
-  return -1;
+  return -2;  // Use a consistent error code as per project conventions
 }

Ensure that -2 (or the chosen error code) is documented and handled appropriately by the caller.

include/pika_consensus.h (2)

28-28: Prefer constructor initialization lists for member variables.

Initializing applied_index_ within the class definition is acceptable, but it's better practice to use the constructor's initialization list for consistent initialization, especially if the class grows in complexity.

Apply this diff to move the initialization to the constructor:

- LogOffset applied_index_ = LogOffset();
+ LogOffset applied_index_;
...
+ Context(std::string path)
+     : path_(std::move(path)), applied_index_(LogOffset()) {}

252-296: Ensure consistent naming conventions for member variables and methods.

Some member variables use a trailing underscore (e.g., is_consistency_), while others don't (e.g., prepared_id__rwlock_). Additionally, prepared_id__rwlock_ has a double underscore. For readability and maintainability, adhere to a consistent naming convention.

Apply this diff to correct the variable names:

- std::shared_mutex prepared_id__rwlock_;
+ std::shared_mutex prepared_id_rwlock_;

Also, review other member variables and methods for consistent naming.

src/pika_binlog.cc (2)

187-196: Possible redundant call to GetProducerStatus.

The method GetProducerStatus is called inside both Binlog::Put methods. Ensure that this is necessary and consider avoiding redundant calls to improve performance.

If GetProducerStatus is called earlier or its results are already known, consider removing the redundant call.


Line range hint 235-284: Improve error handling and code readability in Binlog::Put.

The method mixes logging, status checks, and business logic, which can make it hard to read and maintain. Consider refactoring the method to separate concerns and improve error handling.

Extract sections into helper methods or adjust the structure for clarity.

src/pika_repl_server_conn.cc (2)

40-42: Avoid redundant slave size check and potential inconsistency.

The variable slave_size is set but not used effectively in the subsequent code. Additionally, relying on slave_size at this point may not reflect the actual state due to concurrency.

Consider removing slave_size or using it appropriately to enhance code clarity:

- int slave_size = g_pika_server->slave_size();

If necessary, ensure thread-safe access to shared data.


234-252: Ensure consistent error handling and prevent code duplication.

The code within the consistency check duplicates logic that exists later in the method for seeking the binlog reader and handling errors.

Refactor the code to eliminate duplication:

  • Merge the consistency-specific logic with the existing binlog seek operation.
  • Consolidate error handling to a single block.

This enhances maintainability and reduces the risk of inconsistency between different code paths.

src/pika_consensus.cc (5)

343-344: Remove redundant log statements or refine log levels.

The log statements at lines 343-344 might be redundant or could be combined for clarity.

Consider refining the logging:

LOG(WARNING) << DBInfo(db_name_).ToString()
             << "Drop log from leader logic_id " << attribute.logic_id()
             << ", current last index " << last_index.l_offset.index;

This provides the same information in a single, concise log entry.


597-598: Avoid logging sensitive information.

The log statements at lines 597-598 include potentially sensitive details that might not be necessary for log output.

Review the logging content and consider reducing verbosity or masking sensitive data:

// Example: Log only essential information
LOG(INFO) << DBInfo(db_name_).ToString() << "Searching binlog with hint offset";

This enhances security and complies with best practices for logging.


646-647: Correct typographical error in log message.

There is a typo in the log message: "GetBInlogOffset res" should be "GetBinlogOffset res".

Correct the typo for clarity:

- LOG(INFO) << DBInfo(db_name_).ToString() << "GetBInlogOffset res: " << s.ToString() << " possible_offset "
+ LOG(INFO) << DBInfo(db_name_).ToString() << "GetBinlogOffset res: " << s.ToString() << " possible_offset "

795-806: Ensure consistent naming conventions for methods.

The methods SetIsConsistency and GetISConsistency have inconsistent naming, with one using Is and the other using IS.

Standardize the method names:

- void SetIsConsistency(bool is_consistency);
- bool GetISConsistency();
+ void SetIsConsistency(bool is_consistency);
+ bool GetIsConsistency();

This improves code readability and maintains consistent naming conventions.


815-837: Avoid potential memory leaks with binlog strings.

In the PersistAppendBinlog method, the binlog string is passed by value, which could be inefficient for large strings.

Pass binlog by reference to avoid unnecessary copying:

Status s = stable_logger_->Logger()->Put(content, &offset, &binlog);

This improves performance by preventing unnecessary memory allocations.

src/pika_rm.cc (1)

488-495: Potential busy-wait loop in ConsensusProposeLog

In the ConsensusProposeLog method, the loop may cause high CPU usage due to frequent checks without adequate waiting. Consider using a condition variable or increasing the sleep duration to prevent a busy-wait.

Apply this diff to improve the loop:

 while (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - start).count() < 10) {
   // Check if consensus has been achieved for the given log offset
   if (checkFinished(offset)) {
     return Status::OK(); 
   }
-  std::this_thread::sleep_for(std::chrono::milliseconds(50));
+  std::this_thread::sleep_for(std::chrono::milliseconds(200));
 }
src/pika_command.cc (1)

972-981: Improper formatting and missing braces in DoBinlog

In the DoBinlog method, the if-else statements lack braces {} around their code blocks, which can lead to errors or misunderstandings during maintenance.

Apply this diff to add braces and improve readability:

 if (g_pika_server->IsConsistency() && s.IsTimeout()) {
+  res().SetRes(CmdRes::kConsistencyTimeout, "Timeout waiting for consistency");
+  LOG(WARNING) << sync_db_->SyncDBInfo().ToString()
+               << " Slave node consistency timeout: " << s.ToString();
 } else {
+  LOG(WARNING) << sync_db_->SyncDBInfo().ToString()
+               << " Writing binlog failed, maybe no space left on device: " << s.ToString();
+  res().SetRes(CmdRes::kErrOther, s.ToString());
 }
src/pika_repl_server.cc (1)

25-25: Consider adding cleanup logic in destructor.

While the simplified destructor logs the exit, consider adding explicit cleanup for any resources that might need proper termination.

include/pika_rm.h (1)

216-216: Consider adding documentation for BuildBinlogOffset.

While the implementation looks correct, adding documentation would help explain the role of this method in the replication process.

src/pika_repl_client_conn.cc (1)

213-214: Consider enhancing log messages for better debugging.

While the logging is helpful, consider adding more context like node IDs or connection details to help with distributed debugging.

-  LOG(INFO)<<"PacificA master TrySync Response master_prepared_id: "<<master_prepared_id.ToString();
-  LOG(INFO)<<"PacificA slave cur_prepared_id: "<<db->GetPreparedId().ToString();
+  LOG(INFO)<<"PacificA [node="<<g_pika_server->host()<<":"<<g_pika_server->port()<<"] master TrySync Response master_prepared_id: "<<master_prepared_id.ToString();
+  LOG(INFO)<<"PacificA [node="<<g_pika_server->host()<<":"<<g_pika_server->port()<<"] slave cur_prepared_id: "<<db->GetPreparedId().ToString();
include/pika_server.h (2)

156-159: Consider thread safety implications.

While the method correctly uses a lock guard, consider documenting the thread safety guarantees in the method comment.

+  /**
+   * @brief Get the current number of connected slaves
+   * @thread_safety Thread-safe through slave_mutex_ protection
+   * @return The number of connected slaves
+   */
   int slave_size(){
     std::lock_guard l(slave_mutex_);
     return slaves_.size();
   }

581-581: Consider initialization in constructor.

The is_consistency_ member should be explicitly initialized in the constructor to maintain consistency with other member initializations.

src/pika_db.cc (1)

529-532: LGTM: Proper consistency state update after DB sync.

The code correctly:

  1. Creates a master offset combining binlog and logic offsets
  2. Updates both prepared and committed IDs
  3. Logs the state change for debugging

Consider enhancing the log message to include the DB name for better context in multi-DB setups.

-  LOG(INFO)<<"PacificA write DB finished slave_comitted: "<<master_db->GetCommittedId().ToString()<<" prepared: "<<master_db->GetPreparedId().ToString();
+  LOG(INFO)<<"PacificA write DB finished [db="<<db_name_<<"] slave_comitted: "<<master_db->GetCommittedId().ToString()<<" prepared: "<<master_db->GetPreparedId().ToString();
include/pika_command.h (1)

592-592: Consider documenting the consistency parameter.

While the signature change is correct, consider adding documentation for the is_consistency parameter to clarify its usage.

-  void InternalProcessCommand(const HintKeys& hint_key,bool is_consistency = false);
+  /**
+   * @brief Process a command internally
+   * @param hint_key The hint keys for command routing
+   * @param is_consistency If true, ensures strong consistency through PacificA protocol
+   */
+  void InternalProcessCommand(const HintKeys& hint_key, bool is_consistency = false);
src/pika_inner_message.proto (1)

152-152: Add committed_id to BinlogSync response

The committed_id field is added to track committed transaction states in the BinlogSync response.

Consider documenting the relationship between prepared_id and committed_id in comments, as they form part of the transaction state management system.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f0704c3 and 21e1e9e.

📒 Files selected for processing (23)
  • .gitignore (0 hunks)
  • conf/pika.conf (8 hunks)
  • include/pika_admin.h (1 hunks)
  • include/pika_binlog.h (2 hunks)
  • include/pika_command.h (3 hunks)
  • include/pika_consensus.h (6 hunks)
  • include/pika_define.h (2 hunks)
  • include/pika_rm.h (3 hunks)
  • include/pika_server.h (5 hunks)
  • include/pika_slave_node.h (1 hunks)
  • src/pika_admin.cc (4 hunks)
  • src/pika_binlog.cc (3 hunks)
  • src/pika_command.cc (3 hunks)
  • src/pika_consensus.cc (15 hunks)
  • src/pika_db.cc (1 hunks)
  • src/pika_inner_message.proto (3 hunks)
  • src/pika_repl_bgworker.cc (3 hunks)
  • src/pika_repl_client.cc (3 hunks)
  • src/pika_repl_client_conn.cc (3 hunks)
  • src/pika_repl_server.cc (2 hunks)
  • src/pika_repl_server_conn.cc (7 hunks)
  • src/pika_rm.cc (7 hunks)
  • src/pika_server.cc (6 hunks)
💤 Files with no reviewable changes (1)
  • .gitignore
✅ Files skipped from review due to trivial changes (1)
  • conf/pika.conf
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: build_on_ubuntu
  • GitHub Check: build_on_macos
  • GitHub Check: Analyze (go)
  • GitHub Check: build_on_centos
🔇 Additional comments (27)
include/pika_consensus.h (1)

67-70: ⚠️ Potential issue

Thread-safety concern with match_index_ updates.

The AddMatchIndex method updates the match_index_ without a thread-safe guarantee if other methods access it concurrently. Ensure that access to match_index_ is properly synchronized.

Consider adding synchronization mechanisms when accessing match_index_:

 void AddMatchIndex(const std::string& ip, int port, const LogOffset& offset) {
-  std::lock_guard l(rwlock_);
+  std::unique_lock l(rwlock_);
   match_index_[ip + std::to_string(port)] = offset;
 }

Ensure that all accesses to match_index_ are protected by the same mutex.

Likely invalid or redundant comment.

src/pika_binlog.cc (1)

173-202: Check for binlog I/O errors consistently.

In the new Put method, if the Put operation fails, binlog_io_error_ is set to true. Ensure that this error state is handled appropriately elsewhere in the code to prevent unexpected behavior.

Please verify that binlog_io_error_ is checked after calling this method and that appropriate actions are taken when it's true.

src/pika_repl_server_conn.cc (1)

399-403: Adjust logic for slave binlog synchronization based on consistency state.

In the HandleBinlogSyncRequest method, when is_first_send is true, the code conditionally calls AppendCandidateBinlog or ActivateSlaveBinlogSync based on master_db->GetISConsistency(). Ensure that both methods handle the slave's synchronization state appropriately.

Confirm that AppendCandidateBinlog and ActivateSlaveBinlogSync correctly manage the slave's state and that there are no side effects when switching between consistency modes.

If needed, review the implementations:

This helps ensure that the correct synchronization process is followed.

✅ Verification successful

Verified: Slave binlog synchronization logic is correctly implemented

The implementations properly handle slave states:

  • AppendCandidateBinlog safely activates sync only when offset criteria are met
  • ActivateSlaveBinlogSync directly activates sync and updates offsets
  • Both methods maintain consistency guarantees appropriate for their respective modes
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

# Check the implementations of the methods
rg 'AppendCandidateBinlog' --files-with-matches -t cpp
rg 'ActivateSlaveBinlogSync' --files-with-matches -t cpp

Length of output: 306


Script:

#!/bin/bash
# Check the implementation of both methods in pika_rm.cc
echo "=== AppendCandidateBinlog implementation ==="
rg -A 10 "AppendCandidateBinlog.*\(" src/pika_rm.cc

echo -e "\n=== ActivateSlaveBinlogSync implementation ==="
rg -A 10 "ActivateSlaveBinlogSync.*\(" src/pika_rm.cc

Length of output: 1173

src/pika_consensus.cc (2)

872-916: Clarify loop logic in CommitAppLog method.

In the CommitAppLog method, the loop uses log_size captured before the loop. If logs_->TruncateFrom modifies logs_, this could lead to inconsistency.

Ensure that truncating logs within the loop does not affect iteration. Consider capturing logs_->Size() after potential modifications, or restructure the loop to account for dynamic size changes.

Additionally, verify that ApplyBinlog is correctly applying the logs and that SetCommittedId updates the state as intended.


472-472: Handle potential race conditions when truncating logs.

In the TruncateTo method, the code performs operations that might be affected by concurrent modifications to the logs. Ensure that necessary locks are held during truncation.

Check if adequate synchronization is in place to prevent race conditions:

// Ensure that stable_logger_ and mem_logger_ are properly locked
stable_logger_->Logger()->Lock();
// Truncate operations...
stable_logger_->Logger()->Unlock();

This helps maintain data integrity during truncation.

include/pika_binlog.h (2)

55-55: LGTM: New Put method signature enhances consistency tracking.

The new overload with LogOffset tracking and binlog output parameter aligns well with PacificA's requirements for strong consistency in replication.


82-82: Verify error handling for consistency-enabled operations.

The modified Put method adds consistency support, but we should ensure proper error handling when consistency checks fail.

Run this script to check error handling patterns:

✅ Verification successful

Error handling is properly implemented for consistency operations

The review revealed that the is_consistency parameter is used for metadata tracking (updating cur_logoffset) rather than validation. The method already implements comprehensive error handling that covers all operations, including proper propagation of errors through Status returns from file operations and produce calls.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for error handling patterns in consistency-related code
rg -A 3 "is_consistency.*true" 
rg -A 3 "Status::.*Error.*consistency"

Length of output: 282


Script:

#!/bin/bash
# Find Put method implementation
ast-grep --pattern 'Status Put(const char* $_, int $_, LogOffset* $_ = nullptr, bool $_ = false) {
  $$$
}'

# Search for any consistency-related error handling in the implementation file
rg -B 2 -A 5 "Put.*is_consistency" src/pika_binlog.cc

Length of output: 412


Script:

#!/bin/bash
# Get the complete Put method implementation with more context
rg -B 2 -A 30 "Status Binlog::Put.*is_consistency" src/pika_binlog.cc

Length of output: 1005


Script:

#!/bin/bash
# Get the complete Put method implementation with much more context
rg -B 2 -A 50 "Status Binlog::Put.*is_consistency" src/pika_binlog.cc

Length of output: 1513

src/pika_repl_server.cc (1)

104-105: LGTM: Proper handling of committed_id in sync response.

The addition of committed_id handling in BuildBinlogSyncResp is essential for maintaining consistency across replicas in the PacificA protocol.

include/pika_rm.h (2)

102-114: LGTM: Comprehensive implementation of PacificA protocol methods.

The added methods provide a complete implementation of PacificA's consistency requirements:

  • Log management through PreparedId and CommittedId
  • Consistency verification through checkFinished
  • Coordination handling through ProcessCoordination

96-101: Ensure thread safety for context initialization.

The InitContext method modifies coordinator state. Verify thread safety mechanisms are in place.

Run this script to check synchronization patterns:

✅ Verification successful

Thread safety is properly implemented for context initialization

The ConsensusCoordinator class already implements comprehensive thread safety using multiple shared mutexes to protect its internal state. The InitContext() implementation follows a simple delegation pattern that leverages these existing synchronization mechanisms.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check for mutex/lock usage around context initialization
rg -B 5 "InitContext.*\{" 
ast-grep --pattern 'lock_guard|unique_lock|shared_lock'

Length of output: 594


Script:

#!/bin/bash
# Check ConsensusCoordinator implementation and usage
rg -B 3 -A 3 "class ConsensusCoordinator"
rg "mutex|atomic" include/pika_consensus.h
rg "InitContext.*\{" -A 10 "src/*.cc"

Length of output: 752


Script:

#!/bin/bash
# Find implementation files and check InitContext
fd -e cc -e cpp
rg -B 2 -A 10 "InitContext.*\{" 

Length of output: 9003

include/pika_define.h (1)

278-283: LGTM: Enhanced WriteTask with consistency tracking.

The addition of committed_id and its constructor properly integrates with PacificA's consistency requirements.

src/pika_repl_client_conn.cc (3)

203-204: LGTM: Proper extraction and conversion of prepared_id.

The code correctly extracts the prepared_id from try_sync_response and creates a LogOffset object combining both binlog and logic offsets.


229-233: LGTM: Proper handling of consistency mode.

The code correctly handles the offset calculation based on consistency mode.


238-247: Verify error handling in consistency check.

The consistency check logic is critical for data integrity. While the implementation looks correct, consider:

  1. The error state might need more detailed logging
  2. The comment on line 245 is in Chinese and should be translated to English
   if(master_prepared_id<db->GetPreparedId()){
     if(master_prepared_id<db->GetCommittedId()){
       slave_db->SetReplState(ReplState::kError);
-      LOG(WARNING) << "DB: " << db_name << " master committedId > slave committedId";
+      LOG(WARNING) << "DB: " << db_name << " Consistency violation: master's committed ID (" 
+                  << master_prepared_id.ToString() << ") is less than slave's committed ID ("
+                  << db->GetCommittedId().ToString() << ")";
       return;
     }
     db->SetPreparedId(master_prepared_id);
-    // 向主的preparedid看齐,多余的裁剪掉
+    // Align with master's prepared ID by truncating excess data
     db->Truncate(master_prepared_id);
   }
✅ Verification successful

The review suggestions are valid and align with codebase patterns.

  1. The enhanced error logging provides better debugging context by including both master and slave IDs, consistent with error logging patterns in the codebase.
  2. The Chinese comment translation is accurate and maintains the original meaning while improving code readability.
🏁 Scripts executed

The following scripts were executed for the analysis:

Error: No shell scripts found


Script:

#!/bin/bash
# Check for ToString() method implementation
ast-grep --pattern 'ToString() {
  $$$
}'

# Look for similar logging patterns
rg "LOG\(WARNING\)" -A 2

# Look for Chinese comments
rg -P "[\p{Han}]+" --color always

# Find similar consistency checks
rg "master.*prepared.*id" -A 3

Length of output: 67323

include/pika_server.h (2)

93-93: LGTM: Role tracking enhancement.

The addition of last_role() method and last_role_ member variable properly tracks the server's previous role, which is essential for role transition management in the PacificA protocol.

Also applies to: 583-583


171-171: LGTM: Consistency control methods.

The addition of consistency-related methods and parameters aligns with the PacificA protocol requirements.

Also applies to: 184-185

include/pika_command.h (1)

337-339: LGTM: Proper error handling for consistency timeout.

The addition of kConsistencyTimeout and its corresponding error message properly handles consistency timeout scenarios in the PacificA protocol.

Also applies to: 438-439

include/pika_admin.h (1)

38-38: LGTM! Well-placed member variable for consistency tracking.

The new is_consistency_cmd_ member variable is appropriately initialized to false by default and correctly placed in the private section.

src/pika_server.cc (3)

272-275: LGTM! Thread-safe role tracking implementation.

The last_role() method is properly synchronized using a shared lock to safely access the role state.


692-704: LGTM! Enhanced SetMaster with consistency support.

The SetMaster method is updated to handle consistency mode while maintaining proper synchronization and role state management.


755-762: LGTM! Thread-safe consistency state management.

The IsConsistency and SetIsConsistency methods provide synchronized access to the consistency state.

src/pika_admin.cc (3)

99-99: Documentation update for new command option

The comment has been updated to include the new "strong" option for the slaveof command.


168-168: Propagate consistency flag to SetMaster

The SetMaster call is updated to include the is_consistency_cmd_ parameter, ensuring the consistency mode is properly propagated.

Let's verify the SetMaster method signature:

✅ Verification successful

SetMaster consistency flag propagation verified

The SetMaster method signature in pika_server.h correctly accepts the consistency flag parameter, matching the updated call site.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check SetMaster method declaration
rg "SetMaster.*bool.*consistency" include/

Length of output: 152


138-139: Implementation of strong consistency mode

The code adds support for strong consistency mode by setting is_consistency_cmd_ when the "strong" argument is provided.

Let's verify the initialization of is_consistency_cmd_:

✅ Verification successful

Implementation verified: Strong consistency mode is properly implemented

The is_consistency_cmd_ flag is correctly declared, initialized, and used throughout the codebase:

  • Properly declared as a private boolean member with default initialization to false
  • Correctly set when "strong" argument is provided
  • Appropriately used in server configuration via SetMaster
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check if is_consistency_cmd_ is properly initialized in the header file
rg -A 2 "class SlaveofCmd" include/

Length of output: 205


Script:

#!/bin/bash
# Get full SlaveofCmd class definition
ast-grep --pattern 'class SlaveofCmd : public Cmd {
  $$$
}'

# Find constructor implementation and usages
rg "is_consistency_cmd_" -A 2

Length of output: 1721

src/pika_inner_message.proto (3)

63-63: Add consistency flag to MetaSync message

The is_consistency boolean field is added to support strong consistency mode in metadata synchronization.


69-71: Update TrySync message structure

The TrySync message has been updated to:

  1. Use slot instead of db_name for better slot-based replication
  2. Add committed_id for tracking transaction state

138-138: Add prepared_id to TrySync response

The prepared_id field is added to track prepared transaction states in the TrySync response.

Comment on lines 156 to 161
std::shared_ptr<SyncMasterDB> db =g_pika_rm->GetSyncMasterDBByName(DBInfo(worker->db_name_));
if(db->GetISConsistency()){
Status s= db->CommitAppLog(master_committed_id);
if(!s.ok()){
return;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add null check for db to prevent null pointer dereference.

After retrieving the db instance with GetSyncMasterDBByName, ensure that db is not null before using it. This prevents potential crashes if the database is not found.

Apply this diff to add the null check:

 db = g_pika_rm->GetSyncMasterDBByName(DBInfo(worker->db_name_));
+ if (!db) {
+   LOG(WARNING) << "DB " << worker->db_name_ << " Not Found";
+   return;
+ }
 if (db->GetISConsistency()) {
   Status s = db->CommitAppLog(master_committed_id);
   if (!s.ok()) {
     return;
   }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
std::shared_ptr<SyncMasterDB> db =g_pika_rm->GetSyncMasterDBByName(DBInfo(worker->db_name_));
if(db->GetISConsistency()){
Status s= db->CommitAppLog(master_committed_id);
if(!s.ok()){
return;
}
std::shared_ptr<SyncMasterDB> db =g_pika_rm->GetSyncMasterDBByName(DBInfo(worker->db_name_));
if (!db) {
LOG(WARNING) << "DB " << worker->db_name_ << " Not Found";
return;
}
if(db->GetISConsistency()){
Status s= db->CommitAppLog(master_committed_id);
if(!s.ok()){
return;
}

Comment on lines 214 to 218
if(db->GetISConsistency()){
db->AppendSlaveEntries(c_ptr, worker->binlog_item_);
}else{
db->ConsensusProcessLeaderLog(c_ptr, worker->binlog_item_);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure db is valid before accessing its methods to prevent null pointer dereference.

The variable db might be null if the database is not found. Before calling db->GetISConsistency(), add a null check to prevent potential crashes.

Apply this diff to add the null check:

 if (!db) {
   LOG(WARNING) << worker->db_name_ << " Not found.";
+  return -1;
 }
 if (db->GetISConsistency()) {
   db->AppendSlaveEntries(c_ptr, worker->binlog_item_);
 } else {
   db->ConsensusProcessLeaderLog(c_ptr, worker->binlog_item_);
 }

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines 154 to 161
const InnerMessage::BinlogOffset& committed_id = binlog_res.committed_id();
LogOffset master_committed_id(BinlogOffset(committed_id.filenum(),committed_id.offset()),LogicOffset(committed_id.term(),committed_id.index()));
std::shared_ptr<SyncMasterDB> db =g_pika_rm->GetSyncMasterDBByName(DBInfo(worker->db_name_));
if(db->GetISConsistency()){
Status s= db->CommitAppLog(master_committed_id);
if(!s.ok()){
return;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Avoid variable shadowing by reusing the existing db variable.

The variable db is redeclared in line 156, which shadows the db declared earlier in the function. This can lead to confusion and potential errors. Consider reusing the existing db variable instead of redeclaring it.

Apply this diff to reuse the existing db variable:

- std::shared_ptr<SyncMasterDB> db = g_pika_rm->GetSyncMasterDBByName(DBInfo(worker->db_name_));
+ db = g_pika_rm->GetSyncMasterDBByName(DBInfo(worker->db_name_));

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines 123 to 150
class Log {
public:
struct LogItem {
LogItem(const LogOffset& _offset, std::shared_ptr<Cmd> _cmd_ptr, std::string _binlog)
: offset(_offset), cmd_ptr(std::move(_cmd_ptr)), binlog_(std::move(_binlog)) {}
LogOffset offset;
std::shared_ptr<Cmd> cmd_ptr;
std::string binlog_;
};

Log();
int Size();
void AppendLog(const LogItem& item);
LogOffset LastOffset();
LogOffset FirstOffset();
LogItem At(int index);
int FindOffset(const LogOffset& send_offset);
pstd::Status Truncate(const LogOffset& offset);
pstd::Status TruncateFrom(const LogOffset& offset);

private:
int FindLogIndex(const LogOffset& offset);
std::shared_mutex logs_mutex_;
std::vector<LogItem> logs_;
LogOffset last_index_;
LogOffset first_index_;
};

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider thread safety for the new Log class.

The Log class uses a std::shared_mutex named logs_mutex_, but some methods may not adequately protect shared resources. Review the methods to ensure proper locking and prevent data races.

For example, wrap accesses to logs_ with appropriate locks:

 int Size() {
-  // Existing code
+  std::shared_lock lock(logs_mutex_);
+  return logs_.size();
 }

 LogOffset LastOffset() {
-  // Existing code
+  std::shared_lock lock(logs_mutex_);
+  return last_index_;
 }

Ensure all public methods accessing shared data are thread-safe.

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +267 to +270
if(is_consistency && cur_logoffset != nullptr){
cur_logoffset->b_offset.filenum=version_->pro_num_;
cur_logoffset->b_offset.offset=version_->pro_offset_;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Potential null pointer dereference of cur_logoffset.

In the condition if(is_consistency && cur_logoffset != nullptr), if is_consistency is true but cur_logoffset is nullptr, the code will skip setting cur_logoffset, which might be unintended. Ensure that cur_logoffset is not null when is_consistency is true.

Apply this diff to enforce that cur_logoffset should not be nullptr when is_consistency is true:

 if (is_consistency) {
+  assert(cur_logoffset != nullptr);
   cur_logoffset->b_offset.filenum = version_->pro_num_;
   cur_logoffset->b_offset.offset = version_->pro_offset_;
 }

Alternatively, handle the case where cur_logoffset may be nullptr.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if(is_consistency && cur_logoffset != nullptr){
cur_logoffset->b_offset.filenum=version_->pro_num_;
cur_logoffset->b_offset.offset=version_->pro_offset_;
}
if(is_consistency) {
assert(cur_logoffset != nullptr);
cur_logoffset->b_offset.filenum=version_->pro_num_;
cur_logoffset->b_offset.offset=version_->pro_offset_;
}

Comment on lines +998 to +1009
Log::Log() = default;

int Log::Size() {
std::shared_lock lock(logs_mutex_);
return static_cast<int>(logs_.size());
}

void Log::AppendLog(const LogItem& item) {
std::lock_guard lock(logs_mutex_);
logs_.push_back(item);
last_index_ = item.offset;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Initialize first_index_ member variable in Log class.

In the Log constructor, first_index_ is not explicitly initialized, which might result in undefined behavior.

Initialize first_index_ in the constructor:

Log::Log() : first_index_(LogOffset()), last_index_(LogOffset()) {}

This ensures that both first_index_ and last_index_ have known initial states.

src/pika_rm.cc Outdated
Comment on lines 409 to 412
bool SyncMasterDB::GetISConsistency(){
coordinator_.GetISConsistency();
}
void SyncMasterDB::SetPreparedId(const LogOffset& offset){
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Missing return statement in GetISConsistency method

The method GetISConsistency is declared to return a bool, but it does not return any value. This can lead to undefined behavior.

Apply this diff to fix the issue:

 bool SyncMasterDB::GetISConsistency(){
-  coordinator_.GetISConsistency();
+  return coordinator_.GetISConsistency();
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
bool SyncMasterDB::GetISConsistency(){
coordinator_.GetISConsistency();
}
void SyncMasterDB::SetPreparedId(const LogOffset& offset){
bool SyncMasterDB::GetISConsistency(){
return coordinator_.GetISConsistency();
}
void SyncMasterDB::SetPreparedId(const LogOffset& offset){

src/pika_rm.cc Outdated
Comment on lines 406 to 408
void SyncMasterDB::SetConsistency(bool is_consistenct){
coordinator_.SetIsConsistency(is_consistenct);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Typo in parameter name is_consistenct in SetConsistency method

The parameter name is_consistenct is misspelled. It should be is_consistent.

Apply this diff to correct the typo:

 void SyncMasterDB::SetConsistency(bool is_consistenct){
-  coordinator_.SetIsConsistency(is_consistenct);
+  coordinator_.SetIsConsistency(is_consistent);
 }

Committable suggestion skipped: line range outside the PR's diff.

@@ -69,6 +69,7 @@ class SlaveNode : public RmNode {
SyncWindow sync_win;
LogOffset sent_offset;
LogOffset acked_offset;
LogOffset target_offset;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Uninitialized member variable target_offset in SlaveNode

The newly added member variable target_offset in the SlaveNode class should be properly initialized to prevent undefined behavior.

Apply this diff to initialize target_offset in the constructor:

 SlaveNode::SlaveNode(const std::string& ip, int port, const std::string& db_name, int session_id)
     : RmNode(ip, port, db_name, session_id) {
     ...
+    target_offset = LogOffset();
 }

Ensure that target_offset is initialized to a valid state.

Committable suggestion skipped: line range outside the PR's diff.

@@ -160,6 +160,7 @@ enum SlaveState {
kSlaveNotSync = 0,
kSlaveDbSync = 1,
kSlaveBinlogSync = 2,
KCandidate = 3,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Update SlaveStateMsg array to include KCandidate state.

The KCandidate state was added to the SlaveState enum, but the corresponding debug message array SlaveStateMsg needs to be updated.

Apply this diff:

-const std::string SlaveStateMsg[] = {"SlaveNotSync", "SlaveDbSync", "SlaveBinlogSync"};
+const std::string SlaveStateMsg[] = {"SlaveNotSync", "SlaveDbSync", "SlaveBinlogSync", "Candidate"};

Committable suggestion skipped: line range outside the PR's diff.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🧹 Nitpick comments (7)
src/pika_command.cc (1)

873-885: Refactor duplicated code for duration measurement

The duration measurement code is duplicated in both branches of the if-else statement. To improve maintainability and reduce code duplication, consider moving the duration measurement outside the if-else block.

Apply this diff to refactor the code:

  if(g_pika_server->IsConsistency()){
    DoBinlog();
    if(res().ok()){
        DoCommand(hint_keys);
    }
  }else{
    DoCommand(hint_keys);
    DoBinlog();
  }
+ if (g_pika_conf->slowlog_slower_than() >= 0) {
+     do_duration_ += pstd::NowMicros() - start_us;
+ }
include/pika_consensus.h (1)

253-254: Ensure consistent naming for getter and setter methods

The method GetISConsistency() has inconsistent naming with its corresponding setter SetConsistency(bool is_consistency). Consider renaming it to GetConsistency() or IsConsistency() to align with naming conventions and improve readability.

src/pika_repl_client_conn.cc (1)

227-248: LGTM! PacificA consistency implementation looks solid.

The implementation correctly:

  • Validates prepared_id from master
  • Handles consistency checks
  • Implements proper error handling
  • Includes debug logging

However, consider adding more detailed error messages to help with debugging:

-          LOG(WARNING) << "DB: " << db_name << " master committedId > slave committedId";
+          LOG(WARNING) << "DB: " << db_name << " master committedId (" << master_prepared_id.ToString() 
+                      << ") is less than slave committedId (" << db->GetCommittedId().ToString() << ")";
include/pika_server.h (1)

567-569: Document the role tracking variables.

The role tracking variables need documentation to explain their purpose and valid states.

Add documentation above the variables:

  /*
   * Consistency and role management
   * is_consistency_: Indicates if the server is running in consistency mode
   * role_: Current role of the server (SINGLE/MASTER/SLAVE)
   * last_role_: Previous role of the server, used for role transition handling
   */
  bool is_consistency_ = false;
  int role_ = PIKA_ROLE_SINGLE;
  int last_role_ = PIKA_ROLE_SINGLE;
src/pika_consensus.cc (1)

796-803: Improve thread safety in consistency operations.

The consistency operations need better synchronization and documentation:

+/**
+ * Sets the consistency mode for the coordinator.
+ * Thread-safe operation protected by is_consistency_rwlock_.
+ */
 void ConsensusCoordinator::SetConsistency(bool is_consistency) {
   std::lock_guard l(is_consistency_rwlock_);
   is_consistency_ = is_consistency;
+  LOG(INFO) << "Consistency mode set to: " << is_consistency;
 }

+/**
+ * Gets the current consistency mode.
+ * Thread-safe operation protected by is_consistency_rwlock_.
+ * @return Current consistency mode
+ */
 bool ConsensusCoordinator::GetISConsistency() {
   std::shared_lock l(is_consistency_rwlock_);
   return is_consistency_;
 }
src/pika_rm.cc (1)

817-822: Improve error handling in UpdateSyncBinlogStatus.

The consistency update needs better error handling:

 if(db->GetISConsistency()){
   s = db->UpdateCommittedID();
   if (!s.ok()) {
+    LOG(WARNING) << "Failed to update committed ID: " << s.ToString();
     return s;
   }
 }
src/pika_server.cc (1)

754-761: Add thread safety documentation for consistency methods.

+/**
+ * Checks if the server is in consistency mode.
+ * Thread-safe operation protected by state_protector_.
+ * @return Current consistency mode
+ */
 bool PikaServer::IsConsistency() {
   std::shared_lock sp_l(state_protector_);
   return is_consistency_;
 }

+/**
+ * Sets the server's consistency mode.
+ * Thread-safe operation protected by state_protector_.
+ * @param is_consistency New consistency mode value
+ */
 void PikaServer::SetConsistency(bool is_consistency) {
   std::shared_lock sp_l(state_protector_);
   is_consistency_ = is_consistency;
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 21e1e9e and 2afafc0.

📒 Files selected for processing (15)
  • include/pika_command.h (2 hunks)
  • include/pika_consensus.h (4 hunks)
  • include/pika_define.h (2 hunks)
  • include/pika_server.h (12 hunks)
  • include/pika_slave_node.h (1 hunks)
  • src/pika_binlog.cc (3 hunks)
  • src/pika_command.cc (2 hunks)
  • src/pika_consensus.cc (19 hunks)
  • src/pika_repl_bgworker.cc (3 hunks)
  • src/pika_repl_client.cc (3 hunks)
  • src/pika_repl_client_conn.cc (2 hunks)
  • src/pika_repl_server.cc (1 hunks)
  • src/pika_repl_server_conn.cc (13 hunks)
  • src/pika_rm.cc (7 hunks)
  • src/pika_server.cc (30 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • include/pika_command.h
  • include/pika_slave_node.h
  • src/pika_repl_bgworker.cc
🧰 Additional context used
🪛 cppcheck (2.10-2)
src/pika_server.cc

[performance] 1639-1639: Prefer prefix ++/-- operators for non-primitive types.

(postfixOperator)


[performance] 1760-1760: Prefer prefix ++/-- operators for non-primitive types.

(postfixOperator)

⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: build_on_ubuntu
  • GitHub Check: build_on_centos
  • GitHub Check: Analyze (go)
  • GitHub Check: build_on_macos
🔇 Additional comments (15)
src/pika_command.cc (1)

972-981: LGTM!

The enhanced error handling logic for binlog write failures correctly differentiates between consistency timeouts and other errors, providing appropriate responses and logging.

src/pika_repl_server.cc (1)

106-109: LGTM!

The addition of the committed_id to the binlog synchronization response when the server is in consistency mode appropriately enhances consistency handling.

include/pika_consensus.h (1)

120-147: Consider thread safety for the new Log class

The Log class uses a pstd::Mutex named logs_mutex_, but some methods may not adequately protect shared resources. Please ensure that all methods accessing shared data are properly synchronized to prevent data races.

include/pika_define.h (3)

163-163: LGTM! The KCandidate state addition aligns with PacificA protocol.

The addition of KCandidate state to the SlaveState enum is essential for implementing the PacificA protocol's leader election mechanism.


167-167: LGTM! SlaveStateMsg array correctly updated.

The SlaveStateMsg array has been properly updated to include the "Candidate" string representation for the new KCandidate state.


278-278: LGTM! WriteTask enhancements support transaction tracking.

The addition of committed_id_ field and the new constructor in WriteTask struct properly support tracking committed transactions in the distributed log system, which is crucial for maintaining consistency in the PacificA protocol.

Also applies to: 282-283

src/pika_binlog.cc (2)

173-202: LGTM! New Put method properly implements log offset tracking.

The new Put method correctly handles:

  • Log offset tracking
  • Error handling
  • Lock management using DEFER
  • Binlog encoding with proper parameters

267-270: ⚠️ Potential issue

Add null pointer check for cur_logoffset when is_consistency is true.

When is_consistency is true, we should ensure cur_logoffset is not null before dereferencing it.

Apply this diff:

-    if(is_consistency && cur_logoffset != nullptr){
+    if(is_consistency) {
+      if (cur_logoffset == nullptr) {
+        return Status::InvalidArgument("cur_logoffset cannot be null when is_consistency is true");
+      }
       cur_logoffset->b_offset.filenum=version_->pro_num_;
       cur_logoffset->b_offset.offset=version_->pro_offset_;
     }

Likely invalid or redundant comment.

src/pika_repl_client.cc (1)

263-268: ⚠️ Potential issue

Add null pointer check for master_db.

The code assumes master_db is not null, which could lead to crashes.

Apply this diff:

   std::shared_ptr<SyncMasterDB> master_db =g_pika_rm->GetSyncMasterDBByName(DBInfo(db_name));
+  if (!master_db) {
+    LOG(WARNING) << "Master DB not found for db_name: " << db_name;
+    return Status::NotFound("Master DB not found");
+  }
   if(master_db->GetISConsistency()){
     InnerMessage::BinlogOffset* committed_id = try_sync->mutable_committed_id();
     LogOffset master_committed_id = master_db->GetCommittedId();
     g_pika_rm->BuildBinlogOffset(master_committed_id,committed_id);
   }

Likely invalid or redundant comment.

include/pika_server.h (1)

118-122: LGTM! DB locking helpers improve thread safety.

The addition of DB locking helper methods improves code readability and ensures proper lock management.

src/pika_repl_server_conn.cc (3)

31-31: ⚠️ Potential issue

Validate the consistency flag before use.

Add a check to ensure the consistency flag is properly set in the request:

-bool is_consistency = meta_sync_request.is_consistency();
+bool is_consistency = meta_sync_request.has_is_consistency() ? meta_sync_request.is_consistency() : false;

Likely invalid or redundant comment.


228-261: 🛠️ Refactor suggestion

Improve committed ID validation and error handling.

The committed ID handling needs better validation and error messages:

 if(db->GetISConsistency()){
-  if(try_sync_request.has_committed_id()){
+  if(!try_sync_request.has_committed_id()) {
+    try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kError);
+    LOG(WARNING) << "Committed ID not provided in TrySync request from " << node.ip() << ":" << node.port();
+    return false;
+  }
   const InnerMessage::BinlogOffset& slave_committed_id = try_sync_request.committed_id();
   LogOffset committed_id(BinlogOffset(slave_committed_id.filenum(), slave_committed_id.offset()), 
                         LogicOffset(slave_committed_id.term(), slave_committed_id.index()));
   if (db->GetCommittedId() < committed_id) {
     try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kError);
-    LOG(WARNING) << "DB Name: " << db_name << "Slave CommittedId Greater than master";
+    LOG(WARNING) << "Slave committed ID (" << committed_id.ToString() 
+                 << ") is greater than master's (" << db->GetCommittedId().ToString() 
+                 << ") for DB: " << db_name;
     return false;
   }

Likely invalid or redundant comment.


54-68: 🛠️ Refactor suggestion

Enhance error handling in consistency processing.

The coordinator processing lacks proper error handling and logging. Consider:

 if (is_consistency) {
   auto master_dbs = g_pika_rm->GetSyncMasterDBs();
   g_pika_server->SetConsistency(is_consistency);
   for (auto& db : master_dbs) {
     if (slave_size == 0) {
       db.second->SetConsistency(is_consistency);
       db.second->InitContext();
+      LOG(INFO) << "Processing coordination for DB: " << db.first;
       Status s = db.second->ProcessCoordination();
       if (!s.ok()) {
         response.set_code(InnerMessage::kError);
-        response.set_reply("master ProcessCoordination error");
+        response.set_reply("Master ProcessCoordination error: " + s.ToString());
+        LOG(WARNING) << "ProcessCoordination failed for DB: " << db.first << ", Error: " << s.ToString();
+        break;  // Exit on first error
       }
     }
   }
 }

Likely invalid or redundant comment.

src/pika_consensus.cc (2)

220-226: ⚠️ Potential issue

Initialize member variables in constructor.

The constructor should initialize all member variables:

 ConsensusCoordinator::ConsensusCoordinator(const std::string& db_name) 
-    : db_name_(db_name) {
+    : db_name_(db_name)
+    , is_consistency_(false)
+    , committed_id_(LogOffset())
+    , prepared_id_(LogOffset()) {
   std::string db_log_path = g_pika_conf->log_path() + "log_" + db_name + "/";
   std::string log_path = db_log_path;
   context_ = std::make_shared<Context>(log_path + kContext);
   stable_logger_ = std::make_shared<StableLog>(db_name, log_path);
   mem_logger_ = std::make_shared<MemLog>();
   logs_ = std::make_shared<Log>();
 }

Likely invalid or redundant comment.


1000-1067: 🛠️ Refactor suggestion

Enhance thread safety and error handling in Log class.

The Log class needs better synchronization and error handling:

  1. Initialize member variables in constructor
  2. Add proper synchronization for critical operations
  3. Improve error handling
 Log::Log() = default;
+    : first_index_(LogOffset())
+    , last_index_(LogOffset()) {}

 void Log::AppendLog(const LogItem& item) {
   std::lock_guard lock(logs_mutex_);
   logs_.push_back(item);
   last_index_ = item.offset;
+  if (logs_.size() == 1) {
+    first_index_ = item.offset;
+  }
 }

 Status Log::Truncate(const LogOffset& offset) {
   std::lock_guard lock(logs_mutex_);
   int index = FindLogIndex(offset);
   if (index < 0) {
-    return Status::Corruption("Can't find correct index");
+    return Status::Corruption("Failed to find index for offset: " + offset.ToString());
   }
   last_index_ = logs_[index].offset;
   logs_.erase(logs_.begin() + index + 1, logs_.end());
   return Status::OK();
 }

Likely invalid or redundant comment.

Comment on lines +183 to +194
bool is_consistency = g_pika_server->IsConsistency();
meta_sync->set_is_consistency(is_consistency);
if(is_consistency){
auto master_dbs = g_pika_rm->GetSyncMasterDBs();
for (auto& db : master_dbs) {
if (g_pika_server->slaves_.size() == 0) {
db.second->SetConsistency(is_consistency);
db.second->InitContext();
Status s = db.second->ProcessCoordination();
}
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add error handling for ProcessCoordination.

The ProcessCoordination call should include error handling.

Apply this diff:

       if (g_pika_server->slaves_.size() == 0) {
         db.second->SetConsistency(is_consistency);
         db.second->InitContext();
-        Status s = db.second->ProcessCoordination();
+        Status s = db.second->ProcessCoordination();
+        if (!s.ok()) {
+          LOG(WARNING) << "Failed to process coordination for DB: " << db.first 
+                      << ", error: " << s.ToString();
+          return s;
+        }
       }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
bool is_consistency = g_pika_server->IsConsistency();
meta_sync->set_is_consistency(is_consistency);
if(is_consistency){
auto master_dbs = g_pika_rm->GetSyncMasterDBs();
for (auto& db : master_dbs) {
if (g_pika_server->slaves_.size() == 0) {
db.second->SetConsistency(is_consistency);
db.second->InitContext();
Status s = db.second->ProcessCoordination();
}
}
}
bool is_consistency = g_pika_server->IsConsistency();
meta_sync->set_is_consistency(is_consistency);
if(is_consistency){
auto master_dbs = g_pika_rm->GetSyncMasterDBs();
for (auto& db : master_dbs) {
if (g_pika_server->slaves_.size() == 0) {
db.second->SetConsistency(is_consistency);
db.second->InitContext();
Status s = db.second->ProcessCoordination();
if (!s.ok()) {
LOG(WARNING) << "Failed to process coordination for DB: " << db.first
<< ", error: " << s.ToString();
return s;
}
}
}
}

Comment on lines +407 to +411
if (master_db->GetISConsistency()) {
Status s = master_db->AppendCandidateBinlog(node.ip(), node.port(), range_start);
} else {
Status s = master_db->ActivateSlaveBinlogSync(node.ip(), node.port(), range_start);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Enhance error handling in binlog sync processing.

The binlog sync handling needs better error handling and status validation:

 if (master_db->GetISConsistency()) {
   Status s = master_db->AppendCandidateBinlog(node.ip(), node.port(), range_start);
+  if (!s.ok()) {
+    LOG(WARNING) << "Failed to append candidate binlog for " << node.ip() << ":" << node.port() 
+                 << ", Error: " << s.ToString();
+    return;
+  }
 } else {
   Status s = master_db->ActivateSlaveBinlogSync(node.ip(), node.port(), range_start);
+  if (!s.ok()) {
+    LOG(WARNING) << "Failed to activate slave binlog sync for " << node.ip() << ":" << node.port() 
+                 << ", Error: " << s.ToString();
+    return;
+  }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (master_db->GetISConsistency()) {
Status s = master_db->AppendCandidateBinlog(node.ip(), node.port(), range_start);
} else {
Status s = master_db->ActivateSlaveBinlogSync(node.ip(), node.port(), range_start);
}
if (master_db->GetISConsistency()) {
Status s = master_db->AppendCandidateBinlog(node.ip(), node.port(), range_start);
if (!s.ok()) {
LOG(WARNING) << "Failed to append candidate binlog for " << node.ip() << ":" << node.port()
<< ", Error: " << s.ToString();
return;
}
} else {
Status s = master_db->ActivateSlaveBinlogSync(node.ip(), node.port(), range_start);
if (!s.ok()) {
LOG(WARNING) << "Failed to activate slave binlog sync for " << node.ip() << ":" << node.port()
<< ", Error: " << s.ToString();
return;
}
}

Comment on lines +874 to +891
Status ConsensusCoordinator::CommitAppLog(const LogOffset& master_committed_id) {
int index = logs_->FindOffset(logs_->FirstOffset());
int log_size = logs_->Size(); // Cache log size
LOG(INFO) << "PacificA CommitAppLog master_committed_id index: " << index << " log_size: " << log_size
<< " , m_offset: " << master_committed_id.ToString();
for (int i = index; i < log_size; ++i) {
Log::LogItem log = logs_->At(i);
if (log.offset >= master_committed_id) {
LOG(INFO) << "PacificA master_committed_id: " << master_committed_id.ToString()
<< ", ApplyLog: " << log.offset.ToString();
ApplyBinlog(log.cmd_ptr);
}
}

logs_->TruncateFrom(master_committed_id); // Truncate logs
SetCommittedId(master_committed_id); // Update committed ID
return Status::OK();
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Enhance error handling in commit log operations.

The commit log operations need better error handling and logging:

 Status ConsensusCoordinator::CommitAppLog(const LogOffset& master_committed_id) {
+  if (master_committed_id == LogOffset()) {
+    return Status::InvalidArgument("Invalid master committed ID");
+  }
+
   int index = logs_->FindOffset(logs_->FirstOffset());
   int log_size = logs_->Size();
   LOG(INFO) << "PacificA CommitAppLog master_committed_id index: " << index 
             << " log_size: " << log_size
             << " , m_offset: " << master_committed_id.ToString();
+
+  if (index < 0) {
+    return Status::Corruption("Failed to find starting offset");
+  }
+
   for (int i = index; i < log_size; ++i) {
     Log::LogItem log = logs_->At(i);
     if (log.offset >= master_committed_id) {
       LOG(INFO) << "PacificA master_committed_id: " << master_committed_id.ToString()
                 << ", ApplyLog: " << log.offset.ToString();
-      ApplyBinlog(log.cmd_ptr);
+      Status s = ApplyBinlog(log.cmd_ptr);
+      if (!s.ok()) {
+        LOG(WARNING) << "Failed to apply binlog: " << s.ToString();
+        return s;
+      }
     }
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Status ConsensusCoordinator::CommitAppLog(const LogOffset& master_committed_id) {
int index = logs_->FindOffset(logs_->FirstOffset());
int log_size = logs_->Size(); // Cache log size
LOG(INFO) << "PacificA CommitAppLog master_committed_id index: " << index << " log_size: " << log_size
<< " , m_offset: " << master_committed_id.ToString();
for (int i = index; i < log_size; ++i) {
Log::LogItem log = logs_->At(i);
if (log.offset >= master_committed_id) {
LOG(INFO) << "PacificA master_committed_id: " << master_committed_id.ToString()
<< ", ApplyLog: " << log.offset.ToString();
ApplyBinlog(log.cmd_ptr);
}
}
logs_->TruncateFrom(master_committed_id); // Truncate logs
SetCommittedId(master_committed_id); // Update committed ID
return Status::OK();
}
Status ConsensusCoordinator::CommitAppLog(const LogOffset& master_committed_id) {
if (master_committed_id == LogOffset()) {
return Status::InvalidArgument("Invalid master committed ID");
}
int index = logs_->FindOffset(logs_->FirstOffset());
int log_size = logs_->Size(); // Cache log size
LOG(INFO) << "PacificA CommitAppLog master_committed_id index: " << index
<< " log_size: " << log_size
<< " , m_offset: " << master_committed_id.ToString();
if (index < 0) {
return Status::Corruption("Failed to find starting offset");
}
for (int i = index; i < log_size; ++i) {
Log::LogItem log = logs_->At(i);
if (log.offset >= master_committed_id) {
LOG(INFO) << "PacificA master_committed_id: " << master_committed_id.ToString()
<< ", ApplyLog: " << log.offset.ToString();
Status s = ApplyBinlog(log.cmd_ptr);
if (!s.ok()) {
LOG(WARNING) << "Failed to apply binlog: " << s.ToString();
return s;
}
}
}
logs_->TruncateFrom(master_committed_id); // Truncate logs
SetCommittedId(master_committed_id); // Update committed ID
return Status::OK();
}

Comment on lines +690 to +703
bool PikaServer::SetMaster(std::string& master_ip, int master_port, bool is_consistency) {
if (master_ip == "127.0.0.1") {
master_ip = host_;
}
std::lock_guard l(state_protector_);
if (((role_ ^ PIKA_ROLE_SLAVE) != 0) && repl_state_ == PIKA_REPL_NO_CONNECT) {
master_ip_ = master_ip;
master_port_ = master_port;
last_role_ = role_;
role_ |= PIKA_ROLE_SLAVE;
repl_state_ = PIKA_REPL_SHOULD_META_SYNC;
is_consistency_ = is_consistency;
LOG(INFO) << "Slavecmd Master, ip_port: " << master_ip_ << ":" << master_port_
<< " consistency: " << is_consistency_;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve SetMaster method with better validation.

The SetMaster method needs better validation and error handling:

-bool PikaServer::SetMaster(std::string& master_ip, int master_port, bool is_consistency) {
+bool PikaServer::SetMaster(std::string& master_ip, int master_port, bool is_consistency) {
+  if (master_port <= 0 || master_port > 65535) {
+    LOG(WARNING) << "Invalid master port: " << master_port;
+    return false;
+  }
+
   if (master_ip == "127.0.0.1") {
     master_ip = host_;
   }
+
   std::lock_guard l(state_protector_);
   if (((role_ ^ PIKA_ROLE_SLAVE) != 0) && repl_state_ == PIKA_REPL_NO_CONNECT) {
     master_ip_ = master_ip;
     master_port_ = master_port;
     last_role_ = role_;
     role_ |= PIKA_ROLE_SLAVE;
     repl_state_ = PIKA_REPL_SHOULD_META_SYNC;
     is_consistency_ = is_consistency;
     LOG(INFO) << "Slavecmd Master, ip_port: " << master_ip_ << ":" << master_port_
               << " consistency: " << is_consistency_;
     return true;
   }
   return false;
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
bool PikaServer::SetMaster(std::string& master_ip, int master_port, bool is_consistency) {
if (master_ip == "127.0.0.1") {
master_ip = host_;
}
std::lock_guard l(state_protector_);
if (((role_ ^ PIKA_ROLE_SLAVE) != 0) && repl_state_ == PIKA_REPL_NO_CONNECT) {
master_ip_ = master_ip;
master_port_ = master_port;
last_role_ = role_;
role_ |= PIKA_ROLE_SLAVE;
repl_state_ = PIKA_REPL_SHOULD_META_SYNC;
is_consistency_ = is_consistency;
LOG(INFO) << "Slavecmd Master, ip_port: " << master_ip_ << ":" << master_port_
<< " consistency: " << is_consistency_;
bool PikaServer::SetMaster(std::string& master_ip, int master_port, bool is_consistency) {
if (master_port <= 0 || master_port > 65535) {
LOG(WARNING) << "Invalid master port: " << master_port;
return false;
}
if (master_ip == "127.0.0.1") {
master_ip = host_;
}
std::lock_guard l(state_protector_);
if (((role_ ^ PIKA_ROLE_SLAVE) != 0) && repl_state_ == PIKA_REPL_NO_CONNECT) {
master_ip_ = master_ip;
master_port_ = master_port;
last_role_ = role_;
role_ |= PIKA_ROLE_SLAVE;
repl_state_ = PIKA_REPL_SHOULD_META_SYNC;
is_consistency_ = is_consistency;
LOG(INFO) << "Slavecmd Master, ip_port: " << master_ip_ << ":" << master_port_
<< " consistency: " << is_consistency_;
return true;
}
return false;
}

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (4)
include/pika_consensus.h (3)

120-146: Add documentation for the Log class methods

The Log class implementation looks solid, but lacks documentation. Consider adding:

  • Method descriptions
  • Parameter explanations
  • Return value descriptions
  • Thread safety guarantees

Example documentation format:

/**
 * Appends a new log entry to the log.
 * @param item The log item to append
 * @thread-safety Thread-safe through logs_mutex_
 */
void AppendLog(const LogItem& item);

248-262: Consider adding error handling for edge cases

The PacificA-related methods should handle potential edge cases:

  • Network failures during coordination
  • Timeouts during consistency checks
  • Recovery from partial failures

Consider adding error handling:

pstd::Status ProcessCoordination() {
  try {
    // ... existing code ...
  } catch (const std::exception& e) {
    return pstd::Status::IOError("Coordination failed: " + std::string(e.what()));
  }
  return pstd::Status::OK();
}

284-291: Consider initializing member variables in constructor

Move the initialization of member variables to the constructor for better encapsulation:

ConsensusCoordinator::ConsensusCoordinator(const std::string& db_name) 
  : is_consistency_(false)
  , committed_id_(LogOffset())
  , prepared_id_(LogOffset())
  , logs_(std::make_shared<Log>()) {
  // ... rest of constructor
}
src/pika_repl_server_conn.cc (1)

228-263: Consider extracting consistency check logic

The consistency check logic could be extracted into a separate method for better maintainability:

bool ValidateConsistencyState(const std::shared_ptr<SyncMasterDB>& db,
                            const InnerMessage::TrySync& try_sync_request,
                            InnerMessage::TrySync* try_sync_response) {
  if (!try_sync_request.has_committed_id()) {
    try_sync_response->set_reply_code(InnerMessage::TrySync::kError);
    LOG(WARNING) << "TrySync no slave commmittedID";
    return false;
  }
  // ... rest of consistency validation logic
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2afafc0 and 7317faf.

📒 Files selected for processing (5)
  • include/pika_consensus.h (4 hunks)
  • include/pika_server.h (7 hunks)
  • src/pika_repl_client_conn.cc (2 hunks)
  • src/pika_repl_server_conn.cc (5 hunks)
  • src/pika_server.cc (9 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/pika_repl_client_conn.cc
  • include/pika_server.h
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: build_on_ubuntu
  • GitHub Check: build_on_centos
  • GitHub Check: build_on_macos
  • GitHub Check: Analyze (go)
🔇 Additional comments (8)
include/pika_consensus.h (3)

28-28: LGTM: Proper initialization of applied_index_

The initialization of applied_index_ with LogOffset() ensures a clean initial state.


55-70: LGTM: Thread-safe implementation of binlog state management

The implementation correctly uses mutex locks to ensure thread-safe operations on shared state.

Also applies to: 76-76


263-279: LGTM: Thread-safe ID management

The implementation correctly uses mutex locks for thread-safe access to committed and prepared IDs.

src/pika_repl_server_conn.cc (2)

31-68: LGTM: Proper handling of consistency in meta sync

The implementation correctly:

  • Handles consistency flags
  • Initializes master databases
  • Processes coordination with proper error handling

410-414: ⚠️ Potential issue

Add error handling for AppendCandidateBinlog

The status returned by AppendCandidateBinlog is not being checked:

 if (master_db->GetISConsistency()) {
   Status s = master_db->AppendCandidateBinlog(node.ip(), node.port(), range_start);
+  if (!s.ok()) {
+    LOG(WARNING) << "Failed to append candidate binlog for " << node.ip() << ":" << node.port() 
+                 << ", Error: " << s.ToString();
+    conn->NotifyClose();
+    return;
+  }
 } else {
   Status s = master_db->ActivateSlaveBinlogSync(node.ip(), node.port(), range_start);
+  if (!s.ok()) {
+    LOG(WARNING) << "Failed to activate slave binlog sync for " << node.ip() << ":" << node.port() 
+                 << ", Error: " << s.ToString();
+    conn->NotifyClose();
+    return;
+  }
 }

Likely invalid or redundant comment.

src/pika_server.cc (3)

271-274: LGTM: Thread-safe role access

The method correctly uses shared locking to access the last role state.


755-762: LGTM: Thread-safe consistency state management

The implementation correctly uses shared locks to manage consistency state.


691-704: ⚠️ Potential issue

Add port validation in SetMaster

The method should validate the master port before proceeding:

 bool PikaServer::SetMaster(std::string& master_ip, int master_port, bool is_consistency) {
+  if (master_port <= 0 || master_port > 65535) {
+    LOG(WARNING) << "Invalid master port: " << master_port;
+    return false;
+  }
   if (master_ip == "127.0.0.1") {
     master_ip = host_;
   }
   // ... rest of the method
 }

Likely invalid or redundant comment.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (3)
src/pika_consensus.cc (3)

229-229: ⚠️ Potential issue

Initialize is_consistency_ member variable in constructor.

The is_consistency_ member variable is not initialized in the constructor, which could lead to undefined behavior.

Initialize is_consistency_ in the constructor:

 ConsensusCoordinator::ConsensusCoordinator(const std::string& db_name)
-    : db_name_(db_name) {
+    : db_name_(db_name), is_consistency_(false) {
   logs_ = std::make_shared<Log>();

375-397: 🛠️ Refactor suggestion

⚠️ Potential issue

Ensure thread safety and improve error handling.

  1. Thread safety concerns:

The code accesses shared data structures without proper synchronization.

  1. Error handling improvements needed:

The error handling in the non-consistency mode could be more robust:

 } else {
   LogOffset committed_index;
   Status s = sync_pros_.Update(ip, port, start, end, &committed_index);
   if (!s.ok()) {
+    LOG(WARNING) << "Failed to update sync progress: " << s.ToString();
     return s;
   }
+  LOG(INFO) << "Successfully updated sync progress for " << ip << ":" << port;
 }

1008-1019: ⚠️ Potential issue

Initialize Log class member variables properly.

The Log class members need proper initialization:

-Log::Log() = default;
+Log::Log() : first_index_(LogOffset()), last_index_(LogOffset()) {}

Additionally, consider adding invariant checks in AppendLog:

 void Log::AppendLog(const LogItem& item) {
   std::lock_guard lock(logs_mutex_);
+  if (!logs_.empty() && item.offset <= last_index_) {
+    LOG(WARNING) << "Attempting to append out-of-order log entry";
+    return;
+  }
   logs_.push_back(item);
   last_index_ = item.offset;
 }
🧹 Nitpick comments (4)
src/pika_repl_server_conn.cc (1)

229-263: Refactor consistency check logic for better readability.

The nested conditions and multiple early returns make the code harder to follow. Consider extracting the consistency check into a separate method.

Suggested refactor:

+bool PikaReplServerConn::ValidateConsistencyState(
+    const std::shared_ptr<SyncMasterDB>& db,
+    const InnerMessage::InnerRequest::TrySync& try_sync_request,
+    InnerMessage::InnerResponse::TrySync* try_sync_response) {
+  if (!db->GetISConsistency()) {
+    return true;  // Skip consistency check
+  }
+  
+  if (!try_sync_request.has_committed_id()) {
+    LOG(WARNING) << "TrySync no slave commmittedID";
+    try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kError);
+    return false;
+  }
+  
+  const InnerMessage::BinlogOffset& slave_committed_id = try_sync_request.committed_id();
+  LogOffset committed_id(
+      BinlogOffset(slave_committed_id.filenum(), slave_committed_id.offset()),
+      LogicOffset(slave_committed_id.term(), slave_committed_id.index()));
+  
+  if (db->GetCommittedId() < committed_id) {
+    try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kError);
+    LOG(WARNING) << "DB Name: " << db->DBName() << " Slave CommittedId Greater than master"
+                 << " master_id: " << db->GetCommittedId().ToString()
+                 << " slave_id: " << committed_id.ToString();
+    return false;
+  }
+  
+  // Set prepared ID in response
+  InnerMessage::BinlogOffset* master_prepared_id = try_sync_response->mutable_prepared_id();
+  g_pika_rm->BuildBinlogOffset(db->GetPreparedId(), master_prepared_id);
+  return true;
+}

Then use it in TrySyncOffsetCheck:

 if(db->GetISConsistency()){
-  if(try_sync_request.has_committed_id()){ ... }
-  else { ... }
+  if (!ValidateConsistencyState(db, try_sync_request, try_sync_response)) {
+    return false;
+  }
 }
src/pika_consensus.cc (3)

1031-1034: Improve method documentation and translate comments to English.

 Log::LogItem Log::At(int index) {
   std::shared_lock lock(logs_mutex_);
-  return logs_.at(index);  // 使用 at() 确保边界安全
+  return logs_.at(index);  // Using at() for bounds checking
 }

Consider adding method documentation:

/**
 * @brief Retrieves a log item at the specified index with bounds checking
 * @param index The index of the log item to retrieve
 * @return The log item at the specified index
 * @throws std::out_of_range if the index is out of bounds
 */

1068-1075: Consider performance optimization and improve documentation for FindLogIndex.

  1. The current linear search implementation could be inefficient for large logs. Consider using a more efficient data structure or indexing mechanism.
  2. Add clear documentation about return value semantics.
+/**
+ * @brief Finds the index of a log entry with the specified offset
+ * @param offset The offset to search for
+ * @return The index of the log entry if found, -1 otherwise
+ */
 int Log::FindLogIndex(const LogOffset& offset) {
+  // TODO: Consider using a more efficient data structure (e.g., map or binary search)
+  // for better performance with large logs
   for (size_t i = 0; i < logs_.size(); ++i) {
     if (logs_[i].offset == offset) {
       return static_cast<int>(i);
     }
   }
   return -1;
 }

802-1075: Consider architectural improvements for better maintainability and performance.

  1. Separation of Concerns:

    • Consider splitting the Log class into separate components for storage and indexing
    • Extract the consistency checking logic into a dedicated class
  2. Performance Optimizations:

    • Consider using more efficient data structures for log storage and indexing
    • Implement batch processing for log operations
    • Add caching mechanisms for frequently accessed log entries
  3. Error Recovery:

    • Consider implementing a recovery mechanism for partial failures
    • Add periodic state validation
  4. Monitoring:

    • Add metrics for monitoring replication lag
    • Implement health checks for consensus state

Would you like me to provide more detailed suggestions for any of these areas?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7317faf and 03d4a6e.

📒 Files selected for processing (4)
  • include/pika_server.h (6 hunks)
  • src/pika_consensus.cc (4 hunks)
  • src/pika_repl_server_conn.cc (4 hunks)
  • src/pika_server.cc (7 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • include/pika_server.h
  • src/pika_server.cc
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: build_on_centos
  • GitHub Check: build_on_macos
  • GitHub Check: build_on_ubuntu
  • GitHub Check: Analyze (go)
🔇 Additional comments (4)
src/pika_repl_server_conn.cc (3)

410-414: ⚠️ Potential issue

Add error handling for AppendCandidateBinlog.

The status returned by AppendCandidateBinlog is not checked in the consistency case, which could lead to silent failures.

Fix the error handling:

     if (master_db->GetISConsistency()) {
       Status s = master_db->AppendCandidateBinlog(node.ip(), node.port(), range_start);
+      if (!s.ok()) {
+        LOG(WARNING) << "Failed to append candidate binlog for " << node.ip() << ":" << node.port() 
+                     << ", Error: " << s.ToString();
+        conn->NotifyClose();
+        return;
+      }
     } else {
       Status s = master_db->ActivateSlaveBinlogSync(node.ip(), node.port(), range_start);
     }

Likely invalid or redundant comment.


61-65: ⚠️ Potential issue

Improve error handling for ProcessCoordination.

The error from ProcessCoordination is not properly propagated. The loop continues even after an error, which could leave the system in an inconsistent state.

Add proper error propagation:

           Status s = db.second->ProcessCoordination();
           if (!s.ok()) {
             response.set_code(InnerMessage::kError);
-            response.set_reply("master ProcessCoordination error");
+            response.set_reply("Master ProcessCoordination error: " + s.ToString());
+            LOG(WARNING) << "ProcessCoordination failed for DB: " << db.first 
+                        << ", Error: " << s.ToString();
+            break;  // Exit the loop on error
           }

Likely invalid or redundant comment.


58-66: Add synchronization around slave size check.

There's a potential race condition between checking slave_size and setting consistency. Another thread could add/remove slaves between the check and the initialization.

Consider adding proper synchronization:

-      if (slave_size == 0) {
+      std::lock_guard<std::mutex> guard(slave_mutex_);  // Add appropriate mutex
+      if (g_pika_server->slave_size() == 0) {
src/pika_consensus.cc (1)

882-899: ⚠️ Potential issue

Improve error handling and state management in CommitAppLog.

The method needs better error handling and state management:

  1. Add input validation
  2. Handle ApplyBinlog errors
  3. Ensure atomic state updates
 Status ConsensusCoordinator::CommitAppLog(const LogOffset& master_committed_id) {
+  if (master_committed_id == LogOffset()) {
+    return Status::InvalidArgument("Invalid master committed ID");
+  }
+
   int index = logs_->FindOffset(logs_->FirstOffset());
   int log_size = logs_->Size();
+  if (index < 0) {
+    return Status::Corruption("Failed to find starting offset");
+  }
+
   LOG(INFO) << "PacificA CommitAppLog master_committed_id index: " << index << " log_size: " << log_size
             << " , m_offset: " << master_committed_id.ToString();
   for (int i = index; i < log_size; ++i) {
     Log::LogItem log = logs_->At(i);
     if (log.offset >= master_committed_id) {
       LOG(INFO) << "PacificA master_committed_id: " << master_committed_id.ToString()
                 << ", ApplyLog: " << log.offset.ToString();
-      ApplyBinlog(log.cmd_ptr);
+      Status s = ApplyBinlog(log.cmd_ptr);
+      if (!s.ok()) {
+        LOG(WARNING) << "Failed to apply binlog: " << s.ToString();
+        return s;
+      }
     }
   }
-  logs_->TruncateFrom(master_committed_id);
-  SetCommittedId(master_committed_id);
+  Status s = logs_->TruncateFrom(master_committed_id);
+  if (!s.ok()) {
+    LOG(WARNING) << "Failed to truncate logs: " << s.ToString();
+    return s;
+  }
+  SetCommittedId(master_committed_id);
   return Status::OK();
 }

Likely invalid or redundant comment.

Comment on lines +963 to +989
Status ConsensusCoordinator::SendBinlog(std::shared_ptr<SlaveNode> slave_ptr, std::string db_name) {
std::vector<WriteTask> tasks;

// Check if there are new log entries that need to be sent to the slave
if (logs_->LastOffset() >= slave_ptr->acked_offset) {
// Find the index of the log entry corresponding to the slave's acknowledged offset
int index = logs_->FindOffset(slave_ptr->acked_offset);
if (index < logs_->Size()) {
for (int i = index; i < logs_->Size(); ++i) {
const Log::LogItem& item = logs_->At(i);

slave_ptr->SetLastSendTime(pstd::NowMicros());

RmNode rm_node(slave_ptr->Ip(), slave_ptr->Port(), slave_ptr->DBName(), slave_ptr->SessionId());
WriteTask task(rm_node, BinlogChip(item.offset, item.binlog_), slave_ptr->sent_offset, GetCommittedId());
tasks.emplace_back(std::move(task));

slave_ptr->sent_offset = item.offset;
}
}
}

if (!tasks.empty()) {
g_pika_rm->ProduceWriteQueue(slave_ptr->Ip(), slave_ptr->Port(), db_name, tasks);
}
return Status::OK();
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Enhance error handling and performance in SendBinlog.

The method needs improvements in several areas:

  1. Add input validation
  2. Improve error handling
  3. Optimize vector operations
 Status ConsensusCoordinator::SendBinlog(std::shared_ptr<SlaveNode> slave_ptr, std::string db_name) {
+  if (!slave_ptr) {
+    return Status::InvalidArgument("Invalid slave pointer");
+  }
+  if (db_name.empty()) {
+    return Status::InvalidArgument("Empty database name");
+  }
+
   std::vector<WriteTask> tasks;
+  tasks.reserve(logs_->Size());  // Pre-allocate space to avoid reallocations
 
   if (logs_->LastOffset() >= slave_ptr->acked_offset) {
     int index = logs_->FindOffset(slave_ptr->acked_offset);
+    if (index < 0) {
+      return Status::Corruption("Failed to find offset");
+    }
     if (index < logs_->Size()) {
       for (int i = index; i < logs_->Size(); ++i) {
         const Log::LogItem& item = logs_->At(i);
         slave_ptr->SetLastSendTime(pstd::NowMicros());
 
         RmNode rm_node(slave_ptr->Ip(), slave_ptr->Port(), slave_ptr->DBName(), slave_ptr->SessionId());
         WriteTask task(rm_node, BinlogChip(item.offset, item.binlog_), slave_ptr->sent_offset, GetCommittedId());
         tasks.emplace_back(std::move(task));
 
         slave_ptr->sent_offset = item.offset;
       }
     }
   }
 
   if (!tasks.empty()) {
-    g_pika_rm->ProduceWriteQueue(slave_ptr->Ip(), slave_ptr->Port(), db_name, tasks);
+    Status s = g_pika_rm->ProduceWriteQueue(slave_ptr->Ip(), slave_ptr->Port(), db_name, tasks);
+    if (!s.ok()) {
+      LOG(WARNING) << "Failed to produce write queue: " << s.ToString();
+      return s;
+    }
   }
   return Status::OK();
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Status ConsensusCoordinator::SendBinlog(std::shared_ptr<SlaveNode> slave_ptr, std::string db_name) {
std::vector<WriteTask> tasks;
// Check if there are new log entries that need to be sent to the slave
if (logs_->LastOffset() >= slave_ptr->acked_offset) {
// Find the index of the log entry corresponding to the slave's acknowledged offset
int index = logs_->FindOffset(slave_ptr->acked_offset);
if (index < logs_->Size()) {
for (int i = index; i < logs_->Size(); ++i) {
const Log::LogItem& item = logs_->At(i);
slave_ptr->SetLastSendTime(pstd::NowMicros());
RmNode rm_node(slave_ptr->Ip(), slave_ptr->Port(), slave_ptr->DBName(), slave_ptr->SessionId());
WriteTask task(rm_node, BinlogChip(item.offset, item.binlog_), slave_ptr->sent_offset, GetCommittedId());
tasks.emplace_back(std::move(task));
slave_ptr->sent_offset = item.offset;
}
}
}
if (!tasks.empty()) {
g_pika_rm->ProduceWriteQueue(slave_ptr->Ip(), slave_ptr->Port(), db_name, tasks);
}
return Status::OK();
}
Status ConsensusCoordinator::SendBinlog(std::shared_ptr<SlaveNode> slave_ptr, std::string db_name) {
if (!slave_ptr) {
return Status::InvalidArgument("Invalid slave pointer");
}
if (db_name.empty()) {
return Status::InvalidArgument("Empty database name");
}
std::vector<WriteTask> tasks;
tasks.reserve(logs_->Size()); // Pre-allocate space to avoid reallocations
// Check if there are new log entries that need to be sent to the slave
if (logs_->LastOffset() >= slave_ptr->acked_offset) {
// Find the index of the log entry corresponding to the slave's acknowledged offset
int index = logs_->FindOffset(slave_ptr->acked_offset);
if (index < 0) {
return Status::Corruption("Failed to find offset");
}
if (index < logs_->Size()) {
for (int i = index; i < logs_->Size(); ++i) {
const Log::LogItem& item = logs_->At(i);
slave_ptr->SetLastSendTime(pstd::NowMicros());
RmNode rm_node(slave_ptr->Ip(), slave_ptr->Port(), slave_ptr->DBName(), slave_ptr->SessionId());
WriteTask task(rm_node, BinlogChip(item.offset, item.binlog_), slave_ptr->sent_offset, GetCommittedId());
tasks.emplace_back(std::move(task));
slave_ptr->sent_offset = item.offset;
}
}
}
if (!tasks.empty()) {
Status s = g_pika_rm->ProduceWriteQueue(slave_ptr->Ip(), slave_ptr->Port(), db_name, tasks);
if (!s.ok()) {
LOG(WARNING) << "Failed to produce write queue: " << s.ToString();
return s;
}
}
return Status::OK();
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
✏️ Feature New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant