From 1d7678085aee87ff60b5d95a78191ce4e213c419 Mon Sep 17 00:00:00 2001 From: yu Date: Fri, 30 Aug 2024 00:04:50 +0200 Subject: [PATCH] fixed problems --- emulation-system/tests/test_create_topics.py | 499 ++++++++++++++++--- 1 file changed, 433 insertions(+), 66 deletions(-) diff --git a/emulation-system/tests/test_create_topics.py b/emulation-system/tests/test_create_topics.py index 2064f95e9..478a431de 100644 --- a/emulation-system/tests/test_create_topics.py +++ b/emulation-system/tests/test_create_topics.py @@ -2,17 +2,22 @@ import docker import logging import grpc -from unittest.mock import MagicMock from docker.types import IPAMConfig, IPAMPool import time from csle_common.dao.emulation_config.emulation_env_config import EmulationEnvConfig -from csle_common.controllers.kafka_controller import KafkaController import csle_common.constants.constants as constants +import csle_collector.constants.constants as collector_constants import csle_collector.kafka_manager.kafka_manager_pb2_grpc import csle_collector.kafka_manager.kafka_manager_pb2 import csle_collector.kafka_manager.query_kafka_server from csle_common.metastore.metastore_facade import MetastoreFacade from typing import Generator +from csle_common.dao.emulation_config.container_network import ContainerNetwork +from csle_common.dao.emulation_config.kafka_config import KafkaConfig +from csle_common.dao.emulation_config.node_container_config import NodeContainerConfig +from csle_common.dao.emulation_config.node_resources_config import NodeResourcesConfig +from csle_common.dao.emulation_config.node_firewall_config import NodeFirewallConfig +from csle_common.dao.emulation_config.kafka_topic import KafkaTopic @pytest.fixture(scope="module") @@ -76,32 +81,129 @@ def container_setup(docker_client, network) -> Generator: def test_start_kafka_manager(container_setup) -> None: """ Start kafka_manager in a container - + :param container_setup: container_setup - :return: None """ failed_containers = [] containers_info = [] container_setup.reload() assert container_setup.status == "running" - # Mock emulation_env_config - emulation_env_config = MagicMock(spec=EmulationEnvConfig) - emulation_env_config.get_connection.return_value = MagicMock() - emulation_env_config.kafka_config = MagicMock() - emulation_env_config.kafka_config.container.docker_gw_bridge_ip = container_setup.attrs[ - constants.DOCKER.NETWORK_SETTINGS - ][constants.DOCKER.IP_ADDRESS_INFO] - emulation_env_config.kafka_config.get_connection.return_value = MagicMock() - emulation_env_config.kafka_config.kafka_manager_port = 50051 - emulation_env_config.kafka_config.kafka_manager_log_dir = "/var/log/kafka" - emulation_env_config.kafka_config.kafka_manager_log_file = "kafka.log" - emulation_env_config.kafka_config.kafka_manager_max_workers = 4 + + container_network = ContainerNetwork( + name="test_network", + subnet_mask="255.255.255.0", + bitmask="24", + subnet_prefix="15.15.15.0/24", + interface="eth0" + ) + + # Initialize NodeContainerConfig with necessary arguments + ips_and_networks = [ + (container_setup.attrs['NetworkSettings']['IPAddress'], container_network) + ] + + container_config = NodeContainerConfig( + name="kafka_container", + ips_and_networks=ips_and_networks, + version="latest", + level="production", + restart_policy="always", + suffix="kafka", + os="linux", + docker_gw_bridge_ip=container_setup.attrs[constants.DOCKER.NETWORK_SETTINGS][constants.DOCKER.IP_ADDRESS_INFO], + physical_host_ip="192.168.1.1" + ) + + # Initialize NodeResourcesConfig + resources_config = NodeResourcesConfig( + container_name="kafka_container", + num_cpus=2, + available_memory_gb=4, + ips_and_network_configs=ips_and_networks, + docker_gw_bridge_ip=container_setup.attrs[constants.DOCKER.NETWORK_SETTINGS][constants.DOCKER.IP_ADDRESS_INFO], + physical_host_ip="192.168.1.1" + ) + + firewall_config = NodeFirewallConfig( + ips_gw_default_policy_networks=[], + hostname="kafka_container", + output_accept=None, + input_accept=None, + forward_accept=None, + output_drop=None, + input_drop=None, + forward_drop=None, + routes=None, + docker_gw_bridge_ip=None, + physical_host_ip=None + ) + + kafka_topic_1 = KafkaTopic( + name="test-topic-1", + num_partitions=3, + num_replicas=1, + attributes=["column1", "column2", "column3"], + retention_time_hours=72 + ) + + kafka_topic_2 = KafkaTopic( + name="test-topic-2", + num_partitions=2, + num_replicas=1, + attributes=["column1", "column2"], + retention_time_hours=48 + ) + + topics = [kafka_topic_1, kafka_topic_2] + + kafka_config = KafkaConfig( + container=container_config, + resources=resources_config, + firewall_config=firewall_config, + topics=topics, + kafka_manager_log_file='kafka_manager.log', + kafka_manager_log_dir='/var/log/kafka_manager', + kafka_manager_max_workers=4, + kafka_port=9092, + kafka_port_external=9292, + time_step_len_seconds=15, + kafka_manager_port=50051, + version="0.0.1" + ) + + emulation_env_config = EmulationEnvConfig( + name="test_env", + containers_config=None, + users_config=None, + flags_config=None, + vuln_config=None, + topology_config=None, + traffic_config=None, + resources_config=None, + kafka_config=kafka_config, + services_config=None, + descr="Test environment description", + static_attacker_sequences=None, + ovs_config=None, + sdn_controller_config=None, + host_manager_config=None, + snort_ids_manager_config=None, + ossec_ids_manager_config=None, + docker_stats_manager_config=None, + elk_config=None, + beats_config=None, + level=1, + version="1.0", + execution_id=12345, + csle_collector_version=collector_constants.LATEST_VERSION, + csle_ryu_version=collector_constants.LATEST_VERSION + ) ip = emulation_env_config.kafka_config.container.docker_gw_bridge_ip port = emulation_env_config.kafka_config.kafka_manager_port + try: - # Start kafka_manager command cmd = ( f"/root/miniconda3/bin/python3 /kafka_manager.py " f"--port {emulation_env_config.kafka_config.kafka_manager_port} " @@ -109,12 +211,12 @@ def test_start_kafka_manager(container_setup) -> None: f"--logfile {emulation_env_config.kafka_config.kafka_manager_log_file} " f"--maxworkers {emulation_env_config.kafka_config.kafka_manager_max_workers}" ) - # Run cmd in the container logging.info( - f"Starting kafka manager in container: {container_setup.id} " f"with image: {container_setup.image.tags}" + f"Starting kafka manager in container: {container_setup.id} " + f"with image: {container_setup.image.tags}" ) container_setup.exec_run(cmd, detach=True) - # Check if kafka_manager starts + cmd = ( f"sh -c '{constants.COMMANDS.PS_AUX} | {constants.COMMANDS.GREP} " f"{constants.COMMANDS.SPACE_DELIM}{constants.TRAFFIC_COMMANDS.KAFKA_MANAGER_FILE_NAME}'" @@ -127,7 +229,7 @@ def test_start_kafka_manager(container_setup) -> None: output = result.output.decode("utf-8") assert constants.COMMANDS.SEARCH_KAFKA_MANAGER in output, "Kafka manager is not running in the container" time.sleep(5) - # Call grpc + with grpc.insecure_channel(f"{ip}:{port}", options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel: stub = csle_collector.kafka_manager.kafka_manager_pb2_grpc.KafkaManagerStub(channel) kafka_dto = csle_collector.kafka_manager.query_kafka_server.get_kafka_status(stub) @@ -149,25 +251,155 @@ def test_start_kafka_manager(container_setup) -> None: assert not failed_containers, f"T{failed_containers} failed" -def test_start_server(container_setup) -> None: +def test_start_kafka_server(container_setup) -> None: """ Start kafka server in a container - + :param container_setup: container_setup :return: None """ - emulation_env_config = MagicMock(spec=EmulationEnvConfig) - emulation_env_config.get_connection.return_value = MagicMock() - emulation_env_config.kafka_config = MagicMock() - emulation_env_config.kafka_config.container.docker_gw_bridge_ip = container_setup.attrs[ - constants.DOCKER.NETWORK_SETTINGS - ][constants.DOCKER.IP_ADDRESS_INFO] - emulation_env_config.kafka_config.get_connection.return_value = MagicMock() - emulation_env_config.kafka_config.kafka_manager_port = 50051 - + container_setup.reload() + assert container_setup.status == "running" + + container_network = ContainerNetwork( + name="test_network", + subnet_mask="255.255.255.0", + bitmask="24", + subnet_prefix="15.15.15.0/24", + interface="eth0" + ) + + # Initialize NodeContainerConfig with necessary arguments + ips_and_networks = [ + (container_setup.attrs['NetworkSettings']['IPAddress'], container_network) + ] + + container_config = NodeContainerConfig( + name="kafka_container", + ips_and_networks=ips_and_networks, + version="latest", + level="production", + restart_policy="always", + suffix="kafka", + os="linux", + docker_gw_bridge_ip=container_setup.attrs[constants.DOCKER.NETWORK_SETTINGS][constants.DOCKER.IP_ADDRESS_INFO], + physical_host_ip="192.168.1.1" + ) + + # Initialize NodeResourcesConfig + resources_config = NodeResourcesConfig( + container_name="kafka_container", + num_cpus=2, + available_memory_gb=4, + ips_and_network_configs=ips_and_networks, + docker_gw_bridge_ip=container_setup.attrs[constants.DOCKER.NETWORK_SETTINGS][constants.DOCKER.IP_ADDRESS_INFO], + physical_host_ip="192.168.1.1" + ) + + firewall_config = NodeFirewallConfig( + ips_gw_default_policy_networks=[], + hostname="kafka_container", + output_accept=None, + input_accept=None, + forward_accept=None, + output_drop=None, + input_drop=None, + forward_drop=None, + routes=None, + docker_gw_bridge_ip=None, + physical_host_ip=None + ) + + kafka_topic_1 = KafkaTopic( + name="test-topic-1", + num_partitions=3, + num_replicas=1, + attributes=["column1", "column2", "column3"], + retention_time_hours=72 + ) + + kafka_topic_2 = KafkaTopic( + name="test-topic-2", + num_partitions=2, + num_replicas=1, + attributes=["column1", "column2"], + retention_time_hours=48 + ) + + topics = [kafka_topic_1, kafka_topic_2] + + kafka_config = KafkaConfig( + container=container_config, + resources=resources_config, + firewall_config=firewall_config, + topics=topics, + kafka_manager_log_file='kafka_manager.log', + kafka_manager_log_dir='/var/log/kafka_manager', + kafka_manager_max_workers=4, + kafka_port=9092, + kafka_port_external=9292, + time_step_len_seconds=15, + kafka_manager_port=50051, + version="0.0.1" + ) + + emulation_env_config = EmulationEnvConfig( + name="test_env", + containers_config=None, + users_config=None, + flags_config=None, + vuln_config=None, + topology_config=None, + traffic_config=None, + resources_config=None, + kafka_config=kafka_config, + services_config=None, + descr="Test environment description", + static_attacker_sequences=None, + ovs_config=None, + sdn_controller_config=None, + host_manager_config=None, + snort_ids_manager_config=None, + ossec_ids_manager_config=None, + docker_stats_manager_config=None, + elk_config=None, + beats_config=None, + level=1, + version="1.0", + execution_id=12345, + csle_collector_version=collector_constants.LATEST_VERSION, + csle_ryu_version=collector_constants.LATEST_VERSION + ) + ip = emulation_env_config.kafka_config.container.docker_gw_bridge_ip port = emulation_env_config.kafka_config.kafka_manager_port logger = logging.getLogger("test_logger") + try: + + internal_ip = emulation_env_config.kafka_config.container.get_ips()[0] + kafka_config_file = collector_constants.KAFKA.KAFKA_CONFIG_FILE + + internal_ip_cmd = ( + f"sed -i 's/{collector_constants.KAFKA.INTERNAL_IP_PLACEHOLDER}/" + f"{internal_ip}/g' {kafka_config_file}" + ) + result_internal_ip = container_setup.exec_run(internal_ip_cmd) + if result_internal_ip.exit_code != 0: + raise Exception(f"Failed to configure INTERNAL_IP: {result_internal_ip.output.decode('utf-8')}") + + # Replace EXTERNAL_IP + external_ip_cmd = ( + f"sed -i 's/{collector_constants.KAFKA.EXTERNAL_IP_PLACEHOLDER}/" + f"{ip}/g' {kafka_config_file}" + ) + result_external_ip = container_setup.exec_run(external_ip_cmd) + if result_external_ip.exit_code != 0: + raise Exception(f"Failed to configure EXTERNAL_IP: {result_external_ip.output.decode('utf-8')}") + + except Exception as e: + logger.error(f"Error configuring broker IPs: {e}") + assert False, f"Failed to configure broker IPs: {e}" + try: with grpc.insecure_channel(f'{ip}:{port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel: stub = csle_collector.kafka_manager.kafka_manager_pb2_grpc.KafkaManagerStub(channel) @@ -178,56 +410,191 @@ def test_start_server(container_setup) -> None: except grpc.RpcError as e: logger.error(f"gRPC Error: {e}") assert False, f"gRPC call failed with error: {e}" - + def test_create_topics(container_setup) -> None: """ - Create topics - + Create topics in a container + :param container_setup: container_setup :return: None """ - emulation_env_config = MagicMock(spec=EmulationEnvConfig) - emulation_env_config.get_connection.return_value = MagicMock() - emulation_env_config.kafka_config = MagicMock() - emulation_env_config.kafka_config.container.docker_gw_bridge_ip = container_setup.attrs[ - constants.DOCKER.NETWORK_SETTINGS - ][constants.DOCKER.IP_ADDRESS_INFO] - emulation_env_config.kafka_config.get_connection.return_value = MagicMock() - emulation_env_config.kafka_config.kafka_manager_port = 50051 - emulation_env_config.kafka_config.topics = MagicMock() - emulation_env_config.kafka_config.topics.name = "topic1" - emulation_env_config.kafka_config.topics.num_partitions = 2 - emulation_env_config.kafka_config.topics.num_replicas = 2 - emulation_env_config.kafka_config.topics.retention_time_hours = 1 + container_setup.reload() + assert container_setup.status == "running" + + container_network = ContainerNetwork( + name="test_network", + subnet_mask="255.255.255.0", + bitmask="24", + subnet_prefix="15.15.15.0/24", + interface="eth0" + ) + + # Initialize NodeContainerConfig with necessary arguments + ips_and_networks = [ + (container_setup.attrs['NetworkSettings']['IPAddress'], container_network) + ] + + container_config = NodeContainerConfig( + name="kafka_container", + ips_and_networks=ips_and_networks, + version="latest", + level="production", + restart_policy="always", + suffix="kafka", + os="linux", + docker_gw_bridge_ip=container_setup.attrs[constants.DOCKER.NETWORK_SETTINGS][constants.DOCKER.IP_ADDRESS_INFO], + physical_host_ip="192.168.1.1" + ) + + # Initialize NodeResourcesConfig + resources_config = NodeResourcesConfig( + container_name="kafka_container", + num_cpus=2, + available_memory_gb=4, + ips_and_network_configs=ips_and_networks, + docker_gw_bridge_ip=container_setup.attrs[constants.DOCKER.NETWORK_SETTINGS][constants.DOCKER.IP_ADDRESS_INFO], + physical_host_ip="192.168.1.1" + ) + + firewall_config = NodeFirewallConfig( + ips_gw_default_policy_networks=[], + hostname="kafka_container", + output_accept=None, + input_accept=None, + forward_accept=None, + output_drop=None, + input_drop=None, + forward_drop=None, + routes=None, + docker_gw_bridge_ip=None, + physical_host_ip=None + ) + + kafka_topic_1 = KafkaTopic( + name="test-topic-1", + num_partitions=3, + num_replicas=1, + attributes=["column1", "column2", "column3"], + retention_time_hours=72 + ) + + kafka_topic_2 = KafkaTopic( + name="test-topic-2", + num_partitions=2, + num_replicas=1, + attributes=["column1", "column2"], + retention_time_hours=48 + ) + + topics = [kafka_topic_1, kafka_topic_2] + + kafka_config = KafkaConfig( + container=container_config, + resources=resources_config, + firewall_config=firewall_config, + topics=topics, + kafka_manager_log_file='kafka_manager.log', + kafka_manager_log_dir='/var/log/kafka_manager', + kafka_manager_max_workers=4, + kafka_port=9092, + kafka_port_external=9292, + time_step_len_seconds=15, + kafka_manager_port=50051, + version="0.0.1" + ) + + emulation_env_config = EmulationEnvConfig( + name="test_env", + containers_config=None, + users_config=None, + flags_config=None, + vuln_config=None, + topology_config=None, + traffic_config=None, + resources_config=None, + kafka_config=kafka_config, + services_config=None, + descr="Test environment description", + static_attacker_sequences=None, + ovs_config=None, + sdn_controller_config=None, + host_manager_config=None, + snort_ids_manager_config=None, + ossec_ids_manager_config=None, + docker_stats_manager_config=None, + elk_config=None, + beats_config=None, + level=1, + version="1.0", + execution_id=12345, + csle_collector_version=collector_constants.LATEST_VERSION, + csle_ryu_version=collector_constants.LATEST_VERSION + ) + ip = emulation_env_config.kafka_config.container.docker_gw_bridge_ip port = emulation_env_config.kafka_config.kafka_manager_port logger = logging.getLogger("test_logger") + try: + # Run each sed command + internal_ip = emulation_env_config.kafka_config.container.get_ips()[0] + kafka_config_file = collector_constants.KAFKA.KAFKA_CONFIG_FILE + + # Replace INTERNAL_IP + internal_ip_cmd = ( + f"sed -i 's/{collector_constants.KAFKA.INTERNAL_IP_PLACEHOLDER}/" + f"{internal_ip}/g' {kafka_config_file}" + ) + result_internal_ip = container_setup.exec_run(internal_ip_cmd) + if result_internal_ip.exit_code != 0: + raise Exception(f"Failed to configure INTERNAL_IP: {result_internal_ip.output.decode('utf-8')}") + + # Replace EXTERNAL_IP + external_ip_cmd = ( + f"sed -i 's/{collector_constants.KAFKA.EXTERNAL_IP_PLACEHOLDER}/" + f"{ip}/g' {kafka_config_file}" + ) + result_external_ip = container_setup.exec_run(external_ip_cmd) + if result_external_ip.exit_code != 0: + raise Exception(f"Failed to configure EXTERNAL_IP: {result_external_ip.output.decode('utf-8')}") + + except Exception as e: + logger.error(f"Error configuring broker IPs: {e}") + assert False, f"Failed to configure broker IPs: {e}" + try: with grpc.insecure_channel(f'{ip}:{port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel: stub = csle_collector.kafka_manager.kafka_manager_pb2_grpc.KafkaManagerStub(channel) - kafka_dto = KafkaController.get_kafka_status_by_port_and_ip( - ip=emulation_env_config.kafka_config.container.docker_gw_bridge_ip, - port=emulation_env_config.kafka_config.kafka_manager_port) - logging.info(f"kafka_dto:{kafka_dto.topics}") + kafka_dto = csle_collector.kafka_manager.query_kafka_server.start_kafka(stub) + logging.info(f"kafka_dto:{kafka_dto}") + assert kafka_dto.running, f"Failed to start kafka on {ip}." + logger.info(f"kafka has been successfully started on {ip}.") + except grpc.RpcError as e: + logger.error(f"gRPC Error: {e}") + assert False, f"gRPC call failed with error: {e}" + + try: + with grpc.insecure_channel(f'{ip}:{port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel: + stub = csle_collector.kafka_manager.kafka_manager_pb2_grpc.KafkaManagerStub(channel) + kafka_dto = csle_collector.kafka_manager.query_kafka_server.start_kafka(stub) + logging.info(f"kafka_dto:{kafka_dto}") assert kafka_dto.running, f"Failed to start kafka on {ip}." logger.info(f"kafka has been successfully started on {ip}.") time.sleep(10) # create topics - topic = emulation_env_config.kafka_config.topics - logger.info(f"Creating topic name: {topic.name}") - response = csle_collector.kafka_manager.query_kafka_server.create_topic( - stub, name=topic.name, partitions=topic.num_partitions, replicas=topic.num_replicas, - retention_time_hours=topic.retention_time_hours - ) - logger.info(f"Create topic response: {response}") - time.sleep(5) - # get kafka_dto again and verify - kafka_dto = KafkaController.get_kafka_status_by_port_and_ip( - ip=emulation_env_config.kafka_config.container.docker_gw_bridge_ip, - port=emulation_env_config.kafka_config.kafka_manager_port) - logger.info(f"kafka_dto.topics: {kafka_dto.topics}") - assert kafka_dto.topics + for topic in emulation_env_config.kafka_config.topics: + logger.info(f"Creating topic: {topic.name}") + create_response = csle_collector.kafka_manager.query_kafka_server.create_topic( + stub, + name=topic.name, + partitions=topic.num_partitions, + replicas=topic.num_replicas, + retention_time_hours=topic.retention_time_hours + ) + assert topic.name in create_response.topics, ( + f"Topic creation failed or topic name mismatch: {topic.name}" + ) + logger.info(f"Successfully created topic: {topic.name}") except grpc.RpcError as e: logger.error(f"gRPC Error: {e}") assert False, f"gRPC call failed with error: {e}"