Skip to content

[reconfigurator-execution] switch to async closures #7811

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 25 additions & 32 deletions nexus/reconfigurator/execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ fn register_zone_external_networking_step<'a>(
.new_step(
ExecutionStepId::Ensure,
"Ensure external networking resources",
move |_cx| async move {
async move |_cx| {
datastore
.blueprint_ensure_external_networking_resources(
opctx, blueprint,
Expand All @@ -327,7 +327,7 @@ fn register_support_bundle_failure_step<'a>(
ExecutionStepId::Cleanup,
"Mark support bundles as failed if they rely on \
an expunged disk or sled",
move |_cx| async move {
async move |_cx| {
let Some(nexus_id) = nexus_id else {
return StepSkipped::new((), "not running as Nexus").into();
};
Expand Down Expand Up @@ -355,26 +355,19 @@ fn register_sled_list_step<'a>(
datastore: &'a DataStore,
) -> StepHandle<Arc<BTreeMap<SledUuid, Sled>>> {
registrar
.new_step(
ExecutionStepId::Fetch,
"Fetch sled list",
move |_cx| async move {
let sleds_by_id: BTreeMap<SledUuid, _> = datastore
.sled_list_all_batched(opctx, SledFilter::InService)
.await
.context("listing all sleds")?
.into_iter()
.map(|db_sled| {
(
SledUuid::from_untyped_uuid(db_sled.id()),
db_sled.into(),
)
})
.collect();

StepSuccess::new(Arc::new(sleds_by_id)).into()
},
)
.new_step(ExecutionStepId::Fetch, "Fetch sled list", async move |_cx| {
let sleds_by_id: BTreeMap<SledUuid, _> = datastore
.sled_list_all_batched(opctx, SledFilter::InService)
.await
.context("listing all sleds")?
.into_iter()
.map(|db_sled| {
(SledUuid::from_untyped_uuid(db_sled.id()), db_sled.into())
})
.collect();

StepSuccess::new(Arc::new(sleds_by_id)).into()
})
.register()
}

Expand All @@ -388,7 +381,7 @@ fn register_deploy_sled_configs_step<'a>(
.new_step(
ExecutionStepId::Ensure,
"Deploy sled configs",
move |cx| async move {
async move |cx| {
let sleds_by_id = sleds.into_value(cx.token()).await;
let res = omicron_sled_config::deploy_sled_configs(
opctx,
Expand Down Expand Up @@ -419,7 +412,7 @@ fn register_plumb_firewall_rules_step<'a>(
.new_step(
ExecutionStepId::Ensure,
"Plumb service firewall rules",
move |_cx| async move {
async move |_cx| {
let res = nexus_networking::plumb_service_firewall_rules(
datastore,
opctx,
Expand Down Expand Up @@ -448,7 +441,7 @@ fn register_dns_records_step<'a>(
.new_step(
ExecutionStepId::Ensure,
"Deploy DNS records",
move |cx| async move {
async move |cx| {
let sleds_by_id = sleds.into_value(cx.token()).await;

let res = dns::deploy_dns(
Expand Down Expand Up @@ -478,7 +471,7 @@ fn register_cleanup_expunged_zones_step<'a>(
.new_step(
ExecutionStepId::Cleanup,
"Cleanup expunged zones",
move |_cx| async move {
async move |_cx| {
let res = omicron_zones::clean_up_expunged_zones(
opctx, datastore, resolver, blueprint,
)
Expand All @@ -500,7 +493,7 @@ fn register_decommission_sleds_step<'a>(
.new_step(
ExecutionStepId::Cleanup,
"Decommission sleds",
move |_cx| async move {
async move |_cx| {
let res =
sled_state::decommission_sleds(opctx, datastore, blueprint)
.await
Expand All @@ -521,7 +514,7 @@ fn register_decommission_disks_step<'a>(
.new_step(
ExecutionStepId::Cleanup,
"Decommission expunged disks",
move |_cx| async move {
async move |_cx| {
let res = omicron_physical_disks::decommission_expunged_disks(
opctx, datastore, blueprint,
)
Expand All @@ -542,7 +535,7 @@ fn register_deploy_clickhouse_cluster_nodes_step<'a>(
.new_step(
ExecutionStepId::Ensure,
"Deploy clickhouse cluster nodes",
move |_cx| async move {
async move |_cx| {
if let Some(clickhouse_cluster_config) =
&blueprint.clickhouse_cluster_config
{
Expand Down Expand Up @@ -571,7 +564,7 @@ fn register_deploy_clickhouse_single_node_step<'a>(
.new_step(
ExecutionStepId::Ensure,
"Deploy single-node clickhouse cluster",
move |_cx| async move {
async move |_cx| {
let res =
clickhouse::deploy_single_node(opctx, blueprint).await;
Ok(map_err_to_step_warning(res))
Expand All @@ -592,7 +585,7 @@ fn register_reassign_sagas_step<'a>(
.new_step(
ExecutionStepId::Cleanup,
"Reassign sagas",
move |_cx| async move {
async move |_cx| {
let Some(nexus_id) = nexus_id else {
return StepSkipped::new(false, "not running as Nexus")
.into();
Expand Down Expand Up @@ -630,7 +623,7 @@ fn register_cockroachdb_settings_step<'a>(
.new_step(
ExecutionStepId::Ensure,
"Ensure CockroachDB settings",
move |_cx| async move {
async move |_cx| {
let res =
cockroachdb::ensure_settings(opctx, datastore, blueprint)
.await;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async fn decommission_expunged_disks_impl(
expunged_disks: impl Iterator<Item = (SledUuid, PhysicalDiskUuid)>,
) -> Result<(), Vec<anyhow::Error>> {
let errors: Vec<anyhow::Error> = stream::iter(expunged_disks)
.filter_map(|(sled_id, disk_id)| async move {
.filter_map(async |(sled_id, disk_id)| {
let log = opctx.log.new(slog::o!(
"sled_id" => sled_id.to_string(),
"disk_id" => disk_id.to_string(),
Expand Down
2 changes: 1 addition & 1 deletion nexus/reconfigurator/execution/src/omicron_sled_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub(crate) async fn deploy_sled_configs(
sled_configs: &BTreeMap<SledUuid, BlueprintSledConfig>,
) -> Result<(), Vec<anyhow::Error>> {
let errors: Vec<_> = stream::iter(sled_configs)
.filter_map(|(sled_id, config)| async move {
.filter_map(async |(sled_id, config)| {
let log = opctx.log.new(slog::o!(
"sled_id" => sled_id.to_string(),
"generation" => i64::from(&config.sled_agent_generation),
Expand Down
2 changes: 1 addition & 1 deletion nexus/reconfigurator/execution/src/omicron_zones.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn clean_up_expunged_zones_impl<R: CleanupResolver>(
zones_to_clean_up: impl Iterator<Item = (SledUuid, &BlueprintZoneConfig)>,
) -> Result<(), Vec<anyhow::Error>> {
let errors: Vec<anyhow::Error> = stream::iter(zones_to_clean_up)
.filter_map(|(sled_id, config)| async move {
.filter_map(async |(sled_id, config)| {
let log = opctx.log.new(slog::o!(
"sled_id" => sled_id.to_string(),
"zone_id" => config.id.to_string(),
Expand Down
Loading