Skip to content

Commit

Permalink
Nested scatters [BA-5803] (broadinstitute#5061)
Browse files Browse the repository at this point in the history
* Adding a flag to the configuration file; do we want to distinguish between inner and outer scatters?

* Using the innerOuterScatter flag in the WOM transforms code. Create a sub-workflow from the inner scatter, only if the flag is turned on.

* Fixing code review comments

* Fixing code review comments

* Changing womParse to wom-parse

* Adding test for the WomParseConfig convert-nested-scatter-to-subworkflow flag

* Passing the flag more directly, without using the configuration.
  • Loading branch information
orodeh authored and aednichols committed Jul 16, 2019
1 parent b87bea2 commit 80559d8
Show file tree
Hide file tree
Showing 15 changed files with 187 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ class CwlV1_0LanguageFactory(override val config: Config) extends LanguageFactor
} yield validatedWomNamespace
}

override def getWomBundle(workflowSource: WorkflowSource, workflowOptionsJson: WorkflowOptionsJson, importResolvers: List[ImportResolver], languageFactories: List[LanguageFactory]): Checked[WomBundle] =
/**
  * WOM-bundle creation is not implemented for CWL v1.
  *
  * The method runs `enabledCheck` and flat-maps its result onto an invalid result carrying the message
  * "No getWomBundle method implemented in CWL v1". None of the parameters, including
  * `convertNestedScatterToSubworkflow`, are read by this implementation.
  */
override def getWomBundle(
  workflowSource: WorkflowSource,
  workflowOptionsJson: WorkflowOptionsJson,
  importResolvers: List[ImportResolver],
  languageFactories: List[LanguageFactory],
  convertNestedScatterToSubworkflow: Boolean = true
): Checked[WomBundle] =
  enabledCheck.flatMap(_ => "No getWomBundle method implemented in CWL v1".invalidNelCheck)

override def createExecutable(womBundle: WomBundle, inputs: WorkflowJson, ioFunctions: IoFunctionSet): Checked[ValidatedWomNamespace] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ trait LanguageFactory {
/**
  * Builds a [[WomBundle]] from workflow source text.
  *
  * @param workflowSource the workflow source to convert
  * @param workflowOptionsJson the workflow options (JSON) supplied alongside the source
  * @param importResolvers resolvers handed to the conversion for imports
  * @param languageFactories language factories handed to the conversion
  * @param convertNestedScatterToSubworkflow when true (the default), implementations that honour the flag
  *                                          turn a scatter nested inside another scatter into a generated
  *                                          sub-workflow; when false the nested scatter is left in place.
  *                                          NOTE(review): not every implementation reads this flag — confirm
  *                                          per language factory.
  * @return the bundle, or the accumulated errors if the conversion failed
  */
def getWomBundle(workflowSource: WorkflowSource,
workflowOptionsJson: WorkflowOptionsJson,
importResolvers: List[ImportResolver],
languageFactories: List[LanguageFactory]): Checked[WomBundle]
languageFactories: List[LanguageFactory],
convertNestedScatterToSubworkflow : Boolean = true): Checked[WomBundle]

def createExecutable(womBundle: WomBundle,
inputs: WorkflowJson,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,13 @@ class WdlBiscayneLanguageFactory(override val config: Config) extends LanguageFa

}

override def getWomBundle(workflowSource: WorkflowSource, workflowOptionsJson: WorkflowOptionsJson, importResolvers: List[ImportResolver], languageFactories: List[LanguageFactory]): Checked[WomBundle] = {
/**
  * Converts WDL source text into a [[WomBundle]].
  *
  * The conversion is a chain of `CheckedAtoB` steps: the enabled check, `stringToAst`, `wrapAst`,
  * `astToFileElement` (whose output is packaged into `FileElementToWomBundleInputs`) and finally
  * `fileElementToWomBundle`. The source is fed to the chain under the fixed file name "input.wdl".
  *
  * @param convertNestedScatterToSubworkflow forwarded unchanged into `FileElementToWomBundleInputs`
  */
override def getWomBundle(workflowSource: WorkflowSource,
workflowOptionsJson: WorkflowOptionsJson,
importResolvers: List[ImportResolver],
languageFactories: List[LanguageFactory],
convertNestedScatterToSubworkflow : Boolean = true): Checked[WomBundle] = {
// Lift `enabledCheck` into a step that passes its input through untouched when the check succeeds.
val checkEnabled: CheckedAtoB[FileStringParserInput, FileStringParserInput] = CheckedAtoB.fromCheck(x => enabledCheck map(_ => x))
// NOTE(review): two consecutive definitions of `converter` follow; only the second passes
// convertNestedScatterToSubworkflow — presumably the first is the superseded (pre-change) form.
val converter: CheckedAtoB[FileStringParserInput, WomBundle] = checkEnabled andThen stringToAst andThen wrapAst andThen astToFileElement.map(FileElementToWomBundleInputs(_, workflowOptionsJson, importResolvers, languageFactories, workflowDefinitionElementToWomWorkflowDefinition, taskDefinitionElementToWomTaskDefinition)) andThen fileElementToWomBundle
val converter: CheckedAtoB[FileStringParserInput, WomBundle] = checkEnabled andThen stringToAst andThen wrapAst andThen astToFileElement.map(FileElementToWomBundleInputs(_, workflowOptionsJson, convertNestedScatterToSubworkflow, importResolvers, languageFactories, workflowDefinitionElementToWomWorkflowDefinition, taskDefinitionElementToWomTaskDefinition)) andThen fileElementToWomBundle
converter.run(FileStringParserInput(workflowSource, "input.wdl"))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,11 @@ class WdlDraft2LanguageFactory(override val config: Config) extends LanguageFact
}
}

override def getWomBundle(workflowSource: WorkflowSource, workflowOptionsJson: WorkflowOptionsJson, importResolvers: List[ImportResolver], languageFactories: List[LanguageFactory]): Checked[WomBundle] = {
override def getWomBundle(workflowSource: WorkflowSource,
workflowOptionsJson: WorkflowOptionsJson,
importResolvers: List[ImportResolver],
languageFactories: List[LanguageFactory],
convertNestedScatterToSubworkflow : Boolean = true): Checked[WomBundle] = {
for {
_ <- enabledCheck
namespace <- WdlNamespace.loadUsingSource(workflowSource, None, Some(importResolvers map resolverConverter)).toChecked
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,13 @@ class WdlDraft3LanguageFactory(override val config: Config) extends LanguageFact

}

override def getWomBundle(workflowSource: WorkflowSource, workflowOptionsJson: WorkflowOptionsJson, importResolvers: List[ImportResolver], languageFactories: List[LanguageFactory]): Checked[WomBundle] = {
/**
  * Converts WDL source text into a [[WomBundle]].
  *
  * The conversion is a chain of `CheckedAtoB` steps: the enabled check, `stringToAst`, `wrapAst`,
  * `astToFileElement` (whose output is packaged into `FileElementToWomBundleInputs`) and finally
  * `fileElementToWomBundle`. The source is fed to the chain under the fixed file name "input.wdl".
  *
  * @param convertNestedScatterToSubworkflow forwarded unchanged into `FileElementToWomBundleInputs`
  */
override def getWomBundle(workflowSource: WorkflowSource,
workflowOptionsJson: WorkflowOptionsJson,
importResolvers: List[ImportResolver],
languageFactories: List[LanguageFactory],
convertNestedScatterToSubworkflow : Boolean = true): Checked[WomBundle] = {
// Lift `enabledCheck` into a step that passes its input through untouched when the check succeeds.
val checkEnabled: CheckedAtoB[FileStringParserInput, FileStringParserInput] = CheckedAtoB.fromCheck(x => enabledCheck map(_ => x))
// NOTE(review): two consecutive definitions of `converter` follow; only the second passes
// convertNestedScatterToSubworkflow — presumably the first is the superseded (pre-change) form.
val converter: CheckedAtoB[FileStringParserInput, WomBundle] = checkEnabled andThen stringToAst andThen wrapAst andThen astToFileElement.map(FileElementToWomBundleInputs(_, workflowOptionsJson, importResolvers, languageFactories, workflowDefinitionElementToWomWorkflowDefinition, taskDefinitionElementToWomTaskDefinition)) andThen fileElementToWomBundle
val converter: CheckedAtoB[FileStringParserInput, WomBundle] = checkEnabled andThen stringToAst andThen wrapAst andThen astToFileElement.map(FileElementToWomBundleInputs(_, workflowOptionsJson, convertNestedScatterToSubworkflow, importResolvers, languageFactories, workflowDefinitionElementToWomWorkflowDefinition, taskDefinitionElementToWomTaskDefinition)) andThen fileElementToWomBundle
converter.run(FileStringParserInput(workflowSource, "input.wdl"))
}

Expand Down
12 changes: 12 additions & 0 deletions wdl/transforms/draft3/src/test/cases/two_level_scatter.wdl
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
version 1.0

workflow two_level_scatter {  # Test fixture: a scatter nested directly inside another scatter. NOTE(review): expected values in the specs appear to record source positions (e.g. SourceFileLocation(3)) - add comments at end of line only, never as new lines.

Array[Int] indices = [1,2,3]  # collection iterated by both scatters

scatter(a in indices) {  # outer scatter
scatter(b in indices) {  # inner scatter, nested inside the outer one
Int x = a + b  # reads both scatter variables
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,41 @@ object WdlFileToWdlomSpec {
),
Vector()
),
"two_level_scatter" -> FileElement(
Vector(),
Vector(),
Vector(WorkflowDefinitionElement(
"two_level_scatter",
None,
Set(
IntermediateValueDeclarationElement(
ArrayTypeElement(PrimitiveTypeElement(WomIntegerType)),
"indices",
ArrayLiteral(Vector(PrimitiveLiteralExpressionElement(WomInteger(1)), PrimitiveLiteralExpressionElement(WomInteger(2)), PrimitiveLiteralExpressionElement(WomInteger(3))))
),
ScatterElement(
scatterName = "ScatterAt8_11",
IdentifierLookup("indices"),
"a",
Vector(
ScatterElement(
scatterName = "ScatterAt9_13",
IdentifierLookup("indices"), "b",
Vector(
IntermediateValueDeclarationElement(PrimitiveTypeElement(WomIntegerType), "x", Add(IdentifierLookup("a"), IdentifierLookup("b")))),
sourceLocation = None
)
),
None
)
),
None,
None,
None,
Some(SourceFileLocation(3)))
),
Vector()
),
"simple_conditional" -> FileElement(
imports = Vector.empty,
structs = Vector.empty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import wom.callable.MetaValueElement._
import wom.callable.{CallableTaskDefinition, WorkflowDefinition}
import wom.executable.WomBundle
import wom.graph.expression.{ExposedExpressionNode, TaskCallInputExpressionNode}
import wom.graph.{ScatterNode, WorkflowCallNode}
import wom.types._

class WdlFileToWomSpec extends FlatSpec with Matchers {
Expand All @@ -41,7 +42,7 @@ class WdlFileToWomSpec extends FlatSpec with Matchers {
}

testOrIgnore {
val converter: CheckedAtoB[File, WomBundle] = fileToAst andThen wrapAst andThen astToFileElement.map(fe => FileElementToWomBundleInputs(fe, "{}", List.empty, List.empty, workflowDefinitionElementToWomWorkflowDefinition, taskDefinitionElementToWomTaskDefinition)) andThen fileElementToWomBundle
val converter: CheckedAtoB[File, WomBundle] = fileToAst andThen wrapAst andThen astToFileElement.map(fe => FileElementToWomBundleInputs(fe, "{}", convertNestedScatterToSubworkflow = true, List.empty, List.empty, workflowDefinitionElementToWomWorkflowDefinition, taskDefinitionElementToWomTaskDefinition)) andThen fileElementToWomBundle

converter.run(testCase) match {
case Right(bundle) => validators(testName).apply(bundle)
Expand All @@ -52,6 +53,68 @@ class WdlFileToWomSpec extends FlatSpec with Matchers {
}
}

// The two tests below share one fixture, two_level_scatter.wdl, which holds a scatter within a scatter:
//
// scatter(a in indices) {
//   scatter(b in indices) {
//     Int x = a + b
//   }
// }
//
// Converts that fixture to WOM with the given value of convertNestedScatterToSubworkflow and returns the
// primary workflow. Any problem (conversion errors, no primary callable, a primary callable that is not a
// WorkflowDefinition) fails the calling test with a descriptive message instead of surfacing as a bare
// NoSuchElementException / ClassCastException.
private def twoLevelScatterWorkflow(convertNested: Boolean): WorkflowDefinition = {
  val converter: CheckedAtoB[File, WomBundle] = fileToAst andThen wrapAst andThen astToFileElement.map(fe => FileElementToWomBundleInputs(fe, "{}", convertNestedScatterToSubworkflow = convertNested, List.empty, List.empty, workflowDefinitionElementToWomWorkflowDefinition, taskDefinitionElementToWomTaskDefinition)) andThen fileElementToWomBundle

  val twoLevelScatterFile = File("wdl/transforms/draft3/src/test/cases/two_level_scatter.wdl")

  converter.run(twoLevelScatterFile) match {
    case Right(bundle) =>
      bundle.primaryCallable match {
        case Some(wf: WorkflowDefinition) => wf
        case other => fail(s"Expected the primary callable to be a WorkflowDefinition but found: $other")
      }
    case Left(errors) =>
      val formattedErrors = errors.toList.mkString(System.lineSeparator(), System.lineSeparator(), System.lineSeparator())
      fail(s"Failed to create WOM bundle: $formattedErrors")
  }
}

it should "be able to leave nested scatters intact" in {
  val graph = twoLevelScatterWorkflow(convertNested = false).innerGraph

  // get the top scatter node; there must be exactly one
  val topScatter: ScatterNode = graph.scatters.toList match {
    case only :: Nil => only
    case others => fail(s"Expected exactly one top-level scatter but found ${others.size}")
  }

  // don't generate any sub-workflows
  val wfCalls = graph.allNodes.filterByType[WorkflowCallNode]
  wfCalls.size shouldBe 0

  // there should be one scatter inside the top scatter
  topScatter.innerGraph.scatters.size shouldBe 1
}


it should "split a nested scatter into a toplevel scatter, and a bottom sub-workflow" in {
  val graph = twoLevelScatterWorkflow(convertNested = true).innerGraph

  // There should be just one scatter.
  graph.scatters.size shouldBe 1

  // There should be a call to a generated sub-workflow in the graph
  val wfCalls = graph.allNodes.filterByType[WorkflowCallNode]
  wfCalls.size shouldBe 1
}


private val validators: Map[String, WomBundle => Assertion] = Map(
"declaration_chain" -> anyWomWillDo,
"empty_workflow" -> anyWomWillDo,
Expand All @@ -68,6 +131,7 @@ class WdlFileToWomSpec extends FlatSpec with Matchers {
"simple_scatter" -> anyWomWillDo,
"ogin_scatter" -> anyWomWillDo,
"nested_scatter" -> anyWomWillDo,
"two_level_scatter" -> anyWomWillDo,
"simple_conditional" -> anyWomWillDo,
"lots_of_nesting" -> anyWomWillDo,
"standalone_task" -> anyWomWillDo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ object FileElementToWomBundle {
val workflowsValidation: ErrorOr[Vector[WorkflowDefinition]] = {
a.fileElement.workflows.toVector.traverse { workflowDefinition =>

val convertInputs = WorkflowDefinitionConvertInputs(workflowDefinition, allStructs, localTaskMapping ++ imports.flatMap(_.allCallables))
val convertInputs = WorkflowDefinitionConvertInputs(workflowDefinition,
allStructs,
localTaskMapping ++ imports.flatMap(_.allCallables),
a.convertNestedScatterToSubworkflow)
a.workflowConverter.run(convertInputs).toValidated
}
}
Expand Down Expand Up @@ -129,6 +132,7 @@ object FileElementToWomBundle {

final case class FileElementToWomBundleInputs(fileElement: FileElement,
workflowOptionsJson: WorkflowOptionsJson,
convertNestedScatterToSubworkflow : Boolean,
importResolvers: List[ImportResolver],
languageFactories: List[LanguageFactory],
workflowConverter: CheckedAtoB[WorkflowDefinitionConvertInputs, WorkflowDefinition],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ import wdl.transforms.base.wdlom2wom.graph.renaming._

object WorkflowDefinitionElementToWomWorkflowDefinition extends Util {

final case class WorkflowDefinitionConvertInputs(definitionElement: WorkflowDefinitionElement, typeAliases: Map[String, WomType], callables: Map[String, Callable])
/**
  * Inputs for converting a workflow definition element into a WOM workflow definition.
  *
  * @param definitionElement the workflow definition element being converted
  * @param typeAliases type aliases, keyed by name, made available to the conversion
  * @param callables callables, keyed by name, made available to the conversion
  * @param convertNestedScatterToSubworkflow passed down to the graph conversion; controls whether a scatter
  *                                          nested inside another scatter becomes a generated sub-workflow
  */
final case class WorkflowDefinitionConvertInputs(
  definitionElement: WorkflowDefinitionElement,
  typeAliases: Map[String, WomType],
  callables: Map[String, Callable],
  convertNestedScatterToSubworkflow: Boolean
)

def convert(b: WorkflowDefinitionConvertInputs)
(implicit expressionValueConsumer: ExpressionValueConsumer[ExpressionElement],
Expand All @@ -39,7 +42,10 @@ object WorkflowDefinitionElementToWomWorkflowDefinition extends Util {
a.definitionElement.inputsSection.toSeq.flatMap(_.inputDeclarations) ++
a.definitionElement.outputsSection.toSeq.flatMap(_.outputs)

val innerGraph: ErrorOr[WomGraph] = convertGraphElements(GraphLikeConvertInputs(graphNodeElements, Set.empty, Map.empty, a.typeAliases, a.definitionElement.name, insideAScatter = false, a.callables))
val innerGraph: ErrorOr[WomGraph] = convertGraphElements(GraphLikeConvertInputs(graphNodeElements, Set.empty, Map.empty, a.typeAliases, a.definitionElement.name,
insideAScatter = false,
convertNestedScatterToSubworkflow = b.convertNestedScatterToSubworkflow,
a.callables))
// NB: isEmpty means "not isDefined". We specifically do NOT add defaults if the output section is defined but empty.
val withDefaultOutputs: ErrorOr[WomGraph] = if (a.definitionElement.outputsSection.isEmpty) {
innerGraph map { WomGraphMakerTools.addDefaultOutputs(_, Some(WomIdentifier(a.definitionElement.name))) }
Expand All @@ -60,6 +66,7 @@ object WorkflowDefinitionElementToWomWorkflowDefinition extends Util {
typeAliases: Map[String, WomType],
workflowName: String,
insideAScatter: Boolean,
convertNestedScatterToSubworkflow: Boolean,
callables: Map[String, Callable])

def convertGraphElements(a: GraphLikeConvertInputs)
Expand All @@ -77,7 +84,7 @@ object WorkflowDefinitionElementToWomWorkflowDefinition extends Util {

for {
linkedGraph <- LinkedGraphMaker.make(nodes = a.graphElements, seedGeneratedValueHandles ++ finished, typeAliases = a.typeAliases, callables = a.callables)
womGraph <- makeWomGraph(linkedGraph, a.seedNodes, a.externalUpstreamCalls, a.workflowName, a.insideAScatter, a.callables)
womGraph <- makeWomGraph(linkedGraph, a.seedNodes, a.externalUpstreamCalls, a.workflowName, a.insideAScatter, a.convertNestedScatterToSubworkflow, a.callables)
} yield womGraph
}

Expand All @@ -86,6 +93,7 @@ object WorkflowDefinitionElementToWomWorkflowDefinition extends Util {
externalUpstreamCalls: Map[String, CallNode],
workflowName: String,
insideAScatter: Boolean,
convertNestedScatterToSubworkflow : Boolean,
callables: Map[String, Callable])
(implicit expressionValueConsumer: ExpressionValueConsumer[ExpressionElement],
fileEvaluator: FileEvaluator[ExpressionElement],
Expand All @@ -111,7 +119,7 @@ object WorkflowDefinitionElementToWomWorkflowDefinition extends Util {

val generatedGraphNodesValidation: ErrorOr[Set[GraphNode]] =
WorkflowGraphElementToGraphNode.convert(
GraphNodeMakerInputs(next, upstreamCallNodes, linkedGraph.consumedValueLookup, availableValues, linkedGraph.typeAliases, workflowName, insideAScatter, callables))
GraphNodeMakerInputs(next, upstreamCallNodes, linkedGraph.consumedValueLookup, availableValues, linkedGraph.typeAliases, workflowName, insideAScatter, convertNestedScatterToSubworkflow, callables))
generatedGraphNodesValidation map { nextGraphNodes: Set[GraphNode] => currentList ++ nextGraphNodes }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ object IfElementToGraphNode {
OuterGraphInputNode(WomIdentifier(name), port, preserveScatterIndex = true)
}).toSet

val graphLikeConvertInputs = GraphLikeConvertInputs(graphElements.toSet, ogins, foundOuterGenerators.completionPorts, a.availableTypeAliases, a.workflowName, insideAScatter = a.insideAnotherScatter, a.callables)
val graphLikeConvertInputs = GraphLikeConvertInputs(graphElements.toSet, ogins, foundOuterGenerators.completionPorts, a.availableTypeAliases, a.workflowName,
insideAScatter = a.insideAnotherScatter,
convertNestedScatterToSubworkflow = a.convertNestedScatterToSubworkflow,
a.callables)
val innerGraph: ErrorOr[Graph] = WorkflowDefinitionElementToWomWorkflowDefinition.convertGraphElements(graphLikeConvertInputs)

innerGraph map { ig =>
Expand All @@ -92,4 +95,5 @@ final case class ConditionalNodeMakerInputs(node: IfElement,
availableTypeAliases: Map[String, WomType],
workflowName: String,
insideAnotherScatter: Boolean,
convertNestedScatterToSubworkflow : Boolean,
callables: Map[String, Callable])
Loading

0 comments on commit 80559d8

Please sign in to comment.