Skip to content

Commit

Permalink
Microsecond granularity for Piccolo (#5492)
Browse files Browse the repository at this point in the history
  • Loading branch information
eddyashton authored Aug 1, 2023
1 parent a14a50e commit 3ea079c
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 18 deletions.
18 changes: 9 additions & 9 deletions tests/infra/basicperf.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,17 +298,16 @@ def table():
requestSize=pl.col("request").apply(len),
responseSize=pl.col("rawResponse").apply(len),
)
print(
overall.with_columns(
pl.col("receiveTime").alias("latency")
- pl.col("sendTime")
).sort("latency")
overall = overall.with_columns(
pl.col("receiveTime").alias("latency") - pl.col("sendTime")
)
print(overall.sort("latency"))
agg.append(overall)

table()

agg = pl.concat(agg, rechunk=True)
LOG.info("Aggregate results")
print(agg)
agg_path = os.path.join(
network.common_dir, "aggregated_basicperf_output.parquet"
Expand All @@ -328,9 +327,9 @@ def table():

sent_per_sec = (
agg.with_columns(
((pl.col("sendTime").alias("second") - start_send) / 1000).cast(
pl.Int64
)
(
(pl.col("sendTime").alias("second") - start_send) / 1000000
).cast(pl.Int64)
)
.groupby("second")
.count()
Expand All @@ -339,7 +338,8 @@ def table():
recv_per_sec = (
agg.with_columns(
(
(pl.col("receiveTime").alias("second") - start_send) / 1000
(pl.col("receiveTime").alias("second") - start_send)
/ 1000000
).cast(pl.Int64)
)
.groupby("second")
Expand Down
18 changes: 9 additions & 9 deletions tests/perf-system/submitter/submit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,22 +179,22 @@ void store_parquet_results(ArgumentParser args, ParquetData data_handler)
{
LOG_INFO_FMT("Start storing results");

auto ms_timestamp_type = arrow::timestamp(arrow::TimeUnit::MILLI);
auto us_timestamp_type = arrow::timestamp(arrow::TimeUnit::MICRO);

// Write Send Parquet
{
arrow::StringBuilder message_id_builder;
PARQUET_THROW_NOT_OK(message_id_builder.AppendValues(data_handler.ids));

arrow::TimestampBuilder send_time_builder(
ms_timestamp_type, arrow::default_memory_pool());
us_timestamp_type, arrow::default_memory_pool());
PARQUET_THROW_NOT_OK(
send_time_builder.AppendValues(data_handler.send_time));

auto table = arrow::Table::Make(
arrow::schema(
{arrow::field("messageID", arrow::utf8()),
arrow::field("sendTime", ms_timestamp_type)}),
arrow::field("sendTime", us_timestamp_type)}),
{message_id_builder.Finish().ValueOrDie(),
send_time_builder.Finish().ValueOrDie()});

Expand All @@ -211,7 +211,7 @@ void store_parquet_results(ArgumentParser args, ParquetData data_handler)
PARQUET_THROW_NOT_OK(message_id_builder.AppendValues(data_handler.ids));

arrow::TimestampBuilder receive_time_builder(
ms_timestamp_type, arrow::default_memory_pool());
us_timestamp_type, arrow::default_memory_pool());
PARQUET_THROW_NOT_OK(
receive_time_builder.AppendValues(data_handler.response_time));

Expand All @@ -225,7 +225,7 @@ void store_parquet_results(ArgumentParser args, ParquetData data_handler)
auto table = arrow::Table::Make(
arrow::schema({
arrow::field("messageID", arrow::utf8()),
arrow::field("receiveTime", ms_timestamp_type),
arrow::field("receiveTime", us_timestamp_type),
arrow::field("rawResponse", arrow::binary()),
}),
{message_id_builder.Finish().ValueOrDie(),
Expand Down Expand Up @@ -276,7 +276,6 @@ int main(int argc, char** argv)

LOG_INFO_FMT("Start Request Submission");


constexpr size_t retry_max = 5;
size_t retry_count = 0;
size_t read_reqs = 0;
Expand Down Expand Up @@ -318,7 +317,8 @@ int main(int argc, char** argv)
}
catch (std::logic_error& e)
{
LOG_FAIL_FMT("Sending interrupted: {}, attempting reconnection", e.what());
LOG_FAIL_FMT(
"Sending interrupted: {}, attempting reconnection", e.what());
connection = create_connection(certificates, server_address);
connection->set_tcp_nodelay(true);
retry_count++;
Expand All @@ -330,8 +330,8 @@ int main(int argc, char** argv)
for (size_t req = 0; req < requests_size; req++)
{
data_handler.raw_response.push_back(resp_text[req]);
size_t send_time = start[req].tv_sec * 1000 + start[req].tv_usec / 1000;
size_t response_time = end[req].tv_sec * 1000 + end[req].tv_usec / 1000;
size_t send_time = start[req].tv_sec * 1'000'000 + start[req].tv_usec;
size_t response_time = end[req].tv_sec * 1'000'000 + end[req].tv_usec;
data_handler.send_time.push_back(send_time);
data_handler.response_time.push_back(response_time);
}
Expand Down

0 comments on commit 3ea079c

Please sign in to comment.