Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a test to ensure the report_stats hook is actually called internally #18

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion jbpf_tests/functional/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ add_subdirectory(verifier_extensions)
add_subdirectory(ctrl_hooks)
add_subdirectory(helper_functions)
add_subdirectory(array)

add_subdirectory(report_stats)
27 changes: 27 additions & 0 deletions jbpf_tests/functional/report_stats/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
## Functional tests for the report_stats hook

# Every .c file found in this directory is built as its own test executable
set(REPORT_STATS_TESTS ${TESTS_FUNCTIONAL}/report_stats)
file(GLOB REPORT_STATS_TESTS_SOURCES ${REPORT_STATS_TESTS}/*.c)

foreach(TEST_FILE IN LISTS REPORT_STATS_TESTS_SOURCES)
  # The target is named after the source file (directory and extension removed)
  get_filename_component(TEST_NAME ${TEST_FILE} NAME_WE)

  # Build the test together with the shared test library and the hashmap sources
  add_executable(
    ${TEST_NAME}
    ${TEST_FILE}
    ${TESTS_COMMON}/jbpf_test_lib.c
    ${JBPF_HASHMAP_MGMT_SOURCES})

  target_link_libraries(
    ${TEST_NAME}
    PUBLIC jbpf::core_lib jbpf::logger_lib jbpf::mem_mgmt_lib)

  target_include_directories(
    ${TEST_NAME}
    PUBLIC ${JBPF_LIB_HEADER_FILES} ${TEST_HEADER_FILES} ${JBPF_HASHMAP_MGMT_HEADER_FILES})

  # Register the executable with CTest
  add_test(NAME REPORT_STATS_TESTS/${TEST_NAME} COMMAND ${TEST_NAME})

  # Record the test in the JBPF_TESTS list and add the formatting check
  list(APPEND JBPF_TESTS REPORT_STATS_TESTS/${TEST_NAME})
  add_clang_format_check(${TEST_NAME} ${TEST_FILE})
endforeach()

# Export the (possibly extended) test list to the including directory
set(JBPF_TESTS ${JBPF_TESTS} PARENT_SCOPE)
252 changes: 252 additions & 0 deletions jbpf_tests/functional/report_stats/report_stats_hook_test.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
/*
* This test does the following:
* 1. It creates a process that uses the LCM IPC API to load a single codelet (C1) to a jbpf agent.
doctorlai-msrc marked this conversation as resolved.
Show resolved Hide resolved
* 2. The codelet has a single hook (report_stats) that is called by the agent internally at the interval of
* MAINTENANCE_CHECK_INTERVAL (see jbpf_perf.c).
* 3. Then we check whether the codelet (attached to the report_stats hook) has actually been called by the agent. If it
* has been called, we should receive the output from the codelet.
* 4. It uses the LCM IPC API to unload the codelet.
*/

#include <assert.h>
#include <semaphore.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <signal.h>
#include <unistd.h>

#include "jbpf.h"
#include "jbpf_agent_common.h"
#include "jbpf_utils.h"

// Contains the struct and hook definitions
#include "jbpf_test_def.h"

// PID of the forked LCM IPC loader process. Stays -1 until main() stores the value returned by fork();
// handle_sigterm() reads it to decide whether there is a child to kill.
pid_t cpid = -1;

// Unnamed semaphore initialized in run_jbpf_agent(). io_channel_check_output() posts it when output from C1
// is seen, and run_jbpf_agent() waits on it before asking the loader to unload the codeletset.
sem_t sem;

// Names of the two POSIX named semaphores that main() creates with sem_open() before forking
#define LCM_IPC_SEM_NAME "/jbpf_e2e_lcm_report_stats_ipc_standalone_sem"
#define LCM_IPC_AGENT_SEM_NAME "/jbpf_e2e_report_stats_lcm_ipc_agent_sem"

// lcm_ipc_sem: posted by run_jbpf_agent(), waited on by run_lcm_ipc_loader().
// lcm_ipc_agent_sem: posted by run_lcm_ipc_loader(), waited on by run_jbpf_agent().
sem_t *lcm_ipc_sem, *lcm_ipc_agent_sem;

// Stream id that run_lcm_ipc_loader() links to the output channel of codelet C1 and that
// io_channel_check_output() compares incoming buffers against.
jbpf_io_stream_id_t stream_id_c1 = {
.id = {0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF}};

void
handle_sigchld(int sig)
{
return;
}

/**
 * Handler that main() installs for SIGTERM, SIGINT and SIGABRT.
 *
 * Kills the forked child process, if its PID has been recorded in cpid, and then terminates the calling
 * process with a failure status.
 *
 * @param sig Signal number passed by the system (unused).
 */
void
handle_sigterm(int sig)
{
    (void)sig;

    if (cpid > 0) {
        kill(cpid, SIGKILL);
    }

    // exit() is not async-signal-safe (it runs atexit handlers and flushes stdio buffers), so terminate
    // with _exit(), which is safe to call from a signal handler.
    _exit(EXIT_FAILURE);
}

static void
io_channel_check_output(jbpf_io_stream_id_t* stream_id, void** bufs, int num_bufs, void* ctx)
{
int count = 0;
for (int i = 0; i < num_bufs; i++) {
if (memcmp(stream_id, &stream_id_c1, sizeof(stream_id_c1)) == 0) {
// Output from C1. Check that the counter has the expected value
count++;
} else {
// Unexpected output. Fail the test
assert(false);
}
}

// This means the report_stats hook has been called
if (count > 0) {
sem_post(&sem);
} else {
// we don't get any output from the codelet, meaning the report_stats hook has not been called
assert(false);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't this test fail due to timing issues? What if the output thread is called right before the maintenance thread is called? Wouldn't io_channel_check_output() fail in this case although everything might be working fine?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use sem_post(lcm_ipc_sem) to make sure the codelet is loaded only after maintenance thread is created. By default, a dummy output_handler is registered, before io_channel_check_output is registered.

I've run the test 500 times, and every run passed.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But how can we guarantee that the maintenance thread is called? What if it is delayed for a long time (e.g., 1s), because it runs in a very resource constrained environment? Can we somehow make this test deterministic and avoid relying on the timer?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is a problem, as the test uses the semaphore to ensure the maintenance thread is called (see lines 47 and 109).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The semaphore of lines 47 and 109 ensures that the test will only terminate when the data is received. But what if the maintenance thread is delayed for a long time as I described above? Doesn't that mean that when the io_channel_check_output() function is called, execution might go to line 50? This would lead to a test failure, not because there is any issue, but just because the maintenance thread was late to be called. Or am I missing something?

Copy link
Collaborator Author

@doctorlai-msrc doctorlai-msrc Jan 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that won't happen, as shown here and here the maintenance thread will be ready when _jbpf_handle_out_bufs is called. Correct me if I am wrong.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thread will be ready, but there is no guarantee that it will be called before _jbpf_handle_out_bufs(). For example if both threads are running on the same CPU core and there is high contention, it could be that maintenance thread runs later than the io thread. As such, there could be scenarios where this test could fail simply due to high contention.

I would suggest revisiting this test so that it does not rely on the synchronization order of the two threads.

}
}

/**
 * Agent side of the test; main() runs it in the parent process.
 *
 * Initializes jbpf with the default configuration, registers io_channel_check_output() as the output
 * callback, and then steps through the handshake with run_lcm_ipc_loader() using lcm_ipc_sem and
 * lcm_ipc_agent_sem: allow the load, wait for the load, wait for codelet output (sem), allow the unload,
 * wait for the unload, and finally stop jbpf.
 *
 * @return 0 once the whole sequence has completed.
 */
int
run_jbpf_agent(void)
{
    struct jbpf_config config = {0};
    int init_res;

    jbpf_set_default_config_options(&config);

    // The semaphore must exist before the output callback can post it
    sem_init(&sem, 0, 0);

    // Call jbpf_init() outside of assert(): an expression inside assert() is not evaluated at all when
    // NDEBUG is defined, which would leave the agent uninitialized.
    init_res = jbpf_init(&config);
    (void)init_res;
    assert(init_res == 0);

    // The thread will be calling hooks, so we need to register it
    jbpf_register_thread();

    // Register a callback to handle the buffers sent from the codelets
    jbpf_register_io_output_cb(io_channel_check_output);

    // Notify the LCM tool that we can now load the codeletset
    sem_post(lcm_ipc_sem);

    // Wait until the codeletset is loaded
    sem_wait(lcm_ipc_agent_sem);

    // Wait for all the output checks to finish
    sem_wait(&sem);

    // Test is done. Notify the LCM IPC tool that it can proceed with unloading the codelet
    sem_post(lcm_ipc_sem);

    // Wait for the LCM IPC tool to finish
    sem_wait(lcm_ipc_agent_sem);

    // Stop
    jbpf_stop();

    sem_destroy(&sem);

    return 0;
}

/**
 * Loader side of the test; main() runs it in the forked child process.
 *
 * Builds the LCM IPC socket address from the default run path, namespace and socket name, waits for the
 * agent to signal that it is ready, and sends a request to load the codeletset "simple_output_codeletset"
 * (one codelet, "simple_output", attached to the "report_stats" hook, with a single output channel linked
 * to stream_id_c1). After the agent signals that the test is done, it sends the matching unload request.
 *
 * The codelet object file is located through the JBPF_PATH environment variable, which must be set.
 *
 * @return 0 once both requests have been sent and the agent has been notified.
 */
int
run_lcm_ipc_loader(void)
{
    struct jbpf_codeletset_load_req codeletset_req_c1 = {0};
    struct jbpf_codeletset_unload_req codeletset_unload_req_c1 = {0};
    jbpf_lcm_ipc_address_t address = {0};
    bool load_ok;
    bool unload_ok;

    const char* jbpf_path = getenv("JBPF_PATH");

    snprintf(
        address.path,
        sizeof(address.path) - 1,
        "%s/%s/%s",
        JBPF_DEFAULT_RUN_PATH,
        JBPF_DEFAULT_NAMESPACE,
        JBPF_DEFAULT_LCM_SOCKET);

    // Wait until the agent is initialized and is expecting incoming connections
    sem_wait(lcm_ipc_sem);

    // Make a request to load codeletset C1 in hook "report_stats"

    // The name of the codeletset
    strcpy(codeletset_req_c1.codeletset_id.name, "simple_output_codeletset");

    // We have only one codelet in this codeletset
    codeletset_req_c1.num_codelet_descriptors = 1;

    // The codelet has just one output channel and no shared maps
    codeletset_req_c1.codelet_descriptor[0].num_in_io_channel = 0;
    codeletset_req_c1.codelet_descriptor[0].num_out_io_channel = 1;

    // The name of the output map that corresponds to the output channel of the codelet
    strcpy(codeletset_req_c1.codelet_descriptor[0].out_io_channel[0].name, "output_map");
    // Link the map to a stream id
    memcpy(&codeletset_req_c1.codelet_descriptor[0].out_io_channel[0].stream_id, &stream_id_c1, JBPF_STREAM_ID_LEN);
    // The output channel of the codelet does not have a serializer
    codeletset_req_c1.codelet_descriptor[0].out_io_channel[0].has_serde = false;
    codeletset_req_c1.codelet_descriptor[0].num_linked_maps = 0;

    assert(jbpf_path != NULL);
    snprintf(
        codeletset_req_c1.codelet_descriptor[0].codelet_path,
        JBPF_PATH_LEN,
        "%s/jbpf_tests/test_files/codelets/simple_output/simple_output.o",
        jbpf_path);
    strcpy(codeletset_req_c1.codelet_descriptor[0].codelet_name, "simple_output");
    strcpy(codeletset_req_c1.codelet_descriptor[0].hook_name, "report_stats");

    // Make a request to load the codeletset. The request is sent outside of assert() so that it is still
    // issued when NDEBUG removes the assertion.
    load_ok = (jbpf_lcm_ipc_send_codeletset_load_req(&address, &codeletset_req_c1) == JBPF_LCM_IPC_REQ_SUCCESS);
    (void)load_ok;
    assert(load_ok);

    // Loading was done, so the agent test can move on
    sem_post(lcm_ipc_agent_sem);

    // Wait for the test to finish
    sem_wait(lcm_ipc_sem);

    // Test is done. Let's unload the codeletset
    strcpy(codeletset_unload_req_c1.codeletset_id.name, "simple_output_codeletset");

    // Make a request to unload an existing codeletset (again kept outside of assert())
    unload_ok =
        (jbpf_lcm_ipc_send_codeletset_unload_req(&address, &codeletset_unload_req_c1) == JBPF_LCM_IPC_REQ_SUCCESS);
    (void)unload_ok;
    assert(unload_ok);

    // Tests are done. Let the agent know that it can terminate
    sem_post(lcm_ipc_agent_sem);
    return 0;
}

int
main(int argc, char** argv)
{

pid_t child_pid;
int secondary_status;
int res;

JBPF_UNUSED(res);

// Register some signals to kkill the test, if it fails
struct sigaction sa;
sa.sa_handler = &handle_sigchld;
sigemptyset(&sa.sa_mask);
sa.sa_flags = SA_RESTART | SA_NOCLDSTOP;
if (sigaction(SIGCHLD, &sa, 0) == -1) {
perror("sigaction");
exit(EXIT_FAILURE);
}

struct sigaction sa_child;
sa_child.sa_handler = &handle_sigterm;
sigemptyset(&sa_child.sa_mask);
sa_child.sa_flags = 0;
if (sigaction(SIGTERM, &sa_child, 0) == -1) {
perror("sigaction");
exit(EXIT_FAILURE);
}
if (sigaction(SIGINT, &sa_child, 0) == -1) {
perror("sigaction");
exit(EXIT_FAILURE);
}
if (sigaction(SIGABRT, &sa_child, 0) == -1) {
perror("sigaction");
exit(EXIT_FAILURE);
}

// Remove the semaphore if the test did not finish gracefully
sem_unlink(LCM_IPC_SEM_NAME);
sem_unlink(LCM_IPC_AGENT_SEM_NAME);

lcm_ipc_sem = sem_open(LCM_IPC_SEM_NAME, O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, 0);
if (lcm_ipc_sem == SEM_FAILED) {
exit(EXIT_FAILURE);
}

lcm_ipc_agent_sem = sem_open(LCM_IPC_AGENT_SEM_NAME, O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, 0);
if (lcm_ipc_agent_sem == SEM_FAILED) {
exit(EXIT_FAILURE);
}

child_pid = fork();
assert(child_pid >= 0);

if (child_pid == 0) {
assert(run_lcm_ipc_loader() == 0);
} else {
cpid = child_pid; // TODO
assert(run_jbpf_agent() == 0);
res = wait(&secondary_status);
assert(res != -1);
assert(secondary_status == 0);

printf("Test is now complete...\n");
}

sem_unlink(LCM_IPC_SEM_NAME);
sem_unlink(LCM_IPC_AGENT_SEM_NAME);
return 0;
}