Skip to content

Commit

Permalink
planner: add support for compact and spot policies (#85)
Browse files Browse the repository at this point in the history
* planner: add support for compaction policy

* planner: add support for spot policy
  • Loading branch information
csegarragonz authored May 6, 2024
1 parent 2f39e04 commit 269933e
Show file tree
Hide file tree
Showing 15 changed files with 237 additions and 46 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ running [Faasm](https://github.com/faasm/faasm) cluster.
To install `faasmctl` you need a working `pip` (virtual-)environment. Then:

```bash
pip install faasmctl==0.42.0
pip install faasmctl==0.43.0
```

## Usage
Expand Down
12 changes: 6 additions & 6 deletions faasmctl/tasks/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,16 @@ def dist_tests(ctx, mount_source=None, ini_file=None):
- mount_source (str): path to the Faasm's source code checkout
- ini_file (str): optional path to a running cluster
"""
if not mount_source:
raise RuntimeError(
"When deploying a dist-tests cluster, you need to"
" specify the --mount-source"
)

# If the user provided a path to the ini_file, it means that we are
# deploying the dist-tests on top of an existing cluster. Otherwise start
# a new compose cluster
if ini_file is None:
if not mount_source:
raise RuntimeError(
"When deploying a dist-tests cluster, you need to"
" specify the --mount-source"
)

ini_file = compose(ctx, workers=0, mount_source=mount_source, clean=False)

# Second, start the dist-test-server
Expand Down
2 changes: 0 additions & 2 deletions faasmctl/tasks/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ def get_k8s_logs(s, follow, ini_file, last_restart):

k8s_cmd = [
"logs",
# TODO: there seems to be a divergence between our time and AKSs
# "--since-time={}".format(last_restart),
"-f" if follow else "",
service_to_k8s_str[service],
"--tail=-1",
Expand Down
96 changes: 75 additions & 21 deletions faasmctl/tasks/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ def stop_container():
)


def get_apps_to_be_migrated(registered_workers, in_flight_apps, worker_occupation):
def get_apps_to_be_migrated(
planner_policy, registered_workers, in_flight_apps, worker_occupation
):
"""
Helper method that, given the current worker occupation, works out all the
apps that could be migrated if they checked for a migration opportunity
Expand All @@ -56,12 +58,17 @@ def get_apps_to_be_migrated(registered_workers, in_flight_apps, worker_occupatio
worker_occupation_file_path = "/tmp/worker_occupation{}.csv".format(file_suffix)
with open(worker_occupation_file_path, "w") as fh:
fh.write("WorkerIp,Slots\n")
for ip in worker_occupation:
total_slots = [w for w in registered_workers.hosts if w.ip == ip][0].slots
for i in range(len(worker_occupation[ip]), total_slots):
worker_occupation[ip].append("-1")

fh.write("{},{}\n".format(ip, ",".join(worker_occupation[ip])))
try:
for ip in worker_occupation:
total_slots = [w for w in registered_workers.hosts if w.ip == ip][
0
].slots
for i in range(len(worker_occupation[ip]), total_slots):
worker_occupation[ip].append("-1")

fh.write("{},{}\n".format(ip, ",".join(worker_occupation[ip])))
except IndexError:
return []

# Start container in the background
docker_cmd = [
Expand All @@ -81,8 +88,8 @@ def get_apps_to_be_migrated(registered_workers, in_flight_apps, worker_occupatio
docker_cmd = [
"docker exec",
get_ctr_name(),
"bash -c '/build/faasm/bin/is_app_migratable {} {}'".format(
app.appId, worker_occupation_file_path
"bash -c '/build/faasm/bin/is_app_migratable {} {} {}'".format(
planner_policy, app.appId, worker_occupation_file_path
),
]
docker_cmd = " ".join(docker_cmd)
Expand All @@ -91,7 +98,7 @@ def get_apps_to_be_migrated(registered_workers, in_flight_apps, worker_occupatio
# App can not be migrated
continue
elif out.returncode == 0:
to_be_migrated_apps.append(app.appId)
to_be_migrated_apps.append(app)
else:
# stop_container()
# Survive downstream binary errors, but report a warning
Expand All @@ -111,7 +118,7 @@ def get_apps_to_be_migrated(registered_workers, in_flight_apps, worker_occupatio
orig_num_migrations = -1


def print_planner_resources():
def print_planner_resources(policy):
"""
Helper method to visualise the state of the planner
"""
Expand All @@ -121,13 +128,19 @@ def color_text(color, text="X"):
num1 = str(color)
return f"\033[38;5;{num1}m{text}\033[0;0m"

def print_line(host_msg, worker_occupation):
def print_line(host_msg, worker_occupation, next_evicted_vm_ips=[]):
is_evicted = host_msg.ip in next_evicted_vm_ips

line = "{}\t".format(host_msg.ip)
if is_evicted:
line = "{}\t".format(color_text(1, text=host_msg.ip))

used_slots = host_msg.usedSlots
occupation = (
worker_occupation[host_msg.ip] if host_msg.ip in worker_occupation else []
)
if used_slots != len(occupation):
# TODO: FIXME: this is a symptom of a problem!!
print(
"Expected {} used slots for host {} but got {}!".format(
used_slots,
Expand All @@ -145,11 +158,25 @@ def print_line(host_msg, worker_occupation):
line += " [ ]"
print(line)

def get_app_color(app, policy):
    """
    Work out the color code used to print an app

    Parameters:
    - app: in-flight app message; its `appId` is read and, when present,
      its `subType` (treated here as the user id)
    - policy (str): planner policy in use

    Returns the value (modulo 256) that callers pass on to `color_text`
    """
    # Apps that do not expose a subType are treated as user 0
    tenant_id = getattr(app, "subType", 0)

    # User ids only drive the color under the COMPACT (multi-tenant) policy;
    # under any other policy apps are told apart by their app id
    color_seed = tenant_id * 10 if policy == "compact" else app.appId

    return color_seed % 256

def print_apps_legend(in_flight_apps):
num_apps_per_line = NUM_APPS_PER_LINE
line = ""
for i, app in enumerate(in_flight_apps.apps):
app_color = app.appId % 256
app_color = get_app_color(app, policy)
app_text = color_text(app_color, "App ID: {}".format(app.appId))
if i == 0:
line = app_text
Expand All @@ -163,9 +190,24 @@ def print_apps_legend(in_flight_apps):
def print_migration_opportunities(apps_to_be_migrated):
num_apps_per_line = NUM_APPS_PER_LINE
line = ""
for i, app_id in enumerate(apps_to_be_migrated):
app_color = app_id % 256
app_text = color_text(app_color, "App ID: {}".format(app_id))
for i, app in enumerate(apps_to_be_migrated):
app_color = get_app_color(app, policy)
app_text = color_text(app_color, "App ID: {}".format(app.appId))
if i == 0:
line = app_text
elif i % num_apps_per_line == 0:
print(line)
line = app_text
else:
line += "\t{}".format(app_text)
print(line)

def print_frozen_apps(frozen_apps):
num_apps_per_line = NUM_APPS_PER_LINE
line = ""
for i, app in enumerate(frozen_apps):
app_color = get_app_color(app, policy)
app_text = color_text(app_color, "App ID: {}".format(app.appId))
if i == 0:
line = app_text
elif i % num_apps_per_line == 0:
Expand All @@ -178,6 +220,7 @@ def print_migration_opportunities(apps_to_be_migrated):
header = "============== PLANNER RESOURCES ==============="
divide = "------------------------------------------------"
div_mg = "*********** MIGRATION OPPORTUNITIES ************"
div_fa = "***************** FROZEN APPS ******************"
div_al = "************* APP ID COLOR LEGEND **************"
footer = "================================================"

Expand All @@ -192,7 +235,7 @@ def print_migration_opportunities(apps_to_be_migrated):
worker_occupation = {}
worker_occupation_ids = {}
for app in in_flight_apps.apps:
app_color = app.appId % 256
app_color = get_app_color(app, policy)
for ip in app.hostIps:
if ip not in worker_occupation:
worker_occupation[ip] = []
Expand All @@ -204,10 +247,14 @@ def print_migration_opportunities(apps_to_be_migrated):
if orig_num_migrations < 0:
orig_num_migrations = in_flight_apps.numMigrations

# Work out the frozen apps
next_evicted_vm_ips = in_flight_apps.nextEvictedVmIps
frozen_apps = [app for app in in_flight_apps.frozenApps]

# Work out the existing migration opportunities
registered_workers = get_available_hosts()
apps_to_be_migrated = get_apps_to_be_migrated(
registered_workers, in_flight_apps, worker_occupation_ids
policy, registered_workers, in_flight_apps, worker_occupation_ids
)

# -------------
Expand All @@ -217,7 +264,7 @@ def print_migration_opportunities(apps_to_be_migrated):
print(header)
# Print registered worker occupation
for worker in registered_workers.hosts:
print_line(worker, worker_occupation)
print_line(worker, worker_occupation, next_evicted_vm_ips)

# Print migration opportunities (if any)
if len(apps_to_be_migrated) > 0:
Expand All @@ -226,6 +273,13 @@ def print_migration_opportunities(apps_to_be_migrated):
print(divide)
print_migration_opportunities(apps_to_be_migrated)

# Print frozen apps (if any)
if len(frozen_apps) > 0:
print(divide)
print(div_fa)
print(divide)
print_frozen_apps(frozen_apps)

# Print app-to-color legend (if any)
if len(in_flight_apps.apps) > 0:
print(divide)
Expand Down Expand Up @@ -253,11 +307,11 @@ def signal_handler(sig, frame):


@task
def planner(ctx, poll_period_sec=2):
def planner(ctx, policy="bin-pack", poll_period_sec=2):
"""
Monitor the in-flight apps and host occupation in the planner
"""
signal(SIGINT, signal_handler)
while True:
print_planner_resources()
print_planner_resources(policy)
sleep(poll_period_sec)
18 changes: 15 additions & 3 deletions faasmctl/tasks/restart.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,31 @@
BACKEND_INI_STRING,
get_faasm_ini_file,
get_faasm_ini_value,
update_faasm_ini_vaule,
update_faasm_ini_value,
)
from faasmctl.util.restart import replica as do_restart_replica
from faasmctl.util.time import get_time_rfc3339
from invoke import task


@task
def replica(ctx, name, ini_file=None):
    """
    Restart an individual replica by name

    The meaning of name here will depend on whether we are using a compose
    or a k8s backend.

    Parameters:
    - name (str): name of the replica to restart
    - ini_file (str): presumably the path to the cluster's INI file.
      NOTE(review): it is accepted but not passed on to the restart helper
      below - confirm whether that is intended
    """
    do_restart_replica(name)


@task(default=True, iterable=["s"])
def restart(ctx, s, ini_file=None):
"""
Restart a running service in the cluster
Parameters:
- s (str, repeateble): service to get the logs from
- s (str, repeatable): service to restart
- ini_file (str): path to the cluster's INI file
"""
if not ini_file:
Expand All @@ -30,5 +42,5 @@ def restart(ctx, s, ini_file=None):
)

# Update the last restart value
update_faasm_ini_vaule(ini_file, "Faasm", "last_restart", get_time_rfc3339())
update_faasm_ini_value(ini_file, "Faasm", "last_restart", get_time_rfc3339())
run_compose_cmd(ini_file, "restart {}".format(" ".join(s)))
6 changes: 3 additions & 3 deletions faasmctl/tasks/scale.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
BACKEND_INI_STRING,
get_faasm_ini_file,
get_faasm_ini_value,
update_faasm_ini_vaule,
update_faasm_ini_value,
)
from invoke import task

Expand Down Expand Up @@ -43,5 +43,5 @@ def scale(ctx, service, replicas, ini_file=None):
worker_ips = "{}".format(
",".join(get_container_ips_from_compose(faasm_checkout, cluster_name))
)
update_faasm_ini_vaule(ini_file, "Faasm", "worker_names", worker_names)
update_faasm_ini_vaule(ini_file, "Faasm", "worker_ips", worker_ips)
update_faasm_ini_value(ini_file, "Faasm", "worker_names", worker_names)
update_faasm_ini_value(ini_file, "Faasm", "worker_ips", worker_ips)
31 changes: 30 additions & 1 deletion faasmctl/util/compose.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from faasmctl.util.config import get_faasm_ini_value
from faasmctl.util.config import get_faasm_ini_value, update_faasm_ini_value
from faasmctl.util.deploy import generate_ini_file
from faasmctl.util.docker import get_docker_tag
from faasmctl.util.faasm import FAASM_DOCKER_REGISTRY
Expand Down Expand Up @@ -368,3 +368,32 @@ def get_container_ips_from_compose(faasm_checkout, cluster_name):
)
container_ips.append(c_ip)
return container_ips


def restart_ctr_by_name(ini_file, ctr_names):
    """
    Restart a set of worker containers with `docker restart`, and refresh the
    worker names and IPs recorded in the INI file afterwards

    Parameters:
    - ini_file (str): path to the cluster's INI file
    - ctr_names (list of str): names of the containers to restart; every name
      must appear in the `worker_names` entry of the INI file

    Raises:
    - RuntimeError: if any requested name is not in the worker list, or if
      the `docker restart` command exits with a non-zero return code
    """
    all_ctr_names = get_faasm_ini_value(ini_file, "Faasm", "worker_names").split(",")

    if not all(ctr_name in all_ctr_names for ctr_name in ctr_names):
        print(
            "Requested to restart a ctr list "
            "({}) not a subset of the worker list: {}".format(ctr_names, all_ctr_names)
        )
        raise RuntimeError("Unrecognised container name!")

    # Pass an argument list (no shell) so that container names are never
    # subject to shell interpretation
    out = run(["docker", "restart", *ctr_names], capture_output=True)

    # Raise explicitly rather than assert: asserts are stripped when running
    # with `python -O`, which would let a failed restart go unnoticed and
    # still rewrite the INI file below
    if out.returncode != 0:
        raise RuntimeError(
            "Error restarting docker container: {}".format(
                out.stderr.decode("utf-8", errors="replace")
            )
        )

    # Update the container names and ips
    faasm_checkout = get_faasm_ini_value(ini_file, "Faasm", "working_dir")
    cluster_name = get_faasm_ini_value(ini_file, "Faasm", "cluster_name")

    # Get the names and the ips directly from docker as the ones in the INI
    # file are now stale after the restart
    new_ctr_names = get_container_names_from_compose(faasm_checkout, cluster_name)
    new_ctr_ips = get_container_ips_from_compose(faasm_checkout, cluster_name)

    update_faasm_ini_value(ini_file, "Faasm", "worker_names", ",".join(new_ctr_names))
    update_faasm_ini_value(ini_file, "Faasm", "worker_ips", ",".join(new_ctr_ips))
2 changes: 1 addition & 1 deletion faasmctl/util/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def get_faasm_ini_value(ini_file, section, key):
return config[section].get(key, "")


def update_faasm_ini_vaule(ini_file, section, key, new_value):
def update_faasm_ini_value(ini_file, section, key, new_value):
if not exists(ini_file):
raise RuntimeError("Did not find faasm config at: {}".format(ini_file))

Expand Down
2 changes: 1 addition & 1 deletion faasmctl/util/faasm.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from os import environ

FAASM_VERSION = "0.17.0"
FAASM_VERSION = "0.26.0"


def get_version():
Expand Down
18 changes: 16 additions & 2 deletions faasmctl/util/invoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,22 @@ def invoke_and_await(url, json_msg, expected_num_messages):
"""
poll_period = 2

# The first invocation returns an appid to poll for the message
response = post(url, data=json_msg, timeout=None)
# The first invocation returns an appid to poll for the message. If there
# are not enough slots, this POST will fail. In general, we want to
# tolerate this a number of times (for example, to accommodate dynamic
# cluster sizes)

num_retries = 10
sleep_period_secs = 0.5

for i in range(num_retries):
response = post(url, data=json_msg, timeout=None)
if response.status_code == 500 and response.text == "No available hosts":
print("No available hosts, retrying... {}/{}".format(i + 1, num_retries))
sleep(sleep_period_secs)
continue
break

if response.status_code != 200:
print(
"POST request failed (code: {}): {}".format(
Expand Down
Loading

0 comments on commit 269933e

Please sign in to comment.