Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

propose dev reload (similar to Quarkus) #516

Merged
merged 22 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion ansible_rulebook/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def qsize(self):

# FIXME(cutwater): Replace parsed_args with clear interface
async def run(parsed_args: argparse.ArgumentParser) -> None:
file_monitor = None

if parsed_args.worker and parsed_args.websocket_address and parsed_args.id:
logger.info("Starting worker mode")
Expand All @@ -81,6 +82,14 @@ async def run(parsed_args: argparse.ArgumentParser) -> None:
startup_args.rulesets = load_rulebook(
parsed_args, startup_args.variables
)
if parsed_args.hot_reload is True and os.path.exists(
parsed_args.rulebook
):
logger.critical(
"HOT-RELOAD: Hot-reload was requested, "
+ "will monitor for rulebook file changes"
)
file_monitor = parsed_args.rulebook
if parsed_args.inventory:
startup_args.inventory = parsed_args.inventory
startup_args.project_data_file = parsed_args.project_tarball
Expand Down Expand Up @@ -119,13 +128,14 @@ async def run(parsed_args: argparse.ArgumentParser) -> None:
)
tasks.append(feedback_task)

await run_rulesets(
should_reload = await run_rulesets(
event_log,
ruleset_queues,
startup_args.variables,
startup_args.inventory,
parsed_args,
startup_args.project_data_file,
file_monitor,
)

await event_log.put(dict(type="Exit"))
Expand All @@ -151,6 +161,9 @@ async def run(parsed_args: argparse.ArgumentParser) -> None:
await job_template_runner.close_session()
if error_found:
raise Exception("One of the source plugins failed")
elif should_reload is True:
logger.critical("HOT-RELOAD! rules file changed, now restarting")
await run(parsed_args)


# TODO(cutwater): Maybe move to util.py
Expand Down
8 changes: 8 additions & 0 deletions ansible_rulebook/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,14 @@ def get_parser() -> argparse.ArgumentParser:
"Default is sequential, actions will be run only after the "
"previous one ends",
)
parser.add_argument(
"--hot-reload",
help="Will perform hot-reload on rulebook file changes "
"(when running in non-worker mode)."
"This option is ignored in worker mode.",
default="false",
action="store_true",
)
return parser


Expand Down
57 changes: 55 additions & 2 deletions ansible_rulebook/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

from drools.dispatch import establish_async_channel, handle_async_messages
from drools.ruleset import session_stats
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

import ansible_rulebook.rule_generator as rule_generator
from ansible_rulebook.collection import (
Expand All @@ -47,6 +49,7 @@
)

from .exception import (
HotReloadException,
SourceFilterNotFoundException,
SourcePluginMainMissingException,
SourcePluginNotAsyncioCompatibleException,
Expand Down Expand Up @@ -217,14 +220,50 @@ async def start_source(
)


class RulebookFileChangeHandler(FileSystemEventHandler):
modified = False

def on_modified(self, event):
logger.debug(f"Rulebook file {event.src_path} has been modified")
self.modified = True

def is_modified(self):
return self.modified


async def monitor_rulebook(rulebook_file):
event_handler = RulebookFileChangeHandler()
to_observe = os.path.abspath(rulebook_file)
observer = Observer()
observer.schedule(event_handler, to_observe, recursive=True)
observer.start()
try:
while not event_handler.is_modified():
await asyncio.sleep(1)
finally:
observer.stop()
observer.join()
# we need to check if the try-clause completed because
# while-loop terminated successfully, in such case we
# follow on the hot-reload use case, or if we got into
# this finally-clause because of other errors.
if event_handler.is_modified():
raise HotReloadException(
"Rulebook file changed, "
+ "raising exception so to asyncio.FIRST_EXCEPTION "
+ "in order to reload"
)


async def run_rulesets(
event_log: asyncio.Queue,
ruleset_queues: List[RuleSetQueue],
variables: Dict,
inventory: str = "",
parsed_args: argparse.ArgumentParser = None,
project_data_file: Optional[str] = None,
):
file_monitor: str = None,
) -> bool:
logger.info("run_ruleset")
rulesets_queue_plans = rule_generator.generate_rulesets(
ruleset_queues, variables, inventory
Expand Down Expand Up @@ -275,6 +314,11 @@ async def run_rulesets(
)
ruleset_tasks.append(ruleset_task)

monitor_task = None
if file_monitor:
monitor_task = asyncio.create_task(monitor_rulebook(file_monitor))
ruleset_tasks.append(monitor_task)

logger.info("Waiting for all ruleset tasks to end")
await asyncio.wait(ruleset_tasks, return_when=asyncio.FIRST_EXCEPTION)
async_task.cancel()
Expand All @@ -284,12 +328,21 @@ async def run_rulesets(
logger.info("Cancelling " + task.get_name())
task.cancel()

should_reload = False
if monitor_task and isinstance(
monitor_task.exception(), HotReloadException
):
logger.debug("Hot-reload, setting should_reload")
should_reload = True

logger.info("Waiting on gather")
asyncio.gather(*ruleset_tasks)
asyncio.gather(*ruleset_tasks, return_exceptions=True)
logger.info("Returning from run_rulesets")
if send_heartbeat_task:
send_heartbeat_task.cancel()

return should_reload


def meta_info_filter(source: EventSource) -> EventSourceFilter:
source_filter_name = "eda.builtin.insert_meta_info"
Expand Down
5 changes: 5 additions & 0 deletions ansible_rulebook/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ class UnsupportedActionException(Exception):
pass


class HotReloadException(Exception):

pass


class InventoryNotFound(Exception):

pass
Expand Down
18 changes: 18 additions & 0 deletions tests/e2e/files/rulebooks/test_hot_reload.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
- name: Ruleset 1
hosts: all
sources:
- generic:
payload:
- action: "value_a"
rules:
- name: Matching for value_a
condition: event.action == "value_a"
action:
debug:
msg: "Rule 1: I matched for value_a"
- name: Matching for value_b
condition: event.action == "value_b"
action:
debug:
msg: "Rule 2: I have now matched for value_b"
56 changes: 56 additions & 0 deletions tests/e2e/test_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,59 @@ def test_terminate_process_sigint():
process.kill()

assert process.returncode == 130


@pytest.mark.e2e
def test_hot_reload():
"""
Execute a rulebook with hot-reload option,
check for first action being triggered,
then modify the content of the rulebook,
check for the other action being triggered.
"""

rulebook = utils.BASE_DATA_PATH / "rulebooks/test_hot_reload.yml"
cmd = utils.Command(rulebook=rulebook, hot_reload=True)

LOGGER.info(f"Running command: {cmd}")

process = subprocess.Popen(
cmd,
cwd=utils.BASE_DATA_PATH,
text=True,
stdout=subprocess.PIPE,
)

with open(rulebook, "rt") as file:
original_data = file.read()
found_rule_1_in_out = False
found_rule_2_in_out = False

start = time.time()
while line := process.stdout.readline():
if "Rule 1: I matched for value_a" in line:
found_rule_1_in_out = True
break
time.sleep(0.1)
if time.time() - start > DEFAULT_CMD_TIMEOUT:
process.kill()

assert found_rule_1_in_out

data = original_data.replace('- action: "value_a"', '- action: "value_b"')
with open(rulebook, "wt") as file:
file.write(data)

start = time.time()
while line := process.stdout.readline():
if "Rule 2: I have now matched for value_b" in line:
found_rule_2_in_out = True
break
time.sleep(0.1)
if time.time() - start > DEFAULT_CMD_TIMEOUT:
process.kill()

with open(rulebook, "wt") as file:
file.write(original_data)

assert found_rule_2_in_out
3 changes: 3 additions & 0 deletions tests/e2e/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class Command:
verbosity: int = 0
heartbeat: int = 0
execution_strategy: Optional[str] = None
hot_reload: bool = False

def __post_init__(self):
# verbosity overrides verbose and debug
Expand Down Expand Up @@ -83,6 +84,8 @@ def to_list(self) -> List:
result.extend(["--heartbeat", str(self.heartbeat)])
if self.execution_strategy:
result.extend(["--execution-strategy", self.execution_strategy])
if self.hot_reload:
result.append("--hot-reload")

return result

Expand Down