Skip to content

Commit

Permalink
Merge pull request ClickHouse#69706 from ClickHouse/pror_events_for_pr
Browse files Browse the repository at this point in the history
New profile events for parallel replicas
  • Loading branch information
nickitat authored and zvonand committed Dec 17, 2024
1 parent 556c04a commit bd014e1
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 31 deletions.
2 changes: 2 additions & 0 deletions src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,8 @@ The server successfully detected this situation and will download merged part fr
\
M(ParallelReplicasHandleRequestMicroseconds, "Time spent processing requests for marks from replicas") \
M(ParallelReplicasHandleAnnouncementMicroseconds, "Time spent processing replicas announcements") \
M(ParallelReplicasAnnouncementMicroseconds, "Time spent to send an announcement") \
M(ParallelReplicasReadRequestMicroseconds, "Time spent for read requests") \
\
M(ParallelReplicasReadAssignedMarks, "Sum across all replicas of how many of scheduled marks were assigned by consistent hash") \
M(ParallelReplicasReadUnassignedMarks, "Sum across all replicas of how many unassigned marks were scheduled") \
Expand Down
20 changes: 6 additions & 14 deletions src/Processors/QueryPlan/ReadFromMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -377,13 +377,8 @@ Pipe ReadFromMergeTree::readFromPoolParallelReplicas(RangesInDataParts parts_wit
{
const auto & client_info = context->getClientInfo();

auto extension = ParallelReadingExtension
{
.all_callback = all_ranges_callback.value(),
.callback = read_task_callback.value(),
.number_of_current_replica = number_of_current_replica.value_or(client_info.number_of_current_replica),
.total_nodes_count = context->getClusterForParallelReplicas()->getShardsInfo().at(0).getAllNodeCount(),
};
auto extension = ParallelReadingExtension{
all_ranges_callback.value(), read_task_callback.value(), number_of_current_replica.value_or(client_info.number_of_current_replica), context->getClusterForParallelReplicas()->getShardsInfo().at(0).getAllNodeCount()};

/// We have a special logic for local replica. It has to read less data, because in some cases it should
/// merge states of aggregate functions or do some other important stuff other than reading from Disk.
Expand Down Expand Up @@ -549,13 +544,10 @@ Pipe ReadFromMergeTree::readInOrder(
if (is_parallel_reading_from_replicas)
{
const auto & client_info = context->getClientInfo();
ParallelReadingExtension extension
{
.all_callback = all_ranges_callback.value(),
.callback = read_task_callback.value(),
.number_of_current_replica = number_of_current_replica.value_or(client_info.number_of_current_replica),
.total_nodes_count = context->getClusterForParallelReplicas()->getShardsInfo().at(0).getAllNodeCount(),
};
ParallelReadingExtension extension{
all_ranges_callback.value(),
read_task_callback.value(),
number_of_current_replica.value_or(client_info.number_of_current_replica), context->getClusterForParallelReplicas()->getShardsInfo().at(0).getAllNodeCount()};

const auto multiplier = context->getSettingsRef().parallel_replicas_single_task_marks_count_multiplier;
if (auto result = pool_settings.min_marks_for_concurrent_read * multiplier; canConvertTo<size_t>(result))
Expand Down
14 changes: 6 additions & 8 deletions src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ namespace DB

namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
}

MergeTreeReadPoolParallelReplicas::MergeTreeReadPoolParallelReplicas(
Expand Down Expand Up @@ -126,11 +127,9 @@ MergeTreeReadPoolParallelReplicas::MergeTreeReadPoolParallelReplicas(
min_marks_per_task,
pool_settings.threads,
pool_settings.sum_marks,
extension.total_nodes_count))
extension.getTotalNodesCount()))
{

extension.all_callback(InitialAllRangesAnnouncement(
coordination_mode, parts_ranges.getDescriptions(), extension.number_of_current_replica, mark_segment_size));
extension.sendInitialRequest(coordination_mode, parts_ranges, mark_segment_size);
}

MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicas::getTask(size_t /*task_idx*/, MergeTreeReadTask * previous_task)
Expand All @@ -142,12 +141,11 @@ MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicas::getTask(size_t /*task_id

if (buffered_ranges.empty())
{
auto result = extension.callback(ParallelReadRequest(
auto result = extension.sendReadRequest(
coordination_mode,
extension.number_of_current_replica,
min_marks_per_task * pool_settings.threads,
/// For Default coordination mode we don't need to pass part names.
RangesInDataPartsDescription{}));
RangesInDataPartsDescription{});

if (!result || result->finish)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ MergeTreeReadPoolParallelReplicasInOrder::MergeTreeReadPoolParallelReplicasInOrd
for (const auto & part : parts_ranges)
buffered_tasks.push_back({part.data_part->info, MarkRanges{}});

extension.all_callback(
InitialAllRangesAnnouncement(mode, parts_ranges.getDescriptions(), extension.number_of_current_replica, /*mark_segment_size_=*/0));
extension.sendInitialRequest(mode, parts_ranges, /*mark_segment_size_=*/0);
}

MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicasInOrder::getTask(size_t task_idx, MergeTreeReadTask * previous_task)
Expand Down Expand Up @@ -83,8 +82,7 @@ MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicasInOrder::getTask(size_t ta
if (no_more_tasks)
return nullptr;

auto response
= extension.callback(ParallelReadRequest(mode, extension.number_of_current_replica, min_marks_per_task * request.size(), request));
auto response = extension.sendReadRequest(mode, min_marks_per_task * request.size(), request);

if (!response || response->description.empty() || response->finish)
{
Expand Down
58 changes: 58 additions & 0 deletions src/Storages/MergeTree/MergeTreeSelectProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,39 @@
#include <Storages/MergeTree/MergeTreeVirtualColumns.h>
#include <city.h>

namespace
{

template <typename Func>
struct TelemetryWrapper
{
TelemetryWrapper(Func callback_, ProfileEvents::Event event_, std::string span_name_)
: callback(std::move(callback_)), event(event_), span_name(std::move(span_name_))
{
}

template <typename... Args>
auto operator()(Args &&... args)
{
DB::OpenTelemetry::SpanHolder span(span_name);
DB::ProfileEventTimeIncrement<DB::Time::Microseconds> increment(event);
return callback(std::forward<Args>(args)...);
}

private:
Func callback;
ProfileEvents::Event event;
std::string span_name;
};

}

namespace ProfileEvents
{
extern const Event ParallelReplicasAnnouncementMicroseconds;
extern const Event ParallelReplicasReadRequestMicroseconds;
}

namespace DB
{

Expand All @@ -23,6 +56,31 @@ namespace ErrorCodes
extern const int QUERY_WAS_CANCELLED;
}

ParallelReadingExtension::ParallelReadingExtension(
MergeTreeAllRangesCallback all_callback_,
MergeTreeReadTaskCallback callback_,
size_t number_of_current_replica_,
size_t total_nodes_count_)
: number_of_current_replica(number_of_current_replica_), total_nodes_count(total_nodes_count_)
{
all_callback = TelemetryWrapper<MergeTreeAllRangesCallback>{
std::move(all_callback_), ProfileEvents::ParallelReplicasAnnouncementMicroseconds, "ParallelReplicasAnnouncement"};

callback = TelemetryWrapper<MergeTreeReadTaskCallback>{
std::move(callback_), ProfileEvents::ParallelReplicasReadRequestMicroseconds, "ParallelReplicasReadRequest"};
}

void ParallelReadingExtension::sendInitialRequest(CoordinationMode mode, const RangesInDataParts & ranges, size_t mark_segment_size) const
{
all_callback(InitialAllRangesAnnouncement{mode, ranges.getDescriptions(), number_of_current_replica, mark_segment_size});
}

std::optional<ParallelReadResponse> ParallelReadingExtension::sendReadRequest(
CoordinationMode mode, size_t min_number_of_marks, const RangesInDataPartsDescription & description) const
{
return callback(ParallelReadRequest{mode, number_of_current_replica, min_number_of_marks, description});
}

MergeTreeSelectProcessor::MergeTreeSelectProcessor(
MergeTreeReadPoolPtr pool_,
MergeTreeSelectAlgorithmPtr algorithm_,
Expand Down
28 changes: 23 additions & 5 deletions src/Storages/MergeTree/MergeTreeSelectProcessor.h
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
#pragma once

#include <Storages/MergeTree/IMergeTreeReadPool.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/RequestResponse.h>
#include <Storages/MergeTree/MergeTreeReadTask.h>
#include <Storages/MergeTree/IMergeTreeReadPool.h>
#include <Storages/MergeTree/MergeTreeSelectAlgorithms.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Storages/MergeTree/RequestResponse.h>

#include <boost/core/noncopyable.hpp>


Expand All @@ -22,12 +25,27 @@ struct ChunkAndProgress
bool is_finished = false;
};

struct ParallelReadingExtension
class ParallelReadingExtension
{
public:
ParallelReadingExtension(
MergeTreeAllRangesCallback all_callback_,
MergeTreeReadTaskCallback callback_,
size_t number_of_current_replica_,
size_t total_nodes_count_);

void sendInitialRequest(CoordinationMode mode, const RangesInDataParts & ranges, size_t mark_segment_size) const;

std::optional<ParallelReadResponse>
sendReadRequest(CoordinationMode mode, size_t min_number_of_marks, const RangesInDataPartsDescription & description) const;

size_t getTotalNodesCount() const { return total_nodes_count; }

private:
MergeTreeAllRangesCallback all_callback;
MergeTreeReadTaskCallback callback;
size_t number_of_current_replica{0};
size_t total_nodes_count{0};
const size_t number_of_current_replica;
const size_t total_nodes_count;
};

/// Base class for MergeTreeThreadSelectAlgorithm and MergeTreeSelectAlgorithm
Expand Down

0 comments on commit bd014e1

Please sign in to comment.