diff --git a/CHANGELOG.md b/CHANGELOG.md index d2abf9db9ef..b6e7a777db5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,19 @@ # Cromwell Change Log +## 45 Release Notes + +### BCS backend new Features support + +#### New docker registry +Alibaba Cloud Container Registry is now supported for the `docker` runtime attribute, and the previous `dockerTag` +runtime attribute continues to be available for Alibaba Cloud OSS Registry. +#### Call caching +Cromwell now supports Call caching when using the BCS backend. +#### Workflow output glob +Globs can be used to define outputs for BCS backend. +#### NAS mount +Alibaba Cloud NAS is now supported for the `mounts` runtime attribute. + ## 44 Release Notes ### Improved PAPI v2 Preemptible VM Support diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index b11fa10b1af..489c4f653d1 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -376,6 +376,7 @@ docker { } dockerhub.num-threads = 10 quay.num-threads = 10 + alibabacloudcr.num-threads = 10 } } diff --git a/cromwell.example.backends/BCS.conf b/cromwell.example.backends/BCS.conf index 7dfcc765519..e910947d5e0 100644 --- a/cromwell.example.backends/BCS.conf +++ b/cromwell.example.backends/BCS.conf @@ -45,7 +45,6 @@ backend { #reserveOnFail: true #autoReleaseJob: true #verbose: false - #workerPath: "oss://bcs-bucket/workflow/worker.tar.gz" #systemDisk: "cloud 50" #dataDisk: "cloud 250 /home/data/" #timeout: 3000 diff --git a/dockerHashing/src/main/scala/cromwell/docker/DockerInfoActor.scala b/dockerHashing/src/main/scala/cromwell/docker/DockerInfoActor.scala index fda13b4a135..032dbb9e370 100644 --- a/dockerHashing/src/main/scala/cromwell/docker/DockerInfoActor.scala +++ b/dockerHashing/src/main/scala/cromwell/docker/DockerInfoActor.scala @@ -12,6 +12,7 @@ import common.validation.Validation._ import cromwell.core.actor.StreamIntegration.{BackPressure, StreamContext} import cromwell.core.{Dispatcher, DockerConfiguration} import cromwell.docker.DockerInfoActor._ +import cromwell.docker.registryv2.flows.alibabacloudcrregistry._ import cromwell.docker.registryv2.DockerRegistryV2Abstract import cromwell.docker.registryv2.flows.dockerhub.DockerHubRegistry import cromwell.docker.registryv2.flows.gcr.GcrRegistry @@ -237,7 +238,8 @@ object DockerInfoActor { List( ("dockerhub", { c: DockerRegistryConfig => new DockerHubRegistry(c) }), ("gcr", gcrConstructor), - ("quay", { c: DockerRegistryConfig => new QuayRegistry(c) }) + ("quay", { c: DockerRegistryConfig => new QuayRegistry(c) }), + ("alibabacloudcr", {c: DockerRegistryConfig => new AlibabaCloudCRRegistry(c)}) ).traverse[ErrorOr, DockerRegistry]({ case (configPath, constructor) => DockerRegistryConfig.fromConfig(config.as[Config](configPath)).map(constructor) }).unsafe("Docker registry configuration") diff --git a/dockerHashing/src/main/scala/cromwell/docker/registryv2/flows/alibabacloudcr/AlibabaCloudCRRegistry.scala b/dockerHashing/src/main/scala/cromwell/docker/registryv2/flows/alibabacloudcr/AlibabaCloudCRRegistry.scala new file mode 100644 index 00000000000..5924eeb2a38 --- /dev/null +++ b/dockerHashing/src/main/scala/cromwell/docker/registryv2/flows/alibabacloudcr/AlibabaCloudCRRegistry.scala @@ -0,0 +1,136 @@ +package cromwell.docker.registryv2.flows.alibabacloudcrregistry + +import akka.stream._ +import cats.effect.IO +import com.aliyuncs.DefaultAcsClient +import com.aliyuncs.auth.{AlibabaCloudCredentials, BasicCredentials, BasicSessionCredentials} +import com.aliyuncs.cr.model.v20160607.GetRepoTagsRequest +import com.aliyuncs.profile.DefaultProfile +import com.aliyuncs.profile.IClientProfile +import cromwell.docker.DockerHashResult +import cromwell.docker.DockerInfoActor._ +import cromwell.docker._ +import cromwell.docker.registryv2.DockerRegistryV2Abstract +import org.http4s.Header +import org.http4s.client.Client +import scala.util.matching.Regex +import scala.util.{Failure, Success, Try} +import spray.json.DefaultJsonProtocol._ +import spray.json._ + +class AlibabaCloudCRRegistry(config: DockerRegistryConfig) extends DockerRegistryV2Abstract(config) { + val ProductName = "cr" + val HashAlg = "sha256" + val regionPattern = """[^\s]+""" + val validAlibabaCloudCRHosts: Regex = s"""registry.($regionPattern).aliyuncs.com""".r + + + def isValidAlibabaCloudCRHost(host: Option[String]): Boolean = { + host.exists { + _ match { + case validAlibabaCloudCRHosts(_) => true + case _ => false + } + } + } + + override def accepts(dockerImageIdentifier: DockerImageIdentifier): Boolean = isValidAlibabaCloudCRHost(dockerImageIdentifier.host) + + override protected def getToken(dockerInfoContext: DockerInfoContext)(implicit client: Client[IO]): IO[Option[String]] = { + IO.pure(None) + } + + override protected def registryHostName(dockerImageIdentifier: DockerImageIdentifier): String = "" + override protected def authorizationServerHostName(dockerImageIdentifier: DockerImageIdentifier): String = "" + override protected def buildTokenRequestHeaders(dockerInfoContext: DockerInfoContext): List[Header] = List.empty + + override protected def getDockerResponse(token: Option[String], dockerInfoContext: DockerInfoContext)(implicit client: Client[IO]): IO[DockerInfoSuccessResponse] = { + getManifest(dockerInfoContext) match { + case success: DockerInfoSuccessResponse => IO(success) + case fail: DockerInfoFailedResponse => IO.raiseError(new Exception(fail.reason)) + case other => IO.raiseError(new Exception(s"Get manifest failed, $other")) + } + } + + private def getManifest(context: DockerInfoContext): DockerInfoResponse = { + + val regionId = context.dockerImageID.host match { + case Some(validAlibabaCloudCRHosts(region)) => region + case _ => throw new Exception(s"The host ${context.dockerImageID.host} does not have the expected region id") + } + + val endpoint = ProductName + "." + regionId + ".aliyuncs.com" + DefaultProfile.addEndpoint(regionId, ProductName, endpoint) + + val profile: IClientProfile = getAliyunCredentialFromContext(context) match { + case Some(cred: BasicCredentials) => DefaultProfile.getProfile(regionId, cred.getAccessKeyId(), cred.getAccessKeySecret()) + case Some(sCred: BasicSessionCredentials) => DefaultProfile.getProfile(regionId, sCred.getAccessKeyId(), sCred.getAccessKeySecret(), sCred.getSessionToken()) + case _ => throw new Exception(s"Invalid credential from context, ${context}") + } + + val client: DefaultAcsClient = new DefaultAcsClient(profile) + val request: GetRepoTagsRequest = new GetRepoTagsRequest() + val dockerImageID = context.dockerImageID + request.setRepoName(dockerImageID.image) + dockerImageID.repository foreach { repository => request.setRepoNamespace(repository) } + + manifestResponseHandler(client, request, context) + .getOrElse(new Exception(s"handle response fail, please make sure the image id is correct: ${context.dockerImageID}")) match { + case succ: DockerInfoSuccessResponse => succ + case fail: DockerInfoFailedResponse => fail + case ex: Exception => throw new Exception(s"Get AliyunCr manifest failed, ${ex.getMessage}") + } + } + + private[alibabacloudcrregistry] def getAliyunCredentialFromContext(context: DockerInfoContext): Option[AlibabaCloudCredentials] = { + context.credentials find { + _.isInstanceOf[AlibabaCloudCredentials] + } match { + case Some(cred: BasicCredentials) => Some(cred) + case Some(sCred: BasicSessionCredentials) => Some(sCred) + case _ => None + } + } + + private def matchTag(jsObject: JsObject, dockerHashContext: DockerInfoContext): Boolean = { + val tag = dockerHashContext.dockerImageID.reference + jsObject.fields.get("tag") match { + case Some(tagObj: JsString) if tagObj.value == tag => true + case _ => false + } + } + + private[alibabacloudcrregistry] def extractDigestFromBody(jsObject: JsObject, dockerHashContext: DockerInfoContext): DockerInfoResponse = { + val tags = jsObject.fields.get("data") match { + case Some(data) => data.asJsObject().convertTo[Map[String, JsValue]].get("tags") match { + case Some(tag) => tag.convertTo[Seq[JsObject]] + case None => throw new Exception(s"Manifest response did not contain a tags field, ${jsObject}") + } + case None => throw new Exception(s"Manifest response did not contain a data field, Please make sure the existence of image, ${jsObject}") + } + + tags find { matchTag(_, dockerHashContext)} match { + case Some(tagObj) => + tagObj.fields.get("digest") match { + case Some(digest: JsString) => + DockerHashResult.fromString(HashAlg + ":" + digest.value) match { + case Success(r) => DockerInfoSuccessResponse(DockerInformation(r, None), dockerHashContext.request) + case Failure(t) => DockerInfoFailedResponse(t, dockerHashContext.request) + } + case Some(_) => DockerInfoFailedResponse((new Exception(s"Manifest response contains a non-string digest field, ${jsObject}")), dockerHashContext.request) + case None => DockerInfoFailedResponse((new Exception(s"Manifest response did not contain a digest field, ${jsObject}")), dockerHashContext.request) + } + case None => DockerInfoFailedResponse((new Exception(s"Manifest response did not contain a expected tag: ${dockerHashContext.dockerImageID.reference}, ${jsObject}")), dockerHashContext.request) + } + } + + private def manifestResponseHandler(client: DefaultAcsClient, request: GetRepoTagsRequest, dockerHashContext: DockerInfoContext): Try[DockerInfoResponse] = { + for { + response <- Try(client.doAction(request)) + jsObj <- Try(if (response.isSuccess) response.getHttpContentString.parseJson.asJsObject() + else throw new Exception(s"Get manifest request not success: ${response}")) + dockInfoRes <- Try(extractDigestFromBody(jsObj, dockerHashContext)) + } yield dockInfoRes + } +} + diff --git a/dockerHashing/src/test/scala/cromwell/docker/DockerImageIdentifierSpec.scala b/dockerHashing/src/test/scala/cromwell/docker/DockerImageIdentifierSpec.scala index 58962860562..6c67d2ea262 100644 --- a/dockerHashing/src/test/scala/cromwell/docker/DockerImageIdentifierSpec.scala +++ b/dockerHashing/src/test/scala/cromwell/docker/DockerImageIdentifierSpec.scala @@ -11,17 +11,18 @@ class DockerImageIdentifierSpec extends FlatSpec with Matchers with TableDrivenP ("sourceString", "host", "repo", "image", "reference"), // Without tags -> latest ("ubuntu", None, None, "ubuntu", "latest"), - ("broad/cromwell", None, Some("broad"), "cromwell", "latest"), + ("broad/cromwell", None, Option("broad"), "cromwell", "latest"), ("index.docker.io/ubuntu", Option("index.docker.io"), None, "ubuntu", "latest"), - ("broad/cromwell/submarine", None, Some("broad/cromwell"), "submarine", "latest"), - ("gcr.io/google/slim", Option("gcr.io"), Some("google"), "slim", "latest"), + ("broad/cromwell/submarine", None, Option("broad/cromwell"), "submarine", "latest"), + ("gcr.io/google/slim", Option("gcr.io"), Option("google"), "slim", "latest"), // With tags ("ubuntu:latest", None, None, "ubuntu", "latest"), ("ubuntu:1235-SNAP", None, None, "ubuntu", "1235-SNAP"), ("ubuntu:V3.8-5_1", None, None, "ubuntu", "V3.8-5_1"), - ("index.docker.io:9999/ubuntu:170904", Some("index.docker.io:9999"), None, "ubuntu", "170904"), - ("localhost:5000/capture/transwf:170904", Some("localhost:5000"), Some("capture"), "transwf", "170904"), - ("quay.io/biocontainers/platypus-variant:0.8.1.1--htslib1.5_0", Option("quay.io"), Some("biocontainers"), "platypus-variant", "0.8.1.1--htslib1.5_0") + ("index.docker.io:9999/ubuntu:170904", Option("index.docker.io:9999"), None, "ubuntu", "170904"), + ("localhost:5000/capture/transwf:170904", Option("localhost:5000"), Option("capture"), "transwf", "170904"), + ("quay.io/biocontainers/platypus-variant:0.8.1.1--htslib1.5_0", Option("quay.io"), Option("biocontainers"), "platypus-variant", "0.8.1.1--htslib1.5_0"), + ("registry.cn-shanghai.aliyuncs.com/batchcompute/ubuntu:0.2", Option("registry.cn-shanghai.aliyuncs.com"), Option("batchcompute"), "ubuntu", "0.2") ) forAll(valid) { (dockerString, host, repo, image, reference) => diff --git a/dockerHashing/src/test/scala/cromwell/docker/registryv2/AlibabaCloudCRRegistrySpec.scala b/dockerHashing/src/test/scala/cromwell/docker/registryv2/AlibabaCloudCRRegistrySpec.scala new file mode 100644 index 00000000000..34124cc4ae1 --- /dev/null +++ b/dockerHashing/src/test/scala/cromwell/docker/registryv2/AlibabaCloudCRRegistrySpec.scala @@ -0,0 +1,142 @@ +package cromwell.docker.registryv2.flows.alibabacloudcrregistry + +import com.aliyuncs.auth.{BasicCredentials, BasicSessionCredentials} +import cromwell.docker.DockerInfoActor.{DockerInfoContext, DockerInfoFailedResponse, DockerInfoSuccessResponse, DockerInformation} +import cromwell.docker.{DockerHashResult, DockerImageIdentifier, DockerInfoRequest, DockerRegistryConfig} + +import net.ceedubs.ficus.Ficus._ +import com.typesafe.config.{Config, ConfigFactory} +import cromwell.core.TestKitSuite +import org.scalatest.{BeforeAndAfter, FlatSpecLike, Matchers} +import org.scalatest.mockito.MockitoSugar +import spray.json._ + +object AlibabaCloudCRRegistrySpec { + + val AlibabaCloudCRRegistryConfigString = + s""" + |enable = true + |# How should docker hashes be looked up. Possible values are "local" and "remote" + |# "local": Lookup hashes on the local docker daemon using the cli + |# "remote": Lookup hashes on docker hub and gcr + |method = "remote" + |alibabacloudcr { + | num-threads = 5 + | auth { + | access-id = "test-access-id" + | access-key = "test-access-key" + | security-token = "test-security-token" + | } + |} + | + """.stripMargin + + val AlibabaCloudCRRegistryConfig = ConfigFactory.parseString(AlibabaCloudCRRegistryConfigString) +} + +class AlibabaCloudCRRegistrySpec extends TestKitSuite with FlatSpecLike with Matchers with MockitoSugar with BeforeAndAfter { + behavior of "AlibabaCloudCRRegistry" + + val hashValue = "fcf39ed78ef0fa27bcc74713b85259alop1b12e6a201e3083af50fd8eda1cbe1" + val tag = "0.2" + val notExistTag = "0.3" + val CRResponse = + s""" + |{ + | "data": { + | "total": 2, + | "pageSize": 30, + | "page": 1, + | "tags": [ + | { + | "imageUpdate": 1514432549000, + | "imageId": "2842876c9b98f8c7607c1123ks18ff040b76a1d932c6d60c96aa3c283bd221cd", + | "digest": "83414d2c3b04e0lo1q7693e31aeca95b82c61949ea8de858579bf16bd92490c6", + | "imageSize": 715764, + | "tag": "0.1", + | "imageCreate": 1514432549000, + | "status": "NORMAL" + | }, + | { + | "imageUpdate": 1514372113000, + | "imageId": "414e6daa772a8cd5dfloqpe503e6e313c372d2e15958ab649709daf9b1065479", + | "digest": "$hashValue", + | "imageSize": 715653, + | "tag": "$tag", + | "imageCreate": 1514372044000, + | "status": "NORMAL" + | } + | ] + | }, + | "requestId": "9AFB52D3-6631-4B00-A857-932492097726" + |}""".stripMargin + + + it should "have correct Alibaba Cloud CR image" in { + val configPath = "alibabacloudcr" + val registry = new AlibabaCloudCRRegistry(DockerRegistryConfig.fromConfig(AlibabaCloudCRRegistrySpec.AlibabaCloudCRRegistryConfig.as[Config](configPath)).getOrElse(DockerRegistryConfig.default)) + + val testCRDockerImage = s"registry.cn-shanghai.aliyuncs.com/batchcompute/ubuntu:$tag" + val testInvalidCRDockerImage = "registry.cn-not-exist.aliyuncs.com/batchcompute/ubuntu:0.2" + registry.accepts(DockerImageIdentifier.fromString(testCRDockerImage).get) shouldEqual true + registry.isValidAlibabaCloudCRHost(Some(testInvalidCRDockerImage)) shouldEqual false + registry.isValidAlibabaCloudCRHost(None) shouldEqual false + } + + it should "successfully extract digest from body" in { + val configPath = "alibabacloudcr" + val registry = new AlibabaCloudCRRegistry(DockerRegistryConfig.fromConfig(AlibabaCloudCRRegistrySpec.AlibabaCloudCRRegistryConfig.as[Config](configPath)).getOrElse(DockerRegistryConfig.default)) + + val testCRDockerImage = s"registry.cn-shanghai.aliyuncs.com/batchcompute/ubuntu:$tag" + + val expectedDockerHashResult = DockerHashResult("sha256", hashValue) + val expectedDockerInfomation = DockerInformation(expectedDockerHashResult, None) + val dockerRequest = DockerInfoRequest(DockerImageIdentifier.fromString(testCRDockerImage).get, List.empty) + val expectedDockerResponse = DockerInfoSuccessResponse(expectedDockerInfomation, dockerRequest) + + val context: DockerInfoContext = DockerInfoContext(dockerRequest, null) + registry.extractDigestFromBody(CRResponse.parseJson.asJsObject(), context) shouldEqual expectedDockerResponse + } + + it should "NOT successfully extract digest from body" in { + val configPath = "alibabacloudcr" + val registry = new AlibabaCloudCRRegistry(DockerRegistryConfig.fromConfig(AlibabaCloudCRRegistrySpec.AlibabaCloudCRRegistryConfig.as[Config](configPath)).getOrElse(DockerRegistryConfig.default)) + + val testCRDockerImageTagNotExist = s"registry.cn-shanghai.aliyuncs.com/batchcompute/ubuntu:$notExistTag" + + val dockerRequest = DockerInfoRequest(DockerImageIdentifier.fromString(testCRDockerImageTagNotExist).get, List.empty) + val context: DockerInfoContext = DockerInfoContext(dockerRequest, null) + + val cRResponseJsObj = CRResponse.parseJson.asJsObject() + registry.extractDigestFromBody(cRResponseJsObj, context) match { + case DockerInfoFailedResponse(t, _) => t.getMessage should be(s"Manifest response did not contain a expected tag: $notExistTag, ${cRResponseJsObj}") + case _ => fail("Failed to get a DockerInfoFailedResponse result.") + } + } + + it should "successfully get the correct credentials from context" in { + val configPath = "alibabacloudcr" + val registry = new AlibabaCloudCRRegistry(DockerRegistryConfig.fromConfig(AlibabaCloudCRRegistrySpec.AlibabaCloudCRRegistryConfig.as[Config](configPath)).getOrElse(DockerRegistryConfig.default)) + + val testCRDockerImageTagNotExist = s"registry.cn-shanghai.aliyuncs.com/batchcompute/ubuntu:$tag" + val access_id = "test-access-id" + val access_key = "test-access-key" + val security_token = "test-token" + + val basicCredential = new BasicCredentials(access_id, access_key) + val sessionCredential = new BasicSessionCredentials(access_id, access_key, security_token) + + val dockerRequest = DockerInfoRequest(DockerImageIdentifier.fromString(testCRDockerImageTagNotExist).get, List(basicCredential)) + val context: DockerInfoContext = DockerInfoContext(dockerRequest, null) + registry.getAliyunCredentialFromContext(context) shouldEqual Some(basicCredential) + + val sessionDockerRequest = DockerInfoRequest(DockerImageIdentifier.fromString(testCRDockerImageTagNotExist).get, List(sessionCredential)) + val sessionContext: DockerInfoContext = DockerInfoContext(sessionDockerRequest, null) + registry.getAliyunCredentialFromContext(sessionContext) shouldEqual Some(sessionCredential) + + val invalidDockerRequest = DockerInfoRequest(DockerImageIdentifier.fromString(testCRDockerImageTagNotExist).get, List.empty) + val invalidContext: DockerInfoContext = DockerInfoContext(invalidDockerRequest, null) + registry.getAliyunCredentialFromContext(invalidContext) shouldEqual None + } + +} diff --git a/docs/backends/BCS.md b/docs/backends/BCS.md index 04c322c056e..32b722a3a32 100644 --- a/docs/backends/BCS.md +++ b/docs/backends/BCS.md @@ -73,6 +73,7 @@ the configuration key `backend.providers.BCS.config.filesystems.auth` in order t - `` - API endpoint to access OSS bucket ``. - `` - Access ID to access Alibaba Cloud services through restful API. - `` - Access key to access Alibaba Cloud services through restful API. +- `` - The interval of auth refreshing if you are using an STS(Alibaba Cloud Security Token Service) way to access the OSS filesystem. ```hocon backend { @@ -88,6 +89,7 @@ backend { access-id = "" access-key = "" } + refresh-interval = 1800 } } @@ -115,12 +117,12 @@ backend { default-runtime-attributes { cluster: "OnDemand ecs.sn1ne.large " mounts: "oss:///inputs/ /home/inputs/ false" - docker: "ubuntu/latest oss:///registry/ubuntu/" + dockerTag: "ubuntu/latest oss:///registry/ubuntu/" + docker: "registry.cn-shanghai.aliyuncs.com/batchcompute/myubuntu:0.2" userData: "key value" reserveOnFail: true autoReleaseJob: true verbose: false - workerPath: "oss:///cromwell_test/worker.tar.gz" systemDisk: "cloud 50" dataDisk: "cloud 250 /home/data/" timeout: 3000 @@ -158,11 +160,13 @@ There are two different ways of specifying an Alibaba Cloud BatchCompute cluster #### mounts -BCS jobs can mount an OSS object or an OSS prefix to local filesystem as a file or a directory in VM. +BCS jobs can mount both OSS and [Alibaba Cloud NAS](https://www.aliyun.com/product/nas) to local filesystem as a file or a directory in VM. It uses distribute-caching and lazy-load techniques to optimize concurrently read requests of the OSS file system. You can mount your OSS objects to VM like this: -- `` - An OSS object path or OSS prefix to mount from. +- `` - An OSS object path or OSS prefix or NAS address to mount from, such as + `oss:///inputs/ /home/inputs/ false` for OSS + and `nas://0266e49fea-yio75.cn-beijing.nas.aliyuncs.com:/ /home/nas/ true` for NAS. See the [NAS mount](https://www.alibabacloud.com/help/doc-detail/50494.htm) for more details of NAS mount. - `` - An unix file path or directory path to mount to in VM. - `` - Writable for mount destination, only works for directory. @@ -176,17 +180,28 @@ default-runtime-attributes { #### docker -This backend supports docker images pulled from OSS registry. +This backend supports docker images pulled from OSS registry or Alibaba Cloud Container Registry. +##### OSS registry ```hocon default-runtime-attributes { - docker: " " + dockerTag: " " } ``` - `` - Docker image name such as: ubuntu:latest. - `` - Image path in OSS filesyetem where you pushed your docker image. + +##### Alibaba Cloud Container Registry + +```hocon +default-runtime-attributes { + docker: "" +} +``` +- `docker-image-with-tag` - Docker image stored in Alibaba Cloud Container Registry, such as `registry.cn-shanghai.aliyuncs.com/batchcompute/myubuntu:0.2`. + #### userData If a runtime cluster is specified, it's possible to pass some environment variables to VM when running BCS jobs. @@ -210,19 +225,6 @@ finishes by setting `autoReleaseJob` to `false`: } ``` -#### workerPath - -This backend needs a worker package to run workflow job. We have prepared it, but it's still necessary for you to upload it to OSS and -specify the object path as the value of runtime attributes key `workerPath`: - -- `` - The oss object path which you upload worker package to. A string like `oss:///worker.tar.gz` - -```hocon - default-runtime-attributes { - workerPath: "" - } -``` - #### systemDisk If it's necessary to run a job with a particular system disk type or disk size, a runtime attribute named `systemDisk` can be used to @@ -250,3 +252,55 @@ The system disk size can support up to 500GB. One can mount another data disk in dataDisk: " " } ``` + +###CallCaching +BCS supports CallCaching feature when the docker image is from Alibaba Cloud Container Registry. +The configuration file will look like the following: +```hocon +call-caching { + enabled = true + invalidate-bad-cache-results = true + +} + +docker { + hash-lookup { + enabled = true + method = "remote" + alibabacloudcr { + num-threads = 5 + auth { + access-id = xxxx + access-key = yyyy + security-token = zzzz + } + } + } +} + +backend { + providers { + BCS { + config { + # BCS and OSS related configurations mentioned above + filesystems { + oss { + caching { + duplication-strategy = "reference" + invalidate-bad-cache-results = true + } + # ... to be filled in + } + } + default-runtime-attributes { + docker: "registry.cn-shanghai.aliyuncs.com/batchcompute/myubuntu:0.2" + # ... to be filled in + } + } + } + } +} +``` + +- `docker.hash-lookup.method` - BCS only supports `remote` method for hash-lookup +- `filesystems.oss.caching.duplication-strategy` - BCS only supports `reference` for duplication strategy. \ No newline at end of file diff --git a/docs/cromwell_features/CallCaching.md b/docs/cromwell_features/CallCaching.md index da0111057b0..57ac984fac0 100644 --- a/docs/cromwell_features/CallCaching.md +++ b/docs/cromwell_features/CallCaching.md @@ -123,12 +123,13 @@ Cromwell provides two methods to lookup a Docker hash from a Docker tag: Docker registry and access levels supported by Cromwell for docker digest lookup in "remote" mode: - | | DockerHub || GCR || ECR || - |:-----:|:---------:|:-------:|:------:|:-------:|:------:|:-------:| - | | Public | Private | Public | Private | Public | Private | - | Pipelines API | X | X | X | X | | | - | AWS Batch | X | | X | | | | - | Other | X | | X | | | | + | | DockerHub || GCR || ECR || AlibabaCloudCR || + |:-----:|:---------:|:-------:|:------:|:-------:|:------:|:-------:|:------:|:-------:| + | | Public | Private | Public | Private | Public | Private | Public | Private | + | Pipelines API | X | X | X | X | | | | | + | AWS Batch | X | | X | | | | | | + | BCS | | | | | | | | X | + | Other | X | | X | | | | | | **Runtime Attributes** diff --git a/docs/tutorials/BCSIntro.md b/docs/tutorials/BCSIntro.md index 6ba9a0b9aa2..11b55075a6d 100644 --- a/docs/tutorials/BCSIntro.md +++ b/docs/tutorials/BCSIntro.md @@ -91,6 +91,8 @@ backend { } default-runtime-attributes { + failOnStderr: false + continueOnReturnCode: 0 cluster: "OnDemand ecs.sn1ne.large img-ubuntu" vpc: "192.168.0.0/16" } diff --git a/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala b/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala index e5bc8a2b135..cb57f69eff0 100644 --- a/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala +++ b/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala @@ -15,6 +15,7 @@ import cromwell.engine.io.{IoAttempts, IoCommandContext} import cromwell.filesystems.drs.DrsPath import cromwell.filesystems.gcs.GcsPath import cromwell.filesystems.s3.S3Path +import cromwell.filesystems.oss.OssPath import cromwell.util.TryWithResource._ import scala.concurrent.ExecutionContext @@ -111,6 +112,7 @@ class NioFlow(parallelism: Int, case gcsPath: GcsPath => IO { gcsPath.cloudStorage.get(gcsPath.blob).getCrc32c } case drsPath: DrsPath => getFileHashForDrsPath(drsPath) case s3Path: S3Path => IO { s3Path.eTag } + case ossPath: OssPath => IO { ossPath.eTag} case path => IO.fromEither( tryWithResource(() => path.newInputStream) { inputStream => diff --git a/filesystems/oss/src/main/scala/cromwell/filesystems/oss/OssPathBuilder.scala b/filesystems/oss/src/main/scala/cromwell/filesystems/oss/OssPathBuilder.scala index a17d9371d29..8792194f804 100644 --- a/filesystems/oss/src/main/scala/cromwell/filesystems/oss/OssPathBuilder.scala +++ b/filesystems/oss/src/main/scala/cromwell/filesystems/oss/OssPathBuilder.scala @@ -3,10 +3,15 @@ package cromwell.filesystems.oss import java.net.URI import com.google.common.net.UrlEscapers +import com.typesafe.config.Config +import net.ceedubs.ficus.Ficus._ +import cats.syntax.apply._ +import com.aliyun.oss.OSSClient +import common.validation.Validation._ import cromwell.core.WorkflowOptions import cromwell.core.path.{NioPath, Path, PathBuilder} import cromwell.filesystems.oss.OssPathBuilder._ -import cromwell.filesystems.oss.nio.{OssStorageConfiguration, OssStorageFileSystem, OssStoragePath} +import cromwell.filesystems.oss.nio._ import scala.language.postfixOps import scala.util.matching.Regex @@ -78,16 +83,30 @@ object OssPathBuilder { nioPath.getFileSystem.provider().getScheme.equalsIgnoreCase(URI_SCHEME) } - def fromConfiguration(endpoint: String, - accessId: String, - accessKey: String, - securityToken: Option[String], + def fromConfiguration(configuration: OssStorageConfiguration, options: WorkflowOptions): OssPathBuilder = { - - val configuration = OssStorageConfiguration(endpoint, accessId, accessKey, securityToken) - OssPathBuilder(configuration) } + + def fromConfig(config: Config, options: WorkflowOptions): OssPathBuilder = { + val refresh = config.as[Option[Long]](TTLOssStorageConfiguration.RefreshInterval) + + val (endpoint, accessId, accessKey, securityToken) = ( + validate { config.as[String]("auth.endpoint") }, + validate { config.as[String]("auth.access-id") }, + validate { config.as[String]("auth.access-key") }, + validate { config.as[Option[String]]("auth.security-token") } + ).tupled.unsafe("OSS filesystem configuration is invalid") + + refresh match { + case None => + val cfg = DefaultOssStorageConfiguration(endpoint, accessId, accessKey, securityToken) + fromConfiguration(cfg, options) + case Some(_) => + val cfg = TTLOssStorageConfiguration(config) + fromConfiguration(cfg, options) + } + } } final case class OssPathBuilder(ossStorageConfiguration: OssStorageConfiguration) extends PathBuilder { @@ -95,8 +114,8 @@ final case class OssPathBuilder(ossStorageConfiguration: OssStorageConfiguration validateOssPath(string) match { case ValidFullOssPath(bucket, path) => Try { - val nioPath = OssStorageFileSystem(bucket, ossStorageConfiguration).getPath(path) - OssPath(nioPath) + val ossStorageFileSystem = OssStorageFileSystem(bucket, ossStorageConfiguration) + OssPath(ossStorageFileSystem.getPath(path), ossStorageFileSystem.provider.ossClient) } case PossiblyValidRelativeOssPath => Failure(new IllegalArgumentException(s"$string does not have a oss scheme")) case invalid: InvalidOssPath => Failure(new IllegalArgumentException(invalid.errorMessage)) @@ -108,10 +127,11 @@ final case class OssPathBuilder(ossStorageConfiguration: OssStorageConfiguration final case class BucketAndObj(bucket: String, obj: String) -final case class OssPath private[oss](nioPath: NioPath) extends Path { +final case class OssPath private[oss](nioPath: NioPath, + ossClient: OSSClient) extends Path { override protected def newPath(path: NioPath): OssPath = { - OssPath(path) + OssPath(path, ossClient) } override def pathAsString: String = ossStoragePath.pathAsString @@ -128,6 +148,8 @@ final case class OssPath private[oss](nioPath: NioPath) extends Path { ossStoragePath.key } + lazy val eTag = ossClient.getSimplifiedObjectMeta(bucket, key).getETag + def ossStoragePath: OssStoragePath = nioPath match { case ossPath: OssStoragePath => ossPath case _ => throw new RuntimeException(s"Internal path was not a cloud storage path: $nioPath") diff --git a/filesystems/oss/src/main/scala/cromwell/filesystems/oss/OssPathBuilderFactory.scala b/filesystems/oss/src/main/scala/cromwell/filesystems/oss/OssPathBuilderFactory.scala index a0afe1f9438..666ba1c4849 100644 --- a/filesystems/oss/src/main/scala/cromwell/filesystems/oss/OssPathBuilderFactory.scala +++ b/filesystems/oss/src/main/scala/cromwell/filesystems/oss/OssPathBuilderFactory.scala @@ -1,24 +1,14 @@ package cromwell.filesystems.oss import akka.actor.ActorSystem -import cats.syntax.apply._ import com.typesafe.config.Config -import common.validation.Validation._ import cromwell.core.WorkflowOptions import cromwell.core.path.PathBuilderFactory -import net.ceedubs.ficus.Ficus._ import scala.concurrent.{ExecutionContext, Future} final case class OssPathBuilderFactory(globalConfig: Config, instanceConfig: Config) extends PathBuilderFactory { - val (endpoint, accessId, accessKey, securityToken) = ( - validate { instanceConfig.as[String]("auth.endpoint") }, - validate { instanceConfig.as[String]("auth.access-id") }, - validate { instanceConfig.as[String]("auth.access-key") }, - validate { instanceConfig.as[Option[String]]("auth.security-token") } - ).tupled.unsafe("OSS filesystem configuration is invalid") - def withOptions(options: WorkflowOptions)(implicit as: ActorSystem, ec: ExecutionContext) = { - Future.successful(OssPathBuilder.fromConfiguration(endpoint, accessId, accessKey, securityToken, options)) + Future.successful(OssPathBuilder.fromConfig(instanceConfig, options)) } } diff --git a/filesystems/oss/src/main/scala/cromwell/filesystems/oss/batch/OssBatchCommandBuilder.scala b/filesystems/oss/src/main/scala/cromwell/filesystems/oss/batch/OssBatchCommandBuilder.scala new file mode 100644 index 00000000000..70abe012431 --- /dev/null +++ b/filesystems/oss/src/main/scala/cromwell/filesystems/oss/batch/OssBatchCommandBuilder.scala @@ -0,0 +1,32 @@ +package cromwell.filesystems.oss.batch + +import cromwell.core.io._ +import cromwell.filesystems.oss.OssPath + +private case object PartialOssBatchCommandBuilder extends PartialIoCommandBuilder { + override def sizeCommand = { + case ossPath: OssPath => OssBatchSizeCommand(ossPath) + } + + override def deleteCommand = { + case (ossPath: OssPath, swallowIoExceptions) => OssBatchDeleteCommand(ossPath, swallowIoExceptions) + } + + override def copyCommand = { + case (ossSrc: OssPath, ossDest: OssPath, overwrite) => OssBatchCopyCommand(ossSrc, ossDest, overwrite) + } + + override def hashCommand = { + case ossPath: OssPath => OssBatchEtagCommand(ossPath) + } + + override def touchCommand = { + case ossPath: OssPath => OssBatchTouchCommand(ossPath) + } + + override def existsCommand = { + case ossPath: OssPath => OssBatchExistsCommand(ossPath) + } +} + +case object OssBatchCommandBuilder extends IoCommandBuilder(List(PartialOssBatchCommandBuilder)) diff --git a/filesystems/oss/src/main/scala/cromwell/filesystems/oss/batch/OssBatchIoCommand.scala b/filesystems/oss/src/main/scala/cromwell/filesystems/oss/batch/OssBatchIoCommand.scala new file mode 100644 index 00000000000..c4f2643a5fd --- /dev/null +++ b/filesystems/oss/src/main/scala/cromwell/filesystems/oss/batch/OssBatchIoCommand.scala @@ -0,0 +1,94 @@ +package cromwell.filesystems.oss.batch + +import com.aliyun.oss.OSSException +import com.aliyun.oss.model._ + +import com.google.api.client.http.HttpHeaders + +import cromwell.core.io._ +import cromwell.filesystems.oss._ + +/** + * Io commands with OSS paths and some logic enabling batching of request. + * @tparam T Return type of the IoCommand + * @tparam U Return type of the OSS response + */ +sealed trait OssBatchIoCommand[T, U] extends IoCommand[T] { + /** + * StorageRequest operation to be executed by this command + */ + def operation: Any + + /** + * Maps the Oss response of type U to the Cromwell Io response of type T + */ + protected def mapOssResponse(response: U): T + + /** + * Method called in the success callback of a batched request to decide what to do next. + * Returns an Either[T, OssBatchIoCommand[T, U]] + * Left(value) means the command is complete, and the result can be sent back to the sender. + * Right(newCommand) means the command is not complete and needs another request to be executed. + * Most commands will reply with Left(value). + */ + def onSuccess(response: U, httpHeaders: HttpHeaders): Either[T, OssBatchIoCommand[T, U]] = { + Left(mapOssResponse(response)) + } + + /** + * Override to handle a failure differently and potentially return a successful response. + */ + def onFailure(ossError: OSSException): Option[Either[T, OssBatchIoCommand[T, U]]] = None +} + +case class OssBatchCopyCommand( + override val source: OssPath, + override val destination: OssPath, + override val overwrite: Boolean + ) extends IoCopyCommand(source, destination, overwrite) with OssBatchIoCommand[Unit, CopyObjectResult] { + override def operation: GenericResult = { + val getObjectRequest = new CopyObjectRequest(source.bucket, source.key, destination.bucket, destination.key) + // TODO: Copy other attributes (encryption, metadata, etc.) + source.ossClient.copyObject(getObjectRequest) + } + + + override def mapOssResponse(response: CopyObjectResult): Unit = () +} + +case class OssBatchDeleteCommand( + override val file: OssPath, + override val swallowIOExceptions: Boolean + ) extends IoDeleteCommand(file, swallowIOExceptions) with OssBatchIoCommand[Unit, Void] { + def operation = file.ossClient.deleteObject(file.bucket, file.key) + override protected def mapOssResponse(response: Void): Unit = () +} + +/** + * Base trait for commands that use the headObject() operation. (e.g: size, crc32, ...) + */ +sealed trait OssBatchHeadCommand[T] extends OssBatchIoCommand[T, ObjectMetadata] { + def file: OssPath + + override def operation: ObjectMetadata = file.ossClient.getObjectMetadata(file.bucket, file.key) +} + +case class OssBatchSizeCommand(override val file: OssPath) extends IoSizeCommand(file) with OssBatchHeadCommand[Long] { + override def mapOssResponse(response: ObjectMetadata): Long = response.getContentLength +} + +case class OssBatchEtagCommand(override val file: OssPath) extends IoHashCommand(file) with OssBatchHeadCommand[String] { + override def mapOssResponse(response: ObjectMetadata): String = response.getETag +} + +case class OssBatchTouchCommand(override val file: OssPath) extends IoTouchCommand(file) with OssBatchHeadCommand[Unit] { + override def mapOssResponse(response: ObjectMetadata): Unit = () +} + +case class OssBatchExistsCommand(override val file: OssPath) extends IoExistsCommand(file) with OssBatchIoCommand[Boolean, Boolean] { + override def operation: Boolean = { + file.ossClient.doesObjectExist(file.bucket, file.key) + } + + override def mapOssResponse(response: Boolean): Boolean = response +} diff --git a/filesystems/oss/src/main/scala/cromwell/filesystems/oss/nio/OssFileReadChannel.scala b/filesystems/oss/src/main/scala/cromwell/filesystems/oss/nio/OssFileReadChannel.scala index 3bd38efa317..1fb283b85e0 100644 --- a/filesystems/oss/src/main/scala/cromwell/filesystems/oss/nio/OssFileReadChannel.scala +++ b/filesystems/oss/src/main/scala/cromwell/filesystems/oss/nio/OssFileReadChannel.scala @@ -61,6 +61,7 @@ final case class OssFileReadChannel(ossClient: OSSClient, pos: Long, path: OssSt val channel = Channels.newChannel(in) val amt = channel.read(dst) + channel.close() internalPosition += amt amt } diff --git a/filesystems/oss/src/main/scala/cromwell/filesystems/oss/nio/OssStorageFileAttributesView.scala b/filesystems/oss/src/main/scala/cromwell/filesystems/oss/nio/OssStorageFileAttributesView.scala index 49993e221ce..cba65649a98 100644 --- a/filesystems/oss/src/main/scala/cromwell/filesystems/oss/nio/OssStorageFileAttributesView.scala +++ b/filesystems/oss/src/main/scala/cromwell/filesystems/oss/nio/OssStorageFileAttributesView.scala @@ -2,9 +2,10 @@ package cromwell.filesystems.oss.nio import java.nio.file.NoSuchFileException import java.nio.file.attribute.{BasicFileAttributeView, FileTime} +import java.util.Date import com.aliyun.oss.OSSClient -import com.aliyun.oss.model.GenericRequest +import com.aliyun.oss.model.{CopyObjectRequest, CopyObjectResult, GenericRequest} import scala.util.Try @@ -33,5 +34,12 @@ final case class OssStorageFileAttributesView(ossClient: OSSClient, path: OssSto OssStorageObjectAttributes(objectMeta, path) } - override def setTimes(lastModifiedTime: FileTime, lastAccessTime: FileTime, createTime: FileTime): Unit = throw new UnsupportedOperationException("OSS object is immutable") + override def setTimes(lastModifiedTime: FileTime, lastAccessTime: FileTime, createTime: FileTime): Unit = { + val meta = ossClient.getObjectMetadata(path.bucket, path.key) + meta.setLastModified(new Date(lastModifiedTime.toMillis)) + + val copyReq = new CopyObjectRequest(path.bucket, path.key, path.bucket, path.key) + copyReq.setNewObjectMetadata(meta) + val _: CopyObjectResult = ossClient.copyObject(copyReq) + } } diff --git a/filesystems/oss/src/main/scala/cromwell/filesystems/oss/nio/OssStorageFileSystem.scala b/filesystems/oss/src/main/scala/cromwell/filesystems/oss/nio/OssStorageFileSystem.scala index bb1199b2317..aa704561156 100644 --- a/filesystems/oss/src/main/scala/cromwell/filesystems/oss/nio/OssStorageFileSystem.scala +++ b/filesystems/oss/src/main/scala/cromwell/filesystems/oss/nio/OssStorageFileSystem.scala @@ -8,6 +8,7 @@ import java.{lang, util} import com.aliyun.oss.common.auth.DefaultCredentialProvider import com.aliyun.oss.{ClientConfiguration, OSSClient} +import cromwell.filesystems.oss.nio.OssStorageConfiguration.{ACCESS_ID_KEY, ACCESS_KEY_KEY, ENDPOINT_KEY, SECURITY_TOKEN_KEY} import scala.collection.JavaConverters._ @@ -52,7 +53,7 @@ object OssStorageConfiguration { case _ => None } - new OssStorageConfiguration(endpoint, accessId, accessKey, securityToken) + new DefaultOssStorageConfiguration(endpoint, accessId, accessKey, securityToken) } def getClient(map: Map[String, String]): OSSClient = { @@ -63,26 +64,28 @@ object OssStorageConfiguration { accessId: String, accessKey: String, stsToken: Option[String]): OSSClient = { - OssStorageConfiguration(endpoint, accessId, accessKey, stsToken).newOssClient() + DefaultOssStorageConfiguration(endpoint, accessId, accessKey, stsToken).newOssClient() } } -final case class OssStorageConfiguration(endpoint: String, - accessId: String, - accessKey: String, - stsToken: Option[String] = None - ) { - import OssStorageConfiguration._ +trait OssStorageConfiguration { + def endpoint: String + + def accessId: String + + def accessKey: String + + def securityToken: Option[String] def toMap: Map[String, String] = { val ret = Map(ENDPOINT_KEY -> endpoint, ACCESS_ID_KEY -> accessId, ACCESS_KEY_KEY -> accessKey) - val token = stsToken map {token => SECURITY_TOKEN_KEY -> token} + val token = securityToken map {token => SECURITY_TOKEN_KEY -> token} ret ++ token } def newOssClient() = { - val credentialsProvider = stsToken match { + val credentialsProvider = securityToken match { case Some(token: String) => new DefaultCredentialProvider(accessId, accessKey, token) case None => @@ -91,8 +94,11 @@ final case class OssStorageConfiguration(endpoint: String, val clientConfiguration = new ClientConfiguration new OSSClient(endpoint, credentialsProvider, clientConfiguration) } + } +case class DefaultOssStorageConfiguration(endpoint: String, accessId: String, accessKey: String, securityToken: Option[String] = None) extends OssStorageConfiguration {} + case class OssStorageFileSystem(bucket: String, config: OssStorageConfiguration) extends FileSystem { var internalProvider: OssStorageFileSystemProvider = OssStorageFileSystemProvider(config) diff --git a/filesystems/oss/src/main/scala/cromwell/filesystems/oss/nio/OssStorageFileSystemProvider.scala b/filesystems/oss/src/main/scala/cromwell/filesystems/oss/nio/OssStorageFileSystemProvider.scala index 25ecbbd3cda..0f416b0f920 100644 --- a/filesystems/oss/src/main/scala/cromwell/filesystems/oss/nio/OssStorageFileSystemProvider.scala +++ b/filesystems/oss/src/main/scala/cromwell/filesystems/oss/nio/OssStorageFileSystemProvider.scala @@ -18,7 +18,7 @@ import collection.mutable.ArrayBuffer final case class OssStorageFileSystemProvider(config: OssStorageConfiguration) extends FileSystemProvider { - lazy val ossClient: OSSClient = config.newOssClient() + def ossClient: OSSClient = config.newOssClient() class PathIterator(ossClient: OSSClient, prefix: OssStoragePath, filter: DirectoryStream.Filter[_ >: Path]) extends AbstractIterator[Path] { var nextMarker: Option[String] = None @@ -139,7 +139,7 @@ final case class OssStorageFileSystemProvider(config: OssStorageConfiguration) e for (opt <- options.asScala) { opt match { case StandardOpenOption.READ => - case StandardOpenOption.WRITE => throw new IllegalArgumentException("WRITE byte channel not allowed currently") + case StandardOpenOption.WRITE => throw new IllegalArgumentException(s"WRITE byte channel not allowed currently, $path") case StandardOpenOption.SPARSE | StandardOpenOption.TRUNCATE_EXISTING => case StandardOpenOption.APPEND | StandardOpenOption.CREATE | StandardOpenOption.DELETE_ON_CLOSE | StandardOpenOption.CREATE_NEW | StandardOpenOption.DSYNC | StandardOpenOption.SYNC => throw new UnsupportedOperationException() @@ -149,6 +149,12 @@ final case class OssStorageFileSystemProvider(config: OssStorageConfiguration) e OssFileReadChannel(ossClient, 0, path.asInstanceOf[OssStoragePath]) } + def doesObjectExist(bucket: String, name: String): Boolean = { + val req = new GenericRequest(bucket, name) + req.setLogEnabled(false) + ossClient.doesBucketExist(req) + } + override def createDirectory(dir: Path, attrs: FileAttribute[_]*): Unit = {} override def deleteIfExists(path: Path): Boolean = { diff --git a/filesystems/oss/src/main/scala/cromwell/filesystems/oss/nio/TTLOssStorageConfiguration.scala b/filesystems/oss/src/main/scala/cromwell/filesystems/oss/nio/TTLOssStorageConfiguration.scala new file mode 100644 index 00000000000..e87f59c25bb --- /dev/null +++ b/filesystems/oss/src/main/scala/cromwell/filesystems/oss/nio/TTLOssStorageConfiguration.scala @@ -0,0 +1,47 @@ +package cromwell.filesystems.oss.nio + +import com.aliyun.oss.OSSClient +import com.typesafe.config.Config +import net.ceedubs.ficus.Ficus._ + +object TTLOssStorageConfiguration { + def currentTimestamp = System.currentTimeMillis / 1000 + + def defaultRefreshInterval: Long = 30 * 60 + + val RefreshInterval = "refresh-interval" + + def apply(config: Config): TTLOssStorageConfiguration = new TTLOssStorageConfiguration(config) +} + +/* Unsupported. For test purposes only. */ +class TTLOssStorageConfiguration(config: Config) extends OssStorageConfiguration { + + override def endpoint: String = config.as[Option[String]](authPath(OssStorageConfiguration.ENDPOINT_KEY)) getOrElse("") + + override def accessId: String = config.as[Option[String]](authPath(OssStorageConfiguration.ACCESS_ID_KEY)) getOrElse("") + + override def accessKey: String = config.as[Option[String]](authPath(OssStorageConfiguration.ACCESS_KEY_KEY)) getOrElse("") + + override def securityToken: Option[String] = config.as[Option[String]](authPath(OssStorageConfiguration.SECURITY_TOKEN_KEY)) + + def refreshInterval: Long = config.as[Option[Long]](TTLOssStorageConfiguration.RefreshInterval).getOrElse(TTLOssStorageConfiguration.defaultRefreshInterval) + + private def authPath(key: String): String = s"auth.$key" + private var lastClientUpdateTime: Long = 0 + + private var oldClient: Option[OSSClient] = None + + override def newOssClient(): OSSClient = { + val current = TTLOssStorageConfiguration.currentTimestamp + synchronized { + if (lastClientUpdateTime == 0 || current - lastClientUpdateTime > refreshInterval) { + oldClient = Option(super.newOssClient()) + lastClientUpdateTime = current + } + } + + oldClient getOrElse(throw new IllegalArgumentException("Non oss client")) + } +} + diff --git a/filesystems/oss/src/test/scala/cromwell/filesystems/oss/OssPathBuilderSpec.scala b/filesystems/oss/src/test/scala/cromwell/filesystems/oss/OssPathBuilderSpec.scala index c333494581e..4678f28d784 100644 --- a/filesystems/oss/src/test/scala/cromwell/filesystems/oss/OssPathBuilderSpec.scala +++ b/filesystems/oss/src/test/scala/cromwell/filesystems/oss/OssPathBuilderSpec.scala @@ -1,13 +1,47 @@ package cromwell.filesystems.oss +import com.typesafe.config.ConfigFactory import cromwell.core.TestKitSuite import cromwell.filesystems.oss.nio.OssNioUtilSpec import org.scalatest.{BeforeAndAfter, FlatSpecLike, Matchers} import org.scalatest.TryValues._ +object OssPathBuilderSpec { + + val BcsBackendConfigWithRefreshString = + s""" + | refresh-interval = 1800 + | auth { + | endpoint = "oss-cn-shanghai.aliyuncs.com" + | access-id = "test-access-id" + | access-key = "test-access-key" + | security-token = "test-security-token" + | } + | caching { + | duplication-strategy = "reference" + | } + """.stripMargin + + val BcsBackendConfigWithRefresh = ConfigFactory.parseString(BcsBackendConfigWithRefreshString) + + val BcsBackendConfigWithoutRefreshString = + s""" + | auth { + | endpoint = "oss-cn-shanghai.aliyuncs.com" + | access-id = "test-access-id" + | access-key = "test-access-key" + | } + | caching { + | duplication-strategy = "reference" + | } + """.stripMargin + + val BcsBackendConfigWithoutRefresh = ConfigFactory.parseString(BcsBackendConfigWithoutRefreshString) +} + class OssPathBuilderSpec extends TestKitSuite with FlatSpecLike with Matchers with OssNioUtilSpec with BeforeAndAfter { - behavior of s"OssPathBuilerSpec" + behavior of "OssPathBuilerSpec" val testPathBuiler = OssPathBuilder(mockOssConf) it should "throw when no bucket in URI" in { @@ -32,4 +66,15 @@ class OssPathBuilderSpec extends TestKitSuite with FlatSpecLike with Matchers wi path.pathAsString shouldBe s"oss://$bucket$fileName" path.pathWithoutScheme shouldBe s"$bucket$fileName" } + + it should "success from config" in { + val ossPathBuilder = OssPathBuilder.fromConfig(OssPathBuilderSpec.BcsBackendConfigWithRefresh, null) + ossPathBuilder.build(s"oss://$bucket").success.value.bucket shouldBe bucket + ossPathBuilder.build(s"oss://$bucket").success.value.key shouldBe empty + + val ossPathBuilderWithoutRefresh = OssPathBuilder.fromConfig(OssPathBuilderSpec.BcsBackendConfigWithoutRefresh, null) + ossPathBuilderWithoutRefresh.build(s"oss://$bucket").success.value.bucket shouldBe bucket + ossPathBuilderWithoutRefresh.build(s"oss://$bucket").success.value.key shouldBe empty + } + } diff --git a/filesystems/oss/src/test/scala/cromwell/filesystems/oss/nio/OssNioUtilSpec.scala b/filesystems/oss/src/test/scala/cromwell/filesystems/oss/nio/OssNioUtilSpec.scala index 93536fb2ed1..41e25ba58dd 100644 --- a/filesystems/oss/src/test/scala/cromwell/filesystems/oss/nio/OssNioUtilSpec.scala +++ b/filesystems/oss/src/test/scala/cromwell/filesystems/oss/nio/OssNioUtilSpec.scala @@ -54,7 +54,7 @@ trait OssNioUtilSpec extends FlatSpecLike with MockitoSugar with Matchers { OssStorageConfiguration.parseMap(ossInfo) } getOrElse(throw new IllegalArgumentException("you should supply oss info before testing oss related operation")) - lazy val mockOssConf: OssStorageConfiguration = new OssStorageConfiguration("mock-endpoint", "mock-id", "mock-key", None) + lazy val mockOssConf: OssStorageConfiguration = new DefaultOssStorageConfiguration("mock-endpoint", "mock-id", "mock-key", None) lazy val ossProvider = OssStorageFileSystemProvider(ossConf) lazy val mockProvider = OssStorageFileSystemProvider(mockOssConf) diff --git a/filesystems/oss/src/test/scala/cromwell/filesystems/oss/nio/TTLOssStorageConfigurationSpec.scala b/filesystems/oss/src/test/scala/cromwell/filesystems/oss/nio/TTLOssStorageConfigurationSpec.scala new file mode 100644 index 00000000000..afddcfb0edb --- /dev/null +++ b/filesystems/oss/src/test/scala/cromwell/filesystems/oss/nio/TTLOssStorageConfigurationSpec.scala @@ -0,0 +1,50 @@ +package cromwell.filesystems.oss.nio + +import java.net.URI +import com.typesafe.config.ConfigFactory +import cromwell.core.TestKitSuite +import org.scalatest.{BeforeAndAfter, FlatSpecLike, Matchers} +import org.scalatest.mockito.MockitoSugar + + +object TTLOssStorageConfigurationSpec { + + val BcsBackendConfigString = + s""" + | auth { + | endpoint = "oss-cn-shanghai.aliyuncs.com" + | access-id = "test-access-id" + | access-key = "test-access-key" + | security-token = "test-security-token" + | } + | caching { + | duplication-strategy = "reference" + | } + """.stripMargin + + val BcsBackendConfig = ConfigFactory.parseString(BcsBackendConfigString) +} + +class TTLOssStorageConfigurationSpec extends TestKitSuite with FlatSpecLike with Matchers with MockitoSugar with BeforeAndAfter { + val expectedEndpoint = "oss-cn-shanghai.aliyuncs.com" + val expectedAccessId = "test-access-id" + val expectedAccessKey = "test-access-key" + val expectedToken = Some("test-security-token") + val expectedFullEndpoint = URI.create("http://oss-cn-shanghai.aliyuncs.com") + + behavior of "TTLOssStorageConfiguration" + + + it should "have correct OSS credential info" in { + + val ossConfig = TTLOssStorageConfiguration(TTLOssStorageConfigurationSpec.BcsBackendConfig) + + ossConfig.endpoint shouldEqual expectedEndpoint + ossConfig.accessId shouldEqual expectedAccessId + ossConfig.accessKey shouldEqual expectedAccessKey + ossConfig.securityToken shouldEqual expectedToken + + ossConfig.newOssClient().getEndpoint shouldEqual expectedFullEndpoint + + } +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index a375ca2b47e..0a909cb675a 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -4,8 +4,9 @@ object Dependencies { private val akkaHttpCirceIntegrationV = "1.24.3" private val akkaHttpV = "10.1.7" private val akkaV = "2.5.19" - private val aliyunBcsV = "6.0.6" + private val aliyunBcsV = "6.1.0" private val aliyunCoreV = "4.3.2" + private val aliyunCrV = "3.0.0" private val aliyunOssV = "3.4.0" private val ammoniteOpsV = "1.6.3" private val apacheCommonNetV = "3.6" @@ -321,6 +322,15 @@ object Dependencies { exclude("jakarta.activation", "jakarta.activation-api"), ) + private val aliyunCrDependencies = List( + "com.aliyun" % "aliyun-java-sdk-cr" % aliyunCrV, + "com.aliyun" % "aliyun-java-sdk-core" % aliyunCoreV + exclude("javax.xml.bind", "jaxb-api") + exclude("com.sun.xml.bind", "jaxb-core") + exclude("javax.activation", "activation"), + "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpV + ) + private val dbmsDependencies = List( "org.hsqldb" % "hsqldb" % hsqldbV, "org.mariadb.jdbc" % "mariadb-java-client" % mariadbV, @@ -446,7 +456,7 @@ object Dependencies { val databaseMigrationDependencies = liquibaseDependencies ++ dbmsDependencies - val dockerHashingDependencies = http4sDependencies ++ circeDependencies + val dockerHashingDependencies = http4sDependencies ++ circeDependencies ++ aliyunCrDependencies val cromwellApiClientDependencies = List( "org.scalaz" %% "scalaz-core" % scalazV, diff --git a/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsAsyncBackendJobExecutionActor.scala b/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsAsyncBackendJobExecutionActor.scala index 38757b6e735..5322d91b405 100644 --- a/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsAsyncBackendJobExecutionActor.scala +++ b/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsAsyncBackendJobExecutionActor.scala @@ -1,11 +1,10 @@ package cromwell.backend.impl.bcs -import java.io.FileNotFoundException - import better.files.File.OpenOptions import com.aliyuncs.batchcompute.main.v20151111.BatchComputeClient import com.aliyuncs.exceptions.{ClientException, ServerException} import common.collections.EnhancedCollections._ +import common.util.StringUtil._ import cromwell.backend._ import cromwell.backend.async.{ExecutionHandle, FailedNonRetryableExecutionHandle, PendingExecutionHandle} import cromwell.backend.impl.bcs.RunStatus.{Finished, TerminalRunStatus} @@ -15,10 +14,12 @@ import cromwell.core.retry.SimpleExponentialBackoff import cromwell.core.ExecutionEvent import cromwell.filesystems.oss.OssPath import wom.callable.Callable.OutputDefinition +import wom.callable.RuntimeEnvironment import wom.core.FullyQualifiedName import wom.expression.NoIoFunctionSet import wom.types.WomSingleFileType import wom.values._ +import mouse.all._ import scala.concurrent.Future import scala.concurrent.duration._ @@ -65,8 +66,8 @@ final class BcsAsyncBackendJobExecutionActor(override val standardParams: Standa val tmp = DefaultPathBuilder.get("/" + ossPath.pathWithoutScheme) val dir = tmp.getParent val local = BcsJobPaths.BcsTempInputDirectory.resolve(dir.pathAsString.md5SumShort).resolve(tmp.getFileName) - val ret = BcsInputMount(ossPath, local, writeSupport = false) - if (!inputMounts.exists(mount => mount.src == ossPath && mount.dest == local)) { + val ret = BcsInputMount(Left(ossPath), Left(local), writeSupport = false) + if (!inputMounts.exists(mount => mount.src == Left(ossPath) && mount.dest == Left(local))) { inputMounts :+= ret } @@ -74,7 +75,7 @@ final class BcsAsyncBackendJobExecutionActor(override val standardParams: Standa } private[bcs] def womFileToMount(file: WomFile): Option[BcsInputMount] = file match { - case path if userDefinedMounts exists(bcsMount => path.valueString.startsWith(bcsMount.src.pathAsString)) => None + case path if userDefinedMounts exists(bcsMount => path.valueString.startsWith(BcsMount.toString(bcsMount.src))) => None case path => PathFactory.buildPath(path.valueString, initializationData.pathBuilders) match { case ossPath: OssPath => Some(ossPathToMount(ossPath)) case _ => None @@ -146,8 +147,8 @@ final class BcsAsyncBackendJobExecutionActor(override val standardParams: Standa callRawOutputFiles.flatMap(_.flattenFiles).distinct flatMap { womFile => womFile match { case singleFile: WomSingleFile => List(generateBcsSingleFileOutput(singleFile)) - case _: WomGlobFile => throw new RuntimeException(s"glob output not supported currently") - case _: WomUnlistedDirectory => throw new RuntimeException(s"directory output not supported currently") + case globFile: WomGlobFile => generateBcsGlobFileOutputs(globFile) + case unlistedDirectory: WomUnlistedDirectory => generateUnlistedDirectoryOutputs(unlistedDirectory) } } } @@ -161,7 +162,34 @@ final class BcsAsyncBackendJobExecutionActor(override val standardParams: Standa val src = relativePath(wdlFile.valueString) - BcsOutputMount(src, destination, writeSupport = false) + BcsOutputMount(Left(src), Left(destination), writeSupport = false) + } + + protected def generateBcsGlobFileOutputs(womFile: WomGlobFile): List[BcsOutputMount] = { + val globName = GlobFunctions.globName(womFile.value) + val globDirectory = globName + "/" + val globListFile = globName + ".list" + val bcsGlobDirectoryDestinationPath = callRootPath.resolve(globDirectory) + val bcsGlobListFileDestinationPath = callRootPath.resolve(globListFile) + + // We need both the glob directory and the glob list: + List( + BcsOutputMount(Left(relativePath(globDirectory)), Left(bcsGlobDirectoryDestinationPath), writeSupport = false), + BcsOutputMount(Left(relativePath(globListFile)), Left(bcsGlobListFileDestinationPath), writeSupport = false) + ) + } + + private def generateUnlistedDirectoryOutputs(womFile: WomUnlistedDirectory): List[BcsOutputMount] = { + val directoryPath = womFile.value.ensureSlashed + val directoryListFile = womFile.value.ensureUnslashed + ".list" + val bcsDirDestinationPath = callRootPath.resolve(directoryPath) + val bcsListDestinationPath = callRootPath.resolve(directoryListFile) + + // We need both the collection directory and the collection list: + List( + BcsOutputMount(Left(relativePath(directoryPath)), Left(bcsDirDestinationPath), writeSupport = false), + BcsOutputMount(Left(relativePath(directoryListFile)), Left(bcsListDestinationPath), writeSupport = false) + ) } private[bcs] def getOssFileName(ossPath: OssPath): String = { @@ -174,17 +202,23 @@ final class BcsAsyncBackendJobExecutionActor(override val standardParams: Standa private[bcs] def localizeOssPath(ossPath: OssPath): String = { if (isOutputOssFileString(ossPath.pathAsString) && !ossPath.isAbsolute) { if (ossPath.exists) { - ossPathToMount(ossPath).dest.normalize().pathAsString + ossPathToMount(ossPath).dest match { + case Left(p) => p.normalize().pathAsString + case _ => throw new RuntimeException("only support oss") + } } else { commandDirectory.resolve(getOssFileName(ossPath)).normalize().pathAsString } } else { userDefinedMounts collectFirst { - case bcsMount: BcsMount if ossPath.pathAsString.startsWith(bcsMount.src.pathAsString) => - bcsMount.dest.resolve(ossPath.pathAsString.stripPrefix(bcsMount.src.pathAsString)).pathAsString + case bcsMount: BcsMount if ossPath.pathAsString.startsWith(BcsMount.toString(bcsMount.src)) => + bcsMount.dest match { + case Left(p) => p.resolve(ossPath.pathAsString.stripPrefix(BcsMount.toString(bcsMount.src))).pathAsString + case _ => throw new RuntimeException("only support oss") + } } getOrElse { val mount = ossPathToMount(ossPath) - mount.dest.pathAsString + BcsMount.toString(mount.dest) } } } @@ -197,7 +231,7 @@ final class BcsAsyncBackendJobExecutionActor(override val standardParams: Standa } } - override def mapCommandLineWomFile(womFile: WomFile): WomFile = { + private[bcs] def mapWomFile(womFile: WomFile): WomFile = { getPath(womFile.valueString) match { case Success(ossPath: OssPath) => WomFile(WomSingleFileType, localizeOssPath(ossPath)) @@ -207,6 +241,24 @@ final class BcsAsyncBackendJobExecutionActor(override val standardParams: Standa } } + override def preProcessWomFile(womFile: WomFile): WomFile = mapWomFile(womFile) + + override def mapCommandLineWomFile(womFile: WomFile): WomFile = mapWomFile(womFile) + + override def runtimeEnvironmentPathMapper(env: RuntimeEnvironment): RuntimeEnvironment = { + def localize(path: String): String = (WomSingleFile(path) |> mapRuntimeEnvs).valueString + env.copy(outputPath = env.outputPath |> localize, tempPath = env.tempPath |> localize) + } + + private[bcs] def mapRuntimeEnvs(womFile: WomSingleFile): WomFile = { + getPath(womFile.valueString) match { + case Success(ossPath: OssPath) => + WomFile(WomSingleFileType, BcsJobPaths.BcsCommandDirectory.resolve(ossPath.pathWithoutScheme).pathAsString) + case _ => womFile + } + + } + override def isTerminal(runStatus: RunStatus): Boolean = { runStatus match { case _ : TerminalRunStatus => true @@ -246,33 +298,17 @@ final class BcsAsyncBackendJobExecutionActor(override val standardParams: Standa } private[bcs] lazy val rcBcsOutput = BcsOutputMount( - commandDirectory.resolve(bcsJobPaths.returnCodeFilename), bcsJobPaths.returnCode, writeSupport = false) + Left(commandDirectory.resolve(bcsJobPaths.returnCodeFilename)), Left(bcsJobPaths.returnCode), writeSupport = false) private[bcs] lazy val stdoutBcsOutput = BcsOutputMount( - commandDirectory.resolve(bcsJobPaths.defaultStdoutFilename), standardPaths.output, writeSupport = false) + Left(commandDirectory.resolve(bcsJobPaths.defaultStdoutFilename)), Left(standardPaths.output), writeSupport = false) private[bcs] lazy val stderrBcsOutput = BcsOutputMount( - commandDirectory.resolve(bcsJobPaths.defaultStderrFilename), standardPaths.error, writeSupport = false) + Left(commandDirectory.resolve(bcsJobPaths.defaultStderrFilename)), Left(standardPaths.error), writeSupport = false) private[bcs] lazy val uploadBcsWorkerPackage = { - getPath(runtimeAttributes.workerPath.getOrElse(bcsJobPaths.workerFileName)) match { - case Success(ossPath: OssPath) => - if (ossPath.notExists) { - throw new FileNotFoundException(s"$ossPath") - } - ossPath - case Success(path: Path) => - if (path.notExists) { - throw new FileNotFoundException(s"$path") - } + bcsJobPaths.workerPath.writeByteArray(BcsJobCachingActorHelper.workerScript.getBytes)(OpenOptions.default) - if (bcsJobPaths.workerPath.notExists) { - val content = path.byteArray - bcsJobPaths.workerPath.writeByteArray(content)(OpenOptions.default) - } - - bcsJobPaths.workerPath - case _ => throw new RuntimeException(s"Invalid worker packer path") - } + bcsJobPaths.workerPath } override def executeAsync(): Future[ExecutionHandle] = { @@ -283,13 +319,15 @@ final class BcsAsyncBackendJobExecutionActor(override val standardParams: Standa setBcsVerbose() + val envs = bcsEnvs + val bcsJob = new BcsJob( jobName, jobTag, bcsCommandLine, uploadBcsWorkerPackage, bcsMounts, - bcsEnvs, + envs, runtimeAttributes, Some(bcsJobPaths.bcsStdoutPath), Some(bcsJobPaths.bcsStderrPath), @@ -315,7 +353,7 @@ final class BcsAsyncBackendJobExecutionActor(override val standardParams: Standa private[bcs] def wdlFileToOssPath(bcsOutputs: Seq[BcsMount])(wdlFile: WomFile): WomFile = { bcsOutputs collectFirst { - case bcsOutput if bcsOutput.src.pathAsString.endsWith(wdlFile.valueString) => WomFile(WomSingleFileType, bcsOutput.dest.pathAsString) + case bcsOutput if BcsMount.toString(bcsOutput.src).endsWith(wdlFile.valueString) => WomFile(WomSingleFileType, BcsMount.toString(bcsOutput.dest)) } getOrElse wdlFile } @@ -353,6 +391,7 @@ final class BcsAsyncBackendJobExecutionActor(override val standardParams: Standa throwable match { case _: ServerException => true case e: ClientException if e.getErrCode == "InternalError" => true + case e: ClientException if e.getErrCode.startsWith("Throttling") => true case _ => false } } @@ -364,16 +403,16 @@ final class BcsAsyncBackendJobExecutionActor(override val standardParams: Standa } } - private[bcs] lazy val bcsEnvs: Map[String, String] = Map( + private[bcs] lazy val bcsEnvs: Map[String, String] = { + val mount = ossPathToMount(bcsJobPaths.script.asInstanceOf[OssPath]) + + Map( BcsJobPaths.BcsEnvCwdKey -> commandDirectory.pathAsString, - BcsJobPaths.BcsEnvExecKey -> bcsJobPaths.script.pathAsString, + BcsJobPaths.BcsEnvExecKey -> BcsMount.toString(mount.dest), BcsJobPaths.BcsEnvStdoutKey -> commandDirectory.resolve(bcsJobPaths.defaultStdoutFilename).pathAsString, - BcsJobPaths.BcsEnvStderrKey -> commandDirectory.resolve(bcsJobPaths.defaultStderrFilename).pathAsString, - BcsConfiguration.OssEndpointKey -> bcsConfiguration.ossEndpoint, - BcsConfiguration.OssIdKey -> bcsConfiguration.ossAccessId, - BcsConfiguration.OssSecretKey -> bcsConfiguration.ossAccessKey, - BcsConfiguration.OssTokenKey -> bcsConfiguration.ossSecurityToken - ) + BcsJobPaths.BcsEnvStderrKey -> commandDirectory.resolve(bcsJobPaths.defaultStderrFilename).pathAsString + ) + } private[bcs] lazy val bcsMounts: Seq[BcsMount] ={ generateBcsInputs(jobDescriptor) diff --git a/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsBackendLifecycleActorFactory.scala b/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsBackendLifecycleActorFactory.scala index ad6dece2f80..46cd4d13cc4 100644 --- a/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsBackendLifecycleActorFactory.scala +++ b/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsBackendLifecycleActorFactory.scala @@ -3,8 +3,14 @@ package cromwell.backend.impl.bcs import akka.actor.ActorRef import cromwell.backend.{BackendConfigurationDescriptor, BackendWorkflowDescriptor} import cromwell.backend.standard._ +import cromwell.backend.BackendInitializationData +import cromwell.backend.impl.bcs.callcaching.BcsBackendCacheHitCopyingActor +import cromwell.backend.standard.callcaching.StandardCacheHitCopyingActor import wom.graph.CommandCallNode +import scala.util.{Success, Try} + + final case class BcsBackendLifecycleActorFactory(val name: String, val configurationDescriptor: BackendConfigurationDescriptor) extends StandardLifecycleActorFactory { override lazy val initializationActorClass: Class[_ <: StandardInitializationActor] = classOf[BcsInitializationActor] @@ -17,4 +23,16 @@ final case class BcsBackendLifecycleActorFactory(val name: String, val configura override def workflowInitializationActorParams(workflowDescriptor: BackendWorkflowDescriptor, ioActor: ActorRef, calls: Set[CommandCallNode], serviceRegistryActor: ActorRef, restarting: Boolean): StandardInitializationActorParams = { BcsInitializationActorParams(workflowDescriptor, calls, bcsConfiguration, serviceRegistryActor) } + + override lazy val cacheHitCopyingActorClassOption: Option[Class[_ <: StandardCacheHitCopyingActor]] = { + Option(classOf[BcsBackendCacheHitCopyingActor]) + } + + override def dockerHashCredentials(workflowDescriptor: BackendWorkflowDescriptor, initializationData: Option[BackendInitializationData]) = { + Try(BackendInitializationData.as[BcsBackendInitializationData](initializationData)) match { + case Success(bcsData) => + List(bcsData.bcsConfiguration.dockerCredentials).flatten + case _ => List.empty[Any] + } + } } diff --git a/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsClusterIdOrConfiguration.scala b/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsClusterIdOrConfiguration.scala index 8043491d4c4..15b12a72de8 100644 --- a/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsClusterIdOrConfiguration.scala +++ b/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsClusterIdOrConfiguration.scala @@ -8,7 +8,8 @@ final case class AutoClusterConfiguration(resourceType: String, instanceType: String, imageId: String, spotStrategy: Option[String] = None, - spotPriceLimit: Option[Float] = None) + spotPriceLimit: Option[Float] = None, + clusterId: Option[String] = None) object BcsClusterIdOrConfiguration { @@ -33,14 +34,23 @@ object BcsClusterIdOrConfiguration { val spotPattern = s"""$resourceAndInstanceAndImagePattern\\s+$spotStrategyPattern\\s+$spotPriceLimitPattern""".r + val attachClusterSimplePattern = s"""$instanceAndImagePattern\\s+$idPattern""".r + + val attachClusterPattern = s"""$resourceAndInstanceAndImagePattern\\s+$idPattern""".r + + val attachCllusterSpotPattern = s"""$spotPattern\\s+$idPattern""".r + def parse(cluster: String): Try[BcsClusterIdOrConfiguration] = { cluster match { case idPattern(clusterId) => Success(Left(clusterId)) case instanceAndImagePattern(instanceType, imageId) => Success(Right(AutoClusterConfiguration(defaultResourceType, instanceType, imageId))) + case attachClusterSimplePattern(instanceType, imageId, clusterId) =>Success(Right(AutoClusterConfiguration(defaultResourceType, instanceType, imageId, clusterId=Option(clusterId)))) case resourceAndInstanceAndImagePattern(resourceType, instanceType, imageId) => Success(Right(AutoClusterConfiguration(resourceType, instanceType, imageId))) - case spotPattern(resourceType, instanceType, imageId, spotStrategy, spotPriceLimit) => Success(Right(AutoClusterConfiguration(resourceType, instanceType, imageId, Some(spotStrategy), Some(spotPriceLimit.toFloat)))) - case _ => Failure(new IllegalArgumentException("must be some string like 'cls-xxxx' or 'OnDemand ecs.s1.large img-ubuntu'")) + case attachClusterPattern(resourceType, instanceType, imageId, clusterId) => Success(Right(AutoClusterConfiguration(resourceType, instanceType, imageId, clusterId = Option(clusterId)))) + case spotPattern(resourceType, instanceType, imageId, spotStrategy, spotPriceLimit) => Success(Right(AutoClusterConfiguration(resourceType, instanceType, imageId, Option(spotStrategy), Option(spotPriceLimit.toFloat)))) + case attachCllusterSpotPattern(resourceType, instanceType, imageId, spotStrategy, spotPriceLimit, clusterId) => Success(Right(AutoClusterConfiguration(resourceType, instanceType, imageId, Option(spotStrategy), Option(spotPriceLimit.toFloat), Option(clusterId)))) + case _ => Failure(new IllegalArgumentException("must be some string like 'cls-xxxx' or 'OnDemand ecs.s1.large img-ubuntu' or 'OnDemand ecs.s1.large img-ubuntu cls-xxxx'")) } } } diff --git a/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsConfiguration.scala b/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsConfiguration.scala index a154e39863f..f0de1049e6e 100644 --- a/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsConfiguration.scala +++ b/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsConfiguration.scala @@ -1,9 +1,11 @@ package cromwell.backend.impl.bcs -import com.aliyuncs.batchcompute.main.v20151111.{BatchComputeClient} +import com.aliyuncs.auth.BasicCredentials +import com.aliyuncs.batchcompute.main.v20151111.BatchComputeClient import cromwell.backend.BackendConfigurationDescriptor import net.ceedubs.ficus.Ficus._ - +import cromwell.backend.impl.bcs.callcaching.{CopyCachedOutputs, UseOriginalCachedOutputs} +import cromwell.core.DockerConfiguration object BcsConfiguration{ val OssEndpointKey = "ossEndpoint" @@ -31,6 +33,25 @@ final class BcsConfiguration(val configurationDescriptor: BackendConfigurationDe val ossAccessKey = configurationDescriptor.backendConfig.as[Option[String]]("filesystems.oss.auth.access-key").getOrElse("") val ossSecurityToken = configurationDescriptor.backendConfig.as[Option[String]]("filesystems.oss.auth.security-token").getOrElse("") + val duplicationStrategy = { + configurationDescriptor.backendConfig.as[Option[String]]("filesystems.oss.caching.duplication-strategy").getOrElse("reference") match { + case "copy" => CopyCachedOutputs + case "reference" => UseOriginalCachedOutputs + case other => throw new IllegalArgumentException(s"Unrecognized caching duplication strategy: $other. Supported strategies are copy and reference. See reference.conf for more details.") + } + } + + lazy val dockerHashAccessId = DockerConfiguration.dockerHashLookupConfig.as[Option[String]]("alibabacloudcr.auth.access-id") + lazy val dockerHashAccessKey = DockerConfiguration.dockerHashLookupConfig.as[Option[String]]("alibabacloudcr.auth.access-key") + lazy val dockerHashSecurityToken = DockerConfiguration.dockerHashLookupConfig.as[Option[String]]("alibabacloudcr.auth.security-token") + + val dockerCredentials = { + for { + id <- dockerHashAccessId + key <- dockerHashAccessKey + } yield new BasicCredentials(id, key) + } + val bcsClient: Option[BatchComputeClient] = { val userDefinedRegion = for { region <- bcsUserDefinedRegion diff --git a/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsJob.scala b/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsJob.scala index 62880d49aff..2dd005556c2 100644 --- a/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsJob.scala +++ b/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsJob.scala @@ -7,7 +7,7 @@ import cromwell.core.ExecutionEvent import cromwell.core.path.Path import collection.JavaConverters._ -import scala.util.Try +import scala.util.{Failure, Success, Try} object BcsJob{ val BcsDockerImageEnvKey = "BATCH_COMPUTE_DOCKER_IMAGE" @@ -40,7 +40,7 @@ final case class BcsJob(name: String, jobId } - def getStatus(jobId: String): Try[RunStatus] = { + def getStatus(jobId: String): Try[RunStatus] = Try{ val request: GetJobRequest = new GetJobRequest request.setJobId(jobId) val response: GetJobResponse = batchCompute.getJob(request) @@ -48,7 +48,10 @@ final case class BcsJob(name: String, val status = job.getState val message = job.getMessage val eventList = Seq[ExecutionEvent]() - RunStatusFactory.getStatus(jobId, status, Some(message), Some(eventList)) + RunStatusFactory.getStatus(jobId, status, Some(message), Some(eventList)) match { + case Success(status) => status + case Failure(e) => throw e + } } def cancel(jobId: String): Unit = { @@ -129,6 +132,7 @@ final case class BcsJob(name: String, lazyCmd.setEnvVars(environments.asJava) lazyCmd.setCommandLine(commandString) + dockers foreach {docker => lazyCmd.setDocker(docker)} stdoutPath foreach {path => parames.setStdoutRedirectPath(path.normalize().pathAsString + "/")} stderrPath foreach {path => parames.setStderrRedirectPath(path.normalize().pathAsString + "/")} @@ -136,10 +140,26 @@ final case class BcsJob(name: String, parames } - private[bcs] def environments: Map[String, String] = runtime.docker match { - case Some(docker: BcsDockerWithoutPath) => envs + (BcsJob.BcsDockerImageEnvKey -> docker.image) - case Some(docker: BcsDockerWithPath) => envs + (BcsJob.BcsDockerPathEnvKey -> docker.path) + (BcsJob.BcsDockerImageEnvKey -> docker.image) + private[bcs] def environments: Map[String, String] = { + runtime.docker match { + case None => + runtime.dockerTag match { + case Some(docker: BcsDockerWithoutPath) => envs + (BcsJob.BcsDockerImageEnvKey -> docker.image) + case Some(docker: BcsDockerWithPath) => envs + (BcsJob.BcsDockerPathEnvKey -> docker.path) + (BcsJob.BcsDockerImageEnvKey -> docker.image) + case _ => envs + } case _ => envs + } + } + + val dockers: Option[Command.Docker] = { + runtime.docker match { + case Some(docker: BcsDockerWithoutPath) => + val dockers = new Command.Docker + dockers.setImage(docker.image) + Some(dockers) + case _ => None + } } private[bcs] def jobDesc: JobDescription = { @@ -166,21 +186,20 @@ final case class BcsJob(name: String, val cluster = runtime.cluster getOrElse(throw new IllegalArgumentException("cluster id or auto cluster configuration is mandatory")) cluster.fold(handleClusterId, handleAutoCluster) + val mnts = new Mounts mounts foreach { case input: BcsInputMount => - var destStr = input.dest.pathAsString - if (input.src.pathAsString.endsWith("/") && !destStr.endsWith("/")) { - destStr += "/" - } - lazyTask.addInputMapping(input.src.pathAsString, destStr) + mnts.addEntries(input.toBcsMountEntry) case output: BcsOutputMount => - var srcStr = output.src.pathAsString - if (output.dest.pathAsString.endsWith("/") && !srcStr.endsWith("/")) { + var srcStr = BcsMount.toString(output.src) + if (BcsMount.toString(output.dest).endsWith("/") && !srcStr.endsWith("/")) { srcStr += "/" } - lazyTask.addOutputMapping(srcStr, output.dest.pathAsString) + lazyTask.addOutputMapping(srcStr, BcsMount.toString(output.dest)) } + lazyTask.setMounts(mnts) + lazyTask } @@ -192,6 +211,7 @@ final case class BcsJob(name: String, config.spotStrategy foreach {strategy => autoCluster.setSpotStrategy(strategy)} config.spotPriceLimit foreach {priceLimit => autoCluster.setSpotPriceLimit(priceLimit)} + config.clusterId foreach {clusterId => autoCluster.setClusterId(clusterId)} runtime.reserveOnFail foreach {reserve => autoCluster.setReserveOnFail(reserve)} val userData = runtime.userData map {datas => Map(datas map {data => data.key -> data.value}: _*)} userData foreach {datas => autoCluster.setUserData(datas.asJava)} diff --git a/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsJobCachingActorHelper.scala b/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsJobCachingActorHelper.scala index 1209b4cc256..e4aa2413f73 100644 --- a/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsJobCachingActorHelper.scala +++ b/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsJobCachingActorHelper.scala @@ -5,13 +5,31 @@ import cromwell.backend.standard.StandardCachingActorHelper import cromwell.core.logging.JobLogging import cromwell.core.path.Path +object BcsJobCachingActorHelper { + val workerScript: String = + s"""|#!/bin/bash + |export script=$$cwd/$$(basename $$exec) + |export rc=$$cwd/rc + | + |( + |mkdir -p $$cwd + |cp -rf $$exec $$script + |cd $$cwd + |/bin/bash -c $$script + |) + """.stripMargin +} + trait BcsJobCachingActorHelper extends StandardCachingActorHelper { this: Actor with JobLogging => + + bcsWorkflowPaths.tag = runtimeAttributes.tag.getOrElse("") + lazy val initializationData: BcsBackendInitializationData = { backendInitializationDataAs[BcsBackendInitializationData] } - lazy val bcsClient = initializationData.bcsConfiguration.bcsClient.getOrElse(throw new RuntimeException("no bcs client available")) + def bcsClient = initializationData.bcsConfiguration.bcsClient.getOrElse(throw new RuntimeException("no bcs client available")) lazy val bcsWorkflowPaths: BcsWorkflowPaths = workflowPaths.asInstanceOf[BcsWorkflowPaths] @@ -30,5 +48,5 @@ trait BcsJobCachingActorHelper extends StandardCachingActorHelper { lazy val bcsStderrFile: Path = standardPaths.error //lazy val bcsCommandLine = "bash -c $(pwd)/cromwell_bcs && sync" - lazy val bcsCommandLine = "python -u cromwell_bcs.py" + lazy val bcsCommandLine = "./worker" } diff --git a/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsJobPaths.scala b/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsJobPaths.scala index ea30faacfcd..d0951aee579 100644 --- a/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsJobPaths.scala +++ b/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsJobPaths.scala @@ -20,8 +20,7 @@ final case class BcsJobPaths(workflowPaths: BcsWorkflowPaths, jobKey: BackendJob import BcsJobPaths._ - // alibaba cloud's batchcompute service can only support tar.gz formatted package. - val workerFileName = "worker.tar.gz" + val workerFileName = "worker" val workerPath = callRoot.resolve(workerFileName) val bcsStdoutPath = callRoot.resolve(BcsStdoutRedirectPath) val bcsStderrPath = callRoot.resolve(BcsStderrRedirectPath) diff --git a/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsMount.scala b/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsMount.scala index da66778a722..757405d23fb 100644 --- a/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsMount.scala +++ b/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsMount.scala @@ -3,29 +3,44 @@ package cromwell.backend.impl.bcs import cats.data.Validated._ import cats.syntax.apply._ import cats.syntax.validated._ +import com.aliyuncs.batchcompute.pojo.v20151111.MountEntry import common.exception.MessageAggregation import common.validation.ErrorOr._ -import cromwell.core.path.{DefaultPathBuilder, Path, PathBuilder, PathFactory} +import cromwell.backend.impl.bcs.BcsMount.PathType +import cromwell.core.path.{Path, PathBuilder, PathFactory} -import scala.util.Try +import scala.util.{Success, Try} import scala.util.matching.Regex object BcsMount { + type PathType = Either[Path, String] + + def toString(p: PathType): String = { + p match { + case Left(p) => + p.pathAsString + case Right(s) => + return s + } + } + + val supportFileSystemTypes = List("oss", "nas", "smb", "lustre").mkString("|") + var pathBuilders: List[PathBuilder] = List() - val ossPrefix = """oss://[^\s]+""" + val remotePrefix = s"""(?:$supportFileSystemTypes)""" + """://[^\s]+""" val localPath = """/[^\s]+""" val writeSupport = """true|false""" - val inputMountPattern: Regex = s"""($ossPrefix)\\s+($localPath)\\s+($writeSupport)""".r - val outputMountPattern: Regex = s"""($localPath)\\s+($ossPrefix)\\s+($writeSupport)""".r + val inputMountPattern: Regex = s"""($remotePrefix)\\s+($localPath)\\s+($writeSupport)""".r + val outputMountPattern: Regex = s"""($localPath)\\s+($remotePrefix)\\s+($writeSupport)""".r def parse(s: String): Try[BcsMount] = { val validation: ErrorOr[BcsMount] = s match { - case inputMountPattern(oss, local, writeSupport) => - (validateOss(oss), validateLocal(oss, local), validateBoolean(writeSupport)) mapN { (src, dest, ws) => new BcsInputMount(src, dest, ws)} + case inputMountPattern(remote, local, writeSupport) => + (validateRemote(remote), validateLocal(remote, local), validateBoolean(writeSupport)) mapN { (src, dest, ws) => new BcsInputMount(src, dest, ws)} case outputMountPattern(local, oss, writeSupport) => - (validateLocal(oss, local), validateOss(oss), validateBoolean(writeSupport)) mapN { (src, dest, ws) => new BcsOutputMount(src, dest, ws)} + (validateLocal(oss, local), validateRemote(oss), validateBoolean(writeSupport)) mapN { (src, dest, ws) => new BcsOutputMount(src, dest, ws)} case _ => s"Mount strings should be of the format 'oss://my-bucket/inputs/ /home/inputs/ true' or '/home/outputs/ oss://my-bucket/outputs/ false'".invalidNel } @@ -39,12 +54,22 @@ object BcsMount { }) } - private def validateOss(value: String): ErrorOr[Path] = { - PathFactory.buildPath(value, pathBuilders).validNel + private def validateRemote(value: String): ErrorOr[PathType] = { + Try(PathFactory.buildPath(value, pathBuilders)) match { + case Success(p) => + Left(p).validNel + case _ => + Right(value).validNel + } } - private def validateLocal(oss: String, local: String): ErrorOr[Path] = { - if (oss.endsWith("/") == local.endsWith("/")) { - DefaultPathBuilder.get(local).validNel + private def validateLocal(remote: String, local: String): ErrorOr[PathType] = { + if (remote.endsWith("/") == local.endsWith("/")) { + Try(PathFactory.buildPath(local, pathBuilders)) match { + case Success(p) => + Left(p).validNel + case _=> + Right(local).validNel + } } else { "oss and local path type not match".invalidNel } @@ -57,14 +82,46 @@ object BcsMount { case _: IllegalArgumentException => s"$value not convertible to a Boolean".invalidNel } } - } trait BcsMount { - var src: Path - var dest: Path + import BcsMount._ + var src: PathType + var dest: PathType var writeSupport: Boolean + + def toBcsMountEntry: MountEntry } -final case class BcsInputMount(var src: Path, var dest: Path, var writeSupport: Boolean) extends BcsMount -final case class BcsOutputMount(var src: Path, var dest: Path, var writeSupport: Boolean) extends BcsMount \ No newline at end of file +final case class BcsInputMount(var src: PathType, var dest: PathType, var writeSupport: Boolean) extends BcsMount { + def toBcsMountEntry: MountEntry = { + var destStr = BcsMount.toString(dest) + if (BcsMount.toString(src).endsWith("/") && !destStr.endsWith("/")) { + destStr += "/" + } + + val entry = new MountEntry + entry.setSource(BcsMount.toString(src)) + entry.setDestination(destStr) + entry.setWriteSupport(writeSupport) + + entry + } + +} +final case class BcsOutputMount(var src: PathType, var dest: PathType, var writeSupport: Boolean) extends BcsMount { + def toBcsMountEntry: MountEntry = { + var srcStr = BcsMount.toString(src) + if (BcsMount.toString(dest).endsWith("/") && !srcStr.endsWith("/")) { + srcStr += "/" + } + + + val entry = new MountEntry + entry.setSource(srcStr) + entry.setDestination(BcsMount.toString(dest)) + entry.setWriteSupport(writeSupport) + + entry + } +} \ No newline at end of file diff --git a/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsRuntimeAttributes.scala b/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsRuntimeAttributes.scala index a2b5ff1e0ac..8f6d02e99af 100644 --- a/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsRuntimeAttributes.scala +++ b/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsRuntimeAttributes.scala @@ -28,6 +28,7 @@ trait OptionalWithDefault[A] { } final case class BcsRuntimeAttributes(continueOnReturnCode: ContinueOnReturnCode, + dockerTag: Option[BcsDocker], docker: Option[BcsDocker], failOnStderr: Boolean, mounts: Option[Seq[BcsMount]], @@ -37,7 +38,6 @@ final case class BcsRuntimeAttributes(continueOnReturnCode: ContinueOnReturnCode dataDisk: Option[BcsDataDisk], reserveOnFail: Option[Boolean], autoReleaseJob: Option[Boolean], - workerPath: Option[String], timeout: Option[Int], verbose: Option[Boolean], vpc: Option[BcsVpcConfiguration], @@ -68,7 +68,9 @@ object BcsRuntimeAttributes { private def clusterValidation(runtimeConfig: Option[Config]): OptionalRuntimeAttributesValidation[BcsClusterIdOrConfiguration] = ClusterValidation.optionalWithDefault(runtimeConfig) + private def dockerTagValidation(runtimeConfig: Option[Config]): OptionalRuntimeAttributesValidation[BcsDocker] = DockerTagValidation.optionalWithDefault(runtimeConfig) private def dockerValidation(runtimeConfig: Option[Config]): OptionalRuntimeAttributesValidation[BcsDocker] = DockerValidation.optionalWithDefault(runtimeConfig) + private def userDataValidation(runtimeConfig: Option[Config]): OptionalRuntimeAttributesValidation[Seq[BcsUserData]] = UserDataValidation.optionalWithDefault(runtimeConfig) private def systemDiskValidation(runtimeConfig: Option[Config]): OptionalRuntimeAttributesValidation[BcsSystemDisk] = SystemDiskValidation.optionalWithDefault(runtimeConfig) @@ -80,8 +82,6 @@ object BcsRuntimeAttributes { private def mountsValidation(runtimeConfig: Option[Config]): OptionalRuntimeAttributesValidation[Seq[BcsMount]] = MountsValidation.optionalWithDefault(runtimeConfig) - private def workerPathValidation(runtimeConfig: Option[Config]): OptionalRuntimeAttributesValidation[String] = WorkerPathValidation.optionalWithDefault(runtimeConfig) - private def timeoutValidation(runtimeConfig: Option[Config]): OptionalRuntimeAttributesValidation[Int] = TimeoutValidation.optionalWithDefault(runtimeConfig) private def verboseValidation(runtimeConfig: Option[Config]): OptionalRuntimeAttributesValidation[Boolean] = VerboseValidation.optionalWithDefault(runtimeConfig) @@ -90,6 +90,7 @@ object BcsRuntimeAttributes { private def tagValidation(runtimeConfig: Option[Config]): OptionalRuntimeAttributesValidation[String] = TagValidation.optionalWithDefault(runtimeConfig) + def runtimeAttributesBuilder(backendRuntimeConfig: Option[Config]): StandardValidatedRuntimeAttributesBuilder = { val defaults = StandardValidatedRuntimeAttributesBuilder.default(backendRuntimeConfig).withValidation( mountsValidation(backendRuntimeConfig), @@ -99,7 +100,6 @@ object BcsRuntimeAttributes { dataDiskValidation(backendRuntimeConfig), reserveOnFailValidation(backendRuntimeConfig), autoReleaseJobValidation(backendRuntimeConfig), - workerPathValidation(backendRuntimeConfig), timeoutValidation(backendRuntimeConfig), verboseValidation(backendRuntimeConfig), vpcValidation(backendRuntimeConfig), @@ -110,7 +110,10 @@ object BcsRuntimeAttributes { if (backendRuntimeConfig.exists(_.getOrElse("ignoreDocker", false))) { defaults } else { - defaults.withValidation(dockerValidation(backendRuntimeConfig)) + defaults.withValidation( + dockerTagValidation(backendRuntimeConfig), + dockerValidation(backendRuntimeConfig) + ) } } @@ -123,13 +126,13 @@ object BcsRuntimeAttributes { val userData: Option[Seq[BcsUserData]] = RuntimeAttributesValidation.extractOption(userDataValidation(backendRuntimeConfig).key, validatedRuntimeAttributes) val cluster: Option[BcsClusterIdOrConfiguration] = RuntimeAttributesValidation.extractOption(clusterValidation(backendRuntimeConfig).key, validatedRuntimeAttributes) + val dockerTag: Option[BcsDocker] = RuntimeAttributesValidation.extractOption(dockerTagValidation(backendRuntimeConfig).key, validatedRuntimeAttributes) val docker: Option[BcsDocker] = RuntimeAttributesValidation.extractOption(dockerValidation(backendRuntimeConfig).key, validatedRuntimeAttributes) val systemDisk: Option[BcsSystemDisk] = RuntimeAttributesValidation.extractOption(systemDiskValidation(backendRuntimeConfig).key, validatedRuntimeAttributes) val dataDisk: Option[BcsDataDisk] = RuntimeAttributesValidation.extractOption(dataDiskValidation(backendRuntimeConfig).key, validatedRuntimeAttributes) val reserveOnFail: Option[Boolean] = RuntimeAttributesValidation.extractOption(reserveOnFailValidation(backendRuntimeConfig).key, validatedRuntimeAttributes) val autoReleaseJob: Option[Boolean] = RuntimeAttributesValidation.extractOption(autoReleaseJobValidation(backendRuntimeConfig).key, validatedRuntimeAttributes) - val workerPath: Option[String] = RuntimeAttributesValidation.extractOption(workerPathValidation(backendRuntimeConfig).key, validatedRuntimeAttributes) val timeout: Option[Int] = RuntimeAttributesValidation.extractOption(timeoutValidation(backendRuntimeConfig).key, validatedRuntimeAttributes) val verbose: Option[Boolean] = RuntimeAttributesValidation.extractOption(verboseValidation(backendRuntimeConfig).key, validatedRuntimeAttributes) val vpc: Option[BcsVpcConfiguration] = RuntimeAttributesValidation.extractOption(vpcValidation(backendRuntimeConfig).key, validatedRuntimeAttributes) @@ -137,6 +140,7 @@ object BcsRuntimeAttributes { new BcsRuntimeAttributes( continueOnReturnCode, + dockerTag, docker, failOnStderr, mounts, @@ -146,7 +150,6 @@ object BcsRuntimeAttributes { dataDisk, reserveOnFail, autoReleaseJob, - workerPath, timeout, verbose, vpc, @@ -202,6 +205,8 @@ object UserDataValidation { class UserDataValidation(override val config: Option[Config]) extends RuntimeAttributesValidation[Seq[BcsUserData]] with OptionalWithDefault[Seq[BcsUserData]]{ override def key: String = BcsRuntimeAttributes.UserDataKey + override def usedInCallCaching: Boolean = true + override def coercion: Traversable[WomType] = Set(WomStringType, WomArrayType(WomStringType)) override protected def validateValue: PartialFunction[WomValue, ErrorOr[Seq[BcsUserData]]] = { @@ -235,24 +240,6 @@ class UserDataValidation(override val config: Option[Config]) extends RuntimeAtt s"Expecting $key runtime attribute to be a comma separated String or Array[String]" } -object WorkerPathValidation { - def optionalWithDefault(config: Option[Config]): OptionalRuntimeAttributesValidation[String] = new WorkerPathValidation(config).optional -} - -class WorkerPathValidation(override val config: Option[Config]) extends StringRuntimeAttributesValidation("workerPath") with OptionalWithDefault[String] { - override protected def usedInCallCaching: Boolean = false - - override protected def missingValueMessage: String = "Can't find an attribute value for key worker path" - - override protected def invalidValueMessage(value: WomValue): String = super.missingValueMessage - - override protected def validateValue: PartialFunction[WomValue, ErrorOr[String]] = { - case WomString(value) => value.validNel - } -} - - - object ReserveOnFailValidation { def optionalWithDefault(config: Option[Config]): OptionalRuntimeAttributesValidation[Boolean] = new ReserveOnFailValidation(config).optional } @@ -286,6 +273,8 @@ class ClusterValidation(override val config: Option[Config]) extends RuntimeAttr { override def key: String = "cluster" + override def usedInCallCaching: Boolean = true + override def coercion: Traversable[WomType] = Set(WomStringType) override def validateValue: PartialFunction[WomValue, ErrorOr[BcsClusterIdOrConfiguration]] = { @@ -328,22 +317,32 @@ class DataDiskValidation(override val config: Option[Config]) extends RuntimeAtt } } -object DockerValidation { - def optionalWithDefault(config: Option[Config]): OptionalRuntimeAttributesValidation[BcsDocker] = new DockerValidation(config).optional +object DockerTagValidation { + def optionalWithDefault(config: Option[Config]): OptionalRuntimeAttributesValidation[BcsDocker] = new DockerTagValidation(config).optional } -class DockerValidation(override val config: Option[Config]) extends RuntimeAttributesValidation[BcsDocker] with OptionalWithDefault[BcsDocker] +class DockerTagValidation(override val config: Option[Config]) extends RuntimeAttributesValidation[BcsDocker] with OptionalWithDefault[BcsDocker] { - override def key: String = "docker" + override def key: String = "dockerTag" override def coercion: Traversable[WomType] = Set(WomStringType) override def validateValue: PartialFunction[WomValue, ErrorOr[BcsDocker]] = { case WomString(s) => BcsDocker.parse(s.toString) match { case Success(docker: BcsDocker) => docker.validNel - case _ => s"docker must be 'dockeImage dockerPath' like".invalidNel + case _ => s"docker must be 'dockerImage dockerPath' like".invalidNel } } } +object DockerValidation { + def optionalWithDefault(config: Option[Config]): OptionalRuntimeAttributesValidation[BcsDocker] = new DockerValidation(config).optional +} + +class DockerValidation(override val config: Option[Config]) extends DockerTagValidation(config) +{ + override def key: String = "docker" + override def usedInCallCaching: Boolean = true +} + object VpcValidation { def optionalWithDefault(config: Option[Config]): OptionalRuntimeAttributesValidation[BcsVpcConfiguration] = new VpcValidation(config).optional } diff --git a/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsWorkflowPaths.scala b/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsWorkflowPaths.scala index 11ad805788d..1082acaa0f9 100644 --- a/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsWorkflowPaths.scala +++ b/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/BcsWorkflowPaths.scala @@ -3,23 +3,36 @@ package cromwell.backend.impl.bcs import com.typesafe.config.Config import cromwell.backend.io.WorkflowPaths import cromwell.backend.{BackendJobDescriptorKey, BackendWorkflowDescriptor} -import cromwell.core.path.{PathBuilder} +import cromwell.core.path.{Path, PathBuilder} +object BcsWorkflowPaths { + val WorkFlowTagKey = "bcs_workflow_tag" +} case class BcsWorkflowPaths(override val workflowDescriptor: BackendWorkflowDescriptor, override val config: Config, override val pathBuilders: List[PathBuilder] = WorkflowPaths.DefaultPathBuilders) extends WorkflowPaths { - + import BcsWorkflowPaths._ override def toJobPaths(workflowPaths: WorkflowPaths, jobKey: BackendJobDescriptorKey): BcsJobPaths = { new BcsJobPaths(workflowPaths.asInstanceOf[BcsWorkflowPaths], jobKey) } override protected def withDescriptor(workflowDescriptor: BackendWorkflowDescriptor): WorkflowPaths = this.copy(workflowDescriptor = workflowDescriptor) + override protected def workflowPathBuilder(root: Path): Path = { + workflowDescriptor.breadCrumbs.foldLeft(root)((acc, breadCrumb) => { + breadCrumb.toPath(acc) + }).resolve(workflowDescriptor.callable.name).resolve(tag).resolve(workflowDescriptor.id.toString + "/") + } + + var tag: String = { + workflowDescriptor.workflowOptions.get(WorkFlowTagKey).getOrElse("") + } + private[bcs] def getWorkflowInputMounts: BcsInputMount = { val src = workflowRoot val dest = BcsJobPaths.BcsTempInputDirectory.resolve(src.pathWithoutScheme) - BcsInputMount(src, dest, true) + BcsInputMount(Left(src), Left(dest), true) } } diff --git a/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/callcaching/BcsBackendCacheHitCopyingActor.scala b/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/callcaching/BcsBackendCacheHitCopyingActor.scala new file mode 100644 index 00000000000..b225f32aa08 --- /dev/null +++ b/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/callcaching/BcsBackendCacheHitCopyingActor.scala @@ -0,0 +1,69 @@ +package cromwell.backend.impl.bcs.callcaching + +import com.google.cloud.storage.contrib.nio.CloudStorageOptions +import common.util.TryUtil +import cromwell.backend.BackendInitializationData +import cromwell.backend.impl.bcs.BcsBackendInitializationData +import cromwell.backend.io.JobPaths +import cromwell.backend.standard.callcaching.{StandardCacheHitCopyingActor, StandardCacheHitCopyingActorParams} +import cromwell.core.CallOutputs +import cromwell.core.io.{IoCommand, IoTouchCommand} +import cromwell.core.path.Path +import cromwell.core.simpleton.{WomValueBuilder, WomValueSimpleton} +import cromwell.filesystems.oss.batch.{OssBatchCommandBuilder} +import wom.values.WomFile + +import scala.language.postfixOps +import scala.util.Try + +class BcsBackendCacheHitCopyingActor(standardParams: StandardCacheHitCopyingActorParams) extends StandardCacheHitCopyingActor(standardParams) { + override protected val commandBuilder = OssBatchCommandBuilder + private val cachingStrategy = BackendInitializationData + .as[BcsBackendInitializationData](standardParams.backendInitializationDataOption) + .bcsConfiguration.duplicationStrategy + + override def processSimpletons(womValueSimpletons: Seq[WomValueSimpleton], sourceCallRootPath: Path) = cachingStrategy match { + case CopyCachedOutputs => super.processSimpletons(womValueSimpletons, sourceCallRootPath) + case UseOriginalCachedOutputs => + val touchCommands: Seq[Try[IoTouchCommand]] = womValueSimpletons collect { + case WomValueSimpleton(_, wdlFile: WomFile) => getPath(wdlFile.value) map OssBatchCommandBuilder.touchCommand + } + + TryUtil.sequence(touchCommands) map { + WomValueBuilder.toJobOutputs(jobDescriptor.taskCall.outputPorts, womValueSimpletons) -> _.toSet + } + } + + override def processDetritus(sourceJobDetritusFiles: Map[String, String]) = cachingStrategy match { + case CopyCachedOutputs => super.processDetritus(sourceJobDetritusFiles) + case UseOriginalCachedOutputs => + // apply getPath on each detritus string file + val detritusAsPaths = detritusFileKeys(sourceJobDetritusFiles).toSeq map { key => + key -> getPath(sourceJobDetritusFiles(key)) + } toMap + + // Don't forget to re-add the CallRootPathKey that has been filtered out by detritusFileKeys + TryUtil.sequenceMap(detritusAsPaths, "Failed to make paths out of job detritus") map { newDetritus => + (newDetritus + (JobPaths.CallRootPathKey -> destinationCallRootPath)) -> newDetritus.values.map(OssBatchCommandBuilder.touchCommand).toSet + } + } + + override protected def additionalIoCommands(sourceCallRootPath: Path, + originalSimpletons: Seq[WomValueSimpleton], + newOutputs: CallOutputs, + originalDetritus: Map[String, String], + newDetritus: Map[String, Path]): List[Set[IoCommand[_]]] = { + cachingStrategy match { + case UseOriginalCachedOutputs => + val content = + s""" + |This directory does not contain any output files because this job matched an identical job that was previously run, thus it was a cache-hit. + |Cromwell is configured to not copy outputs during call caching. To change this, edit the filesystems.gcs.caching.duplication-strategy field in your backend configuration. + |The original outputs can be found at this location: ${sourceCallRootPath.pathAsString} + """.stripMargin + + List(Set(OssBatchCommandBuilder.writeCommand(jobPaths.callExecutionRoot / "call_caching_placeholder.txt", content, Seq(CloudStorageOptions.withMimeType("text/plain"))))) + case CopyCachedOutputs => List.empty + } + } +} diff --git a/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/callcaching/BcsCacheHitDuplicationStrategy.scala b/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/callcaching/BcsCacheHitDuplicationStrategy.scala new file mode 100644 index 00000000000..55ba639d6fd --- /dev/null +++ b/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/callcaching/BcsCacheHitDuplicationStrategy.scala @@ -0,0 +1,6 @@ +package cromwell.backend.impl.bcs.callcaching + +sealed trait BcsCacheHitDuplicationStrategy + +case object CopyCachedOutputs extends BcsCacheHitDuplicationStrategy +case object UseOriginalCachedOutputs extends BcsCacheHitDuplicationStrategy diff --git a/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/worker.tar.gz b/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/worker.tar.gz deleted file mode 100644 index 3e1328f7417..00000000000 Binary files a/supportedBackends/bcs/src/main/scala/cromwell/backend/impl/bcs/worker.tar.gz and /dev/null differ diff --git a/supportedBackends/bcs/src/test/resources/application.conf b/supportedBackends/bcs/src/test/resources/application.conf new file mode 100644 index 00000000000..218d12ee470 --- /dev/null +++ b/supportedBackends/bcs/src/test/resources/application.conf @@ -0,0 +1,77 @@ +#include required(classpath("application")) + + +call-caching { + # Allows re-use of existing results for jobs you've already run + # (default: false) + enabled = true + + # Whether to invalidate a cache result forever if we cannot reuse them. Disable this if you expect some cache copies + # to fail for external reasons which should not invalidate the cache (e.g. auth differences between users): + # (default: true) + invalidate-bad-cache-results = true + +} + +docker { + hash-lookup { + enable = true + + # How should docker hashes be looked up. Possible values are "local" and "remote" + # "local": Lookup hashes on the local docker daemon using the cli + # "remote": Lookup hashes on docker hub and gcr + method = "remote" + alibabacloudcr { + num-threads = 5 + auth { + access-id = "test-access-id" + access-key = "test-access-key" + security-token = "test-security-token" + } + } + } +} + +backend { + default = "BCS" + + providers { + BCS { + actor-factory = "cromwell.backend.impl.bcs.BcsBackendLifecycleActorFactory" + config { + root = "oss://my-bucket/cromwell_dir" + region = "cn-shanghai" + access-id = "test-access-id" + access-key = "test-access-key" + security-token = "test-security-token" + + filesystems { + oss { + auth { + endpoint = "oss-cn-shanghai.aliyuncs.com" + access-id = "test-access-id" + access-key = "test-access-key" + security-token = "test-security-token" + } + + caching { + # When a cache hit is found, the following duplication strategy will be followed to use the cached outputs + # Possible values: "copy", "reference". Defaults to "copy" + # "copy": Copy the output files + # "reference": DO NOT copy the output files but point to the original output files instead. + # Will still make sure than all the original output files exist and are accessible before + # going forward with the cache hit. + duplication-strategy = "reference" + } + } + } + + default-runtime-attributes { + failOnStderr: false + continueOnReturnCode: 0 + vpc: "192.168.0.0/16" + } + } + } + } +} diff --git a/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/BcsClusterIdOrConfigurationSpec.scala b/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/BcsClusterIdOrConfigurationSpec.scala index 000a2556c7c..b57fa60344b 100644 --- a/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/BcsClusterIdOrConfigurationSpec.scala +++ b/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/BcsClusterIdOrConfigurationSpec.scala @@ -12,7 +12,7 @@ class BcsClusterIdOrConfigurationSpec extends BcsTestUtilSpec { val clusterIdTable = Table( ("unparsed", "parsed"), - ("cls-xxxx", Some("cls-xxxx")), + ("cls-xxxx", Option("cls-xxxx")), ("job-xxxx", None) ) @@ -24,8 +24,8 @@ class BcsClusterIdOrConfigurationSpec extends BcsTestUtilSpec { val resourceTypeTable = Table( ("unparsed", "parsed"), - ("OnDemand", Some("OnDemand")), - ("Spot", Some("Spot")), + ("OnDemand", Option("OnDemand")), + ("Spot", Option("Spot")), ("Other", None) ) @@ -38,8 +38,8 @@ class BcsClusterIdOrConfigurationSpec extends BcsTestUtilSpec { val instanceTypeTable = Table( ("unparsed", "parsed"), - ("ecs.s1.large", Some("ecs.s1.large")), - ("bcs.s1.large", Some("bcs.s1.large")) + ("ecs.s1.large", Option("ecs.s1.large")), + ("bcs.s1.large", Option("bcs.s1.large")) ) it should "parse correct instance type" in { @@ -50,8 +50,8 @@ class BcsClusterIdOrConfigurationSpec extends BcsTestUtilSpec { val spotStrategyTable = Table( ("unparsed", "parsed"), - ("SpotWithPriceLimit", Some("SpotWithPriceLimit")), - ("SpotAsPriceGo", Some("SpotAsPriceGo")) + ("SpotWithPriceLimit", Option("SpotWithPriceLimit")), + ("SpotAsPriceGo", Option("SpotAsPriceGo")) ) @@ -63,10 +63,10 @@ class BcsClusterIdOrConfigurationSpec extends BcsTestUtilSpec { val spotPriceLimitTable = Table( ("unparsed", "parsed"), - ("1.0", Some(1.0.toFloat)), - ("0.1", Some(0.1.toFloat)), - ("0.12", Some(0.12.toFloat)), - ("0.123", Some(0.123.toFloat)) + ("1.0", Option(1.0.toFloat)), + ("0.1", Option(0.1.toFloat)), + ("0.12", Option(0.12.toFloat)), + ("0.123", Option(0.123.toFloat)) ) it should "parse correct spot price limit" in { @@ -79,9 +79,14 @@ class BcsClusterIdOrConfigurationSpec extends BcsTestUtilSpec { ("unparsed", "parsed"), ("cls-id", Left("cls-id")), ("OnDemand ecs.s1.large img-test", Right(AutoClusterConfiguration("OnDemand", "ecs.s1.large", "img-test"))), + ("OnDemand ecs.s1.large img-test cls-test", Right(AutoClusterConfiguration("OnDemand", "ecs.s1.large", "img-test", clusterId = Option("cls-test")))), ("ecs.s1.large img-test", Right(AutoClusterConfiguration("OnDemand", "ecs.s1.large", "img-test"))), - ("Spot ecs.s1.large img-test SpotWithPriceLimit 0.1", Right(AutoClusterConfiguration("Spot", "ecs.s1.large", "img-test", Some("SpotWithPriceLimit"), Some(0.1.toFloat)))), - ("Spot ecs.s1.large img-test SpotAsPriceGo 0.1", Right(AutoClusterConfiguration("Spot", "ecs.s1.large", "img-test", Some("SpotAsPriceGo"), Some(0.1.toFloat)))), + ("ecs.s1.large img-test cls-test", Right(AutoClusterConfiguration("OnDemand", "ecs.s1.large", "img-test", clusterId = Option("cls-test")))), + ("Spot ecs.s1.large img-test SpotWithPriceLimit 0.1", Right(AutoClusterConfiguration("Spot", "ecs.s1.large", "img-test", Option("SpotWithPriceLimit"), Option(0.1.toFloat)))), + ("Spot ecs.s1.large img-test SpotWithPriceLimit 0.1 cls-test", Right(AutoClusterConfiguration("Spot", "ecs.s1.large", "img-test", Option("SpotWithPriceLimit"), Option(0.1.toFloat), Option("cls-test")))), + ("Spot ecs.s1.large img-test SpotAsPriceGo 0.1", Right(AutoClusterConfiguration("Spot", "ecs.s1.large", "img-test", Option("SpotAsPriceGo"), Option(0.1.toFloat)))), + ("Spot ecs.s1.large img-test SpotAsPriceGo 0.1 cls-test", Right(AutoClusterConfiguration("Spot", "ecs.s1.large", "img-test", Option("SpotAsPriceGo"), Option(0.1.toFloat), Option("cls-test")))), + ) diff --git a/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/BcsConfigurationSpec.scala b/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/BcsConfigurationSpec.scala index 7575a6e04fe..ad32eec7cce 100644 --- a/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/BcsConfigurationSpec.scala +++ b/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/BcsConfigurationSpec.scala @@ -1,9 +1,10 @@ package cromwell.backend.impl.bcs import com.typesafe.config.ConfigValueFactory +import cromwell.backend.impl.bcs.callcaching.UseOriginalCachedOutputs class BcsConfigurationSpec extends BcsTestUtilSpec { - behavior of s"BcsConfiguration" + behavior of "BcsConfiguration" type ValueOrDelete = Either[Boolean, AnyRef] def backendConfiguration = BcsTestUtilSpec.BcsBackendConfigurationDescriptor @@ -25,6 +26,14 @@ class BcsConfigurationSpec extends BcsTestUtilSpec { conf.bcsAccessKey shouldEqual Some(key) } + it should "have correct bcs callcaching strategy" in { + val region = "cn-hangzhou" + val configs = Map("region" -> Right(region)) + val conf = withConfig(configs) + conf.duplicationStrategy shouldEqual UseOriginalCachedOutputs + } + + private def withConfig(configs: Map[String, ValueOrDelete]) = { var descriptor = BcsTestUtilSpec.BcsBackendConfigurationDescriptor.copy() for ((key, value) <- configs) { diff --git a/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/BcsJobPathsSpec.scala b/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/BcsJobPathsSpec.scala index 6c1e610e22d..02339302d94 100644 --- a/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/BcsJobPathsSpec.scala +++ b/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/BcsJobPathsSpec.scala @@ -6,7 +6,7 @@ import org.mockito.Mockito.when class BcsJobPathsSpec extends BcsTestUtilSpec { behavior of s"BcsJobPathsSpec" - var root: OssPath = mockPathBuiler.build("oss://bcs-test/root/").getOrElse(throw new IllegalArgumentException()) + var root: OssPath = mockPathBuilder.build("oss://bcs-test/root/").getOrElse(throw new IllegalArgumentException()) var workflowPath = { val workflowPaths = mock[BcsWorkflowPaths] diff --git a/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/BcsJobSpec.scala b/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/BcsJobSpec.scala index 6e6490715fc..589b1860856 100644 --- a/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/BcsJobSpec.scala +++ b/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/BcsJobSpec.scala @@ -13,7 +13,7 @@ class BcsJobSpec extends BcsTestUtilSpec { val name = "cromwell" val description = name val command = "python main.py" - val packagePath = mockPathBuiler.build("oss://bcs-test/worker.tar.gz").get + val packagePath = mockPathBuilder.build("oss://bcs-test/worker.tar.gz").get val mounts = Seq.empty[BcsMount] val envs = Map.empty[String, String] @@ -45,7 +45,10 @@ class BcsJobSpec extends BcsTestUtilSpec { val dest = "/home/inputs/" val writeSupport = false val runtime = Map("mounts" -> WomString(s"$src $dest $writeSupport")) - taskWithRuntime(runtime).getInputMapping.get(src) shouldEqual dest + taskWithRuntime(runtime).getMounts().getEntries should have size(1) + taskWithRuntime(runtime).getMounts().getEntries.get(0).getSource shouldBe src + taskWithRuntime(runtime).getMounts().getEntries.get(0).getDestination shouldBe dest + taskWithRuntime(runtime).getMounts().getEntries.get(0).isWriteSupport shouldBe writeSupport } it should "have correct cluster id" in { @@ -57,9 +60,9 @@ class BcsJobSpec extends BcsTestUtilSpec { it should "have correct docker option" in { val dockerImage = "ubuntu/latest" val dockerPath = "oss://bcs-reg/ubuntu/"toLowerCase() - val runtime = Map("docker" -> WomString(s"$dockerImage $dockerPath")) - taskWithRuntime(runtime).getParameters.getCommand.getEnvVars.get(BcsJob.BcsDockerImageEnvKey) shouldEqual dockerImage - taskWithRuntime(runtime).getParameters.getCommand.getEnvVars.get(BcsJob.BcsDockerPathEnvKey) shouldEqual dockerPath + val runtime = Map("dockerTag" -> WomString(s"$dockerImage $dockerPath")) + taskWithRuntime(runtime).getParameters.getCommand.getEnvVars.get(BcsJob.BcsDockerImageEnvKey) shouldEqual null + taskWithRuntime(runtime).getParameters.getCommand.getEnvVars.get(BcsJob.BcsDockerPathEnvKey) shouldEqual null } it should "have correct auto cluster configuration" in { diff --git a/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/BcsMountSpec.scala b/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/BcsMountSpec.scala index 8ead4fe1d0a..c27d6488965 100644 --- a/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/BcsMountSpec.scala +++ b/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/BcsMountSpec.scala @@ -14,8 +14,8 @@ class BcsMountSpec extends BcsTestUtilSpec { entry shouldBe a [BcsInputMount] - entry.src.pathAsString shouldEqual ossObject - entry.dest.pathAsString shouldEqual localFile.stripSuffix("/") + BcsMount.toString(entry.src) shouldEqual ossObject + BcsMount.toString(entry.dest) shouldEqual localFile entry.writeSupport shouldEqual writeSupport writeSupport = false @@ -23,8 +23,8 @@ class BcsMountSpec extends BcsTestUtilSpec { entryString = s"$ossObject $localFile $writeSupport" entry = BcsMount.parse(entryString).success.value entry shouldBe a [BcsInputMount] - entry.src.pathAsString shouldEqual ossObject - entry.dest.pathAsString shouldEqual localFile.stripSuffix("/") + BcsMount.toString(entry.src) shouldEqual ossObject + BcsMount.toString(entry.dest) shouldEqual localFile entry.writeSupport shouldEqual writeSupport } @@ -35,8 +35,8 @@ class BcsMountSpec extends BcsTestUtilSpec { entry shouldBe a [BcsOutputMount] - entry.src.pathAsString shouldEqual localFile.stripSuffix("/") - entry.dest.pathAsString shouldEqual ossObject + BcsMount.toString(entry.src) shouldEqual localFile + BcsMount.toString(entry.dest) shouldEqual ossObject entry.writeSupport shouldEqual writeSupport writeSupport = false @@ -44,8 +44,8 @@ class BcsMountSpec extends BcsTestUtilSpec { entryString = s"$localFile $ossObject $writeSupport" entry = BcsMount.parse(entryString).success.value entry shouldBe a [BcsOutputMount] - entry.src.pathAsString shouldEqual localFile.stripSuffix("/") - entry.dest.pathAsString shouldEqual ossObject + BcsMount.toString(entry.src) shouldEqual localFile + BcsMount.toString(entry.dest) shouldEqual ossObject entry.writeSupport shouldEqual writeSupport } diff --git a/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/BcsRuntimeAttributesSpec.scala b/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/BcsRuntimeAttributesSpec.scala index 770cfa59175..399c9c854ca 100644 --- a/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/BcsRuntimeAttributesSpec.scala +++ b/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/BcsRuntimeAttributesSpec.scala @@ -1,10 +1,9 @@ package cromwell.backend.impl.bcs -import cromwell.core.path.DefaultPathBuilder import wom.values._ class BcsRuntimeAttributesSpec extends BcsTestUtilSpec { - behavior of s"BcsRuntimeAttributes" + behavior of "BcsRuntimeAttributes" it should "build correct default runtime attributes from config string" in { val runtime = Map.empty[String, WomValue] @@ -12,23 +11,29 @@ class BcsRuntimeAttributesSpec extends BcsTestUtilSpec { defaults shouldEqual expectedRuntimeAttributes } - it should "parse docker without docker path" in { - val runtime = Map("docker" -> WomString("ubuntu/latest")) - val expected = expectedRuntimeAttributes.copy(docker = Some(BcsDockerWithoutPath("ubuntu/latest"))) + it should "parse dockerTag without docker path" in { + val runtime = Map("dockerTag" -> WomString("ubuntu/latest")) + val expected = expectedRuntimeAttributes.copy(dockerTag = Some(BcsDockerWithoutPath("ubuntu/latest"))) createBcsRuntimeAttributes(runtime) shouldEqual(expected) } - it should "parse docker with path" in { - val runtime = Map("docker" -> WomString("centos/latest oss://bcs-dir/registry/")) - val expected = expectedRuntimeAttributes.copy(docker = Some(BcsDockerWithPath("centos/latest", "oss://bcs-dir/registry/"))) + it should "parse dockerTag with path" in { + val runtime = Map("dockerTag" -> WomString("centos/latest oss://bcs-dir/registry/")) + val expected = expectedRuntimeAttributes.copy(dockerTag = Some(BcsDockerWithPath("centos/latest", "oss://bcs-dir/registry/"))) createBcsRuntimeAttributes(runtime) shouldEqual(expected) } - it should "parse docker fail if an empty string value" in { - val runtime = Map("docker" -> WomString("")) + it should "parse dockerTag fail if an empty string value" in { + val runtime = Map("dockerTag" -> WomString("")) an [Exception] should be thrownBy createBcsRuntimeAttributes(runtime) } + it should "parse docker" in { + val runtime = Map("docker" -> WomString("registry.cn-beijing.aliyuncs.com/test/testubuntu:0.2")) + val expected = expectedRuntimeAttributes.copy(docker = Some(BcsDockerWithoutPath("registry.cn-beijing.aliyuncs.com/test/testubuntu:0.2"))) + createBcsRuntimeAttributes(runtime) shouldEqual(expected) + } + it should "parse correct user data" in { val runtime = Map("userData" -> WomString("key value1")) val expected = expectedRuntimeAttributes.copy(userData = Some(Vector(BcsUserData("key", "value1")))) @@ -42,13 +47,13 @@ class BcsRuntimeAttributesSpec extends BcsTestUtilSpec { it should "parse correct input mount" in { val runtime = Map("mounts" -> WomString("oss://bcs-dir/bcs-file /home/inputs/input_file false")) - val expected = expectedRuntimeAttributes.copy(mounts = Some(Vector(BcsInputMount(mockPathBuiler.build("oss://bcs-dir/bcs-file").get, DefaultPathBuilder.build("/home/inputs/input_file").get, false)))) + val expected = expectedRuntimeAttributes.copy(mounts = Some(Vector(BcsInputMount(Left(mockPathBuilder.build("oss://bcs-dir/bcs-file").get), Right("/home/inputs/input_file"), false)))) createBcsRuntimeAttributes(runtime) shouldEqual expected } it should "parse correct out mount" in { val runtime = Map("mounts" -> WomString("/home/outputs/ oss://bcs-dir/outputs/ true")) - val expected = expectedRuntimeAttributes.copy(mounts = Some(Vector(BcsOutputMount(DefaultPathBuilder.build("/home/outputs/").get, mockPathBuiler.build("oss://bcs-dir/outputs/").get, true)))) + val expected = expectedRuntimeAttributes.copy(mounts = Some(Vector(BcsOutputMount(Right("/home/outputs/"), Left(mockPathBuilder.build("oss://bcs-dir/outputs/").get), true)))) createBcsRuntimeAttributes(runtime) shouldEqual expected } diff --git a/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/BcsTestUtilSpec.scala b/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/BcsTestUtilSpec.scala index 57300cd69fa..7116d7826a4 100644 --- a/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/BcsTestUtilSpec.scala +++ b/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/BcsTestUtilSpec.scala @@ -3,12 +3,11 @@ package cromwell.backend.impl.bcs import com.typesafe.config.ConfigFactory import common.collections.EnhancedCollections._ import cromwell.backend.{BackendConfigurationDescriptor, BackendJobDescriptorKey, RuntimeAttributeDefinition} -import cromwell.backend.BackendSpec.{buildWdlWorkflowDescriptor} +import cromwell.backend.BackendSpec.buildWdlWorkflowDescriptor import cromwell.backend.validation.ContinueOnReturnCodeSet -import cromwell.core.path.DefaultPathBuilder import cromwell.core.{TestKitSuite, WorkflowOptions} import cromwell.filesystems.oss.OssPathBuilder -import cromwell.filesystems.oss.nio.OssStorageConfiguration +import cromwell.filesystems.oss.nio.DefaultOssStorageConfiguration import cromwell.util.SampleWdl import org.scalatest.{BeforeAndAfter, FlatSpecLike, Matchers} import org.scalatest.mockito.MockitoSugar @@ -25,12 +24,12 @@ object BcsTestUtilSpec { | continueOnReturnCode: 0 | cluster: "cls-mycluster" | mounts: "oss://bcs-bucket/bcs-dir/ /home/inputs/ false" - | docker: "ubuntu/latest oss://bcs-reg/ubuntu/" + | dockerTag: "ubuntu/latest oss://bcs-reg/ubuntu/" + | docker: "registry.cn-beijing.aliyuncs.com/test/testubuntu:0.1" | userData: "key value" | reserveOnFail: true | autoReleaseJob: true | verbose: false - | workerPath: "oss://bcs-bucket/workflow/worker.tar.gz" | systemDisk: "cloud 50" | dataDisk: "cloud 250 /home/data/" | timeout: 3000 @@ -57,6 +56,9 @@ object BcsTestUtilSpec { | access-key = "" | security-token = "" | } + | caching { + | duplication-strategy = "reference" + | } | } |} | @@ -114,13 +116,13 @@ object BcsTestUtilSpec { trait BcsTestUtilSpec extends TestKitSuite with FlatSpecLike with Matchers with MockitoSugar with BeforeAndAfter { before { - BcsMount.pathBuilders = List(mockPathBuiler) + BcsMount.pathBuilders = List(mockPathBuilder) } val jobId = "test-bcs-job" - val mockOssConf = OssStorageConfiguration("oss.aliyuncs.com", "test-id", "test-key") - val mockPathBuiler = OssPathBuilder(mockOssConf) - val mockPathBuilders = List(mockPathBuiler) + val mockOssConf = DefaultOssStorageConfiguration("oss.aliyuncs.com", "test-id", "test-key") + val mockPathBuilder = OssPathBuilder(mockOssConf) + val mockPathBuilders = List(mockPathBuilder) lazy val workflowDescriptor = buildWdlWorkflowDescriptor( SampleWdl.HelloWorld.workflowSource(), inputFileAsJson = Option(JsObject(SampleWdl.HelloWorld.rawInputs.safeMapValues(JsString.apply)).compactPrint) @@ -132,25 +134,25 @@ trait BcsTestUtilSpec extends TestKitSuite with FlatSpecLike with Matchers with val expectedContinueOnReturn = ContinueOnReturnCodeSet(Set(0)) - val expectedDocker = Some(BcsDockerWithPath("ubuntu/latest", "oss://bcs-reg/ubuntu/")) + val expectedDockerTag = Some(BcsDockerWithPath("ubuntu/latest", "oss://bcs-reg/ubuntu/")) + val expectedDocker = Some(BcsDockerWithoutPath("registry.cn-beijing.aliyuncs.com/test/testubuntu:0.1")) val expectedFailOnStderr = false val expectedUserData = Some(Vector(new BcsUserData("key", "value"))) - val expectedMounts = Some(Vector(new BcsInputMount(mockPathBuiler.build("oss://bcs-bucket/bcs-dir/").get, DefaultPathBuilder.build("/home/inputs/").get, false))) + val expectedMounts = Some(Vector(new BcsInputMount(Left(mockPathBuilder.build("oss://bcs-bucket/bcs-dir/").get), Right("/home/inputs/"), false))) val expectedCluster = Some(Left("cls-mycluster")) val expectedSystemDisk = Some(BcsSystemDisk("cloud", 50)) val expectedDataDsik = Some(BcsDataDisk("cloud", 250, "/home/data/")) val expectedReserveOnFail = Some(true) val expectedAutoRelease = Some(true) - val expectedWorkerPath = Some("oss://bcs-bucket/workflow/worker.tar.gz") val expectedTimeout = Some(3000) val expectedVerbose = Some(false) val expectedVpc = Some(BcsVpcConfiguration(Some("192.168.0.0/16"), Some("vpc-xxxx"))) val expectedTag = Some("jobTag") - val expectedRuntimeAttributes = new BcsRuntimeAttributes(expectedContinueOnReturn, expectedDocker, expectedFailOnStderr, expectedMounts, expectedUserData, expectedCluster, - expectedSystemDisk, expectedDataDsik, expectedReserveOnFail, expectedAutoRelease, expectedWorkerPath, expectedTimeout, expectedVerbose, expectedVpc, expectedTag) + val expectedRuntimeAttributes = new BcsRuntimeAttributes(expectedContinueOnReturn, expectedDockerTag, expectedDocker, expectedFailOnStderr, expectedMounts, expectedUserData, expectedCluster, + expectedSystemDisk, expectedDataDsik, expectedReserveOnFail, expectedAutoRelease, expectedTimeout, expectedVerbose, expectedVpc, expectedTag) protected def createBcsRuntimeAttributes(runtimeAttributes: Map[String, WomValue]): BcsRuntimeAttributes = { diff --git a/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/BcsWorkflowPathsSpec.scala b/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/BcsWorkflowPathsSpec.scala index a3909cb554a..4222055ab1a 100644 --- a/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/BcsWorkflowPathsSpec.scala +++ b/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/BcsWorkflowPathsSpec.scala @@ -12,10 +12,10 @@ class BcsWorkflowPathsSpec extends BcsTestUtilSpec { val workflowInput = paths.getWorkflowInputMounts workflowInput shouldBe a[BcsInputMount] - workflowInput.src shouldEqual(paths.workflowRoot) - workflowInput.dest.pathAsString.startsWith(BcsJobPaths.BcsTempInputDirectory.pathAsString) shouldBe true + workflowInput.src shouldEqual(Left(paths.workflowRoot)) + BcsMount.toString(workflowInput.dest).startsWith(BcsJobPaths.BcsTempInputDirectory.pathAsString) shouldBe true // DefaultPathBuilder always remove ending '/' from directory path. - workflowInput.dest.pathAsString.endsWith(paths.workflowRoot.pathWithoutScheme.stripSuffix("/")) shouldBe true + BcsMount.toString(workflowInput.dest).endsWith(paths.workflowRoot.pathWithoutScheme.stripSuffix("/")) shouldBe true } it should "have correct job paths" in { diff --git a/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/callcaching/BcsBackendCacheHitCopyingActorSpec.scala b/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/callcaching/BcsBackendCacheHitCopyingActorSpec.scala new file mode 100644 index 00000000000..3ee71686602 --- /dev/null +++ b/supportedBackends/bcs/src/test/scala/cromwell/backend/impl/bcs/callcaching/BcsBackendCacheHitCopyingActorSpec.scala @@ -0,0 +1,88 @@ +package cromwell.backend.impl.bcs.callcaching + + +import akka.actor.Props +import akka.testkit.TestActorRef +import com.typesafe.config.ConfigValueFactory +import cromwell.backend.impl.bcs.{BcsBackendInitializationData, BcsConfiguration, BcsRuntimeAttributes, BcsTestUtilSpec, BcsWorkflowPaths} +import cromwell.backend.standard.callcaching.StandardCacheHitCopyingActorParams +import cromwell.core.path.{Path} +import wom.values._ +import cromwell.backend.impl.bcs.BcsTestUtilSpec.BcsBackendConfig +import cromwell.backend.standard.callcaching.DefaultStandardCacheHitCopyingActorParams +import cromwell.core.simpleton.WomValueSimpleton +import cromwell.filesystems.oss.OssPath +import org.mockito.Mockito.when + +import scala.util.Try + + +class BcsBackendCacheHitCopyingActorSpec extends BcsTestUtilSpec { + behavior of "BcsBackendCacheHitCopyingActor" + type ValueOrDelete = Either[Boolean, AnyRef] + + val workflowPaths = BcsWorkflowPaths(workflowDescriptor, BcsBackendConfig, mockPathBuilders) + + + private def buildInitializationData(configuration: BcsConfiguration) = { + + val runtimeAttributesBuilder = BcsRuntimeAttributes.runtimeAttributesBuilder(BcsTestUtilSpec.BcsBackendConfigurationDescriptor.backendRuntimeAttributesConfig) + BcsBackendInitializationData(workflowPaths, runtimeAttributesBuilder, configuration, null) + } + + val runtimeAttributesBuilder = BcsRuntimeAttributes.runtimeAttributesBuilder(BcsTestUtilSpec.BcsBackendConfigurationDescriptor.backendRuntimeAttributesConfig) + + + private def withConfig(configs: Map[String, ValueOrDelete]) = { + var descriptor = BcsTestUtilSpec.BcsBackendConfigurationDescriptor.copy() + for ((key, value) <- configs) { + value match { + case Left(_) => descriptor = BcsTestUtilSpec.BcsBackendConfigurationDescriptor.copy(backendConfig = descriptor.backendConfig.withoutPath(key)) + case Right(v) => descriptor = BcsTestUtilSpec.BcsBackendConfigurationDescriptor.copy(backendConfig = descriptor.backendConfig.withValue(key, ConfigValueFactory.fromAnyRef(v))) + } + } + new BcsConfiguration(descriptor) + } + + + var cacheHitCopyingActorParams = { + val mockCacheHitCopyingActorParams = mock[DefaultStandardCacheHitCopyingActorParams] + val id = "test-access-id" + val key = "test-access-key" + val configs = Map("access-id" -> Right(id), "access-key" -> Right(key)) + val conf = withConfig(configs) + when(mockCacheHitCopyingActorParams.backendInitializationDataOption).thenReturn(Option(buildInitializationData(conf))) + mockCacheHitCopyingActorParams + } + + class TestableBcsCacheHitCopyingActor(params: StandardCacheHitCopyingActorParams) + extends BcsBackendCacheHitCopyingActor(params) { + + val id = "test-access-id" + val key = "test-access-key" + val configs = Map("access-id" -> Right(id), "access-key" -> Right(key)) + val conf = withConfig(configs) + + + def this() = { + this(cacheHitCopyingActorParams) + } + + override def getPath(str: String): Try[Path] = mockPathBuilder.build("oss://bcs-dir/outputs/") + override def destinationJobDetritusPaths: Map[String, Path] = Map("stdout" + -> mockPathBuilder.build("oss://my-bucket/cromwell_dir/wf_echo/14e5dcd2-0c94-4035-aa7b-b90d7008202c/call-echo/stdout.log").get) + } + + it should "process simpleton and detritus correctly" in { + val simpleton = WomValueSimpleton("txt_files", WomSingleFile("oss://my-bucket/cromwell_dir/wf_echo/14e5dcd2-0c94-4035-aa7b-b90d7008202c/call-echo/abc.log")) + val detritus = Map("stdout" -> "oss://my-bucket/cromwell_dir/wf_echo/14e5dcd2-0c94-4035-aa7b-b90d7008202c/call-echo/stdout.log") + val sourceCallRootPath: OssPath = mockPathBuilder.build("oss://bcs-test/root/abc.log").getOrElse(throw new IllegalArgumentException()) + + val props = Props(new TestableBcsCacheHitCopyingActor()) + val cacheHitActor = TestActorRef[TestableBcsCacheHitCopyingActor]( + props, "TestableBcsCacheHitCopyingActor") + + noException should be thrownBy cacheHitActor.underlyingActor.processSimpletons(List(simpleton), sourceCallRootPath) + noException should be thrownBy cacheHitActor.underlyingActor.processDetritus(detritus) + } +}