Skip to content

Commit

Permalink
AzureMLSystem: Unique input resources (#1237)
Browse files Browse the repository at this point in the history
## Describe your changes
- When the same AzureML model resource is provided as input to a command
multiple times, it leads to an invalid job (interestingly, this is not
the case for a local resource).
- AzureML system is updated so that a particular resource is provided as
input only once. The resources are now referenced using names of the form
`resource__{i}`. AML runners use `num_resources` and the resource maps
to reconstruct the configs.

## Checklist before requesting a review
- [ ] Add unit tests for this change.
- [ ] Make sure all tests can pass.
- [ ] Update documents if necessary.
- [ ] Lint and apply fixes to your code by running `lintrunner -a`
- [ ] Is this a user-facing change? If yes, give a description of this
change to be included in the release notes.
- [ ] Is this PR including examples changes? If yes, please remember to
update [example
documentation](https://github.com/microsoft/Olive/blob/main/docs/source/examples.md)
in a follow-up PR.

## (Optional) Issue link
  • Loading branch information
jambayk authored Jul 16, 2024
1 parent 0d580b0 commit 80e1fa9
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 115 deletions.
6 changes: 3 additions & 3 deletions olive/systems/azureml/aml_evaluation_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ def main(raw_args=None):
# login to hf if HF_LOGIN is set to True
aml_runner_hf_login()

model_config, pipeline_output, extra_args = get_common_args(raw_args)
metric_config, extra_args = parse_config(extra_args, "metric")
accelerator_config, extra_args = parse_config(extra_args, "accelerator")
pipeline_output, resources, model_config, extra_args = get_common_args(raw_args)
metric_config, extra_args = parse_config(extra_args, "metric", resources)
accelerator_config, extra_args = parse_config(extra_args, "accelerator", resources)

# load metric
metric = Metric.from_json(metric_config)
Expand Down
4 changes: 2 additions & 2 deletions olive/systems/azureml/aml_pass_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ def main(raw_args=None):
# login to hf if HF_LOGIN is set to True
aml_runner_hf_login()

input_model_config, pipeline_output, extra_args = get_common_args(raw_args)
pass_config, extra_args = parse_config(extra_args, "pass")
pipeline_output, resources, input_model_config, extra_args = get_common_args(raw_args)
pass_config, extra_args = parse_config(extra_args, "pass", resources)

# Import the pass package configuration from the package_config
package_config = OlivePackageConfig.load_default_config()
Expand Down
123 changes: 69 additions & 54 deletions olive/systems/azureml/aml_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,31 +187,61 @@ def run_pass(

return self._load_model(model_config.to_json(check_object=True), output_model_path, pipeline_output_path)

def create_inputs_and_args(self, name: str, config_json: dict, tmp_dir: Path, ignore_keys=None):
inputs = {f"{name}_config": Input(type=AssetTypes.URI_FILE)}
args = {}

resource_map = {}
# create inputs and args for each resource in the config
for resource_key, resource_path in find_all_resources(config_json, ignore_keys=None).items():
resource_name = f"{name}__{'__'.join(map(str, resource_key))}"
inputs[resource_name], args[resource_name] = self._create_arg_and_input_from_resource_path(resource_path)
resource_map[resource_name] = resource_key
set_nested_dict_value(config_json, resource_key, None)

if resource_map:
resource_map_path = tmp_dir / f"{name}_resource_map.json"
with resource_map_path.open("w") as f:
json.dump(resource_map, f, sort_keys=True, indent=4)
inputs[f"{name}_resource_map"] = Input(type=AssetTypes.URI_FILE)
args[f"{name}_resource_map"] = Input(type=AssetTypes.URI_FILE, path=resource_map_path)

config_path = tmp_dir / f"{name}_config.json"
with config_path.open("w") as f:
json.dump(config_json, f, sort_keys=True, indent=4)
args[f"{name}_config"] = Input(type=AssetTypes.URI_FILE, path=config_path)

return inputs, args
def create_inputs_and_args(
    self, all_configs: Dict[str, Dict], tmp_dir: Path, ignore_keys: Optional[List[str]] = None
):
    """Create job inputs and args for all configs, deduplicating resource inputs.

    Each unique resource path becomes a single input named ``resource__{i}``;
    providing the same AzureML model resource as multiple inputs makes the
    job invalid, hence the dedup. Config files and per-config resource maps
    are written under ``tmp_dir`` so runners can reconstruct the configs.

    :param all_configs: Dictionary of configs used in the job, keyed by config name.
    :param tmp_dir: The temporary directory to save the config jsons and resource maps.
    :param ignore_keys: The keys to ignore when searching configs for resources.
    :return: Tuple of (inputs, args) dicts for the job, including a
        ``num_resources`` input and one input per unique resource.
    """
    # maps resource path string -> shared input name (resource__{i})
    all_resources = {}
    inputs, args = {}, {}

    # only create input for a unique resource once
    # multiple inputs referring to the same aml model resource leads to invalid job error
    resource_inputs, resource_args = {}, {}
    for name, config in all_configs.items():
        # create a copy of the config to avoid modifying the original config
        config_copy = deepcopy(config)

        # using a list of tuples since json cannot have tuple keys
        resource_map = []
        # create inputs and args for each resource in the config
        for resource_key, resource_path in find_all_resources(config_copy, ignore_keys=ignore_keys).items():
            resource_str = resource_path.get_path()
            if resource_str not in all_resources:
                # first occurrence: register a new shared resource__{i} input
                resource_input_name = f"resource__{len(all_resources)}"
                resource_inputs[resource_input_name], resource_args[resource_input_name] = (
                    self._create_arg_and_input_from_resource_path(resource_path)
                )
                all_resources[resource_str] = resource_input_name
            # record which nested config key maps to which shared resource input
            resource_map.append((resource_key, all_resources[resource_str]))
            # blank out the path in the config; the runner restores it from the resource map
            set_nested_dict_value(config_copy, resource_key, None)

        # input for the config
        config_path = tmp_dir / f"{name}_config.json"
        with config_path.open("w") as f:
            json.dump(config_copy, f, sort_keys=True, indent=4)
        inputs[f"{name}_config"] = Input(type=AssetTypes.URI_FILE)
        args[f"{name}_config"] = Input(type=AssetTypes.URI_FILE, path=config_path)

        # input for the resource map
        if resource_map:
            resource_map_path = tmp_dir / f"{name}_resource_map.json"
            with resource_map_path.open("w") as f:
                json.dump(resource_map, f, sort_keys=True, indent=4)
            inputs[f"{name}_resource_map"] = Input(type=AssetTypes.URI_FILE)
            args[f"{name}_resource_map"] = Input(type=AssetTypes.URI_FILE, path=resource_map_path)

    # add resources to inputs and args at the end just for the sake of cleaner inputs and args order
    return {**inputs, "num_resources": Input(type="integer"), **resource_inputs}, {
        **args,
        "num_resources": len(all_resources),
        **resource_args,
    }

def _create_arg_and_input_from_resource_path(self, resource_path: ResourcePath):
asset_type = get_asset_type_from_resource_path(resource_path)
Expand Down Expand Up @@ -339,13 +369,11 @@ def _create_pipeline_for_pass(
self.copy_code(code_files, code_root)

# prepare inputs
inputs, args = {}, {}
for name, config_json in [("model", model_config), ("pass", pass_config)]:
name_inputs, name_args = self.create_inputs_and_args(
name, config_json, tmp_dir, ignore_keys=["model_attributes"] if name == "model" else None
)
inputs.update(name_inputs)
args.update(name_args)
# want to ignore model_attributes since hf config has _model_name_or_path
# that points to where the config was loaded from
inputs, args = self.create_inputs_and_args(
{"model": model_config, "pass": pass_config}, tmp_dir, ignore_keys=["model_attributes"]
)

# prepare outputs
outputs = {"pipeline_output": Output(type=AssetTypes.URI_FOLDER)}
Expand Down Expand Up @@ -497,26 +525,16 @@ def _create_pipeline_for_evaluation(
):
tmp_dir = Path(tmp_dir)

# model args
model_inputs, model_args = self.create_inputs_and_args(
"model", model_config, tmp_dir, ignore_keys=["model_attributes"]
)

accelerator_config_path: Path = tmp_dir / "accelerator_config.json"
with accelerator_config_path.open("w") as f:
json.dump(accelerator.to_json(), f, sort_keys=True)

@pipeline
def evaluate_pipeline():
outputs = {}
for metric in metrics:
metric_tmp_dir = tmp_dir / metric.name
metric_component = self._create_metric_component(
metric_tmp_dir,
model_config,
metric,
model_inputs,
model_args,
accelerator_config_path,
accelerator.to_json(),
)
outputs[metric.name] = metric_component.outputs.pipeline_output
return outputs
Expand All @@ -529,10 +547,9 @@ def evaluate_pipeline():
def _create_metric_component(
self,
tmp_dir: Path,
model_config: dict,
metric: "Metric",
model_inputs: Dict[str, Input],
model_args: Dict[str, Input],
accelerator_config_path: str,
accelerator_config: dict,
):
metric_json = metric.to_json(check_object=True)

Expand All @@ -551,13 +568,11 @@ def _create_metric_component(
self.copy_code(code_files, code_root)

# prepare inputs
metric_inputs, metric_args = self.create_inputs_and_args("metric", metric_json, tmp_dir)
inputs = {**model_inputs, **metric_inputs, "accelerator_config": Input(type=AssetTypes.URI_FILE)}
args = {
**model_args,
**metric_args,
"accelerator_config": Input(type=AssetTypes.URI_FILE, path=accelerator_config_path),
}
inputs, args = self.create_inputs_and_args(
{"model": model_config, "metric": metric_json, "accelerator": accelerator_config},
tmp_dir,
ignore_keys=["model_attributes"],
)

# prepare outputs
outputs = {"pipeline_output": Output(type=AssetTypes.URI_FOLDER)}
Expand Down
44 changes: 29 additions & 15 deletions olive/systems/utils/arg_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,36 @@


def parse_common_args(raw_args):
"""Parse common args."""
"""Parse common args.
Get the pipeline output path and resources.
Resources are expected to be provided as inputs of the form --resource__{i} where i is the index of the resource.
`num_resources` is the number of resources provided.
"""
parser = argparse.ArgumentParser("Olive common args")

# pipeline output arg
parser.add_argument("--pipeline_output", type=str, help="pipeline output path", required=True)

return parser.parse_known_args(raw_args)
# resources arg
parser.add_argument("--num_resources", type=int, help="number of resources", required=True)

args, extra_args = parser.parse_known_args(raw_args)

if args.num_resources == 0:
return args.pipeline_output, {}, extra_args

# parse resources
parser = argparse.ArgumentParser("Olive resources")
for i in range(args.num_resources):
parser.add_argument(f"--resource__{i}", type=str, help=f"resource {i} path", required=True)

def parse_config(raw_args, name: str) -> Tuple[dict, str]:
resource_args, extra_args = parser.parse_known_args(extra_args)

return args.pipeline_output, vars(resource_args), extra_args


def parse_config(raw_args, name: str, resources: dict) -> Tuple[dict, str]:
"""Parse config and related resource args."""
parser = argparse.ArgumentParser(f"{name} config")

Expand All @@ -43,15 +63,9 @@ def parse_config(raw_args, name: str) -> Tuple[dict, str]:
with open(args[resource_map_name]) as f:
resource_map = json.load(f)

# parse resources and replace in config
parser = argparse.ArgumentParser(f"{name} resources")
for resource_name in resource_map:
parser.add_argument(f"--{resource_name}", type=str, help=f"{resource_name} path", required=True)

args, extra_args = parser.parse_known_args(extra_args)
args = vars(args)
for resource_name, resource_key in resource_map.items():
set_nested_dict_value(config, resource_key, args[resource_name])
# replace resource paths in config
for resource_key, resource_name in resource_map:
set_nested_dict_value(config, resource_key, resources[resource_name])

return config, extra_args

Expand All @@ -62,8 +76,8 @@ def get_common_args(raw_args):
The return value includes json with the model resource paths filled in, the pipeline output path, and any
extra args that were not parsed.
"""
common_args, extra_args = parse_common_args(raw_args)
pipeline_output, resources, extra_args = parse_common_args(raw_args)

model_json, extra_args = parse_config(extra_args, "model")
model_json, extra_args = parse_config(extra_args, "model", resources)

return model_json, common_args.pipeline_output, extra_args
return pipeline_output, resources, model_json, extra_args
Loading

0 comments on commit 80e1fa9

Please sign in to comment.