Skip to content

Commit

Permalink
Add HOG_GROUP field to WORKFLOW_STORE_ENTRY [BA-5752] (broadinstitute…
Browse files Browse the repository at this point in the history
  • Loading branch information
cjllanwarne authored Jul 1, 2019
1 parent 1a5e4f2 commit bde3bf8
Show file tree
Hide file tree
Showing 50 changed files with 407 additions and 310 deletions.
48 changes: 13 additions & 35 deletions backend/src/main/scala/cromwell/backend/backend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package cromwell.backend

import _root_.wdl.draft2.model._
import akka.actor.ActorSystem
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.config.Config
import common.validation.Validation._
import cromwell.core.WorkflowOptions.WorkflowOption
import cromwell.core.callcaching.MaybeCallCachingEligible
import cromwell.core.filesystem.CromwellFileSystems
import cromwell.core.labels.Labels
import cromwell.core.path.{DefaultPathBuilderFactory, PathBuilderFactory}
import cromwell.core.{CallKey, WorkflowId, WorkflowOptions}
import cromwell.core.{CallKey, HogGroup, WorkflowId, WorkflowOptions}
import cromwell.docker.DockerInfoActor.DockerSize
import cromwell.services.keyvalue.KeyValueServiceActor.KvResponse
import net.ceedubs.ficus.Ficus._
Expand All @@ -21,7 +21,7 @@ import wom.values.WomArray.WomArrayLike
import wom.values._

import scala.concurrent.duration.FiniteDuration
import scala.util.{Failure, Success, Try}
import scala.util.Try

/**
* For uniquely identifying a job which has been or will be sent to the backend.
Expand All @@ -36,13 +36,13 @@ case class BackendJobDescriptorKey(call: CommandCallNode, index: Option[Int], at
/**
* For passing to a BackendWorkflowActor for job execution or recovery
*/
case class BackendJobDescriptor(workflowDescriptor: BackendWorkflowDescriptor,
key: BackendJobDescriptorKey,
runtimeAttributes: Map[LocallyQualifiedName, WomValue],
evaluatedTaskInputs: WomEvaluatedCallInputs,
maybeCallCachingEligible: MaybeCallCachingEligible,
dockerSize: Option[DockerSize],
prefetchedKvStoreEntries: Map[String, KvResponse]) {
final case class BackendJobDescriptor(workflowDescriptor: BackendWorkflowDescriptor,
key: BackendJobDescriptorKey,
runtimeAttributes: Map[LocallyQualifiedName, WomValue],
evaluatedTaskInputs: WomEvaluatedCallInputs,
maybeCallCachingEligible: MaybeCallCachingEligible,
dockerSize: Option[DockerSize],
prefetchedKvStoreEntries: Map[String, KvResponse]) {

val fullyQualifiedInputs: Map[String, WomValue] = evaluatedTaskInputs map { case (declaration, value) =>
key.call.identifier.combine(declaration.name).fullyQualifiedName.value -> value
Expand All @@ -62,22 +62,11 @@ case class BackendJobDescriptor(workflowDescriptor: BackendWorkflowDescriptor,
case _ => Set.empty
}

val localInputs = evaluatedTaskInputs map { case (declaration, value) => declaration.name -> value }
val taskCall = key.call
lazy val localInputs = evaluatedTaskInputs map { case (declaration, value) => declaration.name -> value }
lazy val taskCall = key.call
override lazy val toString = key.mkTag(workflowDescriptor.id)
}

object BackendWorkflowDescriptor {
def apply(id: WorkflowId,
callable: ExecutableCallable,
knownValues: Map[OutputPort, WomValue],
workflowOptions: WorkflowOptions,
customLabels: Labels,
outputRuntimeExtractor: Option[WomOutputRuntimeExtractor] = None) = {
new BackendWorkflowDescriptor(id, callable, knownValues, workflowOptions, customLabels, List.empty, outputRuntimeExtractor)
}
}

/**
* For passing to a BackendActor construction time
*/
Expand All @@ -86,24 +75,13 @@ case class BackendWorkflowDescriptor(id: WorkflowId,
knownValues: Map[OutputPort, WomValue],
workflowOptions: WorkflowOptions,
customLabels: Labels,
hogGroup: HogGroup,
breadCrumbs: List[BackendJobBreadCrumb],
outputRuntimeExtractor: Option[WomOutputRuntimeExtractor]) {

val rootWorkflow = breadCrumbs.headOption.map(_.callable).getOrElse(callable)
val possiblyNotRootWorkflowId = id.toPossiblyNotRoot
val rootWorkflowId = breadCrumbs.headOption.map(_.id).getOrElse(id).toRoot
lazy val hogGroup = {
val config = ConfigFactory.load
if (config.hasPath("system.hog-safety.workflow-option")) {
val hogGroupField = config.getString("system.hog-safety.workflow-option")
workflowOptions.get(hogGroupField) match {
case Success(hg) => hg
case Failure(_) => rootWorkflowId.shortString
}
} else {
rootWorkflowId.shortString
}
}

override def toString: String = s"[BackendWorkflowDescriptor id=${id.shortString} workflowName=${callable.name}]"
def getWorkflowOption(key: WorkflowOption) = workflowOptions.get(key).toOption
Expand Down
7 changes: 5 additions & 2 deletions backend/src/test/scala/cromwell/backend/BackendSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import cromwell.backend.BackendJobExecutionActor.{BackendJobExecutionResponse, J
import cromwell.backend.io.TestWorkflows._
import cromwell.core.callcaching.NoDocker
import cromwell.core.labels.Labels
import cromwell.core.{WorkflowId, WorkflowOptions}
import cromwell.core.{HogGroup, WorkflowId, WorkflowOptions}
import common.exception.AggregatedException
import org.scalatest.Matchers
import org.scalatest.concurrent.{ScalaFutures, ScaledTimeSpans}
Expand Down Expand Up @@ -45,7 +45,10 @@ trait BackendSpec extends ScalaFutures with Matchers with Mockito with ScaledTim
executable.entryPoint,
executable.resolvedExecutableInputs.flatMap({case (port, v) => v.select[WomValue] map { port -> _ }}),
options,
Labels.empty
Labels.empty,
HogGroup("foo"),
List.empty,
None
)
}

Expand Down
32 changes: 32 additions & 0 deletions core/src/main/scala/cromwell/core/HogGroup.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package cromwell.core

import com.typesafe.config.ConfigFactory

import scala.util.{Failure, Success}

final case class HogGroup(value: String) extends AnyVal

object HogGroup {

type HogGroupDeciderFunction = (WorkflowOptions, WorkflowId) => HogGroup

// NB: This is separated out from the apply so that we only have to load the config once:
val HogGroupDeciderFunction: HogGroupDeciderFunction = {
val config = ConfigFactory.load

if (config.hasPath("system.hog-safety.workflow-option")) {
val hogGroupField = config.getString("system.hog-safety.workflow-option")

(options, workflowId) => {
options.get(hogGroupField) match {
case Success(hg) => HogGroup(hg)
case Failure(_) => HogGroup(workflowId.shortString)
}
}
} else {
(_, workflowId) => HogGroup(workflowId.shortString)
}
}

def decide(workflowOptions: WorkflowOptions, workflowId: WorkflowId): HogGroup = HogGroupDeciderFunction.apply(workflowOptions, workflowId)
}
24 changes: 14 additions & 10 deletions core/src/main/scala/cromwell/core/WorkflowOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -179,18 +179,22 @@ case class WorkflowOptions(jsObject: JsObject) {

/**
* Returns a JSON representation of these workflow options where the encrypted values
* have been replaced by the string "cleared". This will be called on the workflow
* options (and subsequently stored back in the database) once a workflow finishes
* and the encrypted values aren't needed anymore. This protects us in case the
* database and private key become compromised, the attacker will not be able to
* decrypt values for completed workflows.
* have been replaced by the string "cleared".
*
* Used to protect encrypted values from being stored in metadata
*/
def clearEncryptedValues: String = {
val revoked = jsObject.fields map {
case (k, v: JsObject) if isEncryptedField(v) => k -> JsString("cleared")
case (k, v) => k -> v
def clearEncryptedValues: WorkflowOptions = {

def revoke(o: JsObject): JsObject = {
val newFields = o.fields map {
case (k, v: JsObject) if isEncryptedField(v) => k -> JsString("cleared")
case (k, v: JsObject) => k -> revoke(v)
case (k, v) => k -> v
}
JsObject(newFields)
}
JsObject(revoked).prettyPrint

WorkflowOptions(revoke(jsObject))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ sealed trait WorkflowSourceFilesCollection {
def workflowUrl: Option[WorkflowUrl]
def workflowRoot: Option[String]
def inputsJson: WorkflowJson
def workflowOptionsJson: WorkflowOptionsJson
def workflowOptions: WorkflowOptions
def labelsJson: WorkflowJson
def workflowType: Option[WorkflowType]
def workflowTypeVersion: Option[WorkflowTypeVersion]
Expand All @@ -24,9 +24,12 @@ sealed trait WorkflowSourceFilesCollection {
case w: WorkflowSourceFilesWithDependenciesZip => Option(w.importsZip) // i.e. Some(importsZip) if our wiring is correct
}

def copyOptions(workflowOptions: WorkflowOptionsJson) = this match {
case w: WorkflowSourceFilesWithoutImports => w.copy(workflowOptionsJson = workflowOptions)
case w: WorkflowSourceFilesWithDependenciesZip => w.copy(workflowOptionsJson = workflowOptions)
def setOptions(workflowOptions: WorkflowOptions) = {

this match {
case w: WorkflowSourceFilesWithoutImports => w.copy(workflowOptions = workflowOptions)
case w: WorkflowSourceFilesWithDependenciesZip => w.copy(workflowOptions = workflowOptions)
}
}
}

Expand All @@ -37,7 +40,7 @@ object WorkflowSourceFilesCollection {
workflowType: Option[WorkflowType],
workflowTypeVersion: Option[WorkflowTypeVersion],
inputsJson: WorkflowJson,
workflowOptionsJson: WorkflowOptionsJson,
workflowOptions: WorkflowOptions,
labelsJson: WorkflowJson,
importsFile: Option[Array[Byte]],
workflowOnHold: Boolean,
Expand All @@ -50,7 +53,7 @@ object WorkflowSourceFilesCollection {
workflowType = workflowType,
workflowTypeVersion = workflowTypeVersion,
inputsJson = inputsJson,
workflowOptionsJson = workflowOptionsJson,
workflowOptions = workflowOptions,
labelsJson = labelsJson,
importsZip = imports,
workflowOnHold = workflowOnHold,
Expand All @@ -63,7 +66,7 @@ object WorkflowSourceFilesCollection {
workflowType = workflowType,
workflowTypeVersion = workflowTypeVersion,
inputsJson = inputsJson,
workflowOptionsJson = workflowOptionsJson,
workflowOptions = workflowOptions,
labelsJson = labelsJson,
workflowOnHold = workflowOnHold,
warnings = warnings)
Expand All @@ -76,7 +79,7 @@ final case class WorkflowSourceFilesWithoutImports(workflowSource: Option[Workfl
workflowType: Option[WorkflowType],
workflowTypeVersion: Option[WorkflowTypeVersion],
inputsJson: WorkflowJson,
workflowOptionsJson: WorkflowOptionsJson,
workflowOptions: WorkflowOptions,
labelsJson: WorkflowJson,
workflowOnHold: Boolean = false,
warnings: Seq[String]) extends WorkflowSourceFilesCollection
Expand All @@ -87,13 +90,13 @@ final case class WorkflowSourceFilesWithDependenciesZip(workflowSource: Option[W
workflowType: Option[WorkflowType],
workflowTypeVersion: Option[WorkflowTypeVersion],
inputsJson: WorkflowJson,
workflowOptionsJson: WorkflowOptionsJson,
workflowOptions: WorkflowOptions,
labelsJson: WorkflowJson,
importsZip: Array[Byte],
workflowOnHold: Boolean = false,
warnings: Seq[String]) extends WorkflowSourceFilesCollection {
override def toString = {
s"WorkflowSourceFilesWithDependenciesZip($workflowSource, $workflowUrl, $workflowType, $workflowTypeVersion," +
s" $inputsJson, $workflowOptionsJson, $labelsJson, <<ZIP BINARY CONTENT>>, $warnings)"
s""" $inputsJson, ${workflowOptions.asPrettyJson}, $labelsJson, <<ZIP BINARY CONTENT>>, $warnings)"""
}
}
10 changes: 5 additions & 5 deletions core/src/test/scala/cromwell/core/WorkflowOptionsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ class WorkflowOptionsSpec extends Matchers with WordSpecLike {
refreshTokenEncrypted shouldBe a [JsObject]
refreshTokenEncrypted.asInstanceOf[JsObject].fields.keys shouldEqual Set("iv", "ciphertext")

options.clearEncryptedValues shouldEqual """{
| "key": "value",
| "refresh_token": "cleared"
|}""".stripMargin
options.clearEncryptedValues.asPrettyJson shouldEqual """{
| "key": "value",
| "refresh_token": "cleared"
|}""".stripMargin
case _ => fail("Expecting workflow options to be parseable")
}
}
}
}
}
6 changes: 3 additions & 3 deletions core/src/test/scala/cromwell/util/SampleWdl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package cromwell.util
import java.util.UUID

import cromwell.core.path.{DefaultPathBuilder, Path}
import cromwell.core.{WorkflowSourceFilesCollection, WorkflowSourceFilesWithDependenciesZip, WorkflowSourceFilesWithoutImports}
import cromwell.core.{WorkflowOptions, WorkflowSourceFilesCollection, WorkflowSourceFilesWithDependenciesZip, WorkflowSourceFilesWithoutImports}
import spray.json._
import wom.core.{ExecutableInputMap, WorkflowJson, WorkflowSource}
import wom.values._
Expand Down Expand Up @@ -38,7 +38,7 @@ trait SampleWdl extends TestFileUtil {
workflowUrl = None,
workflowRoot = None,
inputsJson = workflowJson,
workflowOptionsJson = workflowOptions,
workflowOptions = WorkflowOptions.fromJsonString(workflowOptions).get,
labelsJson = labels,
workflowType = workflowType,
workflowTypeVersion = workflowTypeVersion,
Expand All @@ -51,7 +51,7 @@ trait SampleWdl extends TestFileUtil {
workflowUrl = None,
workflowRoot = None,
inputsJson = workflowJson,
workflowOptionsJson = workflowOptions,
workflowOptions = WorkflowOptions.fromJsonString(workflowOptions).get,
labelsJson = labels,
workflowType = workflowType,
workflowTypeVersion = workflowTypeVersion,
Expand Down
5 changes: 5 additions & 0 deletions database/migration/src/main/resources/changelog.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@
<include file="changesets/docker_hash_store_add_size_column.xml" relativeToChangelogFile="true" />
<include file="changesets/enlarge_call_caching_hash_entry_id.xml" relativeToChangelogFile="true" />
<include file="changesets/postgresql_engine_schema.xml" relativeToChangelogFile="true" />
<include file="changesets/add_hog_group_in_workflow_store.xml" relativeToChangelogFile="true" />
<!-- REMINDER!
Before appending here, did you remember to include the 'objectQuotingStrategy="QUOTE_ALL_OBJECTS"' line in your changeset/xyz.xml...?
-->

</databaseChangeLog>
<!--
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<databaseChangeLog objectQuotingStrategy="QUOTE_ALL_OBJECTS"
xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.3.xsd">

<changeSet id="add_hog_group_in_workflow_store" author="cjllanwarne">
<addColumn tableName="WORKFLOW_STORE_ENTRY">
<column name="HOG_GROUP" type="VARCHAR(100)">
<constraints nullable="true" />
</column>
</addColumn>
</changeSet>

</databaseChangeLog>
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ class ClearMetadataEntryWorkflowOptions extends WorkflowOptionsChange {
override val workflowOptionsColumn = "METADATA_VALUE"
override val additionalReadBatchFilters = "AND METADATA_KEY = 'submittedFiles:options'"

override def migrateWorkflowOptions(workflowOptions: WorkflowOptions) = workflowOptions.clearEncryptedValues
override def migrateWorkflowOptions(workflowOptions: WorkflowOptions) = workflowOptions.clearEncryptedValues.asPrettyJson
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ trait WorkflowStoreEntryComponent {

def heartbeatTimestamp = column[Option[Timestamp]]("HEARTBEAT_TIMESTAMP")

def hogGroup = column[Option[String]]("HOG_GROUP", O.Length(100))

override def * = (workflowExecutionUuid, workflowDefinition, workflowUrl, workflowRoot, workflowType, workflowTypeVersion, workflowInputs, workflowOptions, workflowState,
submissionTime, importsZip, customLabels, cromwellId, heartbeatTimestamp, workflowStoreEntryId.?) <> ((WorkflowStoreEntry.apply _).tupled, WorkflowStoreEntry.unapply)
submissionTime, importsZip, customLabels, cromwellId, heartbeatTimestamp, hogGroup, workflowStoreEntryId.?) <> ((WorkflowStoreEntry.apply _).tupled, WorkflowStoreEntry.unapply)

def ucWorkflowStoreEntryWeu = index("UC_WORKFLOW_STORE_ENTRY_WEU", workflowExecutionUuid, unique = true)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ case class WorkflowStoreEntry
customLabels: SerialClob,
cromwellId: Option[String],
heartbeatTimestamp: Option[Timestamp],
hogGroup: Option[String],
workflowStoreEntryId: Option[Int] = None
)
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ class WorkflowActor(workflowToStart: WorkflowToStart,
with WorkflowInstrumentation with Timers {

implicit val ec = context.dispatcher
private val WorkflowToStart(workflowId, submissionTime, sources, initialStartableState) = workflowToStart
private val WorkflowToStart(workflowId, submissionTime, sources, initialStartableState, hogGroup) = workflowToStart
override val workflowIdForLogging = workflowId.toPossiblyNotRoot
override val rootWorkflowIdForLogging = workflowId.toRoot

Expand Down Expand Up @@ -248,7 +248,7 @@ class WorkflowActor(workflowToStart: WorkflowToStart,

when(WorkflowUnstartedState) {
case Event(StartWorkflowCommand, _) =>
val actor = context.actorOf(MaterializeWorkflowDescriptorActor.props(serviceRegistryActor, workflowId, importLocalFilesystem = !serverMode, ioActorProxy = ioActor),
val actor = context.actorOf(MaterializeWorkflowDescriptorActor.props(serviceRegistryActor, workflowId, importLocalFilesystem = !serverMode, ioActorProxy = ioActor, hogGroup = hogGroup),
"MaterializeWorkflowDescriptorActor")
pushWorkflowStart(workflowId)
actor ! MaterializeWorkflowDescriptorCommand(sources, conf)
Expand Down Expand Up @@ -430,7 +430,7 @@ class WorkflowActor(workflowToStart: WorkflowToStart,
* being recreated so that in case MaterializeWorkflowDescriptor fails, the workflow logs can still
* be copied by accessing the workflow options outside of the EngineWorkflowDescriptor.
*/
def bruteForceWorkflowOptions: WorkflowOptions = WorkflowOptions.fromJsonString(sources.workflowOptionsJson).getOrElse(WorkflowOptions.fromJsonString("{}").get)
def bruteForceWorkflowOptions: WorkflowOptions = sources.workflowOptions
val system = context.system
val ec = context.system.dispatcher
def bruteForcePathBuilders: Future[List[PathBuilder]] = {
Expand Down
Loading

0 comments on commit bde3bf8

Please sign in to comment.