propose dev reload (similar to Quarkus) #516

Merged 22 commits on Sep 7, 2023
Changes from 8 commits
15 changes: 14 additions & 1 deletion ansible_rulebook/app.py
@@ -60,6 +60,7 @@ def qsize(self):

# FIXME(cutwater): Replace parsed_args with clear interface
async def run(parsed_args: argparse.ArgumentParser) -> None:
file_monitor = None

if parsed_args.worker and parsed_args.websocket_address and parsed_args.id:
logger.info("Starting worker mode")
@@ -79,6 +80,14 @@ async def run(parsed_args: argparse.ArgumentParser) -> None:
startup_args.rulesets = load_rulebook(
parsed_args, startup_args.variables
)
if parsed_args.hot_reload is True and os.path.exists(
parsed_args.rulebook
):
logger.info(
"Hot-reload was requested, "
+ "will monitor for rulebook file changes"
)
file_monitor = parsed_args.rulebook
if parsed_args.inventory:
startup_args.inventory = load_inventory(parsed_args.inventory)
startup_args.project_data_file = parsed_args.project_tarball
@@ -115,13 +124,14 @@ async def run(parsed_args: argparse.ArgumentParser) -> None:
)
)

await run_rulesets(
should_reload = await run_rulesets(
event_log,
ruleset_queues,
startup_args.variables,
startup_args.inventory,
parsed_args,
startup_args.project_data_file,
file_monitor,
)

logger.info("Cancelling event source tasks")
@@ -141,6 +151,9 @@ async def run(parsed_args: argparse.ArgumentParser) -> None:
await event_log.put(dict(type="Exit"))
if error_found:
raise Exception("One of the source plugins failed")
elif should_reload is True:
logger.info("Hot-reload, now restarting")
await run(parsed_args)


# TODO(cutwater): Maybe move to util.py
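The net effect of the app.py change is that `run_rulesets` now reports whether the monitored rulebook changed, and `run()` simply calls itself again with the same parsed arguments so sources, rulesets and the file monitor are rebuilt from the modified file. A minimal sketch of that restart-on-flag pattern, where `run_once()` is a hypothetical stand-in for the engine pass, not a function from this PR:

```python
import asyncio


# Minimal sketch of the restart pattern used in app.run(); run_once() is a
# hypothetical stand-in for run_rulesets() reporting a rulebook change.
async def run_once(attempt: int) -> bool:
    await asyncio.sleep(0.1)  # pretend sources and rulesets ran here
    return attempt < 2        # "file changed" on the first two passes only


async def run(attempt: int = 0) -> None:
    should_reload = await run_once(attempt)
    if should_reload:
        # Same idea as app.run(): re-enter the entry point so everything is
        # rebuilt from the changed rulebook.
        print(f"hot-reload requested, restarting (pass {attempt + 1})")
        await run(attempt + 1)


if __name__ == "__main__":
    asyncio.run(run())
```

One consequence of the recursive call is that every reload adds a frame to the coroutine stack; a `while should_reload:` loop would be the flat alternative, but for occasional manual edits the depth stays negligible.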
6 changes: 6 additions & 0 deletions ansible_rulebook/cli.py
@@ -172,6 +172,12 @@ def get_parser() -> argparse.ArgumentParser:
"Default is sequential, actions will be run only after the "
"previous one ends",
)
parser.add_argument(
"--hot-reload",
help="Should perform hot-reload on rulebook file changes",
default="false",
action="store_true",
)
return parser


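`--hot-reload` is a plain boolean switch: with `action="store_true"`, argparse stores `True` when the flag is present and falls back to the default otherwise, so downstream code can branch on `parsed_args.hot_reload`. A standalone sketch of how the option parses; the prog name and the `--rulebook` argument here are illustrative, not copied from cli.py:

```python
import argparse

# Illustrative parser; only --hot-reload mirrors the option added in cli.py.
parser = argparse.ArgumentParser(prog="rulebook-demo")
parser.add_argument("--rulebook", help="path to the rulebook file")
parser.add_argument(
    "--hot-reload",
    help="reload the engine when the rulebook file changes",
    default=False,
    action="store_true",
)

args = parser.parse_args(["--rulebook", "rules.yml"])
print(args.hot_reload)  # False: flag absent, default used

args = parser.parse_args(["--rulebook", "rules.yml", "--hot-reload"])
print(args.hot_reload)  # True: store_true fired
```

Keeping the default a real `False` (rather than the string `"false"`) means the attribute is always a boolean, so the `is True` comparison in app.py could be reduced to a plain truthiness check.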
52 changes: 50 additions & 2 deletions ansible_rulebook/engine.py
@@ -22,6 +22,8 @@

from drools.dispatch import establish_async_channel, handle_async_messages
from drools.ruleset import session_stats
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

import ansible_rulebook.rule_generator as rule_generator
from ansible_rulebook.collection import (
@@ -47,6 +49,7 @@
)

from .exception import (
HotReloadException,
SourceFilterNotFoundException,
SourcePluginMainMissingException,
SourcePluginNotAsyncioCompatibleException,
@@ -217,14 +220,45 @@ async def start_source(
)


class RulebookFileChangeHandler(FileSystemEventHandler):
modified = False

def on_modified(self, event):
logger.debug(f"Rulebook file {event.src_path} has been modified")
self.modified = True

def is_modified(self):
return self.modified


async def monitor_rulebook(rulebook_file):
event_handler = RulebookFileChangeHandler()
to_observe = os.path.abspath(rulebook_file)
observer = Observer()
observer.schedule(event_handler, to_observe, recursive=True)
observer.start()
try:
while not event_handler.is_modified():
await asyncio.sleep(1)
finally:
observer.stop()
observer.join()
raise HotReloadException(
"Rulebook file changed, "
+ "raising exception so to asyncio.FIRST_EXCEPTION "
+ "in order to reload"
)


async def run_rulesets(
event_log: asyncio.Queue,
ruleset_queues: List[RuleSetQueue],
variables: Dict,
inventory: str = "",
parsed_args: argparse.ArgumentParser = None,
project_data_file: Optional[str] = None,
):
file_monitor: str = None,
) -> bool:
logger.info("run_ruleset")
rulesets_queue_plans = rule_generator.generate_rulesets(
ruleset_queues, variables, inventory
@@ -272,6 +306,11 @@ async def run_rulesets(
)
ruleset_tasks.append(ruleset_task)

monitor_task = None
if file_monitor:
monitor_task = asyncio.create_task(monitor_rulebook(file_monitor))
ruleset_tasks.append(monitor_task)

logger.info("Waiting for all ruleset tasks to end")
await asyncio.wait(ruleset_tasks, return_when=asyncio.FIRST_EXCEPTION)
async_task.cancel()
@@ -281,12 +320,21 @@ async def run_rulesets(
logger.info("Cancelling " + task.get_name())
task.cancel()

should_reload = False
if monitor_task and isinstance(
monitor_task.exception(), HotReloadException
):
logger.debug("Hot-reload, setting should_reload")
should_reload = True

logger.info("Waiting on gather")
asyncio.gather(*ruleset_tasks)
asyncio.gather(*ruleset_tasks, return_exceptions=True)
logger.info("Returning from run_rulesets")
if send_heartbeat_task:
send_heartbeat_task.cancel()

return should_reload


def meta_info_filter(source: EventSource) -> EventSourceFilter:
source_filter_name = "eda.builtin.insert_meta_info"
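The engine change has two cooperating parts: a watchdog handler that flips a flag when the rulebook file is modified, and a monitor coroutine that polls the flag and raises `HotReloadException`, which makes `asyncio.wait(..., return_when=asyncio.FIRST_EXCEPTION)` return so the remaining tasks can be cancelled and the reload decision taken. A self-contained sketch of that shape, under a few assumptions: the watched path and the idle `worker()` task are hypothetical, and the sketch watches the file's parent directory and filters on `src_path` rather than scheduling the file path directly as the PR does.

```python
import asyncio
import os

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer


class ReloadRequested(Exception):
    """Sketch-local stand-in for HotReloadException."""


class ChangeHandler(FileSystemEventHandler):
    def __init__(self, path: str) -> None:
        super().__init__()
        self.path = path
        self.modified = False

    def on_modified(self, event) -> None:
        # The observer watches the parent directory, so keep only events
        # for the file we actually care about.
        if os.path.abspath(event.src_path) == self.path:
            self.modified = True


async def monitor(path: str) -> None:
    path = os.path.abspath(path)
    handler = ChangeHandler(path)
    observer = Observer()
    observer.schedule(handler, os.path.dirname(path), recursive=False)
    observer.start()
    try:
        while not handler.modified:   # poll once a second, like the PR
            await asyncio.sleep(1)
    finally:
        observer.stop()
        observer.join()
    # Raising makes asyncio.wait(..., FIRST_EXCEPTION) below return early.
    raise ReloadRequested(path)


async def worker() -> None:
    while True:                       # stand-in for a ruleset task
        await asyncio.sleep(3600)


async def run_until_change(path: str) -> bool:
    tasks = [
        asyncio.create_task(worker(), name="ruleset"),
        asyncio.create_task(monitor(path), name="monitor"),
    ]
    await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    for task in tasks:
        if not task.done():
            task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    monitor_task = tasks[-1]
    # Checking cancelled() first avoids the CancelledError that
    # Task.exception() raises for cancelled tasks.
    return (
        not monitor_task.cancelled()
        and isinstance(monitor_task.exception(), ReloadRequested)
    )


if __name__ == "__main__":
    # Blocks until ./rules.yml (hypothetical) is modified, then prints True.
    print(asyncio.run(run_until_change("rules.yml")))
```

`Task.exception()` raises `CancelledError` for a cancelled task and `InvalidStateError` for one still running, which is why the sketch guards the call before inspecting the monitor task.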
5 changes: 5 additions & 0 deletions ansible_rulebook/exception.py
@@ -146,3 +146,8 @@ class WebSocketExchangeException(Exception):
class UnsupportedActionException(Exception):

pass


class HotReloadException(Exception):

pass